Java 队列满时线程池执行器阻塞?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/3446011/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-14 00:52:18  来源:igfitidea点击:

ThreadPoolExecutor Block When Queue Is Full?

javamultithreadingconcurrencyexecutorserviceexecutor

提问by ghempton

I am trying to execute lots of tasks using a ThreadPoolExecutor. Below is a hypothetical example:

我正在尝试使用 ThreadPoolExecutor 执行大量任务。下面是一个假设的例子:

def workQueue = new ArrayBlockingQueue<Runnable>(3, false)
def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue)
for(int i = 0; i < 100000; i++)
    threadPoolExecutor.execute(runnable)

The problem is that I quickly get a java.util.concurrent.RejectedExecutionExceptionsince the number of tasks exceeds the size of the work queue. However, the desired behavior I am looking for is to have the main thread block until there is room in the queue. What is the best way to accomplish this?

问题是我很快得到了一个,java.util.concurrent.RejectedExecutionException因为任务数量超过了工作队列的大小。但是,我正在寻找的所需行为是让主线程阻塞,直到队列中有空间为止。实现这一目标的最佳方法是什么?

回答by Darren Gilroy

In some very narrow circumstances, you can implement a java.util.concurrent.RejectedExecutionHandler that does what you need.

在某些非常狭窄的情况下,您可以实现一个 java.util.concurrent.RejectedExecutionHandler 来满足您的需求。

RejectedExecutionHandler block = new RejectedExecutionHandler() {
  rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
     executor.getQueue().put( r );
  }
};

ThreadPoolExecutor pool = new ...
pool.setRejectedExecutionHandler(block);

Now. This is a very bad ideafor the following reasons

现在。由于以下原因,这是一个非常糟糕的主意

  • It's prone to deadlock because all the threads in the pool may die before the thing you put in the queue is visible. Mitigate this by setting a reasonable keep alive time.
  • The task is not wrapped the way your Executor may expect. Lots of executor implementations wrap their tasks in some sort of tracking object before execution. Look at the source of yours.
  • Adding via getQueue() is strongly discouraged by the API, and may be prohibited at some point.
  • 它很容易发生死锁,因为池中的所有线程可能会在您放入队列的东西可见之前就死了。通过设置合理的保持活动时间来缓解这种情况。
  • 任务没有按照您的 Executor 预期的方式包装。许多执行器实现在执行之前将它们的任务包装在某种跟踪对象中。看看你的来源。
  • API 强烈不鼓励通过 getQueue() 添加,并且在某些时候可能会被禁止。

A almost-always-better strategy is to install ThreadPoolExecutor.CallerRunsPolicy which will throttle your app by running the task on the thread which is calling execute().

一个几乎总是更好的策略是安装 ThreadPoolExecutor.CallerRunsPolicy,它将通过在调用 execute() 的线程上运行任务来限制您的应用程序。

However, sometimes a blocking strategy, with all its inherent risks, is really what you want. I'd say under these conditions

然而,有时一种具有所有固有风险的阻塞策略确实是您想要的。我会说在这些条件下

  • You only have one thread calling execute()
  • You have to (or want to) have a very small queue length
  • You absolutely need to limit the number of threads running this work (usually for external reasons), and a caller-runs strategy would break that.
  • Your tasks are of unpredictable size, so caller-runs could introduce starvation if the pool was momentarily busy with 4 short tasks and your one thread calling execute got stuck with a big one.
  • 你只有一个线程调用 execute()
  • 你必须(或想要)有一个非常小的队列长度
  • 您绝对需要限制运行此工作的线程数(通常是出于外部原因),调用者运行策略会破坏这一点。
  • 您的任务的大小不可预测,因此如果池暂时忙于 4 个短任务并且您的一个调用 execute 的线程卡在一个大任务上,则调用者运行可能会导致饥饿。

So, as I say. It's rarely needed and can be dangerous, but there you go.

所以,正如我所说。它很少需要并且可能很危险,但是你去了。

Good Luck.

祝你好运。

回答by MoeHoss

Here is my code snippet in this case:

在这种情况下,这是我的代码片段:

public void executeBlocking( Runnable command ) {
    if ( threadPool == null ) {
        logger.error( "Thread pool '{}' not initialized.", threadPoolName );
        return;
    }
    ThreadPool threadPoolMonitor = this;
    boolean accepted = false;
    do {
        try {
            threadPool.execute( new Runnable() {
                @Override
                public void run() {
                    try {
                        command.run();
                    }
                    // to make sure that the monitor is freed on exit
                    finally {
                        // Notify all the threads waiting for the resource, if any.
                        synchronized ( threadPoolMonitor ) {
                            threadPoolMonitor.notifyAll();
                        }
                    }
                }
            } );
            accepted = true;
        }
        catch ( RejectedExecutionException e ) {
            // Thread pool is full
            try {
                // Block until one of the threads finishes its job and exits.
                synchronized ( threadPoolMonitor ) {
                    threadPoolMonitor.wait();
                }
            }
            catch ( InterruptedException ignored ) {
                // return immediately
                break;
            }
        }
    } while ( !accepted );
}

threadPool is a local instance of java.util.concurrent.ExecutorService which has been initialized already.

threadPool 是已经初始化的 java.util.concurrent.ExecutorService 的本地实例。

回答by TinkerTank

I solved this problem using a custom RejectedExecutionHandler, which simply blocks the calling thread for a little while and then tries to submit the task again:

我使用自定义 RejectedExecutionHandler 解决了这个问题,它只是将调用线程阻塞了一会儿,然后再次尝试提交任务:

public class BlockWhenQueueFull implements RejectedExecutionHandler {

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

        // The pool is full. Wait, then try again.
        try {
            long waitMs = 250;
            Thread.sleep(waitMs);
        } catch (InterruptedException interruptedException) {}

        executor.execute(r);
    }
}

This class can just be used in the thread-pool executor as a RejectedExecutionHandler like any other. In this example:

此类可以像其他任何类一样作为 RejectedExecutionHandler 在线程池执行程序中使用。在这个例子中:

executorPool = new def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue, new BlockWhenQueueFull())

The only downside I see is that the calling thread might get locked slightly longer than strictly necessary (up to 250ms). For many short-running tasks, perhaps decrease the wait-time to 10ms or so. Moreover, since this executor is effectively being called recursively, very long waits for a thread to become available (hours) might result in a stack overflow.

我看到的唯一缺点是调用线程被锁定的时间可能比严格需要的时间略长(最多 250 毫秒)。对于许多短期运行的任务,也许可以将等待时间减少到 10 毫秒左右。此外,由于该执行程序被有效地递归调用,等待线程变得可用(数小时)的很长时间可能会导致堆栈溢出。

Nevertheless, I personally like this method. It's compact, easy to understand, and works well. Am I missing anything important?

尽管如此,我个人还是喜欢这种方法。它紧凑,易于理解,并且运行良好。我错过了什么重要的东西吗?

回答by devkaoru

You could use a semaphoreto block threads from going into the pool.

您可以使用 asemaphore来阻止线程进入池。

ExecutorService service = new ThreadPoolExecutor(
    3, 
    3, 
    1, 
    TimeUnit.HOURS, 
    new ArrayBlockingQueue<>(6, false)
);

Semaphore lock = new Semaphore(6); // equal to queue capacity

