天天看点

聊聊并发(二)——生产者与消费者

一、等待唤醒机制

1、介绍

  wait():一旦执行此方法,当前线程进入阻塞状态,并释放锁。

  notify():一旦执行此方法,就会唤醒一个被wait()的线程。如果有多个,就唤醒优先级高的,如果优先级一样,则随机唤醒一个。

  notifyAll():一旦执行此方法,会唤醒所有wait()的线程。

  notify()唤醒线程,不会立即释放锁对象,需要等到当前同步代码块都执行完后才会释放锁对象。下次和被唤醒的线程同时竞争锁对象。

  问:wait 等待中的线程被 notify 唤醒了会立马执行吗?

  答:不会。被唤醒的线程需要重新竞争锁对象,获得锁的线程可以从wait处继续往下执行。

2、两个线程交替打印问题

  如何使用两个线程交替打印1—100?

  代码示例:先用两个线程来打印1—100。

1 // 不写注释也能看懂的代码
 2 public class Main {
 3 
 4     public static void main(String[] args) {
 5         Num num = new Num();
 6         Thread thread1 = new Thread(num);
 7         Thread thread2 = new Thread(num);
 8 
 9         thread1.start();
10         thread2.start();
11     }
12 }
13 
14 
15 class Num implements Runnable {
16 
17     private int i = 1;
18 
19     @Override
20     public void run() {
21         while (true) {
22             synchronized (this) {
23                 if (i <= 100) {
24                     System.out.println(Thread.currentThread().getName() + ":" + i);
25                     i++;
26                 } else {
27                     break;
28                 }
29             }
30         }
31     }
32 }
33 
34 // 可能的结果.当然是谁抢到谁打印.
35 Thread-0:1
36 Thread-0:2
37 Thread-1:3
38 Thread-1:4
39 ……      

  理解:两个线程的共享变量是 i ;两个线程共同竞争的锁 this 是num。

聊聊并发(二)——生产者与消费者

  再看原问题,线程本来是抢占式的,要想实现交替打印。显然,需要线程之间有通信。即,线程A打印 1 之后,阻塞一下,等待线程B打印 2 ,然后唤醒A,并且B阻塞,A打印3,以此内推。这就是线程的等待唤醒机制。

  代码示例:只需要在上述代码添加两行即可,如下:

1 class Num implements Runnable {
 2 
 3     private int i = 1;
 4 
 5     @Override
 6     public void run() {
 7         while (true) {
 8             synchronized (this) {
 9                 // 1.先唤醒对方
10                 notify();
11 
12                 if (i <= 100) {
13                     System.out.println(Thread.currentThread().getName() + ":" + i);
14                     i++;
15 
16                     // 2.当前线程操作完后.等待阻塞
17                     try {
18                         wait();
19                     } catch (InterruptedException e) {
20                         e.printStackTrace();
21                     }
22                 } else {
23                     break;
24                 }
25             }
26         }
27     }
28 }      

  图解:

聊聊并发(二)——生产者与消费者

  代码示例:将上述代码改用Lock实现。

1 class Num implements Runnable {
 2 
 3     private int i = 1;
 4     // 锁
 5     final private Lock lock = new ReentrantLock();
 6     final Condition condition = lock.newCondition();
 7 
 8     @Override
 9     public void run() {
10         while (true) {
11             // 上锁
12             lock.lock();
13 
14             try {
15                 // 1.先唤醒对方
16                 condition.signal();
17 
18                 if (i <= 100) {
19                     System.out.println(Thread.currentThread().getName() + ":" + i);
20                     i++;
21 
22                     // 2.当前线程操作完后.等待阻塞
23                     try {
24                         condition.await();
25                     } catch (InterruptedException e) {
26                         e.printStackTrace();
27                     }
28                 } else {
29                     break;
30                 }
31             } finally {
32                 // 释放锁
33                 lock.unlock();
34             }
35         }
36     }
37 }      

  使用lock同步锁,就不需要sychronized关键字了,需要创建lock对象和condition实例。Condition 接口描述了可能会与锁有关联的条件变量。这些变量在用法上与使用 Object.wait 访问的隐式监视器类似,但提供了更强大的功能。需要特别指出的是,单个 Lock 可能与多个 Condition 对象关联。

