天天看點

聊聊并發(二)——生産者與消費者

一、等待喚醒機制

1、介紹

  wait():一旦執行此方法,目前線程進入阻塞狀态,并釋放鎖。

  notify():一旦執行此方法,就會喚醒一個被wait()的線程。如果有多個,就喚醒優先級高的,如果優先級一樣,則随機喚醒一個。

  notifyAll():一旦執行此方法,會喚醒所有wait()的線程。

  notify()喚醒線程,不會立即釋放鎖對象,需要等到目前同步代碼塊都執行完後才會釋放鎖對象。下次和被喚醒的線程同時競争鎖對象。

  問:wait 等待中的線程被 notify 喚醒了會立馬執行嗎?

  答:不會。被喚醒的線程需要重新競争鎖對象,獲得鎖的線程可以從wait處繼續往下執行。

2、兩個線程交替列印問題

  如何使用兩個線程交替列印1—100?

  代碼示例:先用兩個線程來列印1—100。

1 // 不寫注釋也能看懂的代碼
 2 public class Main {
 3 
 4     public static void main(String[] args) {
 5         Num num = new Num();
 6         Thread thread1 = new Thread(num);
 7         Thread thread2 = new Thread(num);
 8 
 9         thread1.start();
10         thread2.start();
11     }
12 }
13 
14 
15 class Num implements Runnable {
16 
17     private int i = 1;
18 
19     @Override
20     public void run() {
21         while (true) {
22             synchronized (this) {
23                 if (i <= 100) {
24                     System.out.println(Thread.currentThread().getName() + ":" + i);
25                     i++;
26                 } else {
27                     break;
28                 }
29             }
30         }
31     }
32 }
33 
34 // 可能的結果.當然是誰搶到誰列印.
35 Thread-0:1
36 Thread-0:2
37 Thread-1:3
38 Thread-1:4
39 ……      

  了解:兩個線程的共享變量是 i ;兩個線程共同競争的鎖 this 是num。

聊聊并發(二)——生産者與消費者

  再看原問題,線程本來是搶占式的,要想實作交替列印。顯然,需要線程之間有通信。即,線程A列印 1 之後,阻塞一下,等待線程B列印 2 ,然後喚醒A,并且B阻塞,A列印3,以此内推。這就是線程的等待喚醒機制。

  代碼示例:隻需要在上述代碼添加兩行即可,如下:

1 class Num implements Runnable {
 2 
 3     private int i = 1;
 4 
 5     @Override
 6     public void run() {
 7         while (true) {
 8             synchronized (this) {
 9                 // 1.先喚醒對方
10                 notify();
11 
12                 if (i <= 100) {
13                     System.out.println(Thread.currentThread().getName() + ":" + i);
14                     i++;
15 
16                     // 2.目前線程操作完後.等待阻塞
17                     try {
18                         wait();
19                     } catch (InterruptedException e) {
20                         e.printStackTrace();
21                     }
22                 } else {
23                     break;
24                 }
25             }
26         }
27     }
28 }      

  圖解:

聊聊并發(二)——生産者與消費者

  代碼示例:将上述代碼改用Lock實作。

1 class Num implements Runnable {
 2 
 3     private int i = 1;
 4     // 鎖
 5     final private Lock lock = new ReentrantLock();
 6     final Condition condition = lock.newCondition();
 7 
 8     @Override
 9     public void run() {
10         while (true) {
11             // 上鎖
12             lock.lock();
13 
14             try {
15                 // 1.先喚醒對方
16                 condition.signal();
17 
18                 if (i <= 100) {
19                     System.out.println(Thread.currentThread().getName() + ":" + i);
20                     i++;
21 
22                     // 2.目前線程操作完後.等待阻塞
23                     try {
24                         condition.await();
25                     } catch (InterruptedException e) {
26                         e.printStackTrace();
27                     }
28                 } else {
29                     break;
30                 }
31             } finally {
32                 // 釋放鎖
33                 lock.unlock();
34             }
35         }
36     }
37 }      

  使用lock同步鎖,就不需要sychronized關鍵字了,需要建立lock對象和condition執行個體。Condition 接口描述了可能會與鎖有關聯的條件變量。這些變量在用法上與使用 Object.wait 通路的隐式螢幕類似,但提供了更強大的功能。需要特别指出的是,單個 Lock 可能與多個 Condition 對象關聯。

