java 如何使用 ThreadPoolExecutor 和自定义任务实现 PriorityBlockingQueue

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/3545623/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-10-30 02:20:20  来源:igfitidea点击:

How to implement PriorityBlockingQueue with ThreadPoolExecutor and custom tasks

javathreadpoolpriority-queueexecutorfuturetask

提问by greve

I've searched a lot but could not find a solutuion to my problem.

我已经搜索了很多,但找不到解决我的问题的方法。

I have my own class, BaseTask, that uses a ThreadPoolExecutorto handle tasks. I want task prioritization, but when I try to use a PriorityBlockingQueueI get ClassCastExceptionbecause the ThreadPoolExecutorwraps my Tasks into a FutureTaskobject.

我有自己的类,BaseTask,使用 aThreadPoolExecutor来处理任务。我想要任务优先级,但是当我尝试使用 a 时,PriorityBlockingQueue我得到了ClassCastException因为ThreadPoolExecutor它将我的任务包装成一个FutureTask对象。

This obviously makes sense because the FutureTaskdoes not implement Comparable, but how would I go on to solve the priority problem? I've read that you could override newTaskFor()in ThreadPoolExecutor, but I can not seem to find this method at all...?

这显然是有道理的,因为FutureTask没有实现Comparable,但是我将如何继续解决优先级问题?我读过你可以覆盖newTaskFor()in ThreadPoolExecutor,但我似乎根本找不到这个方法......?

Any suggestions would be much appreciated!

我们欢迎所有的建议!

Some code to help:

一些帮助代码:

In my BaseTaskclass I have

在我的BaseTask课上我有

private static final BlockingQueue<Runnable> sWorkQueue = new PriorityBlockingQueue<Runnable>();

private static final ThreadFactory sThreadFactory = new ThreadFactory() {
    private final AtomicInteger mCount = new AtomicInteger(1);

    public Thread newThread(Runnable r) {
        return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
    }
};

private static final BaseThreadPoolExecutor sExecutor = new BaseThreadPoolExecutor(
    1, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, sWorkQueue, sThreadFactory);

private final BaseFutureTask<Result> mFuture;

public BaseTask(int priority) {
    mFuture = new BaseFutureTask<Result>(mWorker, priority);
}

public final BaseTask<Params, Progress, Result> execute(Params... params) {

    /* Some unimportant code here */

    sExecutor.execute(mFuture);
}

In BaseFutureTaskclass

BaseFutureTask

@Override
public int compareTo(BaseFutureTask another) {
    long diff = this.priority - another.priority;

    return Long.signum(diff);
}

In BaseThreadPoolExecutorclass i override the 3 submitmethods... The constructor in this class gets called, but none of the submitmethods

BaseThreadPoolExecutor类中,我覆盖了 3 个submit方法......这个类中的构造函数被调用,但没有任何submit方法

回答by dupdup

public class ExecutorPriority {

public static void main(String[] args) {

    PriorityBlockingQueue<Runnable> pq = new PriorityBlockingQueue<Runnable>(20, new ComparePriority());

    Executor exe = new ThreadPoolExecutor(1, 2, 10, TimeUnit.SECONDS, pq);
    exe.execute(new RunWithPriority(2) {

        @Override
        public void run() {

            System.out.println(this.getPriority() + " started");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException ex) {
                Logger.getLogger(ExecutorPriority.class.getName()).log(Level.SEVERE, null, ex);
            }
            System.out.println(this.getPriority() + " finished");
        }
    });
    exe.execute(new RunWithPriority(10) {

        @Override
        public void run() {
            System.out.println(this.getPriority() + " started");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException ex) {
                Logger.getLogger(ExecutorPriority.class.getName()).log(Level.SEVERE, null, ex);
            }
            System.out.println(this.getPriority() + " finished");
        }
    });

}

private static class ComparePriority<T extends RunWithPriority> implements Comparator<T> {

