Java: How to use invokeAll() so the thread pool completes all of its tasks?

Note: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me). StackOverflow original: http://stackoverflow.com/questions/18202388/

Time: 2020-08-11 23:16:02  Source: igfitidea

How to use invokeAll() to let all thread pool do their task?

Tags: java, multithreading, arraylist, executorservice, future

Asked by

    ExecutorService pool = Executors.newFixedThreadPool(7);
    List<Future<Hotel>> future = new ArrayList<Future<Hotel>>();
    List<Callable<Hotel>> callList = new ArrayList<Callable<Hotel>>();

    // Build one task per date
    for (int i = 0; i <= diff; i++) {
        String str = "2013-" + (liDates.get(i).get(Calendar.MONTH) + 1) + "-" + liDates.get(i).get(Calendar.DATE);
        callList.add(new HotelCheapestFare(str));
    }

    future = pool.invokeAll(callList);
    // i < future.size() (not <=), otherwise the last iteration reads past the end of the list
    for (int i = 0; i < future.size(); i++) {
        System.out.println("name is:" + future.get(i).get().getName());
    }

Now I want the pool to finish all the tasks passed to invokeAll before reaching the for loop, but when I run this program the for loop seems to execute before invokeAll has finished, and this exception is thrown:

java.util.concurrent.ExecutionException: java.lang.NullPointerException
    at java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source)
    at java.util.concurrent.FutureTask.get(Unknown Source)
    at com.mmt.freedom.cheapestfare.TestHotel.main(TestHotel.java:65)

Caused by: java.lang.NullPointerException
    at com.mmt.freedom.cheapestfare.HotelCheapestFare.getHotelCheapestFare(HotelCheapestFare.java:166)
    at com.mmt.freedom.cheapestfare.HotelCheapestFare.call(HotelCheapestFare.java:219)
    at com.mmt.freedom.cheapestfare.HotelCheapestFare.call(HotelCheapestFare.java:1)
    at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source)
    at java.util.concurrent.FutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run

Accepted answer by Boris the Spider

The way an ExecutorService works is that when you call invokeAll, it waits for all tasks to complete:

Executes the given tasks, returning a list of Futures holding their status and results when all complete. Future.isDone() is true for each element of the returned list. Note that a completed task could have terminated either normally or by throwing an exception. The results of this method are undefined if the given collection is modified while this operation is in progress. (emphasis added)

What that means is that your tasks are all done, but some may have thrown an exception. This exception is part of the Future: calling get causes the exception to be rethrown, wrapped in an ExecutionException.

From your stack trace:

java.util.concurrent.ExecutionException: java.lang.NullPointerException
    at java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source)
    at java.util.concurrent.FutureTask.get(Unknown Source)
                                       ^^^ <-- from get

You can see that this is indeed the case. One of your tasks has failed with an NPE. The ExecutorService caught the exception and is telling you about it by throwing an ExecutionException when you call Future.get.
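
The wrapping described above can be reproduced with a small sketch. Everything here is illustrative: the class name UnwrapCauseSketch, the helper methods, and the two sample tasks are assumptions, not part of the question's code. The point is that getCause() on the ExecutionException gives back the exception thrown inside call().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class UnwrapCauseSketch {

    // Runs the tasks with invokeAll and describes each outcome as a string.
    static List<String> runAll(List<Callable<String>> tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<String> outcomes = new ArrayList<String>();
        try {
            // Blocks until every task has finished, normally or with an exception.
            List<Future<String>> futures = pool.invokeAll(tasks);
            for (Future<String> f : futures) {
                try {
                    outcomes.add("ok: " + f.get());
                } catch (ExecutionException e) {
                    // getCause() is the exception that was thrown inside call().
                    outcomes.add("failed: " + e.getCause().getClass().getSimpleName());
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        } finally {
            pool.shutdown();
        }
        return outcomes;
    }

    // Hypothetical tasks: one succeeds, one fails with a NullPointerException.
    static List<Callable<String>> sampleTasks() {
        List<Callable<String>> tasks = new ArrayList<Callable<String>>();
        tasks.add(() -> "first");
        tasks.add(() -> {
            throw new NullPointerException("simulated failure");
        });
        return tasks;
    }

    public static void main(String[] args) {
        for (String line : runAll(sampleTasks())) {
            System.out.println(line);
        }
    }
}
```

Printing e.getCause() (or its stack trace) in the catch block is usually the quickest way to find the line inside the task that actually failed.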

Now, if you want to take tasks as they complete, you need an ExecutorCompletionService. This acts as a BlockingQueue that will allow you to poll for tasks as and when they finish.

public static void main(String[] args) throws Exception {
    final ExecutorService executorService = Executors.newFixedThreadPool(10);
    final ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executorService);
    executorService.submit(new Runnable() {
        @Override
        public void run() {
            for (int i = 0; i < 100; ++i) {
                try {
                    final Future<String> myValue = completionService.take();
                    //do stuff with the Future
                    final String result = myValue.get();
                    System.out.println(result);
                } catch (InterruptedException ex) {
                    return;
                } catch (ExecutionException ex) {
                    System.err.println("TASK FAILED");
                }
            }
        }
    });
    for (int i = 0; i < 100; ++i) {
        completionService.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                if (Math.random() > 0.5) {
                    throw new RuntimeException("FAILED");
                }
                return "SUCCESS";
            }
        });
    }
    executorService.shutdown();
}

In this example I have one task that calls take on the ExecutorCompletionService, which gets the Futures as they become available, and then I submit tasks to the ExecutorCompletionService.

This will allow you to get the failed task as soon as it fails, rather than having to wait for all the tasks to either fail or succeed together.

The only complication is that it is difficult to tell the polling thread that all the tasks are done, as everything is now asynchronous. In this instance I have used the knowledge that 100 tasks will be submitted, so that it only needs to poll 100 times. A more general way would be to collect the Futures from the submit method as well and then loop over them to see if everything is completed.
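
One possible reading of that "more general way" is sketched below; it is not the answer author's code. It keeps the Futures returned by submit, calls take() once per submitted task, and then loops over the collected Futures to confirm that all of them are done. The class name CollectFuturesSketch, the helper methods, and the four sample tasks are hypothetical, and for brevity the draining happens on the calling thread rather than in a separate polling task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CollectFuturesSketch {

    // Submits every task, keeps the Futures, and drains the completion queue.
    static String runAndSummarize(List<Callable<String>> tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CompletionService<String> completion = new ExecutorCompletionService<String>(pool);
        List<Future<String>> submitted = new ArrayList<Future<String>>();
        for (Callable<String> task : tasks) {
            submitted.add(completion.submit(task));
        }
        int succeeded = 0;
        int failed = 0;
        try {
            // One take() per submitted task; each call blocks until some task finishes.
            for (int i = 0; i < submitted.size(); i++) {
                try {
                    completion.take().get();
                    succeeded++;
                } catch (ExecutionException e) {
                    failed++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and stop draining
        } finally {
            pool.shutdown();
        }
        // Loop over the collected Futures to confirm that everything has completed.
        boolean allDone = true;
        for (Future<String> f : submitted) {
            allDone &= f.isDone();
        }
        return "succeeded=" + succeeded + " failed=" + failed + " allDone=" + allDone;
    }

    // Hypothetical tasks: task 3 fails, the others succeed.
    static List<Callable<String>> sampleTasks() {
        List<Callable<String>> tasks = new ArrayList<Callable<String>>();
        for (int i = 1; i <= 4; i++) {
            final int id = i;
            tasks.add(() -> {
                if (id == 3) {
                    throw new IllegalStateException("task " + id + " failed");
                }
                return "task " + id;
            });
        }
        return tasks;
    }

    public static void main(String[] args) {
        System.out.println(runAndSummarize(sampleTasks()));
    }
}
```

Because the number of take() calls comes from the size of the collected list, the loop does not need a hard-coded task count.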

Answer by Winston

invokeAll is a blocking method. This means the calling thread won't proceed to the next line until all the tasks have completed. So I think there is something wrong with the result of one of your futures.

System.out.println("name is:"+future.get(i).get().getName());

From this line, I think some futures have no result and can be null. So you should check your code to see whether any Futures are null and, if so, add an if check before this line is executed.
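
The blocking behaviour this answer relies on can be checked with a minimal sketch. The class name InvokeAllBlocksSketch, the 50 ms sleep, and the pool size are arbitrary assumptions chosen for illustration. Each task sleeps briefly, and the check that every Future is non-null and done runs only after invokeAll has returned.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InvokeAllBlocksSketch {

    // Returns true if every Future is non-null and done once invokeAll returns.
    static boolean allDoneAfterInvokeAll(int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
        for (int i = 0; i < taskCount; i++) {
            final int id = i;
            tasks.add(() -> {
                Thread.sleep(50); // simulate some work
                return id;
            });
        }
        try {
            // The calling thread waits here until all tasks have finished.
            List<Future<Integer>> futures = pool.invokeAll(tasks);
            for (Future<Integer> f : futures) {
                if (f == null || !f.isDone()) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return false;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("all done: " + allDoneAfterInvokeAll(6));
    }
}
```

Note that a Future being done says nothing about whether its task succeeded; the value returned by get() can still be null, or get() can throw, so a null check on the result is a separate concern.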

Answer by Ravindra babu

Future.get() throws the following exceptions:

CancellationException - if the computation was cancelled

ExecutionException - if the computation threw an exception

InterruptedException - if the current thread was interrupted while waiting

Catch all these exceptions when you call the get() method.

I have simulated a divide-by-zero exception for some Callable tasks, but an exception in one Callable does not affect the other Callable tasks submitted to the ExecutorService if you catch the above three exceptions, as shown in the example code.

Example code snippet:

import java.util.concurrent.*;
import java.util.*;

public class InvokeAllUsage{
    public InvokeAllUsage(){
        System.out.println("creating service");
        ExecutorService service = Executors.newFixedThreadPool(10);

        List<MyCallable> futureList = new ArrayList<MyCallable>();
        for ( int i=0; i<10; i++){
            MyCallable myCallable = new MyCallable((long)i+1);
            futureList.add(myCallable);
        }
        System.out.println("Start");
        try{
            List<Future<Long>> futures = service.invokeAll(futureList);  
            for(Future<Long> future : futures){
                try{
                    System.out.println("future.isDone = " + future.isDone());
                    System.out.println("future: call ="+future.get());
                }
                catch (CancellationException ce) {
                    ce.printStackTrace();
                } catch (ExecutionException ee) {
                    ee.printStackTrace();
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // ignore/reset
                }
            }
        }catch(Exception err){
            err.printStackTrace();
        }
        System.out.println("Completed");
        service.shutdown();
    }
    public static void main(String args[]){
        InvokeAllUsage demo = new InvokeAllUsage();
    }
    class MyCallable implements Callable<Long>{
        Long id = 0L;
        public MyCallable(Long val){
            this.id = val;
        }
        public Long call(){

            if ( id % 5 == 0){
                id = id / 0;
            }           
            return id;
        }
    }
}

Output:

creating service
Start
future.isDone = true
future: call =1
future.isDone = true
future: call =2
future.isDone = true
future: call =3
future.isDone = true
future: call =4
future.isDone = true
java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:188)
        at InvokeAllUsage.<init>(InvokeAllUsage.java:20)
        at InvokeAllUsage.main(InvokeAllUsage.java:37)
Caused by: java.lang.ArithmeticException: / by zero
        at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:47)
        at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:39)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
future.isDone = true
future: call =6
future.isDone = true
future: call =7
future.isDone = true
future: call =8
future.isDone = true
future: call =9
future.isDone = true
java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:188)
        at InvokeAllUsage.<init>(InvokeAllUsage.java:20)
        at InvokeAllUsage.main(InvokeAllUsage.java:37)
Caused by: java.lang.ArithmeticException: / by zero
        at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:47)
        at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:39)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
Completed