Java 如何在不关闭 Executor 的情况下等待 ThreadPoolExecutor 中的所有任务完成?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/3929361/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-14 07:04:12  来源:igfitidea点击:

How to wait for all tasks in an ThreadPoolExecutor to finish without shutting down the Executor?

javaandroidmultithreadingwaitthreadpoolexecutor

提问by cottonBallPaws

I can't use shutdown()and awaitTermination()because it is possible new tasks will be added to the ThreadPoolExecutor while it is waiting.

我无法使用shutdown()awaitTermination()因为在等待时新任务可能会添加到 ThreadPoolExecutor。

So I'm looking for a way to wait until the ThreadPoolExecutor has emptied it's queue and finished all of it's tasks without stopping new tasks from being added before that point.

所以我正在寻找一种方法来等待 ThreadPoolExecutor 清空它的队列并完成它的所有任务,而不会在此之前停止添加新任务。

If it makes any difference, this is for Android.

如果它有任何区别,这适用于Android。

Thanks

谢谢

Update: Many weeks later after revisiting this, I discovered that a modified CountDownLatch worked better for me in this case. I'll keep the answer marked because it applies more to what I asked.

更新:许多周后,在重新审视这个问题后,我发现在这种情况下,修改后的 CountDownLatch 对我来说效果更好。我会保留答案,因为它更适用于我提出的问题。

采纳答案by Tim Bender

If you are interested in knowing when a certain task completes, or a certain batch of tasks, you may use ExecutorService.submit(Runnable). Invoking this method returns a Futureobject which may be placed into a Collectionwhich your main thread will then iterate over calling Future.get()for each one. This will cause your main thread to halt execution until the ExecutorServicehas processed all of the Runnabletasks.

如果您想知道某项任务或某批任务何时完成,您可以使用ExecutorService.submit(Runnable). 调用此方法会返回一个Future对象,该对象可以放入 a 中Collection,然后您的主线程将迭代调用Future.get()每个对象。这将导致您的主线程停止执行,直到ExecutorService处理完所有Runnable任务。

Collection<Future<?>> futures = new LinkedList<Future<?>>();
futures.add(executorService.submit(myRunnable));
for (Future<?> future:futures) {
    future.get();
}

回答by Thilo

Maybe you are looking for a CompletionServiceto manage batches of task, see also this answer.

也许您正在寻找CompletionService来管理批量任务,另请参阅此答案

回答by andersoj