在 Condition 对象中,对应的等待唤醒方法需要改为:

  wait()方法——await()方法

  signal()方法——notify()方法

  signalAll()——notifyAll()方法

3、三个线程交替打印问题

  在上个问题的基础上,升级一下,考虑三个线程交替打印1—99?

  思想同理:接力棒A,交给B,B交给C,C交给A。但是如何指定唤醒一个线程呢?notify()只能随机唤醒一个。这里用lock的condition来解决。

  代码示例:三个线程交替打印

聊聊并发(二)——生产者与消费者
聊聊并发(二)——生产者与消费者
1 public class Main {
  2 
  3     public static void main(String[] args) {
  4         Num num = new Num();
  5 
  6         new Thread(() -> {
  7             num.loopA();
  8         }).start();
  9 
 10         new Thread(() -> {
 11             num.loopB();
 12         }).start();
 13 
 14         new Thread(() -> {
 15             num.loopC();
 16         }).start();
 17     }
 18 }
 19 
 20 class Num {
 21 
 22     private int i = 1;
 23     // 当前正在执行的线程的标记
 24     private int flag = 1;
 25     final private Lock lock = new ReentrantLock();
 26     final Condition conditionA = lock.newCondition();
 27     final Condition conditionB = lock.newCondition();
 28     final Condition conditionC = lock.newCondition();
 29 
 30     public void loopA() {
 31         while (true) {
 32             // 循环不停的抢锁
 33             lock.lock();
 34 
 35             try {
 36                 // 线程A判断是不是该自己打印
 37                 while (flag != 1) {
 38                     conditionA.await();
 39                 }
 40                 
 41                 // 唤醒线程B
 42                 // 注意这里:先唤醒B,再执行A的.
 43                 // 不要这两行代码放在下面的if中,最后会有线程出不来导致程序结束不了
 44                 conditionB.signal();
 45                 flag = 2;
 46 
 47                 if (i <= 99) {
 48                     System.out.println(Thread.currentThread().getName() + ":" + i);
 49                     i++;
 50 
 51                 } else {
 52                     break;
 53                 }
 54             } catch (InterruptedException e) {
 55                 e.printStackTrace();
 56             } finally {
 57                 lock.unlock();
 58             }
 59         }
 60     }
 61 
 62     // 同理
 63     public void loopB() {
 64         while (true) {
 65             lock.lock();
 66 
 67             try {
 68                 while (flag != 2) {
 69                     conditionB.await();
 70                 }
 71 
 72                 conditionC.signal();
 73                 flag = 3;
 74 
 75                 if (i <= 99) {
 76                     System.out.println(Thread.currentThread().getName() + ":" + i);
 77                     i++;
 78 
 79                 } else {
 80                     break;
 81                 }
 82             } catch (InterruptedException e) {
 83                 e.printStackTrace();
 84             } finally {
 85                 lock.unlock();
 86             }
 87         }
 88     }
 89 
 90     // 同理
 91     public void loopC() {
 92         while (true) {
 93             lock.lock();
 94 
 95             try {
 96                 while (flag != 3) {
 97                     conditionC.await();
 98                 }
 99 
100                 conditionA.signal();
101                 flag = 1;
102 
103                 if (i <= 99) {
104                     System.out.println(Thread.currentThread().getName() + ":" + i);
105                     i++;
106 
107                 } else {
108                     break;
109                 }
110             } catch (InterruptedException e) {
111                 e.printStackTrace();
112             } finally {
113                 lock.unlock();
114             }
115         }
116     }
117 }      

三个线程交替打印

4、三个线程定制化打印问题

  开启 3 个线程,要求打印输出为 (A*3B*5C*7) * n。

  思想同理:接力棒A,交给B,B交给C,C交给A。有上一个问题对lock的使用,这个问题不难给出答案。

  代码示例:定制化打印

