java ThreadPoolExecutor 和队列
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/11820120/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
ThreadPoolExecutor and the queue
提问by Cratylus
I thought that using ThreadPoolExecutorwe can submit Runnable
s to be executed either in the BlockingQueue
passed in the constructor or using the execute
method.
Also my understanding was that if a task is available it will be executed.
What I don't understand is the following:
我认为使用ThreadPoolExecutor我们可以在构造函数中传递或使用方法提交Runnable
要执行的 s 。
另外我的理解是,如果任务可用,它将被执行。
我不明白的是以下内容: BlockingQueue
execute
public class MyThreadPoolExecutor {
private static ThreadPoolExecutor executor;
public MyThreadPoolExecutor(int min, int max, int idleTime, BlockingQueue<Runnable> queue){
executor = new ThreadPoolExecutor(min, max, 10, TimeUnit.MINUTES, queue);
//executor.prestartAllCoreThreads();
}
public static void main(String[] main){
BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
final String[] names = {"A","B","C","D","E","F"};
for(int i = 0; i < names.length; i++){
final int j = i;
q.add(new Runnable() {
@Override
public void run() {
System.out.println("Hi "+ names[j]);
}
});
}
new MyThreadPoolExecutor(10, 20, 1, q);
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
/*executor.execute(new Runnable() {
@Override
public void run() {
System.out.println("++++++++++++++");
}
}); */
for(int i = 0; i < 100; i++){
final int j = i;
q.add(new Runnable() {
@Override
public void run() {
System.out.println("Hi "+ j);
}
});
}
}
}
This code does not do absolutely anything unless I either uncomment the executor.prestartAllCoreThreads();
in the constructor OR I call execute
of the runnable that prints System.out.println("++++++++++++++");
(it is also commented out).
除非我executor.prestartAllCoreThreads();
在构造函数中取消注释或调用execute
打印的可运行System.out.println("++++++++++++++");
(它也被注释掉),否则此代码绝对不会做任何事情。
Why?
Quote (my emphasis):
为什么?
引用(我的重点):
By default, even core threads are initially created and started only when new tasks arrive, but this can be overridden dynamically using method prestartCoreThread() or prestartAllCoreThreads(). You probably want to prestart threads if you construct the pool with a non-empty queue.
默认情况下,即使是核心线程也只有在新任务到达时才最初创建和启动,但这可以使用方法 prestartCoreThread() 或 prestartAllCoreThreads() 动态覆盖。如果您使用非空队列构造池,您可能想要预启动线程。
Ok. So my queue is not empty. But I create the executor
, I do sleep
and then I add new Runnable
s to the queue (in the loop to 100).
Doesn't this loop count as new tasks arrive
?
Why doesn't it work and I have to either prestart
or explicitely call execute
?
行。所以我的队列不是空的。但是我创建了executor
,sleep
然后我将新的Runnable
s添加到队列中(在循环中添加到 100)。
这个循环算不算new tasks arrive
?
为什么它不起作用,我必须要么prestart
或明确调用execute
?
回答by oldrinb
Worker threads are spawned as tasks arrive by execute, and these are the ones that interact with the underlying work queue. You need to prestart the workers if you begin with a non-empty work queue. See the implementation in OpenJDK 7.
当任务通过执行到达时产生工作线程,这些是与底层工作队列交互的线程。如果您从非空工作队列开始,则需要预先启动工作人员。请参阅OpenJDK 7 中的实现。
I repeat, the workers are the ones that interact with the work queue. They are only spawned on demand when passed via execute
. (or the layers above it, e.g. invokeAll
, submit
, etc.) If they are not started, it will not matter how much work you add to the queue, since there is nothing checking it as there are no workers started.
我再说一遍,工作人员是与工作队列交互的人。它们仅在通过execute
. (或它上面的层,例如invokeAll
,submit
等)如果它们没有启动,那么添加到队列中的工作量就无关紧要,因为没有任何检查它,因为没有工作人员已启动。
ThreadPoolExecutor
does not spawn worker threads until necessary or if you pre-empt their creation by the methods prestartAllCoreThreadsand prestartCoreThread. If there are no workers started, then there is no way any of the work in your queue is going to be done.
ThreadPoolExecutor
除非必要,否则不会产生工作线程,或者如果您通过方法prestartAllCoreThreads和prestartCoreThread 抢占它们的创建。如果没有工作人员启动,那么队列中的任何工作都无法完成。
The reason adding an initial execute
works is that it forces the creation of a solecore worker thread, which then can begin processing the work from your queue. You could also call prestartCoreThread
and receive similar behavior. If you want to start allthe workers, you must call prestartAllCoreThreads
or submit that number of tasks via execute
.
添加初始execute
作品的原因是它强制创建一个唯一的核心工作线程,然后它可以开始处理队列中的工作。您也可以调用prestartCoreThread
并接收类似的行为。如果要启动所有工作程序,则必须通过调用prestartAllCoreThreads
或提交该数量的任务execute
。
See the code for execute
below.
请参阅execute
下面的代码。
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
回答by Blake Beaupain
A BlockingQueue is not a magic thread dispatcher. If you submit Runnable objects to the queue and there are no running threads to consume those tasks, they of course will not be executed. The execute method on the other hand will automatically dispatch threads according to the thread pool configuration if it needs to. If you pre-start all of the core threads, there will be threads there to consume tasks from the queue.
BlockingQueue 不是一个神奇的线程调度器。如果您将 Runnable 对象提交到队列并且没有正在运行的线程来使用这些任务,它们当然不会被执行。另一方面,如果需要,execute 方法将根据线程池配置自动调度线程。如果你预先启动所有的核心线程,那里就会有线程来消费队列中的任务。