Java ExecutorService,如何等待所有任务完成
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/3269445/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
ExecutorService, how to wait for all tasks to finish
提问by george smiley
What is the simplest way to to wait for all tasks of ExecutorService
to finish? My task is primarily computational, so I just want to run a large number of jobs - one on each core. Right now my setup looks like this:
等待所有任务ExecutorService
完成的最简单方法是什么?我的任务主要是计算,所以我只想运行大量作业——每个内核一个。现在我的设置如下所示:
ExecutorService es = Executors.newFixedThreadPool(2);
for (DataTable singleTable : uniquePhrases) {
es.execute(new ComputeDTask(singleTable));
}
try{
es.wait();
}
catch (InterruptedException e){
e.printStackTrace();
}
ComputeDTask
implements runnable. This appears to execute the tasks correctly, but the code crashes on wait()
with IllegalMonitorStateException
. This is odd, because I played around with some toy examples and it appeared to work.
ComputeDTask
实现可运行。这似乎正确执行任务,但代码崩溃wait()
了IllegalMonitorStateException
。这很奇怪,因为我玩了一些玩具示例并且它似乎有效。
uniquePhrases
contains several tens of thousands of elements. Should I be using another method? I am looking for something as simple as possible
uniquePhrases
包含数以万计的元素。我应该使用另一种方法吗?我正在寻找尽可能简单的东西
回答by NG.
If you want to wait for all tasks to complete, use the shutdown
method instead of wait
. Then follow it with awaitTermination
.
如果要等待所有任务完成,请使用shutdown
方法而不是wait
。然后跟着它awaitTermination
。
Also, you can use Runtime.availableProcessors
to get the number of hardware threads so you can initialize your threadpool properly.
此外,您可以使用Runtime.availableProcessors
获取硬件线程的数量,以便您可以正确初始化您的线程池。
回答by mdma
If you want to wait for the executor service to finish executing, call shutdown()
and then, awaitTermination(units, unitType), e.g. awaitTermination(1, MINUTE)
. The ExecutorService does not block on it's own monitor, so you can't use wait
etc.
如果您想等待 executor 服务完成执行,请调用shutdown()
,然后调用awaitTermination(units, unitType),例如awaitTermination(1, MINUTE)
. ExecutorService 不会在它自己的监视器上阻塞,所以你不能使用wait
等。
回答by seh
If waiting for all tasks in the ExecutorService
to finish isn't precisely your goal, but rather waiting until a specific batch of taskshas completed, you can use a CompletionService
— specifically, an ExecutorCompletionService
.
如果等待 中的所有任务ExecutorService
完成并不是您的确切目标,而是等待特定批次的任务完成,您可以使用一个CompletionService
——特别是一个ExecutorCompletionService
.
The idea is to create an ExecutorCompletionService
wrapping your Executor
, submitsome known numberof tasks through the CompletionService
, then draw that same numberof results from the completion queueusing either take()
(which blocks) or poll()
(which does not). Once you've drawn all the expected results corresponding to the tasks you submitted, you know they're all done.
这个想法是创建一个ExecutorCompletionService
wrapping your Executor
,通过提交一些已知数量的任务CompletionService
,然后使用(哪些阻塞)或(哪些不阻塞)从完成队列中提取相同数量的结果。一旦您绘制了与您提交的任务相对应的所有预期结果,您就知道它们都已完成。take()
poll()
Let me state this one more time, because it's not obvious from the interface: You must know how many things you put into the CompletionService
in order to know how many things to try to draw out. This matters especially with the take()
method: call it one time too many and it will block your calling thread until some other thread submits another job to the same CompletionService
.
让我再说一遍,因为从界面上看并不明显:您必须知道放入了多少东西CompletionService
才能知道尝试抽出多少东西。这对于take()
方法尤其重要:调用它一次太多,它会阻塞您的调用线程,直到其他线程将另一个作业提交给相同的CompletionService
.
There are some examples showing how to use CompletionService
in the book Java Concurrency in Practice.
在Java Concurrency in Practice一书中,有一些示例展示了如何使用CompletionService
。
回答by Alain O'Dea
You could wait jobs to finish on a certain interval:
您可以等待作业在特定时间间隔内完成:
int maxSecondsPerComputeDTask = 20;
try {
while (!es.awaitTermination(uniquePhrases.size() * maxSecondsPerComputeDTask, TimeUnit.SECONDS)) {
// consider giving up with a 'break' statement under certain conditions
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Or you could use ExecutorService.submit(Runnable) and collect the Futureobjects that it returns and call get()on each in turn to wait for them to finish.
或者你可以使用ExecutorService。提交(Runnable)并收集它返回的Future对象并依次调用get()以等待它们完成。
ExecutorService es = Executors.newFixedThreadPool(2);
Collection<Future<?>> futures = new LinkedList<<Future<?>>();
for (DataTable singleTable : uniquePhrases) {
futures.add(es.submit(new ComputeDTask(singleTable)));
}
for (Future<?> future : futures) {
try {
future.get();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
InterruptedExceptionis extremely important to handle properly. It is what lets you or the users of your library terminate a long process safely.
InterruptedException对于正确处理非常重要。它可以让您或您图书馆的用户安全地终止一个漫长的过程。
回答by andersoj
The simplest approach is to use ExecutorService.invokeAll()
which does what you want in a one-liner. In your parlance, you'll need to modify or wrap ComputeDTask
to implement Callable<>
, which can give you quite a bit more flexibility. Probably in your app there is a meaningful implementation of Callable.call()
, but here's a way to wrap it if not using Executors.callable()
.
最简单的方法是使用ExecutorService.invokeAll()
which 在单行中做你想做的事。按照您的说法,您需要修改或包装ComputeDTask
来实现Callable<>
,这可以为您提供更大的灵活性。可能在您的应用程序中有一个有意义的实现Callable.call()
,但是如果不使用Executors.callable()
.
ExecutorService es = Executors.newFixedThreadPool(2);
List<Callable<Object>> todo = new ArrayList<Callable<Object>>(singleTable.size());
for (DataTable singleTable: uniquePhrases) {
todo.add(Executors.callable(new ComputeDTask(singleTable)));
}
List<Future<Object>> answers = es.invokeAll(todo);
As others have pointed out, you could use the timeout version of invokeAll()
if appropriate. In this example, answers
is going to contain a bunch of Future
s which will return nulls (see definition of Executors.callable()
. Probably what you want to do is a slight refactoring so you can get a useful answer back, or a reference to the underlying ComputeDTask
, but I can't tell from your example.
正如其他人所指出的,invokeAll()
如果合适,您可以使用超时版本。在这个例子中,answers
将包含一堆Future
将返回空值的s(参见 的定义Executors.callable()
。可能你想要做的是轻微的重构,这样你就可以得到一个有用的答案,或者对底层的引用ComputeDTask
,但我可以不能从你的例子中看出。
If it isn't clear, note that invokeAll()
will not return until all the tasks are completed. (i.e., all the Future
s in your answers
collection will report .isDone()
if asked.) This avoids all the manual shutdown, awaitTermination, etc... and allows you to reuse this ExecutorService
neatly for multiple cycles, if desired.
如果不清楚,请注意invokeAll()
直到所有任务完成后才会返回。(即,如果询问Future
,您answers
集合中的所有s都会报告.isDone()
。)这避免了所有手动关闭、awaitTermination 等...并允许您ExecutorService
在需要时在多个循环中巧妙地重用它。
There are a few related questions on SO:
SO有几个相关的问题:
None of these are strictly on-point for your question, but they do provide a bit of color about how folks think Executor
/ExecutorService
ought to be used.
这些都不是您的问题的严格重点,但它们确实提供了一些关于人们认为Executor
/ExecutorService
应该如何使用的颜色。
回答by Adrian Smith
I also have the situation that I have a set of documents to be crawled. I start with an initial "seed" document which should be processed, that document contains links to other documents which should also be processed, and so on.
我也有这样的情况,我有一组要抓取的文档。我从应该处理的初始“种子”文档开始,该文档包含指向也应该处理的其他文档的链接,依此类推。
In my main program, I just want to write something like the following, where Crawler
controls a bunch of threads.
在我的主程序中,我只想编写如下内容,其中Crawler
控制了一堆线程。
Crawler c = new Crawler();
c.schedule(seedDocument);
c.waitUntilCompletion()
The same situation would happen if I wanted to navigate a tree; i would pop in the root node, the processor for each node would add children to the queue as necessary, and a bunch of threads would process all the nodes in the tree, until there were no more.
如果我想导航一棵树,也会发生同样的情况;我会弹出根节点,每个节点的处理器会根据需要将子节点添加到队列中,并且一堆线程将处理树中的所有节点,直到没有更多节点。
I couldn't find anything in the JVM which I thought was a bit surprising. So I wrote a class AutoStopThreadPool
which one can either use directly or subclass to add methods suitable for the domain, e.g. schedule(Document)
. Hope it helps!
我在 JVM 中找不到任何我认为有点令人惊讶的东西。所以我写了一个类AutoStopThreadPool
,可以直接使用或子类来添加适合域的方法,例如schedule(Document)
. 希望能帮助到你!
回答by Vicky Kapadia
A simple alternative to this is to use threads along with join. Refer : Joining Threads
一个简单的替代方法是将线程与连接一起使用。参考:加入线程
回答by punkers
I will just wait for the executor to terminate with a specified timeout that you think it is suitable for the tasks to complete.
我将等待执行程序以您认为适合完成任务的指定超时终止。
try {
//do stuff here
exe.execute(thread);
} finally {
exe.shutdown();
}
boolean result = exe.awaitTermination(4, TimeUnit.HOURS);
if (!result)
{
LOGGER.error("It took more than 4 hour for the executor to stop, this shouldn't be the normal behaviour.");
}
回答by zgormez
Add all threads in collection and submit it using invokeAll
.
If you can use invokeAll
method of ExecutorService
, JVM won't proceed to next line until all threads are complete.
添加集合中的所有线程并使用invokeAll
. 如果可以使用 的invokeAll
方法ExecutorService
,则在所有线程完成之前,JVM 不会继续下一行。
Here there is a good example: invokeAll via ExecutorService
这里有一个很好的例子: invokeAll via ExecutorService
回答by J. Ruhe
Just use
只需使用
latch = new CountDownLatch(noThreads)
In each thread
在每个线程中
latch.countDown();
and as barrier
并作为障碍
latch.await();