天天看點

Java 是如何實作 Future 模式的?萬字詳解!

Java 是如何實作 Future 模式的?萬字詳解!
Java 是如何實作 Future 模式的?萬字詳解!

《java工程師面試突擊(第3季)》重磅更新,由原來的70講增至150講,内容擴充一倍多,

https://github.com/yuanmabiji/jdk1.8-sourcecode-blogs

1 future是什麼?

先舉個例子,我們平時網購買東西,下單後會生成一個訂單号,然後商家會根據這個訂單号發貨,發貨後又有一個快遞單号,然後快遞公司就會根據這個快遞單号将網購東西快遞給我們。在這一過程中,這一系列的單号都是我們收貨的重要憑證。是以,jdk的future就類似于我們網購買東西的單号,當我們執行某一耗時的任務時,我們可以另起一個線程異步去執行這個耗時的任務,同時我們可以幹點其他事情。當事情幹完後我們再根據future這個"單号"去提取耗時任務的執行結果即可。是以future也是多線程中的一種應用模式。

擴充: 說起多線程,那麼future又與thread有什麼差別呢?最重要的差別就是thread是沒有傳回結果的,而future模式是有傳回結果的。

2 如何使用future

前面搞明白了什麼是future,下面我們再來舉個簡單的例子看看如何使用future。假如現在我們要打火鍋,首先我們要準備兩樣東西:把水燒開和準備食材。因為燒開水是一個比較漫長的過程(相當于耗時的業務邏輯),是以我們可以一邊燒開水(相當于另起一個線程),一邊準備火鍋食材(主線程),等兩者都準備好了我們就可以開始打火鍋了。

執行結果如下截圖,符合我們的預期:

Java 是如何實作 Future 模式的?萬字詳解!

從以上代碼中可以看到,我們使用future主要有以下步驟:

建立一個callable匿名函數實作類對象,我們的業務邏輯在callable的call方法中實作,其中callable的泛型是傳回結果類型;

然後把callable匿名函數對象作為futuretask的構造參數傳入,建構一個futuretask對象;

然後再把futuretask對象作為thread構造參數傳入并開啟這個線程執行去執行業務邏輯;

最後我們調用futuretask對象的get方法得到業務邏輯執行結果。

可以看到跟future使用有關的jdk類主要有futuretask和callable兩個,下面主要對futuretask進行源碼分析。

擴充:還有一種使用future的方式是将callable實作類送出給線程池執行的方式,這裡不再介紹,自行百度即可。

3 futuretask類結構分析

我們先來看下futuretask的類結構:

Java 是如何實作 Future 模式的?萬字詳解!

可以看到futuretask實作了runnablefuture接口,而runnablefuture接口又繼承了future和runnable接口。因為futuretask間接實作了runnable接口,是以可以作為任務被線程thread執行;此外,最重要的一點就是futuretask還間接實作了future接口,是以還可以獲得任務執行的結果。下面我們就來簡單看看這幾個接口的相關api。

runnable沒啥好說的,相信大家都已經很熟悉了。

future接口象征着異步執行任務的結果即執行一個耗時任務完全可以另起一個線程執行,然後此時我們可以去做其他事情,做完其他事情我們再調用future.get()方法擷取結果即可,此時若異步任務還沒結束,此時會一直阻塞等待,直到異步任務執行完擷取到結果。

runnablefuture是future和runnable接口的組合,即這個接口表示又可以被線程異步執行,因為實作了runnable接口,又可以獲得線程異步任務的執行結果,因為實作了future接口。是以解決了runnable異步任務沒有傳回結果的缺陷。接下來我們來看下futuretask,futuretask實作了runnablefuture接口,是以是future和runnable接口的具體實作類,是一個可被取消的異步線程任務,提供了future的基本實作,即異步任務執行後我們能夠擷取到異步任務的執行結果,是我們接下來分析的重中之重。futuretask可以包裝一個callable和runnable對象,此外,futuretask除了可以被線程執行外,還可以被送出給線程池執行。我們先看下futuretask類的api,其中重點方法已經紅框框出。

