Java ExecutorService:awaitTermination 所有递归创建的任务

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/4958330/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-10-30 08:47:12  来源:igfitidea点击:

Java ExecutorService: awaitTermination of all recursively created tasks

javamultithreadingconcurrencyexecutorservice

提问by Christoph

I use an ExecutorServiceto execute a task. This task can recursively create other tasks which are submitted to the same ExecutorServiceand those child tasks can do that, too.

我使用 anExecutorService来执行任务。这个任务可以递归地创建提交给相同任务的其他任务,ExecutorService这些子任务也可以这样做。

I now have the problem that I want to wait until all the tasks are done (that is, all tasks are finished and they did not submit new ones) before I continue.

我现在有一个问题,我想等到所有任务都完成(即所有任务都完成并且他们没有提交新任务)再继续。

I cannot call ExecutorService.shutdown()in the main thread because this prevents new tasks from being accepted by the ExecutorService.

我无法调用ExecutorService.shutdown()主线程,因为这会阻止ExecutorService.

And Calling ExecutorService.awaitTermination()seems to do nothing if shutdownhasn't been called.

ExecutorService.awaitTermination()如果shutdown没有被调用,调用似乎什么都不做。

So I am kinda stuck here. It can't be that hard for the ExecutorServiceto see that all workers are idle, can it? The only inelegant solution I could come up with is to directly use a ThreadPoolExecutorand query its getPoolSize()every once in a while. Is there really no better way do do that?

所以我有点卡在这里。ExecutorService看到所有工人都闲着应该不会那么难吧?我能想出的唯一不优雅的解决方案是直接使用 aThreadPoolExecutorgetPoolSize()每隔一段时间查询一次。真的没有更好的方法来做到这一点吗?

采纳答案by axtavt

If number of tasks in the tree of recursive tasks is initially unknown, perhaps the easiest way would be to implement your own synchronization primitive, some kind of "inverse semaphore", and share it among your tasks. Before submitting each task you increment a value, when task is completed, it decrements that value, and you wait until the value is 0.

如果递归任务树中的任务数量最初未知,也许最简单的方法是实现您自己的同步原语,某种“反向信号量”,并在您的任务之间共享它。在提交每个任务之前,您增加一个值,当任务完成时,它会减少该值,然后您等待该值变为 0。

Implementing it as a separate primitive explicitly called from tasks decouples this logic from the thread pool implementation and allows you to submit several independent trees of recursive tasks into the same pool.

将它实现为从任务中显式调用的单独原语可以将此逻辑与线程池实现分离,并允许您将递归任务的多个独立树提交到同一个池中。

Something like this:

像这样的东西:

public class InverseSemaphore {
    private int value = 0;
    private Object lock = new Object();

    public void beforeSubmit() {
        synchronized(lock) {
            value++;
        }
    }

    public void taskCompleted() {
        synchronized(lock) {
            value--;
            if (value == 0) lock.notifyAll();
        }
    }

    public void awaitCompletion() throws InterruptedException {
        synchronized(lock) {
            while (value > 0) lock.wait();
        }
    }
}

Note that taskCompleted()should be called inside a finallyblock, to make it immune to possible exceptions.

请注意,taskCompleted()应该在finally块内调用,以使其免受可能的异常的影响。

Also note that beforeSubmit()should be called by the submitting thread before the task is submitted, not by the task itself, to avoid possible "false completion" when old tasks are completed and new ones not started yet.

另请注意,beforeSubmit()应在提交任务之前由提交线程调用,而不是由任务本身调用,以避免在旧任务完成而新任务尚未启动时可能出现“错误完成”。

EDIT:Important problem with usage pattern fixed.

编辑:修复了使用模式的重要问题。

回答by John Vint

This really is an ideal candidate for a Phaser. Java 7 is coming out with this new class. Its a flexible CountdonwLatch/CyclicBarrier. You can get a stable version at JSR 166 Interest Site.

