天天看点

并发编程之线程协作

在谈论到多线程的时候,大部分都是线程之间资源竞争,但是也有时候,线程之间也是需要相互协作的。此文我们就来探讨一下线程之间是如何协作的?

wait/notify(等待通知)

首先要知道 wait和notify/notifyAll都是Object的方法,而不是Thread的方法,这使得每个对象都能调用这几个方法。上回谈到synchronized加锁的时候,每个对象都有一把锁,是通过对象头来实现的,而且对象还有一个等待队列,当线程获取不到对象锁的时候,会进入等待队列。除此之外,对象还有另外一个等待队列,这是个条件队列,用来处理线程之间的协作。wait/notify/notifyAll就是通过这个条件队列来实现的。

下面模拟这个场景:现在高考结束,很多同学即将进入大学殿堂,然后进入大学的第一件事将是恐怖的军训。学生们听着教官的命令 训练和休息。

看一下这段代码,主线程模拟教官,十个子线程模拟学生。学生创建好后就进入训练,教官等待五秒钟通知大家休息。

public class School {

    private synchronized void studentThread() throws InterruptedException {
        // 训练
        System.out.println(System.currentTimeMillis() + " " + Thread.currentThread().getName()+"开始训练");
        // 训练中等待休息
        wait();
        // 开始休息
        System.out.println(System.currentTimeMillis() + " " + Thread.currentThread().getName()+"开始休息");
    }

    private synchronized void teacherThread() throws InterruptedException {
        // 通知所有学生休息
        notifyAll();
    }

    public static void main(String[] args) throws InterruptedException {
        School school = new School();
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    school.studentThread();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, i + "号学生").start();
        }
        // 教官让学生训练5秒钟
        Thread.sleep(5000);
        System.out.println("教官等待五秒后让学生休息");
        school.teacherThread();
    }
}
           

学生线程进入训练后会调用school对象的wait()方法,这会使当前线程进入对象的条件队列,等待其他线程唤醒。当教官调用school对象的notifyAll()方法时,会唤醒条件队列中所有的等待线程(如果调用notify()则随机唤醒一个),这样所有的学生线程就会继续执行下面的代码。注意一点:wait/notify/notifyAll必须在synchronized代码中,而且执行wait方法会释放锁。

总结一下wait/notify/notifyAll的执行流程和线程状态。首先执行wait方法,线程进入条件队列,此时线程状态:WAITING或TIMED_WAITING;接着其他线程执行notifyAll唤醒条件队列中所有线程(将这些线程从条件队列中移除),此时线程状态:RUNNABLE;然后这些线程会重新争抢锁,争抢到锁的线程且获取时间片,状态会变为:RUNNING,未获得锁的线程则进入等待队列。

并发编程之线程协作

wait/notify/notifyAll局限在于必须在synchronized代码中执行,那如果我们代码使用的显示锁ReentrantLock,怎么实现等待通知机制呢?

ReentrantLock的等待通知

在ReentrantLock中实现等待通知,需要使用到Condition,它表示条件变量,是一个接口。它里面有很多方法,await方法对应Object的wait方法、signal方法对应notify、signalAll对应notifyAll,使用方法也基本一样。

上面的代码可以使用ReentrantLock改写成这样:

public class School {

    private ReentrantLock reentrantLock = new ReentrantLock();
    private Condition condition = reentrantLock.newCondition();

    private void studentThread() throws InterruptedException {
        try {
            reentrantLock.lock();
            // 训练
            System.out.println(System.currentTimeMillis() + " " + Thread.currentThread().getName()+"开始训练");
            // 训练中等待休息
            condition.await();
            // 开始休息
            System.out.println(System.currentTimeMillis() + " " + Thread.currentThread().getName()+"开始休息");
        }finally {
            reentrantLock.unlock();
        }
    }

    private synchronized void teacherThread() throws InterruptedException {
        try {
            reentrantLock.lock();
            // 通知所有学生休息
            condition.signalAll();
        }finally {
            reentrantLock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        School school = new School();
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    school.studentThread();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, i + "号学生").start();
        }
        // 教官让学生训练5秒钟
        Thread.sleep(5000);
        System.out.println("教官等待五秒后让学生休息");
        school.teacherThread();
    }
}
           

ReentrantLock的等待通知和synchronized使用流程差不多。

在jdk中,有提供一些线程协作的工具类,下面我们来看看其中使用较多的三个:Semaphore、CountDownLatch和CyclicBarrier。

Semaphore(信号量)

Semaphore一般用来控制一个资源最多同时被几个线程访问。例如下面这个场景:学生训练完开始休息时,很多人会去上厕所,但是厕所的坑位只有那么多,同时最多只能有固定的人数使用。

看看下面这段代码:

public class Student {