    @Override
    public int compare(T o1, T o2) {
        return o1.getPriority().compareTo(o2.getPriority());
    }
}

}

}

as you can guess RunWithPriority is an abstract class that is Runnable and has a Integer priority field

你可以猜到 RunWithPriority 是一个抽象类,它是 Runnable 并且有一个 Integer 优先级字段

回答by Stanislav Vitvitskyy

You can use these helper classes:

您可以使用这些帮助类:

public class PriorityFuture<T> implements RunnableFuture<T> {

    private RunnableFuture<T> src;
    private int priority;

    public PriorityFuture(RunnableFuture<T> other, int priority) {
        this.src = other;
        this.priority = priority;
    }

    public int getPriority() {
        return priority;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return src.cancel(mayInterruptIfRunning);
    }

    public boolean isCancelled() {
        return src.isCancelled();
    }

    public boolean isDone() {
        return src.isDone();
    }

    public T get() throws InterruptedException, ExecutionException {
        return src.get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return src.get();
    }

    public void run() {
        src.run();
    }

    public static Comparator<Runnable> COMP = new Comparator<Runnable>() {
        public int compare(Runnable o1, Runnable o2) {
            if (o1 == null && o2 == null)
                return 0;
            else if (o1 == null)
                return -1;
            else if (o2 == null)
                return 1;
            else {
                int p1 = ((PriorityFuture<?>) o1).getPriority();
                int p2 = ((PriorityFuture<?>) o2).getPriority();

                return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
            }
        }
    };
}

AND

public interface PriorityCallable<T> extends Callable<T> {

    int getPriority();

}

ANDthis helper method:

这个辅助方法:

public static ThreadPoolExecutor getPriorityExecutor(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
            new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) {

        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
            return new PriorityFuture<T>(newTaskFor, ((PriorityCallable<T>) callable).getPriority());
        }
    };
}

ANDthen use it like this:

然后用它是这样的:

class LenthyJob implements PriorityCallable<Long> {
    private int priority;

    public LenthyJob(int priority) {
        this.priority = priority;
    }

    public Long call() throws Exception {
        System.out.println("Executing: " + priority);
        long num = 1000000;
        for (int i = 0; i < 1000000; i++) {
            num *= Math.random() * 1000;
            num /= Math.random() * 1000;
            if (num == 0)
                num = 1000000;
        }
        return num;
    }

    public int getPriority() {
        return priority;
    }
}

public class TestPQ {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadPoolExecutor exec = getPriorityExecutor(2);

        for (int i = 0; i < 20; i++) {
            int priority = (int) (Math.random() * 100);
            System.out.println("Scheduling: " + priority);
            LenthyJob job = new LenthyJob(priority);
            exec.submit(job);
        }
    }
}

回答by thedarkpassenger

I will try to explain this problem with a fully functional code. But before diving into the code I would like to explain about PriorityBlockingQueue

我将尝试用功能齐全的代码来解释这个问题。但在深入代码之前,我想解释一下 PriorityBlockingQueue

PriorityBlockingQueue: PriorityBlockingQueue is an implementation of BlockingQueue. It accepts the tasks along with their priority and submits the task with the highest priority for execution first. If any two tasks have same priority, then we need to provide some custom logic to decide which task goes first.

PriorityBlockingQueue: PriorityBlockingQueue 是 BlockingQueue 的一个实现。它接受任务及其优先级,并首先提交具有最高优先级的任务执行。如果任何两个任务具有相同的优先级,那么我们需要提供一些自定义逻辑来决定哪个任务先执行。

Now lets get into the code straightaway.

现在让我们直接进入代码。

Driver class: This class creates an executor which accepts tasks and later submits them for execution. Here we create two tasks one with LOW priority and the other with HIGH priority. Here we tell the executor to run a MAX of 1 threads and use the PriorityBlockingQueue.

