磕叨
在公司做項目是見到前輩們寫的一端任務鍊的代碼,大概如下
Runnable task = new TaskA(new TaskB(new TaskC(new taskD())));
task.run();
taskA執行run調用并完成TaskA聲明的任務邏輯之後,内部會自動調用構造參數傳入的TaskB的run方法,過程類似TaskA,TaskB完成之後一樣會調用參數傳入的task,知道最後一個沒有帶下一個task類傳入的任務完成,即完成一個管道式調用。
愛思考的我想,可用,不好用重用,于是動手改改。
準備
經過一段時間開發後,有了一個常用的工具類,友善快速開發,但是這裡用到的東西很少,還是要說明一下,這裡用到一個我稱作ecommon的包,當然我隻用了兩個很基礎的額部分。這兩個部分完全可以用你自己的實作,是非常簡單的。
-
函數接口
jdk8之後很友善讓我們寫出lambda,但是我覺得了解起來不直覺,于是自己重寫了 12 個接口,按參數個數和傳回類型可以直接根據函數名直接選出你要的。具體在 https://github.com/kimffy24/EJoker/tree/master/ejoker-common/src/main/java/pro/jiefzz/ejoker/common/system/functional , IVoid打頭的就是無傳回的,後面的數字就是要帶多少個參數,參數和傳回類型全部都是泛型。1-6個參數已經能包括大部分情況了,需要更多參數的情況完全可以自定義一個上下文傳遞過去。
-
字元串填充類
類似
,但是我不用正則,而是類似slf4j那種,String.format
, 類似這種占位填充。我的實作在 https://github.com/kimffy24/EJoker/blob/dev/ejoker-common/src/main/java/pro/jiefzz/ejoker/common/system/helper/StringHelper.java 的fill方法中。你也可以用log.info("This is a template, keyA={}, keyB={}", "valueA", "valueB")
代替。String.format
思路簡述
我們先明确,jdk8以下的情況不作考慮。
pipeline我更多的印象是來自終端上的應用
pipeline是單向的,上個task的輸出作為下個task的輸入,直到沒有下一個task,最後一個task的作用就應該是你期望的。且
後續任務隻關心前者的輸出結果,對于的他是誰,怎麼做的,是不關心的
。記為
Point1
這個特點是我視為管道與切面或職責鍊模式的差別所在。
首先,我們得有第一推動,讓管道流能有個開始,再就是有中間task,他必定是能接收到上一個任務的輸出的,并且,可能有自帶參數,并且有自己的輸出,最後,有latest的task 與中間task差別在于他不用傳回了,latest一般是以副作用的形式實作我們的企圖的,如上圖的
wc -l
作為最後一個任務是直接把結果列印到螢幕上,而不是傳回一個變量給我們讀取。根據java的強類型屬性,以及剛剛一段的分析,可以得知,有3種類型的任務,開始任務,中間任務,最後任務,并且中間任務的個數是不限的,所有任務至于相鄰的任務有一個管理點,那就是
前者的輸出類型與後者的輸入類型一緻
(網文中大部分說自己實作的pipeline的模式都是傳遞Object類型,到各個子任務中自己強轉到需要的類型的,不說好與不好,但我肯定不喜歡)。這個特性記為
Point2
。
而且,每個子任務,本身是可以帶參數的,這是一個需要支援的點。像上圖指令中的管道,每個子指令(除第一個)都是同時接受前一個指令的輸出作為輸入,且自帶參數的。但是java在這裡其實并不靈活,是以我們約定
後續任務的第一個參數就是前一個任務的輸入
, 這個約定是直接影響到我們的代碼實作的。這個特性記為
Point3
。
另外,管道的入口唯一的,一定是從開始任務往後流的。如果入口不一樣,那麼就是像個不同的管道,他們的意圖以及輸入輸出的期望都是不同的。這個特性記為
Point4
。
最後,在java中使用,我肯定不能像終端那種,錯了重敲指令就是了,是以需要異常控制以及做一些相鄰任務承上啟下的時刻做點什麼,例如日子列印,斷言等。這個算附加題。
提起鍵盤撸
(因為我已經寫完并測試完了,是以我就反過來解析我是怎麼想的了)
這裡以Runnable接口作為基礎接口。給出其中一個測試的例子
這裡初始任務是給出一個日期,中間任務是拼接成人類友好的1句話,最終任務是直接列印到螢幕。(現實中要實作這樣一句話,當然是直接撸啦。這裡隻是為了示範),看看Pipeline初始任務的定義
先不看其他屬性,看構造方法,傳入一個
IFunction<R>
,按照準備一節的定義,他是一個傳回類型為聲明泛型R,且無參數輸入的閉包函數(或稱作lambda表達式)。對照上面PipelineTest中就是那個
() -> { return new Date(); }
, (得益于jdk8的類型推斷,在
new Pipeline<>
構造時,不用再聲明其泛型,編譯器能根據閉包函數的return類型推斷出這裡是個Date類型)。
next
,
end
是指明管道的下接任務,這可以看出管道是極其類似于任務鍊/職責鍊的(需要注意
next
和
end
同時
隻能有個一個存在
)。hook是異常管理以及任務間承接時做一個切面方法的,argCxt是記下傳遞參數,友善hook中的方法使用(這個是因為java需要的,跟管道模式并沒有關系)。
再看add方法的一個重載,添加并傳回中間task
add方法傳入一個
IFunction1<RT, R>
的閉包(lambda),尾數為1,意味着接受一個
R
類型的輸入,并在方法升聲明了
RT
,以
RT
類型作為輸出。其中
R
的泛型聲明在類上,就是與構造方法的
R
是同一個類型。而
RT
的具體類型的推斷會根據具體的lambda的傳回類型決定。這裡add方法會傳回剛剛構造出來的中間任務的聲明對象。add方法需要保證目前任務是沒被聲明過後續任務的。
再看MiddlePipeline類的定義
先看構造方法,他就是接受add方法傳入的閉包。他聲明了兩個泛型變量 分别是
<C, P>
,其中
C
代表他的輸入類型,
P
代表的他的産出類型。同初始任務一樣,他也有
next
和
end
指明他的管道後接任務(
next
)。可以注意到這裡的
next
和初始任務的屬性
next
的産出類型都是被放上了泛型通配符
?
,是因為任務并沒辦法知道他的子任務的産出類型的(後面會再說一下這個問題)。
再看add方法的一個另一個重載,添加并傳回最終task
類似傳回中間态的task,隻不過這裡用了無傳回的閉包。
再看EndPipeline類的定義
最終task的定義清爽很多,他隻關心輸入,并執行。并且他沒有後續任務。
再來補充下AbstractPipeline的解析
這個寫法是為了實作
Point4
所描述的事的,隻要是同一個pipeline上的task所有入口都是初始任務上的那個
run
方法。(為了省事實作,後續任務的基類和所有派生類都是初始任務的非靜态内部類)
再看看初版版本run方法
邏輯很簡單,執行初始任務,得到結果,然後找後續任務,把結果作為輸入來執行後續任務,(其中循環時滿足上一個輸出作為下一個輸入),直到有一個管道類的中間态任務為
null
,然後判斷最終任務是否為
null
,非空則執行它。
需要說明一下這裡用
@SuppressWarnings
壓制了警告,是因為确信java編譯器能確定聯系兩個add進來的task之間的輸入輸出的類型關系是一緻的(這一點,如果不一緻,在編寫代碼時IDE就會報錯了)。
到此,一個簡單的java實作的pipeline模式基本可以用,跑最開始那個demoTest是沒有問題了。
再給一個樣例demo
管道中的3個方法的職責就如他的名字那樣(實作上我這裡隻是簡單的new一下),然後同過Pipeline類以及它的add方法串起來,執行結果如紅色部分。聰明的人肯定能想到,那麼像那個java的stream的?嗯很像,stream是類似把元素放到單個跑到上,按照定義那樣的自己跑到終點(這也是使用方代碼友善地切換到并行流的原因,因為邏輯一緻,當然,并發問題是另一個層面的問題)。
而pipeline則橫向的一階段一階段地執行,如果要增加吞吐量怎麼搞?聰明的你肯定能想到分片了,這樣走下去就跟parallelstream的意圖不謀而合了。那麼還有别的好處嗎?嗯,你想想Mock測試?職責上有沒有讓你更好切分了(正如這裡指令的方法名那這樣)?
進一步完善
上面一節基本上能把
Point1
Point2
的一半
Point3
Point4
實作了,剩下
Point2
中說到的,除了接收前一個任務的輸入,還允許管道聲明時傳入參數的這個功能,以及那個附加題說到的java應用上的妥協。
pipeline聲明上附帶參數
這個時候就要好好用到
準備
一節中的那些
函數接口
了。說起來并不好解析,但是如果你了解過curry柯裡化這個概念的話,那一看圖你就懂了,看圖。
就是把帶參數的lambda重新包裝一次為不帶參數的lambda表達式。後面middlePipeline的帶參數部分則是重新封裝為一個隻接受一個參數且傳回類型相同的lambda表達式,這是類似的。
來一個測試看看,并附上圖中說明
對java友好支援
附加題說的這個就跟簡單了,找個地方分别設定好兩個玩意,在對應的地方執行他們就是了
public class PipelineHook {
private boolean preventThrow = false;
// 異常發生時執行此表達式
public final IVoidFunction3<PipelineHook, Exception, Object[]> exceptionHandler;
// 調用後續任務錢執行此lambda表達式
public final IVoidFunction1<Object> aspecter;
public PipelineHook(IVoidFunction3<PipelineHook, Exception, Object[]> exceptionHandler, IVoidFunction1<Object> aspecter) {
this.exceptionHandler = exceptionHandler;
this.aspecter = aspecter;
}
public PipelineHook(IVoidFunction1<Object> aspecter) {
this(null, aspecter);
}
public PipelineHook(IVoidFunction3<PipelineHook, Exception, Object[]> exceptionHandler) {
this(exceptionHandler, null);
}
// 是否阻止異常抛出
public boolean isPreventThrow() {
return preventThrow;
}
// 設定标記阻止異常抛出
public void setPreventThrow() {
this.preventThrow = true;
}
}
通過兩個lambda變量構造出hook對象,并通過初始任務的的
addPipelineHook
放set進去,他們具體再
run
方法發回自己的作用,現在,run方法更新為
其中
getCxtInfo
方法會把目前子任務的參數轉化是字元串,讓異常資訊能夠被人讀懂。
今天先到這裡了,整體下來,覺得跟stream太像了,我發現用stream碼起來特爽,讀起來特慘(特别是讀别人的多重stream的時候),而這個pipeline正好相反耶。總的來說,就是個模式,需要提高吞吐量的話,使用配置設定配合線程池的話,吞吐量會得到巨量提升哦(把每個配置設定的大小設定為1不就是我們的parallelStream嗎?哈哈)。
issue在: https://github.com/kimffy24/EJoker/issues/30
初次送出: https://github.com/kimffy24/EJoker/commit/c71e5d76a0904249b7c1399bd8ba52ec72fe9a0e