java Executors:如果任务是递归创建的,如何同步等待所有任务完成?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/14535770/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-10-31 16:35:24  来源:igfitidea点击:

Executors: How to synchronously wait until all tasks have finished if tasks are created recursively?

javaconcurrencythreadpoolexecutor

提问by Eric

My question is strongly related to this one here. As was posted there, I would like the main thread to wait until the work queue is empty and all tasks have finished. The problem in my situation is, however, that each task may recursively cause new tasks to be submitted for processing. This makes it a little awkward to collect all of those tasks's futures.

我的问题与这里的这个问题密切相关。正如那里发布的那样,我希望主线程等到工作队列为空并且所有任务都完成。然而,我的情况的问题是,每个任务可能会递归地导致提交新任务进行处理。这使得收集所有这些任务的未来有点尴尬。

Our current solution uses a busy-wait loop to await termination:

我们当前的解决方案使用忙等待循环来等待终止:

        do { //Wait until we are done the processing
      try {
        Thread.sleep(200);
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
    } while (!executor.getQueue().isEmpty()
             || numTasks.longValue() > executor.getCompletedTaskCount());

numTasks is a value that is increased as every new task is created. This works but I think it's not very nice due to the busy waiting. I was wondering whether there is a good way to make the main thread wait synchronously, until being explicitly woken up.

numTasks 是一个随着每个新任务的创建而增加的值。这有效,但我认为由于忙于等待,它不是很好。我想知道是否有一种好方法可以让主线程同步等待,直到被显式唤醒。

采纳答案by Eric

Thanks a lot for all your suggestions!

非常感谢您的所有建议!

In the end I opted for something that I believe to be reasonably simple. I found out that CountDownLatchis almost what I need. It blocks until the counter reaches 0. The only problem is that it can only count down, not up, and thus does not work in the dynamic setting I have where tasks can submit new tasks. I hence implemented a new class CountLatchthat offers additional functionality. (see below) This class I then use as follows.

最后,我选择了一些我认为相当简单的东西。我发现CountDownLatch几乎是我所需要的。它阻塞直到计数器达到 0。唯一的问题是它只能倒计时,不能倒计时,因此在我拥有的任务可以提交新任务的动态设置中不起作用。因此,我实现了一个CountLatch提供附加功能的新类。(见下文)这个类我然后使用如下。

Main thread calls latch.awaitZero(), blocking until latch reaches 0.

主线程调用latch.awaitZero(),阻塞直到闩达到 0。

Any thread, before calling executor.execute(..)calls latch.increment().

任何线索,调用之前executor.execute(..)调用latch.increment()

Any task, just before completing, calls latch.decrement().

任何任务,就在完成之前,调用latch.decrement().

When the last task terminates, the counter will reach 0 and thus release the main thread.

当最后一个任务终止时,计数器将达到 0,从而释放主线程。

Further suggestions and feedback are most welcome!

非常欢迎进一步的建议和反馈!

public class CountLatch {

@SuppressWarnings("serial")
private static final class Sync extends AbstractQueuedSynchronizer {

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected int acquireNonBlocking(int acquires) {
        // increment count
        for (;;) {
            int c = getState();
            int nextc = c + 1;
            if (compareAndSetState(c, nextc))
                return 1;
        }
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c - 1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

private final Sync sync;

public CountLatch(int count) {
    this.sync = new Sync(count);
}

public void awaitZero() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

public boolean awaitZero(long timeout, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public void increment() {
    sync.acquireNonBlocking(1);
}

public void decrement() {
    sync.releaseShared(1);
}

public String toString() {
    return super.toString() + "[Count = " + sync.getCount() + "]";
}

}

Note that the increment()/decrement()calls can be encapsulated into a customized Executorsubclass as was suggested, for instance, by Sami Korhonen, or with beforeExecuteand afterExecuteas was suggested by impl. See here:

请注意,可以将increment()/decrement()调用封装到自定义Executor子类中,例如 Sami Korhonen所建议的,或者impl 所建议的with beforeExecuteand afterExecute。看这里:

public class CountingThreadPoolExecutor extends ThreadPoolExecutor {

protected final CountLatch numRunningTasks = new CountLatch(0);

public CountingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
        BlockingQueue<Runnable> workQueue) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}

@Override
public void execute(Runnable command) {
    numRunningTasks.increment();
    super.execute(command);
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
    numRunningTasks.decrement();
    super.afterExecute(r, t);
}

/**
 * Awaits the completion of all spawned tasks.
 */
public void awaitCompletion() throws InterruptedException {
    numRunningTasks.awaitZero();
}

/**
 * Awaits the completion of all spawned tasks.
 */
public void awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException {
    numRunningTasks.awaitZero(timeout, unit);
}

}

回答by BrandonK.

Java 7 provides a synchronizer that fits this use case called Phaser. It's a re-usable hybrid of a CountDownLatch and CyclicBarrier that can both increase and decrease the number of registered parties (similar to an incrementable CountDownLatch).

Java 7 提供了一个适合此用例的同步器,称为Phaser。它是 CountDownLatch 和 CyclicBarrier 的可重用混合体,可以增加和减少注册方的数量(类似于可增加的 CountDownLatch)。

The basic pattern to using the phaser in this scenario is to registertasks with the phaser when created and arrivewhen completed. When the number of arrived parties matches the number of registered, the phaser "advances" to the next phase, notifying any waitingthreads of the advance when it takes place.

在此场景中使用移相器的基本模式是在创建时向移相器注册任务并在完成时到达。当到达方的数量与注册的数量相匹配时,移相器“前进”到下一阶段,并在发生时通知任何等待的线程。

Here's an example I've created of waiting for recursive task completion. It naively finds the first few numbers of the Fibonacci sequence for demonstration purposes:

这是我创建的一个等待递归任务完成的示例。出于演示目的,它天真地找到了斐波那契数列的前几个数字:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicLong;

/**
 * An example of using a Phaser to wait for the completion of recursive tasks.
 * @author Voxelot
 */
public class PhaserExample {
    /** Workstealing threadpool with reduced queue contention. */
    private static ForkJoinPool executors;

    /**
     * @param args the command line arguments
     */
    public static void main(String[] args) throws InterruptedException {
        executors = new ForkJoinPool();
        List<Long> sequence = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            sequence.add(fib(i));
        }
        System.out.println(sequence);
    }

    /**
     * Computes the nth Fibonacci number in the Fibonacci sequence.
     * @param n The index of the Fibonacci number to compute
     * @return The computed Fibonacci number
     */
    private static Long fib(int n) throws InterruptedException {
        AtomicLong result = new AtomicLong();
        //Flexible sychronization barrier
        Phaser phaser = new Phaser();
        //Base task
        Task initialTask = new Task(n, result, phaser);
        //Register fib(n) calling thread
        phaser.register();
        //Submit base task
        executors.submit(initialTask);
        //Make the calling thread arrive at the synchronization
        //barrier and wait for all future tasks to arrive.
        phaser.arriveAndAwaitAdvance();
        //Get the result of the parallel computation.
        return result.get();
    }

    private static class Task implements Runnable {
        /** The Fibonacci sequence index of this task. */
        private final int index;
        /** The shared result of the computation. */
        private final AtomicLong result;
        /** The synchronizer. */
        private final Phaser phaser;

        public Task(int n, AtomicLong result, Phaser phaser) {
            index = n;
            this.result = result;
            this.phaser = phaser;
            //Inform synchronizer of additional work to complete.
            phaser.register();
        }

        @Override
        public void run() {
            if (index == 1) {
                result.incrementAndGet();
            } else if (index > 1) {
                //recurrence relation: Fn = Fn-1 + Fn-2
                Task task1 = new Task(index - 1, result, phaser);
                Task task2 = new Task(index - 2, result, phaser);
                executors.submit(task1);
                executors.submit(task2);
            }
            //Notify synchronizer of task completion.
            phaser.arrive();
        }
    }
}

回答by Sami Korhonen

This one was actually rather interesting problem to solve. I must warn that I have not tested the code fully.

这实际上是一个相当有趣的问题要解决。我必须警告我还没有完全测试代码。

The idea is to simply track the task execution:

这个想法是简单地跟踪任务执行:

  • if task is successfully queued, counter is incremented by one
  • if task is cancelled and it has not been executed, counter is decremented by one
  • if task has been executed, counter is decremented by one
  • 如果任务成功排队,计数器加一
  • 如果任务被取消且尚未执行,则计数器减一
  • 如果任务已经执行,计数器减一

When shutdown is called and there are pending tasks, delegate will not call shutdown on the actual ExecutorService. It will allow queuing new tasks until pending task count reaches zero and shutdown is called on actual ExecutorService.

当调用 shutdown 并且有挂起的任务时,delegate 不会在实际的 ExecutorService 上调用 shutdown。它将允许对新任务进行排队,直到挂起的任务计数达到零并在实际的 ExecutorService 上调用关闭。

public class ResilientExecutorServiceDelegate implements ExecutorService {
    private final ExecutorService executorService;
    private final AtomicInteger pendingTasks;
    private final Lock readLock;
    private final Lock writeLock;
    private boolean isShutdown;

    public ResilientExecutorServiceDelegate(ExecutorService executorService) {
        ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        this.pendingTasks = new AtomicInteger();
        this.readLock = readWriteLock.readLock();
        this.writeLock = readWriteLock.writeLock();
        this.executorService = executorService;
        this.isShutdown = false;
    }

    private <T> T addTask(Callable<T> task) {
        T result;
        boolean success = false;
        // Increment pending tasks counter
        incrementPendingTaskCount();
        try {
            // Call service
            result = task.call();
            success = true;
        } catch (RuntimeException exception) {
            throw exception;
        } catch (Exception exception) {
            throw new RejectedExecutionException(exception);
        } finally {
            if (!success) {
                // Decrement pending tasks counter
                decrementPendingTaskCount();
            }
        }
        return result;
    }

    private void incrementPendingTaskCount() {
        pendingTasks.incrementAndGet();
    }

    private void decrementPendingTaskCount() {
        readLock.lock();
        if (pendingTasks.decrementAndGet() == 0 && isShutdown) {
            try {
                // Shutdown
                executorService.shutdown();
            } catch (Throwable throwable) {
            }
        }
        readLock.unlock();
    }

    @Override
    public void execute(final Runnable task) {
        // Add task
        addTask(new Callable<Object>() {
            @Override
            public Object call() {
                executorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            task.run();
                        } finally {
                            decrementPendingTaskCount();
                        }
                    }
                });
                return null;
            }
        });
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
        // Call service
        return executorService.awaitTermination(timeout, unit);
    }

    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
        // It's ok to increment by just one
        incrementPendingTaskCount();
        try {
            return executorService.invokeAll(tasks);
        } finally {
            decrementPendingTaskCount();
        }
    }

    @Override
    public <T> List<Future<T>> invokeAll(
            Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
            throws InterruptedException {
        // It's ok to increment by just one
        incrementPendingTaskCount();
        try {
            return executorService.invokeAll(tasks, timeout, unit);
        } finally {
            decrementPendingTaskCount();
        }
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        // It's ok to increment by just one
        incrementPendingTaskCount();
        try {
            return executorService.invokeAny(tasks);
        } finally {
            decrementPendingTaskCount();
        }
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
            long timeout, TimeUnit unit) throws InterruptedException,
            ExecutionException, TimeoutException {
        incrementPendingTaskCount();
        try {
            return executorService.invokeAny(tasks, timeout, unit);
        } finally {
            decrementPendingTaskCount();
        }
    }

    @Override
    public boolean isShutdown() {
        return isShutdown;
    }

    @Override
    public boolean isTerminated() {
        return executorService.isTerminated();
    }

    @Override
    public void shutdown() {
        // Lock write lock
        writeLock.lock();
        // Set as shutdown
        isShutdown = true;
        try {
            if (pendingTasks.get() == 0) {
                // Real shutdown
                executorService.shutdown();
            }
        } finally {
            // Unlock write lock
            writeLock.unlock();
        }
    }

    @Override
    public List<Runnable> shutdownNow() {
        // Lock write lock
        writeLock.lock();
        // Set as shutdown
        isShutdown = true;
        // Unlock write lock
        writeLock.unlock();

        return executorService.shutdownNow();
    }

    @Override
    public <T> Future<T> submit(final Callable<T> task) {
        // Create execution status
        final FutureExecutionStatus futureExecutionStatus = new FutureExecutionStatus();
        // Add task
        return addTask(new Callable<Future<T>>() {
            @Override
            public Future<T> call() {
                return new FutureDelegate<T>(
                        executorService.submit(new Callable<T>() {
                            @Override
                            public T call() throws Exception {
                                try {
                                    // Mark as executed
                                    futureExecutionStatus.setExecuted();
                                    // Run the actual task
                                    return task.call();
                                } finally {
                                    decrementPendingTaskCount();
                                }
                            }
                        }), futureExecutionStatus);
            }
        });
    }

    @Override
    public Future<?> submit(final Runnable task) {
        // Create execution status
        final FutureExecutionStatus futureExecutionStatus = new FutureExecutionStatus();
        // Add task
        return addTask(new Callable<Future<?>>() {
            @Override
            @SuppressWarnings("unchecked")
            public Future<?> call() {
                return new FutureDelegate<Object>(
                        (Future<Object>) executorService.submit(new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    // Mark as executed
                                    futureExecutionStatus.setExecuted();
                                    // Run the actual task
                                    task.run();
                                } finally {
                                    decrementPendingTaskCount();
                                }
                            }
                        }), futureExecutionStatus);
            }
        });
    }

    @Override
    public <T> Future<T> submit(final Runnable task, final T result) {
        // Create execution status
        final FutureExecutionStatus futureExecutionStatus = new FutureExecutionStatus();
        // Add task
        return addTask(new Callable<Future<T>>() {
            @Override
            public Future<T> call() {
                return new FutureDelegate<T>(executorService.submit(
                        new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    // Mark as executed
                                    futureExecutionStatus.setExecuted();
                                    // Run the actual task
                                    task.run();
                                } finally {
                                    decrementPendingTaskCount();
                                }
                            }
                        }, result), futureExecutionStatus);
            }
        });
    }

    private class FutureExecutionStatus {
        private volatile boolean executed;

        public FutureExecutionStatus() {
            executed = false;
        }

        public void setExecuted() {
            executed = true;
        }

        public boolean isExecuted() {
            return executed;
        }
    }

    private class FutureDelegate<T> implements Future<T> {
        private Future<T> future;
        private FutureExecutionStatus executionStatus;

        public FutureDelegate(Future<T> future,
                FutureExecutionStatus executionStatus) {
            this.future = future;
            this.executionStatus = executionStatus;
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            boolean cancelled = future.cancel(mayInterruptIfRunning);
            if (cancelled) {
                // Lock read lock
                readLock.lock();
                // If task was not executed
                if (!executionStatus.isExecuted()) {
                    decrementPendingTaskCount();
                }
                // Unlock read lock
                readLock.unlock();
            }
            return cancelled;
        }

        @Override
        public T get() throws InterruptedException, ExecutionException {
            return future.get();
        }

        @Override
        public T get(long timeout, TimeUnit unit) throws InterruptedException,
                ExecutionException, TimeoutException {
            return future.get(timeout, unit);
        }

        @Override
        public boolean isCancelled() {
            return future.isCancelled();
        }

        @Override
        public boolean isDone() {
            return future.isDone();
        }
    }
}