    //坑位
    private Semaphore semaphore = new Semaphore(4);

    /**
     * 上厕所
     * @throws InterruptedException
     */
    private void toilet() throws InterruptedException {
        semaphore.acquire();
        System.out.println(Thread.currentThread().getName()+"抢到坑位");
        int time = RandomUtil.randomInt(1000, 3000);
        Thread.sleep(time);
        System.out.println(Thread.currentThread().getName()+"结束,耗时:" + time);
        semaphore.release();
    }

    public static void main(String[] args) {
        Student student = new Student();
        //十个学生去上厕所
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    student.toilet();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, i + "号学生").start();
        }
    }
}
//输出
0号学生抢到坑位
1号学生抢到坑位
2号学生抢到坑位
3号学生抢到坑位
3号学生结束,耗时:1040
4号学生抢到坑位
1号学生结束,耗时:1147
5号学生抢到坑位
2号学生结束,耗时:1279
6号学生抢到坑位
0号学生结束,耗时:2038
7号学生抢到坑位
5号学生结束,耗时:1209
8号学生抢到坑位
4号学生结束,耗时:1617
9号学生抢到坑位
6号学生结束,耗时:1594
8号学生结束,耗时:1976
7号学生结束,耗时:2471
9号学生结束,耗时:1893
           

首先使用

Semaphore semaphore = new Semaphore(4);

来设置资源最大访问数,当一个线程访问资源时,需要调用

semaphore.acquire();

来获取访问权限,如果获取成功则继续执行,否则等待资源的释放。当线程执行完以后,需要调用

semaphore.release();

来释放资源。

CountDownLatch(计数器)

CountDownLatch一般用来一个线程等待其他线程执行完以后再执行。例如这个场景:教官等待十个上厕所的学生都回来以后再进行训练。

public class School {

    private static CountDownLatch countDownLatch = new CountDownLatch(10);

    private void toilet() throws InterruptedException {
        int time = RandomUtil.randomInt(1000, 3000);
        Thread.sleep(time);
        countDownLatch.countDown();
        System.out.println(Thread.currentThread().getName()+"已到,耗时:" + time);
    }

    public static void main(String[] args) throws InterruptedException {
        School school = new School();
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                try {
                    school.toilet();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, i + "号学生").start();
        }
        countDownLatch.await();
        System.out.println("所有学生已到齐,开始训练");
    }
}
//输出
0号学生已到,耗时:1156
6号学生已到,耗时:1235
5号学生已到,耗时:1384
4号学生已到,耗时:1547
7号学生已到,耗时:1749
3号学生已到,耗时:1753
9号学生已到,耗时:2432
1号学生已到,耗时:2468
8号学生已到,耗时:2481
2号学生已到,耗时:2978
所有学生已到齐,开始训练
           

首先使用

CountDownLatch countDownLatch = new CountDownLatch(10);

来设置计数器,个数等于学生数量,主线程需要调用

countDownLatch.await();

来等待其他线程执行完,每个学生线程都需调用

countDownLatch.countDown();

,计数器也会相应减1,等到计数器数量减完以后,主线程会等待结束。

CyclicBarrier(循环栅栏)

CyclicBarrier一般用来多个线程相互等待,到某一状态再同时执行,它是循环的,可以重复使用。例如这个场景:五个同学约着一起去厕所,然后再一起去训练场。

public class School {

    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5);

    private void go() throws Exception {
        Thread.sleep(RandomUtil.randomInt(1000, 3000));
        //一起去厕所
        cyclicBarrier.await();
        System.out.println(System.currentTimeMillis() + " " + Thread.currentThread().getName()+"已到厕所");
        Thread.sleep(RandomUtil.randomInt(1000, 3000));
        //一起去训练场
        cyclicBarrier.await();
        System.out.println(System.currentTimeMillis() + " " + Thread.currentThread().getName()+"已到训练场");
    }

    public static void main(String[] args) {
        School school = new School();
        System.out.println("一起去厕所,再一起去训练场");
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                try {
                    school.go();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}
//输出
一起去厕所,再一起去训练场
1594535433823 Thread-1已到厕所
1594535433823 Thread-2已到厕所
1594535433823 Thread-4已到厕所
1594535433823 Thread-3已到厕所
1594535433828 Thread-0已到厕所
1594535436644 Thread-4已到训练场
1594535436644 Thread-1已到训练场
1594535436644 Thread-3已到训练场
1594535436644 Thread-0已到训练场
1594535436644 Thread-2已到训练场
           

首先通过

CyclicBarrier cyclicBarrier = new CyclicBarrier(5);

初始化一个循环栅栏,当线程调用

cyclicBarrier.await();

时会等待其他线程,且数量减1;等到数量减完以后,所有的等待线程再一起执行。

扫一扫,关注我

并发编程之线程协作