java 多生产者多消费者多线程Java

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/25393938/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-11-02 07:56:26  来源:igfitidea点击:

Multiple Producer Multiple Consumer Multithreading Java

javamultithreadingproducer-consumerblockingqueue

提问by sgokhales

I'm trying out Multiple Producer - Multiple Consumer use case of Producer-Consumer problem. I'm using BlockingQueue for sharing common queue between multiple producers/consumers.

我正在尝试多生产者 - 生产者-消费者问题的多消费者用例。我正在使用 BlockingQueue 在多个生产者/消费者之间共享公共队列。

Below is my code.
Producer

下面是我的代码。
制作人

import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {

    private BlockingQueue inputQueue;
    private static volatile int i = 0;
    private volatile boolean isRunning = true;

    public Producer(BlockingQueue q){
        this.inputQueue=q;
    }

    public synchronized void run() {

        //produce messages
        for(i=0; i<10; i++) 
        {
            try {
                inputQueue.put(new Integer(i));

                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Produced "+i);
        }
        finish();
    }

    public void finish() {
        //you can also clear here if you wanted
        isRunning = false;
    }

}


Consumer

消费者

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {

    private BlockingQueue inputQueue;
    private volatile boolean isRunning = true;

    private final Integer POISON_PILL = new Integer(-1);

    Consumer(BlockingQueue queue) {
        this.inputQueue = queue;
    }

    public void run() {
        //worker loop keeps taking en element from the queue as long as the producer is still running or as 
        //long as the queue is not empty:
        while(!inputQueue.isEmpty()) {

            try {
                Integer queueElement = (Integer) inputQueue.take();
                System.out.println("Consumed : " + queueElement.toString());

            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        System.out.println("Queue ");
    }

    //this is used to signal from the main thread that he producer has finished adding stuff to the queue
    public void finish() {
        //you can also clear here if you wanted
        isRunning = false;
        inputQueue.add(POISON_PILL);
    }
}


Test Class

测试班

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;


public class ProducerConsumerService {

    public static void main(String[] args) {

        //Creating BlockingQueue of size 10
        BlockingQueue queue = new ArrayBlockingQueue(10);

        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);

        //starting producer to produce messages in queue
        new Thread(producer).start();

        //starting producer to produce messages in queue
        new Thread(producer).start();

        //starting consumer to consume messages from queue
        new Thread(consumer).start();

        //starting consumer to consume messages from queue
        new Thread(consumer).start();

        System.out.println("Producer and Consumer has been started");
    }

}

I don't see the correct output when I ran the below code.

当我运行以下代码时,我没有看到正确的输出。

Is there any mistake that I'm doing here ?

我在这里做的有什么错误吗?

回答by Peter Lawrey

There is quite a bit of your code which doesn't make sense. I suggest you sit down and work out why the code is there and what it is doing.

你的代码有很多没有意义。我建议你坐下来弄清楚为什么代码在那里以及它在做什么。

If you deleted the isFinshedflag, nothing would change.

如果你删除了这个isFinshed标志,什么都不会改变。

If you deleted the use of synchronizedin the producer you would have concurrent producers. There is no benefit in making a field which is only accessed in a synchronzied block volatile.

如果您删除了synchronized在生产者中的使用,您将拥有并发生产者。将仅在同步块中访问的字段设为 volatile 没有任何好处。

It makes no sense for producers to share a loop counter, if they are to be concurrent. Normally, a producer sends a poison pill, and a consumer doesn't consumer the pill. e.g. if you have two consumers, one might add the pill and the other might consume it. Your consumer ignores poison pills, as it ignores the isFinishedflag.

如果生产者要并发,那么共享一个循环计数器是没有意义的。通常,生产者发送毒丸,而消费者不消费该药丸。例如,如果您有两个消费者,一个可能会添加药丸,另一个可能会服用。您的消费者忽略毒丸,因为它忽略了isFinished标志。

You don't want to stop the consumer just because the queue is temporarily empty. Otherwise it will not see all the message the producer produces, possibly none of them.

您不想仅仅因为队列暂时为空就停止消费者。否则它不会看到生产者产生的所有消息,可能没有。

回答by Ravindra babu

Sample code with multiple producers & multiple consumers.

具有多个生产者和多个消费者的示例代码。

import java.util.concurrent.*;

public class ProducerConsumerDemo {

    public static void main(String args[]){

     BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();

     Thread prodThread1 = new Thread(new Producer(sharedQueue,1));
     Thread prodThread2 = new Thread(new Producer(sharedQueue,2));
     Thread consThread1 = new Thread(new Consumer(sharedQueue,1));
     Thread consThread2 = new Thread(new Consumer(sharedQueue,2));

     prodThread1.start();
     prodThread2.start();
     consThread1.start();
     consThread2.start();
    }

}

class Producer implements Runnable {

    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;

    public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.threadNo = threadNo;
        this.sharedQueue = sharedQueue;
    }

    @Override
    public void run() {
        for(int i=1; i<= 5; i++){
            try {
                int number = i+(10*threadNo);
                System.out.println("Produced:" + number + ":by thread:"+ threadNo);
                sharedQueue.put(number);
            } catch (Exception err) {
                err.printStackTrace();
            }
        }
    }

}

class Consumer implements Runnable{

    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.sharedQueue = sharedQueue;
        this.threadNo = threadNo;
    }

    @Override
    public void run() {
        while(true){
            try {
                int num = sharedQueue.take();
                System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
            } catch (Exception err) {
               err.printStackTrace();
            }
        }
    }   
}