这确实是 Phaser 的理想人选。Java 7 即将推出这个新类。它是一个灵活的 CountdonwLatch/CyclicBarrier。您可以在JSR 166 Interest Site获得稳定版本。

The way it is a more flexible CountdownLatch/CyclicBarrier is because it is able to not only support an unknown number of parties (threads) but its also reusable (thats where the phase part comes in)

它是一个更灵活的 CountdownLatch/CyclicBarrier 的方式是因为它不仅能够支持未知数量的参与方(线程),而且它也是可重用的(这就是阶段部分的用武之地)

For each task you submit you would register, when that task is completed you arrive. This can be done recursively.

对于您提交的每项任务,您都会进行注册,完成该任务后您就会到达。这可以递归地完成。

Phaser phaser = new Phaser();
ExecutorService e = //

Runnable recursiveRunnable = new Runnable(){
   public void run(){
      //do work recursively if you have to

      if(shouldBeRecursive){
           phaser.register();
           e.submit(recursiveRunnable);
      }

      phaser.arrive();
   }
}

public void doWork(){
   int phase = phaser.getPhase();

   phaser.register();
   e.submit(recursiveRunnable);

   phaser.awaitAdvance(phase);
}

Edit:Thanks @depthofreality for pointing out the race condition in my previous example. I am updating it so that executing thread only awaits advance of the current phase as it blocks for the recursive function to complete.

编辑:感谢@depthofreality 在我之前的例子中指出了竞争条件。我正在更新它,以便执行线程只等待当前阶段的推进,因为它会阻止递归函数完成。

The phase number won't trip until the number of arrives == registers. Since prior to each recursive call invokes registera phase increment will happen when all invocations are complete.

直到arrives == registers的数量,阶段号才会跳闸。由于在每次递归调用之前,register当所有调用都完成时,将发生阶段增量。

回答by Christoph

Wow, you guys are quick:)

哇,你们好快:)

Thank you for all the suggestions. Futures don't easily integrate with my model because I don't know how many runnables are scheduled beforehand. So if I keep a parent task alive just to wait for it's recursive child tasks to finish I have a lot of garbage laying around.

谢谢你的所有建议。Futures 不容易与我的模型集成,因为我不知道事先安排了多少可运行程序。因此,如果我让父任务保持活动状态只是为了等待它的递归子任务完成,我就会有很多垃圾。

I solved my problem using the AtomicInteger suggestion. Essentially, I subclassed ThreadPoolExecutor and increment the counter on calls to execute() and decrement on calls to afterExecute(). When the counter gets 0 I call shutdown(). This seems to work for my problems, not sure if that's a generally good way to do that. Especially, I assume that you only use execute() to add Runnables.

我使用 AtomicInteger 建议解决了我的问题。本质上,我继承了 ThreadPoolExecutor 并在调用 execute() 时增加计数器并在调用 afterExecute() 时减少计数器。当计数器变为 0 时,我调用 shutdown()。这似乎适用于我的问题,不确定这是否是一个普遍的好方法。特别是,我假设您只使用 execute() 添加 Runnables。

As a side node: I first tried to check in afterExecute() the number of Runnables in the queue and the number of workers that are active and shutdown when those are 0; but that didn't work because not all Runnables showed up in the queue and the getActiveCount() didn't do what I expected either.

作为一个侧节点:我首先尝试在 afterExecute() 中检查队列中的 Runnable 数量以及当它们为 0 时处于活动状态和关闭状态的工作线程数量;但这不起作用,因为并非所有 Runnables 都出现在队列中,而且 getActiveCount() 也没有达到我的预期。

Anyhow, here's my solution: (if anybody finds serious problems with this, please let me know:)

无论如何,这是我的解决方案:(如果有人发现严重的问题,请告诉我:)

public class MyThreadPoolExecutor extends ThreadPoolExecutor {

    private final AtomicInteger executing = new AtomicInteger(0);

    public MyThreadPoolExecutor(int coorPoolSize, int maxPoolSize, long keepAliveTime,
        TimeUnit seconds, BlockingQueue<Runnable> queue) {
        super(coorPoolSize, maxPoolSize, keepAliveTime, seconds, queue);
    }


