推荐:Java并发编程汇总
Java并发编程一Condition初使用
Condition是什么?
Condition
是在
Java1.5
中才出现的,它用来替代传统
Object
中的
wait()
、
notify()
,实现线程间的协作,相比使用
Object
中的
wait()
、
notify()
,使用
Condition
的
await()
、
signal()
这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用
Condition
,阻塞队列实际上是使用了
Condition
来模拟线程间协作。
Condition
是个接口,基本的方法就是
await()
和
signal()
方法。
Condition
依赖于
Lock
接口,生成一个
Condition
的基本代码是
lock.newCondition()
(假如
lock
为
ReentrantLock
的实例,
ReentrantLock
是
Lock
的实现类)。
调用
Condition
的
await()
和
signal()
方法,都必须在
lock
保护之内,就是说必须在
lock.lock()
和
lock.unlock()
之间才可以使用,这和在
synchronized
同步代码块或者同步方法中使用
Object
中的
wait()
、
notify()
类似。
-
中的Conditon
对应await()
中的wait()。Object
-
中的Condition
对应signal()
中的Object
。notify()
-
中的Condition
对应signalAll()
中的Object
。notifyAll()
我们来使用一下它吧。
代码:
package flowcontrol.condition;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionDemo1 {
private ReentrantLock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
void method1() throws InterruptedException {
lock.lock();
try{
System.out.println("条件不满足,开始await");
condition.await();
System.out.println("条件满足了,开始执行后续的任务");
}finally {
lock.unlock();
}
}
void method2() {
lock.lock();
try{
System.out.println("准备工作完成,唤醒其他的线程");
condition.signal();
}finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
ConditionDemo1 conditionDemo1 = new ConditionDemo1();
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
conditionDemo1.method2();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
conditionDemo1.method1();
}
}
输出:
条件不满足,开始await
准备工作完成,唤醒其他的线程
条件满足了,开始执行后续的任务
是不是很像
synchronized
同步代码块或者同步方法中调用的
wait()
和
notify()
。
来看看
Condition
的源码。
package java.util.concurrent.locks;
import java.util.concurrent.TimeUnit;
import java.util.Date;
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}
-
:造成当前线程在接到信号或被中断之前一直处于等待状态。await()
-
:造成当前线程在接到信号之前一直处于等待状态(该方法不响应中断)。awaitUninterruptibly()
-
:造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。返回值表示剩余时间,如果在awaitNanos(long nanosTimeout)
之前唤醒,那么返回值为nanosTimesout
- 消耗时间,如果返回值nanosTimeout
,则可以认定它已经超时了。<= 0
-
:造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。await(long time, TimeUnit unit)
-
:造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。如果没有到指定时间就被通知,则返回awaitUntil(Date deadline)
,否则表示到了指定时间,返回true
。false
-
:唤醒一个等待线程。该线程从等待方法返回前必须获得与signal()
相关的锁。Condition
-
:唤醒所有等待线程。能够从等待方法返回的线程必须获得与signalAll()
相关的锁。Condition
说了
Condition
中的方法,就可以将
Condition
中的方法和
Object
中的方法进行对比了,如下图所示:
最后我们用
Lock
、
Condition
和队列来实现一个生产者-消费者模式。
代码:
package flowcontrol.condition;
import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionDemo2 {
private int queueSize = 10;
private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);
private Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
public static void main(String[] args) {
ConditionDemo2 conditionDemo2 = new ConditionDemo2();
Producer producer = conditionDemo2.new Producer();
Consumer consumer = conditionDemo2.new Consumer();
producer.start();
consumer.start();
}
class Consumer extends Thread {
@Override
public void run() {
consume();
}
private void consume() {
while (true) {
lock.lock();
try {
while (queue.size() == 0) {
System.out.println("队列空,等待数据");
try {
notEmpty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.poll();
notFull.signalAll();
System.out.println("从队列里取走了一个数据,队列剩余" + queue.size() + "个元素");
} finally {
lock.unlock();
}
}
}
}
class Producer extends Thread {
@Override
public void run() {
produce();
}
private void produce() {
while (true) {
lock.lock();
try {
while (queue.size() == queueSize) {
System.out.println("队列满,等待有空余");
try {
notFull.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.offer(1);
notEmpty.signal();
System.out.println("向队列插入了一个元素,队列剩余空间" + (queueSize - queue.size()));
} finally {
lock.unlock();
}
}
}
}
}
输出:
向队列插入了一个元素,队列剩余空间9
向队列插入了一个元素,队列剩余空间8
向队列插入了一个元素,队列剩余空间7
向队列插入了一个元素,队列剩余空间6
向队列插入了一个元素,队列剩余空间5
向队列插入了一个元素,队列剩余空间4
向队列插入了一个元素,队列剩余空间3
向队列插入了一个元素,队列剩余空间2
向队列插入了一个元素,队列剩余空间1
向队列插入了一个元素,队列剩余空间0
队列满,等待有空余
从队列里取走了一个数据,队列剩余9个元素
从队列里取走了一个数据,队列剩余8个元素
从队列里取走了一个数据,队列剩余7个元素
从队列里取走了一个数据,队列剩余6个元素
从队列里取走了一个数据,队列剩余5个元素
从队列里取走了一个数据,队列剩余4个元素
从队列里取走了一个数据,队列剩余3个元素
从队列里取走了一个数据,队列剩余2个元素
从队列里取走了一个数据,队列剩余1个元素
从队列里取走了一个数据,队列剩余0个元素
队列空,等待数据
输出我只粘贴了一部分,但从输出结果看,这符合我们的预期。
我们上面用用
Lock
、
Condition
和队列来实现的生产者-消费者模式类似于
ArrayBlockingQueue
中的生产者-消费者模式的实现,只不过
ArrayBlockingQueue
用的是数组。