java.util.concurrent.ThreadPoolExecutor 在等待任务时动态调整大小

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/2791851/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-10-29 22:51:02  来源:igfitidea点击:

Dynamic resizing of java.util.concurrent.ThreadPoolExecutor while it has waiting tasks

javamultithreadingconcurrency

提问by Edward Shtern

I'm working with a java.util.concurrent.ThreadPoolExecutorto process a number of items in parallel. Although the threading itself works fine, at times we've run into other resource constraints due to actions happening in the threads, which made us want to dial down the number of Threads in the pool.

我正在使用 ajava.util.concurrent.ThreadPoolExecutor并行处理多个项目。尽管线程本身运行良好,但有时我们会因线程中发生的操作而遇到其他资源限制,这使我们想要调低池中的线程数。

I'd like to know if there's a way to dial down the number of the threads while the threads are actually working. I know that you can call setMaximumPoolSize()and/or setCorePoolSize(), but these only resize the pool once threads become idle, but they don't become idle until there are no tasks waiting in the queue.

我想知道是否有办法在线程实际工作时调低线程数。我知道您可以调用setMaximumPoolSize()和/或setCorePoolSize(),但是这些只会在线程空闲时调整池的大小,但是直到队列中没有任务等待时它们才会空闲。

采纳答案by Tim Bender

As far as I can tell, this is not possible in a nice clean way.

据我所知,这是不可能的。

You can implement the beforeExecute method to check some boolean value and force threads to halt temporarily. Keep in mind, they will contain a task which will not be executed until they are re-enabled.

您可以实现 beforeExecute 方法来检查一些布尔值并强制线程暂时停止。请记住,它们将包含一个在重新启用之前不会执行的任务。

Alternatively, you can implement afterExecute to throw a RuntimeException when you are saturated. This will effectively cause the Thread to die and since the Executor will be above the max, no new one would be created.

或者,您可以实现 afterExecute 在饱和时抛出 RuntimeException 。这将有效地导致线程死亡,并且由于 Executor 将高于最大值,因此不会创建新的线程。

I don't recommend you do either. Instead, try to find some other way of controlling concurrent execution of the tasks which are causing you a problem. Possibly by executing them in a separate thread pool with a more limited number of workers.

我也不建议你这样做。相反,尝试找到一些其他方法来控制导致您出现问题的任务的并发执行。可能通过在具有更有限数量的工人的单独线程池中执行它们。

回答by Scott S. McCoy

You absolutely can. Calling setCorePoolSize(int)will change the core size of the pool. Calls to this method are thread-safe and override settings provided to the constructor of ThreadPoolExecutor. If you are trimming the pool size, the remaining threads will shut-down once their current job queue is completed (if they are idle, they will shut-down immediately). If you are increasing the pool size, new threads will be allocated as soon as possible. The timeframe for the allocation of new threads is undocumented — but in the implementation, allocation of new threads is performed upon each call to the executemethod.

你绝对可以。调用setCorePoolSize(int)会改变池的核心大小。对此方法的调用是线程安全的,并且会覆盖提供给ThreadPoolExecutor. 如果您正在调整池大小,则剩余线程将在其当前作业队列完成后关闭(如果它们处于空闲状态,它们将立即关闭)。如果您要增加池大小,将尽快分配新线程。新线程分配的时间框架没有记录——但在实现中,新线程的分配是在每次调用execute方法时执行的。

To pair this with a runtime-tunable job-farm, you can expose this property (either by wrapper or using a dynamic MBean exporter) as a read-write JMX attribute to create a rather nice, on-the-fly tunable batch processor.

要将其与运行时可调的作业群配对,您可以将此属性(通过包装器或使用动态 MBean 导出器)公开为读写 JMX 属性,以创建一个相当不错的动态可调批处理器。

To reduce the pool size forcibly in runtime (which is your request), you must subclass the ThreadPoolExecutorand add a disruption to the beforeExecute(Thread,Runnable)method. Interrupting the thread is not a sufficient disruption, since that only interacts with wait-states and during processing the ThreadPoolExecutortask threads do not go into an interruptable state.

要在运行时强制减小池大小(这是您的要求),您必须子类化ThreadPoolExecutor并为该beforeExecute(Thread,Runnable)方法添加中断。中断线程并不是足够的中断,因为它只与等待状态交互,并且在处理ThreadPoolExecutor任务线程期间不会进入可中断状态。

I recently had the same problem trying to get a thread pool to forcibly terminate before all submitted tasks are executed. To make this happen, I interrupted the thread by throwing a runtime exception only after replacing the UncaughtExceptionHandlerof the thread with one that expects my specific exception and discards it.

我最近遇到了同样的问题,试图在所有提交的任务执行之前强制终止线程池。为了实现这一点,我仅在将UncaughtExceptionHandler线程替换为期望我的特定异常并丢弃它的线程之后才通过抛出运行时异常来中断线程。

/**
 * A runtime exception used to prematurely terminate threads in this pool.
 */
static class ShutdownException
extends RuntimeException {
    ShutdownException (String message) {
        super(message);
    }
}