聊聊并发(二)——生产者与消费者
聊聊并发(二)——生产者与消费者
1 public class Main {
  2 
  3     public static void main(String[] args) {
  4         Num num = new Num();
  5 
  6         new Thread(() -> {
  7             for (int i = 0; i < 10; i++) {
  8                 num.loopA();
  9             }
 10         }, "A").start();
 11 
 12         new Thread(() -> {
 13             for (int i = 0; i < 10; i++) {
 14                 num.loopB();
 15             }
 16         }, "B").start();
 17 
 18         new Thread(() -> {
 19             for (int i = 0; i < 10; i++) {
 20                 num.loopC();
 21             }
 22         }, "C").start();
 23     }
 24 }
 25 
 26 class Num {
 27     // 当前正在执行的线程的标记
 28     private int flag = 1;
 29     final private Lock lock = new ReentrantLock();
 30     final Condition conditionA = lock.newCondition();
 31     final Condition conditionB = lock.newCondition();
 32     final Condition conditionC = lock.newCondition();
 33 
 34     public void loopA() {
 35         lock.lock();
 36 
 37         try {
 38             // 线程A判断是不是该自己打印
 39             while (flag != 1) {
 40                 conditionA.await();
 41             }
 42             // 唤醒B
 43             conditionB.signal();
 44             flag = 2;
 45 
 46             // 将线程A的名称打印 3 遍
 47             for (int i = 0; i < 3; i++) {
 48                 System.out.println(Thread.currentThread().getName());
 49             }
 50         } catch (InterruptedException e) {
 51             e.printStackTrace();
 52         } finally {
 53             lock.unlock();
 54         }
 55     }
 56 
 57     // 同理
 58     public void loopB() {
 59         lock.lock();
 60 
 61         try {
 62             while (flag != 2) {
 63                 conditionB.await();
 64             }
 65 
 66             conditionC.signal();
 67             flag = 3;
 68 
 69             // 将线程B的名称打印 5 遍
 70             for (int i = 0; i < 5; i++) {
 71                 System.out.println(Thread.currentThread().getName());
 72             }
 73         } catch (InterruptedException e) {
 74             e.printStackTrace();
 75         } finally {
 76             lock.unlock();
 77         }
 78     }
 79 
 80     // 同理
 81     public void loopC() {
 82         lock.lock();
 83 
 84         try {
 85             while (flag != 3) {
 86                 conditionC.await();
 87             }
 88 
 89             conditionA.signal();
 90             flag = 1;
 91 
 92             // 将线程C的名称打印 7 遍
 93             for (int i = 0; i < 7; i++) {
 94                 System.out.println(Thread.currentThread().getName());
 95             }
 96         } catch (InterruptedException e) {
 97             e.printStackTrace();
 98         } finally {
 99             lock.unlock();
100         }
101     }
102 }
103 
104 // 结果
105 (AAABBBBBCCCCCCC)*10      

定制化打印

  这种定制化打印理解后,如果想要(ABC)*10,或其他形式的输出。相信修改哪里的参数应该很清楚了。

二、生产者与消费者

  生产者:不停生产产品,然后交给店员。

  消费者:不停消费产品,从店员处消费。

  店员:一次性持有的产品数量固定。

  代码示例:生产者生产20个,消费者消费20个,店员持有10个产品满。