for (int i = 0; i < 100000; i++ ) {
    try {
        lock.acquire();
        service.submit(() -> {
            try {
              task.run();
            } finally {
              lock.release();
            }
        });
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

Some gotchas:

一些问题

  • Only use this pattern with a fixed thread pool. The queue is unlikely to be full often, thus new threads won't be created. Check out the java docs on ThreadPoolExecutor for more details: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.htmlThere is a way around this, but it is out of scope of this answer.
  • Queue size should be higher than the number of core threads. If we were to make the queue size 3, what would end up happening is:

    • T0: all three threads are doing work, the queue is empty, no permits are available.
    • T1: Thread 1 finishes, releases a permit.
    • T2: Thread 1 polls the queue for new work, finds none, and waits.
    • T3: Main thread submits work into the pool, thread 1 starts work.

    The example above translates to thread the main thread blockingthread 1. It may seem like a small period, but now multiply the frequency by days and months. All of a sudden, short periods of time add up to a large amount of time wasted.

  • 仅将此模式与固定线程池一起使用。队列不太可能经常满,因此不会创建新线程。查看 ThreadPoolExecutor 上的 Java 文档以获取更多详细信息:https: //docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html有一种解决方法,但它已超出这个答案的范围。
  • 队列大小应大于核心线程数。如果我们将队列大小设为 3,最终会发生的情况是:

    • T0:三个线程都在工作,队列为空,没有可用的许可。
    • T1:线程 1 结束,释放许可。
    • T2:线程 1 轮询队列以寻找新工作,没有找到,然后等待
    • T3:主线程提交工作入池,线程1开始工作。

    上面的例子翻译成线程阻塞了线程1的主线程。这看起来像是一个小周期,但现在将频率乘以天数和月数。突然之间,很短的时间加起来浪费了大量的时间。

回答by Konstantin Abakumov

What you need to do is to wrap your ThreadPoolExecutor into Executor which explicitly limits amount of concurrently executed operations inside it:

您需要做的是将您的 ThreadPoolExecutor 包装到 Executor 中,该 Executor 明确限制其中并发执行的操作数量:

 private static class BlockingExecutor implements Executor {

    final Semaphore semaphore;
    final Executor delegate;

    private BlockingExecutor(final int concurrentTasksLimit, final Executor delegate) {
        semaphore = new Semaphore(concurrentTasksLimit);
        this.delegate = delegate;
    }

    @Override
    public void execute(final Runnable command) {
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            e.printStackTrace();
            return;
        }

        final Runnable wrapped = () -> {
            try {
                command.run();
            } finally {
                semaphore.release();
            }
        };

        delegate.execute(wrapped);

    }
}

You can adjust concurrentTasksLimit to the threadPoolSize + queueSize of your delegate executor and it will pretty much solve your problem

您可以将 concurrentTasksLimit 调整为委托执行程序的 threadPoolSize + queueSize,它几乎可以解决您的问题

回答by disrupt

This is what I ended up doing:

这就是我最终做的:

int NUM_THREADS = 6;
Semaphore lock = new Semaphore(NUM_THREADS);
ExecutorService pool = Executors.newCachedThreadPool();

for (int i = 0; i < 100000; i++) {
    try {
        lock.acquire();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    pool.execute(() -> {
        try {
            // Task logic
        } finally {
            lock.release();
        }
    });
}

回答by Thomm

A fairly straightforward option is to wrap your BlockingQueuewith an implementation that calls put(..)when offer(..)is being invoked:

一个相当简单的选择是BlockingQueue用一个put(..)offer(..)被调用时调用的实现来包装你的:

public class BlockOnOfferAdapter<T> implements BlockingQueue<T> {

(..)

  public boolean offer(E o) {
        try {
            delegate.put(o);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return true;
  }

(.. implement all other methods simply by delegating ..)

}

This works because by default put(..)waits until there is capacity in the queue when it is full, see:

这是有效的,因为默认情况下put(..),当队列已满时,会等待队列中有容量,请参阅

    /**
     * Inserts the specified element into this queue, waiting if necessary
     * for space to become available.
     *
     * @param e the element to add
     * @throws InterruptedException if interrupted while waiting
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    void put(E e) throws InterruptedException;

No catching of RejectedExecutionExceptionor complicated locking necessary.

无需捕捉RejectedExecutionException或复杂的锁定。

回答by Milo van der Zee

Ok, old thread but this is what I found when searching for blocking thread executor. My code tries to get a semaphore when the task is submitted to the task queue. This blocks if there are no semaphores left. As soon as a task is done the semaphore is released with the decorator. Scary part is that there is a possibility of losing semaphore but that could be solved with for example a timed job that just clears semaphores on a timed basis.

好的,旧线程,但这是我在搜索阻塞线程执行程序时发现的内容。当任务提交到任务队列时,我的代码尝试获取信号量。如果没有剩余的信号量,这将阻塞。一旦任务完成,信号量就会与装饰器一起释放。可怕的是,可能会丢失信号量,但这可以通过例如定时工作来解决,该作业只是在定时的基础上清除信号量。

So here my solution:

所以这里我的解决方案:

class BlockingThreadPoolTaskExecutor(concurrency: Int) : ThreadPoolTaskExecutor() {
    companion object {
        lateinit var semaphore: Semaphore
    }

    init {
        semaphore = Semaphore(concurrency)
        val semaphoreTaskDecorator = SemaphoreTaskDecorator()
        this.setTaskDecorator(semaphoreTaskDecorator)
    }

    override fun <T> submit(task: Callable<T>): Future<T> {
        log.debug("submit")
        semaphore.acquire()
        return super.submit(task)
    }
}

private class SemaphoreTaskDecorator : TaskDecorator {
    override fun decorate(runnable: Runnable): Runnable {
        log.debug("decorate")
        return Runnable {
            try {
                runnable.run()
            } finally {
                log.debug("decorate done")
                semaphore.release()
            }
        }
    }
}