/**
 * This uncaught exception handler is used only as threads are entered into
 * their shutdown state.
 */
static class ShutdownHandler 
implements UncaughtExceptionHandler {
    private UncaughtExceptionHandler handler;

    /**
     * Create a new shutdown handler.
     *
     * @param handler The original handler to deligate non-shutdown
     * exceptions to.
     */
    ShutdownHandler (UncaughtExceptionHandler handler) {
        this.handler = handler;
    }
    /**
     * Quietly ignore {@link ShutdownException}.
     * <p>
     * Do nothing if this is a ShutdownException, this is just to prevent
     * logging an uncaught exception which is expected.  Otherwise forward
     * it to the thread group handler (which may hand it off to the default
     * uncaught exception handler).
     * </p>
     */
    public void uncaughtException (Thread thread, Throwable throwable) {
        if (!(throwable instanceof ShutdownException)) {
            /* Use the original exception handler if one is available,
             * otherwise use the group exception handler.
             */
            if (handler != null) {
                handler.uncaughtException(thread, throwable);
            }
        }
    }
}
/**
 * Configure the given job as a spring bean.
 *
 * <p>Given a runnable task, configure it as a prototype spring bean,
 * injecting any necessary dependencices.</p>
 *
 * @param thread The thread the task will be executed in.
 * @param job The job to configure.
 *
 * @throws IllegalStateException if any error occurs.
 */
protected void beforeExecute (final Thread thread, final Runnable job) {
    /* If we're in shutdown, it's because spring is in singleton shutdown
     * mode.  This means we must not attempt to configure the bean, but
     * rather we must exit immediately (prematurely, even).
     */
    if (!this.isShutdown()) {
        if (factory == null) {
            throw new IllegalStateException(
                "This class must be instantiated by spring"
                );
        }

        factory.configureBean(job, job.getClass().getName());
    }
    else {
        /* If we are in shutdown mode, replace the job on the queue so the
         * next process will see it and it won't get dropped.  Further,
         * interrupt this thread so it will no longer process jobs.  This
         * deviates from the existing behavior of shutdown().
         */
        workQueue.add(job);

        thread.setUncaughtExceptionHandler(
            new ShutdownHandler(thread.getUncaughtExceptionHandler())
            );

        /* Throwing a runtime exception is the only way to prematurely
         * cause a worker thread from the TheadPoolExecutor to exit.
         */
        throw new ShutdownException("Terminating thread");
    }
}

In your case, you may want to create a semaphore (just for use as a threadsafe counter) which has no permits, and when shutting down threads release to it a number of permits that corresponds to the delta of the previous core pool size and the new pool size (requiring you override the setCorePoolSize(int)method). This will allow you to terminate your threads after their current task completes.

在您的情况下,您可能想要创建一个没有许可的信号量(仅用作线程安全计数器),并且在关闭线程时向其释放一些许可,这些许可对应于先前核心池大小的增量和新的池大小(需要您覆盖该setCorePoolSize(int)方法)。这将允许您在当前任务完成后终止线程。

private Semaphore terminations = new Semaphore(0);

protected void beforeExecute (final Thread thread, final Runnable job) {
    if (terminations.tryAcquire()) {
        /* Replace this item in the queue so it may be executed by another
         * thread
         */
        queue.add(job);

        thread.setUncaughtExceptionHandler(
            new ShutdownHandler(thread.getUncaughtExceptionHandler())
            );

        /* Throwing a runtime exception is the only way to prematurely
         * cause a worker thread from the TheadPoolExecutor to exit.
         */
        throw new ShutdownException("Terminating thread");
    }
}

public void setCorePoolSize (final int size) {
    int delta = getActiveCount() - size;

    super.setCorePoolSize(size);

    if (delta > 0) {
        terminations.release(delta);
    }
}

This should interrupt nthreads for f(n) = active - requested. If there is any problem, the ThreadPoolExecutors allocation strategy is fairly durable. It book-keeps on premature termination using a finallyblock which guarantees execution. For this reason, even if you terminate too many threads, they will repopulate.

这应该为f(n) = active-requested中断n 个线程。如果有任何问题,s 分配策略是相当持久的。它使用保证执行的块记录提前终止。因此,即使您终止了太多线程,它们也会重新填充。ThreadPoolExecutorfinally

回答by Stefano-Davide

The solution is to drain the ThreadPoolExecutor queue, set the ThreadPoolExecutor size as needed and then add back the threads, one by one, as soon as the others ends. The method to drain the queue in the ThreadPoolExecutor class is private so you have to create it by yourself. Here is the code:

解决方案是排空 ThreadPoolExecutor 队列,根据需要设置 ThreadPoolExecutor 大小,然后在其他线程结束时将线程一个一个地添加回来。ThreadPoolExecutor 类中排空队列的方法是私有的,因此您必须自己创建它。这是代码:

/**
 * Drains the task queue into a new list. Used by shutdownNow.
 * Call only while holding main lock.
 */
