Java 如何使用 invokeAll() 让所有线程池完成他们的任务?
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/18202388/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
How to use invokeAll() to let all thread pool do their task?
提问by
ExecutorService pool=Executors.newFixedThreadPool(7);
List<Future<Hotel>> future=new ArrayList<Future<Hotel>>();
List<Callable<Hotel>> callList = new ArrayList<Callable<Hotel>>();
for(int i=0;i<=diff;i++){
String str="2013-"+(liDates.get(i).get(Calendar.MONTH)+1)+"-"+liDates.get(i).get(Calendar.DATE);
callList.add(new HotelCheapestFare(str));
}
future=pool.invokeAll(callList);
for(int i=0;i<=future.size();i++){
System.out.println("name is:"+future.get(i).get().getName());
}
Now I want pool to invokeAll
all the task before getting to the for loop but when I run this program for loop gets executed before that invokeAll
and throws this exception:
现在我想invokeAll
在进入 for 循环之前将所有任务池化,但是当我运行这个程序时 for 循环在invokeAll
此之前被执行并抛出这个异常:
java.util.concurrent.ExecutionException: java.lang.NullPointerException at
java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source) at
java.util.concurrent.FutureTask.get(Unknown Source) at
com.mmt.freedom.cheapestfare.TestHotel.main(TestHotel.java:6??5)
Caused by: java.lang.NullPointerException at
com.mmt.freedom.cheapestfare.HotelCheapestFare.getHotelCheap?estFare(HotelCheapes??tFare.java:166)
at com.mmt.freedom.cheapestfare.HotelCheapestFare.call(HotelChe??apestFare.java:219)
at com.mmt.freedom.cheapestfare.HotelCheapestFare.call(HotelChe??apestFare.java:1)
at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source) at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) atjava.util.concurrent.ThreadPoolExecutor$Worker.run(Unknow??n Source)
at java.lang.Thread.run
采纳答案by Boris the Spider
The way an ExecutorService
works is that when you call invokeAll
it waits for all tasks to complete:
ExecutorService
工作方式是,当您调用invokeAll
它时,它会等待所有任务完成:
Executes the given tasks, returning a list of Futures holding their status and results when all complete. Future.isDone() is true for each element of the returned list. Note that a completed task could have terminated either normally or by throwing an exception. The results of this method are undefined if the given collection is modified while this operation is in progress.1(emphasis added)
执行给定的任务,在全部完成后返回保存其状态和结果的 Future 列表。Future.isDone() 对于返回列表的每个元素都为真。请注意,已完成的任务可能已正常终止或通过抛出异常终止。如果在此操作进行时修改了给定的集合,则此方法的结果未定义。1(强调)
What that means is that your tasks are all done but some may have thrown an exception. This exception is part of the Future
- calling get
causes the exception to be rethrown wrapped in an ExecutionException
.
这意味着您的任务都已完成,但有些任务可能会引发异常。此异常是Future
- 调用get
导致异常被重新抛出的一部分ExecutionException
。
From you stacktrack
从你堆栈跟踪
java.util.concurrent.ExecutionException: java.lang.NullPointerException at
java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source) at
java.util.concurrent.FutureTask.get(Unknown Source) at
^^^ <-- from get
You can see that this is indeed the case. One of your tasks has failed with an NPE. The ExecutorService
caught the exception and is telling you about it by throwing an ExecutionException
when you call Future.get
.
您可以看到情况确实如此。您的一项任务因 NPE 而失败。在ExecutorService
捕捉到的异常,并抛出一个告诉你这件事ExecutionException
,当你调用Future.get
。
Now, if you want to take tasks as they completeyou need an ExecutorCompletionService
. This acts as a BlockingQueue
that will allow you to poll for tasks as and when they finish.
现在,如果您想在任务完成时执行任务,您需要一个ExecutorCompletionService
. 这BlockingQueue
将允许您在任务完成时轮询它们。
public static void main(String[] args) throws Exception {
final ExecutorService executorService = Executors.newFixedThreadPool(10);
final ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executorService);
executorService.submit(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 100; ++i) {
try {
final Future<String> myValue = completionService.take();
//do stuff with the Future
final String result = myValue.get();
System.out.println(result);
} catch (InterruptedException ex) {
return;
} catch (ExecutionException ex) {
System.err.println("TASK FAILED");
}
}
}
});
for (int i = 0; i < 100; ++i) {
completionService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
if (Math.random() > 0.5) {
throw new RuntimeException("FAILED");
}
return "SUCCESS";
}
});
}
executorService.shutdown();
}
In this example I have one task that calls take
on the ExecutorCompletionService
which gets the Future
s as they become available and then I submit tasks to the ExecutorCompletionService
.
在这个例子中,我有一个任务调用take
上ExecutorCompletionService
它得到了Future
S作为他们变得可用,然后我提交任务了ExecutorCompletionService
。
This will allow you to get the failed task as soon as it fails rather than having to wait for all the tasks to either fail of succeed together.
这将允许您在失败后立即获取失败的任务,而不必等待所有任务一起失败或成功。
The only complication is that it is difficult to tell the polling thread that all the tasks are done as everything is now asynchronous. In this instance I have used the knowledge that 100 tasks will be submitted so that it only need to poll 100 times. A more general way would be to collect the Future
s from the submit
method as well and then loop over them to see if everything is completed.
唯一的复杂之处在于很难告诉轮询线程所有任务都已完成,因为现在一切都是异步的。在这个例子中,我使用了将提交 100 个任务的知识,因此它只需要轮询 100 次。更通用Future
的submit
方法是也从方法中收集s ,然后循环它们以查看是否一切都已完成。
回答by Winston
invokeAll is a blocking method. It means – JVM won't proceed to next line until all the threads are completed. So I think there is something wrong with your thread future result.
invokeAll 是一种阻塞方法。这意味着 - 在所有线程完成之前,JVM 不会进入下一行。所以我认为你的线程未来结果有问题。
System.out.println("name is:"+future.get(i).get().getName());
from this line I think there are some futures have no result and can be null, So you should check your code, if there are some Futures null,If so, get a if before this line executed.
从这一行我认为有一些期货没有结果并且可以为空,所以你应该检查你的代码,如果有一些期货为空,如果是这样,在执行此行之前得到一个 if。
回答by Ravindra babu
Future.get()throws below exceptions.
Future.get()抛出以下异常。
CancellationException
- if the computation was cancelled
CancellationException
- 如果计算被取消
ExecutionException
- if the computation threw an exception
ExecutionException
- 如果计算抛出异常
InterruptedException
- if the current thread was interrupted while waiting
InterruptedException
- 如果当前线程在等待时被中断
Catch all these exceptions when you call get()
method.
在调用get()
方法时捕获所有这些异常。
I have simulated divide by zero exception for some Callable
tasks but exception in one Callable
does not affect other Callable
tasks submitted to ExecutorService
if you catch above three exceptions as shown in example code.
我已经模拟了某些Callable
任务的除以零异常,但如果您捕获以上三个异常,如示例代码所示,一个异常Callable
不会影响Callable
提交给的其他任务ExecutorService
。
Example code snippet:
示例代码片段:
import java.util.concurrent.*;
import java.util.*;
public class InvokeAllUsage{
public InvokeAllUsage(){
System.out.println("creating service");
ExecutorService service = Executors.newFixedThreadPool(10);
List<MyCallable> futureList = new ArrayList<MyCallable>();
for ( int i=0; i<10; i++){
MyCallable myCallable = new MyCallable((long)i+1);
futureList.add(myCallable);
}
System.out.println("Start");
try{
List<Future<Long>> futures = service.invokeAll(futureList);
for(Future<Long> future : futures){
try{
System.out.println("future.isDone = " + future.isDone());
System.out.println("future: call ="+future.get());
}
catch (CancellationException ce) {
ce.printStackTrace();
} catch (ExecutionException ee) {
ee.printStackTrace();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // ignore/reset
}
}
}catch(Exception err){
err.printStackTrace();
}
System.out.println("Completed");
service.shutdown();
}
public static void main(String args[]){
InvokeAllUsage demo = new InvokeAllUsage();
}
class MyCallable implements Callable<Long>{
Long id = 0L;
public MyCallable(Long val){
this.id = val;
}
public Long call(){
if ( id % 5 == 0){
id = id / 0;
}
return id;
}
}
}
output:
输出:
creating service
Start
future.isDone = true
future: call =1
future.isDone = true
future: call =2
future.isDone = true
future: call =3
future.isDone = true
future: call =4
future.isDone = true
java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:188)
at InvokeAllUsage.<init>(InvokeAllUsage.java:20)
at InvokeAllUsage.main(InvokeAllUsage.java:37)
Caused by: java.lang.ArithmeticException: / by zero
at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:47)
at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:39)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
future.isDone = true
future: call =6
future.isDone = true
future: call =7
future.isDone = true
future: call =8
future.isDone = true
future: call =9
future.isDone = true
java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:188)
at InvokeAllUsage.<init>(InvokeAllUsage.java:20)
at InvokeAllUsage.main(InvokeAllUsage.java:37)
Caused by: java.lang.ArithmeticException: / by zero
at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:47)
at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:39)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Completed