Java 无法创建具有大小限制的缓存线程池?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/1800317/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-12 22:46:09  来源:igfitidea点击:

Impossible to make a cached thread pool with a size limit?

javamultithreadingconcurrencyexecutorservicethreadpoolexecutor

提问by Matt Crinklaw-Vogt

It seems to be impossible to make a cached thread pool with a limit to the number of threads that it can create.

似乎不可能创建一个缓存线程池,限制它可以创建的线程数。

Here is how static Executors.newCachedThreadPool is implemented in the standard Java library:

以下是标准 Java 库中静态 Executors.newCachedThreadPool 的实现方式:

 public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

So, using that template to go on to create a fixed sized cached thread pool:

因此,使用该模板继续创建一个固定大小的缓存线程池:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new SynchronusQueue<Runable>());

Now if you use this and submit 3 tasks, everything will be fine. Submitting any further tasks will result in rejected execution exceptions.

现在,如果您使用它并提交 3 个任务,一切都会好起来的。提交任何进一步的任务将导致被拒绝的执行异常。

Trying this:

试试这个:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runable>());

Will result in all threads executing sequentially. I.e., the thread pool will never make more than one thread to handle your tasks.

将导致所有线程顺序执行。即,线程池永远不会产生多个线程来处理您的任务。

This is a bug in the execute method of ThreadPoolExecutor? Or maybe this is intentional? Or there is some other way?

这是ThreadPoolExecutor 的execute 方法中的一个bug?或者这可能是故意的?或者有其他方法吗?

Edit: I want something exactly like the cached thread pool (it creates threads on demand and then kills them after some timeout) but with a limit on the number of threads that it can create and the ability to continue to queue additional tasks once it has hit its thread limit. According to sjlee's response this is impossible. Looking at the execute() method of ThreadPoolExecutor it is indeed impossible. I would need to subclass ThreadPoolExecutor and override execute() somewhat like SwingWorker does, but what SwingWorker does in its execute() is a complete hack.

编辑:我想要一些与缓存线程池完全相同的东西(它根据需要创建线程,然后在超时后杀死它们),但是它可以创建的线程数量有限制,并且一旦有其他任务就可以继续排队达到其线程限制。根据 sjlee 的回应,这是不可能的。看ThreadPoolExecutor的execute()方法确实是不可能的。我需要子类化 ThreadPoolExecutor 并覆盖 execute() 有点像 SwingWorker 所做的,但 SwingWorker 在其 execute() 中所做的是一个完整的黑客。

采纳答案by sjlee

The ThreadPoolExecutor has the following several key behaviors, and your problems can be explained by these behaviors.

ThreadPoolExecutor 有以下几个关键行为,你的问题可以用这些行为来解释。

When tasks are submitted,

提交任务时,

  1. If the thread pool has not reached the core size, it creates new threads.
  2. If the core size has been reached and there is no idle threads, it queues tasks.
  3. If the core size has been reached, there is no idle threads, and the queue becomes full, it creates new threads (until it reaches the max size).
  4. If the max size has been reached, there is no idle threads, and the queue becomes full, the rejection policy kicks in.
  1. 如果线程池未达到核心大小,则创建新线程。
  2. 如果已达到核心大小且没有空闲线程,则将任务排队。
  3. 如果已达到核心大小,则没有空闲线程,并且队列已满,则创建新线程(直到达到最大大小)。
  4. 如果已达到最大大小,没有空闲线程,并且队列已满,则拒绝策略启动。

