Java Executors:如何设置任务优先级?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/3198660/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-13 17:19:43  来源:igfitidea点击:

Java Executors: how can I set task priority?

javamultithreadingconcurrencyexecutorthread-priority

提问by Roman

Is there a possibility to set priority to tasks which are executed by Executors? I've found some statements in JCIP about it's possible but I cannot find any example and I cannot find anything related in docs.

是否可以为 Executors 执行的任务设置优先级?我在 JCIP 中找到了一些关于它可能的声明,但我找不到任何示例,也找不到任何相关的文档。

From JCIP:

来自 JCIP:

An execution policy specifies the "what, where, when, and how" of task execution, including:

  • ...
  • In what order should tasks be executed (FIFO, LIFO, priority order)?
  • ...

执行策略指定任务执行的“内容、地点、时间和方式”,包括:

  • ...
  • 任务应该以什么顺序执行(FIFO、LIFO、优先级顺序)?
  • ...

UPD: I realized that I asked not exactly what I wanted to ask. What I really wanted is:

UPD:我意识到我问的并不是我想问的。我真正想要的是:

How to use/emulate setting threads priority (i.e. what was thread.setPriority()) with executors framework?

如何使用/模拟thread.setPriority()使用 executors 框架设置线程优先级(即是什么)?

采纳答案by Davy Meers

Currently the only concrete implementations of the Executor interfaceare the ThreadPoolExecutorand the ScheduledThreadpoolExecutor

目前Executor 接口的唯一具体实现是ThreadPoolExecutorScheduledThreadpoolExecutor

Instead of using the utility / factory class Executors, you should create an instance using a constructor.

您应该使用构造函数创建一个实例,而不是使用实用程序/工厂类Executors

You can pass a BlockingQueueto the constructors of the ThreadPoolExecutor.

您可以将BlockingQueue传递给ThreadPoolExecutor 的构造函数。

One of the implementations of the BlockingQueue, the PriorityBlockingQueuelets you pass a Comparator to a constructor, that way enabling you to decide the order of execution.

BlockingQueue 的实现之一,PriorityBlockingQueue允许您将 Comparator 传递给构造函数,这样您就可以决定执行顺序。

回答by dupdup

you can use ThreadPoolExecutor with Priority blocking queue How to implement PriorityBlockingQueue with ThreadPoolExecutor and custom tasks

您可以将 ThreadPoolExecutor 与优先阻塞队列一起使用 如何使用 ThreadPoolExecutor 和自定义任务实现 PriorityBlockingQueue

回答by aanno

Please be aware that setPriority(..) normally does notwork under Linux. See the following links for the full details:

请注意,setPriority(..)一般不会不会在Linux下工作。有关完整详细信息,请参阅以下链接:

回答by Michael Brewer-Davis

You can specify a ThreadFactoryin the ThreadPoolExecutorconstructor (or Executorsfactory method). This allows you to provide threads of a given thread priority for the executor.

您可以ThreadFactoryThreadPoolExecutor构造函数(或Executors工厂方法)中指定 a 。这允许您为执行程序提供给定线程优先级的线程。

To get different thread priorities for different jobs, you'd need to send them to executors with different thread factories.

要为不同的作业获得不同的线程优先级,您需要将它们发送到具有不同线程工厂的执行程序。

回答by Stanislav Vitvitskyy

The idea here is to use a PriorityBlockingQueue in the executor. For this:

这里的想法是在执行程序中使用 PriorityBlockingQueue。为了这:

  • Create a comparator that would compare our futures.
  • Create a proxy for the Future to hold a priority.
  • Override 'newTaskFor' in order to wrap every future in our proxy.
  • 创建一个比较器来比较我们的未来。
  • 为 Future 创建一个代理以保持优先级。
  • 覆盖 'newTaskFor' 以便在我们的代理中包装每一个未来。

First you need to hold priority on your future:

首先,您需要优先考虑您的未来:

    class PriorityFuture<T> implements RunnableFuture<T> {

    private RunnableFuture<T> src;
    private int priority;

    public PriorityFuture(RunnableFuture<T> other, int priority) {
        this.src = other;
        this.priority = priority;
    }

    public int getPriority() {
        return priority;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return src.cancel(mayInterruptIfRunning);
    }

    public boolean isCancelled() {
        return src.isCancelled();
    }

    public boolean isDone() {
        return src.isDone();
    }

    public T get() throws InterruptedException, ExecutionException {
        return src.get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return src.get();
    }

    public void run() {
        src.run();
    }
}

Next you need to define comparator that would correctly sort the priority futures:

接下来,您需要定义可以正确排序优先期货的比较器:

class PriorityFutureComparator implements Comparator<Runnable> {
    public int compare(Runnable o1, Runnable o2) {
        if (o1 == null && o2 == null)
            return 0;
        else if (o1 == null)
            return -1;
        else if (o2 == null)
            return 1;
        else {
            int p1 = ((PriorityFuture<?>) o1).getPriority();
            int p2 = ((PriorityFuture<?>) o2).getPriority();

            return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
        }
    }
}

Next let's assume we have a lengthy job like this:

接下来让我们假设我们有一个像这样的冗长的工作:

class LenthyJob implements Callable<Long> {
    private int priority;

    public LenthyJob(int priority) {
        this.priority = priority;
    }

