在谈论到多线程的时候,大部分都是线程之间资源竞争,但是也有时候,线程之间也是需要相互协作的。此文我们就来探讨一下线程之间是如何协作的?
wait/notify(等待通知)
首先要知道 wait和notify/notifyAll都是Object的方法,而不是Thread的方法,这使得每个对象都能调用这几个方法。上回谈到synchronized加锁的时候,每个对象都有一把锁,是通过对象头来实现的,而且对象还有一个等待队列,当线程获取不到对象锁的时候,会进入等待队列。除此之外,对象还有另外一个等待队列,这是个条件队列,用来处理线程之间的协作。wait/notify/notifyAll就是通过这个条件队列来实现的。
下面模拟这个场景:现在高考结束,很多同学即将进入大学殿堂,然后进入大学的第一件事将是恐怖的军训。学生们听着教官的命令 训练和休息。
看一下这段代码,主线程模拟教官,十个子线程模拟学生。学生创建好后就进入训练,教官等待五秒钟通知大家休息。
public class School {
private synchronized void studentThread() throws InterruptedException {
// 训练
System.out.println(System.currentTimeMillis() + " " + Thread.currentThread().getName()+"开始训练");
// 训练中等待休息
wait();
// 开始休息
System.out.println(System.currentTimeMillis() + " " + Thread.currentThread().getName()+"开始休息");
}
private synchronized void teacherThread() throws InterruptedException {
// 通知所有学生休息
notifyAll();
}
public static void main(String[] args) throws InterruptedException {
School school = new School();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
school.studentThread();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, i + "号学生").start();
}
// 教官让学生训练5秒钟
Thread.sleep(5000);
System.out.println("教官等待五秒后让学生休息");
school.teacherThread();
}
}
学生线程进入训练后会调用school对象的wait()方法,这会使当前线程进入对象的条件队列,等待其他线程唤醒。当教官调用school对象的notifyAll()方法时,会唤醒条件队列中所有的等待线程(如果调用notify()则随机唤醒一个),这样所有的学生线程就会继续执行下面的代码。注意一点:wait/notify/notifyAll必须在synchronized代码中,而且执行wait方法会释放锁。
总结一下wait/notify/notifyAll的执行流程和线程状态。首先执行wait方法,线程进入条件队列,此时线程状态:WAITING或TIMED_WAITING;接着其他线程执行notifyAll唤醒条件队列中所有线程(将这些线程从条件队列中移除),此时线程状态:RUNNABLE;然后这些线程会重新争抢锁,争抢到锁的线程且获取时间片,状态会变为:RUNNING,未获得锁的线程则进入等待队列。
wait/notify/notifyAll局限在于必须在synchronized代码中执行,那如果我们代码使用的显示锁ReentrantLock,怎么实现等待通知机制呢?
ReentrantLock的等待通知
在ReentrantLock中实现等待通知,需要使用到Condition,它表示条件变量,是一个接口。它里面有很多方法,await方法对应Object的wait方法、signal方法对应notify、signalAll对应notifyAll,使用方法也基本一样。
上面的代码可以使用ReentrantLock改写成这样:
public class School {
private ReentrantLock reentrantLock = new ReentrantLock();
private Condition condition = reentrantLock.newCondition();
private void studentThread() throws InterruptedException {
try {
reentrantLock.lock();
// 训练
System.out.println(System.currentTimeMillis() + " " + Thread.currentThread().getName()+"开始训练");
// 训练中等待休息
condition.await();
// 开始休息
System.out.println(System.currentTimeMillis() + " " + Thread.currentThread().getName()+"开始休息");
}finally {
reentrantLock.unlock();
}
}
private synchronized void teacherThread() throws InterruptedException {
try {
reentrantLock.lock();
// 通知所有学生休息
condition.signalAll();
}finally {
reentrantLock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
School school = new School();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
school.studentThread();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, i + "号学生").start();
}
// 教官让学生训练5秒钟
Thread.sleep(5000);
System.out.println("教官等待五秒后让学生休息");
school.teacherThread();
}
}
ReentrantLock的等待通知和synchronized使用流程差不多。
在jdk中,有提供一些线程协作的工具类,下面我们来看看其中使用较多的三个:Semaphore、CountDownLatch和CyclicBarrier。
Semaphore(信号量)
Semaphore一般用来控制一个资源最多同时被几个线程访问。例如下面这个场景:学生训练完开始休息时,很多人会去上厕所,但是厕所的坑位只有那么多,同时最多只能有固定的人数使用。
看看下面这段代码:
public class Student {
//坑位
private Semaphore semaphore = new Semaphore(4);
/**
* 上厕所
* @throws InterruptedException
*/
private void toilet() throws InterruptedException {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"抢到坑位");
int time = RandomUtil.randomInt(1000, 3000);
Thread.sleep(time);
System.out.println(Thread.currentThread().getName()+"结束,耗时:" + time);
semaphore.release();
}
public static void main(String[] args) {
Student student = new Student();
//十个学生去上厕所
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
student.toilet();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, i + "号学生").start();
}
}
}
//输出
0号学生抢到坑位
1号学生抢到坑位
2号学生抢到坑位
3号学生抢到坑位
3号学生结束,耗时:1040
4号学生抢到坑位
1号学生结束,耗时:1147
5号学生抢到坑位
2号学生结束,耗时:1279
6号学生抢到坑位
0号学生结束,耗时:2038
7号学生抢到坑位
5号学生结束,耗时:1209
8号学生抢到坑位
4号学生结束,耗时:1617
9号学生抢到坑位
6号学生结束,耗时:1594
8号学生结束,耗时:1976
7号学生结束,耗时:2471
9号学生结束,耗时:1893
首先使用
Semaphore semaphore = new Semaphore(4);
来设置资源最大访问数,当一个线程访问资源时,需要调用
semaphore.acquire();
来获取访问权限,如果获取成功则继续执行,否则等待资源的释放。当线程执行完以后,需要调用
semaphore.release();
来释放资源。
CountDownLatch(计数器)
CountDownLatch一般用来一个线程等待其他线程执行完以后再执行。例如这个场景:教官等待十个上厕所的学生都回来以后再进行训练。
public class School {
private static CountDownLatch countDownLatch = new CountDownLatch(10);
private void toilet() throws InterruptedException {
int time = RandomUtil.randomInt(1000, 3000);
Thread.sleep(time);
countDownLatch.countDown();
System.out.println(Thread.currentThread().getName()+"已到,耗时:" + time);
}
public static void main(String[] args) throws InterruptedException {
School school = new School();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
school.toilet();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, i + "号学生").start();
}
countDownLatch.await();
System.out.println("所有学生已到齐,开始训练");
}
}
//输出
0号学生已到,耗时:1156
6号学生已到,耗时:1235
5号学生已到,耗时:1384
4号学生已到,耗时:1547
7号学生已到,耗时:1749
3号学生已到,耗时:1753
9号学生已到,耗时:2432
1号学生已到,耗时:2468
8号学生已到,耗时:2481
2号学生已到,耗时:2978
所有学生已到齐,开始训练
首先使用
CountDownLatch countDownLatch = new CountDownLatch(10);
来设置计数器,个数等于学生数量,主线程需要调用
countDownLatch.await();
来等待其他线程执行完,每个学生线程都需调用
countDownLatch.countDown();
,计数器也会相应减1,等到计数器数量减完以后,主线程会等待结束。
CyclicBarrier(循环栅栏)
CyclicBarrier一般用来多个线程相互等待,到某一状态再同时执行,它是循环的,可以重复使用。例如这个场景:五个同学约着一起去厕所,然后再一起去训练场。
public class School {
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
private void go() throws Exception {
Thread.sleep(RandomUtil.randomInt(1000, 3000));
//一起去厕所
cyclicBarrier.await();
System.out.println(System.currentTimeMillis() + " " + Thread.currentThread().getName()+"已到厕所");
Thread.sleep(RandomUtil.randomInt(1000, 3000));
//一起去训练场
cyclicBarrier.await();
System.out.println(System.currentTimeMillis() + " " + Thread.currentThread().getName()+"已到训练场");
}
public static void main(String[] args) {
School school = new School();
System.out.println("一起去厕所,再一起去训练场");
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
school.go();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
//输出
一起去厕所,再一起去训练场
1594535433823 Thread-1已到厕所
1594535433823 Thread-2已到厕所
1594535433823 Thread-4已到厕所
1594535433823 Thread-3已到厕所
1594535433828 Thread-0已到厕所
1594535436644 Thread-4已到训练场
1594535436644 Thread-1已到训练场
1594535436644 Thread-3已到训练场
1594535436644 Thread-0已到训练场
1594535436644 Thread-2已到训练场
首先通过
CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
初始化一个循环栅栏,当线程调用
cyclicBarrier.await();
时会等待其他线程,且数量减1;等到数量减完以后,所有的等待线程再一起执行。
扫一扫,关注我