驱动程序类:这个类创建一个执行程序,它接受任务,然后提交它们以供执行。这里我们创建了两个任务,一个是低优先级,另一个是高优先级。在这里,我们告诉执行程序最多运行 1 个线程并使用 PriorityBlockingQueue。

     public static void main(String[] args) {

       /*
       Minimum number of threads that must be running : 0
       Maximium number of threads that can be created : 1
       If a thread is idle, then the minimum time to keep it alive : 1000
       Which queue to use : PriorityBlockingQueue
       */
    PriorityBlockingQueue queue = new PriorityBlockingQueue();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1,
        1000, TimeUnit.MILLISECONDS,queue);

    MyTask task = new MyTask(Priority.LOW,"Low");
    executor.execute(new MyFutureTask(task));
    task = new MyTask(Priority.HIGH,"High");
    executor.execute(new MyFutureTask(task));
}

MyTask class: MyTask implements Runnable and accepts priority as an argument in the constructor. When this task runs, it prints a message and then puts the thread to sleep for 1 second.

MyTask 类:MyTask 实现 Runnable 并在构造函数中接受优先级作为参数。当这个任务运行时,它会打印一条消息,然后让线程休眠 1 秒。

   public class MyTask implements Runnable {

  public int getPriority() {
    return priority.getValue();
  }

  private Priority priority;

  public String getName() {
    return name;
  }

  private String name;

  public MyTask(Priority priority,String name){
    this.priority = priority;
    this.name = name;
  }

  @Override
  public void run() {
    System.out.println("The following Runnable is getting executed "+getName());
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

}

MyFutureTask class: Since we are using PriorityBlocingQueue for holding our tasks, our tasks must be wrapped inside FutureTask and our implementation of FutureTask must implement Comparable interface. The Comparable interface compares the priority of 2 different tasks and submits the task with the highest priority for execution.

MyFutureTask 类:由于我们使用 PriorityBlocingQueue 来保存我们的任务,我们的任务必须包装在 FutureTask 中,并且我们的 FutureTask 实现必须实现 Comparable 接口。Comparable接口比较2个不同任务的优先级,提交优先级最高的任务执行。

 public class MyFutureTask extends FutureTask<MyFutureTask>
      implements Comparable<MyFutureTask> {

    private  MyTask task = null;

    public  MyFutureTask(MyTask task){
      super(task,null);
      this.task = task;
    }

    @Override
    public int compareTo(MyFutureTask another) {
      return task.getPriority() - another.task.getPriority();
    }
  }

Priority class: Self explanatory Priority class.

优先级:不言自明的优先级。

public enum Priority {

  HIGHEST(0),
  HIGH(1),
  MEDIUM(2),
  LOW(3),
  LOWEST(4);

  int value;

  Priority(int val) {
    this.value = val;
  }

  public int getValue(){
    return value;
  }


}

Now when we run this example, we get the following output

现在当我们运行这个例子时,我们得到以下输出

The following Runnable is getting executed High
The following Runnable is getting executed Low

Even though we submitted the LOW priority first, but HIGH priority task later, but since we are using a PriorityBlockingQueue, an task with a higher priority will execute first.

尽管我们先提交了低优先级的任务,但后来提交了高优先级的任务,但由于我们使用的是 PriorityBlockingQueue,因此优先级较高的任务将首先执行。

回答by canghailan

My solution:

我的解决方案:

public class XThreadPoolExecutor extends ThreadPoolExecutor
{
    public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue)
    {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue,
        RejectedExecutionHandler handler)
    {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue,
        ThreadFactory threadFactory)
    {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue,
        ThreadFactory threadFactory, RejectedExecutionHandler handler)
    {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value)
    {
        return new ComparableFutureTask<>(runnable, value);
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable)
    {
        return new ComparableFutureTask<>(callable);
    }

    protected class ComparableFutureTask<V>
        extends FutureTask<V> implements Comparable<ComparableFutureTask<V>>
    {
        private Object object;
        public ComparableFutureTask(Callable<V> callable)
        {
            super(callable);
            object = callable;
        }

        public ComparableFutureTask(Runnable runnable, V result)
        {
            super(runnable, result);
            object = runnable;
        }

        @Override
        @SuppressWarnings("unchecked")
        public int compareTo(ComparableFutureTask<V> o)
        {
            if (this == o)
            {
                return 0;
            }
            if (o == null)
            {
                return -1; // high priority
            }
            if (object != null && o.object != null)
            {
                if (object.getClass().equals(o.object.getClass()))
                {
                    if (object instanceof Comparable)
                    {
                        return ((Comparable) object).compareTo(o.object);
                    }
                }
            }
            return 0;
        }
    }
}

