Java 等待任何 Future<T> 完成
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/117690/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Wait until any of Future<T> is done
提问by Pavel Feldman
I have few asynchronous tasks running and I need to wait until at least one of them is finished (in the future probably I'll need to wait util M out of N tasks are finished). Currently they are presented as Future, so I need something like
我有几个异步任务在运行,我需要等到其中至少一个完成(将来可能我需要等待 N 个任务中的 util M 完成)。目前它们显示为 Future,所以我需要类似的东西
/**
* Blocks current thread until one of specified futures is done and returns it.
*/
public static <T> Future<T> waitForAny(Collection<Future<T>> futures)
throws AllFuturesFailedException
Is there anything like this? Or anything similar, not necessary for Future. Currently I loop through collection of futures, check if one is finished, then sleep for some time and check again. This looks like not the best solution, because if I sleep for long period then unwanted delay is added, if I sleep for short period then it can affect performance.
有这样的吗?或类似的东西,未来不需要。目前我循环收集期货,检查一个是否完成,然后睡一段时间再检查。这看起来不是最好的解决方案,因为如果我长时间睡眠,则会增加不必要的延迟,如果我睡眠时间很短,则会影响性能。
I could try using
我可以尝试使用
new CountDownLatch(1)
and decrease countdown when task is complete and do
并在任务完成时减少倒计时并执行
countdown.await()
, but I found it possible only if I control Future creation. It is possible, but requires system redesign, because currently logic of tasks creation (sending Callable to ExecutorService) is separated from decision to wait for which Future. I could also override
,但我发现只有在我控制 Future 创建时才有可能。这是可能的,但需要重新设计系统,因为当前任务创建的逻辑(将 Callable 发送到 ExecutorService)与决定等待哪个 Future 分离。我也可以覆盖
<T> RunnableFuture<T> AbstractExecutorService.newTaskFor(Callable<T> callable)
and create custom implementation of RunnableFuture with ability to attach listener to be notified when task is finished, then attach such listener to needed tasks and use CountDownLatch, but that means I have to override newTaskFor for every ExecutorService I use - and potentially there will be implementation which do not extend AbstractExecutorService. I could also try wrapping given ExecutorService for same purpose, but then I have to decorate all methods producing Futures.
并创建 RunnableFuture 的自定义实现,能够附加侦听器以在任务完成时收到通知,然后将此类侦听器附加到所需任务并使用 CountDownLatch,但这意味着我必须为我使用的每个 ExecutorService 覆盖 newTaskFor - 并且可能会有实现不扩展 AbstractExecutorService。我也可以尝试为相同的目的包装给定的 ExecutorService,但是我必须装饰所有产生 Futures 的方法。
All these solutions may work but seem very unnatural. It looks like I'm missing something simple, like
所有这些解决方案都可能有效,但看起来很不自然。看起来我错过了一些简单的东西,比如
WaitHandle.WaitAny(WaitHandle[] waitHandles)
in c#. Are there any well known solutions for such kind of problem?
在 C# 中。对于此类问题,是否有任何众所周知的解决方案?
UPDATE:
更新:
Originally I did not have access to Future creation at all, so there were no elegant solution. After redesigning system I got access to Future creation and was able to add countDownLatch.countdown() to execution process, then I can countDownLatch.await() and everything works fine. Thanks for other answers, I did not know about ExecutorCompletionService and it indeed can be helpful in similar tasks, but in this particular case it could not be used because some Futures are created without any executor - actual task is sent to another server via network, completes remotely and completion notification is received.
本来我根本没有访问 Future 创建的权限,所以没有优雅的解决方案。重新设计系统后,我可以访问 Future 创建并能够将 countDownLatch.countdown() 添加到执行过程,然后我可以 countDownLatch.await() 并且一切正常。感谢其他答案,我不知道 ExecutorCompletionService 并且它确实可以在类似任务中提供帮助,但在这种特殊情况下它无法使用,因为某些 Futures 是在没有任何执行程序的情况下创建的 - 实际任务通过网络发送到另一台服务器,远程完成并收到完成通知。
采纳答案by jdmichal
As far as I know, Java has no analogous structure to the WaitHandle.WaitAny
method.
据我所知,Java 没有与该WaitHandle.WaitAny
方法类似的结构。
It seems to me that this could be achieved through a "WaitableFuture" decorator:
在我看来,这可以通过“WaitableFuture”装饰器来实现:
public WaitableFuture<T>
extends Future<T>
{
private CountDownLatch countDownLatch;
WaitableFuture(CountDownLatch countDownLatch)
{
super();
this.countDownLatch = countDownLatch;
}
void doTask()
{
super.doTask();
this.countDownLatch.countDown();
}
}
Though this would only work if it can be inserted before the execution code, since otherwise the execution code would not have the new doTask()
method. But I really see no way of doing this without polling if you cannot somehow gain control of the Future object before execution.
虽然这只有在它可以插入到执行代码之前才有效,否则执行代码将没有新doTask()
方法。但是,如果您无法在执行前以某种方式获得对 Future 对象的控制,我真的认为没有轮询就无法做到这一点。
Or if the future always runs in its own thread, and you can somehow get that thread. Then you could spawn a new thread to join each other thread, then handle the waiting mechanism after the join returns... This would be really ugly and would induce a lot of overhead though. And if some Future objects don't finish, you could have a lot of blocked threads depending on dead threads. If you're not careful, this could leak memory and system resources.
或者如果未来总是在它自己的线程中运行,你可以以某种方式获得那个线程。然后你可以产生一个新线程来加入其他线程,然后在加入返回后处理等待机制......这真的很丑陋,但会引起很多开销。如果一些 Future 对象没有完成,根据死线程,你可能会有很多阻塞的线程。如果您不小心,这可能会泄漏内存和系统资源。
/**
* Extremely ugly way of implementing WaitHandle.WaitAny for Thread.Join().
*/
public static joinAny(Collection<Thread> threads, int numberToWaitFor)
{
CountDownLatch countDownLatch = new CountDownLatch(numberToWaitFor);
foreach(Thread thread in threads)
{
(new Thread(new JoinThreadHelper(thread, countDownLatch))).start();
}
countDownLatch.await();
}
class JoinThreadHelper
implements Runnable
{
Thread thread;
CountDownLatch countDownLatch;
JoinThreadHelper(Thread thread, CountDownLatch countDownLatch)
{
this.thread = thread;
this.countDownLatch = countDownLatch;
}
void run()
{
this.thread.join();
this.countDownLatch.countDown();
}
}
回答by 1800 INFORMATION
Since you don't care which one finishes, why not just have a single WaitHandle for all threads and wait on that? Whichever one finishes first can set the handle.
既然你不在乎哪一个完成,为什么不为所有线程设置一个 WaitHandle 并等待它呢?无论谁先完成,都可以设置手柄。
回答by Scott Stanchfield
This is actually pretty easy with wait() and notifyAll().
这实际上很容易使用 wait() 和 notifyAll()。
First, define a lock object. (You can use any class for this, but I like to be explicit):
首先,定义一个锁对象。(您可以为此使用任何类,但我喜欢明确表示):
package com.javadude.sample;
public class Lock {}
Next, define your worker thread. He must notify that lock object when he's finished with his processing. Note that the notify must be in a synchronized block locking on the lock object.
接下来,定义您的工作线程。当他完成他的处理时,他必须通知那个锁对象。请注意,通知必须在锁定对象上的同步块锁定中。
package com.javadude.sample;
public class Worker extends Thread {
private Lock lock_;
private long timeToSleep_;
private String name_;
public Worker(Lock lock, String name, long timeToSleep) {
lock_ = lock;
timeToSleep_ = timeToSleep;
name_ = name;
}
@Override
public void run() {
// do real work -- using a sleep here to simulate work
try {
sleep(timeToSleep_);
} catch (InterruptedException e) {
interrupt();
}
System.out.println(name_ + " is done... notifying");
// notify whoever is waiting, in this case, the client
synchronized (lock_) {
lock_.notify();
}
}
}
Finally, you can write your client:
最后,您可以编写您的客户端:
package com.javadude.sample;
public class Client {
public static void main(String[] args) {
Lock lock = new Lock();
Worker worker1 = new Worker(lock, "worker1", 15000);
Worker worker2 = new Worker(lock, "worker2", 10000);
Worker worker3 = new Worker(lock, "worker3", 5000);
Worker worker4 = new Worker(lock, "worker4", 20000);
boolean started = false;
int numNotifies = 0;
while (true) {
synchronized (lock) {
try {
if (!started) {
// need to do the start here so we grab the lock, just
// in case one of the threads is fast -- if we had done the
// starts outside the synchronized block, a fast thread could
// get to its notification *before* the client is waiting for it
worker1.start();
worker2.start();
worker3.start();
worker4.start();
started = true;
}
lock.wait();
} catch (InterruptedException e) {
break;
}
numNotifies++;
if (numNotifies == 4) {
break;
}
System.out.println("Notified!");
}
}
System.out.println("Everyone has notified me... I'm done");
}
}
回答by james
simple, check out ExecutorCompletionService.
回答by Alex Miller
Why not just create a results queue and wait on the queue? Or more simply, use a CompletionService since that's what it is: an ExecutorService + result queue.
为什么不创建一个结果队列并在队列中等待?或者更简单地说,使用 CompletionService 因为它是这样的:一个 ExecutorService + 结果队列。
回答by ykaganovich
回答by ykaganovich
See this option:
请参阅此选项:
public class WaitForAnyRedux {
private static final int POOL_SIZE = 10;
public static <T> T waitForAny(Collection<T> collection) throws InterruptedException, ExecutionException {
List<Callable<T>> callables = new ArrayList<Callable<T>>();
for (final T t : collection) {
Callable<T> callable = Executors.callable(new Thread() {
@Override
public void run() {
synchronized (t) {
try {
t.wait();
} catch (InterruptedException e) {
}
}
}
}, t);
callables.add(callable);
}
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(POOL_SIZE);
ExecutorService executorService = new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE, 0, TimeUnit.SECONDS, queue);
return executorService.invokeAny(callables);
}
static public void main(String[] args) throws InterruptedException, ExecutionException {
final List<Integer> integers = new ArrayList<Integer>();
for (int i = 0; i < POOL_SIZE; i++) {
integers.add(i);
}
(new Thread() {
public void run() {
Integer notified = null;
try {
notified = waitForAny(integers);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("notified=" + notified);
}
}).start();
synchronized (integers) {
integers.wait(3000);
}
Integer randomInt = integers.get((new Random()).nextInt(POOL_SIZE));
System.out.println("Waking up " + randomInt);
synchronized (randomInt) {
randomInt.notify();
}
}
}