Java中的ConcurrentLinkedQueue
时间:2020-01-09 10:35:12 来源:igfitidea点击:
Java中的ConcurrentLinkedQueue是线程安全的无界队列。它将其元素存储为链接节点,其中每个节点都存储对下一个节点的引用。 ConcurrentLinkedQueue类实现Queue接口,并且是java.util.concurrent包的一部分。
ConcurrentLinkedQueue与ArrayBlockingQueue之类的BlockingQueue实现有何不同,PriorityBlockingQueue是ConcurrentLinkedQueue是非阻塞的,因此该队列中的操作不会阻塞。由于ConcurrentLinkedQueue是非阻塞的,因此没有put()或者take()方法会在需要时阻塞。
该队列对元素FIFO(先进先出)进行排序。队列的开头是已在队列中最长时间的元素。队列的尾部是最短时间位于队列中的元素。新元素插入到队列的尾部,并且队列检索操作在队列的开头获取元素。
ConcurrentLinkedQueue不允许使用空元素
像大多数其他并发集合实现一样,此类也不允许使用null元素。
public class ConcurrentLQ {
public static void main(String[] args) {
Queue<Integer> conQueue = new ConcurrentLinkedQueue<>();
conQueue.add(5);
conQueue.add(null);
}
}
输出:
Exception in thread "main" java.lang.NullPointerException at java.base/java.util.Objects.requireNonNull(Objects.java:221) at java.base/java.util.concurrent.ConcurrentLinkedQueue.offer(ConcurrentLinkedQueue.java:355) at java.base/java.util.concurrent.ConcurrentLinkedQueue.add(ConcurrentLinkedQueue.java:283) at com.theitroad.programs.ConcurrentLQ.main(ConcurrentLQ.java:11)
如我们所见,尝试将null添加到队列会导致NullPointerException。
Java ConcurrentLinkedQueue构造函数
- ConcurrentLinkedQueue()–创建一个最初为空的ConcurrentLinkedQueue。
- ConcurrentLinkedQueue(Collection <?extends E> c)-创建一个ConcurrentLinkedQueue,最初包含给定集合的元素,并以集合迭代器的遍历顺序添加。
ConcurrentLinkedQueue Java示例
这是使用ConcurrentLinkedQueue的Java生产者-消费者示例。有一个生产者线程和两个消费者线程。
public class ConcurrentLQ {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(4);
Queue<Integer> conQueue = new ConcurrentLinkedQueue<>();
// One Producer thread
executor.execute(new ConProducer(conQueue));
// Two Consumer thread
executor.execute(new ConConsumer(conQueue));
executor.execute(new ConConsumer(conQueue));
executor.shutdown();
}
}
//Producer
class ConProducer implements Runnable{
Queue<Integer> conQueue;
ConProducer(Queue<Integer> conQueue){
this.conQueue = conQueue;
}
@Override
public void run() {
for(int i = 0; i < 6; i++){
System.out.println("Adding to queue-" + i);
conQueue.add(i);
}
}
}
//Consumer
class ConConsumer implements Runnable{
Queue<Integer> conQueue;
ConConsumer(Queue<Integer> conQueue){
this.conQueue = conQueue;
}
@Override
public void run() {
for(int i = 0; i < 3; i++){
try {
TimeUnit.MILLISECONDS.sleep(50);
System.out.println("Thread Name -" + Thread.currentThread().getName() + " Consumer retrieved- " + conQueue.poll());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
输出:
Adding to queue-0 Adding to queue-1 Adding to queue-2 Adding to queue-3 Adding to queue-4 Adding to queue-5 Thread Name -pool-1-thread-2 Consumer retrieved- 0 Thread Name -pool-1-thread-3 Consumer retrieved- 1 Thread Name -pool-1-thread-3 Consumer retrieved- 3 Thread Name -pool-1-thread-2 Consumer retrieved- 2 Thread Name -pool-1-thread-3 Consumer retrieved- 4 Thread Name -pool-1-thread-2 Consumer retrieved- 5

