Java 使用队列的生产者/消费者线程
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/2332537/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Producer/Consumer threads using a Queue
提问by Gareth
I'd like to create some sort of Producer/Consumer
threading app. But I'm not sure what the best way to implement a queue between the two.
我想创建某种Producer/Consumer
线程应用程序。但我不确定在两者之间实现队列的最佳方法是什么。
So I've some up with two ideas (both of which could be entirely wrong). I would like to know which would be better and if they both suck then what would be the best way to implement the queue. It's mainly my implementation of the queue in these examples that I'm concerned about. I'm extending a Queue class that is an in house class and is thread safe. Below are two examples with 4 classes each.
所以我有两个想法(这两个想法都可能完全错误)。我想知道哪个更好,如果它们都糟糕,那么实现队列的最佳方法是什么。我关心的主要是我在这些示例中对队列的实现。我正在扩展一个 Queue 类,它是一个内部类并且是线程安全的。下面是两个例子,每个例子有 4 个类。
Main class-
主课——
public class SomeApp
{
private Consumer consumer;
private Producer producer;
public static void main (String args[])
{
consumer = new Consumer();
producer = new Producer();
}
}
Consumer class-
消费类-
public class Consumer implements Runnable
{
public Consumer()
{
Thread consumer = new Thread(this);
consumer.start();
}
public void run()
{
while(true)
{
//get an object off the queue
Object object = QueueHandler.dequeue();
//do some stuff with the object
}
}
}
Producer class-
生产者类-
public class Producer implements Runnable
{
public Producer()
{
Thread producer = new Thread(this);
producer.start();
}
public void run()
{
while(true)
{
//add to the queue some sort of unique object
QueueHandler.enqueue(new Object());
}
}
}
Queue class-
队列类-
public class QueueHandler
{
//This Queue class is a thread safe (written in house) class
public static Queue<Object> readQ = new Queue<Object>(100);
public static void enqueue(Object object)
{
//do some stuff
readQ.add(object);
}
public static Object dequeue()
{
//do some stuff
return readQ.get();
}
}
OR
或者
Main class-
主课——
public class SomeApp
{
Queue<Object> readQ;
private Consumer consumer;
private Producer producer;
public static void main (String args[])
{
readQ = new Queue<Object>(100);
consumer = new Consumer(readQ);
producer = new Producer(readQ);
}
}
Consumer class-
消费类-
public class Consumer implements Runnable
{
Queue<Object> queue;
public Consumer(Queue<Object> readQ)
{
queue = readQ;
Thread consumer = new Thread(this);
consumer.start();
}
public void run()
{
while(true)
{
//get an object off the queue
Object object = queue.dequeue();
//do some stuff with the object
}
}
}
Producer class-
生产者类-
public class Producer implements Runnable
{
Queue<Object> queue;
public Producer(Queue<Object> readQ)
{
queue = readQ;
Thread producer = new Thread(this);
producer.start();
}
public void run()
{
while(true)
{
//add to the queue some sort of unique object
queue.enqueue(new Object());
}
}
}
Queue class-
队列类-
//the extended Queue class is a thread safe (written in house) class
public class QueueHandler extends Queue<Object>
{
public QueueHandler(int size)
{
super(size); //All I'm thinking about now is McDonalds.
}
public void enqueue(Object object)
{
//do some stuff
readQ.add();
}
public Object dequeue()
{
//do some stuff
return readQ.get();
}
}
And go!
去!
采纳答案by cletus
Java 5+ has all the tools you need for this kind of thing. You will want to:
Java 5+ 拥有完成此类事情所需的所有工具。你会想要:
- Put all your Producers in one
ExecutorService
; - Put all your Consumers in another
ExecutorService
; - If necessary, communicate between the two using a
BlockingQueue
.
- 将所有生产者合二为一
ExecutorService
; - 把你所有的消费者放在另一个
ExecutorService
; - 如有必要,请使用
BlockingQueue
.
I say "if necessary" for (3) because from my experience it's an unnecessary step. All you do is submit new tasks to the consumer executor service. So:
我对 (3) 说“如有必要”,因为根据我的经验,这是一个不必要的步骤。您所做的就是向消费者执行程序服务提交新任务。所以:
final ExecutorService producers = Executors.newFixedThreadPool(100);
final ExecutorService consumers = Executors.newFixedThreadPool(100);
while (/* has more work */) {
producers.submit(...);
}
producers.shutdown();
producers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
consumers.shutdown();
consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
So the producers
submit directly to consumers
.
所以producers
直接提交给consumers
.
回答by flybywire
You are reinventing the wheel.
你正在重新发明轮子。
If you need persistence and other enterprise features use JMS(I'd suggest ActiveMq).
如果您需要持久性和其他企业功能,请使用JMS(我建议使用ActiveMq)。
If you need fast in-memory queues use one of the impementations of java's Queue.
如果您需要快速的内存队列,请使用 java 的Queue 的实现之一。
If you need to support java 1.4 or earlier, use Doug Lea's excellent concurrentpackage.
如果需要支持 java 1.4 或更早版本,请使用 Doug Lea 的优秀并发包。
回答by Enno Shioji
OK, as others note, the best thing to do is to use java.util.concurrent
package. I highly recommend "Java Concurrency in Practice". It's a great book that covers almost everything you need to know.
好的,正如其他人所指出的,最好的做法是使用java.util.concurrent
package.json 。我强烈推荐“Java 并发实践”。这是一本很棒的书,几乎涵盖了您需要了解的所有内容。
As for your particular implementation, as I noted in the comments, don't start Threads from Constructors -- it can be unsafe.
至于您的特定实现,正如我在评论中指出的,不要从构造函数启动线程——它可能是不安全的。
Leaving that aside, the second implementation seem better. You don't want to put queues in static fields. You are probably just loosing flexibility for nothing.
撇开这一点不谈,第二个实现似乎更好。您不想将队列放在静态字段中。您可能只是无缘无故地失去了灵活性。
If you want to go ahead with your own implementation (for learning purpose I guess?), supply a start()
method at least. You should construct the object (you can instantiate the Thread
object), and then call start()
to start the thread.
如果您想继续自己的实现(我猜是出于学习目的?),start()
至少提供一种方法。您应该构造对象(您可以实例化该Thread
对象),然后调用start()
以启动线程。
Edit: ExecutorService
have their own queue so this can be confusing.. Here's something to get you started.
编辑:ExecutorService
有自己的队列,所以这可能会令人困惑.. 这里有一些东西可以让你开始。
public class Main {
public static void main(String[] args) {
//The numbers are just silly tune parameters. Refer to the API.
//The important thing is, we are passing a bounded queue.
ExecutorService consumer = new ThreadPoolExecutor(1,4,30,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(100));
//No need to bound the queue for this executor.
//Use utility method instead of the complicated Constructor.
ExecutorService producer = Executors.newSingleThreadExecutor();
Runnable produce = new Produce(consumer);
producer.submit(produce);
}
}
class Produce implements Runnable {
private final ExecutorService consumer;
public Produce(ExecutorService consumer) {
this.consumer = consumer;
}
@Override
public void run() {
Pancake cake = Pan.cook();
Runnable consume = new Consume(cake);
consumer.submit(consume);
}
}
class Consume implements Runnable {
private final Pancake cake;
public Consume(Pancake cake){
this.cake = cake;
}
@Override
public void run() {
cake.eat();
}
}
Further EDIT:
For producer, instead of while(true)
, you can do something like:
进一步编辑:对于生产者,while(true)
您可以执行以下操作,而不是:
@Override
public void run(){
while(!Thread.currentThread().isInterrupted()){
//do stuff
}
}
This way you can shutdown the executor by calling .shutdownNow()
. If you'd use while(true)
, it won't shutdown.
这样你就可以通过调用关闭执行程序.shutdownNow()
。如果您使用while(true)
,它不会关闭。
Also note that the Producer
is still vulnerable to RuntimeExceptions
(i.e. one RuntimeException
will halt the processing)
另请注意,Producer
仍然容易受到RuntimeExceptions
(即RuntimeException
会停止处理)
回答by Kasthuri
- Java code "BlockingQueue" which has synchronized put and get method.
- Java code "Producer" , producer thread to produce data.
- Java code "Consumer" , consumer thread to consume the data produced.
- Java code "ProducerConsumer_Main", main function to start the producer and consumer thread.
- Java 代码“BlockingQueue”已经同步了 put 和 get 方法。
- Java代码“Producer”,生产者线程来生产数据。
- Java代码“Consumer”,消费者线程消费产生的数据。
- Java代码“ProducerConsumer_Main”,启动生产者和消费者线程的main函数。
BlockingQueue.java
阻塞队列.java
public class BlockingQueue
{
int item;
boolean available = false;
public synchronized void put(int value)
{
while (available == true)
{
try
{
wait();
} catch (InterruptedException e) {
}
}
item = value;
available = true;
notifyAll();
}
public synchronized int get()
{
while(available == false)
{
try
{
wait();
}
catch(InterruptedException e){
}
}
available = false;
notifyAll();
return item;
}
}
Consumer.java
消费者.java
package com.sukanya.producer_Consumer;
public class Consumer extends Thread
{
blockingQueue queue;
private int number;
Consumer(BlockingQueue queue,int number)
{
this.queue = queue;
this.number = number;
}
public void run()
{
int value = 0;
for (int i = 0; i < 10; i++)
{
value = queue.get();
System.out.println("Consumer #" + this.number+ " got: " + value);
}
}
}
ProducerConsumer_Main.java
ProducerConsumer_Main.java
package com.sukanya.producer_Consumer;
public class ProducerConsumer_Main
{
public static void main(String args[])
{
BlockingQueue queue = new BlockingQueue();
Producer producer1 = new Producer(queue,1);
Consumer consumer1 = new Consumer(queue,1);
producer1.start();
consumer1.start();
}
}
回答by Ravindra babu
I have extended cletus proposed answer to working code example.
我已经扩展了 cletus 对工作代码示例的建议答案。
- One
ExecutorService
(pes) acceptsProducer
tasks. - One
ExecutorService
(ces) acceptsConsumer
tasks. - Both
Producer
andConsumer
sharesBlockingQueue
. - Multiple
Producer
tasks generates different numbers. - Any of
Consumer
tasks can consume number generated byProducer
- 一
ExecutorService
(pes)接受Producer
任务。 - 一
ExecutorService
(ces) 接受Consumer
任务。 - 无论
Producer
和Consumer
股票BlockingQueue
。 - 多个
Producer
任务生成不同的数字。 - 任何
Consumer
任务都可以消耗由Producer
Code:
代码:
import java.util.concurrent.*;
public class ProducerConsumerWithES {
public static void main(String args[]){
BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();
ExecutorService pes = Executors.newFixedThreadPool(2);
ExecutorService ces = Executors.newFixedThreadPool(2);
pes.submit(new Producer(sharedQueue,1));
pes.submit(new Producer(sharedQueue,2));
ces.submit(new Consumer(sharedQueue,1));
ces.submit(new Consumer(sharedQueue,2));
// shutdown should happen somewhere along with awaitTermination
/ * https://stackoverflow.com/questions/36644043/how-to-properly-shutdown-java-executorservice/36644320#36644320 */
pes.shutdown();
ces.shutdown();
}
}
class Producer implements Runnable {
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
this.threadNo = threadNo;
this.sharedQueue = sharedQueue;
}
@Override
public void run() {
for(int i=1; i<= 5; i++){
try {
int number = i+(10*threadNo);
System.out.println("Produced:" + number + ":by thread:"+ threadNo);
sharedQueue.put(number);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
class Consumer implements Runnable{
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
this.sharedQueue = sharedQueue;
this.threadNo = threadNo;
}
@Override
public void run() {
while(true){
try {
int num = sharedQueue.take();
System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
output:
输出:
Produced:11:by thread:1
Produced:21:by thread:2
Produced:22:by thread:2
Consumed: 11:by thread:1
Produced:12:by thread:1
Consumed: 22:by thread:1
Consumed: 21:by thread:2
Produced:23:by thread:2
Consumed: 12:by thread:1
Produced:13:by thread:1
Consumed: 23:by thread:2
Produced:24:by thread:2
Consumed: 13:by thread:1
Produced:14:by thread:1
Consumed: 24:by thread:2
Produced:25:by thread:2
Consumed: 14:by thread:1
Produced:15:by thread:1
Consumed: 25:by thread:2
Consumed: 15:by thread:1
Note. If you don't need multiple Producers and Consumers, keep single Producer and Consumer. I have added multiple Producers and Consumers to showcase capabilities of BlockingQueue among multiple Producers and Consumers.
笔记。如果不需要多个生产者和消费者,请保留单个生产者和消费者。我添加了多个生产者和消费者,以在多个生产者和消费者之间展示 BlockingQueue 的功能。
回答by roottraveller
This is a very simple code.
这是一个非常简单的代码。
import java.util.*;
// @author : rootTraveller, June 2017
class ProducerConsumer {
public static void main(String[] args) throws Exception {
Queue<Integer> queue = new LinkedList<>();
Integer buffer = new Integer(10); //Important buffer or queue size, change as per need.
Producer producerThread = new Producer(queue, buffer, "PRODUCER");
Consumer consumerThread = new Consumer(queue, buffer, "CONSUMER");
producerThread.start();
consumerThread.start();
}
}
class Producer extends Thread {
private Queue<Integer> queue;
private int queueSize ;
public Producer (Queue<Integer> queueIn, int queueSizeIn, String ThreadName){
super(ThreadName);
this.queue = queueIn;
this.queueSize = queueSizeIn;
}
public void run() {
while(true){
synchronized (queue) {
while(queue.size() == queueSize){
System.out.println(Thread.currentThread().getName() + " FULL : waiting...\n");
try{
queue.wait(); //Important
} catch (Exception ex) {
ex.printStackTrace();
}
}
//queue empty then produce one, add and notify
int randomInt = new Random().nextInt();
System.out.println(Thread.currentThread().getName() + " producing... : " + randomInt);
queue.add(randomInt);
queue.notifyAll(); //Important
} //synchronized ends here : NOTE
}
}
}
class Consumer extends Thread {
private Queue<Integer> queue;
private int queueSize;
public Consumer(Queue<Integer> queueIn, int queueSizeIn, String ThreadName){
super (ThreadName);
this.queue = queueIn;
this.queueSize = queueSizeIn;
}
public void run() {
while(true){
synchronized (queue) {
while(queue.isEmpty()){
System.out.println(Thread.currentThread().getName() + " Empty : waiting...\n");
try {
queue.wait(); //Important
} catch (Exception ex) {
ex.printStackTrace();
}
}
//queue not empty then consume one and notify
System.out.println(Thread.currentThread().getName() + " consuming... : " + queue.remove());
queue.notifyAll();
} //synchronized ends here : NOTE
}
}
}