如何使用 Java 5 中的 ExecutorService 实现任务优先级?
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/807223/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
How do I implement task prioritization using an ExecutorService in Java 5?
提问by Chris R
I am implementing a thread pooling mechanism in which I'd like to execute tasks of varying priorities. I'd like to have a nice mechanism whereby I can submit a high priority task to the service and have it be scheduled before other tasks. The priority of the task is an intrinsic property of the task itself (whether I express that task as a Callableor a Runnableis not important to me).
我正在实施一个线程池机制,我想在其中执行不同优先级的任务。我想要一个很好的机制,我可以通过它向服务提交高优先级任务,并在其他任务之前安排它。任务的优先级是任务本身的固有属性(我将该任务表示为 aCallable还是 aRunnable对我来说并不重要)。
Now, superficially it looks like I could use a PriorityBlockingQueueas the task queue in my ThreadPoolExecutor, but that queue contains Runnableobjects, which may or may not be the Runnabletasks I've submitted to it. Moreover, if I've submitted Callabletasks, it's not clear how this would ever map.
现在,从表面上看,我可以PriorityBlockingQueue在我的 中使用 a作为任务队列ThreadPoolExecutor,但该队列包含Runnable对象,这些对象可能是也可能不是Runnable我提交给它的任务。此外,如果我提交了Callable任务,则不清楚这将如何映射。
Is there a way to do this? I'd really rather not roll my own for this, since I'm far more likely to get it wrong that way.
有没有办法做到这一点?我真的宁愿不为此自己动手,因为我更有可能以这种方式弄错。
(An aside; yes, I'm aware of the possibility of starvation for lower-priority jobs in something like this. Extra points (?!) for solutions that have a reasonable guarantee of fairness)
(顺便说一句;是的,我知道在这样的事情中低优先级的工作可能会饿死。对于具有合理公平保证的解决方案的额外加分(?!))
采纳答案by Adam Jaskiewicz
At first blush it would seem you could define an interface for your tasks that extends Runnableor Callable<T>and Comparable. Then wrap a ThreadPoolExecutorwith a PriorityBlockingQueueas the queue, and only accept tasks that implement your interface.
乍一看,您似乎可以为扩展Runnable或Callable<T>和的任务定义一个接口Comparable。然后ThreadPoolExecutor用 a包裹 aPriorityBlockingQueue作为队列,并且只接受实现你的接口的任务。
Taking your comment into account, it looks like one option is to extend ThreadPoolExecutor, and override the submit()methods. Refer to AbstractExecutorServiceto see what the default ones look like; all they do is wrap the Runnableor Callablein a FutureTaskand execute()it. I'd probably do this by writing a wrapper class that implements ExecutorServiceand delegates to an anonymous inner ThreadPoolExecutor. Wrap them in something that has your priority, so that your Comparatorcan get at it.
考虑到您的评论,似乎一种选择是扩展ThreadPoolExecutor和覆盖submit()方法。请参阅以AbstractExecutorService查看默认的外观;他们所做的只是将Runnableor包裹Callable在 aFutureTask和execute()it 中。我可能会通过编写一个实现ExecutorService并委托给匿名内部的包装类来做到这一点ThreadPoolExecutor。将它们包裹在您优先考虑的事情中,以便您Comparator可以完成。
回答by Mike
I have solved this problem in a reasonable fashion, and I'll describe it below for future reference to myself and anyone else who runs into this problem with the Java Concurrent libraries.
我已经以合理的方式解决了这个问题,我将在下面描述它,以供我自己和其他任何在 Java 并发库中遇到此问题的人参考。
Using a PriorityBlockingQueueas the means for holding onto tasks for later execution is indeed a movement in the correct direction. The problem is that the PriorityBlockingQueuemust be generically instantiated to contain Runnableinstances, and it is impossible to call compareTo(or similiar) on a Runnableinterface.
使用 aPriorityBlockingQueue作为保留任务以供以后执行的手段确实是朝着正确方向的运动。问题是PriorityBlockingQueue必须通用实例化 以包含Runnable实例,并且不可能compareTo在Runnable接口上调用(或类似)。
Onto solving the problem. When creating the Executor, it must be given a PriorityBlockingQueue. The queue should further be given a custom Comparator to do proper in place sorting:
到解决问题。创建 Executor 时,必须为其指定一个PriorityBlockingQueue. 应该进一步为队列提供一个自定义 Comparator 以进行适当的就地排序:
new PriorityBlockingQueue<Runnable>(size, new CustomTaskComparator());
Now, a peek at CustomTaskComparator:
现在,看一看CustomTaskComparator:
public class CustomTaskComparator implements Comparator<MyType> {
@Override
public int compare(MyType first, MyType second) {
return comparison;
}
}
Everything looking pretty straight forward up to this point. It gets a bit sticky here. Our next problem is to deal with the creation of FutureTasks from the Executor. In the Executor, we must override newTaskForas so:
到目前为止,一切看起来都很简单。这里有点粘。我们的下一个问题是处理从 Executor 创建 FutureTasks。在 Executor 中,我们必须重写newTaskFor:
@Override
protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
//Override the default FutureTask creation and retrofit it with
//a custom task. This is done so that prioritization can be accomplished.
return new CustomFutureTask(c);
}
Where cis the Callabletask that we're trying to execute. Now, let's have a peek at CustomFutureTask:
我们试图执行c的Callable任务在哪里。现在,让我们看一看CustomFutureTask:
public class CustomFutureTask extends FutureTask {
private CustomTask task;
public CustomFutureTask(Callable callable) {
super(callable);
this.task = (CustomTask) callable;
}
public CustomTask getTask() {
return task;
}
}
Notice the getTaskmethod. We're gonna use that later to grab the original task out of this CustomFutureTaskthat we've created.
注意getTask方法。我们稍后将使用它从CustomFutureTask我们创建的这个中获取原始任务。
And finally, let's modify the original task that we were trying to execute:
最后,让我们修改我们试图执行的原始任务:
public class CustomTask implements Callable<MyType>, Comparable<CustomTask> {
private final MyType myType;
public CustomTask(MyType myType) {
this.myType = myType;
}
@Override
public MyType call() {
//Do some things, return something for FutureTask implementation of `call`.
return myType;
}
@Override
public int compareTo(MyType task2) {
return new CustomTaskComparator().compare(this.myType, task2.myType);
}
}
You can see that we implement Comparablein the task to delegate to the actual Comparatorfor MyType.
你可以看到我们Comparable在任务中实现了委托给实际的Comparatorfor MyType。
And there you have it, customized prioritization for an Executor using the Java libraries! It takes some bit of bending, but it's the cleanest that I've been able to come up with. I hope this is helpful to someone!
有了它,使用 Java 库为 Executor 定制优先级!这需要一些弯曲,但它是我能想出的最干净的。我希望这对某人有帮助!
回答by Stanislav Vitvitskyy
You can use these helper classes:
您可以使用这些帮助类:
public class PriorityFuture<T> implements RunnableFuture<T> {
private RunnableFuture<T> src;
private int priority;
public PriorityFuture(RunnableFuture<T> other, int priority) {
this.src = other;
this.priority = priority;
}
public int getPriority() {
return priority;
}
public boolean cancel(boolean mayInterruptIfRunning) {
return src.cancel(mayInterruptIfRunning);
}
public boolean isCancelled() {
return src.isCancelled();
}
public boolean isDone() {
return src.isDone();
}
public T get() throws InterruptedException, ExecutionException {
return src.get();
}
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return src.get(timeout, unit);
}
public void run() {
src.run();
}
public static Comparator<Runnable> COMP = new Comparator<Runnable>() {
public int compare(Runnable o1, Runnable o2) {
if (o1 == null && o2 == null)
return 0;
else if (o1 == null)
return -1;
else if (o2 == null)
return 1;
else {
int p1 = ((PriorityFuture<?>) o1).getPriority();
int p2 = ((PriorityFuture<?>) o2).getPriority();
return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
}
}
};
}
AND
和
public interface PriorityCallable<T> extends Callable<T> {
int getPriority();
}
ANDthis helper method:
和这个辅助方法:
public static ThreadPoolExecutor getPriorityExecutor(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) {
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
return new PriorityFuture<T>(newTaskFor, ((PriorityCallable<T>) callable).getPriority());
}
};
}
ANDthen use it like this:
和然后用它是这样的:
class LenthyJob implements PriorityCallable<Long> {
private int priority;
public LenthyJob(int priority) {
this.priority = priority;
}
public Long call() throws Exception {
System.out.println("Executing: " + priority);
long num = 1000000;
for (int i = 0; i < 1000000; i++) {
num *= Math.random() * 1000;
num /= Math.random() * 1000;
if (num == 0)
num = 1000000;
}
return num;
}
public int getPriority() {
return priority;
}
}
public class TestPQ {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ThreadPoolExecutor exec = getPriorityExecutor(2);
for (int i = 0; i < 20; i++) {
int priority = (int) (Math.random() * 100);
System.out.println("Scheduling: " + priority);
LenthyJob job = new LenthyJob(priority);
exec.submit(job);
}
}
}
回答by thedarkpassenger
I will try to explain this problem with a fully functional code. But before diving into the code I would like to explain about PriorityBlockingQueue
我将尝试用功能齐全的代码来解释这个问题。但在深入代码之前,我想解释一下 PriorityBlockingQueue
PriorityBlockingQueue: PriorityBlockingQueue is an implementation of BlockingQueue. It accepts the tasks along with their priority and submits the task with the highest priority for execution first. If any two tasks have same priority, then we need to provide some custom logic to decide which task goes first.
PriorityBlockingQueue: PriorityBlockingQueue 是 BlockingQueue 的一个实现。它接受任务及其优先级,并首先提交具有最高优先级的任务执行。如果任何两个任务具有相同的优先级,那么我们需要提供一些自定义逻辑来决定哪个任务先执行。
Now lets get into the code straightaway.
现在让我们直接进入代码。
Driver class: This class creates an executor which accepts tasks and later submits them for execution. Here we create two tasks one with LOW priority and the other with HIGH priority. Here we tell the executor to run a MAX of 1 threads and use the PriorityBlockingQueue.
驱动程序类:这个类创建一个执行程序,它接受任务,然后提交它们以供执行。这里我们创建了两个任务,一个是低优先级,另一个是高优先级。在这里,我们告诉执行程序最多运行 1 个线程并使用 PriorityBlockingQueue。
public static void main(String[] args) {
/*
Minimum number of threads that must be running : 0
Maximium number of threads that can be created : 1
If a thread is idle, then the minimum time to keep it alive : 1000
Which queue to use : PriorityBlockingQueue
*/
PriorityBlockingQueue queue = new PriorityBlockingQueue();
ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1,
1000, TimeUnit.MILLISECONDS,queue);
MyTask task = new MyTask(Priority.LOW,"Low");
executor.execute(new MyFutureTask(task));
task = new MyTask(Priority.HIGH,"High");
executor.execute(new MyFutureTask(task));
task = new MyTask(Priority.MEDIUM,"Medium");
executor.execute(new MyFutureTask(task));
}
MyTask class: MyTask implements Runnable and accepts priority as an argument in the constructor. When this task runs, it prints a message and then puts the thread to sleep for 1 second.
MyTask 类:MyTask 实现 Runnable 并在构造函数中接受优先级作为参数。当这个任务运行时,它会打印一条消息,然后让线程休眠 1 秒。
public class MyTask implements Runnable {
public int getPriority() {
return priority.getValue();
}
private Priority priority;
public String getName() {
return name;
}
private String name;
public MyTask(Priority priority,String name){
this.priority = priority;
this.name = name;
}
@Override
public void run() {
System.out.println("The following Runnable is getting executed "+getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
MyFutureTask class: Since we are using PriorityBlocingQueue for holding our tasks, our tasks must be wrapped inside FutureTask and our implementation of FutureTask must implement Comparable interface. The Comparable interface compares the priority of 2 different tasks and submits the task with the highest priority for execution.
MyFutureTask 类:由于我们使用 PriorityBlocingQueue 来保存我们的任务,我们的任务必须包装在 FutureTask 中,并且我们的 FutureTask 实现必须实现 Comparable 接口。Comparable接口比较2个不同任务的优先级,提交优先级最高的任务执行。
public class MyFutureTask extends FutureTask<MyFutureTask>
implements Comparable<MyFutureTask> {
private MyTask task = null;
public MyFutureTask(MyTask task){
super(task,null);
this.task = task;
}
@Override
public int compareTo(MyFutureTask another) {
return task.getPriority() - another.task.getPriority();
}
}
Priority class: Self explanatory Priority class.
优先级:不言自明的优先级。
public enum Priority {
HIGHEST(0),
HIGH(1),
MEDIUM(2),
LOW(3),
LOWEST(4);
int value;
Priority(int val) {
this.value = val;
}
public int getValue(){
return value;
}
}
Now when we run this example, we get the following output
现在当我们运行这个例子时,我们得到以下输出
The following Runnable is getting executed High
The following Runnable is getting executed Medium
The following Runnable is getting executed Low
Even though we submitted the LOW priority first, but HIGH priority task later, but since we are using a PriorityBlockingQueue, any task with a higher priority will execute first.
尽管我们先提交了 LOW 优先级的任务,然后提交了 HIGH 优先级的任务,但是由于我们使用的是 PriorityBlockingQueue,任何具有更高优先级的任务都会先执行。
回答by Daniel Hári
My solution preserves submition order of tasks for same priorities. It's an improvement of this answer
我的解决方案保留相同优先级的任务提交顺序。这是对这个答案的改进
Task execution orderis based on:
任务执行顺序基于:
- Priority
- Submit order (within same priority)
- 优先事项
- 提交订单(在同一优先级内)
Tester class:
测试类:
public class Main {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executorService = PriorityExecutors.newFixedThreadPool(1);
//Priority=0
executorService.submit(newCallable("A1", 200)); //Defaults to priority=0
executorService.execute(newRunnable("A2", 200)); //Defaults to priority=0
executorService.submit(PriorityCallable.of(newCallable("A3", 200), 0));
executorService.submit(PriorityRunnable.of(newRunnable("A4", 200), 0));
executorService.execute(PriorityRunnable.of(newRunnable("A5", 200), 0));
executorService.submit(PriorityRunnable.of(newRunnable("A6", 200), 0));
executorService.execute(PriorityRunnable.of(newRunnable("A7", 200), 0));
executorService.execute(PriorityRunnable.of(newRunnable("A8", 200), 0));
//Priority=1
executorService.submit(PriorityRunnable.of(newRunnable("B1", 200), 1));
executorService.submit(PriorityRunnable.of(newRunnable("B2", 200), 1));
executorService.submit(PriorityCallable.of(newCallable("B3", 200), 1));
executorService.execute(PriorityRunnable.of(newRunnable("B4", 200), 1));
executorService.submit(PriorityRunnable.of(newRunnable("B5", 200), 1));
executorService.shutdown();
}
private static Runnable newRunnable(String name, int delay) {
return new Runnable() {
@Override
public void run() {
System.out.println(name);
sleep(delay);
}
};
}
private static Callable<Integer> newCallable(String name, int delay) {
return new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println(name);
sleep(delay);
return 10;
}
};
}
private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
}
Result:
结果:
A1 B1 B2 B3 B4 B5 A2 A3 A4 A5 A6 A7 A8
First task is A1 because there were no higher priority in the queue when it was inserted. B tasks are 1 priority so executed earlier, A tasks are 0 priority so executed later, but execution order is follows submition order: B1, B2, B3, ... A2, A3, A4 ...
A1 B1 B2 B3 B4 B5 A2 A3 A4 A5 A6 A7 A8
第一个任务是 A1,因为插入时队列中没有更高的优先级。B任务是1优先级所以先执行,A任务是0优先级所以后来执行,但是执行顺序是提交顺序:B1,B2,B3,...A2,A3,A4...
The solution:
解决方案:
public class PriorityExecutors {
public static ExecutorService newFixedThreadPool(int nThreads) {
return new PriorityExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS);
}
private static class PriorityExecutor extends ThreadPoolExecutor {
private static final int DEFAULT_PRIORITY = 0;
private static AtomicLong instanceCounter = new AtomicLong();
@SuppressWarnings({"unchecked"})
public PriorityExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, (BlockingQueue) new PriorityBlockingQueue<ComparableTask>(10,
ComparableTask.comparatorByPriorityAndSequentialOrder()));
}
@Override
public void execute(Runnable command) {
// If this is ugly then delegator pattern needed
if (command instanceof ComparableTask) //Already wrapped
super.execute(command);
else {
super.execute(newComparableRunnableFor(command));
}
}
private Runnable newComparableRunnableFor(Runnable runnable) {
return new ComparableRunnable(ensurePriorityRunnable(runnable));
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new ComparableFutureTask<>(ensurePriorityCallable(callable));
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new ComparableFutureTask<>(ensurePriorityRunnable(runnable), value);
}
private <T> PriorityCallable<T> ensurePriorityCallable(Callable<T> callable) {
return (callable instanceof PriorityCallable) ? (PriorityCallable<T>) callable
: PriorityCallable.of(callable, DEFAULT_PRIORITY);
}
private PriorityRunnable ensurePriorityRunnable(Runnable runnable) {
return (runnable instanceof PriorityRunnable) ? (PriorityRunnable) runnable
: PriorityRunnable.of(runnable, DEFAULT_PRIORITY);
}
private class ComparableFutureTask<T> extends FutureTask<T> implements ComparableTask {
private Long sequentialOrder = instanceCounter.getAndIncrement();
private HasPriority hasPriority;
public ComparableFutureTask(PriorityCallable<T> priorityCallable) {
super(priorityCallable);
this.hasPriority = priorityCallable;
}
public ComparableFutureTask(PriorityRunnable priorityRunnable, T result) {
super(priorityRunnable, result);
this.hasPriority = priorityRunnable;
}
@Override
public long getInstanceCount() {
return sequentialOrder;
}
@Override
public int getPriority() {
return hasPriority.getPriority();
}
}
private static class ComparableRunnable implements Runnable, ComparableTask {
private Long instanceCount = instanceCounter.getAndIncrement();
private HasPriority hasPriority;
private Runnable runnable;
public ComparableRunnable(PriorityRunnable priorityRunnable) {
this.runnable = priorityRunnable;
this.hasPriority = priorityRunnable;
}
@Override
public void run() {
runnable.run();
}
@Override
public int getPriority() {
return hasPriority.getPriority();
}
@Override
public long getInstanceCount() {
return instanceCount;
}
}
private interface ComparableTask extends Runnable {
int getPriority();
long getInstanceCount();
public static Comparator<ComparableTask> comparatorByPriorityAndSequentialOrder() {
return (o1, o2) -> {
int priorityResult = o2.getPriority() - o1.getPriority();
return priorityResult != 0 ? priorityResult
: (int) (o1.getInstanceCount() - o2.getInstanceCount());
};
}
}
}
private static interface HasPriority {
int getPriority();
}
public interface PriorityCallable<V> extends Callable<V>, HasPriority {
public static <V> PriorityCallable<V> of(Callable<V> callable, int priority) {
return new PriorityCallable<V>() {
@Override
public V call() throws Exception {
return callable.call();
}
@Override
public int getPriority() {
return priority;
}
};
}
}
public interface PriorityRunnable extends Runnable, HasPriority {
public static PriorityRunnable of(Runnable runnable, int priority) {
return new PriorityRunnable() {
@Override
public void run() {
runnable.run();
}
@Override
public int getPriority() {
return priority;
}
};
}
}
}
回答by willcodejavaforfood
Would it be possible to have one ThreadPoolExecutorfor each level of priority? A ThreadPoolExecutorcan be instanciated with a ThreadFactory and you could have your own implementation of a ThreadFactoryto set the different priority levels.
每个优先级可以有一个ThreadPoolExecutor吗?一个的ThreadPoolExecutor可以用的ThreadFactory被实例化,你可以有自己的实施中的ThreadFactory来设置不同的优先级。
class MaxPriorityThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setPriority(Thread.MAX_PRIORITY);
}
}