1 // 不写注释也能看懂的代码
 2 // 店员
 3 public class Clerk {
 4     // 产品数量
 5     private int product = 0;
 6 
 7     // 进货
 8     public synchronized void get() {
 9         if (product >= 10) {
10             System.out.println("产品已满!");
11         } else {
12             System.out.println(Thread.currentThread().getName() + " : " + ++product);
13         }
14     }
15 
16     // 卖货
17     public synchronized void sale() {
18         if (product <= 0) {
19             System.out.println("产品缺货!");
20         } else {
21             System.out.println(Thread.currentThread().getName() + " : " + --product);
22         }
23     }
24 }
25 
26 // 生产者
27 class Producer implements Runnable {
28     private final Clerk clerk;
29 
30     public Producer(Clerk clerk) {
31         this.clerk = clerk;
32     }
33 
34     @Override
35     public void run() {
36         for (int i = 0; i < 20; i++) {
37 //            try {
38 //                Thread.sleep(200);
39 //            } catch (InterruptedException e) {
40 //            }
41 
42             clerk.get();
43         }
44     }
45 }
46 
47 // 消费者
48 class Consumer implements Runnable {
49     private final Clerk clerk;
50 
51     public Consumer(Clerk clerk) {
52         this.clerk = clerk;
53     }
54 
55     @Override
56     public void run() {
57         for (int i = 0; i < 20; i++) {
58             clerk.sale();
59         }
60     }
61 }      
1 // 测试类
 2 public class Main {
 3     public static void main(String[] args) {
 4         Clerk clerk = new Clerk();
 5         Producer producer = new Producer(clerk);
 6         Consumer consumer = new Consumer(clerk);
 7         
 8         // 分别开启了一个生产者A 和 一个消费者B
 9         new Thread(producer, "生产者A").start();
10         new Thread(consumer, "消费者B").start();
11     }
12 }
13 
14 // 可能的一种结果
15 生产者A : 1
16 消费者B : 0
17 产品缺货!
18 产品缺货!
19 产品缺货!
20 产品缺货!
21 产品缺货!
22 产品缺货!
23 产品缺货!
24 产品缺货!
25 产品缺货!
26 产品缺货!
27 产品缺货!
28 产品缺货!
29 产品缺货!
30 产品缺货!
31 产品缺货!
32 产品缺货!
33 产品缺货!
34 产品缺货!
35 产品缺货!
36 生产者A : 1
37 生产者A : 2
38 生产者A : 3
39 生产者A : 4
40 生产者A : 5
41 生产者A : 6
42 生产者A : 7
43 生产者A : 8
44 生产者A : 9
45 生产者A : 10
46 产品已满!
47 产品已满!
48 产品已满!
49 产品已满!
50 产品已满!
51 产品已满!
52 产品已满!
53 产品已满!
54 产品已满!      

  理解:两个线程的共享变量是 product;两个线程共同竞争的锁,同步方法默认是this,指 clerk。

  这里没有使用等待唤醒机制。在生产满时,若抢到锁,依然会一直生产;在消费空时,若抢到锁,依然会一直消费。

聊聊并发(二)——生产者与消费者

2、等待唤醒

  上述结果并不是想要的。希望产品满时,等待消费者消费一个时,再生产;而产品空时,等待生产者生产一个时,再消费。用等待唤醒机制改进:

聊聊并发(二)——生产者与消费者
聊聊并发(二)——生产者与消费者
1 public class Clerk {
 2     // 产品数量
 3     private int product = 0;
 4 
 5     // 进货
 6     public synchronized void get() {
 7         if (product >= 10) {
 8             System.out.println("产品已满!");
 9 
10             // 满了就等待.就不生产
11             try {
12                 this.wait();
13             } catch (InterruptedException e) {
14                 e.printStackTrace();
15             }
16         } else {
17             System.out.println(Thread.currentThread().getName() + " : " + ++product);
18             // 通知消费者有货,可以消费
19             this.notify();
20         }
21     }
22 
23     // 卖货
24     public synchronized void sale() {
25         if (product <= 0) {
26             System.out.println("产品缺货!");
27             
28             // 缺货就等待
29             try {
30                 this.wait();
31             } catch (InterruptedException e) {
32                 e.printStackTrace();
33             }
34         } else {
35             System.out.println(Thread.currentThread().getName() + " : " + --product);
36             // 通知生产者,可以生产
37             this.notify();
38         }
39     }
40 }
41 
42 // 可能的一种结果
43 生产者A : 1
44 消费者B : 0
45 产品缺货!
46 生产者A : 1
47 生产者A : 2
48 消费者B : 1
49 消费者B : 0
50 产品缺货!
51 生产者A : 1
52 消费者B : 0
53 产品缺货!
54 生产者A : 1
55 消费者B : 0
56 产品缺货!
57 生产者A : 1
58 消费者B : 0
59 产品缺货!
60 生产者A : 1
61 生产者A : 2
62 消费者B : 1
63 消费者B : 0
64 产品缺货!
65 生产者A : 1
66 消费者B : 0
67 产品缺货!
68 生产者A : 1
69 生产者A : 2
70 生产者A : 3
71 生产者A : 4
72 消费者B : 3
73 消费者B : 2
74 消费者B : 1
75 消费者B : 0
76 生产者A : 1
77 生产者A : 2
78 生产者A : 3
79 生产者A : 4
80 生产者A : 5
81 生产者A : 6
82 生产者A : 7      

