Java 如何使用 ExecutorService 等待所有线程完成?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/1250643/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-12 07:29:00  来源:igfitidea点击:

How to wait for all threads to finish, using ExecutorService?

javamultithreadingconcurrencyparallel-processingexecutorservice

提问by serg

I need to execute some amount of tasks 4 at a time, something like this:

我需要一次执行一些任务 4,如下所示:

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
    taskExecutor.execute(new MyTask());
}
//...wait for completion somehow

How can I get notified once all of them are complete? For now I can't think about anything better than setting some global task counter and decrease it at the end of every task, then monitor in infinite loop this counter to become 0; or get a list of Futures and in infinite loop monitor isDone for all of them. What are better solutions not involving infinite loops?

一旦所有这些都完成,我如何得到通知?现在我想不出比设置一些全局任务计数器并在每个任务结束时减少它,然后在无限循环中监控这个计数器变为 0 更好的方法;或获取期货列表,并在无限循环中监控所有期货。什么是不涉及无限循环的更好解决方案?

Thanks.

谢谢。

采纳答案by cletus

Basically on an ExecutorServiceyou call shutdown()and then awaitTermination():

基本上在ExecutorService你打电话shutdown()然后awaitTermination()

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}
taskExecutor.shutdown();
try {
  taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
  ...
}

回答by ChssPly76

Use a CountDownLatch:

使用CountDownLatch

CountDownLatch latch = new CountDownLatch(totalNumberOfTasks);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}

try {
  latch.await();
} catch (InterruptedException E) {
   // handle
}

and within your task (enclose in try / finally)

并在您的任务中(附在 try / finally 中)

latch.countDown();

回答by Zed

You could wrap your tasks in another runnable, that will send notifications:

您可以将您的任务包装在另一个 runnable 中,它将发送通知:

taskExecutor.execute(new Runnable() {
  public void run() {
    taskStartedNotification();
    new MyTask().run();
    taskFinishedNotification();
  }
});

回答by Alex Martelli

You could use your own subclass of ExecutorCompletionServiceto wrap taskExecutor, and your own implementation of BlockingQueueto get informed when each task completes and perform whatever callback or other action you desire when the number of completed tasks reaches your desired goal.

您可以使用您自己的ExecutorCompletionService子类来包装taskExecutor,并使用您自己的BlockingQueue实现来在每个任务完成时获得通知,并在完成的任务数量达到您想要的目标时执行您想要的任何回调或其他操作。

回答by sjlee

ExecutorService.invokeAll()does it for you.

ExecutorService.invokeAll()为你做。

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
List<Callable<?>> tasks; // your tasks
// invokeAll() returns when all tasks are complete
List<Future<?>> futures = taskExecutor.invokeAll(tasks);

回答by Pekka Enberg

The CyclicBarrierclass in Java 5 and later is designed for this sort of thing.

Java 5 及更高版本中的CyclicBarrier类就是为这种事情设计的。

回答by stryba

Just my two cents. To overcome the requirement of CountDownLatchto know the number of tasks beforehand, you could do it the old fashion way by using a simple Semaphore.

只有我的两分钱。为了克服CountDownLatch事先知道任务数量的要求,您可以使用一个简单的Semaphore.

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
int numberOfTasks=0;
Semaphore s=new Semaphore(0);
while(...) {
    taskExecutor.execute(new MyTask());
    numberOfTasks++;
}

try {
    s.aquire(numberOfTasks);
...

In your task just call s.release()as you would latch.countDown();

在您的任务中,只需s.release()像您一样调用latch.countDown();

回答by rogerdpack

You can use Lists of Futures, as well:

您也可以使用期货列表:

List<Future> futures = new ArrayList<Future>();
// now add to it:
futures.add(executorInstance.submit(new Callable<Void>() {
  public Void call() throws IOException {
     // do something
    return null;
  }
}));

then when you want to join on all of them, its essentially the equivalent of joining on each, (with the added benefit that it re-raises exceptions from child threads to the main):

然后当你想加入所有这些时,它本质上相当于加入每个,(额外的好处是它将子线程的异常重新引发到主线程):

for(Future f: this.futures) { f.get(); }

Basically the trick is to call .get() on each Future one at a time, instead of infinite looping calling isDone() on (all or each). So you're guaranteed to "move on" through and past this block as soon as the last thread finishes. The caveat is that since the .get() call re-raises exceptions, if one of the threads dies, you would raise from this possibly before the other threads have finished to completion [to avoid this, you could add a catch ExecutionExceptionaround the get call]. The other caveat is it keeps a reference to all threads so if they have thread local variables they won't get collected till after you get past this block (though you might be able to get around this, if it became a problem, by removing Future's off the ArrayList). If you wanted to know which Future "finishes first" you could use some something like https://stackoverflow.com/a/31885029/32453

基本上诀窍是一次在每个 Future 上调用 .get() ,而不是无限循环地在(全部或每个)上调用 isDone() 。因此,一旦最后一个线程完成,您就可以保证“继续”并通过此块。需要注意的是,由于 .get() 调用重新引发异常,如果其中一个线程死亡,您可能会在其他线程完成完成之前从这里引发[为避免这种情况,您可以catch ExecutionException在 get 调用周围添加一个]。另一个警告是它保留对所有线程的引用,因此如果它们具有线程局部变量,则在您通过此块之前它们不会被收集(尽管您可能能够解决这个问题,如果它成为一个问题,通过删除Future 不在 ArrayList 中)。如果你想知道哪个 Future “先完成”https://stackoverflow.com/a/31885029/32453

回答by Kiran

I've just written a sample program that solves your problem. There was no concise implementation given, so I'll add one. While you can use executor.shutdown()and executor.awaitTermination(), it is not the best practice as the time taken by different threads would be unpredictable.

我刚刚编写了一个示例程序来解决您的问题。没有给出简洁的实现,所以我会添加一个。虽然您可以使用executor.shutdown()and executor.awaitTermination(),但这不是最佳实践,因为不同线程所花费的时间是不可预测的。

ExecutorService es = Executors.newCachedThreadPool();
    List<Callable<Integer>> tasks = new ArrayList<>();

    for (int j = 1; j <= 10; j++) {
        tasks.add(new Callable<Integer>() {

            @Override
            public Integer call() throws Exception {
                int sum = 0;
                System.out.println("Starting Thread "
                        + Thread.currentThread().getId());

                for (int i = 0; i < 1000000; i++) {
                    sum += i;
                }

                System.out.println("Stopping Thread "
                        + Thread.currentThread().getId());
                return sum;
            }

        });
    }

    try {
        List<Future<Integer>> futures = es.invokeAll(tasks);
        int flag = 0;

        for (Future<Integer> f : futures) {
            Integer res = f.get();
            System.out.println("Sum: " + res);
            if (!f.isDone()) 
                flag = 1;
        }

        if (flag == 0)
            System.out.println("SUCCESS");
        else
            System.out.println("FAILED");

    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }

回答by user

There is a method in executor getActiveCount()- that gives the count of active threads.

executor 中有一个方法getActiveCount()——它给出了活动线程的数量。

After spanning the thread, we can check if the activeCount()value is 0. Once the value is zero, it is meant that there are no active threads currently running which means task is finished:

跨越线程后,我们可以检查activeCount()值是否为0。一旦该值为零,则表示当前没有活动线程正在运行,这意味着任务已完成:

while (true) {
    if (executor.getActiveCount() == 0) {
    //ur own piece of code
    break;
    }
}