java 使用 ExecutorService 控制任务执行顺序
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/2153663/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Controlling Task execution order with ExecutorService
提问by Wiretap
I have a process which delegates asynch tasks to a pool of threads. I need to ensure that certain tasks are executed in order. So for example
我有一个将异步任务委托给线程池的进程。我需要确保某些任务按顺序执行。所以例如
Tasks arrive in order
任务按顺序到达
Tasks a1, b1, c1, d1 , e1, a2, a3, b2, f1
任务 a1、b1、c1、d1、e1、a2、a3、b2、f1
Tasks can be executed in any order except where there is a natural dependancy, so a1,a2,a3 must be processed in that order by either allocating to the same thread or blocking these until I know the previous a# task was completed.
任务可以按任何顺序执行,除非存在自然依赖性,因此 a1,a2,a3 必须按该顺序处理,方法是分配给同一线程或阻塞它们,直到我知道前一个 a# 任务已完成。
Currently it doesn't use the Java Concurrency package, but I'm considering changing to take avantage of the thread management.
目前它不使用 Java Concurrency 包,但我正在考虑更改以利用线程管理。
Does anyone have a similar solution or suggestions of how to achieve this
有没有人有类似的解决方案或如何实现这一目标的建议
回答by Mike Q
When I've done this in the past I've usually had the ordering handled by a component which then submits callables/runnables to an Executor.
当我过去这样做时,我通常由一个组件处理排序,然后将可调用/可运行的提交给执行程序。
Something like.
就像是。
- Got a list of tasks to run, some with dependencies
- Create an Executor and wrap with an ExecutorCompletionService
- Search all tasks, any with no dependencies, schedule them via the completion service
- Poll the completion service
- As each task completes
- Add it to a "completed" list
- Reevaluate any waiting tasks wrt to the "completed list" to see if they are "dependency complete". If so schedule them
- Rinse repeat until all tasks are submitted/completed
- 获得了要运行的任务列表,其中一些具有依赖项
- 创建一个 Executor 并用 ExecutorCompletionService 包装
- 搜索所有任务,任何没有依赖关系的任务,通过完成服务安排它们
- 轮询完成服务
- 随着每个任务完成
- 将其添加到“已完成”列表中
- 重新评估写入“已完成列表”的任何等待任务,以查看它们是否“依赖完成”。如果是这样安排他们
- 冲洗重复直到所有任务都提交/完成
The completion service is a nice way of being able to get the tasks as they complete rather than trying to poll a bunch of Futures. However you will probably want to keep a Map<Future, TaskIdentifier>which is populated when a task is schedule via the completion service so that when the completion service gives you a completed Future you can figure out which TaskIdentifierit is.
完成服务是一种能够在任务完成时获取任务的好方法,而不是尝试轮询一堆 Future。但是Map<Future, TaskIdentifier>,当通过完成服务安排任务时,您可能希望保留填充的内容,以便当完成服务为您提供已完成的 Future 时,您可以确定TaskIdentifier它是哪个。
If you ever find yourself in a state where tasks are still waiting to run, but nothing is running and nothing can be scheduled then your have a circular dependency problem.
如果你发现自己处于任务仍在等待运行的状态,但没有任何东西在运行,也没有任何东西可以安排,那么你就有了循环依赖问题。
回答by Karry
I write own Executor that warrants task ordering for tasks with same key. It uses map of queues for order tasks with same key. Each keyed task execute next task with the same key.
我编写了自己的 Executor 来保证具有相同密钥的任务的任务排序。它对具有相同键的订单任务使用队列映射。每个键控任务使用相同的键执行下一个任务。
This solution don't handle RejectedExecutionExceptionor other exceptions from delegated Executor! So delegated Executor should be "unlimited".
此解决方案不处理RejectedExecutionException或来自委托 Executor 的其他异常!所以委派的 Executor 应该是“无限的”。
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Executor;
/**
* This Executor warrants task ordering for tasks with same key (key have to implement hashCode and equal methods correctly).
*/
public class OrderingExecutor implements Executor{
private final Executor delegate;
private final Map<Object, Queue<Runnable>> keyedTasks = new HashMap<Object, Queue<Runnable>>();
public OrderingExecutor(Executor delegate){
this.delegate = delegate;
}
@Override
public void execute(Runnable task) {
// task without key can be executed immediately
delegate.execute(task);
}
public void execute(Runnable task, Object key) {
if (key == null){ // if key is null, execute without ordering
execute(task);
return;
}
boolean first;
Runnable wrappedTask;
synchronized (keyedTasks){
Queue<Runnable> dependencyQueue = keyedTasks.get(key);
first = (dependencyQueue == null);
if (dependencyQueue == null){
dependencyQueue = new LinkedList<Runnable>();
keyedTasks.put(key, dependencyQueue);
}
wrappedTask = wrap(task, dependencyQueue, key);
if (!first)
dependencyQueue.add(wrappedTask);
}
// execute method can block, call it outside synchronize block
if (first)
delegate.execute(wrappedTask);
}
private Runnable wrap(Runnable task, Queue<Runnable> dependencyQueue, Object key) {
return new OrderedTask(task, dependencyQueue, key);
}
class OrderedTask implements Runnable{
private final Queue<Runnable> dependencyQueue;
private final Runnable task;
private final Object key;
public OrderedTask(Runnable task, Queue<Runnable> dependencyQueue, Object key) {
this.task = task;
this.dependencyQueue = dependencyQueue;
this.key = key;
}
@Override
public void run() {
try{
task.run();
} finally {
Runnable nextTask = null;
synchronized (keyedTasks){
if (dependencyQueue.isEmpty()){
keyedTasks.remove(key);
}else{
nextTask = dependencyQueue.poll();
}
}
if (nextTask!=null)
delegate.execute(nextTask);
}
}
}
}
回答by cletus
When you submit a Runnableor Callableto an ExecutorServiceyou receive a Futurein return. Have the threads that depend on a1 be passed a1's Futureand call Future.get(). This will block until the thread completes.
当您提交 aRunnable或Callableto 时,ExecutorService您会收到 aFuture作为回报。让依赖于 a1 的线程通过 a1Future并调用Future.get(). 这将阻塞直到线程完成。
So:
所以:
ExecutorService exec = Executor.newFixedThreadPool(5);
Runnable a1 = ...
final Future f1 = exec.submit(a1);
Runnable a2 = new Runnable() {
@Override
public void run() {
f1.get();
... // do stuff
}
}
exec.submit(a2);
and so on.
等等。
回答by Hyman
Another option is to create your own executor, call it OrderedExecutor, and create an array of encapsulated ThreadPoolExecutor objects, with 1 thread per internal executor. You then supply a mechanism for choosing one of the internal objects, eg, you can do this by providing an interface that the user of your class can implement:
另一种选择是创建您自己的执行程序,将其命名为 OrderedExecutor,然后创建一个封装的 ThreadPoolExecutor 对象数组,每个内部执行程序有 1 个线程。然后您提供一种机制来选择一个内部对象,例如,您可以通过提供一个您的类的用户可以实现的接口来做到这一点:
executor = new OrderedExecutor( 10 /* pool size */, new OrderedExecutor.Chooser() {
public int choose( Runnable runnable ) {
MyRunnable myRunnable = (MyRunnable)runnable;
return myRunnable.someId();
});
executor.execute( new MyRunnable() );
The implementation of OrderedExecutor.execute() will then use the Chooser to get an int, you mod this with the pool size, and that's your index into the internal array. The idea being that "someId()" will return the same value for all the "a's", etc.
OrderedExecutor.execute() 的实现然后将使用选择器来获取一个整数,你用池大小修改它,这就是你在内部数组中的索引。这个想法是“someId()”将为所有“a”等返回相同的值。
回答by joaosavio
You can use Executors.newSingleThreadExecutor(), but it will use only one thread to execute your tasks. Another option is to use CountDownLatch. Here is a simple example:
您可以使用 Executors.newSingleThreadExecutor(),但它只会使用一个线程来执行您的任务。另一种选择是使用 CountDownLatch。这是一个简单的例子:
public class Main2 {
public static void main(String[] args) throws InterruptedException {
final CountDownLatch cdl1 = new CountDownLatch(1);
final CountDownLatch cdl2 = new CountDownLatch(1);
final CountDownLatch cdl3 = new CountDownLatch(1);
List<Runnable> list = new ArrayList<Runnable>();
list.add(new Runnable() {
public void run() {
System.out.println("Task 1");
// inform that task 1 is finished
cdl1.countDown();
}
});
list.add(new Runnable() {
public void run() {
// wait until task 1 is finished
try {
cdl1.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task 2");
// inform that task 2 is finished
cdl2.countDown();
}
});
list.add(new Runnable() {
public void run() {
// wait until task 2 is finished
try {
cdl2.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task 3");
// inform that task 3 is finished
cdl3.countDown();
}
});
ExecutorService es = Executors.newFixedThreadPool(200);
for (int i = 0; i < 3; i++) {
es.submit(list.get(i));
}
es.shutdown();
es.awaitTermination(1, TimeUnit.MINUTES);
}
}
回答by mpocsaji
I created an OrderingExecutor for this problem. If you pass the same key to to method execute() with different runnables, the execution of the runnables with the same key will be in the order the execute() is called and will never overlap.
我为这个问题创建了一个 OrderingExecutor。如果您将相同的键传递给具有不同可运行对象的方法 execute(),则具有相同键的可运行对象的执行将按照调用 execute() 的顺序进行,并且永远不会重叠。
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
/**
* Special executor which can order the tasks if a common key is given.
* Runnables submitted with non-null key will guaranteed to run in order for the same key.
*
*/
public class OrderedExecutor {
private static final Queue<Runnable> EMPTY_QUEUE = new QueueWithHashCodeAndEquals<Runnable>(
new ConcurrentLinkedQueue<Runnable>());
private ConcurrentMap<Object, Queue<Runnable>> taskMap = new ConcurrentHashMap<Object, Queue<Runnable>>();
private Executor delegate;
private volatile boolean stopped;
public OrderedExecutor(Executor delegate) {
this.delegate = delegate;
}
public void execute(Runnable runnable, Object key) {
if (stopped) {
return;
}
if (key == null) {
delegate.execute(runnable);
return;
}
Queue<Runnable> queueForKey = taskMap.computeIfPresent(key, (k, v) -> {
v.add(runnable);
return v;
});
if (queueForKey == null) {
// There was no running task with this key
Queue<Runnable> newQ = new QueueWithHashCodeAndEquals<Runnable>(new ConcurrentLinkedQueue<Runnable>());
newQ.add(runnable);
// Use putIfAbsent because this execute() method can be called concurrently as well
queueForKey = taskMap.putIfAbsent(key, newQ);
if (queueForKey != null)
queueForKey.add(runnable);
delegate.execute(new InternalRunnable(key));
}
}
public void shutdown() {
stopped = true;
taskMap.clear();
}
/**
* Own Runnable used by OrderedExecutor.
* The runnable is associated with a specific key - the Queue<Runnable> for this
* key is polled.
* If the queue is empty, it tries to remove the queue from taskMap.
*
*/
private class InternalRunnable implements Runnable {
private Object key;
public InternalRunnable(Object key) {
this.key = key;
}
@Override
public void run() {
while (true) {
// There must be at least one task now
Runnable r = taskMap.get(key).poll();
while (r != null) {
r.run();
r = taskMap.get(key).poll();
}
// The queue emptied
// Remove from the map if and only if the queue is really empty
boolean removed = taskMap.remove(key, EMPTY_QUEUE);
if (removed) {
// The queue has been removed from the map,
// if a new task arrives with the same key, a new InternalRunnable
// will be created
break;
} // If the queue has not been removed from the map it means that someone put a task into it
// so we can safely continue the loop
}
}
}
/**
* Special Queue implementation, with equals() and hashCode() methods.
* By default, Java SE queues use identity equals() and default hashCode() methods.
* This implementation uses Arrays.equals(Queue::toArray()) and Arrays.hashCode(Queue::toArray()).
*
* @param <E> The type of elements in the queue.
*/
private static class QueueWithHashCodeAndEquals<E> implements Queue<E> {
private Queue<E> delegate;
public QueueWithHashCodeAndEquals(Queue<E> delegate) {
this.delegate = delegate;
}
public boolean add(E e) {
return delegate.add(e);
}
public boolean offer(E e) {
return delegate.offer(e);
}
public int size() {
return delegate.size();
}
public boolean isEmpty() {
return delegate.isEmpty();
}
public boolean contains(Object o) {
return delegate.contains(o);
}
public E remove() {
return delegate.remove();
}
public E poll() {
return delegate.poll();
}
public E element() {
return delegate.element();
}
public Iterator<E> iterator() {
return delegate.iterator();
}
public E peek() {
return delegate.peek();
}
public Object[] toArray() {
return delegate.toArray();
}
public <T> T[] toArray(T[] a) {
return delegate.toArray(a);
}
public boolean remove(Object o) {
return delegate.remove(o);
}
public boolean containsAll(Collection<?> c) {
return delegate.containsAll(c);
}
public boolean addAll(Collection<? extends E> c) {
return delegate.addAll(c);
}
public boolean removeAll(Collection<?> c) {
return delegate.removeAll(c);
}
public boolean retainAll(Collection<?> c) {
return delegate.retainAll(c);
}
public void clear() {
delegate.clear();
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof QueueWithHashCodeAndEquals)) {
return false;
}
QueueWithHashCodeAndEquals<?> other = (QueueWithHashCodeAndEquals<?>) obj;
return Arrays.equals(toArray(), other.toArray());
}
@Override
public int hashCode() {
return Arrays.hashCode(toArray());
}
}
}
回答by Kirana NS
I have written my won executor service which is sequence aware. It sequences the tasks which contain certain related reference and currently inflight.
我已经编写了我赢得的执行程序服务,它是序列感知的。它对包含某些相关参考和当前正在进行的任务进行排序。
You can go through the implementation at https://github.com/nenapu/SequenceAwareExecutorService
您可以在https://github.com/nenapu/SequenceAwareExecutorService 上完成实施
回答by shams
In Habanero-Java library, there is a concept of data-driven tasks which can be used to express dependencies between tasks and avoid thread-blocking operations. Under the covers Habanero-Java library uses the JDKs ForkJoinPool (i.e. an ExecutorService).
在Habanero-Java 库中,有一个数据驱动任务的概念,可以用来表达任务之间的依赖关系,避免线程阻塞操作。在幕后 Habanero-Java 库使用 JDK 的 ForkJoinPool(即 ExecutorService)。
For example, your use case for tasks A1, A2, A3, ... could be expressed as follows:
例如,您的任务 A1、A2、A3、...的用例可以表示如下:
HjFuture a1 = future(() -> { doA1(); return true; });
HjFuture a2 = futureAwait(a1, () -> { doA2(); return true; });
HjFuture a3 = futureAwait(a2, () -> { doA3(); return true; });
Note that a1, a2, and a3 are just references to objects of type HjFuture and can be maintained in your custom data structures to specify the dependencies as and when the tasks A2 and A3 come in at runtime.
请注意,a1、a2 和 a3 只是对 HjFuture 类型对象的引用,可以在您的自定义数据结构中进行维护,以在任务 A2 和 A3 在运行时进入时指定依赖关系。
There are some tutorial slides available. You can find further documentation as javadoc, API summaryand primers.

