Java 如果ThreadPoolExecutor 的submit() 方法已饱和,如何使其阻塞?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/2001086/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-13 02:19:55  来源:igfitidea点击:

How to make ThreadPoolExecutor's submit() method block if it is saturated?

javaconcurrencyexecutor

提问by Fixpoint

I want to create a ThreadPoolExecutorsuch that when it has reached its maximum size and the queue is full, the submit()method blockswhen trying to add new tasks. Do I need to implement a custom RejectedExecutionHandlerfor that or is there an existing way to do this using a standard Java library?

我想创建一个ThreadPoolExecutor这样的,当它达到最大大小并且队列已满时,该submit()方法在尝试添加新任务时会阻塞。我是否需要为此实现自定义RejectedExecutionHandler,或者是否有使用标准 Java 库执行此操作的现有方法?

采纳答案by Fixpoint

One of the possible solutions I've just found:

我刚刚找到的可能解决方案之一:

public class BoundedExecutor {
    private final Executor exec;
    private final Semaphore semaphore;

    public BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.semaphore = new Semaphore(bound);
    }

    public void submitTask(final Runnable command)
            throws InterruptedException, RejectedExecutionException {
        semaphore.acquire();
        try {
            exec.execute(new Runnable() {
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release();
            throw e;
        }
    }
}

Are there any other solutions? I'd prefer something based on RejectedExecutionHandlersince it seems like a standard way to handle such situations.

还有其他解决方案吗?我更喜欢基于的东西,RejectedExecutionHandler因为它似乎是处理这种情况的标准方法。

回答by danben

You should use the CallerRunsPolicy, which executes the rejected task in the calling thread. This way, it can't submit any new tasks to the executor until that task is done, at which point there will be some free pool threads or the process will repeat.

您应该使用CallerRunsPolicy,它在调用线程中执行被拒绝的任务。这样,在该任务完成之前,它无法向执行程序提交任何新任务,此时将有一些空闲池线程或该过程将重复。

http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.CallerRunsPolicy.html

http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.CallerRunsPolicy.html

From the docs:

从文档:

Rejected tasks

New tasks submitted in method execute(java.lang.Runnable) will be rejected when the Executor has been shut down, and also when the Executor uses finite bounds for both maximum threads and work queue capacity, and is saturated. In either case, the execute method invokes the RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) method of its RejectedExecutionHandler. Four predefined handler policies are provided:

  1. In the default ThreadPoolExecutor.AbortPolicy, the handler throws a runtime RejectedExecutionException upon rejection.
  2. In ThreadPoolExecutor.CallerRunsPolicy, the thread that invokes execute itself runs the task. This provides a simple feedback control mechanism that will slow down the rate that new tasks are submitted.
  3. In ThreadPoolExecutor.DiscardPolicy, a task that cannot be executed is simply dropped.
  4. In ThreadPoolExecutor.DiscardOldestPolicy, if the executor is not shut down, the task at the head of the work queue is dropped, and then execution is retried (which can fail again, causing this to be repeated.)

被拒绝的任务

当 Executor 已经关闭,并且当 Executor 对最大线程和工作队列容量使用有限边界并且饱和时,在方法 execute(java.lang.Runnable) 中提交的新任务将被拒绝。无论哪种情况,execute 方法都会调用其 RejectedExecutionHandler 的 RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 方法。提供了四个预定义的处理程序策略:

  1. 在默认的 ThreadPoolExecutor.AbortPolicy 中,处理程序在拒绝时抛出运行时 RejectedExecutionException。
  2. 在 ThreadPoolExecutor.CallerRunsPolicy 中,调用执行自身的线程运行任务。这提供了一个简单的反馈控制机制,可以减慢提交新任务的速度。
  3. 在 ThreadPoolExecutor.DiscardPolicy 中,无法执行的任务被简单地丢弃。
  4. 在ThreadPoolExecutor.DiscardOldestPolicy中,如果执行器没有关闭,工作队列头部的任务就会被丢弃,然后重试执行(可能会再次失败,导致重复执行)。

Also, make sure to use a bounded queue, such as ArrayBlockingQueue, when calling the ThreadPoolExecutorconstructor. Otherwise, nothing will get rejected.

此外,请确保在调用ThreadPoolExecutor构造函数时使用有界队列,例如 ArrayBlockingQueue 。否则,什么都不会被拒绝。

Edit: in response to your comment, set the size of the ArrayBlockingQueue to be equal to the max size of the thread pool and use the AbortPolicy.

编辑:响应您的评论,将 ArrayBlockingQueue 的大小设置为等于线程池的最大大小并使用 AbortPolicy。

Edit 2: Ok, I see what you're getting at. What about this: override the beforeExecute()method to check that getActiveCount()doesn't exceed getMaximumPoolSize(), and if it does, sleep and try again?

编辑2:好的,我明白你在说什么。怎么样:重写beforeExecute()方法以检查getActiveCount()不超过getMaximumPoolSize(),如果超过,睡眠并重试?

