等到Future <T>中的任何一个完成
我运行的异步任务很少,我需要等到至少其中一个完成为止(将来可能需要等待N个任务中的util M完成)。
目前,它们以"未来"的形式出现,所以我需要一些类似的东西
/** * Blocks current thread until one of specified futures is done and returns it. */ public static <T> Future<T> waitForAny(Collection<Future<T>> futures) throws AllFuturesFailedException
像这样吗或者类似的东西,对于Future来说不是必需的。目前,我循环浏览期货,检查一项是否完成,然后入睡一段时间,然后再次检查。这似乎不是最佳解决方案,因为如果我长时间睡眠,那么会增加不必要的延迟,如果我短期睡眠,则可能会影响性能。
我可以尝试使用
new CountDownLatch(1)
并在任务完成时减少倒计时
countdown.await()
,但我发现只有在我控制Future创建时才有可能。这是可能的,但需要重新设计系统,因为当前任务创建的逻辑(将Callable发送给ExecutorService)与决定等待哪个Future分开。我也可以覆盖
<T> RunnableFuture<T> AbstractExecutorService.newTaskFor(Callable<T> callable)
并创建RunnableFuture的自定义实现,并具有将任务完成时通知的侦听器添加到侦听器的功能,然后将侦听器添加到所需的任务并使用CountDownLatch,但这意味着我必须为我使用的每个ExecutorService覆盖newTaskFor,并且可能会有实现不要扩展AbstractExecutorService。我也可以尝试出于相同的目的包装给定的ExecutorService,但是随后我必须装饰产生Futures的所有方法。
所有这些解决方案可能都有效,但看起来非常不自然。好像我缺少一些简单的东西,例如
WaitHandle.WaitAny(WaitHandle[] waitHandles)
在C#中。是否存在针对此类问题的众所周知的解决方案?
更新:
最初,我根本无法使用Future创作,因此没有优雅的解决方案。重新设计系统后,我可以访问Future创建并能够将countDownLatch.countdown()添加到执行过程中,然后我可以countDownLatch.await()正常运行。
感谢我们提供其他答案,我不了解ExecutorCompletionService,它确实对类似任务很有帮助,但是在这种特殊情况下,由于某些Future是在没有任何执行者的情况下创建的,因此无法使用,实际任务是通过网络发送到另一台服务器的。远程完成通知。
解决方案
据我所知,Java与WaitHandle.WaitAny
方法没有类似的结构。
在我看来,这可以通过" WaitableFuture"装饰器来实现:
public WaitableFuture<T> extends Future<T> { private CountDownLatch countDownLatch; WaitableFuture(CountDownLatch countDownLatch) { super(); this.countDownLatch = countDownLatch; } void doTask() { super.doTask(); this.countDownLatch.countDown(); } }
尽管只有将其插入执行代码之前,它才会起作用,因为否则执行代码将没有新的doTask()
方法。但是,如果我们无法以某种方式在执行之前获得对Future对象的控制权,我真的看不到没有轮询就无法执行此操作的方法。
或者,如果将来总是在自己的线程中运行,那么我们可以通过某种方式获得该线程。然后,我们可以产生一个新线程来互相连接线程,然后在连接返回后处理等待机制……这确实很丑陋,但是会引起很多开销。而且,如果某些Future对象没有完成,则根据死线程,我们可能会有很多阻塞线程。如果不小心,可能会泄漏内存和系统资源。
/** * Extremely ugly way of implementing WaitHandle.WaitAny for Thread.Join(). */ public static joinAny(Collection<Thread> threads, int numberToWaitFor) { CountDownLatch countDownLatch = new CountDownLatch(numberToWaitFor); foreach(Thread thread in threads) { (new Thread(new JoinThreadHelper(thread, countDownLatch))).start(); } countDownLatch.await(); } class JoinThreadHelper implements Runnable { Thread thread; CountDownLatch countDownLatch; JoinThreadHelper(Thread thread, CountDownLatch countDownLatch) { this.thread = thread; this.countDownLatch = countDownLatch; } void run() { this.thread.join(); this.countDownLatch.countDown(); } }
由于我们不在乎哪一个完成,为什么不对所有线程仅使用一个WaitHandle并等待呢?谁先完成可以设置手柄。
使用wait()和notifyAll()实际上很容易。
首先,定义一个锁对象。 (我们可以为此使用任何类,但我想明确一点):
package com.javadude.sample; public class Lock {}
接下来,定义工作线程。完成处理后,他必须通知该锁定对象。请注意,通知必须位于锁定在锁对象上的同步块中。
package com.javadude.sample; public class Worker extends Thread { private Lock lock_; private long timeToSleep_; private String name_; public Worker(Lock lock, String name, long timeToSleep) { lock_ = lock; timeToSleep_ = timeToSleep; name_ = name; } @Override public void run() { // do real work -- using a sleep here to simulate work try { sleep(timeToSleep_); } catch (InterruptedException e) { interrupt(); } System.out.println(name_ + " is done... notifying"); // notify whoever is waiting, in this case, the client synchronized (lock_) { lock_.notify(); } } }
最后,我们可以编写客户端:
package com.javadude.sample; public class Client { public static void main(String[] args) { Lock lock = new Lock(); Worker worker1 = new Worker(lock, "worker1", 15000); Worker worker2 = new Worker(lock, "worker2", 10000); Worker worker3 = new Worker(lock, "worker3", 5000); Worker worker4 = new Worker(lock, "worker4", 20000); boolean started = false; int numNotifies = 0; while (true) { synchronized (lock) { try { if (!started) { // need to do the start here so we grab the lock, just // in case one of the threads is fast -- if we had done the // starts outside the synchronized block, a fast thread could // get to its notification *before* the client is waiting for it worker1.start(); worker2.start(); worker3.start(); worker4.start(); started = true; } lock.wait(); } catch (InterruptedException e) { break; } numNotifies++; if (numNotifies == 4) { break; } System.out.println("Notified!"); } } System.out.println("Everyone has notified me... I'm done"); } }
简单,请查看ExecutorCompletionService。
为什么不只创建结果队列并等待队列呢?或者更简单地说,使用CompletionService,因为它就是这样:ExecutorService +结果队列。
ExecutorService.invokeAny
查看此选项:
public class WaitForAnyRedux { private static final int POOL_SIZE = 10; public static <T> T waitForAny(Collection<T> collection) throws InterruptedException, ExecutionException { List<Callable<T>> callables = new ArrayList<Callable<T>>(); for (final T t : collection) { Callable<T> callable = Executors.callable(new Thread() { @Override public void run() { synchronized (t) { try { t.wait(); } catch (InterruptedException e) { } } } }, t); callables.add(callable); } BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(POOL_SIZE); ExecutorService executorService = new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE, 0, TimeUnit.SECONDS, queue); return executorService.invokeAny(callables); } static public void main(String[] args) throws InterruptedException, ExecutionException { final List<Integer> integers = new ArrayList<Integer>(); for (int i = 0; i < POOL_SIZE; i++) { integers.add(i); } (new Thread() { public void run() { Integer notified = null; try { notified = waitForAny(integers); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println("notified=" + notified); } }).start(); synchronized (integers) { integers.wait(3000); } Integer randomInt = integers.get((new Random()).nextInt(POOL_SIZE)); System.out.println("Waking up " + randomInt); synchronized (randomInt) { randomInt.notify(); } } }