在 Condition 對象中,對應的等待喚醒方法需要改為:

  wait()方法——await()方法

  signal()方法——notify()方法

  signalAll()——notifyAll()方法

3、三個線程交替列印問題

  在上個問題的基礎上,更新一下,考慮三個線程交替列印1—99?

  思想同理:接力棒A,交給B,B交給C,C交給A。但是如何指定喚醒一個線程呢?notify()隻能随機喚醒一個。這裡用lock的condition來解決。

  代碼示例:三個線程交替列印

聊聊并發(二)——生産者與消費者
聊聊并發(二)——生産者與消費者
1 public class Main {
  2 
  3     public static void main(String[] args) {
  4         Num num = new Num();
  5 
  6         new Thread(() -> {
  7             num.loopA();
  8         }).start();
  9 
 10         new Thread(() -> {
 11             num.loopB();
 12         }).start();
 13 
 14         new Thread(() -> {
 15             num.loopC();
 16         }).start();
 17     }
 18 }
 19 
 20 class Num {
 21 
 22     private int i = 1;
 23     // 目前正在執行的線程的标記
 24     private int flag = 1;
 25     final private Lock lock = new ReentrantLock();
 26     final Condition conditionA = lock.newCondition();
 27     final Condition conditionB = lock.newCondition();
 28     final Condition conditionC = lock.newCondition();
 29 
 30     public void loopA() {
 31         while (true) {
 32             // 循環不停的搶鎖
 33             lock.lock();
 34 
 35             try {
 36                 // 線程A判斷是不是該自己列印
 37                 while (flag != 1) {
 38                     conditionA.await();
 39                 }
 40                 
 41                 // 喚醒線程B
 42                 // 注意這裡:先喚醒B,再執行A的.
 43                 // 不要這兩行代碼放在下面的if中,最後會有線程出不來導緻程式結束不了
 44                 conditionB.signal();
 45                 flag = 2;
 46 
 47                 if (i <= 99) {
 48                     System.out.println(Thread.currentThread().getName() + ":" + i);
 49                     i++;
 50 
 51                 } else {
 52                     break;
 53                 }
 54             } catch (InterruptedException e) {
 55                 e.printStackTrace();
 56             } finally {
 57                 lock.unlock();
 58             }
 59         }
 60     }
 61 
 62     // 同理
 63     public void loopB() {
 64         while (true) {
 65             lock.lock();
 66 
 67             try {
 68                 while (flag != 2) {
 69                     conditionB.await();
 70                 }
 71 
 72                 conditionC.signal();
 73                 flag = 3;
 74 
 75                 if (i <= 99) {
 76                     System.out.println(Thread.currentThread().getName() + ":" + i);
 77                     i++;
 78 
 79                 } else {
 80                     break;
 81                 }
 82             } catch (InterruptedException e) {
 83                 e.printStackTrace();
 84             } finally {
 85                 lock.unlock();
 86             }
 87         }
 88     }
 89 
 90     // 同理
 91     public void loopC() {
 92         while (true) {
 93             lock.lock();
 94 
 95             try {
 96                 while (flag != 3) {
 97                     conditionC.await();
 98                 }
 99 
100                 conditionA.signal();
101                 flag = 1;
102 
103                 if (i <= 99) {
104                     System.out.println(Thread.currentThread().getName() + ":" + i);
105                     i++;
106 
107                 } else {
108                     break;
109                 }
110             } catch (InterruptedException e) {
111                 e.printStackTrace();
112             } finally {
113                 lock.unlock();
114             }
115         }
116     }
117 }      

三個線程交替列印

4、三個線程定制化列印問題

  開啟 3 個線程,要求列印輸出為 (A*3B*5C*7) * n。

  思想同理:接力棒A,交給B,B交給C,C交給A。有上一個問題對lock的使用,這個問題不難給出答案。

  代碼示例:定制化列印

