Java条件接口
时间:2020-01-09 10:35:11 来源:igfitidea点击:
驻留在java.util.concurrent.locks中的条件接口具有与对象类监视器方法类似的线程间通信方法(wait,notify和notifyAll)。条件提供了诸如await(),signal(),signalAll()之类的方法。如果Lock替换了同步方法和语句的使用,而Condition替换了Object监视器方法的使用。
下面给出了java.util.concurrent.locks.Condition接口中定义的一些方法。
- await()–使当前线程等待,直到被信号通知或者中断为止。
- await(长时间,TimeUnit单位)–使当前线程等待,直到发出信号或者被中断或者经过指定的等待时间为止。
- awaitNanos(long nanosTimeout)–使当前线程等待,直到被信号通知或者中断,或者经过了指定的等待时间。
- awaitUninterruptible()–使当前线程等待,直到发出信号为止。
- awaitUntil(日期截止时间)–使当前线程等待,直到发出信号或者被中断或者指定的截止时间过去为止。
- signal()–唤醒一个等待线程。
- signalAll()–唤醒所有等待的线程。
如何获得条件实例
Condition实例从本质上绑定到锁。要获取特定Lock实例的Condition实例,请使用其newCondition()方法。
使用Condition接口方法的示例
随后的生产者消费者程序使用Condition接口的方法在两个线程之间进行互通。
在示例中,消费者线程仅在缓冲区已满时才开始从缓冲区中删除项目,直到那时消费者线程由于await()方法而处于等待状态。
package com.theitroad.proj.Programs;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ProduceConsume {
public static void main(String[] args) {
int capacity = 5;
// shared object
Buffer buffer = new Buffer(capacity);
Thread t1 = new Thread(new Producer(buffer, capacity), "Producer");
Thread t2 = new Thread(new Consumer(buffer, capacity), "Consumer");
t1.start();
t2.start();
}
// Producer class to add elements to buffer
static class Producer implements Runnable{
Buffer buffer;
int capacity;
Producer(Buffer buffer, int capacity){
this.buffer = buffer;
this.capacity = capacity;
}
@Override
public void run() {
for(int i = 1; i <= capacity; i++){
try {
buffer.put(i);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
// Consumer class to remove elements from buffer
static class Consumer implements Runnable{
Buffer buffer;
int capacity;
Consumer(Buffer buffer, int capacity){
this.buffer = buffer;
this.capacity = capacity;
}
@Override
public void run() {
for(int i = 1; i <= capacity; i++){
try {
System.out.println("Item removed- " + buffer.take());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
static class Buffer {
private Object[] items;
final Lock lock = new ReentrantLock();
// Conditions
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
int putptr, takeptr, count;
public Buffer(int capacity){
items = new Object[capacity];
}
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[putptr] = x;
System.out.println("Putting- "+ x);
if (++putptr == items.length) {
putptr = 0;
}
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notEmpty.await();
}
Object item = items[takeptr];
if (++takeptr == items.length) {
takeptr = 0;
}
--count;
notFull.signal();
return item;
} finally {
lock.unlock();
}
}
}
}
输出:
Putting- 1 Putting- 2 Putting- 3 Putting- 4 Putting- 5 Item removed- 1 Item removed- 2 Item removed- 3 Item removed- 4 Item removed- 5