回答by Fried Hoeben

Create your own blocking queue to be used by the Executor, with the blocking behavior you are looking for, while always returning available remaining capacity (ensuring the executor will not try to create more threads than its core pool, or trigger the rejection handler).

创建您自己的阻塞队列以供 Executor 使用,具有您正在寻找的阻塞行为,同时始终返回可用的剩余容量(确保 executor 不会尝试创建比其核心池更多的线程,或触发拒绝处理程序)。

I believe this will get you the blocking behavior you are looking for. A rejection handler will never fit the bill, since that indicates the executor can not perform the task. What I could envision there is that you get some form of 'busy waiting' in the handler. That is not what you want, you want a queue for the executor that blocks the caller...

我相信这会让你得到你正在寻找的阻塞行为。拒绝处理程序永远不会满足要求,因为这表明执行者无法执行任务。我可以想象的是,您会在处理程序中获得某种形式的“忙等待”。那不是你想要的,你想要一个阻塞调用者的执行器队列......

回答by Nate Murray

Hibernate has a BlockPolicythat is simple and may do what you want:

Hibernate 有一个BlockPolicy简单的,可以做你想做的事:

See: Executors.java

请参阅:Executors.java

/**
 * A handler for rejected tasks that will have the caller block until
 * space is available.
 */
public static class BlockPolicy implements RejectedExecutionHandler {

    /**
     * Creates a <tt>BlockPolicy</tt>.
     */
    public BlockPolicy() { }

    /**
     * Puts the Runnable to the blocking queue, effectively blocking
     * the delegating thread until space is available.
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        try {
            e.getQueue().put( r );
        }
        catch (InterruptedException e1) {
            log.error( "Work discarded, thread was interrupted while waiting for space to schedule: {}", r );
        }
    }
}

回答by the_void

Check out four alternativesfor doing this: Creating a NotifyingBlockingThreadPoolExecutor

查看执行此操作的四种替代方法Creating a NotifyingBlockingThreadPoolExecutor

回答by DiTime4Tea

You can use ThreadPoolExecutor and a blockingQueue:

您可以使用 ThreadPoolExecutor 和一个阻塞队列:

public class ImageManager {
    BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(blockQueueSize);
    RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
    private ExecutorService executorService =  new ThreadPoolExecutor(numOfThread, numOfThread, 
        0L, TimeUnit.MILLISECONDS, blockingQueue, rejectedExecutionHandler);

    private int downloadThumbnail(String fileListPath){
        executorService.submit(new yourRunnable());
    }
}

回答by stephan f

The BoundedExecutoranswer quoted above from Java Concurrency in Practiceonly works correctly if you use an unbounded queue for the Executor, or the semaphore bound is no greater than the queue size. The semaphore is state shared between the submitting thread and the threads in the pool, making it possible to saturate the executor even if queue size < bound <= (queue size + pool size).

BoundedExecutor上面引用的Java Concurrency in Practice 中的答案仅在您为 Executor 使用无界队列时才能正常工作,或者信号量界限不大于队列大小。信号量在提交线程和池中的线程之间共享状态,即使队列大小<绑定<=(队列大小+池大小),也可以使执行器饱和。

Using CallerRunsPolicyis only valid if your tasks don't run forever, in which case your submitting thread will remain in rejectedExecutionforever, and a bad idea if your tasks take a long time to run, because the submitting thread can't submit any new tasks or do anything else if it's running a task itself.

CallerRunsPolicy仅当您的任务不会永远运行时使用才有效,在这种情况下,您的提交线程将rejectedExecution永远存在,如果您的任务需要很长时间才能运行,这是一个坏主意,因为提交线程无法提交任何新任务或如果它本身正在运行任务,则执行其他任何操作。

If that's not acceptable then I suggest checking the size of the executor's bounded queue before submitting a task. If the queue is full, then wait a short time before trying to submit again. The throughput will suffer, but I suggest it's a simpler solution than many of the other proposed solutions and you're guaranteed no tasks will get rejected.

如果这是不可接受的,那么我建议在提交任务之前检查执行程序的有界队列的大小。如果队列已满,请稍等片刻,然后再次尝试提交。吞吐量会受到影响,但我建议这是一个比许多其他提议的解决方案更简单的解决方案,并且您可以保证不会拒绝任何任务。

回答by vamsu

To avoid issues with @FixPoint solution. One could use ListeningExecutorService and release the semaphore onSuccess and onFailure inside FutureCallback.

避免@FixPoint 解决方案出现问题。可以使用 ListeningExecutorService 并在 FutureCallback 中释放信号量 onSuccess 和 onFailure。

回答by Harald

Recently I found this question having the same problem. The OP does not say so explicitly, but we do not want to use the RejectedExecutionHandlerwhich executes a task on the submitter's thread, because this will under-utilize the worker threads if this task is a long running one.

最近我发现这个问题有同样的问题。OP 没有明确说明,但我们不想使用在RejectedExecutionHandler提交者线程上执行任务的 ,因为如果该任务是长时间运行的任务,这将无法充分利用工作线程。

Reading all the answers and comments, in particular the flawed solution with the semaphore or using afterExecuteI had a closer look at the code of the ThreadPoolExecutorto see if there is some way out. I was amazed to see that there are more than 2000 lines of (commented) code, some of which make me feel dizzy. Given the rather simple requirement I actually have --- one producer, several consumers, let the producer block when no consumers can take work --- I decided to roll my own solution. It is not an ExecutorServicebut just an Executor. And it does not adapt the number of threads to the work load, but holds a fixed number of threads only, which also fits my requirements. Here is the code. Feel free to rant about it :-)

阅读所有答案和评论,特别是信号量或使用的有缺陷的解决方案,afterExecute我仔细查看了ThreadPoolExecutor的代码,看看是否有一些出路。看到有2000多行(注释)的代码,我很惊讶,其中一些让我感到头晕目眩。鉴于我实际上有一个相当简单的要求——一个生产者,几个消费者,当没有消费者可以工作时让生产者阻塞——我决定推出我自己的解决方案。它不是一个ExecutorService而只是一个Executor. 并且它不会根据工作负载调整线程数,而是仅保留固定数量的线程,这也符合我的要求。这是代码。随意吐槽一下:-)

package x;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;

/**
 * distributes {@code Runnable}s to a fixed number of threads. To keep the
 * code lean, this is not an {@code ExecutorService}. In particular there is
 * only very simple support to shut this executor down.
 */
public class ParallelExecutor implements Executor {
  // other bounded queues work as well and are useful to buffer peak loads
  private final BlockingQueue<Runnable> workQueue =
      new SynchronousQueue<Runnable>();
  private final Thread[] threads;