    @Override
    public void execute(Runnable command) {
        //intercepting beforeExecute is too late!
        //execute() is called in the parent thread before it terminates
        executing.incrementAndGet();
        super.execute(command);
    }


    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        int count = executing.decrementAndGet();
        if(count == 0) {
            this.shutdown();
        }
    }

}

回答by Peter Lawrey

You could create your own thread pool which extends ThreadPoolExecutor. You want to know when a task has been submitted and when it completes.

您可以创建自己的线程池来扩展ThreadPoolExecutor。您想知道任务何时提交以及何时完成。

public class MyThreadPoolExecutor extends ThreadPoolExecutor {
    private int counter = 0;

    public MyThreadPoolExecutor() {
        super(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    public synchronized void execute(Runnable command) {
        counter++;
        super.execute(command);
    }

    @Override
    protected synchronized void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        counter--;
        notifyAll();
    }

    public synchronized void waitForExecuted() throws InterruptedException {
        while (counter == 0)
            wait();
    }
}

回答by Joel

Use a Futurefor your tasks (instead of submitting Runnable's), a callback updates it's state when it's completed, so you can use Future.isDoneto track the sate of all your tasks.

为您的任务使用Future(而不是submitRunnable的),回调会在完成时更新它的状态,因此您可以使用Future.isDone来跟踪所有任务的状态。

回答by Suraj Chandran

Use CountDownLatch. Pass the CountDownLatch object to each of your tasks and code your tasks something like below.

使用CountDownLatch。将 CountDownLatch 对象传递给您的每个任务,并对您的任务进行如下编码。

public void doTask() {
    // do your task
    latch.countDown(); 
}

Whereas the thread which needs to wait should execute the following code:

而需要等待的线程应该执行以下代码:

public void doWait() {
    latch.await();
}

But ofcourse, this assumes you already know the number of child tasks so that you could initialize the latch's count.

但是,当然,这假设您已经知道子任务的数量,以便您可以初始化闩锁的计数。

回答by kittylyst

If you want to use JSR166y classes - e.g. Phaser or Fork/Join - either of which might work for you, you can always download the Java 6 backport of them from: http://gee.cs.oswego.edu/dl/concurrency-interest/and use that as a basis rather than writing a completely homebrew solution. Then when 7 comes out you can just drop the dependency on the backport and change a few package names.

如果您想使用 JSR166y 类 - 例如 Phaser 或 Fork/Join - 任何一个都可能适合您,您可以随时从以下位置下载它们的 Java 6 backport:http: //gee.cs.oswego.edu/dl/concurrency -interest/并将其用作基础,而不是编写完全自制的解决方案。然后当 7 出现时,您可以删除对向后移植的依赖并更改一些包名称。

(Full disclosure: We've been using the LinkedTransferQueue in prod for a while now. No issues)

(完全披露:我们已经在 prod 中使用 LinkedTransferQueue 一段时间了。没问题)

回答by hariprasad

I must say, that solutions described above of problem with recursive calling task and wait for end suborder tasks doesn't satisfy me. There is my solution inspired by original documentation from Oracle there: CountDownLatchand example there: Human resources CountDownLatch.

我必须说,上面描述的递归调用任务和等待结束子订单任务的问题的解决方案并不满足我。我的解决方案灵感来自 Oracle 的原始文档:CountDownLatch和示例:Human resources CountDownLatch

The first common thread in process in instance of class HRManagerCompact has waiting latch for two daughter's threads, wich has waiting latches for their subsequent 2 daughter's threads... etc.

类 HRManagerCompact 实例中的第一个公共线程具有等待两个子线程的锁存器,其中等待它们的后续 2 个子线程的锁存器......等等。

Of course, latch can be set on the different value than 2 (in constructor of CountDownLatch), as well as the number of runnable objects can be established in iteration i.e. ArrayList, but it must correspond (number of count downs must be equal the parameter in CountDownLatch constructor).

当然,latch 可以设置为与 2 不同的值(在 CountDownLatch 的构造函数中),也可以在迭代中建立可运行对象的数量,即 ArrayList,但必须对应(倒计时的数量必须等于参数在 CountDownLatch 构造函数中)。

Be careful, the number of latches increases exponentially according restriction condition: 'level.get() < 2', as well as the number of objects. 1, 2, 4, 8, 16... and latches 0, 1, 2, 4... As you can see, for four levels (level.get() < 4) there will be 15 waiting threads and 7 latches in the time, when peak 16 threads are running.

请注意,闩锁的数量根据限制条件呈指数增长:'level.get() < 2',以及对象的数量。1, 2, 4, 8, 16... 和闩锁 0, 1, 2, 4... 如您所见,对于四个级别(level.get() < 4),将有 15 个等待线程和 7 个闩锁在峰值 16 个线程运行的时候。

package processes.countdownlatch.hr;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Recursively latching running classes to wait for the peak threads
 *
 * @author hariprasad
 */
public class HRManagerCompact extends Thread {
  final int N = 2; // number of daughter's tasks for latch
  CountDownLatch countDownLatch;
  CountDownLatch originCountDownLatch;
  AtomicInteger level = new AtomicInteger(0);
  AtomicLong order = new AtomicLong(0); // id latched thread waiting for

  HRManagerCompact techLead1 = null;
  HRManagerCompact techLead2 = null;
  HRManagerCompact techLead3 = null;

// constructor
public HRManagerCompact(CountDownLatch countDownLatch, String name,
    AtomicInteger level, AtomicLong order){
  super(name);
  this.originCountDownLatch=countDownLatch;
  this.level = level;
  this.order = order;
 }

 private void doIt() {
    countDownLatch = new CountDownLatch(N);
    AtomicInteger leveli = new AtomicInteger(level.get() + 1);
    AtomicLong orderi = new AtomicLong(Thread.currentThread().getId());
    techLead1 = new HRManagerCompact(countDownLatch, "first", leveli, orderi);
    techLead2 = new HRManagerCompact(countDownLatch, "second", leveli, orderi);
    //techLead3 = new HRManagerCompact(countDownLatch, "third", leveli);

    techLead1.start();
    techLead2.start();
    //techLead3.start();

    try {
     synchronized (Thread.currentThread()) { // to prevent print and latch in the same thread
       System.out.println("*** HR Manager waiting for recruitment to complete... " + level + ", " + order + ", " + orderi);
       countDownLatch.await(); // wait actual thread
     }
     System.out.println("*** Distribute Offer Letter, it means finished. " + level + ", " + order + ", " + orderi);
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
  }

 @Override
 public void run() {
  try {
   System.out.println(Thread.currentThread().getName() + ": working... " + level + ", " + order + ", " + Thread.currentThread().getId());
   Thread.sleep(10*level.intValue());
   if (level.get() < 2) doIt();
   Thread.yield();
  }
  catch (Exception e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
  /*catch (InterruptedException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }*/
  // TODO Auto-generated method stub
  System.out.println("--- " +Thread.currentThread().getName() + ": recruted " + level + ", " + order + ", " + Thread.currentThread().getId());
  originCountDownLatch.countDown(); // count down
 }

 public static void main(String args[]){
  AtomicInteger levelzero = new AtomicInteger(0);
  HRManagerCompact hr = new HRManagerCompact(null, "zero", levelzero, new AtomicLong(levelzero.longValue()));
  hr.doIt();
 }
}

Possible commented output (with some probability):

可能的注释输出(有一定的概率):

first: working... 1, 1, 10 // thread 1, first daughter's task (10)
second: working... 1, 1, 11 // thread 1, second daughter's task (11)
first: working... 2, 10, 12 // thread 10, first daughter's task (12)
first: working... 2, 11, 14 // thread 11, first daughter's task (14)
second: working... 2, 11, 15 // thread 11, second daughter's task (15)
second: working... 2, 10, 13 // thread 10, second daughter's task (13)
--- first: recruted 2, 10, 12 // finished 12
--- first: recruted 2, 11, 14 // finished 14
--- second: recruted 2, 10, 13  // finished 13 (now can be opened latch 10)
--- second: recruted 2, 11, 15  // finished 15 (now can be opened latch 11)
*** HR Manager waiting for recruitment to complete... 0, 0, 1
*** HR Manager waiting for recruitment to complete... 1, 1, 10
*** Distribute Offer Letter, it means finished. 1, 1, 10 // latch on 10 opened
--- first: recruted 1, 1, 10 // finished 10
*** HR Manager waiting for recruitment to complete... 1, 1, 11
*** Distribute Offer Letter, it means finished. 1, 1, 11 // latch on 11 opened
--- second: recruted 1, 1, 11  // finished 11 (now can be opened latch 1)
*** Distribute Offer Letter, it means finished. 0, 0, 1  // latch on 1 opened

回答by Ravindra babu

The only inelegant solution I could come up with is to directly use a ThreadPoolExecutor and query its getPoolSize() every once in a while. Is there really no better way do do that?

我能想到的唯一不优雅的解决方案是直接使用 ThreadPoolExecutor 并每隔一段时间查询它的 getPoolSize() 。真的没有更好的方法来做到这一点吗?

You have to use shutdown() ,awaitTermination()and shutdownNow()methods in a proper sequence.

您必须以正确的顺序使用shutdown() ,awaitTermination()and shutdownNow()方法。

shutdown(): Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted.

shutdown():启动有序关闭,其中执行先前提交的任务,但不会接受新任务。

awaitTermination():Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs, or the current thread is interrupted, whichever happens first.

awaitTermination():阻塞直到所有任务在关闭请求后完成执行,或超时发生,或当前线程被中断,以先发生者为准。

shutdownNow(): Attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns a list of the tasks that were awaiting execution.

shutdownNow(): 尝试停止所有正在执行的任务,停止等待任务的处理,并返回等待执行的任务列表。

Recommended way from oracle documentation page of ExecutorService:

来自ExecutorService 的oracle 文档页面的推荐方式:

 void shutdownAndAwaitTermination(ExecutorService pool) {
   pool.shutdown(); // Disable new tasks from being submitted
   try {
     // Wait a while for existing tasks to terminate
     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
       pool.shutdownNow(); // Cancel currently executing tasks
       // Wait a while for tasks to respond to being cancelled
       if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
     }
   } catch (InterruptedException ie) {
     // (Re-)Cancel if current thread also interrupted
     pool.shutdownNow();
     // Preserve interrupt status
     Thread.currentThread().interrupt();
   }

You can replace if condition with while condition in case of long duration in completion of tasks as below:

如果任务完成时间较长,您可以将 if 条件替换为 while 条件,如下所示:

Change

改变

if (!pool.awaitTermination(60, TimeUnit.SECONDS))

To

 while(!pool.awaitTermination(60, TimeUnit.SECONDS)) {
     Thread.sleep(60000);
 }  

You can refer to other alternatives (except join(), which can be used with standalone thread ) in :

您可以在 中参考其他替代方案(除了join(),可以与独立线程一起使用):

wait until all threads finish their work in java

等到所有线程在 java 中完成它们的工作

回答by Matej Tymes

You could use a runner that keeps track of running threads:

您可以使用跟踪正在运行的线程的运行程序:

Runner runner = Runner.runner(numberOfThreads);

runner.runIn(2, SECONDS, callable);
runner.run(callable);


// blocks until all tasks are finished (or failed)
runner.waitTillDone();


// and reuse it
runner.runRunnableIn(500, MILLISECONDS, runnable);


runner.waitTillDone();


// and then just kill it
runner.shutdownAndAwaitTermination();

to use it you just add a dependency:

要使用它,您只需添加一个依赖项:

compile 'com.github.matejtymes:javafixes:1.3.0'

编译 'com.github.matejtymes:javafixes:1.3.0'