等待唤醒

  问题:如果将店员持有 10 个满改成持有 1 个满,如下:

1 if (product >= 1) {}
2 
3 // 结果
4 …………省略前面的
5 生产者A : 1
6 产品已满!
7 消费者B : 0
8 产品缺货!      

  运行的结果没问题,但是程序停不下来。分析运行结果有利于更好的理解多线程编程。结合打印结果,不难得出:最后一次,消费者B缺货,等待,而生产者A执行完毕,已无法再唤醒消费者B。

  解决:把 else 打开即可。

  理解:其实不难理解它的现实语义。生产者A判断产品满,就等待,不满,就生产。消费者B判断产品空,就等待,不空,就消费。

3、虚假唤醒问题

  问题:在上述代码基础上,如果有多个生产者,多个消费者,会出现负数。

1 public class Main {
 2     public static void main(String[] args) {
 3         Clerk clerk = new Clerk();
 4         Producer producer = new Producer(clerk);
 5         Consumer consumer = new Consumer(clerk);
 6         new Thread(producer, "生产者A").start();
 7         new Thread(consumer, "消费者B").start();
 8         
 9         // 新增一个生产者和一个消费者
10         new Thread(producer, "生产者C").start();
11         new Thread(consumer, "消费者D").start();
12     }
13 }
14 
15 // 把上述 this.notify() 都改为 this.notifyAll();      

  原因:消费者B抢到锁,product == 0,等待;消费者D抢到锁,product == 0,等待。然后,生产者A抢到锁,生产一个,product == 1。就会唤醒两个消费者,同时消费,就出现0、-1。这就是虚假唤醒问题。

  解决:把 if 改为 while 即可。

4、用lock实现

  代码示例:完整用lock实现的生产者与消费者

1 public class Clerk {
 2     // 产品数量
 3     private int product = 0;
 4     final private Lock lock = new ReentrantLock();
 5     final Condition condition = lock.newCondition();
 6 
 7     // 进货
 8     public void get() {
 9         lock.lock();
10         try {
11             while (product >= 1) {
12                 System.out.println("产品已满!");
13 
14                 try {
15                     condition.await();
16                 } catch (InterruptedException e) {
17                     e.printStackTrace();
18                 }
19             }
20 
21             System.out.println(Thread.currentThread().getName() + " : " + ++product);
22             condition.signalAll();
23         } finally {
24             lock.unlock();
25         }
26     }
27 
28     // 卖货
29     public void sale() {
30         lock.lock();
31         try {
32             while (product <= 0) {
33                 System.out.println("产品缺货!");
34 
35                 try {
36                     condition.await();
37                 } catch (InterruptedException e) {
38                     e.printStackTrace();
39                 }
40             }
41 
42             System.out.println(Thread.currentThread().getName() + " : " + --product);
43             condition.signalAll();
44         } finally {
45             lock.unlock();
46         }
47     }
48 }
49 
50 // 生产者
51 class Producer implements Runnable {
52     private final Clerk clerk;
53 
54     public Producer(Clerk clerk) {
55         this.clerk = clerk;
56     }
57 
58     @Override
59     public void run() {
60         for (int i = 0; i < 20; i++) {
61             try {
62                 Thread.sleep(200);
63             } catch (InterruptedException e) {
64             }
65 
66             clerk.get();
67         }
68     }
69 }
70 
71 // 消费者
72 class Consumer implements Runnable {
73     private final Clerk clerk;
74 
75     public Consumer(Clerk clerk) {
76         this.clerk = clerk;
77     }
78 
79     @Override
80     public void run() {
81         for (int i = 0; i < 20; i++) {
82             clerk.sale();
83         }
84     }
85 }      
1 // 测试类
 2 public class Main {
 3     public static void main(String[] args) {
 4         Clerk clerk = new Clerk();
 5         Producer producer = new Producer(clerk);
 6         Consumer consumer = new Consumer(clerk);
 7         new Thread(producer, "生产者A").start();
 8         new Thread(consumer, "消费者B").start();
 9 
10         new Thread(producer, "生产者C").start();
11         new Thread(consumer, "消费者D").start();
12     }
13 }      

作者:Craftsman-L