In the first example, note that the SynchronousQueue has essentially size of 0. Therefore, the moment you reach the max size (3), the rejection policy kicks in (#4).

在第一个示例中,请注意 SynchronousQueue 的大小基本上为 0。因此,当您达到最大大小 (3) 时,拒绝策略就会启动 (#4)。

In the second example, the queue of choice is a LinkedBlockingQueue which has an unlimited size. Therefore, you get stuck with behavior #2.

在第二个示例中,选择的队列是具有无限大小的 LinkedBlockingQueue。因此,您会被行为#2 困住。

You cannot really tinker much with the cached type or the fixed type, as their behavior is almost completely determined.

您不能真正修改缓存类型或固定类型,因为它们的行为几乎完全确定。

If you want to have a bounded and dynamic thread pool, you need to use a positive core size and max size combined with a queue of a finite size. For example,

如果你想拥有一个有界和动态的线程池,你需要使用一个正的核心大小和最大大小以及一个有限大小的队列。例如,

new ThreadPoolExecutor(10, // core size
    50, // max size
    10*60, // idle timeout
    TimeUnit.SECONDS,
    new ArrayBlockingQueue<Runnable>(20)); // queue with a size

Addendum: this is a fairly old answer, and it appears that JDK changed its behavior when it comes to core size of 0. Since JDK 1.6, if the core size is 0 and the pool does not have any threads, the ThreadPoolExecutor will add a thread to execute that task. Therefore, the core size of 0 is an exception to the rule above. Thanks Stevefor bringingthat to my attention.

附录:这是一个相当老的答案,似乎 JDK 在核心大小为 0 时改变了它的行为。从 JDK 1.6 开始,如果核心大小为 0 并且池没有任何线程,则 ThreadPoolExecutor 将添加一个线程来执行该任务。因此,核心大小为 0 是上述规则的一个例外。感谢史蒂夫使这引起我的注意。

回答by jitter

This is what you want (atleast I guess so). For an explanation check Jonathan Feinberg answer

这就是你想要的(至少我猜是这样)。有关解释,请查看Jonathan Feinberg 的回答

Executors.newFixedThreadPool(int n)

Executors.newFixedThreadPool(int n)

Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue. At any point, at most nThreads threads will be active processing tasks. If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available. If any thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks. The threads in the pool will exist until it is explicitly shutdown.

创建一个线程池,该池重用固定数量的线程在共享的无界队列中运行。在任何时候,最多 nThreads 个线程将是活动的处理任务。如果在所有线程都处于活动状态时提交了其他任务,它们将在队列中等待,直到有线程可用。如果任何线程在关闭前的执行过程中因失败而终止,则在需要执行后续任务时,将有一个新线程取代它。池中的线程将一直存在,直到它被明确关闭。

回答by Jonathan Feinberg

Per the Javadoc for ThreadPoolExecutor:

根据 ThreadPoolExecutor 的 Javadoc:

If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full. By setting corePoolSize and maximumPoolSize the same, you create a fixed-size thread pool.

如果有超过 corePoolSize 但少于 maximumPoolSize 的线程正在运行,则只有在队列已满时才会创建新线程。通过将 corePoolSize 和 maximumPoolSize 设置为相同,您可以创建一个固定大小的线程池。

(Emphasis mine.)

(强调我的。)

jitter's answer is what you want, although mine answers your other question. :)

jitter 的答案是你想要的,虽然我的回答了你的另一个问题。:)

回答by brianegge

In your first example, subsequent tasks are rejected because the AbortPolicyis the default RejectedExecutionHandler. The ThreadPoolExecutor contains the following policies, which you can change via the setRejectedExecutionHandlermethod:

在您的第一个示例中,后续任务被拒绝,因为AbortPolicy是默认值RejectedExecutionHandler。ThreadPoolExecutor 包含以下策略,您可以通过setRejectedExecutionHandler方法更改这些策略:

CallerRunsPolicy
AbortPolicy
DiscardPolicy
DiscardOldestPolicy

It sounds like you want cached thread pool with a CallerRunsPolicy.

听起来您希望使用 CallerRunsPolicy 缓存线程池。

回答by paul

None of the answers here fixed my problem, which had to do with creating a limited amount of HTTP connections using Apache's HTTP client (3.x version). Since it took me some hours to figure out a good setup, I'll share:

这里的答案都没有解决我的问题,这与使用 Apache 的 HTTP 客户端(3.x 版本)创建有限数量的 HTTP 连接有关。由于我花了几个小时才找到一个好的设置,我将分享:

private ExecutorService executor = new ThreadPoolExecutor(5, 10, 60L,
  TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
  Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());

This creates a ThreadPoolExecutorwhich starts with five and holds a maximum of ten simultaneously running threads using CallerRunsPolicyfor executing.

这将创建一个ThreadPoolExecutor以五个开头并最多容纳十个同时运行的CallerRunsPolicy用于执行的线程。

回答by Ashkrit

there is one more option. Instead of using new SynchronousQueue you can use any other queue also, but you have to make sure its size is 1, so that will force executorservice to create new thread.

还有一种选择。除了使用 new SynchronousQueue,您还可以使用任何其他队列,但您必须确保其大小为 1,以便强制 executorservice 创建新线程。

回答by Stuart

Doesn't look as though any of the answers actually answer the question - in fact I can't see a way of doing this - even if you subclass from PooledExecutorService since many of the methods/properties are private e.g. making addIfUnderMaximumPoolSize was protected you could do the following:

看起来好像没有任何答案实际上回答了这个问题 - 事实上我看不到这样做的方法 - 即使你从 PooledExecutorService 子类化,因为许多方法/属性都是私有的,例如让 addIfUnderMaximumPoolSize 受到保护,你可以请执行下列操作:

class MyThreadPoolService extends ThreadPoolService {
    public void execute(Runnable run) {
        if (poolSize() == 0) {
            if (addIfUnderMaximumPoolSize(run) != null)
                return;
        }
        super.execute(run);
    }
}

The closest I got was this - but even that isn't a very good solution

我得到的最接近的是这个 - 但即使这样也不是一个很好的解决方案

new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) {
    public void execute(Runnable command) {
        if (getPoolSize() == 0 && getActiveCount() < getMaximumPoolSize()) {        
            super.setCorePoolSize(super.getCorePoolSize() + 1);
        }
        super.execute(command);
    }

    protected void afterExecute(Runnable r, Throwable t) {
         // nothing in the queue
         if (getQueue().isEmpty() && getPoolSize() > min) {
             setCorePoolSize(getCorePoolSize() - 1);
         }
    };
 };

