Notice: this page is a translated copy of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me). StackOverflow original: http://stackoverflow.com/questions/2248131/

Time: 2020-08-13 05:12:46  Source: igfitidea

Handling exceptions from Java ExecutorService tasks

Tags: java, multithreading, exception, executorservice, threadpoolexecutor

Asked by Tom

I'm trying to use Java's ThreadPoolExecutor class to run a large number of heavyweight tasks with a fixed number of threads. Each task has many places where it may fail due to exceptions.

I've subclassed ThreadPoolExecutor and overridden the afterExecute method, which is supposed to provide any uncaught exceptions encountered while running a task. However, I can't seem to make it work.

For example:

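import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolErrors extends ThreadPoolExecutor {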
    public ThreadPoolErrors() {
        super(  1, // core threads
                1, // max threads
                1, // timeout
                TimeUnit.MINUTES, // timeout units
                new LinkedBlockingQueue<Runnable>() // work queue
        );
    }

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if(t != null) {
            System.out.println("Got an error: " + t);
        } else {
            System.out.println("Everything's fine--situation normal!");
        }
    }

    public static void main( String [] args) {
        ThreadPoolErrors threadPool = new ThreadPoolErrors();
        threadPool.submit( 
                new Runnable() {
                    public void run() {
                        throw new RuntimeException("Ouch! Got an error.");
                    }
                }
        );
        threadPool.shutdown();
    }
}

The output from this program is "Everything's fine--situation normal!" even though the only Runnable submitted to the thread pool throws an exception. Any clue to what's going on here?

Thanks!

Accepted answer by nos

From the docs:

Note: When actions are enclosed in tasks (such as FutureTask) either explicitly or via methods such as submit, these task objects catch and maintain computational exceptions, and so they do not cause abrupt termination, and the internal exceptions are not passed to this method.

When you submit a Runnable, it gets wrapped in a FutureTask (a Future), so the exception is stored in that task and afterExecute is called with t == null.

Your afterExecute should be something like this:

public final class ExtendedExecutor extends ThreadPoolExecutor {

    // ...

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t == null && r instanceof Future<?>) {
            try {
                Future<?> future = (Future<?>) r;
                if (future.isDone()) {
                    future.get();
                }
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) {
            System.out.println(t);
        }
    }
}
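
To see the afterExecute override above in action, here is a minimal, self-contained sketch. The class names AfterExecuteDemo and RecordingExecutor, the failures list, and the helper runFailingTask are made up for this illustration; the constructor arguments are copied from the question. It records the unwrapped cause instead of printing it, so the result can be inspected once the pool has terminated.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class AfterExecuteDemo {

    /** Hypothetical pool that records every failure afterExecute can recover. */
    static class RecordingExecutor extends ThreadPoolExecutor {
        final List<Throwable> failures = new CopyOnWriteArrayList<>();

        RecordingExecutor() {
            super(1, 1, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            // Tasks passed to submit() arrive here as a Future with t == null.
            if (t == null && r instanceof Future<?>) {
                try {
                    Future<?> future = (Future<?>) r;
                    if (future.isDone()) {
                        future.get(); // does not wait: the task has already finished
                    }
                } catch (CancellationException ce) {
                    t = ce;
                } catch (ExecutionException ee) {
                    t = ee.getCause(); // the exception thrown by the task itself
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
            if (t != null) {
                failures.add(t);
            }
        }
    }

    /** Submits one failing task and returns the messages of the recorded failures. */
    static List<String> runFailingTask() {
        RecordingExecutor pool = new RecordingExecutor();
        Runnable failing = () -> { throw new RuntimeException("Ouch! Got an error."); };
        pool.submit(failing);
        pool.shutdown();
        try {
            // Wait for the worker to finish so afterExecute has certainly run.
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        List<String> messages = new ArrayList<>();
        for (Throwable failure : pool.failures) {
            messages.add(failure.getMessage());
        }
        return messages;
    }

    public static void main(String[] args) {
        System.out.println(runFailingTask());
    }
}
```

With this override the exception from the question's Runnable is recorded, even though the task went through submit().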

Answer by skaffman

Warning: this solution blocks the calling thread, because Future.get() waits until the task has finished.

If you want to process exceptions thrown by the task, then it is generally better to use Callable rather than Runnable.

Callable.call() is permitted to throw checked exceptions, and these get propagated back to the calling thread:

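Callable<Object> task = ...
Future<Object> future = executor.submit(task);
try {
   future.get();
} catch (ExecutionException ex) {
   // the exception thrown by call() is available as the cause
   ex.getCause().printStackTrace();
} catch (InterruptedException ex) {
   // get() was interrupted while waiting; restore the interrupt flag
   Thread.currentThread().interrupt();
}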

If Callable.call() throws an exception, it will be wrapped in an ExecutionException and thrown by Future.get().

This is likely to be much preferable to subclassing ThreadPoolExecutor. It also gives you the opportunity to re-submit the task if the exception is a recoverable one.
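
The re-submit idea can be sketched as follows. This is only an illustration under stated assumptions: the class CallableRetryDemo, the helper runWithRetry, and the choice to treat IOException as the "recoverable" case are all invented here, and the task is rigged to fail on its first call only.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class CallableRetryDemo {

    /** Submits a flaky task and re-submits it while the failure looks recoverable. */
    static String runWithRetry(int maxAttempts) {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        AtomicInteger calls = new AtomicInteger();
        // Hypothetical task: throws a checked exception on its first call only.
        Callable<String> task = () -> {
            if (calls.incrementAndGet() == 1) {
                throw new IOException("transient failure");
            }
            return "ok after " + calls.get() + " calls";
        };
        try {
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                Future<String> future = executor.submit(task);
                try {
                    return future.get();
                } catch (ExecutionException ex) {
                    // Assumption for this sketch: only IOException is worth retrying.
                    if (!(ex.getCause() instanceof IOException)) {
                        return "failed: " + ex.getCause();
                    }
                    // recoverable: fall through and submit the task again
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return "interrupted";
                }
            }
            return "gave up";
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithRetry(3));
    }
}
```

The caller decides what counts as recoverable by inspecting ex.getCause(), which is something afterExecute cannot easily do on the caller's behalf.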

Answer by Kevin

Instead of subclassing ThreadPoolExecutor, I would provide it with a ThreadFactory instance that creates new Threads and gives each of them an UncaughtExceptionHandler.
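
A rough sketch of that approach is below; UncaughtHandlerDemo, the seen list, and the latch are names invented for the example. One caveat worth showing: the handler only fires for tasks started with execute(). A task passed to submit() has its exception stored in the returned Future, so nothing escapes the worker thread and the handler is never called for it.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

class UncaughtHandlerDemo {

    /** Runs one failing task via execute() and one via submit(); returns what the handler saw. */
    static List<String> run() {
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch handlerCalled = new CountDownLatch(1);

        // Every thread created for the pool gets the same handler.
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable);
            thread.setUncaughtExceptionHandler((t, ex) -> {
                seen.add(ex.getMessage());
                handlerCalled.countDown();
            });
            return thread;
        };

        ExecutorService pool = Executors.newFixedThreadPool(1, factory);
        Runnable viaExecute = () -> { throw new IllegalStateException("from execute"); };
        Runnable viaSubmit = () -> { throw new IllegalStateException("from submit"); };
        pool.execute(viaExecute); // exception escapes the worker thread, handler fires
        pool.submit(viaSubmit);   // exception is captured by the Future, handler stays silent
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
            // The handler runs while the failed worker thread is dying, so wait for it explicitly.
            handlerCalled.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Since the question uses submit(), this technique on its own would not surface the exception there; it fits code that calls execute().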

Answer by Drew Wills

The explanation for this behavior is right in the javadoc for afterExecute:

Note: When actions are enclosed in tasks (such as FutureTask) either explicitly or via methods such as submit, these task objects catch and maintain computational exceptions, and so they do not cause abrupt termination, and the internal exceptions are not passed to this method.

Answer by yegor256

I'm using the VerboseRunnable class from jcabi-log, which swallows all exceptions and logs them. It is very convenient, for example:

import com.jcabi.log.VerboseRunnable;
scheduler.scheduleWithFixedDelay(
  new VerboseRunnable(
    new Runnable() {
      public void run() { 
        // the code, which may throw
      }
    },
    true // it means that all exceptions will be swallowed and logged
  ),
  1, 1, TimeUnit.MILLISECONDS
);

Answer by mmm

I got around it by wrapping the supplied runnable submitted to the executor.

CompletableFuture.runAsync(() -> {
        try {
              runnable.run();
        } catch (Throwable e) {
              Log.info(Concurrency.class, "runAsync", e);
        }
}, executorService);

Answer by Bass

If your ExecutorService comes from an external source (i.e. it's not possible to subclass ThreadPoolExecutor and override afterExecute()), you can use a dynamic proxy to achieve the desired behavior:

public static ExecutorService errorAware(final ExecutorService executor) {
    return (ExecutorService) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
            new Class[] {ExecutorService.class},
            (proxy, method, args) -> {
                if (method.getName().equals("submit")) {
                    final Object arg0 = args[0];
                    if (arg0 instanceof Runnable) {
                        args[0] = new Runnable() {
                            @Override
                            public void run() {
                                final Runnable task = (Runnable) arg0;
                                try {
                                    task.run();
                                    if (task instanceof Future<?>) {
                                        final Future<?> future = (Future<?>) task;

                                        if (future.isDone()) {
                                            try {
                                                future.get();
                                            } catch (final CancellationException ce) {
                                                // Your error-handling code here
                                                ce.printStackTrace();
                                            } catch (final ExecutionException ee) {
                                                // Your error-handling code here
                                                ee.getCause().printStackTrace();
                                            } catch (final InterruptedException ie) {
                                                Thread.currentThread().interrupt();
                                            }
                                        }
                                    }
                                } catch (final RuntimeException re) {
                                    // Your error-handling code here
                                    re.printStackTrace();
                                    throw re;
                                } catch (final Error e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                }
                            }
                        };
                    } else if (arg0 instanceof Callable<?>) {
                        args[0] = new Callable<Object>() {
                            @Override
                            public Object call() throws Exception {
                                final Callable<?> task = (Callable<?>) arg0;
                                try {
                                    return task.call();
                                } catch (final Exception e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                } catch (final Error e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                }
                            }
                        };
                    }
                }
                return method.invoke(executor, args);
            });
}

Answer by CSchulz

Another solution would be to use ManagedTask and ManagedTaskListener.

You need a Callable or Runnable that implements the ManagedTask interface.

The method getManagedTaskListener returns the listener instance you want.

public ManagedTaskListener getManagedTaskListener() {

Then implement the taskDone method in your ManagedTaskListener:

@Override
public void taskDone(Future<?> future, ManagedExecutorService executor, Object task, Throwable exception) {
    if (exception != null) {
        LOGGER.log(Level.SEVERE, exception.getMessage());
    }
}

More details about managed task lifecycle and listener.

Answer by Kanagavelu Sugumar

This is because AbstractExecutorService::submit wraps your Runnable in a RunnableFuture (which is nothing but a FutureTask), as shown below:

AbstractExecutorService.java

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null); /////////HERE////////
    execute(ftask);
    return ftask;
}

Then execute passes it to a Worker, and Worker.run() calls the method below.

ThreadPoolExecutor.java

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();           /////////HERE////////
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

Finally, the task.run() call in the code above invokes FutureTask.run(). Here is its exception-handling code; because it catches the exception, you are NOT getting the expected exception.

class FutureTask<V> implements RunnableFuture<V>

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {   /////////HERE////////
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
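
The effect of that catch block can be checked without any thread pool. The following sketch (FutureTaskCaptureDemo and describe are hypothetical names) runs a FutureTask directly on the current thread: run() returns normally, and the exception only shows up when get() is called.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class FutureTaskCaptureDemo {

    /** Runs a failing FutureTask directly and reports where the exception ended up. */
    static String describe() {
        Runnable failing = () -> { throw new RuntimeException("Ouch!"); };
        FutureTask<Void> task = new FutureTask<>(failing, null);

        boolean escaped = false;
        try {
            task.run(); // FutureTask.run() catches the Throwable and stores it
        } catch (RuntimeException e) {
            escaped = true; // not reached: nothing propagates out of run()
        }

        String cause;
        try {
            task.get();
            cause = "none";
        } catch (ExecutionException e) {
            cause = e.getCause().getMessage(); // the stored exception, wrapped by get()
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            cause = "interrupted";
        }
        return "escaped=" + escaped + ", cause=" + cause;
    }

    public static void main(String[] args) {
        System.out.println(describe()); // prints "escaped=false, cause=Ouch!"
    }
}
```

This is why runWorker sees no exception from task.run() and passes null to afterExecute.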

Answer by Cristian Botiza

If you want to monitor the execution of tasks, you could spin up 1 or 2 threads (maybe more, depending on the load) and use them to take completed tasks from an ExecutorCompletionService wrapper.
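
A minimal sketch of that idea, assuming two tasks and, for brevity, collecting the results on the calling thread rather than on dedicated monitor threads. CompletionServiceDemo and the outcome strings are invented for the example; the outcomes are sorted only to make the result independent of completion order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CompletionServiceDemo {

    /** Submits one succeeding and one failing task and reports each outcome as it completes. */
    static List<String> run() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletionService<String> completion = new ExecutorCompletionService<>(pool);

        Callable<String> good = () -> "fine";
        Callable<String> bad = () -> { throw new IllegalStateException("boom"); };
        completion.submit(good);
        completion.submit(bad);

        List<String> outcomes = new ArrayList<>();
        try {
            for (int i = 0; i < 2; i++) {
                Future<String> done = completion.take(); // blocks until some task finishes
                try {
                    outcomes.add("result: " + done.get());
                } catch (ExecutionException ee) {
                    outcomes.add("failed: " + ee.getCause().getMessage());
                }
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        Collections.sort(outcomes); // completion order is not deterministic
        return outcomes;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In a long-running service the take() loop would live on the monitoring thread or threads the answer describes, so failures are noticed as soon as each task completes.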