Output:

输出:

Produced:11:by thread:1
Produced:21:by thread:2
Produced:22:by thread:2
Produced:23:by thread:2
Produced:24:by thread:2
Produced:25:by thread:2
Consumed: 11:by thread:1
Consumed: 22:by thread:1
Consumed: 23:by thread:1
Consumed: 24:by thread:1
Consumed: 25:by thread:1
Produced:12:by thread:1
Consumed: 21:by thread:2
Consumed: 12:by thread:1
Produced:13:by thread:1
Produced:14:by thread:1
Produced:15:by thread:1
Consumed: 13:by thread:2
Consumed: 14:by thread:1
Consumed: 15:by thread:2

This articleprovides simple example producer and consumer problem with BlockingQueue

本文提供了 BlockingQueue 的简单示例生产者和消费者问题

Changes to your code:

对代码的更改:

  1. Different producers will generate different output instead of same output. Producer Thread 1 generates numbers from 11-15 and Producer Thread 2 generates numbers from 21-25
  2. Any of Consumer thread can consume data from any of Producers. Unlike Producers, Consumers don't have constraints to consume the data.
  3. I have added Thread number in both Producer and Consumer.
  1. 不同的生产者将产生不同的输出而不是相同的输出。生产者线程 1 从 11-15 生成数字,生产者线程 2 从 21-25 生成数字
  2. 任何消费者线程都可以消费来自任何生产者的数据。与生产者不同,消费者没有消费数据的限制。
  3. 我在生产者和消费者中都添加了线程号。

You can find alternative solution with ExecutorServiceat :

您可以在以下位置找到替代解决方案ExecutorService

Producer/Consumer threads using a Queue

使用队列的生产者/消费者线程

回答by Holger

It's not too hard when just implementing it straight forward. The example code below does it. It simply uses local variables for everything that is not supposed to be shared.

直接实施它并不太难。下面的示例代码做到了。它只是将局部变量用于不应该共享的所有内容。

Besides the queue, only a thread safe counter maintaining the number of active producers is shared. The counter is used rather than a special “POISON_PILL” value as such a marker value does not work with a single queue and multiple consumers as allconsumers have to recognize the finishing of the producer but only when allproducers have finished.

除了队列之外,只有一个线程安全计数器保持活跃生产者的数量是共享的。使用计数器而不是特殊的“ POISON_PILL”值,因为这样的标记值不适用于单个队列和多个消费者,因为所有消费者都必须识别生产者的完成,但只有当所有生产者都完成时。

The counter is a simple ending condition. The only thing to care about is that after detecting that the counter reached zero, the queue has to be re-checked to avoid race conditions.

计数器是一个简单的结束条件。唯一需要关心的是,在检测到计数器为零后,必须重新检查队列以避免竞争条件。

As a side note, it doesn't make sense to use concurrency features provided by Java?5 and not using Generics for clean type safe code.

附带说明一下,使用 Java?5 提供的并发特性而不使用泛型来编写干净的类型安全代码是没有意义的。

final AtomicInteger activeProducers=new AtomicInteger();
final BlockingQueue<Integer> queue=new ArrayBlockingQueue<>(10);
Runnable producer=new Runnable() {
  public void run() {
    try {
      for(int i=0; i<10; i++) {
          Thread.sleep(TimeUnit.SECONDS.toMillis(1));
          queue.put(i);
          System.out.println("Produced "+i);
      }
    } catch(InterruptedException ex) {
      System.err.println("producer terminates early: "+ex);
    }
    finally { activeProducers.decrementAndGet(); }
  }
};
Runnable consumer=new Runnable() {
  public void run() {
    try {
      for(;;) {
        Integer queueElement = queue.poll(1, TimeUnit.SECONDS);
        if(queueElement!=null)
          System.out.println("Consumed : " + queueElement);
        else if(activeProducers.get()==0 && queue.peek()==null) return;
      }
    } catch(InterruptedException ex) {
      System.err.println("consumer terminates early: "+ex);
    }
  }
};
final int NUM_PRODUCERS = 2, NUM_CONSUMERS = 2;
for(int i=0; i<NUM_PRODUCERS; i++) {
  activeProducers.incrementAndGet();
  new Thread(producer).start();
}
for(int i=0; i<NUM_CONSUMERS; i++) {
  new Thread(consumer).start();
}