public static List<Runnable> drainQueue() {
    List<Runnable> taskList = new ArrayList<Runnable>();
    BlockingQueue<Runnable> workQueue = executor.getQueue();
    workQueue.drainTo(taskList);
    /*
     * If the queue is a DelayQueue or any other kind of queue
     * for which poll or drainTo may fail to remove some elements,
     * we need to manually traverse and remove remaining tasks.
     * To guarantee atomicity wrt other threads using this queue,
     * we need to create a new iterator for each element removed.
     */
    while (!workQueue.isEmpty()) {
        Iterator<Runnable> it = workQueue.iterator();
        try {
            if (it.hasNext()) {
                Runnable r = it.next();
                if (workQueue.remove(r))
                    taskList.add(r);
            }
        } catch (ConcurrentModificationException ignore) {
        }
    }
    return taskList;
}

Before calling this method you need to get and then release the main lock. To do this you need to use java reflection because the field "mainLock" is private. Again, here is the code:

在调用此方法之前,您需要获取然后释放主锁。为此,您需要使用 Java 反射,因为“mainLock”字段是私有的。再次,这是代码:

private Field getMainLock() throws NoSuchFieldException {
    Field mainLock = executor.getClass().getDeclaredField("mainLock");
    mainLock.setAccessible(true);
    return mainLock;
}

Where "executor" is your ThreadPoolExecutor.

其中“执行者”是您的 ThreadPoolExecutor。

Now you need lock/unlock methods:

现在您需要锁定/解锁方法:

public void lock() {
    try {
        Field mainLock = getMainLock();
        Method lock = mainLock.getType().getDeclaredMethod("lock", (Class[])null);
        lock.invoke(mainLock.get(executor), (Object[])null);
    } catch {
        ...
    } 
}

public void unlock() {
    try {
        Field mainLock = getMainLock();
        mainLock.setAccessible(true);
        Method lock = mainLock.getType().getDeclaredMethod("unlock", (Class[])null);
        lock.invoke(mainLock.get(executor), (Object[])null);
    } catch {
        ...
    }  
}

Finally you can write your "setThreadsNumber" method, and it will work both increasing and decreasing the ThreadPoolExecutor size:

最后,您可以编写“setThreadsNumber”方法,它可以增加和减少 ThreadPoolExecutor 的大小:

public void setThreadsNumber(int intValue) {
    boolean increasing = intValue > executor.getPoolSize();
    executor.setCorePoolSize(intValue);
    executor.setMaximumPoolSize(intValue);
    if(increasing){
        if(drainedQueue != null && (drainedQueue.size() > 0)){
            executor.submit(drainedQueue.remove(0));
        }
    } else {
        if(drainedQueue == null){
            lock();
            drainedQueue = drainQueue();
            unlock();
        }
    }
}

Note: obviously if you execute N parallel threads and the you change this number to N-1, all the N threads will continue to run. When the first thread ends no new threads will be executed. From now on the number of parallel thread will be the one you have chosen.

注意:很明显,如果你执行 N 个并行线程并且你把这个数字改为 N-1,那么所有 N 个线程将继续运行。当第一个线程结束时,不会执行新线程。从现在开始,并行线程的数量将是您选择的数量。

回答by southerton

I was in a need for the same solution too, and it seems that in JDK8 the setCorePoolSize() and setMaximumPoolSize() do indeed produce the desired result. I made a test case where I submit 4 tasks to the pool and they execute concurently, I shrink the pool size while they are running and submit yet another runnable that I want to be lonesome. Then I restore the pool back to its original size. Here is the test source https://gist.github.com/southerton81/96e141b8feede3fe0b8f88f679bef381

我也需要相同的解决方案,似乎在 JDK8 中 setCorePoolSize() 和 setMaximumPoolSize() 确实产生了想要的结果。我做了一个测试用例,我将 4 个任务提交到池中并且它们同时执行,我在它们运行时缩小池大小并提交另一个我想要孤独的可运行对象。然后我将池恢复到原来的大小。这里是测试源https://gist.github.com/southerton81/96e141b8feede3fe0b8f88f679bef381

It produces the following output (thread "50" is the one that should be executed in isolation)

它产生以下输出(线程“50”是应该单独执行的一个)

run:
test thread 2 enter
test thread 1 enter
test thread 3 enter
test thread 4 enter
test thread 1 exit
test thread 2 exit
test thread 3 exit
test thread 4 exit
test thread 50 enter
test thread 50 exit
test thread 1 enter
test thread 2 enter
test thread 3 enter
test thread 4 enter
test thread 1 exit
test thread 2 exit
test thread 3 exit
test thread 4 exit

回答by Eyal Schneider

I read the documentation of setMaximumPoolSize() and setCorePoolSize(), and it seems like they can produce the behavior you need.

我阅读了 setMaximumPoolSize() 和 setCorePoolSize() 的文档,似乎它们可以产生您需要的行为。

-- EDIT --

- 编辑 -

My conclusion was wrong: please see the discussion below for details...

我的结论是错误的:详情请看下面的讨论...