Java 是如何實作 Future 模式的?萬字詳解!

上圖中futuretask的run方法是被線程異步執行的方法,get方法即是取得異步任務執行結果的方法,還有cancel方法是取消任務執行的方法。接下來我們主要對這三個方法進行重點分析。

思考:

futuretask覆寫的run方法的傳回類型依然是void,表示沒有傳回值,那麼futuretask的get方法又是如何獲得傳回值的呢?

futuretask的cancel方法能真正取消線程異步任務的執行麼?什麼情況下能取消?

因為futuretask異步任務執行結果還跟callable接口有關,是以我們再來看下callable接口:

我們都知道,callable<v>接口和runnable接口都可以被送出給線程池執行,唯一不同的就是callable<v>接口是有傳回結果的,其中的泛型v就是傳回結果,而runnable接口是沒有傳回結果的。

思考:一般情況下,runnable接口實作類才能被送出給線程池執行,為何callable接口實作類也可以被送出給線程池執行?想想線程池的submit方法内部有對callable做适配麼?

4 futuretask源碼分析

我們首先來看下futuretask的成員變量有哪些,了解這些成員變量對後面的源碼分析非常重要。

這裡我們要重點關注下futuretask的callable成員變量,因為futuretask的異步任務最終是委托給callable去實作的。

futuretask的成員變量runner,waiters和state都被volatile修飾,我們可以思考下為什麼這三個成員變量需要被volatile修飾,而其他成員變量又不用呢?volatile關鍵字的作用又是什麼呢?

既然已經定義了成員變量runner,waiters和state了,此時又定義了stateoffset,runneroffset和waitersoffset變量分别對應runner,waiters和state的偏移位址,為何要多此一舉呢?

我們再來看看stateoffset,runneroffset和waitersoffset變量這三個變量的初始化過程:

前面講了futuretask的成員變量,有一個表示狀态的成員變量state我們要重點關注下,state變量表示任務執行的狀态。

可以看到任務狀态變量state有以上7種狀态,0-6分别對應着每一種狀态。任務狀态一開始是new,然後由futuretask的三個方法set,setexception和cancel來設定狀态的變化,其中狀态變化有以下四種情況:

new -> completing -> normal:這個狀态變化表示異步任務的正常結束,其中completing是一個瞬間臨時的過渡狀态,由set方法設定狀态的變化;

new -> completing -> exceptional:這個狀态變化表示異步任務執行過程中抛出異常,由setexception方法設定狀态的變化;

new -> cancelled:這個狀态變化表示被取消,即調用了cancel(false),由cancel方法來設定狀态變化;

new -> interrupting -> interrupted:這個狀态變化表示被中斷,即調用了cancel(true),由cancel方法來設定狀态變化。

futuretask有兩個構造函數,我們分别來看看:

可以看到,這個構造函數在我們前面舉的“打火鍋”的例子代碼中有用到,就是callable成員變量指派,在異步執行任務時再調用callable.call方法執行異步任務邏輯。此外,此時給任務狀态state指派為new,表示任務建立狀态。我們再來看下futuretask的另外一個構造函數:

這個構造函數在執行executors.callable(runnable, result)時是通過擴充卡runnableadapter來将runnable對象runnable轉換成callable對象,然後再分别給callable和state變量指派。注意,這裡我們需要記住的是futuretask建立時,此時的任務狀态state是new就好了。

前面我們有講到futuretask間接實作了runnable接口,覆寫了runnable接口的run方法,是以該覆寫的run方法是送出給線程來執行的,同時,該run方法正是執行異步任務邏輯的方法,那麼,執行完run方法又是如何儲存異步任務執行的結果的呢?我們現在着重來分析下run方法:

可以看到執行異步任務的run方法主要分為以下四步來執行:

判斷線程是否滿足執行異步任務的條件:為了防止多線程并發執行異步任務,這裡需要判斷線程滿不滿足執行異步任務的條件;

若滿足條件,執行異步任務:因為異步任務邏輯封裝在callable.call方法中,此時直接調用callable.call方法執行異步任務,然後傳回執行結果;

根據異步任務的執行情況做不同的處理:1) 若異步任務執行正常結束,此時調用set(result);來設定任務執行結果;2)若異步任務執行抛出異常,此時調用setexception(ex);來設定異常,詳細分析請見4.4.1小節;

異步任務執行完後的善後處理工作:不管異步任務執行成功還是失敗,若其他線程有調用futuretask.cancel(true),此時需要調用handlepossiblecancellationinterrupt方法進行中斷,詳細分析請見4.4.2小節。

這裡值得注意的是判斷線程滿不滿足執行異步任務條件時,runner是否為null是調用unsafe的cas方法compareandswapobject來判斷和設定的,同時compareandswapobject是通過成員變量runner的偏移位址runneroffset來給runner指派的,此外,成員變量runner被修飾為volatile是在多線程的情況下, 一個線程的volatile修飾變量的設值能夠立即刷進主存,是以值便可被其他線程可見。

下面我們來看下當異步任務執行正常結束時,此時會調用set(result);方法:

可以看到當異步任務正常執行結束後,且異步任務沒有被cancel的情況下,此時會做以下事情:将任務執行結果儲存到futuretask的成員變量outcome中的,指派結束後會調用finishcompletion方法來喚醒阻塞的線程(哪裡來的阻塞線程?後面會分析),值得注意的是這裡對應的任務狀态變化是new -> completing -> normal。我們繼續來看下當異步任務執行過程中抛出異常,此時會調用setexception(ex);方法。

可以看到setexception(throwable t)的代碼邏輯跟前面的set(v v)幾乎一樣,不同的是任務執行過程中抛出異常,此時是将異常儲存到futuretask的成員變量outcome中,還有,值得注意的是這裡對應的任務狀态變化是new -> completing -> exceptional。因為異步任務不管正常還是異常結束,此時都會調用futuretask的finishcompletion方法來喚醒喚醒阻塞的線程,這裡阻塞的線程是指我們調用future.get方法時若異步任務還未執行完,此時該線程會阻塞。

finishcompletion方法的作用就是不管異步任務正常還是異常結束,此時都要喚醒且移除線程等待連結清單的等待線程節點,這個連結清單實作的是一個是treiber stack,是以喚醒(移除)的順序是"後進先出"即後面先來的線程先被先喚醒(移除),關于這個線程等待連結清單是如何成鍊的,後面再繼續分析。

在4.4小節分析的run方法裡的最後有一個finally塊,此時若任務狀态state >= interrupting,此時說明有其他線程執行了cancel(true)方法,此時需要讓出cpu執行的時間片段給其他線程執行,我們來看下具體的源碼:

思考:為啥任務狀态是interrupting時,此時就要讓出cpu執行的時間片段呢?還有為什麼要在義務任務執行後才調用handlepossiblecancellationinterrupt方法呢?

可以看到,如果任務狀态state<=completing,說明異步任務正在執行過程中,此時會調用awaitdone方法阻塞等待;當任務執行完後,此時再調用report方法來報告任務結果,此時有三種情況:1)任務正常執行;2)任務執行異常;3)任務被取消。

futuretask.awaitdone方法會阻塞擷取異步任務執行結果的目前線程,直到異步任務執行完成。

futuretask.awaitdone方法主要做的事情總結如下:

首先awaitdone方法裡面是一個死循環;

若擷取結果的目前線程被其他線程中斷,此時移除該線程waitnode連結清單節點,并抛出interruptedexception;

如果任務狀态state>completing,此時傳回任務執行結果;