回答by Eng.Fouad

Why don't you use a counter? For example:

你为什么不使用计数器?例如:

private AtomicInteger counter = new AtomicInteger(0);

and increment the counter by one just before submitting the task to the queue:

并在将任务提交到队列之前将计数器加一:

counter.incrementAndGet();

and decrement it by one at the end of the task:

并在任务结束时将其减一:

counter.decrementAndGet();

and the check would be something like:

和支票将是这样的:

// ...
while (counter.get() > 0);

回答by impl

Since the last task doesn't know that it's the last, I actually don't think it's possible to have this work 100% correctly without recording both when tasks launch and when they complete.

由于最后一个任务不知道它是最后一个,我实际上认为如果不记录任务启动和完成时间,就不可能 100% 正确地完成这项工作。

If memory serves me right, the getQueue()method returns a queue containing only tasks that are still waiting to be executed, not ones that are currently running. Furthermore, getCompletedTaskCount()is approximate.

如果我没记错的话,该getQueue()方法会返回一个队列,其中只包含仍在等待执行的任务,而不是当前正在运行的任务。此外,getCompletedTaskCount()是近似的。

The solution I'm pondering goes something like this, using an atomic counter like in Eng.Fouad's answer and a Conditionfor signaling the main thread to wake up (pardon the shortcuts for simplicity):

