Java中的LinkedTransferQueue
时间:2020-01-09 10:35:11 来源:igfitidea点击:
Java中的LinkedTransferQueue是TransferQueue接口的实现,并且是java.util.concurrent包的一部分。它是在Java 7中添加的。
Java中的TransferQueue接口
扩展BlockingQueue接口的TransferQueue接口增加了生产者可以等待使用者接收元素的功能。
在诸如ArrayBlockingQueue,PriorityBlockingQueue之类的BlockingQueue实现中,有一些操作会在检索元素时等待队列为空,并在存储元素时等待队列中的空间可用。在TransferQueue中,也有一些操作在元素级别阻塞。
Java TransferQueue方法
除了从BlockingQueue继承的方法外,TransferQueue还添加了以下方法来添加功能,使线程等待直到元素被另一个线程消耗为止。
- transfer(E e)–将元素转移给使用者,并在必要时等待。
- tryTransfer(E e)–如果存在已经在等待接收它的使用者,则立即传输指定的元素,否则返回false
- tryTransfer(E e,long timeout,TimeUnit unit)–如果存在已经在等待接收它的使用者,则立即传输指定的元素。等待直到使用者接收到元素,如果在元素可以传输之前经过了指定的等待时间,则返回false。
TransferQueue还具有以下查询方法:
- hasWaitingConsumer()–如果至少有一个消费者正在等待接收元素,则返回true。
- getWaitingConsumerCount()–返回等待接收元素的使用者数量的估计值。
Java中的LinkedTransferQueue
LinkedTransferQueue是一个无界的TransferQueue,它将其元素存储为链接节点,其中每个节点都存储对下一个节点的引用。此队列中的元素以FIFO(先进先出)的方式排序。队列的开头是某个生产者在队列中停留时间最长的元素。队列的尾部是某个生产者最短时间进入队列的元素。
Java LinkedTransferQueue构造函数
- LinkedTransferQueue()–创建一个最初为空的LinkedTransferQueue。
- LinkedTransferQueue(Collection <?extends E> c)–创建一个LinkedTransferQueue,最初包含给定集合的元素,并以集合迭代器的遍历顺序添加。
LinkedTransferQueue Java示例
这是使用LinkedTransferQueue的Java生产者-消费者示例。在使用者线程中,有一个睡眠方法,该方法的时间为2秒,以使使用者线程暂停2秒,即使生产者线程等待该元素被使用者检索。
public class LinkedTQ {
public static void main(String[] args) {
TransferQueue<Integer> tQueue = new LinkedTransferQueue<>();
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.execute(new LinkedProducer(tQueue));
executor.execute(new LinkedConsumer(tQueue));
executor.shutdown();
}
}
//Producer
class LinkedProducer implements Runnable{
TransferQueue<Integer> tQueue;
LinkedProducer(TransferQueue<Integer> tQueue){
this.tQueue = tQueue;
}
@Override
public void run() {
for(int i = 0; i < 5; i++){
try {
System.out.println("Adding to queue-" + i);
tQueue.transfer(i);
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
//Consumer
class LinkedConsumer implements Runnable{
TransferQueue<Integer> tQueue;
LinkedConsumer(TransferQueue<Integer> tQueue){
this.tQueue = tQueue;
}
@Override
public void run() {
for(int i = 0; i < 5; i++){
try {
// Delay of 2 seconds
TimeUnit.SECONDS.sleep(2);
System.out.println("Consumer retrieved- " + tQueue.take());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
输出:
Adding to queue-0 Consumer retrieved- 0 Adding to queue-1 Consumer retrieved- 1 Adding to queue-2 Consumer retrieved- 2 Adding to queue-3 Consumer retrieved- 3 Adding to queue-4 Consumer retrieved- 4

