java 如何实现 ExecutorService 以轮换执行任务?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/10346187/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-10-31 00:39:36  来源:igfitidea点击:

How to implement an ExecutorService to execute tasks on a rotation-basis?

javamultithreadingthreadpool

提问by Gnanam

I'm using java.util.concurrent.ExecutorServicewith fixed thread poolto execute list of tasks. My list of tasks will typically be around 80 - 150 and I've limited the number of threads running at any time to 10 as shown below:

我正在使用带有固定线程池的java.util.concurrent.ExecutorService来执行任务列表。我的任务列表通常在 80 - 150 左右,并且我已将任何时间运行的线程数限制为 10,如下所示:

ExecutorService threadPoolService = Executors.newFixedThreadPool(10);

for ( Runnable task : myTasks ) 
{     
    threadPoolService.submit(task); 
}

My use case demands that even the completed task should be re-submitted again to the ExecutorServicebut it should be executed/taken again only when all the alreadysubmitted tasks are serviced/completed. That is basically, the tasks submitted should be executed on a rotation-basis. Hence, there will not be either threadPoolService.shutdown()or threadPoolService.shutdownNow()call in this case.

我的用例要求即使已完成的任务也应再次重新提交给ExecutorService,但只有在所有提交的任务都已提交/完成后才应再次执行/执行。也就是说,提交的任务应该在轮换的基础上执行。因此,在这种情况下不会有threadPoolService.shutdown()or 或threadPoolService.shutdownNow()call。

My question is, how do I implement ExecutorServiceservicing rotation-basis tasks?

我的问题是,如何实现ExecutorService服务轮换基础任务?

回答by Affe

ThreadPoolExecutor provides an extension point for afterExecution where you can put the job back at the end of the queue.

ThreadPoolExecutor 为 afterExecution 提供了一个扩展点,您可以在其中将作业放回队列的末尾。

public class TaskRepeatingThreadPoolExecutor extends ThreadPoolExecutor {

    public TaskRepeatingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        this.submit(r);
    }
}

You'll have to do a little more work of course to instantiate it yourself without the help of ExecutorService's handy factory method, but the constructors are simple enough to grok.

当然,您必须做更多的工作才能在没有ExecutorService方便的工厂方法的帮助下自己实例化它,但是构造函数很简单,可以理解。

回答by nobeh

The answer is more related to the implementation of the work queueused for the instance of ExecutorService. So, I'd suggest:

答案更多地与用于 的实例的工作队列的实现有关ExecutorService。所以,我建议:

  1. First choose an implementation of java.util.concurrent.BlockingQueue(an example) that provides a circular queuefunctionality. NOTE, the reason BlockingQueuehas been chosen is that to waituntil the next task is provided to queue; so, in case of circular + blockingqueue, you should be careful how to provide the same behavior and functionality.

  2. Instead of using Executors.new...to create a new ThreadPoolExecutoruse a direct constructorsuch as

  1. 首先选择提供循环队列功能的java.util.concurrent.BlockingQueue示例)的实现。注意,选择的原因是等待下一个任务提供给队列;因此,在循环 + 阻塞队列的情况下,您应该小心如何提供相同的行为和功能。BlockingQueue

  2. 而不是Executors.new...用于创建一个新的ThreadPoolExecutor使用直接构造函数,如

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)

This way, unless you command the executor to shutdown, it will try to fetch the next task from the queue for execution from its work queuewhich is a circularcontainer for tasks.

这样,除非您命令执行程序执行shutdown,否则它将尝试从队列中获取下一个任务以从其工作队列中执行,该工作队列是任务的循环容器。

回答by Michael Schmei?er

I suggest the following solution which completely uses functionality existing in the standard library concurrency utils. It uses a CyclicBarrierwith a task decorator class and a barrier action which re-submits all tasks:

我建议使用以下解决方案,它完全使用标准库并发工具中现有的功能。它使用CyclicBarrier带有任务装饰器类和重新提交所有任务的屏障操作:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Rotation {

    private static final class RotationDecorator implements Runnable {
        private final Runnable          task;
        private final CyclicBarrier barrier;


        RotationDecorator( Runnable task, CyclicBarrier barrier ) {
            this.task = task;
            this.barrier = barrier;
        }


        @Override
        public void run() {
            this.task.run();
            try {
                this.barrier.await();
            } catch(InterruptedException e) {
                ; // Consider better exception handling
            } catch(BrokenBarrierException e) {
                ; // Consider better exception handling
            }
        }
    }


    public void startRotation( List<Runnable> tasks ) {
        final ExecutorService threadPoolService = Executors.newFixedThreadPool( 10 );
        final List<Runnable> rotatingTasks = new ArrayList<Runnable>( tasks.size() );
        final CyclicBarrier barrier = new CyclicBarrier( tasks.size(), new Runnable() {
            @Override
            public void run() {
                Rotation.this.enqueueTasks( threadPoolService, rotatingTasks );
            }
        } );
        for(Runnable task : tasks) {
            rotatingTasks.add( new RotationDecorator( task, barrier ) );
        }
        this.enqueueTasks( threadPoolService, rotatingTasks );
    }


    private void enqueueTasks( ExecutorService service, List<Runnable> tasks ) {
        for(Runnable task : tasks) {
            service.submit( task );
        }
    }

}

回答by assylias

You could simply check that all the tasks have been executed and resubmit them once it is the case, like this for example:

您可以简单地检查所有任务是否已执行,并在情况如此时重新提交,例如:

    List<Future> futures = new ArrayList<>();
    for (Runnable task : myTasks) {
        futures.add(threadPoolService.submit(task));
    }
    //wait until completion of all tasks
    for (Future f : futures) {
        f.get();
    }
    //restart
    ......

EDIT
It seems you want to resubmit a task as soon as it gets completed. You could use an ExecutorCompletionServicewhich enables you to retrieve tasks as and when they get executed, - see below a simple example with 2 tasks that get resubmitted a few times as soon as they are completed. Sample output:

编辑
您似乎想在任务完成后立即重新提交。您可以使用ExecutorCompletionService,它使您能够在任务执行时检索任务,请参见下面的简单示例,其中包含 2 个任务,这些任务在完成后立即重新提交几次。示例输出:

Task 1 submitted pool-1-thread-1
Task 2 submitted pool-1-thread-2
Task 1 completed pool-1-thread-1
Task 1 submitted pool-1-thread-3
Task 2 completed pool-1-thread-2
Task 1 completed pool-1-thread-3
Task 2 submitted pool-1-thread-4
Task 1 submitted pool-1-thread-5
Task 1 completed pool-1-thread-5
Task 2 completed pool-1-thread-4

任务1提交pool-1-thread-1
任务2提交pool-1-thread-2
任务1完成pool-1-thread-1
任务1提交pool-1-thread-3
任务2完成pool-1-thread-2
任务1完成pool-1-thread-3
任务2提交pool-1-thread-4
任务1提交pool-1-thread-5
任务1完成pool-1-thread-5
任务2完成pool-1-thread-4

public class Test1 {

    public final ConcurrentMap<String, String> concurrentMap = new ConcurrentHashMap<>();
    public final AtomicInteger retries = new AtomicInteger();
    public final Object lock = new Object();

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        int count = 0;
        List<Runnable> myTasks = new ArrayList<>();
        myTasks.add(getRunnable(1));
        myTasks.add(getRunnable(2));
        ExecutorService threadPoolService = Executors.newFixedThreadPool(10);
        CompletionService<Runnable> ecs = new ExecutorCompletionService<Runnable>(threadPoolService);
        for (Runnable task : myTasks) {
            ecs.submit(task, task);
        }
        //wait until completion of all tasks
        while(count++ < 3) {
            Runnable task = ecs.take().get();
            ecs.submit(task, task);
        }
        threadPoolService.shutdown();
    }

    private static Runnable getRunnable(final int i) {
        return new Runnable() {

            @Override
            public void run() {
                System.out.println("Task " + i + " submitted " + Thread.currentThread().getName() + "  ");
                try {
                    Thread.sleep(500 * i);
                } catch (InterruptedException ex) {
                    System.out.println("Interrupted");
                }
                System.out.println("Task " + i + " completed " + Thread.currentThread().getName() + "  ");
            }
        };
    }
}