java 处理 ThreadPoolExecutor 的异常
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/2554549/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Handling Exceptions for ThreadPoolExecutor
提问by jagamot
I have the following code snippet that basically scans through the list of task that needs to be executed and each task is then given to the executor for execution.
我有以下代码片段,它基本上扫描了需要执行的任务列表,然后将每个任务交给执行程序执行。
The JobExecutorin turn creates another executor (for doing db stuff...reading and writing data to queue) and completes the task.
该JobExecutor反过来又创造了另一个执行人(做数据库的东西...读取和写入数据队列),并完成任务。
JobExecutorreturns a Future<Boolean>for the tasks submitted. When one of the task fails, I want to gracefully interrupt all the threads and shutdown the executor by catching all the exceptions. What changes do I need to do?
JobExecutorFuture<Boolean>为提交的任务返回 a 。当其中一项任务失败时,我想通过捕获所有异常来优雅地中断所有线程并关闭执行程序。我需要做哪些改变?
public class DataMovingClass {
private static final AtomicInteger uniqueId = new AtomicInteger(0);
private static final ThreadLocal<Integer> uniqueNumber = new IDGenerator();
ThreadPoolExecutor threadPoolExecutor = null ;
private List<Source> sources = new ArrayList<Source>();
private static class IDGenerator extends ThreadLocal<Integer> {
@Override
public Integer get() {
return uniqueId.incrementAndGet();
}
}
public void init(){
// load sources list
}
public boolean execute() {
boolean succcess = true ;
threadPoolExecutor = new ThreadPoolExecutor(10,10,
10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024),
new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("DataMigration-" + uniqueNumber.get());
return t;
}// End method
}, new ThreadPoolExecutor.CallerRunsPolicy());
List<Future<Boolean>> result = new ArrayList<Future<Boolean>>();
for (Source source : sources) {
result.add(threadPoolExecutor.submit(new JobExecutor(source)));
}
for (Future<Boolean> jobDone : result) {
try {
if (!jobDone.get(100000, TimeUnit.SECONDS) && success) {
// in case of successful DbWriterClass, we don't need to change
// it.
success = false;
}
} catch (Exception ex) {
// handle exceptions
}
}
}
public class JobExecutor implements Callable<Boolean> {
private ThreadPoolExecutor threadPoolExecutor ;
Source jobSource ;
public SourceJobExecutor(Source source) {
this.jobSource = source;
threadPoolExecutor = new ThreadPoolExecutor(10,10,10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024),
new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("Job Executor-" + uniqueNumber.get());
return t;
}// End method
}, new ThreadPoolExecutor.CallerRunsPolicy());
}
public Boolean call() throws Exception {
boolean status = true ;
System.out.println("Starting Job = " + jobSource.getName());
try {
// do the specified task ;
}catch (InterruptedException intrEx) {
logger.warn("InterruptedException", intrEx);
status = false ;
} catch(Exception e) {
logger.fatal("Exception occurred while executing task "+jobSource.getName(),e);
status = false ;
}
System.out.println("Ending Job = " + jobSource.getName());
return status ;
}
}
}
采纳答案by matt b
When you submit a task to the executor, it returns you a FutureTaskinstance.
当您向执行程序提交任务时,它会返回一个FutureTask实例。
FutureTask.get()will re-throw any exception thrown by the task as an ExecutorException.
FutureTask.get()将重新抛出任务抛出的任何异常作为ExecutorException.
So when you iterate through the List<Future>and call get on each, catch ExecutorExceptionand invoke an orderly shutdown.
因此,当您遍历List<Future>每个并调用 get 时,捕获ExecutorException并调用有序关闭。
回答by Ravindra babu
Since you are submitting tasks to ThreadPoolExecutor, the exceptions are getting swallowed by FutureTask.
由于您正在向 提交任务ThreadPoolExecutor,因此异常被FutureTask.
Have a look at this code
看看这个代码
**Inside FutureTask$Sync**
void innerRun() {
if (!compareAndSetState(READY, RUNNING))
return;
runner = Thread.currentThread();
if (getState() == RUNNING) { // recheck after setting thread
V result;
try {
result = callable.call();
} catch (Throwable ex) {
setException(ex);
return;
}
set(result);
} else {
releaseShared(0); // cancel
}
}
}
protected void setException(Throwable t) {
sync.innerSetException(t);
}
From above code, it is clear that setExceptionmethod catching Throwable. Due to this reason, FutureTaskis swallowing all exceptions if you use "submit()" method on ThreadPoolExecutor
从上面的代码中,很明显setException方法 catch Throwable。由于这个原因,FutureTask如果你submit()在ThreadPoolExecutor
As per java documentation, you can extend afterExecute()method in ThreadPoolExecutor
根据java文档,您可以afterExecute()在ThreadPoolExecutor
protected void afterExecute(Runnable r,
Throwable t)
Sample code as per documentation:
根据文档的示例代码:
class ExtendedExecutor extends ThreadPoolExecutor {
// ...
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t == null && r instanceof Future<?>) {
try {
Object result = ((Future<?>) r).get();
} catch (CancellationException ce) {
t = ce;
} catch (ExecutionException ee) {
t = ee.getCause();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // ignore/reset
}
}
if (t != null)
System.out.println(t);
}
}
You can catch Exceptionsin three ways
您可以Exceptions通过三种方式捕捉
Future.get()as suggested in accepted answer- wrap entire
run()orcall()method intry{}catch{}Exceptoion{}blocks - override
afterExecuteofThreadPoolExecutormethod as shown above
Future.get()正如接受的答案中所建议的那样- 将整个
run()或call()方法包装在try{}catch{}Exceptoion{}块中 - 覆盖
afterExecute的ThreadPoolExecutor方法如上所示
To gracefully interrupt other Threads, have a look at below SE question:
要优雅地中断其他线程,请查看以下 SE 问题:
How to stop next thread from running in a ScheduledThreadPoolExecutor
如何阻止下一个线程在 ScheduledThreadPoolExecutor 中运行
回答by Paul Cameron
Subclass ThreadPoolExecutorand override its protected afterExecute (Runnable r, Throwable t)method.
子类化ThreadPoolExecutor并覆盖其protected afterExecute (Runnable r, Throwable t)方法。
If you're creating a thread pool via the java.util.concurrent.Executorsconvenience class (which you're not), take at look at its source to see how it's invoking ThreadPoolExecutor.
如果您通过java.util.concurrent.Executors便利类(您不是)创建线程池,请查看其源代码以了解它如何调用 ThreadPoolExecutor。

