天天看點

Java并發程式設計基礎-線程間通信

章節目錄

  • volatile 與 synchronized 關鍵字
  • 等待/通知機制
  • 等待/通知經典範式
  • 管道輸入/輸出流
  • Thread.join() 的 使用

1. volatile 與 synchronized 關鍵字

線程開始運作,擁有自己的棧空間,就如同一個腳本一樣,按照既定的代碼一行一行的執行,直到終止。如果每個運作中的線程,僅僅是孤立的運作,那麼沒有價值,或者說價值很少,如果多個線程能夠 互相配合 完成工作,這将帶來巨大的價值。

1.1 Java 線程操作的共享變量是對共享記憶體變量的一份拷貝

Java支援多個線程同時通路一個對象或者對象的成員變量,由于每個線程可以擁有這個共享變量的一份拷貝
(雖然對象以及成員變量配置設定的記憶體是在共享記憶體中,但是每個執行的線程還是可以擁有一份拷貝,這樣做的目的是
加速程式的執行)。這是現代多核處理器的一個顯著特性,
是以在程式執行過程中,(未同步的程式代碼塊),一個線程看到的變量并不一定是最新的。
           

1.2 volatile 關鍵字-線程間通信

關鍵字volatile可以用來修飾字段(成員變量),就是告知任何對該變量的通路均
需要從共享記憶體中擷取,而對它的改變必須同步重新整理到共享記憶體,
它能保證雖有線程對共享變量的可見性。
           
舉個例子,定義一個程式是否運作的成員變量,boolean on = true; 那麼另一個
線程可能對它執行關閉動作(on = false),這涉及多個線程對變量的通路,是以
需要将其定義為 volatile boolean on = true,這樣其他線程對他進行改變時,可
以讓所有線程感覺到變化,因為所有對共享變量的通路(load)和修改(store)都需
要以共享記憶體為準。但是過多的使用volatile是不必要的,因為它會降低程式執行的效率。
           

1.3 synchronized 關鍵字-線程間通信

關鍵字 synchronized 可以修飾方法 或者以同步塊的形來進行使用,它主要确
保多個線程在同一時刻,隻能有一個線程執行同步方法或同步塊,它保證了線
程對變量通路的可見性、排他性。
           

如下所示,類中使用了同步塊和同步方法,通過使用javap 工具檢視生成的class檔案資訊來分析synchronized關鍵字實作細節,示例如下:

package org.seckill.Thread;

public class Synchronized {
    public static void ls(String[] args) {
        synchronized (Synchronized.class) {

        }//靜态同步方法,對Synchronized Class對象進行加鎖

        m();
    }

    public static synchronized void m(){

    }
}
           

執行 javap -v Synchronized.class

輸出如下所示:

public static void main(java.lang.String[]);
    descriptor: ([Ljava/lang/String;)V
    flags: ACC_PUBLIC, ACC_STATIC
    Code:
      stack=2, locals=3, args_size=1
         0: ldc           #2                  // class org/seckill/Thread/Synchronized
         2: dup
         3: astore_1
         4: monitorenter
         5: aload_1
         6: monitorexit
         7: goto          15
        10: astore_2
        11: aload_1
        12: monitorexit
        13: aload_2
        14: athrow
        15: invokestatic  #3                  // Method m:()V
        18: return

public static synchronized void m();
    descriptor: ()V
    flags: ACC_PUBLIC, ACC_STATIC, ACC_SYNCHRONIZED
    Code:
      stack=0, locals=0, args_size=0
         0: return
           

對上述彙編指令進行解讀

  • 對于同步代碼塊(臨界區)的實作使用了monitorenter 和 monitorexit 指令。
  • 同步方法則是依靠方法修飾符上的ACC_SYNCHRONIZED。
  • 另種同步方式的原理是 對一個充當鎖的對象的monitor 進行擷取,而這個擷取過程是排他的,也就是同一時刻隻能有一個線程擷取到由syntronized 所保護的對象的螢幕。
  • 任何一個對象都擁有自己的螢幕,當這個對象由同步塊或者這個對象的同步方法調用時,執行方法的線程必須先擷取到對象的螢幕才能進入到同步塊或者同步方法中,那麼沒有擷取到螢幕(執行改方法)的線程将會被阻塞在同步塊和同步方法的入口處,進入blocked 狀态。

如下是對上述解讀過程的圖示:

Java并發程式設計基礎-線程間通信

對象、螢幕、同步隊列、執行線程之間的關系

2.等待/通知機制

等待通知相關方法

方法名稱 描述
wait() 調用lock.wait()(lock是充當鎖的對象)的線程将進入waiting狀态,隻有等待另外線程的通知或者線程對象.interrupted()才能傳回,wait()調用後,會釋放對象的鎖
wait(long) 逾時一段時間,這裡的參數是毫秒,也就是等待n毫秒,如果沒有通知就逾時傳回
wait(long,int) 對于逾時間的更細粒度控制,可以達到納秒級别
notify() 通知一個在鎖對象上等待的線程,使其從wait()方法傳回,而傳回的前提是該線程擷取到了對象的鎖(其實是線程擷取到了該對象的monitor對象的控制權)
notifyAll() 通知所有等待在充當鎖的對象上的線程

對等待通知機制的解釋

  • 等待通知機制,是指一個線程A調用了充當鎖的對象的wait()方法進入等 waiting 狀态
  • 另一個線程B調用了對象的O的 notify() 或者 notifyAll() 方法,線程A接收到通知後從充當鎖的對象上的wait()方法傳回,進而執行後續操作,最近一次操作是線程從等待隊列進入到同步阻塞隊列。
  • 上述兩個線程通過充當鎖的對象 lock 來完成互動,而lock對象上的wait()/notify/notifyAll()的關系就如同開關信号一樣,用來完成等待方和通知方的互動工作
如下代碼清單所示,建立兩個線程 WaitThread & NotifyThread,前者檢查flag是否為false,如果符合要求,進行後續操作,否則在lock上wait,後者在睡眠一段時間後對lock進行通知。
package org.seckill.Thread;

public class WaitNotify {
    static boolean flag = true;
    static Object lock = new Object();//充當鎖的對象

    public static void main(String[] args) {
        //建立wait線程
        Thread waitThread = new Thread(new WaitThread(),"waitThread");
        Thread notifyThread = new Thread(new NotifyThread(),"notifyThread");
        waitThread.start();//等待線程開始運作
        Interrupted.SleepUnit.second(5);//主線程sleep 5s
        notifyThread.start();
    }

    //wait線程
    static class WaitThread implements Runnable {
        public void run() {
            synchronized (lock) {
                //判定flag
                while (flag) {
                    try {
                        System.out.println(Thread.currentThread().getName() + "擷取flag 資訊" + flag);
                        //判定為true 直接wait
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                System.out.println(Thread.currentThread().getName() + "擷取flag 資訊 為" + flag);
            }
        }
    }

    static class NotifyThread implements Runnable {

        public void run() {
            synchronized (lock) {
                while (flag) {
                    System.out.println(Thread.currentThread().getName() + "擷取flag 資訊 為" + flag+"可以運作");
                    lock.notify();//喚醒wait在lock上的線程,此時wait線程隻能能從waiting隊列進入阻塞隊列,但還沒有開始重新進行monitorenter的動作
                    // 因為鎖沒有釋放
                    flag = false;
                    Interrupted.SleepUnit.second(5);
                }
            }

            synchronized (lock){//有可能擷取到lock對象monitor,擷取到鎖
                System.out.println(Thread.currentThread().getName()+" hold lock again");
                Interrupted.SleepUnit.second(5);
            }
        }
    }

}

           

運作結果如下所示:

Java并發程式設計基礎-線程間通信

運作結果

對如上程式運作流程的解釋如下所示:

上圖中"hold lock again 與 最後一行輸出"的位置可能互換,上述例子說明調用wait()、notify()、notifyAll需要注意的細節

  • 使用wait()、notify() 和 notifyAll() 時需要在同步代碼塊或同步方法中使用,且需要先對調用的鎖對象進行加鎖(擷取充當鎖的對象的monitor對象)
  • 調用wait() 方法後,線程狀态由running 變為 waiting,并将目前線程放置到等待隊列中
  • notify()、notifyAll() 方法調用後,等待線程依舊不會從wait()方法傳回,需要調用notify()、notifyAll()的線程釋放鎖之後,等待線程才有機會從wait()方法傳回
  • notify() 方法将waiting隊列中的一個等待線程從waiting隊列 移動到同步隊列中,而notifyAll() 則是将等待隊列中所有的線程全部移動到同步隊列,被移動的線程狀态由waiting status change to blocked狀态
  • 從wait() 方法傳回的前提是獲得了調用對象的鎖

    等待/通知機制依托于同步機制,其目的就是確定等待線程從wait()方法傳回時能夠感覺到通知線程對變量做出的修改

3.等待/通知經典範式

該範式分為兩部分,分别針對等待方(消費方)、和通知方(生産方)

等待方遵循如下原則:

  • 擷取對象的鎖
  • 如果條件不滿足,則調用對象的wait() 方法,被通知後仍要檢查條件
  • 條件滿足則執行對應的邏輯

    對應僞代碼

syntronized (lock) {
  while( !條件滿足 ){
      lock.wait();
  }
  //對應的處理邏輯
}
           

通知方遵循如下原則:

  • 擷取對象鎖
  • 改變條件
  • 通知所有等待在鎖對象的線程
syntronized(lock) {
       //1.執行邏輯
       //2.更新條件
       lock.notify();
 }
           

4.管道輸入輸出流

  • 管道輸入 / 輸出流和普通的檔案輸入/輸出流 或者網絡輸入/輸出流的不同之處在于它主要用于線程之間的資料傳輸,而傳輸的媒介為記憶體。
  • 管道輸入 / 輸出流主要包括如下4種具體實作:PipedOutputStream、PipedInputStream、PipedReader 、PipedWriter 前兩種面向位元組,後兩種面向字元

    對于Piped類型的流,必須先進行綁定,也就是調用connect()方法,如果沒有輸入/輸出流綁定起來,對于該流的通路将抛出異常。

5.Thread.join() 的 使用

如果使用了一個線程A執行了thread.join ,其含義是線程A等待thread線程終止之後才從thread.join()傳回。

如下筆試題:

有A、B、C、D四個線程,在main線程中運作,要求 執行順序是A->B->C->D->mian

變種->main等待A、B、C、D四個線程順序執行,且進行sum,之後main線程列印sum

解法1-join()

其實就是插隊

package org.seckill.Thread;

public class InOrderThread {
    static  int num = 0;

    public static void main(String[] args) throws InterruptedException {
        Thread previous = null;
        for (int i = 0; i < 4; i++) {
            char threadName = (char) (i + 65);
            Thread thread = new Thread(new RunnerThread(previous), String.valueOf(threadName));
            previous = thread;
            thread.start();
        }
        previous.join();
        System.out.println("total num=" + num);
        System.out.println(Thread.currentThread().getName() + "terminal");
    }

    static class RunnerThread implements Runnable {
        Thread previous;//持有前一個線程引用

        public RunnerThread(Thread previous) {
            this.previous = previous;
        }

        public void run() {
            if (this.previous == null) {
//                num += 25;
                System.out.println(Thread.currentThread().getName() + " terminate ");
            } else {
                try {
                    previous.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
//                num += 25;
                System.out.println(Thread.currentThread().getName() + " terminate ");
            }

        }
    }
}

           

解法2-wait/notify

package org.seckill.Thread;

//wait/notify
public class InOrderThread2 {
//    static int state = 0;//運作标志
//    static Object lock = new Object();

    public static void main(String[] args) {
//        RunnerThread runnerThreadA = new RunnerThread();
//        RunnerThread runnerThreadB = new RunnerThread();
//        RunnerThread runnerThreadC = new RunnerThread();
//        RunnerThread runnerThreadD = new RunnerThread();
//        Thread threadA = new Thread(runnerThreadA, "A");
//        Thread threadB = new Thread(runnerThreadB, "B");
//        Thread threadC = new Thread(runnerThreadC, "C");
//        Thread threadD = new Thread(runnerThreadD, "D");
        RunnerThread runnerThread = new RunnerThread();
        Thread threadA = new Thread(runnerThread, "A");
        Thread threadB = new Thread(runnerThread, "B");
        Thread threadC = new Thread(runnerThread, "C");
        Thread threadD = new Thread(runnerThread, "D");

        threadD.start();
        threadA.start();
        threadB.start();
        threadC.start();
    }

    static class RunnerThread implements Runnable {
//        private  boolean flag = true;
        static int state = 0;//運作标志
        static Object lock = new Object();

        public void run() {
            String threadName = Thread.currentThread().getName();
//            while (flag) {
//                synchronized (lock) {
//                    if (state % 4 == threadName.charAt(0) - 65) {
//                        state++;
//                        flag = false;
//                        System.out.println(threadName + " run over");
//                    }
//                }
//            }

            synchronized (lock) {
                while (state % 4 != threadName.charAt(0) - 65) {
                    try {
                        lock.wait();
                    }catch (InterruptedException e){
                        e.printStackTrace();
                    }
                }
                state++;
                System.out.println(threadName+" run over ");
                lock.notifyAll();
            }
        }
    }
}

           

等待/通知範式做線程同步 是非常友善的。

解法3-循環擷取鎖

package org.seckill.Thread;

//wait/notify
public class InOrderThread2 {
    static int state = 0;//運作标志
    static Object lock = new Object();

    public static void main(String[] args) {
        RunnerThread runnerThreadA = new RunnerThread();
        RunnerThread runnerThreadB = new RunnerThread();
        RunnerThread runnerThreadC = new RunnerThread();
        RunnerThread runnerThreadD = new RunnerThread();
        Thread threadA = new Thread(runnerThreadA, "A");
        Thread threadB = new Thread(runnerThreadB, "B");
        Thread threadC = new Thread(runnerThreadC, "C");
        Thread threadD = new Thread(runnerThreadD, "D");
//        RunnerThread runnerThread = new RunnerThread();
//        Thread threadA = new Thread(runnerThread, "A");
//        Thread threadB = new Thread(runnerThread, "B");
//        Thread threadC = new Thread(runnerThread, "C");
//        Thread threadD = new Thread(runnerThread, "D");

        threadD.start();
        threadA.start();
        threadB.start();
        threadC.start();
    }

    static class RunnerThread implements Runnable {
        private  boolean flag = true;//每個線程的私有變量
//        static int state = 0;//運作标志
//        static Object lock = new Object();

        public void run() {
            String threadName = Thread.currentThread().getName();
            while (flag) {//主動循環加鎖
                synchronized (lock) {
                    if (state % 4 == threadName.charAt(0) - 65) {
                        state++;
                        flag = false;
                        System.out.println(threadName + " run over");
                    }
                }
            }
//
//            synchronized (lock) {
//                while (state % 4 != threadName.charAt(0) - 65) {
//                    try {
//                        lock.wait();
//                    }catch (InterruptedException e){
//                        e.printStackTrace();
//                    }
//                }
//                state++;
//                System.out.println(threadName+" run over ");
//                lock.notifyAll();
//            }
        }
    }
}

           

開銷是極大的、難以確定及時性

解法4-CountDownLatch

package org.seckill.Thread;


import java.util.concurrent.CountDownLatch;


public class InOrderThread3 {
//    static int state = 0;//運作标志
//    static Object lock = new Object();

    public static void main(String[] args) throws  InterruptedException{
        CountDownLatch countDownLatchA = new CountDownLatch(1);
        CountDownLatch countDownLatchB = new CountDownLatch(1);
        CountDownLatch countDownLatchC = new CountDownLatch(1);
        CountDownLatch countDownLatchD = new CountDownLatch(1);
        RunnerThread runnerThreadA = new RunnerThread(countDownLatchA);
        RunnerThread runnerThreadB = new RunnerThread(countDownLatchB);
        RunnerThread runnerThreadC = new RunnerThread(countDownLatchC);
        RunnerThread runnerThreadD = new RunnerThread(countDownLatchD);

        Thread threadA = new Thread(runnerThreadA, "A");
        Thread threadB = new Thread(runnerThreadB, "B");
        Thread threadC = new Thread(runnerThreadC, "C");
        Thread threadD = new Thread(runnerThreadD, "D");


//        RunnerThread runnerThread = new RunnerThread();
//        Thread threadA = new Thread(runnerThread, "A");
//        Thread threadB = new Thread(runnerThread, "B");
//        Thread threadC = new Thread(runnerThread, "C");
//        Thread threadD = new Thread(runnerThread, "D");

        threadA.start();
        countDownLatchA.await();//主線程阻塞,待countDownLatch 減為0即可繼續向下運作
        threadB.start();
        countDownLatchB.await();
        threadC.start();
        countDownLatchC.await();
        threadD.start();
        countDownLatchD.await();

        System.out.println(Thread.currentThread().getName()+" run over ");

    }

    static class RunnerThread implements Runnable {
//        private  boolean flag = true;
//        static int state = 0;//運作标志
//        static Object lock = new Object();
        CountDownLatch countDownLatch;

        RunnerThread(CountDownLatch countDownLatch){
            this.countDownLatch = countDownLatch;
        }

        public void run() {
            String threadName = Thread.currentThread().getName();
            System.out.println(threadName+" run over");
            countDownLatch.countDown();

//            while (flag) {
//                synchronized (lock) {
//                    if (state % 4 == threadName.charAt(0) - 65) {
//                        state++;
//                        flag = false;
//                        System.out.println(threadName + " run over");
//                    }
//                }
//            }
//
//            synchronized (lock) {
//                while (state % 4 != threadName.charAt(0) - 65) {
//                    try {
//                        lock.wait();
//                    }catch (InterruptedException e){
//                        e.printStackTrace();
//                    }
//                }
//                state++;
//                System.out.println(threadName+" run over ");
//                lock.notifyAll();
//            }
        }
    }
}

           

countDownLatch 的使用場景 :比如系統完全開啟需要等待系統軟體全部運作之後才能開啟。最終的結果一定是發生在子(部分)結果完成之後的。也可作為線程同步的一種方式

Thread join() 源碼

public final synchronized void join() throws InterruptedException {
   while (isAlive) {
       wait(0);
   }
}
           

當被調用thread.join() 的線程(thread)終止運作時,會調用自身的notifyAll()方法,會通知所有等待該線程對象上完成運作的線程,可以看到join方法的邏輯結構與等待/通知經典範式一緻,即加鎖、循環、處理邏輯3個步驟。