生产者-消费者问题Java程序
在这篇文章中,我们将看到使用线程解决生产者-消费者问题的Java程序。
目录
生产者消费者问题
生产者-消费者Java程序
生产者-消费者的Java程序,使用wait-notify
使用BlockingQueue的生产者-消费者Java程序
生产者消费者问题
生产者使用者是一个经典的并发问题,其中同步和线程间通信要求正确执行。
在生产者-消费者问题中,生产者和消费者有两个进程共享一个称为队列的公共有界缓冲区。
生产者进程生成数据并将其插入共享队列。
使用者进程使用共享队列中的数据。
这里的要求是,生产者不应尝试将数据添加到共享缓冲区(如果已满),而应等待队列中有空间容纳新元素。以同样的方式,使用者不应尝试使用空缓冲区中的数据,而应等待将数据插入队列中。
生产者-消费者Java程序
由于正确执行Producer-Consumer需要进行线程间通信,因此可以使用wait-notify方法编写该程序。
我们还可以使用Java并发包,其中添加了许多队列实现。使用ArrayBlockingQueue,我们可以轻松地用Java实现Producer-Consumer程序。
生产者-消费者的Java程序,使用wait-notify
在Java程序中,需要一个共享缓冲区,供生产者和使用者进程使用,以便可以使用LinkedList实例。
生产者和使用者还有两个Runnable任务,由两个单独的线程执行。将值添加到队列后,生产者应通知使用者任务唤醒,并应进入等待状态。
如果队列为空,则使用者任务应以相同的方式处于等待状态。
import java.util.LinkedList;
// Producer task
class Producer implements Runnable{
LinkedList<Integer> list;
Producer(LinkedList<Integer> list){
this.list = list;
}
@Override
public void run() {
for(int i = 1; i <= 5; i++){
synchronized(list) {
// If there is already an element in the list wait
while(list.size() >= 1){
System.out.println("Waiting as queue is full..");
try {
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("Adding to queue- " + Thread.currentThread().getName() + " " + i);
list.add(i);
list.notify();
}
}
}
}
//Consumer task
class Consumer implements Runnable{
LinkedList<Integer> list;
Consumer(LinkedList<Integer> list){
this.list = list;
}
@Override
public void run() {
for(int i = 1; i <= 5; i++){
synchronized(list) {
// if there is no element in the list wait
while(list.size() < 1){
System.out.println("Waiting as queue is empty..");
try {
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// if there is element in the list then retrieve it
System.out.println("Consuming from queue- " + Thread.currentThread().getName() + " " + list.remove());
list.notify();
}
}
}
}
public class ProducerConsumer {
public static void main(String[] args) {
// shared list
LinkedList<Integer> list = new LinkedList<Integer>();
Thread t1 = new Thread(new Producer(list), "Producer");
Thread t2 = new Thread(new Consumer(list), "Consumer");
t1.start();
t2.start();
}
}
输出:
Adding to queue- Producer 1 Waiting as queue is full.. Consuming from queue- Consumer 1 Waiting as queue is empty.. Adding to queue- Producer 2 Waiting as queue is full.. Consuming from queue- Consumer 2 Waiting as queue is empty.. Adding to queue- Producer 3 Waiting as queue is full.. Consuming from queue- Consumer 3 Waiting as queue is empty.. Adding to queue- Producer 4 Waiting as queue is full.. Consuming from queue- Consumer 4 Waiting as queue is empty.. Adding to queue- Producer 5 Consuming from queue- Consumer 5
使用BlockingQueue的面向生产者-消费者的Java程序
使用ArrayBlockingQueue之类的BlockingQueue实现,我们可以轻松地用Java实现Producer-Consumer程序。
BlockingQueue具有put()方法用于添加到队列中,如果队列容量已满,它将阻塞。同样,有一个take()方法可从队列的开头检索,如果没有可用的元素,它将阻塞。
在容量为1的代码ArrayBlockingQueue中创建,因此队列将只有一个元素并且插入将被阻塞,直到检索到该元素为止。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
// Producer task
class Producer implements Runnable{
BlockingQueue<Integer> queue;
Producer(BlockingQueue<Integer> queue){
this.queue = queue;
}
@Override
public void run() {
for(int i = 1; i <= 5; i++){
try {
queue.put(i);
System.out.println("Adding to queue- " + i);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
//Consumer task
class Consumer implements Runnable{
BlockingQueue<Integer> queue;
Consumer(BlockingQueue<Integer> queue){
this.queue = queue;
}
@Override
public void run() {
for(int i = 1; i <= 5; i++){
try {
// if there is element in the list then retrieve it
System.out.println("Consuming from queue- " + queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class ProducerConsumer {
public static void main(String[] args) {
BlockingQueue<Integer> bQueue = new ArrayBlockingQueue<Integer>(1);
Thread t1 = new Thread(new Producer(bQueue), "Producer");
Thread t2 = new Thread(new Consumer(bQueue), "Consumer");
t1.start();
t2.start();
}
}
输出:
Adding to queue- 1 Consuming from queue- 1 Adding to queue- 2 Consuming from queue- 2 Adding to queue- 3 Consuming from queue- 3 Adding to queue- 4 Consuming from queue- 4 Adding to queue- 5 Consuming from queue- 5
如我们所见,使用ArrayBlockingQueue无需编写用于同步线程的逻辑,也无需调用等待并明确通知,从而使编写生产者-消费者Java程序非常简单。使用Lambda表达式可以使其更紧凑。
public class ArrayBQ {
public static void main(String[] args) {
// BlockingQueue of capacity 1
BlockingQueue<Integer> bQueue = new ArrayBlockingQueue<Integer>(1);
// Producer
new Thread(()->{
for(int i = 0; i < 5; i++){
try {
bQueue.put(i);
System.out.println("Added to queue-" + i);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}).start();
// Consumer
new Thread(()->{
for(int i = 0; i < 5; i++){
try {
System.out.println("Consumer retrieved- " + bQueue.take());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}).start();
}
}