我正在考虑的解决方案是这样的,使用 Eng.Fouad 的答案中的原子计数器和通知主线程唤醒的条件(请原谅为简单起见的快捷方式):

public class MyThreadPoolExecutorState {

    public final Lock lock = new ReentrantLock();
    public final Condition workDone = lock.newCondition();
    public boolean workIsDone = false;

}

public class MyThreadPoolExecutor extends ThreadPoolExecutor {

    private final MyThreadPoolExecutorState state;
    private final AtomicInteger counter = new AtomicInteger(0);

    public MyThreadPoolExecutor(MyThreadPoolExecutorState state, ...) {
        super(...);
        this.state = state;
    }

    protected void beforeExecute(Thread t, Runnable r) {
        this.counter.incrementAndGet();
    }

    protected void afterExecute(Runnable r, Throwable t) {
        if(this.counter.decrementAndGet() == 0) {
            this.state.lock.lock();
            try {
                this.state.workIsDone = true;
                this.state.workDone.signal();
            }
            finally {
                this.state.lock.unlock();
            }
        }
    }

}

public class MyApp {

    public static void main(...) {

        MyThreadPoolExecutorState state = new MyThreadPoolExecutorState();
        MyThreadPoolExecutor executor = new MyThreadPoolExecutor(state, ...);

        // Fire ze missiles!
        executor.submit(...);

        state.lock.lock();
        try {
            while(state.workIsDone == false) {
                state.workDone.await();
            }
        }
        finally {
            state.lock.unlock();
        }

    }

}

