前言
最近有看到馬士兵老師在B站上一個關于多線程的視訊,在此把重要的知識點進行總結。
正文
多線程基礎
1.synchronized 鎖定的代碼塊中的代碼越少,效率越高。
2.synchronized 鎖定的是堆記憶體, 而不是對象的引用。 如果synchronized 放在方法上鎖定的是 this 也就是目前的這個對象本身的堆位址, 如果synchronized 放在static的變量或者方法上,鎖定的是Class對象。 是以當指向的堆記憶體發生了變化,之前用同樣引用的将不再被鎖。
3.volatile解決了資料可見性,但是并不能解決原子性問題,是以他是無法代替synchronized的。 另一個可以使用原子性的辦法是使用AtomicXXX,例如說 AtomicInteger…,但是功能沒有synchronzined好用。
舉個簡單的例子: 我們做 i++ 操作的時候,其實是線程不安全的,為了保證安全除了synchronized我們還可以使用AtomicInteger中的getAndIncrement 方法 或者是 incrementAndGet方法
4.不要以字元串做為鎖定常量,如果a,b字元串值相同, a先鎖定一塊,b在鎖定一塊,有可能就會導緻死鎖。 根本原因是因為JVM有常量池,會緩存定義的字元串。
5.Thread 中wait 會釋放鎖, notify不會釋放鎖,這兩個方法都是線程間通信的方法。但是這兩個方法一定要慎用,使用不當會帶來線程的邏輯混亂。
6.CountDownLatch 門闩機制,某一個線程一直等待latch.await,直到latch.countDown. 這個類主要應該于一個線程在等待其他所有的線程都變換了狀态之後,等待的線程再往下執行。有點像typescript中的forkJoin方法。
7.如果寫加鎖,讀不加鎖,那麼很有可能出現髒讀的情況,這種情況我們通常使用copyOnWrite。
ReentrantLock 重入鎖
1.ReentrantLock重入鎖,這個類可以替代synchronized,但是是一把手工鎖,一定要手工釋放 。
2.ReentrantLock 與synchronized 不太一樣的地方在于,可以通過tryLock進行嘗試拿鎖,如果拿到鎖了應該做sth,如果沒拿到應該做另外的sth,而不會像synchronized一樣在那裡死等。
3.ReentrantLock 可以指定為公平鎖(想象一下這樣的場景,5個線程同時等待另一個線程釋放鎖,如果鎖釋放之後,線程排程器不會去看這5個鎖他們到底等了多久的差别,而是随機排程的,這是非公平鎖),隻需要在構造函數中傳入ture即可
4.wait往往和while一起使用,而不是if,因為wait會釋放鎖,而在釋放的時候,有可能有兩個線程同時被叫醒,然後同時操作,就出了問題
5.想多線程并發的時候如果通知其他線程醒來,使用notifyAll。 如果使用reetrantLock的話,可以使用 Condition去規範具體讓哪些線程醒來。 可以根據reentrantLock的newCondition()方法,得到一個Condition,如果有不同條件可以有不同個 Condition
6.ReentrantLock 與傳統Thread 方法對比。
等待方法 | 通知方法 | 通知全部 |
---|---|---|
condition.await() | condition.signal() | condition.signalAl |
wait() | notify() | notifyAll() |
7.ThreadLocal 可以使每一個線程都有自己的一個空間區域,使用空間換時間。
并發容器
1.如果像保證list的size和remove共同的原子性(例如說一個購票系統),我們可以使用ConcurrentListQueue,poll 方法進行删除操作,如果 傳回值為空則queue中已經取完了,這樣先删後檢測空的辦法很好。
2.ConcurrentHashMap 代替原來的HashTable,原來是用一個範圍較大的鎖,鎖定對象,現在替換他的對象使用的是範圍更小的鎖,每次鎖定的是一個segment 參考文檔
https://www.cnblogs.com/heyonggang/p/9112731.html3.在高并發并且要求map拍好序的情況下,我們使用ConcurrentSkipListMap 代替TreeMap ConcurrentSkipListMap 插入效率低,查詢速率高
4.CopyOnWriteList 寫時複制,寫的效率很低,但是讀的效率很高,是在寫的時候,複制一份list然後在新的上面加上元素,最後把引用指向新的list
5.Collections.synchronizedXXX() 可以包裝一個不加鎖的容器為加鎖的容器。 XXX可以為任意容器 e.g. List,Set Map
6.高并發可使用兩種Queue
1)ConcurrentLinkedQueue
2)BlockingQueue(LinkedBQ 它是無界隊列,ArrayBQ它是有界隊列,DelayQueue用于執行定時任務的)
Queue相關的三個方法
1)add 如果 queue滿了,報錯
2)offer queue滿了,不報錯,會傳回boolean
3)put queue滿了,阻塞等待queue中有被消費的
7.DelayQueue中有一個transform方法,如果沒有消費者會一直被阻塞
8.SychronusQueue 容量為0, 沒有辦法進行add,隻能通過put放進一個元素後被阻塞,等待被消費者消費
線程池
1.Executor: 線程池頂級接口,隻有execute一個方法,傳入Runnable作為參數
2.ExecutorService: 可以向其中扔任務 任務可以為runnable也可以為callable,有execute和submit方法
3.Callable: callable 中的call方法有傳回值,而且可以抛出異常,這是他與runnable最大的差別
4.Executors:是一個工具類用于操縱Executor等
有五個方法可以初始化不同的線程池:
- newFixedThreadPool
- 固定個數線程池
- newCachedThreadPool
- 緩存線程池,預設情況下最大的值為int的長度,如果一個線程60s沒有被調用,則會被自動銷毀
- newSingleThreadExecutor
- 單例線程池,此線程保證了線程的順序性,像是一個隊列一樣,先進先出 FIFO
- newScheduledThreadPool
- 定時任務的線程池
- newWorkStealingPool
- 偷工作線程,一個開啟多個線程,分别往其中各方有任務,如果早的線程執行完了,它會去拿取别人線程中的任務,就是這麼勤勞能幹。
5.ThreadPool:線程池的概念,相當于我們使用固定數量的勞工,當它們的手頭的活幹完之後,我們不需要使用新的線程勞工,而是繼續使用剛剛幹活的但是現在空閑的勞工
6.Future 作為callable的傳回值
7.下面的代碼,是單線程和多個線程擷取一個範圍内所有的質數的例子:
package msb_013;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* 算出一定範圍内所有的質數
* @author luckyharry
*
*/
public class Test03_ParallelComputing {
public static void main(String[] args) throws InterruptedException, ExecutionException {
//下面是通過一個線程進行工作,工作時間大概在12s左右
long start = System.currentTimeMillis();
List<Integer> list = getPrime(1,500000);
long end = System.currentTimeMillis();
System.out.println(end - start);
//使用了固定個數線程池
//下面是啟動了6個線程進行工作,工作時間大概在6s左右,工作時間縮短了一般
//1.首先我們需要一個執行器服務,指定到底要開多少個線程
//2.然後我們将我們實作了callable類的具體類執行個體化出來,并把對象傳遞進執行器服務的submit方法中
//3.從submit的方法中,我們可以擷取到Futrue對象,這個對象就是我們可以看見未來資料的入口。
//4.調用future.get(),我們可以擷取到具體的每一個線程所得到的結果,也就是未來的資料,值得注意的是,這個方法是阻塞的
//好多個future.get方法有點像我們之前學過的countDownLatch的作用
ExecutorService service = Executors.newFixedThreadPool(6);
//每個區間的範圍不同,是因為到後來,數越大,需要判斷的時間越長。
MyTask taskA = new MyTask(1,200000);
MyTask taskB = new MyTask(200001,350000);
MyTask taskC = new MyTask(350001,450000);
MyTask taskD = new MyTask(450001,500000);
Future<List<Integer>> futureA = service.submit(taskA);
Future<List<Integer>> futureB = service.submit(taskB);
Future<List<Integer>> futureC = service.submit(taskC);
Future<List<Integer>> futureD = service.submit(taskD);
start = System.currentTimeMillis();
List<Integer> listA = futureA.get();
List<Integer> listB = futureB.get();
List<Integer> listC = futureC.get();
List<Integer> listD = futureD.get();
end = System.currentTimeMillis();
System.out.println(end - start);
service.shutdown();
}
/**
* 每一個任務的實作
* @author luckyharry
*
*/
static class MyTask implements Callable<List<Integer>>{
int start, end;
public MyTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
public List<Integer> call() throws Exception {
List<Integer> list = getPrime(start, end);
return list;
}
}
/**
* 判斷是否為質數
* @param number
* @return
*/
public static boolean isPrime(int number) {
for(int i =2; i<number/2; i++) {
if(number % i ==0) {
return false;
}
}
return true;
}
/**
* 擷取質數
* @param start
* @param end
* @return
*/
public static List<Integer> getPrime(int start, int end){
List<Integer> list = new ArrayList<Integer>();
for(int i = start; i<= end; i++){
if(isPrime(i))list.add(i);
}
return list;
}
}
後記
我會在以後,不斷對這篇文章進行更新,因為現在隻是粗略的知識點整理。