(This is an attempt to reproduce Thilo's earlier, deleted answer with my own adjustments.)

(这是尝试通过我自己的调整来重现 Thilo 之前删除的答案。)

I think you may need to clarify your question since there is an implicit infinite condition... at some point you have to decide to shut down your executor, and at that point it won't accept any more tasks. Your question seems to imply that you want to wait until you knowthat no further tasks will be submitted, which you can only know in your own application code.

我认为你可能需要澄清你的问题,因为有一个隐含的无限条件......在某些时候你必须决定关闭你的执行器,到那时它将不再接受任何任务。你的问题似乎暗示你要等到你知道不会提交进一步的任务,你只能在你自己的应用程序代码中知道。

The following answer will allow you to smoothly transition to a new TPE (for whatever reason), completing all the currently-submitted tasks, and not rejecting new tasks to the new TPE. It might answer your question. @Thilo's might also.

以下答案将使您能够顺利过渡到新的 TPE(无论出于何种原因),完成所有当前提交的任务,并且不会拒绝新的任务到新的 TPE。它可能会回答你的问题。@Thilo 的也可能。

Assuming you have defined somewhere a visible TPE in use as such:

假设您已经在某处定义了一个可见的 TPE,如下所示:

AtomicReference<ThreadPoolExecutor> publiclyAvailableTPE = ...;

You can then write the TPE swap routine as such. It could also be written using a synchronized method, but I think this is simpler:

然后您可以这样编写 TPE 交换例程。它也可以使用同步方法编写,但我认为这更简单:

void rotateTPE()
{
   ThreadPoolExecutor newTPE = createNewTPE();
   // atomic swap with publicly-visible TPE
   ThreadPoolExecutor oldTPE = publiclyAvailableTPE.getAndSet(newTPE);
   oldTPE.shutdown();

   // and if you want this method to block awaiting completion of old tasks in  
   // the previously visible TPE
   oldTPE.awaitTermination();
} 

Alternatively, if you really no kidding want to kill the thread pool, then your submitter side will need to cope with rejected tasks at some point, and you could use nullfor the new TPE:

或者,如果您真的不想开玩笑地想杀死线程池,那么您的提交方将需要在某个时候处理被拒绝的任务,您可以将其null用于新的 TPE:

void killTPE()
{
   ThreadPoolExecutor oldTPE = publiclyAvailableTPE.getAndSet(null);
   oldTPE.shutdown();

   // and if you want this method to block awaiting completion of old tasks in  
   // the previously visible TPE
   oldTPE.awaitTermination();
} 

Which could cause upstream problems, the caller would need to know what to do with a null.

这可能会导致上游问题,调用者需要知道如何处理null.

You could also swap out with a dummy TPE that simply rejected every new execution, but that's equivalent to what happens if you call shutdown()on the TPE.

您也可以使用简单地拒绝每个新执行的虚拟 TPE 进行替换,但这与调用shutdown()TPE时发生的情况相同。

回答by googol4u

My Scenario is a web crawler to fetch some information from a web site then processing them. A ThreadPoolExecutor is used to speed up the process because many pages can be loaded in the time. So new tasks will be created in the existing task because the crawler will follow hyperlinks in each page. The problem is the same: the main thread do not know when all the tasks are completed and it can start to process the result. I use a simple way to determine this. It is not very elegant but works in my case:

My Scenario 是一个网络爬虫,用于从网站获取一些信息然后处理它们。一个 ThreadPoolExecutor 用于加速进程,因为在同一时间内可以加载许多页面。所以新任务会在现有任务中创建,因为爬虫会跟踪每个页面中的超链接。问题是一样的:主线程不知道什么时候所有的任务都完成了,它可以开始处理结果。我使用一种简单的方法来确定这一点。它不是很优雅,但在我的情况下有效:

while (executor.getTaskCount()!=executor.getCompletedTaskCount()){
    System.err.println("count="+executor.getTaskCount()+","+executor.getCompletedTaskCount());
    Thread.sleep(5000);
}
executor.shutdown();
executor.awaitTermination(60, TimeUnit.SECONDS);

回答by Ravindra babu

If you don't want to use shutdown, follow below approaches:

如果您不想使用shutdown,请按照以下方法操作:

  1. Iterate through all Futuretasks from submit on ExecutorServiceand check the status with blocking call get()on Futureobject as suggested by Tim Bender

  2. Use one of

    1. Using invokeAllon ExecutorService
    2. Using CountDownLatch
    3. Using ForkJoinPoolor newWorkStealingPoolof Executors(since java 8)
  1. 迭代Future从提交开始的所有任务,ExecutorService并按照建议的get()Future对象的阻塞调用来检查状态Tim Bender

  2. 使用其中之一

    1. 使用的invokeAllExecutorService
    2. 使用CountDownLatch
    3. 使用ForkJoinPoolnewWorkStealingPoolof Executors(自 java 8)

invokeAll()on executor service also achieves the same purpose of CountDownLatch

invokeAll()在 executor service 上也达到了同样的目的 CountDownLatch

Related SE question:

相关 SE 问题:

How to wait for a number of threads to complete?

如何等待多个线程完成?

回答by Matej Tymes

You could call the waitTillDone()on Runnerclass:

您可以在Runner类上调用waitTillDone()

Runner runner = Runner.runner(10);

runner.runIn(2, SECONDS, runnable);
runner.run(runnable); // each of this runnables could submit more tasks

runner.waitTillDone(); // blocks until all tasks are finished (or failed)

// and now reuse it

runner.runIn(500, MILLISECONDS, callable);

runner.waitTillDone();
runner.shutdown();

To use it add this gradle/maven dependency to your project: 'com.github.matejtymes:javafixes:1.0'

要使用它,请将此 gradle/maven 依赖项添加到您的项目中: 'com.github.matejtymes:javafixes:1.0'

For more details look here: https://github.com/MatejTymes/JavaFixesor here: http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html

有关更多详细信息,请参见此处:https: //github.com/MatejTymes/JavaFixes或此处:http: //matejtymes.blogspot.com/2016/04/executor-that-notifying-you-when-task.html

回答by Tu?rul Karakaya

Try using queue size and active tasks count as shown below

尝试使用队列大小和活动任务计数,如下所示

 while (executor.getThreadPoolExecutor().getActiveCount() != 0 || !executor.getThreadPoolExecutor().getQueue().isEmpty()){
                     try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
            }
        }