聊聊并發(二)——生産者與消費者
聊聊并發(二)——生産者與消費者
1 public class Main {
  2 
  3     public static void main(String[] args) {
  4         Num num = new Num();
  5 
  6         new Thread(() -> {
  7             for (int i = 0; i < 10; i++) {
  8                 num.loopA();
  9             }
 10         }, "A").start();
 11 
 12         new Thread(() -> {
 13             for (int i = 0; i < 10; i++) {
 14                 num.loopB();
 15             }
 16         }, "B").start();
 17 
 18         new Thread(() -> {
 19             for (int i = 0; i < 10; i++) {
 20                 num.loopC();
 21             }
 22         }, "C").start();
 23     }
 24 }
 25 
 26 class Num {
 27     // 目前正在執行的線程的标記
 28     private int flag = 1;
 29     final private Lock lock = new ReentrantLock();
 30     final Condition conditionA = lock.newCondition();
 31     final Condition conditionB = lock.newCondition();
 32     final Condition conditionC = lock.newCondition();
 33 
 34     public void loopA() {
 35         lock.lock();
 36 
 37         try {
 38             // 線程A判斷是不是該自己列印
 39             while (flag != 1) {
 40                 conditionA.await();
 41             }
 42             // 喚醒B
 43             conditionB.signal();
 44             flag = 2;
 45 
 46             // 将線程A的名稱列印 3 遍
 47             for (int i = 0; i < 3; i++) {
 48                 System.out.println(Thread.currentThread().getName());
 49             }
 50         } catch (InterruptedException e) {
 51             e.printStackTrace();
 52         } finally {
 53             lock.unlock();
 54         }
 55     }
 56 
 57     // 同理
 58     public void loopB() {
 59         lock.lock();
 60 
 61         try {
 62             while (flag != 2) {
 63                 conditionB.await();
 64             }
 65 
 66             conditionC.signal();
 67             flag = 3;
 68 
 69             // 将線程B的名稱列印 5 遍
 70             for (int i = 0; i < 5; i++) {
 71                 System.out.println(Thread.currentThread().getName());
 72             }
 73         } catch (InterruptedException e) {
 74             e.printStackTrace();
 75         } finally {
 76             lock.unlock();
 77         }
 78     }
 79 
 80     // 同理
 81     public void loopC() {
 82         lock.lock();
 83 
 84         try {
 85             while (flag != 3) {
 86                 conditionC.await();
 87             }
 88 
 89             conditionA.signal();
 90             flag = 1;
 91 
 92             // 将線程C的名稱列印 7 遍
 93             for (int i = 0; i < 7; i++) {
 94                 System.out.println(Thread.currentThread().getName());
 95             }
 96         } catch (InterruptedException e) {
 97             e.printStackTrace();
 98         } finally {
 99             lock.unlock();
100         }
101     }
102 }
103 
104 // 結果
105 (AAABBBBBCCCCCCC)*10      

定制化列印

  這種定制化列印了解後,如果想要(ABC)*10,或其他形式的輸出。相信修改哪裡的參數應該很清楚了。

二、生産者與消費者

  生産者:不停生産産品,然後交給店員。

  消費者:不停消費産品,從店員處消費。

  店員:一次性持有的産品數量固定。

聊聊并發(二)——生産者與消費者

  代碼示例:生産者生産20個,消費者消費20個,店員持有10個産品滿。