若任務狀态為completing,此時擷取任務結果的線程需讓出cpu執行時間片段;

若q == null,說明目前線程還未設定到waitnode節點,此時建立waitnode節點并設定其thread屬性為目前線程;

若queued==false,說明目前線程waitnode節點還未加入線程等待連結清單,此時加入該連結清單的頭部;

當timed設定為true時,此時該方法具有逾時功能,關于逾時的邏輯這裡不詳細分析;

目前面6個條件都不滿足時,此時阻塞目前線程。

我們分析到這裡,可以直到執行異步任務隻能有一個線程來執行,而擷取異步任務結果可以多線程來擷取,當異步任務還未執行完時,此時擷取異步任務結果的線程會加入線程等待連結清單中,然後調用調用locksupport.park(this);方法阻塞目前線程。直到異步任務執行完成,此時會調用finishcompletion方法來喚醒并移除線程等待連結清單的每個waitnode節點,這裡這裡喚醒(移除)waitnode節點的線程是從連結清單頭部開始的,前面我們也已經分析過。還有一個特别需要注意的就是awaitdone方法裡面是一個死循環,當一個擷取異步任務的線程進來後可能會多次進入多個條件分支執行不同的業務邏輯,也可能隻進入一個條件分支。下面分别舉兩種可能的情況進行說明:情況1:當擷取異步任務結果的線程進來時,此時異步任務還未執行完即state=new且沒有逾時設定時:

第一次循環:此時q = null,此時進入上面代碼标号【1】的判斷分支,即為目前線程建立一個waitnode節點;

第二次循環:此時queued = false,此時進入上面代碼标号【2】的判斷分支,即将之前建立的waitnode節點加入線程等待連結清單中;

第三次循環:此時進入上面代碼标号【3】的判斷分支,即阻塞目前線程;

第四次循環:加入此時異步任務已經執行完,此時進入上面代碼标号【5】的判斷分支,即傳回異步任務執行結果。

情況2:當擷取異步任務結果的線程進來時,此時異步任務已經執行完即state>completing且沒有逾時設定時,此時直接進入上面代碼标号【5】的判斷分支,即直接傳回異步任務執行結果即可,也不用加入線程等待連結清單了。

在get方法中,當異步任務執行結束後即不管異步任務正常還是異常結束,亦或是被cancel,此時擷取異步任務結果的線程都會被喚醒,是以會繼續執行futuretask.report方法報告異步任務的執行情況,此時可能會傳回結果,也可能會抛出異常。

我們最後再來看下futuretask.cancel方法,我們一看到futuretask.cancel方法,肯定一開始就天真的認為這是一個可以取消異步任務執行的方法,如果我們這樣認為的話,隻能說我們猜對了一半。

以上代碼中,當異步任務狀态state != new時,說明異步任務已經正常執行完或已經異常結束亦或已經被cancel,此時直接傳回false;當異步任務狀态state = new時,此時又根據mayinterruptifrunning參數是否為true分為以下兩種情況:

當mayinterruptifrunning = false時,此時任務狀态state直接被指派為cancelled,此時不會對執行異步任務的線程發出中斷信号,值得注意的是這裡對應的任務狀态變化是new -> cancelled。

當mayinterruptifrunning = true時,此時會對執行異步任務的線程發出中斷信号,值得注意的是這裡對應的任務狀态變化是new -> interrupting -> interrupted。

最後不管mayinterruptifrunning為true還是false,此時都要調用finishcompletion方法喚醒阻塞的擷取異步任務結果的線程并移除線程等待連結清單節點。從futuretask.cancel源碼中我們可以得出答案,該方法并不能真正中斷正在執行異步任務的線程,隻能對執行異步任務的線程發出中斷信号。如果執行異步任務的線程處于sleep、wait或join的狀态中,此時會抛出interruptedexception異常,該線程可以被中斷;此外,如果異步任務需要在while循環執行的話,此時可以結合以下代碼來結束異步任務線程,即執行異步任務的線程被中斷時,此時thread.currentthread().isinterrupted()傳回true,不滿足while循環條件是以退出循環,結束異步任務執行線程,如下代碼:

注意:調用了futuretask.cancel方法,隻要傳回結果是true,假如異步任務線程雖然不能被中斷,即使異步任務線程正常執行完畢,傳回了執行結果,此時調用futuretask.get方法也不能夠擷取異步任務執行結果,此時會抛出cancellationexception異常。請問知道這是為什麼嗎?因為調用了futuretask.cancel方法,隻要傳回結果是true,此時的任務狀态為cancelled或interrupted,同時必然會執行finishcompletion方法,而finishcompletion方法會喚醒擷取異步任務結果的線程等待清單的線程,而擷取異步任務結果的線程喚醒後發現狀态s >= cancelled,此時就會抛出cancellationexception異常了。

5 總結

好了,本篇文章對futuretask的源碼分析就到此結束了,下面我們再總結下futuretask的實作邏輯:

我們實作callable接口,在覆寫的call方法中定義需要執行的業務邏輯;

然後把我們實作的callable接口實作對象傳給futuretask,然後futuretask作為異步任務送出給線程執行;

最重要的是futuretask内部維護了一個狀态state,任何操作(異步任務正常結束與否還是被取消)都是圍繞着這個狀态進行,并随時更新state任務的狀态;

隻能有一個線程執行異步任務,當異步任務執行結束後,此時可能正常結束,異常結束或被取消。

可以多個線程并發擷取異步任務執行結果,當異步任務還未執行完,此時擷取異步任務的線程将加入線程等待清單進行等待;

當異步任務線程執行結束後,此時會喚醒擷取異步任務執行結果的線程,注意喚醒順序是"後進先出"即後面加入的阻塞線程先被喚醒。

當我們調用futuretask.cancel方法時并不能真正停止執行異步任務的線程,隻是發出中斷線程的信号。但是隻要cancel方法傳回true,此時即使異步任務能正常執行完,此時我們調用get方法擷取結果時依然會抛出cancellationexception異常。

擴充:前面我們提到了futuretask的runner,waiters和state都是用volatile關鍵字修飾,說明這三個變量都是多線程共享的對象(成員變量),會被多線程操作,此時用volatile關鍵字修飾是為了一個線程操作volatile屬性變量值後,能夠及時對其他線程可見。此時多線程操作成員變量僅僅用了volatile關鍵字仍然會有線程安全問題的,而此時doug lea老爺子沒有引入任何線程鎖,而是采用了unsafe的cas方法來代替鎖操作,確定線程安全性。

6 分析futuretask源碼,我們能學到什麼?

我們分析源碼的目的是什麼?除了弄懂futuretask的内部實作原理外,我們還要借鑒大佬寫寫架構源碼的各種技巧,隻有這樣,我們才能成長。分析了futuretask源碼,我們可以從中學到:

利用locksupport來實作線程的阻塞\喚醒機制;

利用volatile和unsafe的cas方法來實作線程共享變量的無鎖化操作;

若要編寫逾時異常的邏輯可以參考futuretask的get(long timeout, timeunit unit)的實作邏輯;

多線程擷取某一成員變量結果時若需要等待時的線程等待連結清單的邏輯實作;

某一異步任務在某一時刻隻能由單一線程執行的邏輯實作;

futuretask中的任務狀态state的變化處理的邏輯實作。 以上列舉的幾點都是我們可以學以緻用的地方。

【源碼筆記】github位址:

https://github.com/yuanmabiji/java-sourcecode-blogs

《java工程師面試突擊第三季》加餐部分大綱

Java 是如何實作 Future 模式的?萬字詳解!
Java 是如何實作 Future 模式的?萬字詳解!
Java 是如何實作 Future 模式的?萬字詳解!