java 如何实现 ExecutorService 以轮换执行任务?
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/10346187/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
How to implement an ExecutorService to execute tasks on a rotation-basis?
提问by Gnanam
I'm using java.util.concurrent.ExecutorServicewith fixed thread poolto execute list of tasks. My list of tasks will typically be around 80 - 150 and I've limited the number of threads running at any time to 10 as shown below:
我正在使用带有固定线程池的java.util.concurrent.ExecutorService来执行任务列表。我的任务列表通常在 80 - 150 左右,并且我已将任何时间运行的线程数限制为 10,如下所示:
ExecutorService threadPoolService = Executors.newFixedThreadPool(10);
for ( Runnable task : myTasks )
{
threadPoolService.submit(task);
}
My use case demands that even the completed task should be re-submitted again to the ExecutorServicebut it should be executed/taken again only when all the alreadysubmitted tasks are serviced/completed. That is basically, the tasks submitted should be executed on a rotation-basis. Hence, there will not be either threadPoolService.shutdown()
or threadPoolService.shutdownNow()
call in this case.
我的用例要求即使已完成的任务也应再次重新提交给ExecutorService,但只有在所有已提交的任务都已提交/完成后才应再次执行/执行。也就是说,提交的任务应该在轮换的基础上执行。因此,在这种情况下不会有threadPoolService.shutdown()
or 或threadPoolService.shutdownNow()
call。
My question is, how do I implement ExecutorServiceservicing rotation-basis tasks?
我的问题是,如何实现ExecutorService服务轮换基础任务?
回答by Affe
ThreadPoolExecutor provides an extension point for afterExecution where you can put the job back at the end of the queue.
ThreadPoolExecutor 为 afterExecution 提供了一个扩展点,您可以在其中将作业放回队列的末尾。
public class TaskRepeatingThreadPoolExecutor extends ThreadPoolExecutor {
public TaskRepeatingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
this.submit(r);
}
}
You'll have to do a little more work of course to instantiate it yourself without the help of ExecutorService
's handy factory method, but the constructors are simple enough to grok.
当然,您必须做更多的工作才能在没有ExecutorService
方便的工厂方法的帮助下自己实例化它,但是构造函数很简单,可以理解。
回答by nobeh
The answer is more related to the implementation of the work queueused for the instance of ExecutorService
. So, I'd suggest:
答案更多地与用于 的实例的工作队列的实现有关ExecutorService
。所以,我建议:
First choose an implementation of
java.util.concurrent.BlockingQueue
(an example) that provides a circular queuefunctionality. NOTE, the reasonBlockingQueue
has been chosen is that to waituntil the next task is provided to queue; so, in case of circular + blockingqueue, you should be careful how to provide the same behavior and functionality.Instead of using
Executors.new...
to create a newThreadPoolExecutor
use a direct constructorsuch as
首先选择提供循环队列功能的
java.util.concurrent.BlockingQueue
(示例)的实现。注意,选择的原因是等待下一个任务提供给队列;因此,在循环 + 阻塞队列的情况下,您应该小心如何提供相同的行为和功能。BlockingQueue
而不是
Executors.new...
用于创建一个新的ThreadPoolExecutor
使用直接构造函数,如
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
This way, unless you command the executor to shutdown
, it will try to fetch the next task from the queue for execution from its work queuewhich is a circularcontainer for tasks.
这样,除非您命令执行程序执行shutdown
,否则它将尝试从队列中获取下一个任务以从其工作队列中执行,该工作队列是任务的循环容器。
回答by Michael Schmei?er
I suggest the following solution which completely uses functionality existing in the standard library concurrency utils. It uses a CyclicBarrier
with a task decorator class and a barrier action which re-submits all tasks:
我建议使用以下解决方案,它完全使用标准库并发工具中现有的功能。它使用CyclicBarrier
带有任务装饰器类和重新提交所有任务的屏障操作:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Rotation {
private static final class RotationDecorator implements Runnable {
private final Runnable task;
private final CyclicBarrier barrier;
RotationDecorator( Runnable task, CyclicBarrier barrier ) {
this.task = task;
this.barrier = barrier;
}
@Override
public void run() {
this.task.run();
try {
this.barrier.await();
} catch(InterruptedException e) {
; // Consider better exception handling
} catch(BrokenBarrierException e) {
; // Consider better exception handling
}
}
}
public void startRotation( List<Runnable> tasks ) {
final ExecutorService threadPoolService = Executors.newFixedThreadPool( 10 );
final List<Runnable> rotatingTasks = new ArrayList<Runnable>( tasks.size() );
final CyclicBarrier barrier = new CyclicBarrier( tasks.size(), new Runnable() {
@Override
public void run() {
Rotation.this.enqueueTasks( threadPoolService, rotatingTasks );
}
} );
for(Runnable task : tasks) {
rotatingTasks.add( new RotationDecorator( task, barrier ) );
}
this.enqueueTasks( threadPoolService, rotatingTasks );
}
private void enqueueTasks( ExecutorService service, List<Runnable> tasks ) {
for(Runnable task : tasks) {
service.submit( task );
}
}
}
回答by assylias
You could simply check that all the tasks have been executed and resubmit them once it is the case, like this for example:
您可以简单地检查所有任务是否已执行,并在情况如此时重新提交,例如:
List<Future> futures = new ArrayList<>();
for (Runnable task : myTasks) {
futures.add(threadPoolService.submit(task));
}
//wait until completion of all tasks
for (Future f : futures) {
f.get();
}
//restart
......
EDIT
It seems you want to resubmit a task as soon as it gets completed. You could use an ExecutorCompletionServicewhich enables you to retrieve tasks as and when they get executed, - see below a simple example with 2 tasks that get resubmitted a few times as soon as they are completed. Sample output:
编辑
您似乎想在任务完成后立即重新提交。您可以使用ExecutorCompletionService,它使您能够在任务执行时检索任务,请参见下面的简单示例,其中包含 2 个任务,这些任务在完成后立即重新提交几次。示例输出:
Task 1 submitted pool-1-thread-1
Task 2 submitted pool-1-thread-2
Task 1 completed pool-1-thread-1
Task 1 submitted pool-1-thread-3
Task 2 completed pool-1-thread-2
Task 1 completed pool-1-thread-3
Task 2 submitted pool-1-thread-4
Task 1 submitted pool-1-thread-5
Task 1 completed pool-1-thread-5
Task 2 completed pool-1-thread-4
任务1提交pool-1-thread-1
任务2提交pool-1-thread-2
任务1完成pool-1-thread-1
任务1提交pool-1-thread-3
任务2完成pool-1-thread-2
任务1完成pool-1-thread-3
任务2提交pool-1-thread-4
任务1提交pool-1-thread-5
任务1完成pool-1-thread-5
任务2完成pool-1-thread-4
public class Test1 {
public final ConcurrentMap<String, String> concurrentMap = new ConcurrentHashMap<>();
public final AtomicInteger retries = new AtomicInteger();
public final Object lock = new Object();
public static void main(String[] args) throws InterruptedException, ExecutionException {
int count = 0;
List<Runnable> myTasks = new ArrayList<>();
myTasks.add(getRunnable(1));
myTasks.add(getRunnable(2));
ExecutorService threadPoolService = Executors.newFixedThreadPool(10);
CompletionService<Runnable> ecs = new ExecutorCompletionService<Runnable>(threadPoolService);
for (Runnable task : myTasks) {
ecs.submit(task, task);
}
//wait until completion of all tasks
while(count++ < 3) {
Runnable task = ecs.take().get();
ecs.submit(task, task);
}
threadPoolService.shutdown();
}
private static Runnable getRunnable(final int i) {
return new Runnable() {
@Override
public void run() {
System.out.println("Task " + i + " submitted " + Thread.currentThread().getName() + " ");
try {
Thread.sleep(500 * i);
} catch (InterruptedException ex) {
System.out.println("Interrupted");
}
System.out.println("Task " + i + " completed " + Thread.currentThread().getName() + " ");
}
};
}
}