    public Long call() throws Exception {
        System.out.println("Executing: " + priority);
        long num = 1000000;
        for (int i = 0; i < 1000000; i++) {
            num *= Math.random() * 1000;
            num /= Math.random() * 1000;
            if (num == 0)
                num = 1000000;
        }
        return num;
    }

    public int getPriority() {
        return priority;
    }
}

Then in order to execute these jobs in priority the code will look like:

然后为了优先执行这些作业,代码将如下所示:

public class TestPQ {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        int nThreads = 2;
        int qInitialSize = 10;

        ExecutorService exec = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>(qInitialSize, new PriorityFutureComparator())) {

            protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
                RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
                return new PriorityFuture<T>(newTaskFor, ((LenthyJob) callable).getPriority());
            }
        };

        for (int i = 0; i < 20; i++) {
            int priority = (int) (Math.random() * 100);
            System.out.println("Scheduling: " + priority);
            LenthyJob job = new LenthyJob(priority);
            exec.submit(job);
        }
    }
}

This is a lot of code but that's nearly the only way this can be accomplished.

这是很多代码,但这几乎是可以完成的唯一方法。

On my machine the output is like the following:

在我的机器上,输出如下所示:

Scheduling: 39
Scheduling: 90
Scheduling: 88
Executing: 39
Scheduling: 75
Executing: 90
Scheduling: 15
Scheduling: 2
Scheduling: 5
Scheduling: 24
Scheduling: 82
Scheduling: 81
Scheduling: 3
Scheduling: 23
Scheduling: 7
Scheduling: 40
Scheduling: 77
Scheduling: 49
Scheduling: 34
Scheduling: 22
Scheduling: 97
Scheduling: 33
Executing: 2
Executing: 3
Executing: 5
Executing: 7
Executing: 15
Executing: 22
Executing: 23
Executing: 24
Executing: 33
Executing: 34
Executing: 40
Executing: 49
Executing: 75
Executing: 77
Executing: 81
Executing: 82
Executing: 88
Executing: 97

回答by fast3r

Just want to add my bit of contribution to this discussion. I've implemented this ReorderingThreadPoolExecutorfor a very specific purpose, which is being able to explicitly bring to the front of the executor's BlockingQueue (in this case LinkedBlockingDeque) whenever I want and without having to deal with priorities (which can lead to deadlocks and are, anyway, fixed).

只是想在这个讨论中添加我的一点贡献。我已经为一个非常具体的目的实现了这个ReorderingThreadPoolExecutor,它能够在我想要并且不需要处理优先级(这可能导致死锁并且是,无论如何,固定)。

I'm using this to manage (inside an Android app) the case where I have to download many images that are displayed in a long list view. Whenever the user scrolls down quickly, the executor queue gets flooded of image download requests: by moving the latest ones on the top of the queue, I have achieved much better performances in loading the images which are actually on the screen, delaying the download of the ones that will probably needed later. Note that I use an internal concurrent map key (which can be as simple as the image URL string) to add the tasks to the executor so that I can retrieve them later for the reordering.

我正在使用它来管理(在 Android 应用程序内)我必须下载许多显示在长列表视图中的图像的情况。每当用户快速向下滚动时,执行器队列就会被图像下载请求淹没:通过将最新的图像移动到队列顶部,我在加载实际在屏幕上的图像方面取得了更好的性能,延迟了下载以后可能需要的那些。请注意,我使用内部并发映射键(可以像图像 URL 字符串一样简单)将任务添加到执行程序,以便我稍后可以检索它们以进行重新排序。

There'd have been many other ways of doing the same and maybe it's overcomplicated, but it works fine and also Facebook in his Android SDK is doing something similar in its own working threads queue.

有很多其他方法可以做同样的事情,也许它过于复杂,但它运行良好,而且 Facebook 在他的 Android SDK 中也在它自己的工作线程队列中做类似的事情。

Feel free to have a look at the code and give me suggestions, it's inside an Android project but stripping a few logs and annotations would make the class pure Java 6.

随意查看代码并给我建议,它在一个 Android 项目中,但剥离一些日志和注释将使该类成为纯 Java 6。

回答by dfreis

You can implement your own ThreadFactory and set it within ThreadPoolExecutor like this:

您可以实现自己的 ThreadFactory 并将其设置在 ThreadPoolExecutor 中,如下所示:

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, numOfWorkerThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
threadPool.setThreadFactory(new OpJobThreadFactory(Thread.NORM_PRIORITY-2));

where my OpJobThreadFactory looks like the following:

我的 OpJobThreadFactory 如下所示:

public final static class OpJobThreadFactory implements ThreadFactory {
   private int priority;
   private boolean daemon;
   private final String namePrefix;
   private static final AtomicInteger poolNumber = new AtomicInteger(1);
   private final AtomicInteger threadNumber = new AtomicInteger(1);

   public OpJobThreadFactory(int priority) {
      this(priority, true);
   }

   public OpJobThreadFactory(int priority, boolean daemon) {
      this.priority = priority;
      this.daemon = daemon;
      namePrefix = "jobpool-" +poolNumber.getAndIncrement() + "-thread-";
   }

   @Override
   public Thread newThread(Runnable r) {
      Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
      t.setDaemon(daemon);
      t.setPriority(priority);
      return t;
   }
}