p.s. not tested the above

ps没有测试以上

回答by Stuart

Here is another solution. I think this solution behaves as you want it to (though not proud of this solution):

这是另一种解决方案。我认为此解决方案的行为符合您的要求(尽管对此解决方案并不感到自豪):

final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    public boolean offer(Runnable o) {
        if (size() > 1)
            return false;
        return super.offer(o);
    };

    public boolean add(Runnable o) {
        if (super.offer(o))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
};

RejectedExecutionHandler handler = new RejectedExecutionHandler() {         
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        queue.add(r);
    }
};

dbThreadExecutor =
        new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, queue, handler);

回答by Stuart

Unless I've missed something, the solution to the original question is simple. The following code implements the desired behavior as described by the original poster. It will spawn up to 5 threads to work on an unbounded queue and idle threads will terminate after 60 seconds.

除非我遗漏了什么,否则原始问题的解决方案很简单。以下代码实现了原始海报所描述的所需行为。它将产生多达 5 个线程来处理无界队列,空闲线程将在 60 秒后终止。

tp = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>());
tp.allowCoreThreadTimeOut(true);

回答by S.D.

Had same issue. Since no other answer puts all issues together, I'm adding mine:

有同样的问题。由于没有其他答案可以将所有问题放在一起,因此我添加了我的:

It is now clearly written in docs: If you use a queue that does not blocks (LinkedBlockingQueue) max threads setting has no effect, only core threads are used.

现在在文档中清楚地写了:如果您使用不阻塞的队列(LinkedBlockingQueue)最大线程数设置无效,则仅使用核心线程。

so:

所以:

public class MyExecutor extends ThreadPoolExecutor {

    public MyExecutor() {
        super(4, 4, 5,TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        allowCoreThreadTimeOut(true);
    }

    public void setThreads(int n){
        setMaximumPoolSize(Math.max(1, n));
        setCorePoolSize(Math.max(1, n));
    }

}

This executor has:

这个执行者有:

  1. No concept of max threads as we are using an unbounded queue. This is a good thing because such queue may cause executor to create massive number of non-core, extra threads if it follows its usual policy.

  2. A queue of max size Integer.MAX_VALUE. Submit()will throw RejectedExecutionExceptionif number of pending tasks exceeds Integer.MAX_VALUE. Not sure we will run out of memory first or this will happen.

  3. Has 4 core threads possible. Idle core threads automatically exit if idle for 5 seconds.So, yes, strictly on demand threads.Number can be varied using setThreads()method.

  4. Makes sure min number of core threads is never less than one, or else submit()will reject every task. Since core threads need to be >= max threads the method setThreads()sets max threads as well, though max thread setting is useless for an unbounded queue.

  1. 没有最大线程的概念,因为我们使用的是无界队列。这是一件好事,因为如果执行器遵循其通常的策略,则此类队列可能会导致执行器创建大量非核心的额外线程。

  2. 最大大小的队列Integer.MAX_VALUE。如果挂起的任务数量超过 ,Submit()则会抛出。不确定我们会先耗尽内存,否则会发生这种情况。RejectedExecutionExceptionInteger.MAX_VALUE

  3. 可能有 4 个核心线程。如果空闲 5 秒,空闲核心线程会自动退出。所以,是的,严格按需线程。可以使用setThreads()方法更改数量。

  4. 确保核心线程的最小数量永远不会小于 1,否则submit()将拒绝每个任务。由于核心线程需要 >= 最大线程数,该方法也setThreads()设置最大线程数,尽管最大线程数设置对于无界队列是无用的。