Java: How to get the ThreadPoolExecutor to increase threads to max before queueing?
Disclaimer: This page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must follow the same CC BY-SA license, cite the original URL, and attribute it to the original authors (not me): StackOverflow
Original question: http://stackoverflow.com/questions/19528304/
How to get the ThreadPoolExecutor to increase threads to max before queueing?
Asked by Gray
I've been frustrated for some time with the default behavior of ThreadPoolExecutor which backs the ExecutorService thread-pools that so many of us use. To quote from the Javadocs:
If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full.
What this means is that if you define a thread pool with the following code, it will never start the 2nd thread because the LinkedBlockingQueue is unbounded.
ExecutorService threadPool =
    new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/,
        TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue */));
Only if you have a bounded queue and the queue is full are any threads above the core number started. I suspect a large number of junior Java multithreaded programmers are unaware of this behavior of the ThreadPoolExecutor.
Now I have a specific use case where this is not optimal. I'm looking for ways, without writing my own TPE class, to work around it.
My requirements are for a web service that is making call-backs to a possibly unreliable 3rd party.
- I don't want to make the call-back synchronously with the web-request, so I want to use a thread-pool.
- I typically get a couple of these a minute so I don't want to have a newFixedThreadPool(...) with a large number of threads that are mostly dormant.
- Every so often I get a burst of this traffic and I want to scale up the number of threads to some max value (let's say 50).
- I need to make a best attempt to do all callbacks so I want to queue up any additional ones above 50. I don't want to overwhelm the rest of my web-server by using a newCachedThreadPool().
How can I work around this limitation in ThreadPoolExecutor where the queue needs to be bounded and full before more threads will be started? How can I get it to start more threads before queuing tasks?
Edit:
@Flavio makes a good point about using ThreadPoolExecutor.allowCoreThreadTimeOut(true) to have the core threads timeout and exit. I considered that but I still wanted the core-threads feature. I did not want the number of threads in the pool to drop below the core-size if possible.
Accepted answer by Gray
How can I work around this limitation in ThreadPoolExecutor where the queue needs to be bounded and full before more threads will be started.
I believe I have finally found a somewhat elegant (maybe a little hacky) solution to this limitation with ThreadPoolExecutor. It involves extending LinkedBlockingQueue to have it return false for queue.offer(...) when there are already some tasks queued. If the current threads are not keeping up with the queued tasks, the TPE will add additional threads. If the pool is already at max threads, then the RejectedExecutionHandler will be called. It is the handler which then does the put(...) into the queue.
It certainly is strange to write a queue where offer(...) can return false and put() never blocks, so that's the hack part. But this works well with TPE's usage of the queue so I don't see any problem with doing this.
Here's the code:
// extend LinkedBlockingQueue to force offer() to return false conditionally
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    private static final long serialVersionUID = -6903933921423432194L;

    @Override
    public boolean offer(Runnable e) {
        // Offer it to the queue if there is 0 items already queued, else
        // return false so the TPE will add another thread. If we return false
        // and max threads have been reached then the RejectedExecutionHandler
        // will be called which will do the put into the queue.
        if (size() == 0) {
            return super.offer(e);
        } else {
            return false;
        }
    }
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/,
        60 /*secs*/, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // This does the actual put into the queue. Once the max threads
            // have been reached, the tasks will then queue up.
            executor.getQueue().put(r);
            // we do this after the put() to stop race conditions
            if (executor.isShutdown()) {
                throw new RejectedExecutionException(
                        "Task " + r + " rejected from " + executor);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
    }
});
With this mechanism, when I submit tasks to the queue, the ThreadPoolExecutor will do the following (a usage sketch follows the list):
- Scale the number of threads up to the core size initially (here 1).
- Offer it to the queue. If the queue is empty it will be queued to be handled by the existing threads.
- If the queue has 1 or more elements already, the offer(...) will return false.
- If false is returned, scale up the number of threads in the pool until they reach the max number (here 50).
- If at the max then it calls the RejectedExecutionHandler.
- The RejectedExecutionHandler then puts the task into the queue to be processed by the first available thread in FIFO order.
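For illustration, a hypothetical usage sketch against the pool defined above (the task count and sleep time are arbitrary; the exact pool size observed depends on timing):

// Submit a burst of slow tasks; with the offer() trick above, the pool should
// grow well past its core size of 1 instead of leaving everything queued.
for (int i = 0; i < 10; i++) {
    threadPool.execute(new Runnable() {
        public void run() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    });
}
// Expected to be close to 10 here, rather than the 1 a plain unbounded queue would give.
System.out.println("pool size after burst: " + threadPool.getPoolSize());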
Although in my example code above the queue is unbounded, you could also define it as a bounded queue. For example, if you add a capacity of 1000 to the LinkedBlockingQueue (see the sketch after this list) then it will:
- scale the threads up to max
- then queue up until it is full with 1000 tasks
- then block the caller until space becomes available to the queue.
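Here is a minimal sketch of that bounded variant, assuming the same anonymous-subclass trick as above (1000 is just the capacity used in the example):

// Same offer() override as before, but the backing LinkedBlockingQueue is bounded at 1000,
// so the put() in the rejection handler will eventually block the caller once the queue fills.
BlockingQueue<Runnable> boundedQueue = new LinkedBlockingQueue<Runnable>(1000) {
    private static final long serialVersionUID = 1L; // illustrative value

    @Override
    public boolean offer(Runnable e) {
        // Only take the task directly if nothing is queued; otherwise return false
        // so the TPE grows the pool (or hands the task to the rejection handler).
        if (size() == 0) {
            return super.offer(e);
        }
        return false;
    }
};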
Also, if you needed to use offer(...) in the RejectedExecutionHandler then you could use the offer(E, long, TimeUnit) method instead with Long.MAX_VALUE as the timeout.
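A hedged sketch of that handler variant (it behaves much like the put() version above, because the timed offer is not the offer(E) we overrode):

threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // offer(E, long, TimeUnit) is not overridden by our queue subclass, so it
            // queues normally; Long.MAX_VALUE seconds makes it effectively block forever.
            executor.getQueue().offer(r, Long.MAX_VALUE, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
});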
Warning:
If you expect tasks to be added to the executor after it has been shutdown, then you may want to be smarter about throwing RejectedExecutionException out of our custom RejectedExecutionHandler when the executor-service has been shutdown. Thanks to @RaduToader for pointing this out.
Edit:
Another tweak to this answer could be to ask the TPE if there are idle threads and only enqueue the item if so. You would have to make a true class for this and add an ourQueue.setThreadPoolExecutor(tpe) method on it.
Then your offer(...) method might look something like:
- Check to see if tpe.getPoolSize() == tpe.getMaximumPoolSize(), in which case just call super.offer(...).
- Else if tpe.getPoolSize() > tpe.getActiveCount() then call super.offer(...) since there seem to be idle threads.
- Otherwise return false to fork another thread.
Maybe this:
int poolSize = tpe.getPoolSize();
int maximumPoolSize = tpe.getMaximumPoolSize();
if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) {
    return super.offer(e);
} else {
    return false;
}
Note that the get methods on TPE are expensive since they access volatile fields or (in the case of getActiveCount()) lock the TPE and walk the thread-list. Also, there are race conditions here that may cause a task to be enqueued improperly or another thread forked when there was an idle thread.
Answered by Flavio
Set core size and max size to the same value, and allow core threads to be removed from the pool with allowCoreThreadTimeOut(true).
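A minimal sketch of that configuration, reusing the 50-thread ceiling and 60-second timeout from the question:

// Core == max, so up to 50 threads are created before anything sits in the queue;
// allowCoreThreadTimeOut(true) lets all of them die off after 60 idle seconds.
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(50, 50,
        60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
threadPool.allowCoreThreadTimeOut(true);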
Answered by bstempi
The best solution that I can think of is to extend.
ThreadPoolExecutor offers a few hook methods: beforeExecute and afterExecute. In your extension you could use a bounded queue to feed in tasks and a second unbounded queue to handle overflow. When someone calls submit, you could attempt to place the request into the bounded queue. If you're met with an exception, you just stick the task in your overflow queue. You could then utilize the afterExecute hook to see if there is anything in the overflow queue after finishing a task. This way, the executor will take care of the stuff in its bounded queue first, and automatically pull from this unbounded queue as time permits.
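A hedged sketch of that idea; the class and field names are illustrative, not from the answer, and the re-submission in afterExecute is deliberately simplistic:

import java.util.concurrent.*;

public class OverflowingThreadPoolExecutor extends ThreadPoolExecutor {

    // Unbounded overflow holding area for tasks the bounded work queue rejected.
    private final ConcurrentLinkedQueue<Runnable> overflow = new ConcurrentLinkedQueue<Runnable>();

    public OverflowingThreadPoolExecutor(int core, int max, long keepAlive, TimeUnit unit, int queueCapacity) {
        // AbortPolicy (the default) makes execute() throw when the bounded queue is full
        // and the pool is at max threads, which is the signal caught below.
        super(core, max, keepAlive, unit, new ArrayBlockingQueue<Runnable>(queueCapacity));
    }

    @Override
    public void execute(Runnable command) {
        try {
            super.execute(command); // try the bounded queue / spawn threads as usual
        } catch (RejectedExecutionException e) {
            overflow.add(command);  // pool saturated: park the task instead of failing
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        Runnable parked = overflow.poll(); // a slot just freed up, feed a parked task back in
        if (parked != null) {
            execute(parked);
        }
    }
}

Note this simple version has a window where a task parked after the last running task has already finished will only get picked up on the next submission, so a real implementation would likely add a periodic drain as well.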
It seems like more work than your solution, but at least it doesn't involve giving queues unexpected behaviors. I also imagine that there's a better way to check the status of the queue and threads rather than relying on exceptions, which are fairly slow to throw.
Answered by Ralf H
We have a subclass of ThreadPoolExecutor that takes an additional creationThreshold and overrides execute.
public void execute(Runnable command) {
    super.execute(command);
    final int poolSize = getPoolSize();
    if (poolSize < getMaximumPoolSize()) {
        if (getQueue().size() > creationThreshold) {
            synchronized (this) {
                setCorePoolSize(poolSize + 1);
                setCorePoolSize(poolSize);
            }
        }
    }
}
maybe that helps too, but yours looks more artsy of course…
Answered by Robert Tupelo-Schneck
Note: I now prefer and recommend my other answer.
Here's a version which feels to me much more straightforward: Increase the corePoolSize (up to the limit of maximumPoolSize) whenever a new task is executed, then decrease the corePoolSize (down to the limit of the user specified "core pool size") whenever a task completes.
To put it another way, keep track of the number of running or enqueued tasks, and ensure that the corePoolSize is equal to the number of tasks as long as it is between the user specified "core pool size" and the maximumPoolSize.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class GrowBeforeQueueThreadPoolExecutor extends ThreadPoolExecutor {
    private int userSpecifiedCorePoolSize;
    private int taskCount;

    public GrowBeforeQueueThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        userSpecifiedCorePoolSize = corePoolSize;
    }

    @Override
    public void execute(Runnable runnable) {
        synchronized (this) {
            taskCount++;
            setCorePoolSizeToTaskCountWithinBounds();
        }
        super.execute(runnable);
    }

    @Override
    protected void afterExecute(Runnable runnable, Throwable throwable) {
        super.afterExecute(runnable, throwable);
        synchronized (this) {
            taskCount--;
            setCorePoolSizeToTaskCountWithinBounds();
        }
    }

    private void setCorePoolSizeToTaskCountWithinBounds() {
        int threads = taskCount;
        if (threads < userSpecifiedCorePoolSize) threads = userSpecifiedCorePoolSize;
        if (threads > getMaximumPoolSize()) threads = getMaximumPoolSize();
        setCorePoolSize(threads);
    }
}
As written the class doesn't support changing the user-specified corePoolSize or maximumPoolSize after construction, and doesn't support manipulating the work queue directly or via remove() or purge().
Answered by Robert Tupelo-Schneck
Note: I now prefer and recommend my other answer.
I have another proposal, following the original idea of changing the queue to return false. In this one all tasks can enter the queue, but whenever a task is enqueued after execute(), we follow it with a sentinel no-op task which the queue rejects, causing a new thread to spawn, which will execute the no-op immediately followed by something from the queue.
Because worker threads may be polling the LinkedBlockingQueue for a new task, it's possible for a task to get enqueued even when there's an available thread. To avoid spawning new threads even when there are threads available, we need to keep track of how many threads are waiting for new tasks on the queue, and only spawn a new thread when there are more tasks on the queue than waiting threads.
final Runnable SENTINEL_NO_OP = new Runnable() { public void run() { } };

final AtomicInteger waitingThreads = new AtomicInteger(0);

BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    @Override
    public boolean offer(Runnable e) {
        // offer returning false will cause the executor to spawn a new thread
        if (e == SENTINEL_NO_OP) return size() <= waitingThreads.get();
        else return super.offer(e);
    }

    @Override
    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
        try {
            waitingThreads.incrementAndGet();
            return super.poll(timeout, unit);
        } finally {
            waitingThreads.decrementAndGet();
        }
    }

    @Override
    public Runnable take() throws InterruptedException {
        try {
            waitingThreads.incrementAndGet();
            return super.take();
        } finally {
            waitingThreads.decrementAndGet();
        }
    }
};

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue) {
    @Override
    public void execute(Runnable command) {
        super.execute(command);
        if (getQueue().size() > waitingThreads.get()) super.execute(SENTINEL_NO_OP);
    }
};
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (r == SENTINEL_NO_OP) return;
        else throw new RejectedExecutionException();
    }
});
Answered by Robert Tupelo-Schneck
I've already got two other answers on this question, but I suspect this one is the best.
It's based on the technique of the currently accepted answer, namely:
- Override the queue's offer() method to (sometimes) return false,
- which causes the ThreadPoolExecutor to either spawn a new thread or reject the task, and
- set the RejectedExecutionHandler to actually queue the task on rejection.
The problem is when offer() should return false. The currently accepted answer returns false when the queue has a couple of tasks on it, but as I've pointed out in my comment there, this causes undesirable effects. Alternately, if you always return false, you'll keep spawning new threads even when you have threads waiting on the queue.
The solution is to use the Java 7 LinkedTransferQueue and have offer() call tryTransfer(). When there is a waiting consumer thread the task will just get passed to that thread. Otherwise, offer() will return false and the ThreadPoolExecutor will spawn a new thread.
BlockingQueue<Runnable> queue = new LinkedTransferQueue<Runnable>() {
    @Override
    public boolean offer(Runnable e) {
        return tryTransfer(e);
    }
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            executor.getQueue().put(r);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
});
Answered by user2179737
The recommended answer resolves only one (1) of the issues with the JDK thread pool:
1. JDK thread pools are biased towards queuing. So instead of spawning a new thread, they will queue the task. Only if the queue reaches its limit will the thread pool spawn a new thread.
2. Thread retirement does not happen when load lightens. For example if we have a burst of jobs hitting the pool that causes the pool to go to max, followed by light load of max 2 tasks at a time, the pool will use all threads to service the light load preventing thread retirement. (only 2 threads would be needed…)
Unhappy with the behavior above, I went ahead and implemented a pool to overcome the deficiencies above.
To resolve 2): using LIFO scheduling resolves the issue. This idea was presented by Ben Maurer at the ACM Applicative 2015 conference: Systems @ Facebook scale
So a new implementation was born:
So far this implementation improves async execution performance for ZEL.
The implementation is spin-capable, which reduces context-switch overhead and yields superior performance for certain use cases.
Hope it helps...
PS: The JDK Fork Join Pool implements ExecutorService and works as a "normal" thread pool. The implementation is performant and uses LIFO thread scheduling; however, there is no control over the internal queue size or the retirement timeout..., and most importantly, tasks cannot be interrupted when canceling them.
Answered by Radu Toader
Note: For the JDK ThreadPoolExecutor, when you have a bounded queue, you only create new threads when offer returns false. You might obtain something useful with CallerRunsPolicy, which creates a bit of back-pressure and directly calls run in the caller thread.
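For reference, a minimal sketch of that built-in option (the pool sizes and queue capacity are just the figures used elsewhere in this question):

// With a bounded queue plus CallerRunsPolicy, a saturated pool makes the submitting
// thread run the task itself, which naturally back-pressures producers.
ThreadPoolExecutor callerRunsPool = new ThreadPoolExecutor(1, 50,
        60, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(1000),
        new ThreadPoolExecutor.CallerRunsPolicy());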
I need tasks to be executed from threads created by the pool and have an unbounded queue for scheduling, while the number of threads within the pool may grow or shrink between corePoolSize and maximumPoolSize, so...
I ended up doing a full copy-paste from ThreadPoolExecutor and changing the execute method a bit, because unfortunately this could not be done by extension (it calls private methods).
I didn't want to spawn new threads immediately when a new request arrives and all threads are busy (because I generally have short-lived tasks). I've added a threshold, but feel free to change it to your needs (maybe for mostly-IO workloads it is better to remove this threshold).
private final AtomicInteger activeWorkers = new AtomicInteger(0);
private volatile double threshold = 0.7d;

protected void beforeExecute(Thread t, Runnable r) {
    activeWorkers.incrementAndGet();
}

protected void afterExecute(Runnable r, Throwable t) {
    activeWorkers.decrementAndGet();
}

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && this.workQueue.offer(command)) {
        int recheck = this.ctl.get();
        if (!isRunning(recheck) && this.remove(command)) {
            this.reject(command);
        } else if (workerCountOf(recheck) == 0) {
            this.addWorker((Runnable) null, false);
        }
        //>>change start
        else if (workerCountOf(recheck) < maximumPoolSize //
                && (activeWorkers.get() > workerCountOf(recheck) * threshold
                        || workQueue.size() > workerCountOf(recheck) * threshold)) {
            this.addWorker((Runnable) null, false);
        }
        //<<change end
    } else if (!this.addWorker(command, false)) {
        this.reject(command);
    }
}
}