(手機橫屏看源碼更友善)
問題
(1)自己動手寫的線程池如何支援帶傳回值的任務呢?
(2)如果任務執行的過程中抛出異常了該怎麼處理呢?
簡介
上一章我們自己動手寫了一個線程池,但是它是不支援帶傳回值的任務的,那麼,我們自己能否實作呢?必須可以,今天我們就一起來實作帶傳回值任務的線程池。
前情回顧
首先,讓我們先回顧一下上一章寫的線程池:
(1)它包含四個要素:核心線程數、最大線程數、任務隊列、拒絕政策;
(2)它具有執行無傳回值任務的能力;
(3)它無法處理有傳回值的任務;
(4)它無法處理任務執行的異常(線程中的異常不會抛出到線程外);
那麼,我們能不能在現有的基礎上實作其下面兩項能力呢?讓我們一起來試一試吧!
有傳回值和無傳回值的任務到底有何不同?
答案很明顯,就是一個有傳回值,一個無傳回值,用僞代碼來表示就是下面這樣:
// 無傳回值
threadPool.execute(()->{
System.out.println(1);
});
// 有傳回值,分兩步走
// 1. 送出任務到線程池中
SomeClass result = threadPool.execute(()->{
System.out.println(1);
return 1;
});
// 2. 等待任務的結果傳回
Object value = result.get();
無傳回值的任務送出了就完事,主線程并不Care它到底有沒有執行完,并不關心它是不是抛出異常,主線程Just送出線程到線程池中,其餘什麼都不管。
有傳回值的任務就不一樣了,主線程首先要送出任務到線程池中,它需要使用到任務執行的結果,是以它必須等待任務執行完畢才能拿到任務執行的結果。
那麼,為什麼不直接在execute的時候就等待任務執行完畢呢?這樣的話那不就跟串行沒啥差別了,還不如直接在主線程執行任務呢,還少了線程切換的資源消耗。
之是以要分成兩步,是因為主線程并不一定需要立即擷取傳回值,在需要用到傳回值的時候才去get,這樣就可以在送出任務和擷取傳回值之間幹些其它的事情,提高效率。
是以,送出任務的時候不需要阻塞,get傳回值的時候才可能需要阻塞,如果get的時候任務已經執行完畢了,這時候也不需要阻塞,如果get的時候任務還未執行完畢,那就要阻塞等待任務執行完畢才能擷取到傳回值。
實作分析
首先,無傳回值的任務我們直接使用的Runnable函數式接口,有傳回值的任務有沒有現成的接口呢?還真有,那就是Callable接口,它有個傳回值。
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
其次,送出任務的時候需要有個傳回值,它是在将來用來擷取任務執行結果的,實際上它也是新任務的一種能力,可以使用它對任務進行包裝,使其具有傳回值的能力。
public interface Future<T> {
T get();
}
再次,我們需要給現有的線程池增加一種新的能力,根據單一職責原則,我們定義一個新的接口來承載這種能力。
public interface FutureExecutor extends Executor {
<T> Future<T> submit(Callable<T> command);
}
然後,我們需要一種新的任務,它既具有舊任務的執行能力(run()方法),又具有新任務的傳回值能力(get()方法),是以我們造一個“将來的任務”對送出的任務進行包裝,使其具有傳回值的能力。
public class FutureTask<T> implements Runnable, Future<T> {
/**
* 真正的任務
*/
private Callable<T> task;
public FutureTask(Callable<T> task) {
this.task = task;
}
@Override
public void run() {
// 具體實作...
}
@Override
public T get() {
// 具體實作...
}
}
最後,我們隻要對原有的線程池進行擴充,将送出的任務包裝成“将來擷取傳回值的任務”,還是使用原來的方法去執行,然後傳回這個将來的任務即可。
根據開閉原則,【本篇文章由公衆号“彤哥讀源碼”原創】原來的代碼我們不做任何修改,擴充新的子類來實作新的能力。
public class MyThreadPoolFutureExecutor extends MyThreadPoolExecutor implements FutureExecutor, Executor {
public MyThreadPoolFutureExecutor(String name, int coreSize, int maxSize, BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {
super(name, coreSize, maxSize, taskQueue, rejectPolicy);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
// 包裝成将來擷取傳回值的任務
FutureTask<T> futureTask = new FutureTask<>(task);
// 還是使用原來的執行能力
execute(futureTask);
// 傳回将來的任務,隻需要傳回其get傳回值的能力即可
// 是以這裡傳回的是Future而不是FutureTask類型
return futureTask;
}
}
好了,到這裡整體的邏輯我們就已經比較清晰地實作完了,還剩下最關鍵的部分,這個“将來的任務”的兩個能力要如何實作。
将來的任務
将來的任務,具有兩個能力:一是執行真正任務的能力,二是将來擷取傳回值的能力。
public class FutureTask<T> implements Runnable, Future<T> {
@Override
public void run() {
// 具體實作...
}
@Override
public T get() {
// 具體實作...
}
}
首先,我們要明确一件事,任務的執行是線程池中,擷取傳回值是在主線程中,它們是在兩個線程中執行的,而且誰先誰後我們無法确定。
其次,如果run()在get()之前執行,我們需要告訴get()任務已經執行完畢了,是以需要一個狀态來通知這個事,還需要一個變量來承載任務執行的傳回值。
/**
* 任務執行的狀态,0未開始,1正常完成,2異常完成
* 也可以使用volatile+Unsafe實作CAS操作
*/
private AtomicInteger state = new AtomicInteger(NEW);
private static final int NEW = 0;
private static final int FINISHED = 1;
private static final int EXCEPTION = 2;
/**
* 任務執行的結果【本篇文章由公衆号“彤哥讀源碼”原創】
* 如果執行正常,傳回結果為T
* 如果執行異常,傳回結果為Exception
*/
private Object result;
再次,如果get()在run()之前執行,那就需要阻塞等待run()執行完畢才能拿到傳回值,是以需要儲存調用者(主線程),get()的時候park阻塞住,run()完成了unpark喚醒它來拿傳回值。
/**
* 調用者線程
* 也可以使用volatile+Unsafe實作CAS操作
*/
private AtomicReference<Thread> caller = new AtomicReference<>();
然後,我們先來看看run()方法的邏輯,它其實就是先執行真正的任務,然後修改狀态為完成,并儲存任務的傳回值,如果儲存了主線程,還要喚醒它。
@Override
public void run() {
// 如果狀态不是NEW,說明執行過了,直接傳回
if (state.get() != NEW) {
return;
}
try {
// 執行任務【本篇文章由公衆号“彤哥讀源碼”原創】
T r = task.call();
// CAS更新state的值為FINISHED
// 如果更新成功,就把r指派給result
// 如果更新失敗,說明state的值不為NEW了,也就是任務已經執行過了
if (state.compareAndSet(NEW, FINISHED)) {
this.result = r;
// finish()必須放在修改state裡面,見下面的分析
finish();
}
} catch (Exception e) {
// 如果CAS更新state的值為EXCEPTION成功,就把e指派給result
// 如果CAS更新失敗,說明state的值不為NEW了,也就是任務已經執行過了
if (state.compareAndSet(NEW, EXCEPTION)) {
this.result = e;
// finish()必須放在修改state裡面,見下面的分析
finish();
}
}
}
private void finish() {
// 檢查調用者是否為空,如果不為空,喚醒它
// 調用者在調用get()方法的進入阻塞狀态
for (Thread c; (c = caller.get()) != null;) {
if (caller.compareAndSet(c, null)) {
LockSupport.unpark(c);
}
}
}
最後,我們再看看get()方法,如果任務還未執行,就阻塞等待任務的執行;如果任務已經執行完畢了,直接拿傳回值即可;但是,還有一種情況,get()方法執行的過程中run()方法也在執行,是以get()方法中的每一步都要檢查狀态的值有沒有變化。
@Override
public T get() {
int s = state.get();
// 如果任務還未執行完成,判斷目前線程是否要進入阻塞狀态
if (s == NEW) {
// 辨別調用者線程是否被标記過
boolean marked = false;
for (;;) {
// 重新擷取state的值
s = state.get();
// 如果state大于NEW說明完成了,跳出循環
if (s > NEW) {
break;
// 此處必須把caller的CAS更新和park()方法分成兩步處理,不能把park()放在CAS裡面
} else if (!marked) {
// 嘗試更新調用者線程
// 試想斷點停在此處【本篇文章由公衆号“彤哥讀源碼”原創】
// 此時state為NEW,讓run()方法執行到底,它不會執行finish()中的unpark()方法
// 這時打開斷點,這裡會更新caller成功,但是循環從頭再執行一遍發現state已經變了,
// 直接在上面的if(s>NEW)處跳出循環了,因為finish()在修改state内部
marked = caller.compareAndSet(null, Thread.currentThread());
} else {
// 調用者線程更新之後park目前線程
// 試想斷點停在此處
// 此時state為NEW,讓run()方法執行到底,因為上面的caller已經設定值了,
// 是以會執行finish()方法中的unpark()方法,
// 這時再打開斷點,這裡不會park信
// 見unpark()方法的注釋,上面寫得很清楚:
// 如果線程執行了park()方法,那麼執行unpark()方法會喚醒那個線程
// 如果先執行了unpark()方法,那麼線程下一次執行park()方法将不會阻塞
LockSupport.park();
}
}
}
if (s == FINISHED) {
return (T) result;
}
throw new RuntimeException((Throwable) result);
}
在我們的實作中,如果任務執行的過程抛出異常了,也是通過result傳回給主線程,這樣主線程就拿到了這個異常,它就可以做相應的處理了。
好了,完整的實作到此結束,不知道你領悟了沒有。
測試用例
最後奉上測試代碼:
public class MyThreadPoolFutureExecutorTest {
public static void main(String[] args) {
FutureExecutor threadPool = new MyThreadPoolFutureExecutor("test", 2, 4, new ArrayBlockingQueue<>(6), new DiscardRejectPolicy());
List<Future<Integer>> list = new ArrayList<>();
for (int i = 0; i < 100; i++) {
int num = i;
Future<Integer> future = threadPool.submit(() -> {
Thread.sleep(1000);
System.out.println("running: " + num);
return num;
});
list.add(future);
}
for (Future<Integer> future : list) {
System.out.println("runned: " + future.get());
}
}
}
運作結果:
thread name: core_test2
thread name: test4
thread name: test3
discard one task
thread name: core_test1
discard one task
...省略被拒絕的任務
【本篇文章由公衆号“彤哥讀源碼”原創】
discard one task
running: 0
running: 1
running: 8
running: 9
runned: 0
runned: 1
running: 4
running: 2
running: 3
running: 5
runned: 2
runned: 3
runned: 4
runned: 5
running: 6
running: 7
runned: 6
runned: 7
runned: 8
runned: 9
總結
(1)有傳回值的任務是通過包裝成将來的任務來實作的,這個任務既具有基本的執行能力,又具有将來擷取傳回值的能力;
(2)任務執行的異常跟任務正常的傳回值是通過同一個傳回值傳回到主線程的,主線程根據狀态判斷是異常還是正常值;
(3)我們的實作中運用了單一職責原則、開閉原則等設計原則,對原有代碼沒有造成任何的入侵;
彩蛋
手寫線程池目前隻打算寫這兩章,後面開始進入jdk原生線程池的源碼分析,敬請期待。
另外,需要手寫線程池完整源碼的同學請關注我的公衆号“彤哥讀源碼”,在背景回複“MyThreadPool”(不帶引号)即可領取手寫線程池完整源碼,注意大小寫不要弄錯哦,否則彤哥是不會給你的哈。
歡迎關注我的公衆号“彤哥讀源碼”,檢視更多源碼系列文章, 與彤哥一起暢遊源碼的海洋。