  /*+**********************************************************************/
  /**
   * creates the requested number of threads and starts them to wait for
   * incoming work
   */
  public ParallelExecutor(int numThreads) {
    this.threads = new Thread[numThreads];
    for(int i=0; i<numThreads; i++) {
      // could reuse the same Runner all over, but keep it simple
      Thread t = new Thread(new Runner());
      this.threads[i] = t;
      t.start();
    }
  }
  /*+**********************************************************************/
  /**
   * returns immediately without waiting for the task to be finished, but may
   * block if all worker threads are busy.
   * 
   * @throws RejectedExecutionException if we got interrupted while waiting
   *         for a free worker
   */
  @Override
  public void execute(Runnable task)  {
    try {
      workQueue.put(task);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RejectedExecutionException("interrupt while waiting for a free "
          + "worker.", e);
    }
  }
  /*+**********************************************************************/
  /**
   * Interrupts all workers and joins them. Tasks susceptible to an interrupt
   * will preempt their work. Blocks until the last thread surrendered.
   */
  public void interruptAndJoinAll() throws InterruptedException {
    for(Thread t : threads) {
      t.interrupt();
    }
    for(Thread t : threads) {
      t.join();
    }
  }
  /*+**********************************************************************/
  private final class Runner implements Runnable {
    @Override
    public void run() {
      while (!Thread.currentThread().isInterrupted()) {
        Runnable task;
        try {
          task = workQueue.take();
        } catch (InterruptedException e) {
          // canonical handling despite exiting right away
          Thread.currentThread().interrupt(); 
          return;
        }
        try {
          task.run();
        } catch (RuntimeException e) {
          // production code to use a logging framework
          e.printStackTrace();
        }
      }
    }
  }
}

回答by Radoslaw Kachel

I believe there is quite elegant way to solve this problem by using java.util.concurrent.Semaphoreand delegating behavior of Executor.newFixedThreadPool. The new executor service will only execute new task when there is a thread to do so. Blocking is managed by Semaphore with number of permits equal to number of threads. When a task is finished it returns a permit.

我相信通过使用java.util.concurrent.Semaphore和委托Executor.newFixedThreadPool. 新的执行器服务只会在有线程执行时执行新任务。阻塞由信号量管理,许可数等于线程数。当任务完成时,它返回一个许可。

public class FixedThreadBlockingExecutorService extends AbstractExecutorService {

private final ExecutorService executor;
private final Semaphore blockExecution;

public FixedThreadBlockingExecutorService(int nTreads) {
    this.executor = Executors.newFixedThreadPool(nTreads);
    blockExecution = new Semaphore(nTreads);
}

@Override
public void shutdown() {
    executor.shutdown();
}

@Override
public List<Runnable> shutdownNow() {
    return executor.shutdownNow();
}

@Override
public boolean isShutdown() {
    return executor.isShutdown();
}

@Override
public boolean isTerminated() {
    return executor.isTerminated();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
    return executor.awaitTermination(timeout, unit);
}

@Override
public void execute(Runnable command) {
    blockExecution.acquireUninterruptibly();
    executor.execute(() -> {
        try {
            command.run();
        } finally {
            blockExecution.release();
        }
    });
}