目錄
- 多線程:程序和線程是一對多的關系,一個程序(一個程式),由不同的線程來運作。有共享的空間也有獨立的空間。
- 并行: 同時進行,拿兩個cpu來跑同樣的程式同樣的代碼片段,那就并行了。
- 并發:不同時進行,隻有一個cpu,而多個線程都在争取這個cpu資源。便是并發。用TPS和QPS去衡量并發程度。
-
TPS:Transactions Per Second(每秒傳輸的事物處理個數),簡單說就是伺服器每秒處理事務的個數。
完整的包括: 請求+資料庫通路+響應
- QPS:Queries Per Second(每秒查詢率),簡單說就是伺服器每秒處理完請求的個數。
先了解線程的生命周期,上圖。線程的生命周期從一個新的線程産生到結束中間會經曆非常多的情況,大體上如下圖,多線程環境下我們主要是再running的時候采取線程的保護措施,進而使多線程環境下,讓線程進入阻塞的狀态。這種保護思想其實就是排他了,到最後都得一個個來,無論式任務還是記憶體互不幹擾,便達到線程安全了。
線程的生命周期
到了jdk8,記憶體模型已經有了相當的改變了,下圖是小編學習了幾篇優秀的博文學習,根據自己的了解繪制出來的,請多指教。
jdk8記憶體模型
[1] [2] [3]獨立記憶體空間
從圖中可以看出線程安全的區域是在棧空間,每個線程會有獨立的棧空間,進而也解釋了為什麼方法内是線程安全的,而全局變量這些是線程不安全的,因為這些都在堆區。
共享記憶體空間
堆空間,和MateSpace是被所有線程共享的,是以在處理多線程問題的時候,其實主要是處理這兩個空間的内容。共享區域在不加任何保護的情況下對其操作,會有異常結果。
怎麼做到線程安全?
- 隻使用線程安全的記憶體空間,不使用共享的空間
- 對共享的記憶體空間采取保護措施,比如:加Lock,volatile修飾等
- 繼承Thread
package com.example.demo;
import org.junit.Test;
/**
* Project <demo-project>
* Created by jorgezhong on 2018/8/31 16:01.
*/
public class ThreadDemo {
@Test
public void extendThreadTest() {
ExtendThread extendThread = new ExtendThread();
extendThread.start();
}
class ExtendThread extends Thread {
@Override
public void run() {
// TODO: 2018/8/31
}
}
}
- 實作Runnable接口
@Test
public void runnableThreadTest(){
RunnableThread runnableThread = new RunnableThread();
Thread thread = new Thread(runnableThread);
thread.start();
}
class RunnableThread implements Runnable{
@Override
public void run() {
// TODO: 2018/8/31
}
}
- Callable和Future
@Test
public void callableThreadTest(){
CallableThread callableThread = new CallableThread();
FutureTask<String> stringFutureTask = new FutureTask<>(callableThread);
Thread thread = new Thread(stringFutureTask);
thread.start();
}
/**
* 這種實作是由傳回值的
*/
class CallableThread implements Callable<String>{
@Override
public String call() {
// TODO: 2018/8/31
return "";
}
}
補充:Fulture和Callable(Future模式)
首先,這兩東西都在java.util.concurrent下,java本身就未多線程環境考慮了很多。看看下面的UML圖,RunnableFuturej繼承了Future和Runnable接口,将Future引入Runnable中,并且提供了預設實作FutureTask。RunnbleCallable和Future補充解決了兩個問題,一個是多線程阻塞解決方案,另一個則是傳回值問題。我們知道Runnable和Thread定義的run()是沒有傳回值的。而且當線程遇到IO阻塞的時候,隻能等待,該線程無法做任何事情。Callable和Fulture分别解決了這兩個問題。Callable提供了傳回值的調用,而Fulture提供了多線程異步的機制。
Callable沒什麼好說的,例子如上面代碼,就是多了個泛型的傳回值,方法變成了call而已。Future就比較複雜了。FultureTask的構造方法接受Runnable或者Callable,也就是說Runnable和Callable的執行個體都可以使用Fulture來完成異步擷取阻塞傳回值的操作。
uml java fulture m
Future隻有5個方法
- cancel:取消任務的執行。參數表示是否立即中斷任務
- isCancelled:判斷任務是否已經取消
- isDone:判斷任務是否已經完成
- get():阻塞到任務接受擷取傳回值
- get(long,TimeUnit):指定逾時時間,擷取傳回值
Future模式缺陷
Fulture比較簡單,基本上隻通過兩種方式:檢視狀态和等待完成。要麼去檢視一下是不是完成了,要麼就等待完成,而線程和線程之間的通信隻有通過等待喚醒機制來完成。原來的Fulture功能太弱,以至于google的Guava和Netty這些牛逼的架構都是重新去實作以拓展功能。而java8引入了實作了CompletionStage接口的CompletableFuture。可以說是極大的擴充了Future的功能。吸收了Guava的長處。
- CompletableFuture介紹
關于CompletableFuture和的具體内容,後續再寫一篇詳細介紹。結合java8的Stream API CompletionStage接口定義很多流式程式設計的方法,我們可以進行流式程式設計,這非常适用于多線程程式設計。CompletableFuture實作了該接口,并拓展了自己的方法。對比Fulture多了幾十個方法。大緻可以分為同步的和異步的兩種類型。而作業的時候,可以切入任務某一時刻,比如說完成後做什麼。還可以組合CompletionStage,也就是進行線程之間的協調作業。
- 使用線程池送出線程的實作(見下文)
我們可以看到java線程池相關的包,他們之間的關系如下圖。
java uml thread
java uml thread m
從uml類圖可以看出(圖檔有點大,放大一下把),整個線程池構成其實是這樣的:
- 1、
封裝了線程的實作
Executor
- 2、
的子接口
Executor
定義了管理
ExecutorService
的一系列方法。
Executor
實作了
ThreadPoolExecutor
,定義了一系列處理多線程的内容,比如線程工程和儲存線程任務的隊列
ExecutorService
- 3、
擴充了
ScheduledExecutorService
,增加了定時任務排程的功能。
ExecutorService
ScheduledThreadPoolExecutor
,同時繼承
ScheduledExecutorService
的功能
ThreadPoolExecutor
- 4、
靜态類,包含了生成各種ExecutorService的方法。
Executors
從接口的組成可以看出,Executor、ExecutorService和ScheduledThreadPoolExecutor三個接口定義了線程池的基礎功能。可以了解為他們三個就是線程池。
那麼整個線程池是圍繞兩個預設實作ThreadPoolExecutor和ScheduledThreadPoolExecutor類來操作的。
至于操作,我發現java還蠻貼心的,預設實作的線程池隻區分了可定時排程和不可定時排程的。實在是太過于靈活了,自己使用的話要配置一大堆參數,我想個線程池而已,給我搞這麼多配置表示很麻煩,隻需要關心是不是定時的,隻考慮我配置設定多少線程給線程池就好了。是以有了Executors
Executors操作兩個預設的實作類,封裝了了大量線程池的預設配置,并提供了以下幾種線程池給我們,我們隻需要管線少部分必要的配置即可。
- Single Thread Executor:隻有一個線程的線程池,順序執行
ExecutorService pool = Executors.newSingleThreadExecutor();
//送出實作到線程池
pool.submit(() -> {
// TODO: 2018/8/31 do something
});
- Cached Thread Pool:緩存線程池,超過60s池内線程沒有被使用,則删掉。就是一個動态的線程池,我們不需要關心線程數
ExecutorService pool = Executors.newCachedThreadPool();
//送出實作到線程池
pool.submit(() -> {
// TODO: 2018/8/31 do something
});
- Fixed Thread Pool:固定數量的線程池
//參數為線程數
ExecutorService pool = Executors.newFixedThreadPool(8);
//送出實作到線程池
pool.submit(() -> {
// TODO: 2018/8/31 do something
});
- Scheduled Thread Pool:用于排程指定時間執行任務的線程池
//參數為線程數
ScheduledExecutorService pool = Executors.newScheduledThreadPool(8);
/*
* 送出到線程池
* 參數1:Runnable
* 參數2:初始延遲時間
* 參數3:間隔時間
* 參數4:時間機關
*/
pool.scheduleAtFixedRate(() -> {
// TODO: 2018/8/31 do something
}, 1000, 2000, TimeUnit.MILLISECONDS);
- Single Thread Scheduled Pool:排程指定時間執行任務的線程池,隻有一個線程
ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
//參數少了初始延遲時間
pool.schedule(() -> {
// TODO: 2018/8/31 do something
}, 1000, TimeUnit.MILLISECONDS);
- 線程池的配置政策
1、考慮業務類型
除了考慮計算機性能外,更多的還是考慮業務邏輯,如果業務是運算密集型的,不适合開太多的線程,因為運算一般是cpu在算,cpu本身就是用于計算,極快,是以一個線程很快就能計算完畢。線程多了反而增加了資源的消耗。另一種是IO密集型業務,這種業務就比較是适合開多一點線程,因為IO、通信這些業務本身就是非常慢的,大部分的系統的瓶頸都集中這兩方面。是以這些業務适合開多個線程。
2、配合cpu的核心和線程數
在我們配置線程的時候,可以參考cpu的總線程,盡量不超出總線程數。一般使用核心數。
這其實是一個螢幕。可以監視類和對象。
原理:可以這麼了解,每個執行個體化的對象都有一個公共的鎖,該鎖被該執行個體共享。是以對于該對象的所有被synchronized修飾的執行個體方法,是共享的同一個對象鎖。同理,類鎖也是一樣的,伴随Class對象的生成,也會有一個類螢幕,也就有一個預設的類鎖了,被synchronized修飾的所有靜态方法都共享一個類鎖。
缺陷:同步鎖關鍵子雖然友善,但是畢竟是被限制了修飾方式,是以不夠靈活,另外修飾在方法上是修飾了整個方法,是以性能在并發量大且頻繁的時候就顯得不那麼好了。
- 修飾執行個體方法:
public synchronized void synchronizedMethod(){
// TODO: 2018/8/29 do something
}
- 修飾靜态方法:
public static synchronized void synchronizedMethod(){
// TODO: 2018/8/29 do something
}
- 修飾代碼快:
public void synchronizedMethod(){
//Object.class為鎖對象,其實就是鎖的鑰匙,使用同一把鑰匙的鎖是同步的
synchronized (Object.class){
// TODO: 2018/8/29 do something
}
}
由于synchronized的缺陷不夠靈活,對應的自然有靈活的解決方案。Lock便是解決方案。Lock是java.util.concurrent.locks包下的一個接口。但是Lock是靈活了,但是既然都多線程了,我們當然是最求性能啦。由于很多資料是對檢視沒有線程安全要求的,隻需要對寫入修改要求線程安全即可,于是有了ReadWriteLock,讀寫鎖可以隻對某一方加鎖,把鎖住的内容範圍更加縮小了,提升了性能。從下圖可以看到,ReentrantLock實作了Lock而ReentrantReadWriteLock實作了ReadWiteLock。我們可以直接使用它們的實作類實作鎖功能。
uml_java_lock
5.2.1、Lock
擷取鎖:lock()、tryLock()、lockInterruptibly()
釋放鎖:unLock()
直接上代碼來學習效果是最快的
[4]- DEMO:兩個線程争取同一把鎖
package com.example.demo;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.locks.ReentrantLock;
/**
* Project <demo-project>
* Created by jorgezhong on 2018/8/30 15:48.
*/
public class LockDemo {
private static final Logger LOGGER = LoggerFactory.getLogger(LockDemo.class);
/**
* 兩個線程争取同一把鎖
*/
@Test
public void lockTest() throws InterruptedException {
//造一把鎖先
ReentrantLock reentrantLock = new ReentrantLock();
Thread thread0 = new Thread(() -> {
for (int i = 0; i < 5; i++) {
lockTestHandle(reentrantLock);
}
});
Thread thread1 = new Thread(() -> {
for (int i = 0; i < 5; i++) {
lockTestHandle(reentrantLock);
}
});
thread0.start();
thread1.start();
while (thread0.isAlive() || thread1.isAlive()) {}
}
private void lockTestHandle(ReentrantLock reentrantLock) {
try {
// 加鎖
reentrantLock.lock();
LOGGER.info("拿到鎖了,持有鎖5s");
Thread.sleep(5000);
} catch (Exception e) {
// TODO: 2018/8/30 do something
} finally {
// 記得自己釋放鎖,不然造成死鎖了
reentrantLock.unlock();
LOGGER.info("釋放鎖了");
}
}
}
運作結果:我們可以看到,循環的代碼是連續的,沒有被其他線程幹擾。确實是鎖上了,使用同一個鎖,必須等一個釋放了另一個才能持有。一個線程持有鎖,其他使用同一把鎖的線程就會同步阻塞,重新持有鎖之後才會結束阻塞的狀态,才能往下執行代碼。
16:36:05.740 [Thread-0] INFO com.example.demo.LockDemo - 拿到鎖了
16:36:05.744 [Thread-0] INFO com.example.demo.LockDemo - 循環:0 持有鎖
16:36:05.746 [Thread-0] INFO com.example.demo.LockDemo - 循環:1 持有鎖
16:36:05.746 [Thread-0] INFO com.example.demo.LockDemo - 循環:2 持有鎖
16:36:05.746 [Thread-0] INFO com.example.demo.LockDemo - 循環:3 持有鎖
16:36:05.746 [Thread-0] INFO com.example.demo.LockDemo - 循環:4 持有鎖
16:36:05.746 [Thread-0] INFO com.example.demo.LockDemo - 釋放鎖了
16:36:05.746 [Thread-1] INFO com.example.demo.LockDemo - 拿到鎖了
16:36:05.746 [Thread-1] INFO com.example.demo.LockDemo - 循環:0 持有鎖
16:36:05.746 [Thread-1] INFO com.example.demo.LockDemo - 循環:1 持有鎖
16:36:05.746 [Thread-1] INFO com.example.demo.LockDemo - 循環:2 持有鎖
16:36:05.746 [Thread-1] INFO com.example.demo.LockDemo - 循環:3 持有鎖
16:36:05.746 [Thread-1] INFO com.example.demo.LockDemo - 循環:4 持有鎖
16:36:05.746 [Thread-1] INFO com.example.demo.LockDemo - 釋放鎖了
16:36:05.746 [Thread-1] INFO com.example.demo.LockDemo - 拿到鎖了
16:36:05.746 [Thread-1] INFO com.example.demo.LockDemo - 循環:0 持有鎖
16:36:05.747 [Thread-1] INFO com.example.demo.LockDemo - 循環:1 持有鎖
16:36:05.747 [Thread-1] INFO com.example.demo.LockDemo - 循環:2 持有鎖
16:36:05.747 [Thread-1] INFO com.example.demo.LockDemo - 循環:3 持有鎖
16:36:05.747 [Thread-1] INFO com.example.demo.LockDemo - 循環:4 持有鎖
16:36:05.747 [Thread-1] INFO com.example.demo.LockDemo - 釋放鎖了
16:36:05.747 [Thread-0] INFO com.example.demo.LockDemo - 拿到鎖了
......
16:36:05.748 [Thread-1] INFO com.example.demo.LockDemo - 循環:4 持有鎖
16:36:05.748 [Thread-1] INFO com.example.demo.LockDemo - 釋放鎖了
16:36:05.748 [Thread-0] INFO com.example.demo.LockDemo - 拿到鎖了
16:36:05.748 [Thread-0] INFO com.example.demo.LockDemo - 循環:0 持有鎖
16:36:05.748 [Thread-0] INFO com.example.demo.LockDemo - 循環:1 持有鎖
16:36:05.748 [Thread-0] INFO com.example.demo.LockDemo - 循環:2 持有鎖
16:36:05.748 [Thread-0] INFO com.example.demo.LockDemo - 循環:3 持有鎖
16:36:05.748 [Thread-0] INFO com.example.demo.LockDemo - 循環:4 持有鎖
16:36:05.748 [Thread-0] INFO com.example.demo.LockDemo - 釋放鎖了
- DEMO:可被中斷鎖
/**
* lockInterruptibly:加了可中斷鎖的線程,如果在擷取不到鎖,可被中斷。
* <p>
* 中斷其實是使用了異常機制,當調用中斷方法,會抛出InterruptedException異常,捕獲它可進行中斷邏輯
*/
@Test
public void lockInterruptiblyTest() throws InterruptedException {
ReentrantLock reentrantLock = new ReentrantLock();
Thread thread0 = new Thread(() -> {
try {
lockInterruptiblyTestHandle(reentrantLock);
} catch (InterruptedException e) {
LOGGER.info("被中斷了");
}
});
Thread thread1 = new Thread(() -> {
try {
lockInterruptiblyTestHandle(reentrantLock);
} catch (InterruptedException e) {
LOGGER.info("被中斷了");
}
});
thread1.setPriority(10);
thread1.start();
thread0.start();
Thread.sleep(500);
thread0.interrupt();
while (thread0.isAlive() || thread1.isAlive()) {}
}
private void lockInterruptiblyTestHandle(ReentrantLock reentrantLock) throws InterruptedException {
/*
* 加鎖不能放在try...finally塊裡面,會出現IllegalMonitorStateException,意思是當lockInterruptibly()異常的時候,執行了unlock()方法
* 其實就是加鎖都抛出異常失敗了,你還去解鎖時不行的。放外面抛出異常的時候就不會去解鎖了
*/
reentrantLock.lockInterruptibly();
try {
LOGGER.info("拿到鎖了,持有鎖5秒");
Thread.sleep(5000);
} finally {
// 釋放鎖
reentrantLock.unlock();
LOGGER.info("釋放鎖了");
}
}
從結果可以看到,thread-0被中斷了之後不再繼續執行
20:11:22.227 [Thread-1] INFO com.example.demo.LockDemo - 拿到鎖了,持有鎖5秒
20:11:22.742 [Thread-0] INFO com.example.demo.LockDemo - 被中斷了
20:11:27.231 [Thread-1] INFO com.example.demo.LockDemo - 釋放鎖了
Process finished with exit code 0
5.2.2 ReadWriteLock
ReadWriteLock
隻是定義了讀鎖和寫鎖兩個方法,其具體實作和拓展再預設實作ReentrantReadWriteLock中。簡單來說讀寫鎖呢,提供讀鎖和寫鎖,将讀和寫要擷取的鎖類型分開,用一個對列來管理,所有的鎖都會經過隊列。當需要擷取寫鎖的時候,後買的讀寫鎖擷取都需要等待,知道該寫鎖被釋放才能進行。
@Test
public void readWriteLockTest(){
ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
try {
readEvent();
} catch (Exception e) {
LOGGER.error(e.getMessage(),e);
}finally {
readLock.unlock();
}
ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();
try {
writeEvent();
} catch (Exception e) {
LOGGER.error(e.getMessage(),e);
}finally {
writeLock.unlock();
}
}
private void writeEvent() {
// TODO: 2018/9/3 done write event
}
private void readEvent() {
// TODO: 2018/9/3 done read event
}
總的來說:凡是遇到寫,阻塞後面的線程隊列,讀與讀是不阻塞的。
volatile可修飾成員變量,能保證變量的可見性,但是不能保證原子性,也就是說并發的時候多個線程對變量進行計算的話,結果是會出錯的,保證可見性隻是能保證每個線程拿到的東西是最新的。
對于volatile來說,保證線程共享區域内容的可見性可以這麼來了解,堆記憶體的資料原來是需要拷貝到棧記憶體的,相當于複制一份過去,但是呢。再不加volatile的時候,棧區計算完之後在指派給堆區,問題就産生了。加了volatile之後,線程通路堆區的資料之後,堆區必須等待,知道棧區計算完畢将結果傳回給堆區之後,其他線程才能繼續通路堆區資料。
public volatile String name = "Jorgezhong";