生产者 - Java 中的消费者多线程
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/18205407/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
producer - consumer multithreading in Java
提问by Saeed Mirzaee
I want to write program using multithreading wait and notify methods in Java.
This program has a stack (max-length = 5). Producer generate number forever and put it in the stack, and consumer pick it from stack.
When stack is full producer must wait and when stack is empty consumers must wait.
The problem is that it runs just once, I mean once it produce 5 number it stops but i put run methods in while(true) block to run nonstop able but it doesn't.
Here is what i tried so far.
Producer class:
我想使用 Java 中的多线程等待和通知方法编写程序。
该程序有一个堆栈(最大长度 = 5)。生产者永远生成数字并将其放入堆栈中,消费者从堆栈中选择它。
当堆栈已满时,生产者必须等待,当堆栈为空时,消费者必须等待。
问题是它只运行一次,我的意思是一旦它产生 5 个数字它就会停止,但我将 run 方法放在 while(true) 块中以不间断地运行,但它没有。
这是我到目前为止所尝试的。
生产者类:
package trail;
import java.util.Random;
import java.util.Stack;
public class Thread1 implements Runnable {
int result;
Random rand = new Random();
Stack<Integer> A = new Stack<>();
public Thread1(Stack<Integer> A) {
this.A = A;
}
public synchronized void produce()
{
while (A.size() >= 5) {
System.out.println("List is Full");
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
result = rand.nextInt(10);
System.out.println(result + " produced ");
A.push(result);
System.out.println(A);
this.notify();
}
@Override
public void run() {
System.out.println("Producer get started");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
while (true) {
produce();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
And the consumer:
而消费者:
package trail;
import java.util.Stack;
public class Thread2 implements Runnable {
Stack<Integer> A = new Stack<>();
public Thread2(Stack<Integer> A) {
this.A = A;
}
public synchronized void consume() {
while (A.isEmpty()) {
System.err.println("List is empty" + A + A.size());
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.err.println(A.pop() + " Consumed " + A);
this.notify();
}
@Override
public void run() {
System.out.println("New consumer get started");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
while (true) {
consume();
}
}
}
and here is the main method:
这是主要方法:
public static void main(String[] args) {
Stack<Integer> stack = new Stack<>();
Thread1 thread1 = new Thread1(stack);// p
Thread2 thread2 = new Thread2(stack);// c
Thread A = new Thread(thread1);
Thread B = new Thread(thread2);
Thread C = new Thread(thread2);
A.start();
B.start();
C.start();
}
回答by SeniorJD
Seems like you skipped something about wait()
, notify()
and synchronized
.
See this example, it should help you.
好像你跳过了一些关于wait()
,notify()
和 的内容synchronized
。看这个例子,它应该对你有帮助。
回答by saurav
You should synchronize on the stack instead of putting it at the method level try this code.
您应该在堆栈上同步而不是将其放在方法级别尝试此代码。
Also don't initalize the stack in your thread classes anyways you are passing them in the constructor from the main class, so no need of that.
也不要初始化线程类中的堆栈,无论如何您是在主类的构造函数中传递它们,所以不需要这样做。
Always try to avoid mark any method with synchronized keyword instead of that try to put critical section of code in the synchronized block because the more size of your synchronized area more it will impact on performance.
始终尽量避免使用 synchronized 关键字标记任何方法,而不是尝试将代码的关键部分放在 synchronized 块中,因为同步区域的大小越大,它对性能的影响就越大。
So, always put only that code into synchronized block that need thread safety.
因此,始终只将该代码放入需要线程安全的同步块中。
Producer Code :
生产者代码:
public void produce() {
synchronized (A) {
while (A.size() >= 5) {
System.out.println("List is Full");
try {
A.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
result = rand.nextInt(10);
System.out.println(result + " produced ");
A.push(result);
System.out.println("stack ---"+A);
A.notifyAll();
}
}
Consumer Code :
消费者代码:
public void consume() {
synchronized (A) {
while (A.isEmpty()) {
System.err.println("List is empty" + A + A.size());
try {
System.err.println("wait");
A.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.err.println(A.pop() + " Consumed " + A);
A.notifyAll();
}
}
回答by Bex
Your consumer and you producer are synchronized on differentobjects and do not block each other. If this works, I daresay it's accidental.
你的消费者和你的生产者在不同的对象上是同步的,不会互相阻塞。如果这有效,我敢说这是偶然的。
Read up on java.util.concurrent.BlockingQueue
and java.util.concurrent.ArrayBlockingQueue
. These provide you with more modern and easier way to implement this pattern.
继续阅读java.util.concurrent.BlockingQueue
和java.util.concurrent.ArrayBlockingQueue
。这些为您提供了更现代、更简单的方式来实现此模式。
http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html
http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html
回答by Alex P
I think it will be better for understanding and dealing with synchronisation in general if you try to separate three things which are currently mixed:
我认为,如果您尝试将当前混合的三件事分开,那么通常会更好地理解和处理同步:
Task which is going to do the actual job. Names for classes
Thread1
&Thread2
are misleading. They are not Thread objects, but they are actually jobs or tasks implementing Runnable interface you are giving toThread
objects.Thread object itself which you are creating in main
Shared object which encapsulates synchronised operations/logic on a queue, a stack etc. This object will be shared between tasks. And inside this shared object you will take care of add/remove operations (either with synchronized blocks or synchronized methods). Currently (as it was pointed out already), synchronization is done on a task itself (i.e. each task waits and notifies on its own lock and nothing happens). When you separate concerns, i.e. let one class do one thing properly it will eventually become clear where is the problem.
将要完成实际工作的任务。对于类名称
Thread1
和Thread2
具有误导性。它们不是 Thread 对象,但它们实际上是实现您提供给Thread
对象的Runnable 接口的作业或任务。您在 main 中创建的线程对象本身
将同步操作/逻辑封装在队列、堆栈等上的共享对象。该对象将在任务之间共享。在此共享对象中,您将负责添加/删除操作(使用同步块或同步方法)。目前(正如已经指出的那样),同步是在任务本身上完成的(即每个任务等待并通知自己的锁,什么也没有发生)。当您分离关注点时,即让一个班级正确地做一件事,最终会清楚问题出在哪里。
回答by Ankur Lathi
You can use Java's awesome java.util.concurrent
package and its classes.
您可以使用 Java 的超棒java.util.concurrent
包及其类。
You can easily implement the producer consumer problem using the
BlockingQueue
. ABlockingQueue
already supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.Without
BlockingQueue
, every time we put data to queue at the producer side, we need to check if queue is full, and if full, wait for some time, check again and continue. Similarly on the consumer side, we would have to check if queue is empty, and if empty, wait for some time, check again and continue. However withBlockingQueue
we don't have to write any extra logic than to just add data from Producer and poll data from Consumer.
您可以使用
BlockingQueue
. ABlockingQueue
已经支持在检索元素时等待队列变为非空的操作,以及在存储元素时等待队列中有可用空间的操作。没有
BlockingQueue
,每次我们在生产者端把数据放入队列时,我们需要检查队列是否已满,如果已满,则等待一段时间,再次检查并继续。同样在消费者端,我们必须检查队列是否为空,如果为空,则等待一段时间,再次检查并继续。然而,BlockingQueue
除了添加来自生产者的数据和来自消费者的轮询数据之外,我们不需要编写任何额外的逻辑。
Read more From:
阅读更多来自:
http://javawithswaranga.blogspot.in/2012/05/solving-producer-consumer-problem-in.html
http://javawithswaranga.blogspot.in/2012/05/solving-producer-consumer-problem-in.html
http://www.javajee.com/producer-consumer-problem-in-java-using-blockingqueue
http://www.javajee.com/producer-consumer-problem-in-java-using-blockingqueue
回答by Sabuj Das
Try this:
尝试这个:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class CircularArrayQueue<T> {
private volatile Lock rwLock = new ReentrantLock();
private volatile Condition emptyCond = rwLock.newCondition();
private volatile Condition fullCond = rwLock.newCondition();
private final int size;
private final Object[] buffer;
private volatile int front;
private volatile int rare;
/**
* @param size
*/
public CircularArrayQueue(int size) {
this.size = size;
this.buffer = new Object[size];
this.front = -1;
this.rare = -1;
}
public boolean isEmpty(){
return front == -1;
}
public boolean isFull(){
return (front == 0 && rare == size-1) || (front == rare + 1);
}
public void enqueue(T item){
try {
// get a write lock
rwLock.lock();
// if the Q is full, wait the write lock
if(isFull())
fullCond.await();
if(rare == -1){
rare = 0;
front = 0;
} else if(rare == size - 1){
rare = 0;
} else {
rare ++;
}
buffer[rare] = item;
//System.out.println("Added\t: " + item);
// notify the reader
emptyCond.signal();
} catch(InterruptedException e){
e.printStackTrace();
} finally {
// unlock the write lock
rwLock.unlock();
}
}
public T dequeue(){
T item = null;
try{
// get the read lock
rwLock.lock();
// if the Q is empty, wait the read lock
if(isEmpty())
emptyCond.await();
item = (T)buffer[front];
//System.out.println("Deleted\t: " + item);
if(front == rare){
front = rare = -1;
} else if(front == size - 1){
front = 0;
} else {
front ++;
}
// notify the writer
fullCond.signal();
} catch (InterruptedException e){
e.printStackTrace();
} finally{
// unlock read lock
rwLock.unlock();
}
return item;
}
}
回答by Shimon Doodkin
use BlockingQueue,LinkedBlockingQueue this was really simple. http://developer.android.com/reference/java/util/concurrent/BlockingQueue.html
使用 BlockingQueue,LinkedBlockingQueue 这真的很简单。 http://developer.android.com/reference/java/util/concurrent/BlockingQueue.html
回答by geekyBird
package javaapplication;
import java.util.Stack;
import java.util.logging.Level;
import java.util.logging.Logger;
public class ProducerConsumer {
public static Object lock = new Object();
public static Stack stack = new Stack();
public static void main(String[] args) {
Thread producer = new Thread(new Runnable() {
int i = 0;
@Override
public void run() {
do {
synchronized (lock) {
while (stack.size() >= 5) {
try {
lock.wait();
} catch (InterruptedException e) {
}
}
stack.push(++i);
if (stack.size() >= 5) {
System.out.println("Released lock by producer");
lock.notify();
}
}
} while (true);
}
});
Thread consumer = new Thread(new Runnable() {
@Override
public void run() {
do {
synchronized (lock) {
while (stack.empty()) {
try {
lock.wait();
} catch (InterruptedException ex) {
Logger.getLogger(ProdCons1.class.getName()).log(Level.SEVERE, null, ex);
}
}
while(!stack.isEmpty()){
System.out.println("stack : " + stack.pop());
}
lock.notifyAll();
}
} while (true);
}
});
producer.start();
consumer.start();
}
}
回答by Ravindra babu
Have a look at this code example:
看看这个代码示例:
import java.util.concurrent.*;
import java.util.Random;
public class ProducerConsumerMulti {
public static void main(String args[]){
BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();
Thread prodThread = new Thread(new Producer(sharedQueue,1));
Thread consThread1 = new Thread(new Consumer(sharedQueue,1));
Thread consThread2 = new Thread(new Consumer(sharedQueue,2));
prodThread.start();
consThread1.start();
consThread2.start();
}
}
class Producer implements Runnable {
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
private Random rng;
public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
this.threadNo = threadNo;
this.sharedQueue = sharedQueue;
this.rng = new Random();
}
@Override
public void run() {
while(true){
try {
int number = rng.nextInt(100);
System.out.println("Produced:" + number + ":by thread:"+ threadNo);
sharedQueue.put(number);
Thread.sleep(100);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
class Consumer implements Runnable{
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
this.sharedQueue = sharedQueue;
this.threadNo = threadNo;
}
@Override
public void run() {
while(true){
try {
int num = sharedQueue.take();
System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
Thread.sleep(100);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
Notes:
笔记:
- Started one
Producer
and twoConsumers
as per your problem statement Producer
will produce random numbers between 0 to 100 in infinite loopConsumer
will consume these numbers in infinite loop- Both
Producer
andConsumer
share lock free and Thread safe LinkedBlockingQueuewhich is Thread safe. You can remove wait() and notify() methods if you use these advanced concurrent constructs.
- 根据您的问题陈述开始一
Producer
和二Consumers
Producer
将在无限循环中产生 0 到 100 之间的随机数Consumer
将在无限循环中消耗这些数字- 双方
Producer
并Consumer
免费共享锁和线程安全的LinkedBlockingQueue是线程安全的。如果您使用这些高级并发构造,您可以删除 wait() 和 notify() 方法。