文章目錄
-
- 一.多線程基礎
-
- 1.多任務概念
- 2.程序和線程的概念
- 3.程序 vs 線程
- 4.Java多線程
- 二.建立線程
-
- 1.線程執行順序
- 2.線程的優先級
- 三.線程的狀态
- 四.線程禮讓
- 五.中斷線程
-
- 1.設定标志位中斷線程
- 2.volatile
- 五.守護線程
- 六.線程同步
-
- 1.線程同步問題産生
- 2.Synchronized
- 3.錯誤使用Synchronized例子1
- 4.錯誤使用Synchronized例子2
- 5.不需要synchronized的操作
- 6.小結
- 七.同步方法
- Java中沒有特殊說明時,一個類`預設是非線程安全的`
- 八.死鎖
-
- 1.什麼是可重入鎖
- 2.不可重入鎖
- 3.死鎖
- 九.使用wait和notify
-
- 1.什麼是多線程協調?
- 2.使用wait()和notify()解決多線程協調?
- 3.完整例子
- 4.小結
- 十.使用ReentrantLock
-
- 1.什麼是ReentrantLock?
- 2.java中使用ReentrantLock
- 3.完整代碼
- 4.小結
- 十一.使用ReentrantLock + Condition對象來實作wait和notify的功能
-
- 1.如何使用ReentrantLock + Condition對象來實作wait和notify的功能
- 2.小結
- 十二.使用ReadWriteLock
-
- 1.什麼是ReadWriteLock?
- 2.Java中實作ReadWriteLock?
- 3.小結
- 十三.樂觀鎖和悲觀鎖
-
- 1.什麼是悲觀鎖
- 2.什麼是樂觀鎖
- 3.使用場景:
- 4.什麼是CAS
- 5.樂觀鎖常見的兩種實作方式
-
- 1.版本号機制
- 2.CAS算法
- 十四.使用StampedLock
-
- 1.什麼是StampedLock
- 2.Java中使用StampedLock
- 3.小結
- 十五.使用Concurrent集合
-
- 1.java.util.concurrent下的并發集合
- 2.小結
- 十六.使用Atomic(原子類)
-
- 1.什麼是原子類?
- 2.關于原子類個數說明
- 3.原子類的分類:
-
- 1.原子更新基本類型類
- 2.原子更新數組
- 3.原子更新引用類型
- 4.原子更新字段類
- 5.JDK8新增原子類簡介
- 4.使用java.util.concurrent.atomic提供的原子操作可以簡化多線程程式設計:
- 十七.使用線程池
-
- 1.什麼是線程池?
- 2.Java中使用線程池
- 3.小結
- 十八.使用Future
-
- 1.什麼是Future?
- 2.Java中使用Future?
- 3.小結
- 十九. 使用CompletableFuture
-
- 1.什麼是CompletableFuture?
- 2.Java中使用CompletableFuture?
- 3.CompletableFuture相比Future的優勢
- 4.小結
- 二十.使用ForkJoin
-
- 1.什麼是ForkJoin
- 2.Java中使用Fork/Join
- 3.小結
- 二十一.使用ThreadLocal
-
- 1.什麼是ThreadLocal?
- 2.Java中使用ThreadLocal
- 3.小結
一.多線程基礎
1.多任務概念
- 現代作業系統
都可以執行多任務。多任務就是(Windows,macOS,Linux)
同時運作多個任務
,例如:
同時打開ie浏覽器/QQ/QQ音樂
- CPU執行代碼都是
的,即使是單核cpu,也可以同時運作多個任務。 因為作業系統執行多任務實際上就是一條一條順序執行
讓CPU對多個任務輪流交替執行
。
例如,假設我們有國文、數學、英語3門作業要做,每個作業需要30分鐘。我們把這3門作業看成是3個任務,可以做1分鐘國文作業,再做1分鐘數學作業,再做1分鐘英語作業:
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHLxUkeORTS650MjR0T4Z0MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL5ITN0QDO0MTMxMTMwAjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
這樣輪流做下去,在某些人眼裡看來,做作業的速度就非常快,看上去就像同時在做3門作業一樣
類似的,作業系統輪流讓多個任務交替執行,例如,讓浏覽器執行0.001秒,讓QQ執行0.001秒,再讓QQ音樂執行0.001秒,在人看來,CPU就是在同時執行多個任務。 即使是多核CPU,因為通常任務的數量遠遠多于CPU的核數,是以任務也是交替執行的。
2.程序和線程的概念
- 在計算機中,我們把
,浏覽器就是一個程序,視訊播放器是另一個程序,類似的,音樂播放器和Word都是程序。一個任務稱為一個程序
- 某些程序内部還需要同時執行
。 例如,我們在使用Word時,Word可以讓我們一邊打字,一邊進行拼寫檢查,同時還可以在背景進行列印,我們把子任務稱為線程。多個子任務
- 程序和線程的關系就是:一個程序可以包含一個或多個線程,但至少會有一個線程。
【Java基礎】多線程從入門到掌握 - 作業系統排程的
。 常用的Windows、Linux等作業系統都采用最小任務機關是線程
,搶占式多任務
如何排程線程完全由作業系統決定,程式自己不能決定什麼時候執行,以及執行多長時間。
因為
同一個應用程式,既可以有多個程序,也可以有多個線程
,
是以,實作多任務的方法,有以下幾種:
- 多程序模式(每個程序隻有一個線程):
【Java基礎】多線程從入門到掌握 - 多線程模式(一個程序有多個線程)
【Java基礎】多線程從入門到掌握 - 多程序+多線程模式(複雜度最高)
【Java基礎】多線程從入門到掌握
3.程序 vs 線程
- 程序和線程是
,但是多任務既可以由多程序實作,也可以由單程序内的多線程實作,還可以混合多程序+多線程。具體采用哪種方式,要考慮到程序和線程的特點。包含關系
- 和多線程相比,多程序的缺點在于:
- 建立程序比建立線程開銷大,尤其是在Windows系統上;
- 程序間通信比線程間通信要慢,因為線程間通信就是讀寫同一個變量,速度很快。
- 而多程序的優點在于:
- 多程序穩定性比多線程高,因為在多程序的情況下,一個程序崩潰不會影響其他程序,而在多線程的情況下,任何一個線程崩潰會直接導緻整個程序崩潰。
4.Java多線程
Java語言内置了多線程支援:一個Java程式實際上是一個
JVM程序
,JVM程序用一個
主線程來執行main()方法
,在
main()方法内部
,我們又可以
啟動多個線程
。 此外,
JVM還有負責垃圾回收的其他工作線程(守護線程)等。
- 是以,對于大多數Java程式來說,我們說多任務,實際上是說如何使用多線程實作多任務。
- 和單線程相比,多線程程式設計的特點在于:多線程
需要經常
。讀寫共享資料,并且需要同步
- 例如,播放電影時,就必須由一個線程播放視訊,另一個線程播放音頻,兩個線程需要協調運作,否則畫面和聲音就不同步。是以,
多線程程式設計的複雜度高,調試更困難。
- 例如,播放電影時,就必須由一個線程播放視訊,另一個線程播放音頻,兩個線程需要協調運作,否則畫面和聲音就不同步。是以,
- Java多線程程式設計的特點又在于:
- 多線程模型是Java程式最基本的并發模型;
- 後續讀寫網絡、資料庫、Web開發等都依賴Java多線程模型。
二.建立線程
- Java語言内置了多線程支援。當Java程式啟動的時候,實際上是啟動了一個JVM程序,然後,JVM啟動主線程來執行main()方法。在main()方法中,我們又可以啟動其他線程。
要建立一個新線程非常容易,我們需要執行個體化一個Thread執行個體,然後調用它的start()方法:
- 方法一:繼承Thread類重寫run方法:
public class Main {
public static void main(String[] args) {
Thread t = new MyThread();
t.start(); // 啟動新線程
}
}
class MyThread extends Thread {
@Override
public void run() {
System.out.println("start new thread!");
}
}
- 方法二:實作Runnable接口重寫run方法,建立Thread執行個體時,傳入一個Runnable執行個體:
public class Main {
public static void main(String[] args) {
Thread t = new Thread(new MyRunnable());
t.start(); // 啟動新線程
}
}
class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("start new thread!");
}
}
或者用Java8引入的lambda文法進一步簡寫為:
public class Main {
public static void main(String[] args) {
Thread t = new Thread(() -> {
System.out.println("start new thread!");
});
t.start(); // 啟動新線程
}
}
1.線程執行順序
public class Main {
public static void main(String[] args) {
System.out.println("main start...");
Thread t = new Thread() {
public void run() {
System.out.println("thread run...");
System.out.println("thread end.");
}
};
t.start();
System.out.println("main end...");
}
}
main線程執行的代碼有4行,首先列印
main start
,然後建立
Thread對象
,緊接着調用
start()
啟動新線程。當start()方法被調用時,
JVM就建立了一個新線程
,我們通過執行個體
變量t
來表示這個新線程對象,并開始執行。
接着,
main線程
繼續執行列印
main end
語句,而
t線程
在
main線程
執行的同時會
并發執行
,列印
thread run
和
thread end
語句。
當run()方法結束時,新線程就結束了。而
main()方法結束時,主線程也結束了。
我們再來看線程的執行順序:
-
肯定是先列印main線程
,再列印main start
2.main end;
定是先列印t線程肯
,再列印thread run
。thread end
- 但是,除了可以肯定,
,main start會先列印外
列印在main end
之前、thread run
之後或者之間,都thread end
。因為從無法确定
開始運作以後,t線程
,并且兩個線程就開始同時運作了
,由作業系統排程
程式本身無法确定線程的排程順序。
2.線程的優先級
可以對線程設定優先級,設定優先級的方法是:
優先級高的線程被作業系統排程的優先級較高,作業系統對高優先級線程可能排程更頻繁,但不能保證優先級高的線程一定會先執行。
線程排程由作業系統決定,程式本身無法決定排程順序;
三.線程的狀态
在Java程式中,
一個線程對象隻能調用一次start()方法
啟動新線程,并在新線程中執行
run()方法
。一旦run()方法執行完畢,線程就結束了。是以,Java線程的狀态有以下幾種:
-
:新建立的線程,尚未執行;New
-
:運作中的線程,正在執行run()方法的Java代碼;Runnable
-
:運作中的線程,因為某些操作被阻塞而挂起;Blocked
-
:運作中的線程,因為某些操作在等待中;Waiting
-
:運作中的線程,因為執行sleep()方法正在計時等待;Timed Waiting
-
Terminated
:線程已終止,因為run()方法執行完畢。
-
當線程啟動後,它可以在【Java基礎】多線程從入門到掌握
這幾個狀态之間切換,直到最後變成Runnable、Blocked、Waiting和Timed Waiting
狀态,線程終止。Terminated
線程終止的原因有:
- 線程正常終止:run()方法執行到return語句傳回;
- 線程意外終止:run()方法因為未捕獲的異常導緻線程終止;
- 調用線程執行個體的stop()方法強制終止
。(強烈不推薦使用)
四.線程禮讓
- 通過對另一個線程對象調用join()方法可以等待其執行結束;
- 可以指定等待時間,超過等待時間線程仍然沒有結束就不再等待;
- 對已經運作結束的線程調用join()方法會立刻傳回。
- join()方法誰的線程體中哪個線程就會等待目前join()方法對應執行個體線程執行結束在執行
public class Main {
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(() -> {
System.out.println("hello");
});
System.out.println("start");
t.start();
t.join();
System.out.println("end");
}
}
當
main線程
對
線程t
調用join()方法時,
主線程
将等待
線程t
運作結束在繼續執行,即join就是指等待該線程結束,然後才繼續往下執行自身線程。 是以,上述代碼列印順序可以肯定是
main線程先列印start,t線程再列印hello,main線程最後再列印end。
如果
線程t
已經結束,對執行個體t調用join()會立刻傳回。
- 此外,
的重載方法也可以指定一個等待時間,join(long)
。超過等待時間後就不再繼續等待
五.中斷線程
-
如果線程需要執行一個長時間任務,就可能需要能中斷線程。中斷線程就是其他線程給該線程發一個信号,該線程收到信号後結束執行run()方法,使得自身線程能立刻結束運作。
我們舉個栗子:假設從網絡下載下傳一個100M的檔案,如果網速很慢,使用者等得不耐煩,就可能在下載下傳過程中點“取消”,這時,程式就需要中斷下載下傳線程的執行。
中斷一個線程非常簡單,隻需要在其他線程中對目标線程調用
interrupt()
(預設值為false,需要取反判斷) 方法,
目标線程循環調用interrupted()方法判斷自身狀态
,如果是,就立刻結束運作。
public class Main {
public static void main(String[] args) throws InterruptedException {
Thread t = new MyThread();
t.start();
Thread.sleep(1); // 暫停1毫秒
t.interrupt(); // 中斷t線程
t.join(); // 等待t線程結束
System.out.println("end");
}
}
class MyThread extends Thread {
public void run() {
int n = 0;
while (! isInterrupted()) {
n ++;
System.out.println(n + " hello!");
}
}
}
上述代碼 main線程 通過調用·線程t.interrupt()·方法中斷t線程,但是要注意,
interrupt()方法僅僅向t線程發出了“中斷請求”
,至于t線程是否能立刻響應,要看具體代碼。而
線程t的while循環會檢測isInterrupted()
,是以上述代碼能正确響應interrupt()請求,使得自身立刻結束運作run()方法。
如果線程處于等待狀态,調用目前線程的interrupt()會抛出InterruptedException,例如,
t.join()
會讓
main
線程進入等待狀态,此時,如果對main線程調用interrupt(),
join()方法會立刻抛出InterruptedException
,是以,目标線程隻要捕獲到join()方法抛出的InterruptedException`,就說明有其他線程對其調用了interrupt()方法,通常情況下該線程應該立刻結束運作。
public class Main {
public static void main(String[] args) throws InterruptedException {
Thread t = new MyThread();
t.start();
Thread.sleep(1000);
t.interrupt(); // 中斷t線程
t.join(); // 等待t線程結束
System.out.println("end");
}
}
class MyThread extends Thread {
public void run() {
Thread hello = new HelloThread();
hello.start(); // 啟動hello線程
try {
hello.join(); // 等待hello線程結束
} catch (InterruptedException e) {
System.out.println("interrupted!");
}
hello.interrupt();
}
}
class HelloThread extends Thread {
public void run() {
int n = 0;
while (!isInterrupted()) {
n++;
System.out.println(n + " hello!");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
break;
}
}
}
}
main線程
通過調用
t.interrupt()
進而通知t線程中斷,而此時
線程t正在等待hello線程執行結束
,
此方法會立刻結束等待并抛出InterruptedException
。由于我們在
線程t中捕獲了InterruptedException
,是以,就可以準備結束該線程。在t線程結束前,對hello線程也進行了interrupt()調用通知其中斷。
如果去掉這一行代碼,可以發現hello線程仍然會繼續運作,且JVM不會退出。
1.設定标志位中斷線程
常用的中斷線程的方法是設定标志位。我們通常會用一個boolean 類型的标記位來辨別線程是否應該繼續運作,在外部線程中,通過把它置為false,就可以讓線程結束:
public class Main {
public static void main(String[] args) throws InterruptedException {
HelloThread t = new HelloThread();
t.start();
Thread.sleep(1);
t.running = false; // 标志位置為false
}
}
class HelloThread extends Thread {
public volatile boolean running = true;
public void run() {
int n = 0;
while (running) {
n ++;
System.out.println(n + " hello!");
}
System.out.println("end!");
}
}
注意到HelloThread的标志位
boolean running
是一個
線程間共享的變量
。
- 線程間共享變量需要使用
标記,確定每個線程都能讀取到更新後的變量值。volatile關鍵字
2.volatile
為什麼要對線程間共享的變量用關鍵字volatile聲明?
- 這涉及到
。在Java虛拟機中,變量的值儲存在Java的記憶體模型
中,當主記憶體
。如果線程修改了變量的值,虛拟機會在某個時刻把修改後的值回寫到主記憶體,但這個時間是不确定的!線程通路變量時,它會先擷取一個副本,并儲存在自己的工作記憶體中
【Java基礎】多線程從入門到掌握 這會導緻一個線程更新了某個變量,另一個線程讀取的值可能還是更新前的。
例如,
,它在此刻僅僅是把主記憶體變量a = true,線程1執行a = false時
,變量a的副本變成了false
,在JVM把修改後的a回寫到主記憶體之前,主記憶體的變量a還是true
, 這 就造成了多線程之間共享的變量不一緻。其他線程讀取到的a的值仍然是true
volatile關鍵字的目的是告訴虛拟機:
- 每次通路變量時,總是擷取主記憶體的最新值;
- 每次修改變量後,立刻回寫到主記憶體。
volatile關鍵字解決的是
共享變量線上程間的可見性問題
:當一個線程修改了某個共享變量的值,其他線程能夠立刻看到修改後的值。
如果我們去掉volatile關鍵字,運作上述程式,發現效果和帶volatile差不多,這是因為在的架構下,JVM回寫主記憶體的速度非常快,但是,換成
x86
的架構,就會有顯著的延遲。
ARM
五.守護線程
- Java程式入口就是由JVM啟動main線程,main線程又可以啟動其他線程。
。當所有線程都運作結束時,JVM退出,程序結束
- 如果有一個線程沒有退出,JVM程序就不會退出。是以,必須保證所有線程都能及時結束。
但是有一種線程的目的就是無限循環,例如,
一個定時觸發任務的線程
:
class TimerThread extends Thread {
@Override
public void run() {
while (true) {
System.out.println(LocalTime.now());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
break;
}
}
}
}
如果這個線程不結束,JVM程序就無法結束。問題是,由誰負責結束這個線程?
然而這類線程經常沒有負責人來負責結束它們。但是,當其他線程結束時,JVM程序又必須要結束,怎麼辦?
答案是使用
守護線程(Daemon Thread)
。
守護線程是指
為其他線程服務的線程
。在JVM中,
所有非守護線程都執行完畢後,無論有沒有守護線程,虛拟機都會自動退出。
是以,JVM退出時,不必關心守護線程是否已結束。
如何建立守護線程
- 在調用start()方法前,調用setDaemon(true)把該線程标記為守護線程:
Thread t = new MyThread();
t.setDaemon(true);
t.start();
在守護線程中,編寫代碼要注意:守護線程不能持有任何需要關閉的資源,例如打開檔案等,因為虛拟機退出時,守護線程沒有任何機會來關閉檔案,這會導緻資料丢失。
六.線程同步
當多個線程同時運作時,線程的排程由作業系統決定,程式本身無法決定。是以,任何一個線程都有可能在任何指令處被作業系統暫停,然後在某個時間段後繼續執行。
這個時候,有個單線程模型下不存在的問題就來了:如果
多個線程時讀寫共享變量
,會出現資料不一緻的問題。
1.線程同步問題産生
public class Main {
public static void main(String[] args) throws Exception {
AddThread add = new AddThread();
DecThread dec = new DecThread();
add.start();
dec.start();
add.join();
dec.join();
System.out.println(Counter.count);
}
}
class Counter {
public static int count = 0;
}
class AddThread extends Thread {
public void run() {
for (int i=0; i<10000; i++) { Counter.count += 1; }
}
}
class DecThread extends Thread {
public void run() {
for (int i=0; i<10000; i++) { Counter.count -= 1; }
}
}
上面的代碼
兩個線程同時對一個int變量進行操作
,一個加10000次,一個減10000次,最後結果應該是0,但是,
每次運作,結果實際上都是不一樣的。
連續執行三次結果
這是因為對變量進行讀取和寫入時,結果要正确,必須保證是原子操作。原子操作是指不能被中斷的一個或一系列操作。
實際上執行
n = n + 1
并不是一個原子操作,它的執行過程如下:
1. 從主存中讀取變量x副本到工作記憶體
2. 給x加1
3. 将x加1後的值寫回主存
我們假設
n
的值是
100
,如果
兩個線程同時執行n = n + 1
,得到的結果很可能不是
102,而是101
,
原因在于:
多個線程執行時,CPU對線程的排程是随機的,我們不知道目前程式被執行到哪步就切換到了下一個線程
- 如果線程1在從主記憶體将n=100的值同步到工作記憶體時,此時cpu切換到線程2,線程2也将n=100的值同步到工作記憶體
- 線程1 n+=1 = 101,然後同步到主記憶體此時主記憶體為101
- 線程2 n-=1 = 99,然後同步到主記憶體此時主記憶體為99
- 顯然由于執行順序的不同n最終的結果可能為101也可能為99
這說明多線程模型下,要保證邏輯正确,即某一個線程對共享變量進行讀寫時,其他線程必須等待
2.Synchronized
- 通過加鎖和解鎖的操作,就能保證在一個線程執行期間,不會有其他線程會進入此代碼塊。
- 即使在執行期線程被作業系統中斷執行,其他線程也會因為無法獲得鎖導緻無法進入此代碼塊。隻有執行線程将鎖釋放後,其他線程才有機會獲得鎖并執行。這種
我們稱之為加鎖和解鎖之間的代碼塊
,臨界區(Critical Section)
任何時候臨界區最多隻有一個線程能執行。
- 保證一段代碼的原子性就是通過加鎖和解鎖實作的。Java程式使用
對一個對象進行加鎖synchronized關鍵字
synchronized(lock) {
n = n + 1;
}
synchronized保證了代碼塊在任意時刻最多隻有一個線程能執行。
public class TestSynchronized {
public static void main(String[] args) throws Exception {
AddThread add = new AddThread();
DecThread dec = new DecThread();
add.start();
dec.start();
add.join();
dec.join();
System.out.println(Counter.count);
}
}
//計數器
class Counter {
public static final Object lock = new Object();
public static int count = 0;
}
//新增線程
class AddThread extends Thread {
public void run() {
for (int i=0; i<10000; i++) { synchronized(Counter.lock) { Counter.count += 1;} }
}
}
//減少線程
class DecThread extends Thread {
public void run() {
for (int i=0; i<10000; i++) { synchronized(Counter.lock){ Counter.count -= 1;} }
}
}
執行結果
代碼
synchronized(Counter.lock) {//擷取鎖
}//釋放鎖
- 它表示用
作為鎖,兩個線程在執行各自的Counter.lock執行個體
代碼塊時,synchronized(Counter.lock) { ... }
必須先獲得鎖,才能進入代碼塊進行。
這将會導緻對Counter.count變量進行讀寫就執行結束後,在synchronized語句塊結束會自動釋放鎖。
。論運作多少次,最終結果都是0。不能同時進行
synchronized解決了多線程同步通路共享變量的有序性問題。
但它的缺點是帶來了性能下降。因為synchronized代碼塊無法并發執行。此外
加鎖和解鎖需要消耗一定的時間
,是以,
synchronized會降低程式的執行效率。
如何使用Synchronized
- 找出修改共享變量的線程代碼塊;
- 選擇一個共享執行個體作為鎖;
- 使用
synchronized(lockObject) { }.
- 在使用synchronized的時候,
。因為無論是否有異常,都會在synchronized結束處正确釋放鎖:不必擔心抛出異常
public void add(int m) {
synchronized (obj) {
if (m < 0) {
throw new RuntimeException();
}
this.value += m;
} // 無論有無異常,都會在此釋放鎖
}
3.錯誤使用Synchronized例子1
public class Main {
public static void main(String[] args) throws Exception {
AddThread add = new AddThread();
DecThread dec = new DecThread();
add.start();
dec.start();
add.join();
dec.join();
System.out.println(Counter.count);
}
}
class Counter {
public static final Object lock1 = new Object();
public static final Object lock2 = new Object();
public static int count = 0;
}
class AddThread extends Thread {
public void run() {
for (int i=0; i<10000; i++) {
synchronized(Counter.lock1) {
Counter.count += 1;
}
}
}
}
class DecThread extends Thread {
public void run() {
for (int i=0; i<10000; i++) {
synchronized(Counter.lock2) {
Counter.count -= 1;
}
}
}
}
執行結果
結果并不是0,這是因為兩個線程各自的synchronized鎖住的不是同一個對象! 這使得兩個線程各自都可以同時獲得鎖:
因為JVM隻保證同一個鎖在任意時刻隻能被一個線程擷取,但兩個不同的鎖在同一時刻可以被兩個線程分别擷取
。 使用synchronized的時候,擷取到的是哪個鎖非常重要。鎖對象如果不對,代碼邏輯就不對。
4.錯誤使用Synchronized例子2
public class Main {
public static void main(String[] args) throws Exception {
Thread [] ts = new Thread[] { new AddStudentThread(), new DecStudentThread(), new AddTeacherThread(), new DecTeacherThread() };
for (Thread t : ts) {
t.start();
}
for (Thread t : ts) {
t.join();
}
System.out.println(Counter.studentCount);
System.out.println(Counter.teacherCount);
}
}
class Counter {
public static final Object lock = new Object();
public static int studentCount = 0;
public static int teacherCount = 0;
}
class AddStudentThread extends Thread {
public void run() {
for (int i=0; i<10000; i++) {
synchronized(Counter.lock) {
Counter.studentCount += 1;
}
}
}
}
class DecStudentThread extends Thread {
public void run() {
for (int i=0; i<10000; i++) {
synchronized(Counter.lock) {
Counter.studentCount -= 1;
}
}
}
}
class AddTeacherThread extends Thread {
public void run() {
for (int i=0; i<10000; i++) {
synchronized(Counter.lock) {
Counter.teacherCount += 1;
}
}
}
}
class DecTeacherThread extends Thread {
public void run() {
for (int i=0; i<10000; i++) {
synchronized(Counter.lock) {
Counter.teacherCount -= 1;
}
}
}
}
執行結果
- 上面
對4個線程
分别進行讀寫操作,但是使用的鎖都是兩個共享變量
,這就造成了Counter.lock對象
,執行效率大大降低。原本可以并發執行的Counter.studentCount += 1和Counter.teacherCount += 1無法并發執行了
- 實際上,需要同步的線程可以分成兩組:
,AddStudentThread和DecStudentThread
,組之間不存在競争,是以,應該使用兩個不同的鎖AddTeacherThread和DecTeacherThread
public class TestSynchronizedMulti {
public static void main(String[] args) throws Exception {
//建立線程
Thread[] ts = new Thread[]{new AddStudentThread(), new DecStudentThread(), new AddTeacherThread(), new DecTeacherThread()};
//啟動線程
for (Thread t : ts) {
t.start();
}
//優先子線程先執行
for (Thread t : ts) {
t.join();
}
//最後列印執行結果
System.out.println(Counter.studentCount);
System.out.println(Counter.teacherCount);
}
}
//計數器
class Counter {
public static final Object lockTeacher = new Object();//學生線程鎖對象
public static final Object lockStudent = new Object();//老師線程鎖對象
public static int studentCount = 0;
public static int teacherCount = 0;
}
//增加學生數量線程
class AddStudentThread extends Thread {
public void run() {
for (int i = 0; i < 10000; i++) {
synchronized (Counter.lockStudent) {
Counter.studentCount += 1;
}
}
}
}
//減少學生數量線程
class DecStudentThread extends Thread {
public void run() {
for (int i = 0; i < 10000; i++) {
synchronized (Counter.lockStudent) {
Counter.studentCount -= 1;
}
}
}
}
//增加老師數量線程
class AddTeacherThread extends Thread {
public void run() {
for (int i = 0; i < 10000; i++) {
synchronized (Counter.lockTeacher) {
Counter.teacherCount += 1;
}
}
}
}
//減少老師數量線程
class DecTeacherThread extends Thread {
public void run() {
for (int i = 0; i < 10000; i++) {
synchronized (Counter.lockTeacher) {
Counter.teacherCount -= 1;
}
}
}
}
執行結果
5.不需要synchronized的操作
JVM規範定義了幾種原子操作:
- 基本類型(long和double除外)指派,例如:int n = m;
- 引用類型指派,例如:List list = anotherList。
- long和double是64位資料,JVM沒有明确規定64位指派操作是不是一個原子操作,不過在
x64平台的JVM是把long和double的指派作為原子操作實作的。
- long和double是64位資料,JVM沒有明确規定64位指派操作是不是一個原子操作,不過在
- 單條原子操作的語句不需要同步。例如:
public void set(int m) {
synchronized(lock) {
this.value = m;
}
}
就不需要同步
//引用類型指派
public void set(String s) {
this.value = s;
}
- 如果是多行指派語句,就必須保證是同步操作
class Pair {
int first;
int last;
public void set(int first, int last) {
synchronized(this) {
this.first = first;
this.last = last;
}
}
}
有些時候,通過一些巧妙的轉換,可以把非原子操作變為原子操作。例如,上述代碼如果改造成:
class Pair {
int[] pair;
public void set(int first, int last) {
int[] ps = new int[] { first, last };
this.pair = ps;
}
}
就不再需要同步,因為
this.pair = ps
是
引用指派的原子操作
。而語句:
int[] ps = new int[] { first, last };
,這裡的
ps是方法内部定義的局部變量
,每個線程都會有各自的局部變量,互不影響,并且互不可見,并不需要同步。
6.小結
- 多線程
,會造成邏輯錯誤,是以需要通過同時讀寫共享變量時
同步;synchronized
- 同步的本質就是給指定對象加鎖,加鎖後才能繼續執行後續代碼;
- 注意加鎖對象必須是同一個執行個體;
- 對JVM定義的單個原子操作不需要同步
七.同步方法
Java程式依靠
synchronized
對線程進行同步,使用synchronized的時候,鎖住的是哪個對象非常重要。
讓線程自己選擇鎖對象往往會使得代碼邏輯混亂,也不利于封裝。
更好的方法是把synchronized邏輯封裝起來。
例如,我們編寫一個計數器
//計數器
public class Counter {
private int count = 0;
public synchronized void add(int n) {
count += n;
}
public synchronized void dec(int n) {
count -= n;
}
public int get() {
return count;
}
}
//測試方法
class Main {
public static void main(String[] args) throws InterruptedException {
Counter c1 = new Counter();
Counter c2 = new Counter();
// 對c1進行操作的線程:
new Thread(() -> {
c1.add(1);
}).start();
new Thread(() -> {
c1.dec(1);
}).start();
// 對c2進行操作的線程:
new Thread(() -> {
c2.add(1);
}).start();
new Thread(() -> {
c2.dec(1);
}).start();
//主線程休眠20毫秒
Thread.sleep(20);
System.out.println(c1.get());
System.out.println(c2.get());
}
線程調用
add()
、
dec()
方法時不必關心同步邏輯,因為
synchronized代碼塊
在
add()
、
dec()
方法内部。并且
synchronized
鎖住的對象是
this
,
即目前執行個體
,
這使得建立多個Counter執行個體的時候,它們之間互不影響,可以并發執行
- 使用synchronized修飾方法,表示整個方法都必須用this執行個體加鎖
public synchronized void add(int n) { // 鎖住this count += n; } //
-
使用synchronized修飾靜态方法,鎖住的是該類的class執行個體
static方法沒有this執行個體的,因為static方法是針對類而不是執行個體。
,任何一個類都有一個由JVM自動建立的Class執行個體
對static方法添加synchronized鎖住的是該類的class執行個體
public class Counter { public static void test(int n) { synchronized(Counter.class) { // ... } } }
Java中沒有特殊說明時,一個類 預設是非線程安全的
預設是非線程安全的
八.死鎖
1.什麼是可重入鎖
Java的線程鎖是
可重入的鎖
- 什麼是可重入的鎖?
public synchronized void method1(){
System.out.println("sysn method1");
method2();
}
private synchronized void method2() {
System.out.println("syn method2");
}
如果一旦線程執行到
add()
方法内部,說明它已經
擷取了目前執行個體的this鎖
。如果傳入的
n < 0
,将在
add()
方法内部調用
dec()
方法。由于dec()方法也需要擷取this鎖,現在問題來了:
對同一個線程,能否在擷取到鎖以後繼續擷取同一個鎖?
- 答案是肯定的。JVM允許
,這種能被同一個線程反複擷取的鎖,就叫做同一個線程重複擷取同一個鎖
。可重入鎖
- 廣義上的可重入鎖指的是可重複可遞歸調用的鎖,
,這樣的鎖就叫做可重入鎖.在外層使用鎖之後,在内層仍然可以使用,并且不發生死鎖(前提得是同一個對象或者class)
- 可重入鎖(也叫遞歸鎖):指的是
在同一線程
方法獲得鎖之後,外層
仍然可以擷取該鎖的代碼,在同一線程在外層方法擷取鎖的時候内層遞歸方法
,在進入内層方法會自動擷取鎖。也就是說,+1
。線程可以進入任何一個它已經擁有的鎖所同步着的代碼塊
- 重入鎖以
為機關,當一個線程擷取線程
之後,這個線程可以再次擷取本對象上的鎖,而其他的線程是不可以的。ReentrantLock和synchronized都是可重入鎖對象鎖
- 可重入鎖的意義便在于
防止死鎖!!!
- 由于Java的線程鎖是可重入鎖,是以擷取鎖的時,
不但要判斷是否是第一次擷取,還要記錄這是第幾次擷取。每擷取一次鎖,記錄+1,每退出synchronized塊,記錄-1,減到0的時候,才會真正釋放鎖。
- 實作原理是通過為
關聯一個每個鎖
和一個請求計數器
。當計數為0時,認為鎖是未被占有的;線程請求一個未被占有的鎖時,JVM将記錄鎖的占有者,并且将請求計數器置為1 。占有它的線程
- 如果同一個線程再次請求這個鎖,計數将遞增;
- 每次占用線程退出同步塊,計數器值将遞減。直到計數器為0,鎖被釋放。
java之可重入鎖和遞歸鎖理論知識
2.不可重入鎖
所謂不可重入鎖,即指的是同一線程在外層方法獲得鎖之後,那麼在
内層遞歸方法
中嘗試再次擷取鎖時,就會擷取不到被阻塞。
- 不可重入鎖,與可重入鎖相反,不可遞歸調用,遞歸調用就發生
。死鎖
public class Lock{
private boolean isLocked = false;
public synchronized void lock() throws InterruptedException{
while(isLocked){
wait();
}
isLocked = true;
}
public synchronized void unlock(){
isLocked = false;
notify();
}
}
public class Count{
Lock lock = new Lock();
public void print(){
lock.lock();
doAdd();
lock.unlock();
}
public void doAdd(){
lock.lock();
//do something
lock.unlock();
}
}
目前線程執行
print()
方法首先擷取
lock
,接下來執行
doAdd()
方法就無法執行
doAdd()
中的邏輯,必須先釋放鎖。
Java不可重入鎖和可重入鎖了解
3.死鎖
-
什麼是死鎖
死鎖産生的條件是多線程各自持有不同的鎖,并互相試圖擷取對方已持有的鎖,導緻無限等待
public void add(int m) {
synchronized(lockA) { // 獲得lockA的鎖
this.value += m;
synchronized(lockB) { // 獲得lockB的鎖
this.another += m;
} // 釋放lockB的鎖
} // 釋放lockA的鎖
}
public void dec(int m) {
synchronized(lockB) { // 獲得lockB的鎖
this.another -= m;
synchronized(lockA) { // 獲得lockA的鎖
this.value -= m;
} // 釋放lockA的鎖
} // 釋放lockB的鎖
}
在擷取多個鎖的時候,不同線程擷取多個不同對象的鎖可能導緻死鎖
。
線程1和線程2如果分别執行add()和dec()方法時:
- 線程1:進入add(),獲得lockA;
- 線程2:進入dec(),獲得lockB。
随後:
- 線程1:準備獲得lockB,失敗,等待中;
- 線程2:準備獲得lockA,失敗,等待中。
此時
兩個線程各自持有不同的鎖,然後各自試圖擷取對方手裡的鎖,造成了雙方無限等待下去
,這就是
死鎖
。
- 死鎖發生後,
。在編寫多線程應用時,要特别注意防止死鎖。因為沒有任何機制能解除死鎖,隻能強制結束JVM程序
死鎖一旦形成,就隻能強制結束程序。
-
如何避免死鎖?
線程擷取鎖的順序要一緻。即嚴格按照先擷取鎖A,再擷取鎖B的順序,或者先擷取鎖B,再擷取鎖A的順序
将上面代碼中的dec方法擷取鎖的順序改成和add()一樣即可
public void dec(int m) {
synchronized(lockA) { // 獲得lockA的鎖
this.value -= m;
synchronized(lockB) { // 獲得lockB的鎖
this.another -= m;
} // 釋放lockB的鎖
} // 釋放lockA的鎖
}
- 避免死鎖的3中辦法
-
加鎖順序
當多個線程需要相同的一些鎖,但是按照不同的順序加鎖,死鎖就很容易發生。如果能確定所有的線程都是按照相同的順序獲得鎖,那麼死鎖就不會發生。
-
加鎖時限
另外一個可以避免死鎖的方法是在嘗試擷取鎖的時候加一個逾時時間,這也就意味着在嘗試擷取鎖的過程中若超過了這個時限該線程則放棄對該鎖請求。若一個線程沒有在給定的時限内成功獲得所有需要的鎖,則會進行回退并釋放所有已經獲得的鎖,然後等待一段随機的時間再重試。這段随機的等待時間讓其它線程有機會嘗試擷取相同的這些鎖,并且讓該應用在沒有獲得鎖的時候可以繼續運作(注:加鎖逾時後可以先繼續運作幹點其它事情,再回頭來重複之前加鎖的邏輯)。
-
死鎖檢測
死鎖檢測是一個更好的死鎖預防機制,它主要是針對那些不可能實作按序加鎖并且鎖逾時也不可行的場景。
- 每當一個線程獲得了鎖,會線上程和鎖相關的資料結構中(map、graph等等)将其記下。除此之外,每當有線程請求鎖,也需要記錄在這個資料結構中。
- 當一個線程請求鎖失敗時,這個線程可以周遊鎖的關系圖看看是否有死鎖發生。例如,線程A請求鎖7,但是鎖7這個時候被線程B持有,這時線程A就可以檢查一下線程B是否已經請求了線程A目前所持有的鎖。如果線程B确實有這樣的請求,那麼就是發生了死鎖(線程A擁有鎖1,請求鎖7;線程B擁有鎖7,請求鎖1)。
- 當然,死鎖一般要比兩個線程互相持有對方的鎖這種情況要複雜的多。線程A等待線程B,線程B等待線程C,線程C等待線程D,線程D又在等待線程A。線程A為了檢測死鎖,它需要遞進地檢測所有被B請求的鎖。從線程B所請求的鎖開始,線程A找到了線程C,然後又找到了線程D,發現線程D請求的鎖被線程A自己持有着。這是它就知道發生了死鎖。
關于四個線程(A,B,C和D)之間鎖占有和請求的關系圖。像這樣的資料結構就可以被用來檢測死鎖。
那麼當檢測出死鎖時,這些線程該做些什麼呢?
- 一個可行的做法是釋放所有鎖,回退,并且等待一段随機的時間後重試。這個和簡單的加鎖逾時類似,不一樣的是隻有死鎖已經發生了才回退,而不會是因為加鎖的請求逾時了。雖然有回退和等待,但是如果有大量的線程競争同一批鎖,它們還是會重複地死鎖(編者注:原因同逾時類似,不能從根本上減輕競争)。
- 一個更好的方案是給這些線程設定優先級,讓一個(或幾個)線程回退,剩下的線程就像沒發生死鎖一樣繼續保持着它們需要的鎖。如果賦予這些線程的優先級是固定不變的,同一批線程總是會擁有更高的優先級。為避免這個問題,可以在死鎖發生的時候設定随機的優先級。
九.使用wait和notify
1.什麼是多線程協調?
synchronized
解決了
多線程競争
的問題。例如,對于一個
任務管理器
,多個線程同時往隊列中添加任務,可以用
synchronized
加鎖:
class TaskQueue {
Queue<String> queue = new LinkedList<>();
public synchronized void addTask(String s) {
this.queue.add(s);
}
}
但是
synchronized
并沒有解決
多線程協調
的問題。
class TaskQueue {
Queue<String> queue = new LinkedList<>();
public synchronized void addTask(String s) {
this.queue.add(s);
}
public synchronized String getTask() {
while (queue.isEmpty()) {
}
return queue.remove();
}
}
上述代碼看上去沒有問題:
getTask()
内部先判斷隊列
是否為空
,如果為空就
循環等待
,直到另一個線程往隊列中放入了一個任務,while()
循環退出
,就可以傳回隊列的元素了。
- 但實際上
。因為線程在執行while()循環時,已經在while()循環永遠不會退出
,其他線程因為addTask()執行條件也是擷取this鎖,根本無法調用addTask(),線程會在getTask()入口擷取了this鎖
。getTask()中因為死循環而100%占用CPU資源
而我們想要的執行效果是:
- 線程1可以調用
不斷往隊列中添加任務;addTask()
- 線程2可以調用
從隊列中擷取任務。getTask()
如果隊列為空,則getTask()應該等待,直到隊列中至少有一個任務時再傳回。
多線程協調運作的原則就是:
當條件不滿足時,線程進入等待狀态;當條件滿足時,線程被喚醒,已喚醒的線程還需要重新獲得鎖後才能繼續執行任務。
2.使用wait()和notify()解決多線程協調?
對于上述TaskQueue,我們先改造
getTask()
方法,在條件不滿足時,線程進入等待狀态:
public synchronized String getTask() throws InterruptedException {
while (queue.isEmpty()) {
// 釋放this鎖:
this.wait();
// 重新擷取this鎖
}
return queue.remove();
}
當一個線程執行到
getTask()
方法内部的while循環時,它必定已經擷取到了
this鎖
,此時,線程執行while條件判斷,
如果條件成立(隊列為空),線程将執行this.wait(),進入等待狀态,且釋放目前占用鎖
- 這裡的關鍵是:
方法必須在目前擷取的鎖對象上調用,這裡擷取的是wait()
,是以調用this鎖
。this.wait()
- 調用
方法後,線程進入wait()
,等待狀态
方法不會傳回,直到将來某個時刻,線程從等待狀态被其他線程喚醒後,wait()方法才會傳回,然後,繼續執行下一條語句。wait()
- 定義在
的一個Object類
,也就是由native方法
。其次,JVM的C代碼實作的
,因為wait()方法調用時,必須在synchronized塊中才能調用wait()方法
,wait()方法傳回後,會釋放線程獲得的鎖
。線程又會重新試圖獲得鎖
當一個線程在
this.wait()等待
時,它就會
釋放this鎖
,進而使得
其他線程能夠在addTask()方法獲得this鎖。
如何讓等待的線程被重新喚醒,然後從wait()方法傳回? 答案是
在相同的鎖對象上調用notify()方法
。我們修改
addTask()
如下:
public synchronized void addTask(String s) {
this.queue.add(s);// 喚醒在this鎖等待的線程
this.notify();
}
注意到在往隊列中
添加任務
後,線程立刻對this鎖對象調用notify()方法,這個方法會喚醒一個
正在等待this鎖的線程
(就是在getTask()中位于this.wait()的線程),
進而使得等待線程從this.wait()方法傳回。
- wait()、notify()方法屬于
中的方法;對于Object中的方法,每個對象都擁有。Object
- wait()方法:使目前線程進入等待狀态并釋放鎖,讓其他線程可以有機會運作,直到接到通知或者被中斷打斷為止。在調用wait()方法之前,線程必須要獲得該對象的對象級鎖;換句話說就是
,如果沒有持有合适的鎖的話,線程将會抛出異常該方法隻能在同步方法或者同步塊中調用
。IllegalArgumentException
如果調用成功後,目前線程則釋放鎖。
- notify()方法:用來
。如果有多個線程則線程排程器任意選出一個線程進行喚醒,使其去競争擷取對象鎖,喚醒處于等待狀态擷取對象鎖的其他線程
但調用notify()的線程并不會馬上就釋放該對象鎖,wait()所在的線程也不能馬上擷取該對象鎖,要程式退出同步塊或者同步方法之後,目前線程才會釋放鎖,wait()所在的線程才可以擷取該對象鎖。
- wait()和notify()持有同一把鎖 ,wait()方法是釋放鎖的;notify()方法不釋放鎖,必須等到所線上程把代碼執行完。
3.完整例子
public class TaskQueueMain {
public static void main(String[] args) throws InterruptedException {
TaskQueue taskQueue = new TaskQueue();
List<Thread> threadList = new ArrayList<Thread>();
//建立5個線程用于從隊列中不斷取任務,如果隊列為空,getTask()就會釋放目前this鎖,進入等待喚醒狀态
for (int i = 0; i < 5; i++) {
Thread thread = new Thread() {
@Override
public void run() {
// 執行task:
while (true) {
try {
String s = taskQueue.getTask();
System.out.println("execute task: " + s);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName()+":InterruptedException");
return;
}
}
}
};
//啟動線程
thread.start();
//添加目前線程執行個體帶list中
threadList.add(thread);
}
//建立一個線程循環添加10個任務到隊列中,每次添加都會喚醒處理等待狀态的任意一個線程
Thread add = new Thread() {
@Override
public void run() {
// 執行task:
for (int i = 0; i < 10; i++) {
// 放入task:
String str = "t-" + Math.random();
System.out.println("add task: " + str);
taskQueue.addTask(str);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
}
};
//啟動線程
add.start();
//阻塞main,必須等待目前線程執行完
add.join();
//休眠100毫秒
Thread.sleep(100);
//中斷處于等待狀态的線程
//1.如果線程處于等待狀态,調用目前線程的interrupt()會抛出InterruptedExceptio
// 2.是以,目标線程隻要捕獲到抛出的InterruptedException,就說明有其他線程對其調用了interrupt()方法,通常情況下該線程應該立刻結束運作。
for (Thread thread : threadList) {
thread.interrupt();
}
}
}
class TaskQueue {
Queue<String> queue = new LinkedList<>();
public synchronized void addTask(String s) {
this.queue.add(s);
//喚醒所有等待 this鎖共同競争this鎖
this.notifyAll();
}
public synchronized String getTask() throws InterruptedException {
while (queue.isEmpty()) {
//釋放鎖,進入等待狀态
this.wait();
// 擷取鎖,繼續執行
}
return queue.remove();
}
}
執行結果:
重點關注
addTask()
方法,内部調用了
this.notifyAll()
而不是
this.notify()
,使用notifyAll()将喚醒所有目前正在this鎖等待的線程,而notify()隻會喚醒其中一個(具體哪個依賴作業系統,有一定的随機性)。
- 這是因為
。通常來說,可能有多個線程正在getTask()方法内部的wait()中等待,使用notifyAll()将一次性全部喚醒
。有些時候,如果我們的代碼邏輯考慮不周,用notify()會導緻隻喚醒了一個線程,而其他線程可能永遠等待下去醒不過來了。notifyAll()更安全
- 注意到wait()方法傳回時需要重新獲得this鎖。假設目前有3個線程被喚醒,喚醒後,首先要等待執行addTask()的線程結束此方法後,才能釋放this鎖,随後,這3個線程中隻能有一個擷取到this鎖,剩下兩個将繼續等待。
再注意到我們在
while()
循環中調用wait(),而不是
if語句
public synchronized String getTask() throws InterruptedException {
while (queue.isEmpty()) {
this.wait();
}
return queue.remove();
}
如果使用if語句實際上是錯誤的,因為線程被喚醒時,需要再次擷取
this鎖
。
多個線程被喚醒後,隻有一個線程能擷取this鎖
,此時,該線程執行
queue.remove()
可以擷取到隊列的元素,然而,剩下的線程如果擷取
this鎖
後執行
queue.remove()
,此刻隊列可能已經沒有任何元素了。 是以,要
始終在while循環中wait(),并且每次被喚醒後拿到this鎖就必須再次判斷隊列是否為空,如果為空則調用this.wait(),釋放鎖進入等待狀态
:
4.小結
- wait和notify用于多線程協調運作:
- 在
可以調用synchronized内部
使線程進入wait()
;等待狀态
- 必須在
上調用wait()方法;已獲得的鎖對象
- 在
可以調用synchronized内部
喚醒其他等待線程;notify()或notifyAll()
- 必須在已獲得的鎖對象上調用notify()或notifyAll()方法;
- 已喚醒的線程還需要重新獲得鎖後才能繼續執行。
十.使用ReentrantLock
1.什麼是ReentrantLock?
- 從
開始,引入了一個進階的處理并發的Java 5
包,它提供了大量更進階的并發功能,能大大簡化多線程程式的編寫。java.util.concurrent
- Java語言直接提供了
用于加鎖,但這種鎖synchronized關鍵字
一是很重,二是擷取時必須一直等待,沒有額外的嘗試機制。
-
包提供的java.util.concurrent.locks
用于替代ReentrantLock
加鎖,synchronized
2.java中使用ReentrantLock
- 傳統的
synchronized
public class Counter {
private int count;
public void add(int n) {
synchronized(this) {
count += n;
}
}
}
- 如果用
替代,可以把代碼改造為:ReentrantLock
public class Counter {
private final Lock lock = new ReentrantLock();
private int count;
public void add(int n) {
lock.lock();
try {
count += n;
} finally {
lock.unlock();
}
}
}
因為
synchronized
是Java語言層面提供的文法,是以我們
不需要考慮異常
,而
ReentrantLock
是Java代碼實作的鎖,我們就必須
先擷取鎖,然後在finally中正确釋放鎖。
- 顧名思義,
,它和synchronized一樣,ReentrantLock是可重入鎖
一個線程可以多次擷取同一個鎖。
和synchronized不同的是,
ReentrantLock可以嘗試擷取鎖
:
if (lock.tryLock(1, TimeUnit.SECONDS)) {
try {
// ...
} finally {
lock.unlock();
}
}
上述代碼在嘗試擷取鎖的時候,最多等待1秒。如果1秒後仍未擷取到鎖,tryLock()傳回false,程式就可以做一些額外處理,而不是無限等待下去。
- 是以,使用
,線程在tryLock()失敗的時候不會導緻死鎖。ReentrantLock比直接使用synchronized更安全
ReentrantLock的lock(), tryLock(), tryLock(long timeout, TimeUnit unit), lockInterruptibly() 及使用場景示例
3.完整代碼
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class TestReentrantLock {
public static void main(String[] args) throws InterruptedException {
Counter counter = new Counter();
Thread add = new Thread() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
counter.add(1);
}
}
};
add.start();
add.join();
Thread dec = new Thread() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
counter.dec(1);
}
}
};
dec.start();
dec.join();
Thread.sleep(100);
System.out.println(counter.getCount());
}
}
class Counter {
private final Lock lock = new ReentrantLock();
private int count;
public void add(int n) {
lock.lock();
try {
count += n;
} finally {
lock.unlock();
}
}
public void dec(int n) {
lock.lock();
try {
count -= n;
} finally {
lock.unlock();
}
}
public int getCount() {
return count;
}
}
4.小結
- ReentrantLock可以替代synchronized進行同步;
- ReentrantLock
;擷取鎖更安全
- 必須先擷取到鎖,再進入
代碼塊,最後使用try {...}
保證釋放鎖;finally
- 可以使用
嘗試擷取鎖。tryLock()
- 必須先擷取到鎖,再進入
十一.使用ReentrantLock + Condition對象來實作wait和notify的功能
1.如何使用ReentrantLock + Condition對象來實作wait和notify的功能
使用
ReentrantLock
比直接使用
synchronized
更
安全
,可以
替代synchronized進行線程同步
。
synchronized
可以配合
wait和notify
實作線程在條件不滿足時等待,條件滿足時喚醒,用ReentrantLock我們怎麼編寫wait和notify的功能呢?
- 答案是
使用Condition對象來實作wait和notify的功能。
我們仍然以
TaskQueue
為例,把前面用synchronized實作的功能通過ReentrantLock和Condition來實作:
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class TestReentrantLockConditionMain {
public static void main(String[] args) throws InterruptedException {
TaskQueue taskQueue = new TaskQueue();// 聲明任務隊列
List<Thread> threadList = new ArrayList<>();// 聲明線程集合
//建立5個線程用于從隊列中不斷取任務,如果隊列為空,getTask()就會釋放目前this鎖,進入等待喚醒狀态
for (int i = 0; i < 5; i++) {
Thread thread = new Thread() {
@Override
public void run() {
while (true) {
String str = null;
try {
str = taskQueue.getTask();
System.out.println("execute task: " + str);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + ":InterruptedException");
return;
}
}
}
};
//啟動線程
thread.start();
//添加目前線程執行個體帶list中
threadList.add(thread);
}
//建立一個線程循環添加10個任務到隊列中,每次添加都會喚醒處理等待狀态的任意一個線程
Thread add = new Thread() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
// 放入task:
String str = "t-" + Math.random();
System.out.println("add task: " + str);
taskQueue.addTask(str);
try {
Thread.sleep(100);
} catch (InterruptedException e) { }
}
}
};
//啟動線程
add.start();
//阻塞main,必須等待目前線程執行完
add.join();
//休眠100毫秒
Thread.sleep(100);
//中斷處于等待狀态的線程
//1.如果線程處于等待狀态,調用目前線程的interrupt()會抛出InterruptedExceptio
// 2.是以,目标線程隻要捕獲到抛出的InterruptedException,就說明有其他線程對其調用了interrupt()方法,通常情況下該線程應該立刻結束運作。
for (Thread thread : threadList) {
thread.interrupt();
}
}
}
class TaskQueue {
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private Queue<String> queue = new LinkedList<>();
public void addTask(String str) {
lock.lock();
try {
queue.add(str);
condition.signalAll();
} finally {
lock.unlock();
}
}
public String getTask() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
condition.await();
}
return queue.remove();
} finally {
lock.unlock();
}
}
}
執行結果
使用
Condition
時,引用的Condition對象必須從
Lock執行個體的newCondition()
傳回,這樣才能獲得一個綁定了Lock執行個體的Condition執行個體。
-
提供的Condition
原理和await()、signal()、signalAll()
的synchronized鎖對象
是一緻的,并且其行為也是一樣的:wait()、notify()、notifyAll()
-
會釋放目前鎖,進入等待狀态;await()
-
會喚醒某個等待線程;signal()
-
會喚醒所有等待線程;signalAll()
- 喚醒線程從await()傳回後需要重新獲得鎖。
-
- 和
類似,tryLock()
可以在await()
,如果還沒有被其他線程通過等待指定時間後
或signal()
,signalAll()喚醒
可以自己醒來:
if (condition.await(1, TimeUnit.SECOND)) {
// 被其他線程喚醒
} else {
// 指定時間内沒有被其他線程喚醒
}
可見,使用
Condition配合Lock
,我們可以
實作更靈活的線程同步
。
2.小結
- Condition可以替代
實作線程同步;synchronized + wait和notify
- Condition對象必須從Lock對象擷取。
十二.使用ReadWriteLock
1.什麼是ReadWriteLock?
前面講到的
ReentrantLock保證了隻有一個線程可以執行臨界區代碼:
public class Counter {
private final Lock lock = new ReentrantLock();
private int[] counts = new int[10];
public void inc(int index) {
lock.lock();
try {
counts[index] += 1;
} finally {
lock.unlock();
}
}
public int[] get() {
lock.lock();
try {
return Arrays.copyOf(counts, counts.length);
} finally {
lock.unlock();
}
}
}
但是有些時候,這種保護有點過頭。 因為我們發現,
任何時刻,隻允許一個線程修改
,也就是
調用inc()方法
是
必須擷取鎖
,但是,
get()方法隻讀取資料,不修改資料
,
它實際上允許多個線程同時調用。
實際上我們想要的是:允許多個線程同時讀,但隻要有一個線程在寫,其他線程就必須等待:
讀 | 寫 | |
---|---|---|
讀 | 允許 | 不允許 |
寫 | 不允許 | 不允許 |
使用
ReadWriteLock
可以解決這個問題,它保證:
- 隻允許一個線程寫入(
);其他線程既不能寫入也不能讀取
- 沒有寫入時,多個線程允許同時讀(提高性能)
2.Java中實作ReadWriteLock?
用
ReadWriteLock
實作這個功能十分容易。我們需要建立一個ReadWriteLock執行個體,然後分别擷取
讀鎖
和
寫鎖
:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class TestReadWriteLockMain {
public static void main(String[] args) throws InterruptedException {
Counter counter = new Counter();// 聲明任務隊列
List<Thread> threadList = new ArrayList<>();// 聲明線程集合
//建立5個線程用于從隊列中不斷取任務,如果隊列為空,getTask()就會釋放目前this鎖,進入等待喚醒狀态
for (int i = 0; i < 5; i++) {
Thread readThread = new Thread() {
@Override
public void run() {
while (true) {
counter.get();
System.out.println(Thread.currentThread().getName() + ":get("+ Arrays.toString(counter.get())+")");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + ":InterruptedException");
return;
}
}
}
};
//啟動線程
readThread.start();
//添加目前線程執行個體帶list中
threadList.add(readThread);
}
//建立一個線程循環添加10個任務到隊列中,每次添加都會喚醒處理等待狀态的任意一個線程
Thread incThread = new Thread() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
counter.inc(i);
System.out.println(Thread.currentThread().getName() + ":inc("+i+")");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
//啟動線程
incThread.start();
//阻塞main,必須等待目前線程執行完
incThread.join();
//休眠100毫秒
Thread.sleep(100);
//中斷處于等待狀态的線程
//1.如果線程處于等待狀态,調用目前線程的interrupt()會抛出InterruptedExceptio
// 2.是以,目标線程隻要捕獲到抛出的InterruptedException,就說明有其他線程對其調用了interrupt()方法,通常情況下該線程應該立刻結束運作。
for (Thread thread : threadList) {
thread.interrupt();
}
}
}
class Counter {
private final ReadWriteLock rwlock = new ReentrantReadWriteLock();
private final Lock rlock = rwlock.readLock();
private final Lock wlock = rwlock.writeLock();
private int[] counts = new int[10];
public void inc(int index) {
wlock.lock(); // 加寫鎖
try {
counts[index] += 1;
} finally {
wlock.unlock(); // 釋放寫鎖
}
}
public int[] get() {
rlock.lock(); // 加讀鎖
try {
return Arrays.copyOf(counts, counts.length);
} finally {
rlock.unlock(); // 釋放讀鎖
}
}
}
執行結果
Thread-0:get([0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
Thread-2:get([0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
Thread-4:get([0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
Thread-1:get([0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
Thread-3:get([0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
Thread-5:inc(0)
Thread-2:get([1, 0, 0, 0, 0, 0, 0, 0, 0, 0])
Thread-4:get([1, 0, 0, 0, 0, 0, 0, 0, 0, 0])
Thread-3:get([1, 0, 0, 0, 0, 0, 0, 0, 0, 0])
Thread-5:inc(1)
Thread-1:get([1, 0, 0, 0, 0, 0, 0, 0, 0, 0])
Thread-0:get([1, 0, 0, 0, 0, 0, 0, 0, 0, 0])
Thread-5:inc(2)
Thread-3:get([1, 1, 1, 0, 0, 0, 0, 0, 0, 0])
Thread-1:get([1, 1, 1, 0, 0, 0, 0, 0, 0, 0])
Thread-4:get([1, 1, 1, 0, 0, 0, 0, 0, 0, 0])
Thread-0:get([1, 1, 1, 0, 0, 0, 0, 0, 0, 0])
Thread-2:get([1, 1, 1, 0, 0, 0, 0, 0, 0, 0])
Thread-5:inc(3)
Thread-3:get([1, 1, 1, 1, 0, 0, 0, 0, 0, 0])
Thread-4:get([1, 1, 1, 1, 0, 0, 0, 0, 0, 0])
Thread-2:get([1, 1, 1, 1, 0, 0, 0, 0, 0, 0])
Thread-1:get([1, 1, 1, 1, 0, 0, 0, 0, 0, 0])
Thread-0:get([1, 1, 1, 1, 0, 0, 0, 0, 0, 0])
Thread-5:inc(4)
Thread-3:get([1, 1, 1, 1, 1, 0, 0, 0, 0, 0])
Thread-1:get([1, 1, 1, 1, 1, 0, 0, 0, 0, 0])
Thread-2:get([1, 1, 1, 1, 1, 0, 0, 0, 0, 0])
Thread-0:get([1, 1, 1, 1, 1, 0, 0, 0, 0, 0])
Thread-4:get([1, 1, 1, 1, 1, 0, 0, 0, 0, 0])
Thread-5:inc(5)
Thread-1:get([1, 1, 1, 1, 1, 1, 0, 0, 0, 0])
Thread-2:get([1, 1, 1, 1, 1, 1, 0, 0, 0, 0])
Thread-0:get([1, 1, 1, 1, 1, 1, 0, 0, 0, 0])
Thread-4:get([1, 1, 1, 1, 1, 1, 0, 0, 0, 0])
Thread-3:get([1, 1, 1, 1, 1, 1, 0, 0, 0, 0])
Thread-1:get([1, 1, 1, 1, 1, 1, 0, 0, 0, 0])
Thread-3:get([1, 1, 1, 1, 1, 1, 0, 0, 0, 0])
Thread-5:inc(6)
Thread-4:get([1, 1, 1, 1, 1, 1, 0, 0, 0, 0])
Thread-2:get([1, 1, 1, 1, 1, 1, 0, 0, 0, 0])
Thread-0:get([1, 1, 1, 1, 1, 1, 1, 0, 0, 0])
Thread-5:inc(7)
Thread-1:get([1, 1, 1, 1, 1, 1, 1, 1, 0, 0])
Thread-0:get([1, 1, 1, 1, 1, 1, 1, 1, 0, 0])
Thread-2:get([1, 1, 1, 1, 1, 1, 1, 1, 0, 0])
Thread-3:get([1, 1, 1, 1, 1, 1, 1, 1, 0, 0])
Thread-4:get([1, 1, 1, 1, 1, 1, 1, 1, 0, 0])
Thread-5:inc(8)
Thread-2:get([1, 1, 1, 1, 1, 1, 1, 1, 1, 0])
Thread-0:get([1, 1, 1, 1, 1, 1, 1, 1, 1, 0])
Thread-4:get([1, 1, 1, 1, 1, 1, 1, 1, 1, 0])
Thread-1:get([1, 1, 1, 1, 1, 1, 1, 1, 1, 0])
Thread-3:get([1, 1, 1, 1, 1, 1, 1, 1, 1, 0])
Thread-5:inc(9)
Thread-1:get([1, 1, 1, 1, 1, 1, 1, 1, 1, 1])
Thread-3:get([1, 1, 1, 1, 1, 1, 1, 1, 1, 1])
Thread-0:get([1, 1, 1, 1, 1, 1, 1, 1, 1, 1])
Thread-2:get([1, 1, 1, 1, 1, 1, 1, 1, 1, 1])
Thread-4:get([1, 1, 1, 1, 1, 1, 1, 1, 1, 1])
Thread-4:get([1, 1, 1, 1, 1, 1, 1, 1, 1, 1])
Thread-1:get([1, 1, 1, 1, 1, 1, 1, 1, 1, 1])
Thread-0:get([1, 1, 1, 1, 1, 1, 1, 1, 1, 1])
Thread-3:get([1, 1, 1, 1, 1, 1, 1, 1, 1, 1])
Thread-2:get([1, 1, 1, 1, 1, 1, 1, 1, 1, 1])
Thread-1:get([1, 1, 1, 1, 1, 1, 1, 1, 1, 1])
Thread-4:get([1, 1, 1, 1, 1, 1, 1, 1, 1, 1])
Thread-2:get([1, 1, 1, 1, 1, 1, 1, 1, 1, 1])
Thread-0:get([1, 1, 1, 1, 1, 1, 1, 1, 1, 1])
Thread-3:get([1, 1, 1, 1, 1, 1, 1, 1, 1, 1])
Thread-4:InterruptedException
Thread-2:InterruptedException
Thread-0:InterruptedException
Thread-1:InterruptedException
Thread-3:InterruptedException
- 把
分别用讀寫操作
和讀鎖
來加鎖,在讀取時寫鎖
,這樣就大大提高了并發讀的執行效率。多個線程可以同時獲得讀鎖
- 使用ReadWriteLock時,适用條件是
例如: 一個論壇的文章,回複可以看做寫入操作,它是不頻繁的,但是,浏覽可以看做讀取操作,是非常頻繁的,這種情況就可以使用ReadWriteLock。同一個資料,有大量線程讀取,但僅有少數線程修改。
3.小結
使用ReadWriteLock可以提高
讀取
效率:
- ReadWriteLock
;隻允許一個線程寫入
- ReadWriteLock
;允許多個線程在沒有寫入時同時讀取
- ReadWriteLock
。适合讀多寫少的場景
十三.樂觀鎖和悲觀鎖
1.什麼是悲觀鎖
總是假設最壞的情況,
每次去拿資料的時候都認為别人會修改,是以每次在拿資料的時候都會上鎖,這樣别人想拿這個資料就會阻塞直到它拿到鎖(
共享資源每次隻給一個線程使用,其它線程阻塞,用完後再把資源轉讓給其它線程)。傳統的關系型資料庫裡邊就用到了很多這種鎖機制,比如
行鎖,表鎖等,讀鎖,寫鎖等
,都是在做操作之前先上鎖。Java中
synchronized
和
ReentrantLock
等獨占鎖就是悲觀鎖思想的實作。
2.什麼是樂觀鎖
總是假設最好的情況,
每次去拿資料的時候都認為别人不會修改,是以不會上鎖
,但是在
更新的時候會判斷一下在此期間别人有沒有去更新這個資料
,可以使用
版本号機制
和
CAS算
法實作。樂觀鎖适用于
多讀
的應用類型,這樣可以提高吞吐量,像資料庫提供的類似于
write_condition機制
,其實都是提供的樂觀鎖。在Java中
java.util.concurrent.atomic
包下面的
原子變量類
就是使用了
樂觀鎖
的一種實作方式
CAS
實作的。
3.使用場景:
- 悲觀鎖适合
的場景,先加鎖可以保證寫操作時資料正确。寫操作多
- 樂觀鎖适合
的場景,不加鎖的特點能夠使其讀操作的性能大幅提升。讀操作多
4.什麼是CAS
CAS(compare and swap):當多個線程使用CAS擷取鎖,隻能有一個成功,其他線程傳回失敗,繼續嘗試擷取鎖;
- CAS操作中包含三個參數:
需讀寫的記憶體位置)+V(
(準備用來比較的參數)+A
(準備寫入的新值):若A的參數與V的對應的值相比對,就寫入值B;若不比對,就寫入這個不比對的值而非B;B
5.樂觀鎖常見的兩種實作方式
樂觀鎖一般會使用版本号機制或CAS(Compare-and-Swap,即比較并替換)算法實作。
1.版本号機制
一般是在資料表中加上一個
資料版本号version字段
,表示資料被修改的次數,當資料被修改時,version值會加一。當線程A要更新資料值時,在讀取資料的同時也會讀取version值,在送出更新時,若剛才讀取到的version值為目前資料庫中的version值相等時才更新,否則重試更新操作,直到更新成功。
舉一個簡單的例子: 假設資料庫中帳戶資訊表中有一個
version
字段,目前值為 1 ;而目前帳戶餘額字段(
balance )為 $100
。
- 操作員 A 此時将其讀出
,并從其帳戶餘額中扣除( version=1 )
。$50( $100-$50 )
- 在操作員 A 操作的過程中,操作員B 也讀入此使用者資訊
,并從其帳戶餘額中扣除( version=1 )
。$20 ( $100-$20 )
- 操作員 A 完成了修改工作,将資料版本号加1
,連同帳戶扣除後餘額( version=2 )
,送出至資料庫更新,此時由于送出資料版本大于資料庫記錄目前版本,資料被更新,資料庫記錄( balance=$50 )
更新為 2 。version
- 操作員 B 完成了操作,也将版本号加1
試圖向資料庫送出資料( version=2 )
( balance=$80 )
,但此時比對資料庫記錄版本時發現,操作員 B 送出的資料版本号為 2 ,資料庫記錄目前版本也為 2 ,不滿足 “ 送出版本必須大于記錄目前版本才能執行更新 “ 的樂觀鎖政策,是以,操作員 B 的送出被駁回。
這樣,就避免了操作員 B 用基于
的舊資料修改的結果覆寫操作員A 的操作結果的可能。version=1
2.CAS算法
即
compare and swap(比較與交換)
,是一種有名的無鎖算法。無鎖程式設計,即
不使用鎖的情況下實作多線程之間的變量同步
,也就是在
沒有線程被阻塞的情況下實作變量的同步
,是以也叫
非阻塞同步(Non-blocking Synchronization)
。CAS算法涉及到三個操作數
- 需要讀寫的記憶體值 V
- 進行比較的值 A
- 拟寫入的新值 B
- 當且僅當 V 的值等于 A時,CAS通過原子方式用新值B來更新V的值,否則不會執行任何操作(比較和替換是一個原子操作)。一般情況下是一個
,即不斷的重試。自旋操作(自旋鎖)
Java 多線程之悲觀鎖與樂觀鎖
十四.使用StampedLock
1.什麼是StampedLock
第十二章節講了
ReadWriteLock
可以解決
多線程同時讀
,但隻有
一個線程能寫
的問題
- 但是ReadWriteLock有個
:如果有線程正在讀,寫線程需要等待讀線程釋放鎖後才能擷取寫鎖,即潛在的問題
讀的過程中不允許寫,這是一種悲觀的讀鎖。
要進一步提升并發執行效率,
Java 8
引入了新的
讀寫鎖
:
StampedLock
。
-
相比,改進之處在于:StampedLock和ReadWriteLock
這樣一來,讀的過程中也允許擷取寫鎖後寫入!
,是以需要我們讀的資料就可能不一緻
,這種讀鎖是一種編寫一點額外的代碼來判斷讀的過程中是否有寫入
。樂觀鎖
- 樂觀鎖的意思: 就是
。反過來,悲觀鎖則是樂觀地估計讀的過程中大機率不會有寫入,是以被稱為樂觀鎖
顯然讀的過程中拒絕有寫入,也就是寫入必須等待。
,但一旦有小機率的寫入導緻讀取的資料不一緻,需要能檢測出來,再讀一遍就行。樂觀鎖的并發效率更高
2.Java中使用StampedLock
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.StampedLock;
public class TestStampedLockMain {
public static void main(String[] args) throws InterruptedException {
Point point = new Point();
List<Thread> threadList = new ArrayList<>();// 聲明線程集合
//建立5個線程用于不斷從point讀
for (int i = 0; i < 5; i++) {
Thread readThread = new Thread() {
@Override
public void run() {
while (true) {
System.out.println(Thread.currentThread().getName() + ":get("+ point.distanceFromOrigin()+")");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + ":InterruptedException");
return;
}
}
}
};
//啟動線程
readThread.start();
//添加目前線程執行個體帶list中
threadList.add(readThread);
}
//建立一個線程不斷往point寫
Thread writeThread = new Thread() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
point.move(2,2);
System.out.println(Thread.currentThread().getName() + ":move(2,2)=>"+i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
//啟動線程
writeThread.start();
//阻塞main,必須等待目前線程執行完
writeThread.join();
//休眠100毫秒
Thread.sleep(100);
//中斷處于等待狀态的線程
//1.如果線程處于等待狀态,調用目前線程的interrupt()會抛出InterruptedExceptio
// 2.是以,目标線程隻要捕獲到抛出的InterruptedException,就說明有其他線程對其調用了interrupt()方法,通常情況下該線程應該立刻結束運作。
for (Thread thread : threadList) {
thread.interrupt();
}
}
}
class Point {
private final StampedLock stampedLock = new StampedLock();
private double x;
private double y;
public void move(double deltaX, double deltaY) {
long stamp = stampedLock.writeLock(); // 擷取寫鎖
try {
x += deltaX;
y += deltaY;
} finally {
stampedLock.unlockWrite(stamp); // 釋放寫鎖
}
}
public double distanceFromOrigin() {
long stamp = stampedLock.tryOptimisticRead(); // 獲得一個樂觀讀鎖,并傳回版本号
// 注意下面兩行代碼不是原子操作
// 假設x,y = (100,200)
double currentX = x;
// 此處已讀取到x=100,但x,y可能被寫線程修改為(300,400)
double currentY = y;
// 此處已讀取到y,如果沒有寫入,讀取是正确的(100,200)
// 如果有寫入,讀取是錯誤的(100,400)
if (!stampedLock.validate(stamp)) { // 檢查樂觀讀鎖後是否有其他寫鎖發生,即校驗版本号是否一緻,不一緻重新讀取x,y的值
stamp = stampedLock.readLock(); // 擷取一個悲觀讀鎖
try {
currentX = x;
currentY = y;
} finally {
stampedLock.unlockRead(stamp); // 釋放悲觀讀鎖
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
}
執行結果:
Thread-0:get(0.0)
Thread-2:get(0.0)
Thread-1:get(0.0)
Thread-3:get(0.0)
Thread-4:get(0.0)
Thread-5:move(2,2)=>0
Thread-4:get(2.8284271247461903)
Thread-5:move(2,2)=>1
Thread-1:get(2.8284271247461903)
Thread-2:get(2.8284271247461903)
Thread-3:get(2.8284271247461903)
Thread-0:get(2.8284271247461903)
Thread-5:move(2,2)=>2
Thread-4:get(8.48528137423857)
Thread-3:get(8.48528137423857)
Thread-0:get(8.48528137423857)
Thread-2:get(8.48528137423857)
Thread-1:get(8.48528137423857)
Thread-5:move(2,2)=>3
Thread-4:get(11.313708498984761)
Thread-2:get(11.313708498984761)
Thread-0:get(11.313708498984761)
Thread-1:get(11.313708498984761)
Thread-3:get(11.313708498984761)
Thread-5:move(2,2)=>4
Thread-0:get(14.142135623730951)
Thread-3:get(14.142135623730951)
Thread-4:get(14.142135623730951)
Thread-2:get(14.142135623730951)
Thread-1:get(14.142135623730951)
Thread-5:move(2,2)=>5
Thread-4:get(16.97056274847714)
Thread-3:get(16.97056274847714)
Thread-2:get(16.97056274847714)
Thread-0:get(16.97056274847714)
Thread-1:get(16.97056274847714)
Thread-5:move(2,2)=>6
Thread-4:get(19.79898987322333)
Thread-0:get(19.79898987322333)
Thread-1:get(19.79898987322333)
Thread-3:get(19.79898987322333)
Thread-2:get(19.79898987322333)
Thread-5:move(2,2)=>7
Thread-2:get(22.627416997969522)
Thread-0:get(22.627416997969522)
Thread-3:get(22.627416997969522)
Thread-4:get(22.627416997969522)
Thread-1:get(22.627416997969522)
Thread-5:move(2,2)=>8
Thread-3:get(25.45584412271571)
Thread-1:get(25.45584412271571)
Thread-0:get(25.45584412271571)
Thread-4:get(25.45584412271571)
Thread-2:get(25.45584412271571)
Thread-5:move(2,2)=>9
Thread-3:get(28.284271247461902)
Thread-0:get(28.284271247461902)
Thread-1:get(28.284271247461902)
Thread-2:get(28.284271247461902)
Thread-4:get(28.284271247461902)
Thread-4:get(28.284271247461902)
Thread-2:get(28.284271247461902)
Thread-3:get(28.284271247461902)
Thread-0:get(28.284271247461902)
Thread-1:get(28.284271247461902)
Thread-1:get(28.284271247461902)
Thread-4:get(28.284271247461902)
Thread-3:get(28.284271247461902)
Thread-2:get(28.284271247461902)
Thread-0:get(28.284271247461902)
Thread-2:InterruptedException
Thread-1:InterruptedException
Thread-3:InterruptedException
Thread-4:InterruptedException
Thread-0:InterruptedException
- 和ReadWriteLock相比,寫入的加鎖是完全一樣的,
不同的是讀取。
- 注意到首先我們通過
擷取一個樂觀讀鎖,并傳回版本号。接着進行讀取,讀取完成後,我們通過tryOptimisticRead()
去驗證版本号,如果在validate()
過程中讀取
,沒有寫入
,版本号不變
,我們就可以放心地驗證成功
。如果在繼續後續操作
過程中讀取
,有寫入
,版本号會發生變化
。在失敗的時候,我們再通過驗證将失敗
。由于擷取悲觀讀鎖再次讀取
,程式在絕大部分情況下可以通過寫入的機率不高
,極少數情況下樂觀讀鎖擷取資料
。使用悲觀讀鎖擷取資料
- 可見,StampedLock把
細分為讀鎖
和樂觀讀
,能進一步提升并發效率。悲觀讀
- 但這也是有代價的:一是代碼更加複雜,二是
,StampedLock是不可重入鎖
。不能在一個線程中反複擷取同一個鎖
- 可見,StampedLock把
StampedLock還提供了更複雜的
将悲觀讀鎖更新為寫鎖的功能
,它主要使用在
if-then-update
的場景:即
先讀,如果讀的資料滿足條件,就傳回,如果讀的資料不滿足條件,再嘗試寫。
3.小結
- StampedLock提供了
,可取代ReadWriteLock以進一步提升并發性能;樂觀讀鎖
- StampedLock是不可重入鎖。
十五.使用Concurrent集合
1.java.util.concurrent下的并發集合
在
前面十一章
已經通過
ReentrantLock和Condition
實作了一個BlockingQueue:
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class TestReentrantLockConditionMain {
public static void main(String[] args) throws InterruptedException {
TaskQueue taskQueue = new TaskQueue();// 聲明任務隊列
List<Thread> threadList = new ArrayList<>();// 聲明線程集合
//建立5個線程用于從隊列中不斷取任務,如果隊列為空,getTask()就會釋放目前this鎖,進入等待喚醒狀态
for (int i = 0; i < 5; i++) {
Thread thread = new Thread() {
@Override
public void run() {
while (true) {
String str = null;
try {
str = taskQueue.getTask();
System.out.println("execute task: " + str);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + ":InterruptedException");
return;
}
}
}
};
//啟動線程
thread.start();
//添加目前線程執行個體帶list中
threadList.add(thread);
}
//建立一個線程循環添加10個任務到隊列中,每次添加都會喚醒處理等待狀态的任意一個線程
Thread add = new Thread() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
// 放入task:
String str = "t-" + Math.random();
System.out.println("add task: " + str);
taskQueue.addTask(str);
try {
Thread.sleep(100);
} catch (InterruptedException e) { }
}
}
};
//啟動線程
add.start();
//阻塞main,必須等待目前線程執行完
add.join();
//休眠100毫秒
Thread.sleep(100);
//中斷處于等待狀态的線程
//1.如果線程處于等待狀态,調用目前線程的interrupt()會抛出InterruptedExceptio
// 2.是以,目标線程隻要捕獲到抛出的InterruptedException,就說明有其他線程對其調用了interrupt()方法,通常情況下該線程應該立刻結束運作。
for (Thread thread : threadList) {
thread.interrupt();
}
}
}
class TaskQueue {
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private Queue<String> queue = new LinkedList<>();
public void addTask(String str) {
lock.lock();
try {
queue.add(str);
condition.signalAll();
} finally {
lock.unlock();
}
}
public String getTask() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
condition.await();
}
return queue.remove();
} finally {
lock.unlock();
}
}
}
執行結果:
BlockingQueue(阻塞隊列)
的意思就是說:當一個線程調用這個
TaskQueue
的
getTask()
方法時,該方法内部可能會
讓線程變成等待狀态
,
直到隊列條件滿足不為空
,
線程被喚醒後,getTask()方法才會傳回
。
- 因為
非常有用,是以我們不必自己編寫,可 直接使用Java标準庫的BlockingQueue
包提供的java.util.concurrent
:ArrayBlockingQueue。線程安全的集合
interface | non-thread-safe | thread-safe |
---|---|---|
List | ArrayList | CopyOnWriteArrayList |
Map | HashMap | ConcurrentHashMap |
Set | HashSet / TreeSet | CopyOnWriteArraySet |
Queue | ArrayDeque / LinkedList | ArrayBlockingQueue / LinkedBlockingQueue |
Deque | ArrayDeque / LinkedList | LinkedBlockingDeque |
使用這些并發集合
與使用非線程安全的集合類完全相同
。我們以ConcurrentHashMap為例:
Map<String, String> map = ConcurrentHashMap<>();
// 在不同的線程讀寫:
map.put("A", "1");
map.put("B", "2");
map.get("A", "1");
因為
所有的同步和加鎖的邏輯都在集合内部實作
,對外部調用者來說,
隻需要正常按接口引用,其他代碼和原來的非線程安全代碼完全一樣
。即當我們需要多線程通路時,把:
Map<String, String> map = HashMap<>();
//改為
Map<String, String> map = ConcurrentHashMap<>();
java.util.Collections工具類
還提供了一個舊的線程安全集合轉換器處理List/Set/Map
文法為 Collections.synchronizedXXX(Collection c)
Map unsafeMap = new HashMap();
Map threadSafeMap = Collections.synchronizedMap(unsafeMap);
- 它實際上是用一個包裝類包裝了非線程安全的Map,然後對所有
都用讀寫方法
,這樣獲得的synchronized加鎖
比線程安全集合的性能
要低很多,是以不推薦使用。java.util.concurrent集合
2.小結
- 使用
包提供的java.util.concurrent
可以大大簡化多線程程式設計:線程安全的并發集合
- 多線程同時讀寫并發集合是安全的;
- 盡量使用Java标準庫提供的并發集合,避免自己編寫同步代碼。
Java并發集合類
Java并發集合類
十六.使用Atomic(原子類)
1.什麼是原子類?
Atomic 翻譯成中文是原子的意思。在化學上,我們知道
原子是構成一般物質的最小機關,在化學反應中是不可分割的。
在我們這裡 Atomic是指一個
操作是不可中斷
的。即使是在
多個線程一起執行的時候,一個操作一旦開始,就不會被其他線程幹擾。
是以,所謂原子類說簡單點就是具有原子/原子操作特征的類。
Java的
java.util.concurrent包
除了提供
底層鎖
、
并發集合
外,還提供了一組
原子操作的封裝類
,它們位于
java.util.concurrent.atomic
包。
我們以
AtomicInteger
為例,它提供的主要操作有:
- 增加值并傳回新值:
int addAndGet(int delta)
- 加1後傳回新值:
int incrementAndGet()
- 擷取目前值:
int get()
- 用CAS方式設定:
int compareAndSet(int expect, int update)
Atomic類是通過
無鎖(lock-free)的方式實作的線程安全(thread-safe)通路
。它的主要原理是利用了
CAS
:Compare and Set。
如果我們自己通過CAS編寫incrementAndGet(),它大概長這樣:
public int incrementAndGet(AtomicInteger var) {
int prev, next;
do {
prev = var.get();
next = prev + 1;
} while ( ! var.compareAndSet(prev, next));
return prev;
}
CAS是指,在這個操作中,
如果AtomicInteger的目前值是prev
,那麼就
更新為next
,傳回
true
。如果
AtomicInteger的目前值不是prev,就什麼也不幹
,傳回
false
。通過
CAS操作
并
配合do ... while循環
,
即使其他線程修改了AtomicInteger的值,最終的結果也是正确的。
我們利用AtomicLong可以編寫一個多線程安全的全局唯一ID生成器:
class IdGenerator {
AtomicLong var = new AtomicLong(0);
public long getNextId() {
return var.incrementAndGet();
}
}
- 通常情況下,我們并不需要直接用
循環調用do ... while
實作compareAndSet
,而是用複雜的并發操作
這樣的封裝好的方法,是以,使用起來非常簡單。incrementAndGet()
- 在高度競争的情況下,還可以使用Java 8提供的
。LongAdder和LongAccumulator
2.關于原子類個數說明
在
JDK7包括7之前
,
java原子類有12個
,圖檔如下,有些資料說有13個,多出來的是
AtomicBooleanArray
類,可是我在JDK8之前的源碼裡并沒有發現有這個類,當然我也沒去8以上的版本去看,是以這裡不确定這個類到底在哪個版本中存在。
在JDK8時出現了4個原子操作類,分别是如下圖檔所示
3.原子類的分類:
- 原子更新基本資料類型
- 原子更新數組類型
- 原子更新抽象資料類型
- 原子更新字段
并發包的原子類都存放在
java.util.concurrent
下,如下圖所示。
java.util.concurrent.atomic
![]()
【Java基礎】多線程從入門到掌握
1.原子更新基本類型類
- AtomicBoolean: 原子更新布爾類型。
- AtomicInteger: 原子更新整型。
- AtomicLong: 原子更新長整型。
以上3個類提供的方法幾乎一模一樣,以AtomicInteger為例進行詳解,AtomicIngeter的常用方法如下:
- int addAndGet(int delta): 以原子的方式将輸入的數值與執行個體中的值相加,并傳回結果。
- boolean compareAndSet(int expect, int update): 如果輸入的值等于預期值,則以原子方式将該值設定為輸入的值。
- int getAndIncrement(): 以原子的方式将目前值加 1,注意,這裡傳回的是自增前的值,也就是舊值。
- void lazySet(int newValue): 最終會設定成newValue,使用lazySet設定值後,可能導緻其他線程在之後的一小段時間内還是可以讀到舊的值。
- int getAndSet(int newValue): 以原子的方式設定為newValue,并傳回舊值。
代碼示例
static AtomicInteger ai =new AtomicInteger(1);
public static void main(String[] args) {
System.out.println(ai.getAndIncrement());
System.out.println(ai.get());
}
2.原子更新數組
- AtomicIntegerArray: 原子更新整型數組裡的元素。
- AtomicLongArray: 原子更新長整型數組裡的元素。
- AtomicReferenceArray: 原子更新引用類型數組裡的元素。
三個類的最常用的方法是如下兩個方法:
- get(int index):擷取索引為index的元素值。
- compareAndSet(int i, int expect, int update): 如果目前值等于預期值,則以原子方式将數組位置 i 的元素設定為update值。
代碼示例
//下面以 AtomicReferenceArray 舉例如下
static int[] value =new int[]{1,2};
static AtomicIntegerArray ai =new AtomicIntegerArray(value);
public static void main(String[] args) {
ai.getAndSet(0,2);
System.out.println(ai.get(0));
System.out.println(value[0]);
}
3.原子更新引用類型
原子更新基本類型的AtomicInteger,隻能更新一個值,如果更新多個值,比如更新一個對象裡的值,那麼就要用
原子更新引用類型提供的類
,Atomic包提供了以下三個類:
- AtomicReference: 原子更新引用類型。
- AtomicReferenceFieldUpdater: 原子更新引用類型的字段。
- AtomicMarkableReferce: 原子更新帶有标記位的引用類型,可以使用構造方法更新一個布爾類型的标記位和引用類型。
代碼示例
public static AtomicReference<User> ai = new AtomicReference<User>();
public static void main(String[] args) {
User u1 = new User("pangHu", 18);
ai.set(u1);
User u2 = new User("piKaQiu", 15);
ai.compareAndSet(u1, u2);
System.out.println(ai.get().getAge() + ai.get().getName());
}
static class User {
private String name;
private int age;
public User(String name, int age) {
this.name = name;
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
//輸出結果:piKaQiu 15
4.原子更新字段類
如果需要原子的更新類裡某個字段時,需要用到原子更新字段類,Atomic包提供了3個類進行原子字段更新:
- AtomicIntegerFieldUpdater: 原子更新整型的字段的更新器。
- AtomicLongFieldUpdater: 原子更新長整型字段的更新器。
- AtomicStampedFieldUpdater: 原子更新帶有版本号的引用類型。
代碼示例
//建立原子更新器,并設定需要更新的對象類和對象的屬性
private static AtomicIntegerFieldUpdater<User> ai = AtomicIntegerFieldUpdater.newUpdater(User.class, "age");
public static void main(String[] args) {
User u1 = new User("pangHu", 18);
//原子更新年齡,+1
System.out.println(ai.getAndIncrement(u1));
System.out.println(u1.getAge());
}
static class User {
private String name;
public volatile int age;
public User(String name, int age) {
this.name = name;
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
//輸出結果
//18
//19
要想原子地更新字段類需要兩步。
- 第一步,因為原子更新字段類都是抽象類,每次使用的時候必須使用
建立一個靜态方法newUpdater()
,并且需要更新器
。設定想要更新的類和屬性
- 第二步,
必須使用更新類的字段
修飾。public volatile
5.JDK8新增原子類簡介
- DoubleAccumulator
- LongAccumulator
- DoubleAdder
- LongAdder
下面以 LongAdder 為例介紹一下,并列出使用注意事項
這些類對應把
AtomicLong
等類的改進。比如
LongAccumulator 高并發環境下比 AtomicLong 更高效。
-
在低并發環境下,Atomic、Adder
。但在高并發環境下,兩者性能很相似
Adder 有着明顯更高的吞吐量,但是有着更高的空間複雜度。
- LongAdder其實是LongAccumulator的一個特例,調用LongAdder相當使用下面的方式調LongAccumulator。
- sum() 方法在沒有并發的情況下調用,如果在并發情況下使用會存在計數不準,下面有代碼為例。
-
,雖然 LongAdder 的 add() 方法可以原子性操作,但是并沒有使用 Unsafe 的CAS算法,隻是使用了CAS的思想。LongAdder不可以代替AtomicLong
-
,構造函數其中LongAccumulator,LongAccumulator提供了比LongAdder更強大的功能
一個雙目運算器接口,根據輸入的兩個參數傳回一個計算值,accumulatorFunction
則是LongAccumulator累加器的初始identity
如圖
LongAdder
則是
内部維護多個變量
,每個變量初始化都0,在同等并發量的情況下,争奪單個變量的線程量會減少這是變相的減少了争奪共享資源的并發量,另外多個線程在争奪同一個原子變量時候如果失敗并不是自旋CAS重試,而是嘗試擷取其他原子變量的鎖,最後擷取目前值時候是把所有變量的值累加後傳回的。
//構造函數
LongAdder()
//建立初始和為零的新加法器。
//方法摘要
void add(long x) //添加給定的值。
void decrement() //相當于add(-1)。
double doubleValue() //在擴充原始轉換之後傳回sum()as double。
float floatValue() //在擴充原始轉換之後傳回sum()as float。
void increment() //相當于add(1)。
int intValue() //傳回sum()作為int一個基本收縮轉換之後。
long longValue() //相當于sum()。
void reset() //重置将總和保持為零的變量。
long sum() //傳回目前的總和。
long sumThenReset() //等同于sum()後面的效果reset()。
String toString() //傳回。的字元串表示形式sum()。
4.使用java.util.concurrent.atomic提供的原子操作可以簡化多線程程式設計:
- 原子操作實作了無鎖的線程安全;
- 适用于計數器,累加器等
十七.使用線程池
1.什麼是線程池?
Java語言雖然内置了
多線程支援
,啟動一個新線程非常友善,但建立線程需要作業系統資源(線程資源,棧空間等),頻繁建立和銷毀大量線程需要消耗大量時間。
如果可以複用一組線程:
那麼我們就可以
把很多小任務讓一組線程來執行
,
而不是一個任務對應一個新線程
。這種能接收大量小任務并進行分發處理的就是
線程池
。
- 簡單地說,
,沒有任務的時候,這些線程都處于線程池内部維護了若幹個線程
。 **如果有新任務,就等待狀态
。如果配置設定一個空閑線程執行
線程都處于所有
,忙碌狀态
,要麼新任務要麼放入隊列等待
。增加一個新線程進行處理
2.Java中使用線程池
Java标準庫提供了
ExecutorService接口
表示線程池,它的典型用法如下:
// 建立固定大小的線程池:
ExecutorService executor = Executors.newFixedThreadPool(3);
// 送出任務:
executor.submit(task1);
executor.submit(task2);
executor.submit(task3);
executor.submit(task4);
executor.submit(task5);
因為
ExecutorService
隻是接口,Java标準庫提供的幾個常用實作類有:
- FixedThreadPool:線程數固定的線程池;
- CachedThreadPool:線程數根據任務動态調整的線程池;
- SingleThreadExecutor:僅單線程執行的線程池。
FixedThreadPool
建立這些線程池的方法都被封裝到
Executors
這個類中。我們以
FixedThreadPool
為例,看看線程池的執行邏輯:
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) {
// 建立一個固定大小的線程池:
ExecutorService es = Executors.newFixedThreadPool(4);
for (int i = 0; i < 6; i++) {
es.submit(new Task("" + i));
}
// 關閉線程池:
es.shutdown();
}
}
class Task implements Runnable {
private final String name;
public Task(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println("start task " + name);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
System.out.println("end task " + name);
}
}
一次性放入6個任務
,由于線程池隻有固定的4個線程,是以,
前4個任務會同時執行
,
等到有線程空閑後,才會執行後面的兩個任務。
線程池在
程式結束的時候要關閉
。
-
方法關閉線程池的時候,shutdown()
。它會等待正在執行的任務先完成,然後再關閉
-
shutdownNow()
會立刻停止正在執行的任務
-
則會等待指定的時間讓線程池關閉。awaitTermination()
CachedThreadPool
如果我們把線程池改為
CachedThreadPool
,由于這個線程池的實作
會根據任務數量動态調整線程池的大小,是以6個任務可一次性全部同時執行
如果我們想把線程池的大小限制在4~10個之間動态調整怎麼辦?我們
看Executors.newCachedThreadPool()方法的源碼
:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
想建立
指定動态範圍
的線程池可以這麼寫:
int min = 4;
int max = 10;
ExecutorService es = new ThreadPoolExecutor(min, max,60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
ScheduledThreadPool
還有一種任務需要
定期反複執行
,例如:每秒重新整理證券價格。
這種任務
本身固定,需要反複執行的
,可以使用
ScheduledThreadPool
。放入ScheduledThreadPool的任務可以
定期反複執行。
建立一個
ScheduledThreadPool
仍然是通過
Executors
類:
我們可以送出一次性任務,它會在
指定延遲後隻執行一次:
// 1秒後執行一次性任務:
ses.schedule(new Task("one-time"), 1, TimeUnit.SECONDS);
如果任務以
固定的每3秒執行
,我們可以這樣寫:
// 5秒後開始執行定時任務,每3秒執行一次:
ses.scheduleAtFixedRate(new Task("fixed-rate"), 5, 3, TimeUnit.SECONDS);
如果任務以
固定的3秒為間隔執行
,我們可以這樣寫:
// 3秒後開始執行定時任務,以3秒為間隔執行:
ses.scheduleWithFixedDelay(new Task("fixed-delay"), 2, 3, TimeUnit.SECONDS);
注意
FixedRate
和
FixedDelay
的差別
-
是指任務總是以FixedRate
:固定時間間隔觸發,不管任務執行多長時間
【Java基礎】多線程從入門到掌握 -
是指,FixedDelay
:上一次任務執行完畢後,等待固定的時間間隔,再執行下一次任務
【Java基礎】多線程從入門到掌握
是以,使用
ScheduledThreadPool
時,我們要根據需要選擇
執行一次
、
FixedRate
執行還是
FixedDelay
執行。
細心的童鞋還可以思考下面的問題:
- 在
模式下,假設FixedRate
?每秒觸發,如果某次任務執行時間超過1秒,後續任務會不會并發執行
- 如果
?任務抛出了異常,後續任務是否繼續執行
Java标準庫還提供了一個
java.util.Timer
類,這個類也可以
定期執行任務
,但是,
一個Timer會對應一個Thread
,是以,
一個Timer隻能定期執行一個任務
,
多個定時任務必須啟動多個Timer
,
- 一個
l就可以ScheduledThreadPoo
,是以,我們完全可以排程多個定時任務
r。用ScheduledThreadPool取代舊的Time
3.小結
JDK提供了ExecutorService實作了線程池功能:
- 線程池内部維護一組線程,可以
;高效執行大量小任務
-
Executors
提供了
靜态方法
建立不同類型的
ExecutorService
;
-
;必須調用shutdown()關閉ExecutorService
-
。ScheduledThreadPool可以定期排程多個任務
Java線程學習體系
Java 中的幾種線程池,你之前用對了嗎
java中的線程池有哪些,分别有什麼作用?
最詳細的Java線程池原了解析
java線程池,阿裡為什麼不允許使用Executors?
java線程池詳解
Java線程池詳解
十八.使用Future
1.什麼是Future?
class Task implements Runnable {
public String result;
public void run() {
this.result = longTimeCalculation();
}
}
Runnable接口
有個問題,它的方法沒有傳回值。如果
任務需要一個傳回結果
,那麼隻能
儲存到變量
,還要
提供額外的方法讀取
,非常不便。是以,Java标準庫還提供了一個
Callable接口
,和Runnable接口比,它多了一個
傳回值
:并且Callable接口是一個
泛型接口
,可以
指定傳回類型的結果
。
class Task implements Callable<String> {
public String call() throws Exception {
return longTimeCalculation();
}
}
2.Java中使用Future?
現在的問題是,
如何獲得異步執行的結果?
如果仔細看
ExecutorService.submit()方法
,可以看到,它傳回了一個
Future類型
,一個
Future類型
的執行個體代表
一個未來能擷取結果的對象
:
ExecutorService executor = Executors.newFixedThreadPool(4);
// 定義任務:
Callable<String> task = new Task();
// 送出任務并獲得Future:
Future<String> future = executor.submit(task);
// 從Future擷取異步執行傳回的結果:
String result = future.get(); // 可能阻塞
- 當我們送出一個
後,我們會同時Callable任務
,然後,我們獲得一個Future對象
,就可以在主線程某個時刻調用Future對象的get()方法
。獲得異步執行的結果
- 在
,如果調用get()時
,我們就異步任務已經完成
。如果直接獲得結果
,那麼get()會阻塞,直到任務完成後才傳回結果。異步任務還沒有完成
一個
Future<V>接口
表示一個未來可能會傳回的結果,它定義的方法有:
-
:擷取結果(可能會等待)get()
-
:擷取結果,但隻等待指定的時間;get(long timeout, TimeUnit unit)
-
cancel(boolean mayInterruptIfRunning)
:取消目前任務;
-
:判斷任務是否已完成。isDone()
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Main {
public static void TestFutureMain(String[] args) throws Exception {
ExecutorService es = Executors.newFixedThreadPool(4);
Future<BigDecimal> future = es.submit(new Task("601857"));
System.out.println(future.get());
es.shutdown();
}
}
class Task implements Callable<BigDecimal> {
public Task(String code) {
}
@Override
public BigDecimal call() throws Exception {
Thread.sleep(1000);
double d = 5 + Math.random() * 20;
return new BigDecimal(d).setScale(2, RoundingMode.DOWN);
}
}
3.小結
- 對
送出一個線程池
,可以獲得一個Callable任務
;Future對象
- 可以用Future
。在将來某個時刻擷取結果
十九. 使用CompletableFuture
1.什麼是CompletableFuture?
- 使用
獲得Future
時,要麼異步執行結果
,要麼調用阻塞方法get()
,這兩種方法都不是很好,因為輪詢看isDone()是否為true
。主線程也會被迫等待
- 從
開始引入了Java 8
,它針對Future做了改進,可以CompletableFuture
,當傳入回調對象
或者異步任務完成
,發生異常時
自動調用回調對象的回調方法。
以擷取股票價格為例,看看如何使用CompletableFuture
import java.util.concurrent.CompletableFuture;
public class TestCompletableFutureMain1{
public static void main(String[] args) throws Exception {
// 建立異步執行任務:
CompletableFuture<Double> cf = CompletableFuture.supplyAsync(TestCompletableFutureMain1::fetchPrice);
// 如果執行成功:
cf.thenAccept((result) -> {
System.out.println("price: " + result);
});
// 如果執行異常:
cf.exceptionally((e) -> {
e.printStackTrace();
return null;
});
// 主線程不要立刻結束,否則CompletableFuture預設使用的線程池會立刻關閉:
Thread.sleep(2000);
}
static Double fetchPrice() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
if (Math.random() < 0.3) {
throw new RuntimeException("fetch price failed!");
}
return 5 + Math.random() * 20;
}
}
2.Java中使用CompletableFuture?
- 建立一個
是通過CompletableFuture.supplyAsync()CompletableFuture
需要一個實作了Supplier接口的對象:實作的,它
public interface Supplier<T> {
T get();
}
這裡我們用
lambda文法
簡化了一下,直接傳入
TestCompletableFutureMain1::fetchPrice
,因為
TestCompletableFutureMain1.fetchPrice()靜态方法
的簽名符合
Supplier接口的定義
(除了方法名外)。
- 緊接着,
已經被CompletableFuture
,我們需要定義的是CompletableFuture完成時和異常時需要回調的執行個體。送出給預設的線程池執行了
完成時
,
CompletableFuture
會
調用Consumer對象
:
public interface Consumer<T> {
void accept(T t);
}
異常時
,
CompletableFuture
會調用
Function對象
:
public interface Function<T, R> {
R apply(T t);
}
!!! 這裡我們都用lambda文法簡化了代碼。
*可見CompletableFuture的優點是:
-
,會自動回調某個對象的方法;異步任務結束時
-
,會自動回調某個對象的方法;異步任務出錯時
-
,主線程設定好回調後
。不再關心異步任務的執行
3.CompletableFuture相比Future的優勢
如果隻是實作了
異步回調機制
,我們還看不出CompletableFuture相比Future的優勢。CompletableFuture更強大的功能是,
多個
CompletableFuture可以
串行執行
例如: 定義兩個CompletableFuture,
第一個CompletableFuture根據證券名稱查詢證券代碼,
第二個CompletableFuture根據證券代碼查詢證券價格,這兩個CompletableFuture實作串行操作如下:
import java.util.concurrent.CompletableFuture;
public class TestCompletableFutureMain2 {
public static void main(String[] args) throws Exception {
// 第一個任務:
CompletableFuture<String> cfQuery = CompletableFuture.supplyAsync(() -> {
return queryCode("中國石油");
});
// cfQuery成功後繼續執行下一個任務:
CompletableFuture<Double> cfFetch = cfQuery.thenApplyAsync((code) -> {
return fetchPrice(code);
});
// cfFetch成功後列印結果:
cfFetch.thenAccept((result) -> {
System.out.println("price: " + result);
});
// 主線程不要立刻結束,否則CompletableFuture預設使用的線程池會立刻關閉:
Thread.sleep(2000);
}
static String queryCode(String name) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
return "601857";
}
static Double fetchPrice(String code) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
return 5 + Math.random() * 20;
}
}
除了串行執行外,
多個
CompletableFuture還可以
并行執行
。例如,我們考慮這樣的場景:
同時從新浪和網易查詢證券代碼,
隻要任意一個傳回結果,就進行下一步查詢價格
,
查詢價格也同時從新浪和網易查詢,隻要任意一個傳回結果,就完成操作
:
import java.util.concurrent.CompletableFuture;
public class TestCompletableFutureMain3 {
public static void main(String[] args) throws Exception {
// 兩個CompletableFuture執行異步查詢:
CompletableFuture<String> cfQueryFromSina = CompletableFuture.supplyAsync(() -> {
return queryCode("中國石油", "https://finance.sina.com.cn/code/");
});
CompletableFuture<String> cfQueryFrom163 = CompletableFuture.supplyAsync(() -> {
return queryCode("中國石油", "https://money.163.com/code/");
});
// 用anyOf合并為一個新的CompletableFuture:
CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromSina, cfQueryFrom163);
// 兩個CompletableFuture執行異步查詢:
CompletableFuture<Double> cfFetchFromSina = cfQuery.thenApplyAsync((code) -> {
return fetchPrice((String) code, "https://finance.sina.com.cn/price/");
});
CompletableFuture<Double> cfFetchFrom163 = cfQuery.thenApplyAsync((code) -> {
return fetchPrice((String) code, "https://money.163.com/price/");
});
// 用anyOf合并為一個新的CompletableFuture:
CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFromSina, cfFetchFrom163);
// 最終結果:
cfFetch.thenAccept((result) -> {
System.out.println("price: " + result);
});
// 主線程不要立刻結束,否則CompletableFuture預設使用的線程池會立刻關閉:
Thread.sleep(2000);
}
static String queryCode(String name, String url) {
System.out.println("query code from " + url + "...");
try {
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
}
return "601857";
}
static Double fetchPrice(String code, String url) {
System.out.println("query price from " + url + "...");
try {
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
}
return 5 + Math.random() * 20;
}
}
上述邏輯實作的異步查詢規則實際上是:
除了
anyOf()
可以實作“任意個CompletableFuture隻要一個成功”,
allOf()
可以實作“所有CompletableFuture都必須成功”,這些組合操作可以實作非常複雜的異步流程控制。
最後我們注意CompletableFuture的
命名規則
:
-
:表示該方法将繼續在已有的線程中執行;xxx()
-
:表示将異步線上程池中執行。xxxAsync()
4.小結
CompletableFuture可以指定異步處理流程:
-
處理正常結果;thenAccept()
-
處理異常結果;exceptional()
-
用于串行化另一個CompletableFuture;thenApplyAsync()
-
用于并行化多個CompletableFuture。anyOf()和allOf()
二十.使用ForkJoin
1.什麼是ForkJoin
Java 7開始引入了一種新的Fork/Join線程池,它可以執行一種特殊的任務:把一個大任務拆成多個小任務
并行執行
。
我們舉個例子:如果要計算一個超大數組的和,最簡單的做法是
用一個循環在一個線程内完成
:
還有一種方法,
可以把數組拆成兩部分,分别計算,最後加起來就是最終結果,這樣可以用兩個線程并行執行:
如果
拆成兩部分還是很大,我們還可以繼續拆,用4個線程并行執行
:
這就是Fork/Join任務的原理:判斷一個任務是否足夠小,如果就
直接計算
,否則就
分拆成幾個小任務分别計算
。這個過程可以反複
“裂變”
成一系列小任務。
2.Java中使用Fork/Join
使用Fork/Join對大資料進行并行求和:
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
public class TestForkJoinMain {
static Random random = new Random(0);
public static void main(String[] args) throws Exception {
// 建立2000個随機數組成的數組:
long[] array = new long[2000];
long expectedSum = 0;
for (int i = 0; i < array.length; i++) {
array[i] = random();
expectedSum += array[i];
}
System.out.println("Expected sum: " + expectedSum);
// fork/join:
ForkJoinTask<Long> task = new SumTask(array, 0, array.length);
long startTime = System.currentTimeMillis();
Long result = ForkJoinPool.commonPool().invoke(task);
long endTime = System.currentTimeMillis();
System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms.");
}
static long random() {
return random.nextInt(10000);
}
}
class SumTask extends RecursiveTask<Long> {
static final int THRESHOLD = 500;
private static final long serialVersionUID = 3426594844235690937L;
long[] array;
int start;
int end;
SumTask(long[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if (end - start <= THRESHOLD) {
// 如果任務足夠小,直接計算:
long sum = 0;
for (int i = start; i < end; i++) {
sum += this.array[i];
// 故意放慢計算速度:
try {
Thread.sleep(1);
} catch (InterruptedException e) {
}
}
return sum;
}
// 任務太大,一分為二:
int middle = (end + start) / 2;
System.out.println(String.format("split %d~%d ==> %d~%d, %d~%d", start, end, start, middle, middle, end));
SumTask subtask1 = new SumTask(this.array, start, middle);
SumTask subtask2 = new SumTask(this.array, middle, end);
invokeAll(subtask1, subtask2);
Long subresult1 = subtask1.join();
Long subresult2 = subtask2.join();
Long result = subresult1 + subresult2;
System.out.println("result = " + subresult1 + " + " + subresult2 + " ==> " + result);
return result;
}
}
執行結果
觀察上述代碼的執行過程,一個大的計算任務
0~2000
首先分裂為兩個小任務
0~1000
和
1000~2000
,這兩個小任務仍然太大,繼續分裂為更小的
0~500
,
500~1000
,
1000~1500
,
1500~200
0,最後,計算結果被依次合并,得到最終結果。
是以,核心代碼
SumTask
繼承自
RecursiveTask
,在
compute()方法
中,關鍵是如何
“分裂”
出子任務并且送出子任務:
class SumTask extends RecursiveTask<Long> {
protected Long compute() {
// “分裂”子任務:
SumTask subtask1 = new SumTask(...);
SumTask subtask2 = new SumTask(...);
// invokeAll會并行運作兩個子任務:
invokeAll(subtask1, subtask2);
// 獲得子任務的結果:
Long result1 = fork1.join();
Long result2 = fork2.join();
// 彙總結果:
return result1 + result2;
}
}
Fork/Join線程池
在Java标準庫中就有應用。Java标準庫提供的
java.util.Arrays.parallelSort(array)
可以進行并行排序,它的原理就是内部通過Fork/Join對
大數組分拆進行并行排序
,在
多核CPU
上就可以大大提高排序的速度。
3.小結
-
是一種基于Fork/Join
的算法:通過“分治”
務,分解任
,最後并行執行
得到最終結果。合并結果
-
可以把ForkJoinPool線程池
,任務類必須一個大任務分拆成小任務并行執行
自繼承
。RecursiveTask或RecursiveAction
- 使用
可以進行Fork/Join模式
以提高效率。并行計算
二十一.使用ThreadLocal
1.什麼是ThreadLocal?
多線程是Java實作多任務的基礎,
Thread對象
代表一個線程,我們可以在代碼中調用
Thread.currentThread()擷取目前線程
。例如,列印日志時,可以同時列印出目前線程的名字:
public class Main {
public static void main(String[] args) throws Exception {
log("start main...");
new Thread(() -> {
log("run task...");
}).start();
new Thread(() -> {
log("print...");
}).start();
log("end main.");
}
static void log(String s) {
System.out.println(Thread.currentThread().getName() + ": " + s);
}
}
對于
多任務
,Java标準庫提供的
線程池
可以友善地執行這些任務,同時
複用線程
。Web應用程式就是典型的
多任務應用
,每個使用者請求頁面時,我們都會建立一個任務,類似:
public void process(User user) {
checkPermission();
doWork();
saveStatus();
sendResponse();
}
然後,通過線程池去執行這些任務。
觀察
process()
方法,它内部需要調用若幹其他方法,同時,我們遇到一個問題:如何在一個線程内傳遞狀态?
- process()方法需要傳遞的狀态就是
。有的童鞋會想,簡單地傳入User就可以了:User執行個體
public void process(User user) {
checkPermission(user);
doWork(user);
saveStatus(user);
sendResponse(user);
}
但是往往一個方法又會調用其他很多方法,這樣會導緻
User
傳遞到所有地方:
void doWork(User user) {
queryStatus(user);
checkStatus();
setNewStatus(user);
log();
}
這種在
一個線程
中,
橫跨若幹方法調用
,需
要傳遞的對象
,我們通常稱之為
上下文(Context)
,它是一種狀态,可以是
使用者身份
、
任務資訊
等。
- 給
增加一個每個方法
非常麻煩,而且有些時候,如果調用鍊有無法修改源碼的第三方庫,User對象就傳不進去了。context參數
Java标準庫提供了一個特殊的
ThreadLocal
,它可以在一個線程中傳遞同一個對象。
2.Java中使用ThreadLocal
- ThreadLocal執行個體通常總是以
如下:靜态字段初始化
它的典型使用方式如下:
void processUser(user) {
try {
threadLocalUser.set(user);
step1();
step2();
} finally {
threadLocalUser.remove();
}
}
通過設定一個
User執行個體
關聯到
ThreadLocal
中, 在
移除之前
,
所有方法都可以随時擷取到該User執行個體
:
void step1() {
User u = threadLocalUser.get();
log();
printUser();
}
void log() {
User u = threadLocalUser.get();
println(u.name);
}
void step2() {
User u = threadLocalUser.get();
checkUser(u.id);
}
- 注意到 普通的方法調用一定是
,是以,同一個線程執行的
、step1()
以及step2()
方法内,log()
擷取的threadLocalUser.get()
是同一個執行個體。User對象
實際上,可以把
ThreadLocal
看成一個
全局Map<Thread, Object>
:每個線程擷取ThreadLocal變量時,總是使用
Thread自身作為key
:
- 是以,ThreadLocal相當于
,各個線程的ThreadLocal關聯的執行個體互不幹擾。給每個線程都開辟了一個獨立的存儲空間
最後,特别注意ThreadLocal一定要在
finally
中清除:
try {
threadLocalUser.set(user);
...
} finally {
threadLocalUser.remove();
}
- 這是因為
,目前線程執行完相關代碼後
,如果ThreadLocal沒有被清除,很可能會被重新放入線程池中
該線程執行其他代碼時,會把上一次的狀态帶進去
為了保證能釋放ThreadLocal關聯的執行個體,我們可以通過
AutoCloseable
接口配合
try-catch-resource
,
讓編譯器自動為我們關閉
。
- 例如,一個儲存了目前使用者名的ThreadLocal可以封裝為一個UserContext對象:
public class UserContext implements AutoCloseable {
static final ThreadLocal<String> ctx = new ThreadLocal<>();
public UserContext(String user) {
ctx.set(user);
}
public static String currentUser() {
return ctx.get();
}
@Override
public void close() {
ctx.remove();
}
}
使用的時候,我們借助
try (resource) {...}
結構,可以這麼寫:
try (UserContext ctx = new UserContext("Bob")) {
// 可任意調用UserContext.currentUser():
String currentUser = UserContext.currentUser();
} // 在此自動調用UserContext.close()方法釋放ThreadLocal關聯對象
這樣就在
UserContext
中完全封裝了
ThreadLocal
,外部代碼在
try (resource) {...}内部
可以随時調用
UserContext.currentUser()
擷取
目前線程綁定的使用者名
。
執行個體代碼
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class TestThreadLocalMain {
public static void main(String[] args) throws Exception {
ExecutorService es = Executors.newFixedThreadPool(3);
String[] users = new String[]{"Bob", "Alice", "Tim", "Mike", "Lily", "Jack", "Bush"};
for (String user : users) {
es.submit(new Task(user));
}
es.awaitTermination(3, TimeUnit.SECONDS);
es.shutdown();
}
}
class UserContext implements AutoCloseable {
private static final ThreadLocal<String> userThreadLocal = new ThreadLocal<>();
public UserContext(String name) {
userThreadLocal.set(name);
System.out.printf("[%s] init user %s...\n", Thread.currentThread().getName(), UserContext.getCurrentUser());
}
public static String getCurrentUser() {
return userThreadLocal.get();
}
@Override
public void close() {
System.out.printf("[%s] cleanup for user %s...\n", Thread.currentThread().getName(),
UserContext.getCurrentUser());
userThreadLocal.remove();
}
}
class Task implements Runnable {
final String username;
public Task(String username) {
this.username = username;
}
@Override
public void run() {
try (UserContext ctx = new UserContext(this.username)) {
new Task1().process();
new Task2().process();
new Task3().process();
}
}
}
class Task1 {
public void process() {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
System.out.printf("[%s] check user %s...\n", Thread.currentThread().getName(), UserContext.getCurrentUser());
}
}
class Task2 {
public void process() {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
System.out.printf("[%s] %s registered ok.\n", Thread.currentThread().getName(), UserContext.getCurrentUser());
}
}
class Task3 {
public void process() {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
System.out.printf("[%s] work of %s has done.\n", Thread.currentThread().getName(),
UserContext.getCurrentUser());
}
}
執行結果
[pool-1-thread-1] init user Bob…
[pool-1-thread-3] init user Tim…
[pool-1-thread-2] init user Alice…
[pool-1-thread-3] check user Tim…
[pool-1-thread-2] check user Alice…
[pool-1-thread-1] check user Bob…
[pool-1-thread-2] Alice registered ok.
[pool-1-thread-3] Tim registered ok.
[pool-1-thread-1] Bob registered ok.
[pool-1-thread-1] work of Bob has done.
[pool-1-thread-2] work of Alice has done.
[pool-1-thread-3] work of Tim has done.
[pool-1-thread-2] cleanup for user Alice…
[pool-1-thread-1] cleanup for user Bob…
[pool-1-thread-2] init user Mike…
[pool-1-thread-3] cleanup for user Tim…
[pool-1-thread-1] init user Lily…
[pool-1-thread-3] init user Jack…
[pool-1-thread-2] check user Mike…
[pool-1-thread-3] check user Jack…
[pool-1-thread-1] check user Lily…
[pool-1-thread-2] Mike registered ok.
[pool-1-thread-3] Jack registered ok.
[pool-1-thread-1] Lily registered ok.
[pool-1-thread-2] work of Mike has done.
[pool-1-thread-2] cleanup for user Mike…
[pool-1-thread-2] init user Bush…
[pool-1-thread-1] work of Lily has done.
[pool-1-thread-3] work of Jack has done.
[pool-1-thread-1] cleanup for user Lily…
[pool-1-thread-3] cleanup for user Jack…
[pool-1-thread-2] check user Bush…
[pool-1-thread-2] Bush registered ok.
[pool-1-thread-2] work of Bush has done.
[pool-1-thread-2] cleanup for user Bush…
3.小結
-
表示ThreadLocal
,它確定線程的“局部變量”
都是每個線程的ThreadLocal變量
的;各自獨立
-
适合ThreadLocal
(避免了同一參數在所有方法中傳遞);在一個線程的處理流程中保持上下文
- 使用
要用ThreadLocal
結構,并在try ... finally
中清除,或者通過finally
口配合AutoCloseable接
,讓try-catch-resource
。編譯器自動為我們關閉