回答by Engineer

To answer your question: The newTaskFor()method is found in ThreadPoolExecutor's superclass, AbstractExecutorService. You can simply override it in ThreadPoolExecutor, however.

回答您的问题:该newTaskFor()方法位于ThreadPoolExecutor的超类中AbstractExecutorService。但是,您可以简单地在 中覆盖它ThreadPoolExecutor

回答by Rich Schuler

It looks like they left that out of apache harmony. There is a svn commit logabout a year ago fixing the absence of newTaskFor. You can probably just override the submitfunctions in an extended ThreadPoolExecutorto create an extended FutureTaskthat is Comparable. They are not very long.

看起来他们把它排除在 apache 和谐之外。大约一年前有一个svn 提交日志修复了newTaskFor. 您可能只需覆盖submit扩展中的函数ThreadPoolExecutor即可创建扩展FutureTaskComparable. 他们不是很长

回答by Gray

This answer is a simplified version of @StanislavVitvitskyy's answer. Thanks to him.

这个答案是@StanislavVitvitskyy 答案的简化版本。感谢他。

I wanted to make the jobsthat I submitted be Comparable. I created an ExecutorServicewith a PriorityBlockingQueueand extend it to handle the newTaskFor(...)methods:

我想让我提交的工作成为Comparable. 我ExecutorService用 a创建了一个PriorityBlockingQueue并扩展它来处理这些newTaskFor(...)方法:

ExecutorService pool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
    keepAliveTime, timeUnit, new PriorityBlockingQueue<Runnable>()) {

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new ComparableFutureTask<T>(runnable, value);
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new ComparableFutureTask<T>(callable);
    };
};

I defined a ComparableFutureTaskwhich extends FutureTaskand implements Comparableby delegating to the job.compareTo(...)that are submitted to the pool.

我定义了一个ComparableFutureTask,它通过委托给提交到池的来扩展FutureTask和实现。Comparablejob.compareTo(...)

public class ComparableFutureTask<T> extends FutureTask<T>
    implements Comparable<Object> {

    private final Comparable<Object> comparableJob;

    @SuppressWarnings("unchecked")
    public ComparableFutureTask(Runnable runnable, T value) {
        super(runnable, value);
        this.comparableJob = (Comparable<Object>) runnable;
    }

    @SuppressWarnings("unchecked")
    public ComparableFutureTask(Callable<T> callable) {
        super(callable);
        this.comparableJob = (Comparable<Object>) callable;
    }

    @Override
    public int compareTo(Object o) {
        return this.comparableJob
            .compareTo(((ComparableFutureTask<?>) o).comparable);
    }
}

This ExecutorServicethen can handle Runnableor Callablejobs that are also Comparable. For example:

ExecutorService然后可以处理RunnableCallable作业Comparable。例如:

public class MyJob implements Runnable, Comparable<MyJob> {
    private int priority;
    ...
    @Override
    public int compareTo(MyJob other) {
        // we want higher priority to go first
        return other.priority - this.priority;
    }
    ...
}

It is important to note that if you submit a job that is not Comparableto this queue, it will throw a ClassCastException.

需要注意的是,如果您提交的作业不在Comparable此队列中,它将抛出一个ClassCastException.