It could be a little more elegant (maybe just provide a getState()in your thread pool executor or something?), but I think it should get the job done. It's also untested, so implement at your own peril...

它可能更优雅一些(也许只是getState()在您的线程池执行程序中提供一个?),但我认为它应该可以完成工作。它也未经测试,因此请自行承担风险......

It is worth noting that this solution will definitely fail if there are no tasks to be executed -- it'll await the signal indefinitely. So don't even bother starting the executor if you have no tasks to run.

值得注意的是,如果没有要执行的任务,这个解决方案肯定会失败——它会无限期地等待信号。因此,如果您没有要运行的任务,甚至不必费心启动执行程序。



Edit:On second thought, incrementing the atomic counter should happen upon submission, not immediately before task execution (because queuing could cause the counter to fall to 0 prematurely). It probably makes sense to override the submit(...)methods instead, and possibly also remove(...)and shutdown()(if you use them). The general idea remains the same, though. (But the more I think about it, the less pretty it is.)

编辑:再想一想,增加原子计数器应该在提交时发生,而不是在任务执行之前(因为排队可能导致计数器过早下降到 0)。重写这些submit(...)方法可能是有意义的,也可能remove(...)shutdown()(如果你使用它们)。不过,总体思路保持不变。(但我想得越多,它就越不漂亮。)