1 // 不寫注釋也能看懂的代碼
 2 // 店員
 3 public class Clerk {
 4     // 産品數量
 5     private int product = 0;
 6 
 7     // 進貨
 8     public synchronized void get() {
 9         if (product >= 10) {
10             System.out.println("産品已滿!");
11         } else {
12             System.out.println(Thread.currentThread().getName() + " : " + ++product);
13         }
14     }
15 
16     // 賣貨
17     public synchronized void sale() {
18         if (product <= 0) {
19             System.out.println("産品缺貨!");
20         } else {
21             System.out.println(Thread.currentThread().getName() + " : " + --product);
22         }
23     }
24 }
25 
26 // 生産者
27 class Producer implements Runnable {
28     private final Clerk clerk;
29 
30     public Producer(Clerk clerk) {
31         this.clerk = clerk;
32     }
33 
34     @Override
35     public void run() {
36         for (int i = 0; i < 20; i++) {
37 //            try {
38 //                Thread.sleep(200);
39 //            } catch (InterruptedException e) {
40 //            }
41 
42             clerk.get();
43         }
44     }
45 }
46 
47 // 消費者
48 class Consumer implements Runnable {
49     private final Clerk clerk;
50 
51     public Consumer(Clerk clerk) {
52         this.clerk = clerk;
53     }
54 
55     @Override
56     public void run() {
57         for (int i = 0; i < 20; i++) {
58             clerk.sale();
59         }
60     }
61 }      
1 // 測試類
 2 public class Main {
 3     public static void main(String[] args) {
 4         Clerk clerk = new Clerk();
 5         Producer producer = new Producer(clerk);
 6         Consumer consumer = new Consumer(clerk);
 7         
 8         // 分别開啟了一個生産者A 和 一個消費者B
 9         new Thread(producer, "生産者A").start();
10         new Thread(consumer, "消費者B").start();
11     }
12 }
13 
14 // 可能的一種結果
15 生産者A : 1
16 消費者B : 0
17 産品缺貨!
18 産品缺貨!
19 産品缺貨!
20 産品缺貨!
21 産品缺貨!
22 産品缺貨!
23 産品缺貨!
24 産品缺貨!
25 産品缺貨!
26 産品缺貨!
27 産品缺貨!
28 産品缺貨!
29 産品缺貨!
30 産品缺貨!
31 産品缺貨!
32 産品缺貨!
33 産品缺貨!
34 産品缺貨!
35 産品缺貨!
36 生産者A : 1
37 生産者A : 2
38 生産者A : 3
39 生産者A : 4
40 生産者A : 5
41 生産者A : 6
42 生産者A : 7
43 生産者A : 8
44 生産者A : 9
45 生産者A : 10
46 産品已滿!
47 産品已滿!
48 産品已滿!
49 産品已滿!
50 産品已滿!
51 産品已滿!
52 産品已滿!
53 産品已滿!
54 産品已滿!      

  了解:兩個線程的共享變量是 product;兩個線程共同競争的鎖,同步方法預設是this,指 clerk。

  這裡沒有使用等待喚醒機制。在生産滿時,若搶到鎖,依然會一直生産;在消費空時,若搶到鎖,依然會一直消費。

聊聊并發(二)——生産者與消費者

2、等待喚醒

  上述結果并不是想要的。希望産品滿時,等待消費者消費一個時,再生産;而産品空時,等待生産者生産一個時,再消費。用等待喚醒機制改進:

聊聊并發(二)——生産者與消費者
聊聊并發(二)——生産者與消費者
1 public class Clerk {
 2     // 産品數量
 3     private int product = 0;
 4 
 5     // 進貨
 6     public synchronized void get() {
 7         if (product >= 10) {
 8             System.out.println("産品已滿!");
 9 
10             // 滿了就等待.就不生産
11             try {
12                 this.wait();
13             } catch (InterruptedException e) {
14                 e.printStackTrace();
15             }
16         } else {
17             System.out.println(Thread.currentThread().getName() + " : " + ++product);
18             // 通知消費者有貨,可以消費
19             this.notify();
20         }
21     }
22 
23     // 賣貨
24     public synchronized void sale() {
25         if (product <= 0) {
26             System.out.println("産品缺貨!");
27             
28             // 缺貨就等待
29             try {
30                 this.wait();
31             } catch (InterruptedException e) {
32                 e.printStackTrace();
33             }
34         } else {
35             System.out.println(Thread.currentThread().getName() + " : " + --product);
36             // 通知生産者,可以生産
37             this.notify();
38         }
39     }
40 }
41 
42 // 可能的一種結果
43 生産者A : 1
44 消費者B : 0
45 産品缺貨!
46 生産者A : 1
47 生産者A : 2
48 消費者B : 1
49 消費者B : 0
50 産品缺貨!
51 生産者A : 1
52 消費者B : 0
53 産品缺貨!
54 生産者A : 1
55 消費者B : 0
56 産品缺貨!
57 生産者A : 1
58 消費者B : 0
59 産品缺貨!
60 生産者A : 1
61 生産者A : 2
62 消費者B : 1
63 消費者B : 0
64 産品缺貨!
65 生産者A : 1
66 消費者B : 0
67 産品缺貨!
68 生産者A : 1
69 生産者A : 2
70 生産者A : 3
71 生産者A : 4
72 消費者B : 3
73 消費者B : 2
74 消費者B : 1
75 消費者B : 0
76 生産者A : 1
77 生産者A : 2
78 生産者A : 3
79 生産者A : 4
80 生産者A : 5
81 生産者A : 6
82 生産者A : 7      

等待喚醒

  問題:如果将店員持有 10 個滿改成持有 1 個滿,如下:

