java 多生产者多消费者多线程Java
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/25393938/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Multiple Producer Multiple Consumer Multithreading Java
提问by sgokhales
I'm trying out Multiple Producer - Multiple Consumer use case of Producer-Consumer problem. I'm using BlockingQueue for sharing common queue between multiple producers/consumers.
我正在尝试多生产者 - 生产者-消费者问题的多消费者用例。我正在使用 BlockingQueue 在多个生产者/消费者之间共享公共队列。
Below is my code.
Producer
下面是我的代码。
制作人
import java.util.concurrent.BlockingQueue;
public class Producer implements Runnable {
private BlockingQueue inputQueue;
private static volatile int i = 0;
private volatile boolean isRunning = true;
public Producer(BlockingQueue q){
this.inputQueue=q;
}
public synchronized void run() {
//produce messages
for(i=0; i<10; i++)
{
try {
inputQueue.put(new Integer(i));
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Produced "+i);
}
finish();
}
public void finish() {
//you can also clear here if you wanted
isRunning = false;
}
}
Consumer
消费者
import java.util.concurrent.BlockingQueue;
public class Consumer implements Runnable {
private BlockingQueue inputQueue;
private volatile boolean isRunning = true;
private final Integer POISON_PILL = new Integer(-1);
Consumer(BlockingQueue queue) {
this.inputQueue = queue;
}
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(!inputQueue.isEmpty()) {
try {
Integer queueElement = (Integer) inputQueue.take();
System.out.println("Consumed : " + queueElement.toString());
} catch (Exception e) {
e.printStackTrace();
}
}
System.out.println("Queue ");
}
//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void finish() {
//you can also clear here if you wanted
isRunning = false;
inputQueue.add(POISON_PILL);
}
}
Test Class
测试班
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ProducerConsumerService {
public static void main(String[] args) {
//Creating BlockingQueue of size 10
BlockingQueue queue = new ArrayBlockingQueue(10);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
//starting producer to produce messages in queue
new Thread(producer).start();
//starting producer to produce messages in queue
new Thread(producer).start();
//starting consumer to consume messages from queue
new Thread(consumer).start();
//starting consumer to consume messages from queue
new Thread(consumer).start();
System.out.println("Producer and Consumer has been started");
}
}
I don't see the correct output when I ran the below code.
当我运行以下代码时,我没有看到正确的输出。
Is there any mistake that I'm doing here ?
我在这里做的有什么错误吗?
回答by Peter Lawrey
There is quite a bit of your code which doesn't make sense. I suggest you sit down and work out why the code is there and what it is doing.
你的代码有很多没有意义。我建议你坐下来弄清楚为什么代码在那里以及它在做什么。
If you deleted the isFinshed
flag, nothing would change.
如果你删除了这个isFinshed
标志,什么都不会改变。
If you deleted the use of synchronized
in the producer you would have concurrent producers. There is no benefit in making a field which is only accessed in a synchronzied block volatile.
如果您删除了synchronized
在生产者中的使用,您将拥有并发生产者。将仅在同步块中访问的字段设为 volatile 没有任何好处。
It makes no sense for producers to share a loop counter, if they are to be concurrent.
Normally, a producer sends a poison pill, and a consumer doesn't consumer the pill. e.g. if you have two consumers, one might add the pill and the other might consume it. Your consumer ignores poison pills, as it ignores the isFinished
flag.
如果生产者要并发,那么共享一个循环计数器是没有意义的。通常,生产者发送毒丸,而消费者不消费该药丸。例如,如果您有两个消费者,一个可能会添加药丸,另一个可能会服用。您的消费者忽略毒丸,因为它忽略了isFinished
标志。
You don't want to stop the consumer just because the queue is temporarily empty. Otherwise it will not see all the message the producer produces, possibly none of them.
您不想仅仅因为队列暂时为空就停止消费者。否则它不会看到生产者产生的所有消息,可能没有。
回答by Ravindra babu
Sample code with multiple producers & multiple consumers.
具有多个生产者和多个消费者的示例代码。
import java.util.concurrent.*;
public class ProducerConsumerDemo {
public static void main(String args[]){
BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();
Thread prodThread1 = new Thread(new Producer(sharedQueue,1));
Thread prodThread2 = new Thread(new Producer(sharedQueue,2));
Thread consThread1 = new Thread(new Consumer(sharedQueue,1));
Thread consThread2 = new Thread(new Consumer(sharedQueue,2));
prodThread1.start();
prodThread2.start();
consThread1.start();
consThread2.start();
}
}
class Producer implements Runnable {
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
this.threadNo = threadNo;
this.sharedQueue = sharedQueue;
}
@Override
public void run() {
for(int i=1; i<= 5; i++){
try {
int number = i+(10*threadNo);
System.out.println("Produced:" + number + ":by thread:"+ threadNo);
sharedQueue.put(number);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
class Consumer implements Runnable{
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
this.sharedQueue = sharedQueue;
this.threadNo = threadNo;
}
@Override
public void run() {
while(true){
try {
int num = sharedQueue.take();
System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
Output:
输出:
Produced:11:by thread:1
Produced:21:by thread:2
Produced:22:by thread:2
Produced:23:by thread:2
Produced:24:by thread:2
Produced:25:by thread:2
Consumed: 11:by thread:1
Consumed: 22:by thread:1
Consumed: 23:by thread:1
Consumed: 24:by thread:1
Consumed: 25:by thread:1
Produced:12:by thread:1
Consumed: 21:by thread:2
Consumed: 12:by thread:1
Produced:13:by thread:1
Produced:14:by thread:1
Produced:15:by thread:1
Consumed: 13:by thread:2
Consumed: 14:by thread:1
Consumed: 15:by thread:2
This articleprovides simple example producer and consumer problem with BlockingQueue
本文提供了 BlockingQueue 的简单示例生产者和消费者问题
Changes to your code:
对代码的更改:
- Different producers will generate different output instead of same output. Producer Thread 1 generates numbers from 11-15 and Producer Thread 2 generates numbers from 21-25
- Any of Consumer thread can consume data from any of Producers. Unlike Producers, Consumers don't have constraints to consume the data.
- I have added Thread number in both Producer and Consumer.
- 不同的生产者将产生不同的输出而不是相同的输出。生产者线程 1 从 11-15 生成数字,生产者线程 2 从 21-25 生成数字
- 任何消费者线程都可以消费来自任何生产者的数据。与生产者不同,消费者没有消费数据的限制。
- 我在生产者和消费者中都添加了线程号。
You can find alternative solution with ExecutorService
at :
您可以在以下位置找到替代解决方案ExecutorService
:
回答by Holger
It's not too hard when just implementing it straight forward. The example code below does it. It simply uses local variables for everything that is not supposed to be shared.
直接实施它并不太难。下面的示例代码做到了。它只是将局部变量用于不应该共享的所有内容。
Besides the queue, only a thread safe counter maintaining the number of active producers is shared. The counter is used rather than a special “POISON_PILL
” value as such a marker value does not work with a single queue and multiple consumers as allconsumers have to recognize the finishing of the producer but only when allproducers have finished.
除了队列之外,只有一个线程安全计数器保持活跃生产者的数量是共享的。使用计数器而不是特殊的“ POISON_PILL
”值,因为这样的标记值不适用于单个队列和多个消费者,因为所有消费者都必须识别生产者的完成,但只有当所有生产者都完成时。
The counter is a simple ending condition. The only thing to care about is that after detecting that the counter reached zero, the queue has to be re-checked to avoid race conditions.
计数器是一个简单的结束条件。唯一需要关心的是,在检测到计数器为零后,必须重新检查队列以避免竞争条件。
As a side note, it doesn't make sense to use concurrency features provided by Java?5 and not using Generics for clean type safe code.
附带说明一下,使用 Java?5 提供的并发特性而不使用泛型来编写干净的类型安全代码是没有意义的。
final AtomicInteger activeProducers=new AtomicInteger();
final BlockingQueue<Integer> queue=new ArrayBlockingQueue<>(10);
Runnable producer=new Runnable() {
public void run() {
try {
for(int i=0; i<10; i++) {
Thread.sleep(TimeUnit.SECONDS.toMillis(1));
queue.put(i);
System.out.println("Produced "+i);
}
} catch(InterruptedException ex) {
System.err.println("producer terminates early: "+ex);
}
finally { activeProducers.decrementAndGet(); }
}
};
Runnable consumer=new Runnable() {
public void run() {
try {
for(;;) {
Integer queueElement = queue.poll(1, TimeUnit.SECONDS);
if(queueElement!=null)
System.out.println("Consumed : " + queueElement);
else if(activeProducers.get()==0 && queue.peek()==null) return;
}
} catch(InterruptedException ex) {
System.err.println("consumer terminates early: "+ex);
}
}
};
final int NUM_PRODUCERS = 2, NUM_CONSUMERS = 2;
for(int i=0; i<NUM_PRODUCERS; i++) {
activeProducers.incrementAndGet();
new Thread(producer).start();
}
for(int i=0; i<NUM_CONSUMERS; i++) {
new Thread(consumer).start();
}