Custom BlockingQueue implementation in Java
Date: 2020-02-23 14:34:47 Source: igfitidea
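In this tutorial, we will see how to create our own custom BlockingQueue.
This is a simple implementation of BlockingQueue.
- We will use an array to store the elements of the BlockingQueue internally. The size of this array defines the maximum number of elements that can reside in the BlockingQueue at one time.
- We will use Lock and Condition objects to build the custom BlockingQueue.
- When putting an element into the queue, if the queue is full, the producer waits until a slot becomes free.
- When taking an element from the queue, if the queue is empty, the consumer waits until an element becomes available.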
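The waiting described in the list can be sketched in isolation before looking at the full queue. The following is a minimal illustration, not part of the tutorial's code: the class name ReadyFlag and its methods are hypothetical, and only the Lock and Condition calls come from java.util.concurrent.locks. It shows one thread blocking on a Condition until another thread changes the state and signals.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration class; the name ReadyFlag is an assumption.
class ReadyFlag {
    private final Lock lock = new ReentrantLock();
    private final Condition readyCond = lock.newCondition();
    private boolean ready;

    // Blocks the caller until another thread calls markReady().
    void awaitReady() throws InterruptedException {
        lock.lock();
        try {
            // Re-check the state in a loop: await() can return spuriously.
            while (!ready) {
                readyCond.await(); // releases the lock while waiting
            }
        } finally {
            lock.unlock();
        }
    }

    // Changes the state and wakes up every waiting thread.
    void markReady() {
        lock.lock();
        try {
            ready = true;
            readyCond.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReadyFlag flag = new ReadyFlag();
        Thread waiter = new Thread(() -> {
            try {
                flag.awaitReady();
                System.out.println("Waiter resumed");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        flag.markReady();
        waiter.join();
    }
}
```

The queue uses the same shape twice: one Condition for "not full" that producers wait on, and one for "not empty" that consumers wait on.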
Create a class named CustomBlockingQueue.java
package org.arpit.theitroad;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class CustomBlockingQueue {

    private final Lock lock = new ReentrantLock();

    //Conditions: producers wait on produceCond, consumers wait on consumeCond
    private final Condition produceCond = lock.newCondition();
    private final Condition consumeCond = lock.newCondition();

    //Array to store elements for CustomBlockingQueue (capacity 6)
    private final Object[] array = new Object[6];
    private int putIndex, takeIndex;
    private int count;

    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == array.length) {
                //Queue is full, producers need to wait
                produceCond.await();
            }
            array[putIndex] = x;
            System.out.println("Producing - " + x);
            putIndex++;
            if (putIndex == array.length) {
                //Wrap around to the start of the array
                putIndex = 0;
            }
            //Increment the count for the array
            ++count;
            //Signal a waiting consumer that an element is available
            consumeCond.signal();
        } finally {
            lock.unlock();
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                //Queue is empty, consumers need to wait
                consumeCond.await();
            }
            Object x = array[takeIndex];
            //Clear the slot so the queue does not keep a stale reference
            array[takeIndex] = null;
            System.out.println("Consuming - " + x);
            takeIndex++;
            if (takeIndex == array.length) {
                //Wrap around to the start of the array
                takeIndex = 0;
            }
            //Reduce the count for the array
            --count;
            //Signal a waiting producer that a slot is free
            produceCond.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}
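Both put and take treat the array as a circular buffer: when putIndex or takeIndex reaches array.length, it is reset to 0. The sketch below isolates that wrap-around step so it can be followed on its own. The class RingIndexDemo and the helper next are hypothetical names introduced for illustration only; the capacity of 6 matches the array size used in CustomBlockingQueue.

```java
// Hypothetical helper class; not part of the tutorial's CustomBlockingQueue.
class RingIndexDemo {

    // Advance an index by one slot, wrapping to 0 at the end of the array.
    static int next(int index, int capacity) {
        return (index + 1 == capacity) ? 0 : index + 1;
    }

    public static void main(String[] args) {
        int capacity = 6; // same size as the array in CustomBlockingQueue
        int putIndex = 0;
        StringBuilder visited = new StringBuilder();
        // Simulate 8 consecutive writes and record the slot each one uses.
        for (int i = 0; i < 8; i++) {
            visited.append(putIndex).append(' ');
            putIndex = next(putIndex, capacity);
        }
        System.out.println(visited.toString().trim()); // prints: 0 1 2 3 4 5 0 1
    }
}
```

Because the indices wrap, slots freed by take are reused by later put calls, and the count field is what distinguishes a full queue from an empty one when the two indices are equal.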
Create another main class that uses the CustomBlockingQueue above.
package org.arpit.theitroad;

public class CustomBlockingQueueMain {

    public static void main(String[] args) {
        CustomBlockingQueue customBlockingQueue = new CustomBlockingQueue();
        //Creating producer and consumer threads
        Thread producer = new Thread(new Producer(customBlockingQueue));
        Thread consumer = new Thread(new Consumer(customBlockingQueue));
        producer.start();
        consumer.start();
    }
}

class Producer implements Runnable {

    private final CustomBlockingQueue customBlockingQueue;

    public Producer(CustomBlockingQueue customBlockingQueue) {
        this.customBlockingQueue = customBlockingQueue;
    }

    @Override
    public void run() {
        //Put the numbers 1 to 10 into the queue
        for (int i = 1; i <= 10; i++) {
            try {
                customBlockingQueue.put(i);
            } catch (InterruptedException e) {
                //Restore the interrupt flag and stop producing
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

class Consumer implements Runnable {

    private final CustomBlockingQueue customBlockingQueue;

    public Consumer(CustomBlockingQueue customBlockingQueue) {
        this.customBlockingQueue = customBlockingQueue;
    }

    @Override
    public void run() {
        //Take 10 elements from the queue
        for (int i = 1; i <= 10; i++) {
            try {
                customBlockingQueue.take();
            } catch (InterruptedException e) {
                //Restore the interrupt flag and stop consuming
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
We created two Runnable classes, one for the producer and one for the consumer, and used these Runnables to create two threads.
Producing - 1
Producing - 2
Producing - 3
Producing - 4
Producing - 5
Producing - 6
Consuming - 1
Consuming - 2
Consuming - 3
Consuming - 4
Consuming - 5
Consuming - 6
Producing - 7
Producing - 8
Producing - 9
Producing - 10
Consuming - 7
Consuming - 8
Consuming - 9
Consuming - 10
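The output may differ on your machine, because the interleaving of the two threads is not deterministic, but CustomBlockingQueue can hold at most 6 elements at any one time.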
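For comparison, the same capacity bound can be observed with the JDK's own java.util.concurrent.ArrayBlockingQueue. The sketch below is an illustration under assumptions, not part of the tutorial: the class CapacityBoundDemo and the method acceptedCount are hypothetical names, and it uses the non-blocking offer method instead of put so that the demo never waits.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class; it uses the JDK queue, not CustomBlockingQueue.
class CapacityBoundDemo {

    // Try to add `attempts` items without blocking and
    // return how many of them the queue accepted.
    static int acceptedCount(int capacity, int attempts) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 1; i <= attempts; i++) {
            // offer() returns false instead of waiting when the queue is full
            if (queue.offer(i)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // With capacity 6 and no consumer, only the first 6 of 10 offers succeed.
        System.out.println(acceptedCount(6, 10)); // prints: 6
    }
}
```

In CustomBlockingQueue the producer blocks at this point rather than being rejected, which is why "Producing - 7" can only appear after at least one "Consuming" line.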