1 if (product >= 1) {}
2 
3 // 結果
4 …………省略前面的
5 生産者A : 1
6 産品已滿!
7 消費者B : 0
8 産品缺貨!      

  運作的結果沒問題,但是程式停不下來。分析運作結果有利于更好的了解多線程程式設計。結合列印結果,不難得出:最後一次,消費者B缺貨,等待,而生産者A執行完畢,已無法再喚醒消費者B。

  解決:把 else 打開即可。

  了解:其實不難了解它的現實語義。生産者A判斷産品滿,就等待,不滿,就生産。消費者B判斷産品空,就等待,不空,就消費。

3、虛假喚醒問題

  問題:在上述代碼基礎上,如果有多個生産者,多個消費者,會出現負數。

1 public class Main {
 2     public static void main(String[] args) {
 3         Clerk clerk = new Clerk();
 4         Producer producer = new Producer(clerk);
 5         Consumer consumer = new Consumer(clerk);
 6         new Thread(producer, "生産者A").start();
 7         new Thread(consumer, "消費者B").start();
 8         
 9         // 新增一個生産者和一個消費者
10         new Thread(producer, "生産者C").start();
11         new Thread(consumer, "消費者D").start();
12     }
13 }
14 
15 // 把上述 this.notify() 都改為 this.notifyAll();      

  原因:消費者B搶到鎖,product == 0,等待;消費者D搶到鎖,product == 0,等待。然後,生産者A搶到鎖,生産一個,product == 1。就會喚醒兩個消費者,同時消費,就出現0、-1。這就是虛假喚醒問題。

  解決:把 if 改為 while 即可。

聊聊并發(二)——生産者與消費者
  參考文檔:https://www.matools.com/api/java8

4、用lock實作

  代碼示例:完整用lock實作的生産者與消費者

1 public class Clerk {
 2     // 産品數量
 3     private int product = 0;
 4     final private Lock lock = new ReentrantLock();
 5     final Condition condition = lock.newCondition();
 6 
 7     // 進貨
 8     public void get() {
 9         lock.lock();
10         try {
11             while (product >= 1) {
12                 System.out.println("産品已滿!");
13 
14                 try {
15                     condition.await();
16                 } catch (InterruptedException e) {
17                     e.printStackTrace();
18                 }
19             }
20 
21             System.out.println(Thread.currentThread().getName() + " : " + ++product);
22             condition.signalAll();
23         } finally {
24             lock.unlock();
25         }
26     }
27 
28     // 賣貨
29     public void sale() {
30         lock.lock();
31         try {
32             while (product <= 0) {
33                 System.out.println("産品缺貨!");
34 
35                 try {
36                     condition.await();
37                 } catch (InterruptedException e) {
38                     e.printStackTrace();
39                 }
40             }
41 
42             System.out.println(Thread.currentThread().getName() + " : " + --product);
43             condition.signalAll();
44         } finally {
45             lock.unlock();
46         }
47     }
48 }
49 
50 // 生産者
51 class Producer implements Runnable {
52     private final Clerk clerk;
53 
54     public Producer(Clerk clerk) {
55         this.clerk = clerk;
56     }
57 
58     @Override
59     public void run() {
60         for (int i = 0; i < 20; i++) {
61             try {
62                 Thread.sleep(200);
63             } catch (InterruptedException e) {
64             }
65 
66             clerk.get();
67         }
68     }
69 }
70 
71 // 消費者
72 class Consumer implements Runnable {
73     private final Clerk clerk;
74 
75     public Consumer(Clerk clerk) {
76         this.clerk = clerk;
77     }
78 
79     @Override
80     public void run() {
81         for (int i = 0; i < 20; i++) {
82             clerk.sale();
83         }
84     }
85 }      
1 // 測試類
 2 public class Main {
 3     public static void main(String[] args) {
 4         Clerk clerk = new Clerk();
 5         Producer producer = new Producer(clerk);
 6         Consumer consumer = new Consumer(clerk);
 7         new Thread(producer, "生産者A").start();
 8         new Thread(consumer, "消費者B").start();
 9 
10         new Thread(producer, "生産者C").start();
11         new Thread(consumer, "消費者D").start();
12     }
13 }      

作者:Craftsman-L

出處:https://www.cnblogs.com/originator

本部落格所有文章僅用于學習、研究和交流目的,版權歸作者所有,歡迎非商業性質轉載。

如果本篇部落格給您帶來幫助,請作者喝杯咖啡吧!點選下面打賞,您的支援是我最大的動力!