I'd also check out the internals of the class to see if you can glean any knowledge from it: http://hg.openjdk.java.net/build-infra/jdk7/jdk/file/0f8da27a3ea3/src/share/classes/java/util/concurrent/ThreadPoolExecutor.java. The tryTerminate()method looks interesting.

我还会检查类的内部结构,看看您是否可以从中收集任何知识:http: //hg.openjdk.java.net/build-infra/jdk7/jdk/file/0f8da27a3ea3/src/share/类/java/util/concurrent/ThreadPoolExecutor.java。该tryTerminate()方法看起来很有趣。

回答by Ralf H

You could use an atomic counter to count the submit (like has been said, before actually submitting). Combine this with a semaphore and release it in the afterExecutehook that a ThreadPoolExecutorprovides. Instead of busy-waiting, call semaphore.acquire( counter.get())after the first round of jobs has been submitted. But the number of acquires will be too small when calling acquire since the counter may increase later on. You would have to loop the acquire calls, with the increase since the last call as the argument, until the counter does not increase anymore.

您可以使用原子计数器来计算提交(就像已经说过的那样,在实际提交之前)。将它与信号量结合起来,并在afterExecuteaThreadPoolExecutor提供的钩子中释放它。semaphore.acquire( counter.get())在第一轮作业提交后调用,而不是忙于等待。但是在调用acquire时acquire的次数会太少,因为以后计数器可能会增加。您必须循环获取调用,并将自上次调用以来的增加作为参数,直到计数器不再增加。

回答by assylias

One of the suggested options in the answers you link to is to use a CompletionService

您链接到的答案中的建议选项之一是使用CompletionService

You could replace the busy waiting in your main thread with:

您可以将主线程中的繁忙等待替换为:

while (true) {
    Future<?> f = completionService.take(); //blocks until task completes
    if (executor.getQueue().isEmpty()
         && numTasks.longValue() == executor.getCompletedTaskCount()) break;
}

Note that getCompletedTaskCountonly returns an approximate number so you might need to find a better exit condition.

请注意,getCompletedTaskCount仅返回一个近似数字,因此您可能需要找到更好的退出条件。

回答by thkala

Java 7 has incorporated support for recursive tasks via its ForkJoinPoolexecutor. It is quite simple to useand scales quite well, as long as the tasks themselves are not too trivial. Essentially it provides a controlled interface that allows tasks to wait for the completion of any sub-tasks without blocking the underlying thread indefinitely.

Java 7 通过其ForkJoinPool执行器整合了对递归任务的支持。只要任务本身不是太琐碎,它使用起来非常简单并且可以很好地扩展。本质上,它提供了一个受控接口,允许任务等待任何子任务的完成,而不会无限期地阻塞底层线程。

回答by Dewfy

If you know number of threads to wait and can paste one line of code to increase number for each thread with help of CountDownLatch ( http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/CountDownLatch.html) It can resolve you problem

如果您知道要等待的线程数,并且可以在 CountDownLatch ( http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ ) 的帮助下粘贴一行代码以增加每个线程的数量 CountDownLatch.html) 它可以解决你的问题