Original URL: http://stackoverflow.com/questions/7124833/
Warning: this content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me): StackOverflow
Producer-Consumer with ExecutorService.newFixedThreadPool - How many threads are created?
Asked by Oxford
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class MainClass {
        private static final int producerPoolSize = 10;
        private static final int consumerPoolSize = 20;
        private ExecutorService prodExec = Executors.newFixedThreadPool(producerPoolSize);
        private ExecutorService consExec = Executors.newFixedThreadPool(consumerPoolSize);

        // main method here, which calls start() below

        private void start(String[] args) {
            // Get list of ids, split them into n (producerPoolSize) chunks
            for (int index = 0; index < producerPoolSize; index++) {
                // chunkOfIdsForThisProducer is the chunk of ids for this producer (not shown)
                Runnable producer = new Producer(consExec, chunkOfIdsForThisProducer);
                prodExec.execute(producer);
            }
        }
    }

    public class Producer implements Runnable {
        private ExecutorService consExec;
        private List<Long> list;

        public Producer(ExecutorService exec, List<Long> list) {
            this.consExec = exec;
            this.list = list;
        }

        public void run() {
            // Each id becomes one task on the consumer pool
            for (Long id : list) {
                data = get data from db for the id  // pseudocode
                consExec.execute(new Consumer(data));
            }
        }
    }

    public class Consumer implements Runnable {
        public void run() {
            // call web service
        }
    }
In the above code, I have two thread pools, one each for Producers and Consumers. I get a number of IDs from the database and split them into equal chunks so that they can be handed out to Producer threads to process. A producer thread receives a list of IDs and processes each one sequentially, retrieving the data for each ID and submitting that data to a Consumer thread to process. Now my question is this:
I create 10 producer threads above, and I want the size of the Consumer thread pool to be 20. But while processing each ID, the Producer creates a new Runnable (Consumer) and submits it (via execute) to the Consumer executor service. My understanding of the ExecutorService is that the Runnable you submit to it gets wrapped in a Worker thread and then executed. So, in the above code, if each producer gets 50 IDs, am I actually creating 50*10=500 Consumer threads? Is that too many?
Or does the pool size actually mean the number of worker threads? So in the above code, am I creating 500 tasks on the Consumer executor, which would be queued and executed by 20 worker threads? I may not be explaining this correctly, but I am slightly confused about the internal implementation of the executor and worried that I am creating too many Consumer threads.
If this isn't the way to implement this, can someone suggest a better approach? Thanks.
Answered by dlev
The pool size is what determines the number of worker threads. If you try to submit an item while all the worker threads are busy, it will be queued by the ExecutorService and run once a worker becomes free.
The javadocs say this:
Creates a thread pool that reuses a fixed set of threads operating off a shared unbounded queue. If any thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks.
Note the phrases "fixed set of threads" and "unbounded queue". The number of threads is fixed, and the queue is unbounded, meaning items submitted while the threads are busy will always be queued rather than rejected.
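To see the fixed thread count in practice, here is a minimal sketch that submits 500 trivial tasks to a pool of 20 and counts how many distinct worker threads ran them. The class and method names (FixedPoolThreadCount, distinctWorkerThreads) are made up for this illustration and are not from the question.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original question.
final class FixedPoolThreadCount {

    // Submits taskCount trivial tasks to a fixed pool and returns how many
    // distinct worker threads actually ran them.
    static int distinctWorkerThreads(int poolSize, int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < taskCount; i++) {
            // Each task only records the name of the thread that runs it.
            pool.execute(() -> threadNames.add(Thread.currentThread().getName()));
        }
        pool.shutdown(); // already submitted tasks still run
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return threadNames.size();
    }

    public static void main(String[] args) {
        // 10 producers x 50 ids = 500 tasks, as in the question
        System.out.println("distinct worker threads: " + distinctWorkerThreads(20, 500));
    }
}
```

The printed count should never exceed 20, however many tasks are submitted. It can be lower than 20 if the tasks finish quickly, since the pool only starts threads as tasks arrive.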
Answered by Clint
Does the pool size actually mean the number of worker threads? Yes.
If the consumer Runnable takes a long time, only 20 will run concurrently. The rest will wait in a collection until a thread is available to run them.
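A rough way to observe that waiting collection is to hold every task on a latch and then inspect the pool. This sketch assumes the executor returned by newFixedThreadPool is a ThreadPoolExecutor, which is an implementation detail of current JDKs rather than something the ExecutorService interface promises. The class name QueuedTasksDemo is hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical demo class, not part of the original answer.
final class QueuedTasksDemo {

    // Returns {worker threads, queued tasks} observed while every worker is blocked.
    static int[] poolAndQueueSizes(int poolSize, int taskCount) {
        // Assumption: the returned executor is a ThreadPoolExecutor; the cast is
        // used here only so the pool can be inspected.
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(poolSize);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // simulate a slow consumer
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Every worker is stuck in its first task, so the rest must be queued.
            return new int[] { pool.getPoolSize(), pool.getQueue().size() };
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] sizes = poolAndQueueSizes(20, 500);
        System.out.println(sizes[0] + " worker threads, " + sizes[1] + " queued tasks");
    }
}
```

With a pool of 20 and 500 blocked tasks, this reports 20 worker threads and 480 queued tasks.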
As for whether there is a better way to do this: is there a reason you need to use threads? Unless you have 20 available processors, running this in parallel may not reduce your processing time, because the threads will spend time on context switches and the like, which does nothing to process the data.
Also, the producers fetch all of the data up front and store it in the Consumers. With 500 Consumers and only 20 able to run at once, the data for the other 480 (500 minus 20) sits in memory until a thread is free. You could have the consumers fetch their own data instead.
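If the amount of queued data is the worry, one possible alternative (not something proposed in this answer) is to give the consumer pool a bounded queue, so that a submitter has to run the task itself once the queue is full. The sketch below uses the standard ThreadPoolExecutor constructor with an ArrayBlockingQueue and CallerRunsPolicy. The class name BoundedConsumerPool and the capacity of 40 are assumptions chosen for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not part of the original answer.
final class BoundedConsumerPool {

    // Builds a pool with a fixed number of threads and a bounded queue.
    // When the queue is full, CallerRunsPolicy runs the task on the submitting
    // thread, which slows the submitter down instead of queuing more work.
    static ThreadPoolExecutor create(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // Submits taskCount trivial tasks and returns how many completed.
    static int runAll(int threads, int queueCapacity, int taskCount) {
        ThreadPoolExecutor pool = create(threads, queueCapacity);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(done::incrementAndGet);
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println(runAll(20, 40, 500) + " tasks completed");
    }
}
```

No task is dropped with this policy as long as the pool has not been shut down. The trade-off is that a producer thread may occasionally do consumer work.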
In response to comment:
Instead of:
    for (int index = 0; index < producerPoolSize; index++) {
        Runnable producer = new Producer(consExec, chunkOfIdsForThisProducer);
        prodExec.execute(producer);
    }
and, in the Producer:
    for (Long id : list) {
        data = get data from db for the id
        consExec.execute(new Consumer(data));
    }
Consumer looks like:
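    public class Consumer implements Runnable {
        // The id is all a queued Consumer holds; the data is fetched in run()
        private final long myId;

        Consumer(long id) {
            myId = id;
        }

        public void run() {
            data = get data from db for the id  // pseudocode
            // do whatever a consumer does with data
        }
    }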
and
    private void start(String[] args) {
        // Get the list of ids and create a new Consumer for each id
        for (int index = 0; index < everyID.length; index++) {
            consExec.execute(new Consumer(everyID[index]));
        }
    }
Then you lose a whole class, and the pool of 20 makes more sense, because Consumers that are blocked on IO fetching data will wait while the ones that are ready continue processing.
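Putting the pieces together, a self-contained version of this single-pool layout might look like the sketch below. The fetchData and callWebService methods are hypothetical stand-ins for the database read and the web service call, and the counter exists only so the example can report how many ids were handled.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical end-to-end sketch of the single-pool layout.
final class SinglePoolSketch {

    // Each Consumer fetches its own data, so only the id is held while queued.
    static final class Consumer implements Runnable {
        private final long id;
        private final AtomicInteger processed;

        Consumer(long id, AtomicInteger processed) {
            this.id = id;
            this.processed = processed;
        }

        @Override
        public void run() {
            String data = fetchData(id); // stand-in for the database read
            callWebService(data);        // stand-in for the web service call
            processed.incrementAndGet();
        }
    }

    // Hypothetical placeholder; a real version would query the database.
    static String fetchData(long id) {
        return "data-for-" + id;
    }

    // Hypothetical placeholder; a real version would call the web service.
    static void callWebService(String data) {
        // intentionally empty
    }

    // Runs one Consumer per id on a fixed pool and returns how many completed.
    static int processAll(List<Long> ids, int poolSize) {
        ExecutorService consExec = Executors.newFixedThreadPool(poolSize);
        AtomicInteger processed = new AtomicInteger();
        for (long id : ids) {
            consExec.execute(new Consumer(id, processed));
        }
        consExec.shutdown(); // queued Consumers still run
        try {
            if (!consExec.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed.get();
    }

    public static void main(String[] args) {
        List<Long> ids = Arrays.asList(1L, 2L, 3L, 4L, 5L);
        System.out.println(processAll(ids, 20) + " ids processed");
    }
}
```

Calling shutdown() followed by awaitTermination() lets the caller know when every queued Consumer has finished, which the snippets above leave out.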