天天看點

Java 線程間通信

要實作多個線程之間的協同,需要涉及到線程之間互相通信,線程間通信分為以下四類:

  1. 檔案共享
  2. 網絡共享
  3. 共享全局變量
  4. jdk提供的線程協調API

本文隻講解jdk提供的API。

三種線程協作通信的方式:

  • suspend/resume(已棄用)
  • wait/notify
  • park/unpark

suspend/resume

示例(生産者—消費者模型):線程1買包子,發現沒有包子,停止執行,線程2生産出包子,通知線程1繼續執行。

public class Test {
    /** 包子 */
    public static Object baozi = null;

    public static void main(String[] args) throws Exception {
        suspendResumeTest();
    }

    /** 正常的suspend/resume */
    public static void suspendResumeTest() throws Exception {
        // 啟動線程
        Thread consumerThread = new Thread(() -> {
            while (baozi == null) { // 如果沒包子,則進入等待
                System.out.println("沒有包子,進入等待");
                Thread.currentThread().suspend();
            }
            System.out.println("買到包子");
        });
        consumerThread.start();
        // 1秒之後,生産一個包子
        Thread.sleep(1000L);
        baozi = new Object();
        consumerThread.resume();
        System.out.println("生産出包子,通知消費者買包子");
    }
}
           
Java 線程間通信

正常消費情況下沒有出現問題,但是使用suspend/resume 容易出現死鎖情況。

死鎖情況1:

當消費者線程(consumerThread)中存在同步代碼塊時,suspend并不會釋放鎖。而其他線程由于搶不到鎖會一直處于阻塞狀态。也就一直無法通知消費者線程繼續執行。

public class Test {

    /** 包子 */
    public static Object baozi = null;

    public static void main(String[] args) throws Exception {

        new Test().suspendResumeDeadLockTest();
    }
	public void suspendResumeDeadLockTest() throws Exception {
		// 啟動線程
		Thread consumerThread = new Thread(() -> {
			while (baozi == null) { // 如果沒包子,則進入等待
				System.out.println("沒包子,進入等待");
				// 目前線程拿到鎖,然後挂起
				synchronized (this) {
					Thread.currentThread().suspend();
				}
			}
			System.out.println("買到包子");
		});
		consumerThread.start();
		// 1秒之後,生産一個包子
		Thread.sleep(1000L);
		baozi = new Object();
		// 争取到鎖以後,再恢複consumerThread
		synchronized (this) {
			consumerThread.resume();
		}
		System.out.println("生産出包子,通知消費者買包子");
	}
}
           
Java 線程間通信

死鎖情況2:

resume先執行,suspend後執行也會導緻程式永久挂起。

public class Test {

    /** 包子 */
    public static Object baozi = null;

    public static void main(String[] args) throws Exception {

        new Test().suspendResumeDeadLockTest2();
    }
	public void suspendResumeDeadLockTest2() throws Exception {
		// 啟動線程
		Thread consumerThread = new Thread(() -> {
			while (baozi == null) {
				System.out.println("沒包子,進入等待");
				try { // 三秒後挂起
					Thread.sleep(3000L);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				// 這裡的挂起執行在resume後面
				Thread.currentThread().suspend();
			}
			System.out.println("買到包子");
		});
		consumerThread.start();
		// 1秒之後,生産一個包子
		Thread.sleep(1000L);
		baozi = new Object();
		consumerThread.resume();
		System.out.println("生産出包子,通知消費者買包子");
		consumerThread.join();
	}
}
           
Java 線程間通信

wait/notify機制

wait方法會導緻目前線程等待,加入該對象的等待集合中,并且放棄目前持有的對象鎖,notify/notifyAll方法喚醒一個或所有正在等待這個對象鎖的線程。

注意:

雖然wait會自動解鎖,但是對順序有要求,如果notify在wait調用前執行,那麼線程也會永遠處于等待狀态。

public class Test {

    /** 包子 */
    public static Object baozi = null;

    public static void main(String[] args) throws Exception {
        new Test().waitNotifyTest();
    }
	/** 正常的wait/notify */
    public void waitNotifyTest() throws Exception {
        // 啟動線程
        new Thread(() -> {
            while (baozi == null) { // 如果沒包子,則進入等待
                synchronized (this) {
                    try {
                        System.out.println("沒有包子,進入等待\"");
                        this.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            System.out.println("買到包子");
        }).start();
        // 1秒之後,生産一個包子
        Thread.sleep(1000L);
        baozi = new Object();
        synchronized (this) {
            this.notifyAll();
            System.out.println("生産出包子,通知消費者買包子");
        }
    }
}
           
Java 線程間通信

park/unpark機制

線程調用park等待“許可”,unpark方法為指定線程提供“許可”。

不要求park/unpark方法的調用順序,多次調用unpark不會疊加,之後第一次調用park,線程會直接運作,後續調用則會進入等待。

注意:

unpark不會主動釋放鎖

import java.util.concurrent.locks.LockSupport;

public class Test {

    /** 包子 */
    public static Object baozi = null;

    public static void main(String[] args) throws Exception {

        new Test().parkUnparkTest();
    }
	/** 正常的park/unpark */
    public void parkUnparkTest() throws Exception {
        // 啟動線程
        Thread consumerThread = new Thread(() -> {
            while (baozi == null) { 
                try {
                    //等待三秒
                    Thread.sleep(3000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("進入等待");
                LockSupport.park();
            }
            System.out.println("買到包子");
        });
        consumerThread.start();
        // 1秒之後,生産一個包子
        Thread.sleep(1000L);
        baozi = new Object();
        LockSupport.unpark(consumerThread);
        System.out.println("生産出包子,通知消費者買包子");
    }
}
           
Java 線程間通信