天天看點

從 RxJS 到 Flink:如何處理資料流?

從 RxJS 到 Flink:如何處理資料流?
導讀:前端開發的本質是什麼?響應式程式設計相對于 MVVM 或者 Redux 有什麼優點?響應式程式設計的思想是否可以應用到後端開發中?本文以一個新聞網站為例,闡述在前端開發中如何使用響應式程式設計思想;再以計算電商平台雙11每小時成交額為例,分享同樣的思想在實時計算中的相同與不同之處。

一 、前端開發在開發什麼

大家在前端開發的過程中,可能會想過這樣一個問題:前端開發究竟是在開發什麼?在我看來,前端開發的本質是讓網頁視圖能夠正确地響應相關事件。在這句話中有三個關鍵字:"網頁視圖","正确地響應"和"相關事件"。

"相關事件"可能包括頁面點選,滑鼠滑動,定時器,服務端請求等等,"正确地響應"意味着我們要根據相關的事件來修改一些狀态,而"網頁視圖"就是我們前端開發中最熟悉的部分了。

按照這樣的觀點我們可以給出這樣 視圖 = 響應函數(事件) 的公式:

View = reactionFn(Event)

在前端開發中,需要被處理事件可以歸類為以下三種:

● 使用者執行頁面動作,例如 click, mousemove 等事件。

● 遠端服務端與本地的資料互動,例如 fetch, websocket。

● 本地的異步事件,例如 setTimeout, setInterval async_event。

從 RxJS 到 Flink:如何處理資料流?

這樣我們的公式就可以進一步推導為:

View = reactionFn(UserEvent | Timer | Remote API)

二、應用中的邏輯處理

為了能夠更進一步了解這個公式與前端開發的關系,我們以新聞網站舉例,該網站有以下三個要求:

● 單擊重新整理:單擊 Button 重新整理資料。

● 勾選重新整理:勾選 Checkbox 時自動重新整理,否則停止自動重新整理。

● 下拉重新整理:當使用者從螢幕頂端下拉時重新整理資料。

如果從前端的角度分析,這三種需求分别對應着:

● 單擊重新整理:click -> fetch

● 勾選重新整理:change -> (setInterval + clearInterval) -> fetch

● 下拉重新整理:(touchstart + touchmove + touchend) -> fetch news_app

從 RxJS 到 Flink:如何處理資料流?

1.MVVM

在 MVVM 的模式下,對應上文的響應函數(reactionFn)會在 Model 與 ViewModel 或者 View 與 ViewModel 之間進行被執行,而事件 (Event) 會在 View 與 ViewModel 之間進行處理。

從 RxJS 到 Flink:如何處理資料流?

MVVM 可以很好的抽象視圖層與資料層,但是響應函數(reactionFn)會散落在不同的轉換過程中,這會導緻資料的指派與收集過程難以進行精确追蹤。另外因為事件 (Event) 的處理在該模型中與視圖部分緊密相關,導緻 View 與 ViewModel 之間對事件處理的邏輯複用困難。

2.Redux

在 Redux 最簡單的模型下,若幹個事件 (Event) 的組合會對應到一個 Action 上,而 reducer 函數可以被直接認為與上文提到的響應函數 (reactionFn) 對應。

從 RxJS 到 Flink:如何處理資料流?

但是在 Redux 中:

● State 隻能用于描述中間狀态,而不能描述中間過程。

● Action 與 Event 的關系并非一一對應導緻 State 難以追蹤實際變化來源。

3.響應式程式設計與 RxJS

維基百科中是這樣定義響應式程式設計:

在計算中,響應式程式設計或反應式程式設計(英語:Reactive programming)是一種面向資料流和變化傳播的聲明式程式設計範式。這意味着可以在程式設計語言中很友善地表達靜态或動态的資料流,而相關的計算模型會自動将變化的值通過資料流進行傳播。

以資料流次元重新考慮使用者使用該應用的流程:

● 點選按鈕 -> 觸發重新整理事件 -> 發送請求 -> 更新視圖

● 勾選自動重新整理

● 手指觸摸螢幕

● 自動重新整理間隔 -> 觸發重新整理事件 -> 發送請求 -> 更新視圖

● 手指在螢幕上下滑

● 手指在螢幕上停止滑動 -> 觸發下拉重新整理事件 -> 發送請求 -> 更新視圖

● 關閉自動重新整理

以 Marbles 圖表示:

從 RxJS 到 Flink:如何處理資料流?

拆分上圖邏輯,就會得到使用響應式程式設計開發目前新聞應用時的三個步驟:

● 定義源資料流

● 組合/轉換資料流

● 消費資料流并更新視圖

我們分别來進行較長的描述。

定義源資料流

使用 RxJS,我們可以很友善的定義出各種 Event 資料流。

1)單擊操作

涉及 click 資料流。

click$ = fromEvent(document.querySelector('button'), 'click');
```  

2)勾選操作

涉及 change 資料流。

```
change$ = fromEvent(document.querySelector('input'), 'change');           

3)下拉操作

涉及 touchstart, touchmove 與 touchend 三個資料流。

touchstart$ = fromEvent(document, 'touchstart');
touchend$ = fromEvent<TouchEvent>(document, 'touchend');
touchmove$ = fromEvent(document, 'touchmove');
```  

4)定時重新整理

```
interval$ = interval(5000);           

5)服務端請求

fetch$ = fromFetch('https://randomapi.azurewebsites.net/api/users');
```  

**組合/轉換資料流**

1)點選重新整理事件流

在點選重新整理時,我們希望短時間内多次點選隻觸發最後一次,這通過 RxJS 的 debounceTime operator 就可以實作。

![image.png](https://ucc.alicdn.com/pic/developer-ecology/5b0aa59c9ec54d8287c84e4f2136b48f.png)

```
clickRefresh$ = this.click$.pipe(debounceTime(300));
```  

2)自動重新整理流

使用 RxJS 的 switchMap 與之前定義好的 interval$ 資料流配合。

![image.png](https://ucc.alicdn.com/pic/developer-ecology/fabe03a9010f4304b7501e32d1448b81.png)
           

autoRefresh$ = change$.pipe(

switchMap(enabled => (enabled ? interval$ : EMPTY))

);

```

3)下拉重新整理流

結合之前定義好的 touchstart$touchmove$ 與 touchend$ 資料流。

從 RxJS 到 Flink:如何處理資料流?
pullRefresh$ = touchstart$.pipe(
  switchMap(touchStartEvent =>
    touchmove$.pipe(
      map(touchMoveEvent => touchMoveEvent.touches[0].pageY - touchStartEvent.touches[0].pageY),
      takeUntil(touchend$)
    )
  ),
  filter(position => position >= 300),
  take(1),
  repeat()
);           

最後,我們通過 merge 函數将定義好的 clickRefresh$autoRefresh$ 與 pullRefresh$ 合并,就得到了重新整理資料流。

![image.png](https://ucc.alicdn.com/pic/developer-ecology/3288b228db534840bca56d00788d90b3.png)

```

refresh$ = merge(clickRefresh$, autoRefresh$, pullRefresh$));

**消費資料流并更新視圖**

将重新整理資料流直接通過 switchMap 打平到在第一步到定義好的 fetch$,我們就獲得了視圖資料流。

從 RxJS 到 Flink:如何處理資料流?

可以通過在 Angular 架構中可以直接 async pipe 将視圖流直接映射為視圖:

<div *ngFor="let user of view$ | async">
</div>           

在其他架構中可以通過 subscribe 獲得資料流中的真實資料,再更新視圖。

至此,我們就使用響應式程式設計完整的開發完成了目前新聞應用,示例代碼[1]由 Angular 開發,行數不超過 160 行。

我們總結一下,使用響應式程式設計思想開發前端應用時經曆的三個過程與第一節中公式的對應關系:

1)描述源資料流

與事件UserEvent | Timer | Remote API 對應,在 RxJS 中對應函數分别是:

● UserEvent: fromEvent

● Timer: interval, timer

● Remote API: fromFetch, webSocket

2)組合轉換資料流

與響應函數(reactionFn)對應,在 RxJS 中對應的部分方法是:

● COMBINING: merge, combineLatest, zip

● MAPPING: map

● FILTERING: filter

● REDUCING: reduce, max, count, scan

● TAKING: take, takeWhile

● SKIPPING: skip, skipWhile, takeLast, last

● TIME: delay, debounceTime, throttleTime

3)消費資料流更新視圖

與 View 對應,在 RxJS 及 Angular 中可以使用:

● subscribe

● async pipe

響應式程式設計相對于 MVVM 或者 Redux 有什麼優點呢?

● 描述事件發生的本身,而非計算過程或者中間狀态。

● 提供了組合和轉換資料流的方法,這也意味着我們獲得了複用持續變化資料的方法。

● 由于所有資料流均由層層組合與轉換獲得,這也就意味着我們可以精确追蹤事件及資料變化的來源。

如果我們将 RxJS 的 Marbles 圖的時間軸模糊,并在每次視圖更新時增加縱切面,我們就會發現這樣兩件有趣的事情:

從 RxJS 到 Flink:如何處理資料流?

● Action 是 EventStream 的簡化。

● State 是 Stream 在某個時刻的對應。

難怪我們可以在 Redux 官網中有這樣一句話:如果你已經使用了 RxJS,很可能你不再需要 Redux 了。

The question is: do you really need Redux if you already use Rx? Maybe not. It's not hard to re-implement Redux in Rx. Some say it's a two-liner using Rx.scan() method. It may very well be!

寫到這裡,我們對網頁視圖能夠正确地響應相關事件這句話是否可以進行進一步的抽象呢?

所有事件 -- 找到 --> 相關事件 -- 做出 --> 響應

而按時間順序發生的事件,本質上就是資料流,進一步拓展就可變成:

源資料流 -- 轉換 --> 中間資料流 -- 訂閱 --> 消費資料流

這正是響應式程式設計在前端能夠完美工作的基礎思想。但是該思想是否隻在前端開發中有所應用呢?

答案是否定的,該思想不僅可以應用于前端開發,在後端開發乃至實時計算中都有着廣泛的應用。

三、打破資訊之牆

在前後端開發者之間,通常由一面叫 REST API 的資訊之牆隔開,REST API 隔離了前後端開發者的職責,提升了開發效率。但它同樣讓前後端開發者的眼界被這面牆隔開,讓我們試着來推倒這面資訊之牆,一窺同樣的思想在實時計算中的應用。

1.實時計算 與 Apache Flink

在開始下一部分之前,讓我們先介紹一下 Flink。Apache Flink 是由 Apache 軟體基金會開發的開源流處理架構,用于在無邊界和有邊界資料流上進行有狀态的計算。它的資料流程式設計模型在有限和無限資料集上提供單次事件(event-at-a-time)處理能力。

從 RxJS 到 Flink:如何處理資料流?

在實際的應用中,Flink 通常用于開發以下三種應用:

● 事件驅動型應用 事件驅動型應用從一個或多個事件流提取資料,并根據到來的事件觸發計算、狀态更新或其他外部動作。場景包括基于規則的報警,異常檢測,反欺詐等等。

● 資料分析應用 資料分析任務需要從原始資料中提取有價值的資訊和名額。例如雙十一成交額計算,網絡品質監測等等。

● 資料管道(ETL)應用 提取-轉換-加載(ETL)是一種在存儲系統之間進行資料轉換和遷移的常用方法。ETL 作業通常會周期性地觸發,将資料從事務型資料庫拷貝到分析型資料庫或資料倉庫。

我們這裡以計算電商平台雙十一每小時成交額為例,看下我們在之前章節得到方案是否仍然可以繼續使用。

在這個場景中我們首先要擷取使用者購物下單資料,随後計算每小時成交資料,然後将每小時的成交資料轉存到資料庫并被 Redis 緩存,最終通過接口擷取後展示在頁面中。

在這個鍊路中的資料流處理邏輯為:

使用者下單資料流 -- 轉換 --> 每小時成交資料流 -- 訂閱 --> 寫入資料庫

與之前章節中介紹的:

思想完全一緻。

如果我們用 Marbles 描述這個過程,就會得到這樣的結果,看起來很簡單,似乎使用 RxJS 的 window operator 也可以完成同樣的功能,但是事實真的如此嗎?

從 RxJS 到 Flink:如何處理資料流?

2.被隐藏的複雜度

真實的實時計算比前端中響應式程式設計的複雜度要高很多,我們在這裡舉幾個例子:

事件亂序

在前端開發過程中,我們也會碰到事件亂序的情況,最經典的情況先發起的請求後收到響應,可以用如下的 Marbles 圖表示。這種情況在前端有很多種辦法進行處理,我們在這裡就略過不講。

從 RxJS 到 Flink:如何處理資料流?

我們今天想介紹的是資料處理時面臨的時間亂序情況。在前端開發中,我們有一個很重要的前提,這個前提大幅度降低了開發前端應用的複雜度,那就是:前端事件的發生時間和處理時間相同。

從 RxJS 到 Flink:如何處理資料流?

想象一下,如果使用者執行頁面動作,例如 click, mousemove 等事件都變成了異步事件,并且響應時間未知,那整個前端的開發複雜度會如何。

但是事件的發生時間與處理時間不同,在實時計算領域是一個重要的前提。我們仍以每小時成交額計算為例,當原始資料流經過層層傳輸之後,在計算節點的資料的先後順很可能已經亂序了。

從 RxJS 到 Flink:如何處理資料流?

如果我們仍然以資料的到來時間來進行視窗劃分,最後的計算結果就會産生錯誤:

從 RxJS 到 Flink:如何處理資料流?

為了讓 window2 的視窗的計算結果正确,我們需要等待 late event 到來之後進行計算,但是這樣我們就面臨了一個兩難問題:

● 無限等下去:late event 可能在傳輸過程中丢失,window2 視窗永遠沒有資料産出。

● 等待時間太短:late event 還沒有到來,計算結果錯誤。

Flink 引入了 Watermark 機制來解決這個問題,Watermark 定義了什麼時候不再等待 late event,本質上提供了實時計算的準确性和實時性的折中方案。

關于 Watermark 有個形象的比喻:上學的時候,老師會将班級的門關上,然後說:“從這個點之後來的同學都算遲到了,統統罰站“。在 Flink 中,Watermark 充當了老師關門的這個動作。

從 RxJS 到 Flink:如何處理資料流?

資料反壓

在浏覽器中使用 RxJS 時,不知道大家有沒有考慮這樣一種情況:observable 産生的速度快于 operator 或者 observer 消費的速度時,會産生大量的未消費的資料被緩存在記憶體中。這種情況被稱為反壓,幸運的是,在前端産生資料反壓隻會導緻浏覽器記憶體被大量占用,除此之外不會有更嚴重的後果。

但是在實時計算中,當資料産生的速度高于中間節點處理能力,或者超過了下遊資料的消費能力時,應當如何處理?

從 RxJS 到 Flink:如何處理資料流?

對于許多流應用程式來說,資料丢失是不可接受的,為了保證這一點,Flink 設計了這樣一種機制:

● 在理想情況,在一個持久通道中緩沖資料。

● 當資料産生的速度高于中間節點處理能力,或者超過了下遊資料的消費能力時,速度較慢的接收器會在隊列的緩沖作用耗盡後立即降低發送器的速度。更形象的比喻是,在資料流流速變慢時,将整個管道從水槽“回壓”到水源,并對水源進行節流,以便将速度調整到最慢的部分,進而達到穩定狀态。

從 RxJS 到 Flink:如何處理資料流?

Checkpoint

實時計算領域,每秒鐘處理的資料可能有數十億條,這些資料的處理不可能由單台機器獨立完成。事實上,在 Flink 中,operator 運算邏輯會由不同的 subtask 在 不同的 taskmanager 上執行,這時我們就面臨了另外一個問題,當某台機器發生問題時,整體的運算邏輯與狀态該如何處理才能保證最後運算結果的正确性?

從 RxJS 到 Flink:如何處理資料流?

Flink 中引入了 checkpoint 機制用于保證可以對作業的狀态和計算位置進行恢複,checkpoint 使 Flink 的狀态具有良好的容錯性。Flink 使用了 Chandy-Lamport algorithm 算法的一種變體,稱為異步 barrier 快照(asynchronous barrier snapshotting)。

當開始 checkpoint 時,它會讓所有 sources 記錄它們的偏移量,并将編号的 checkpoint barriers 插入到它們的流中。這些 barriers 會經過每個 operator 時标注每個 checkpoint 前後的流部分。

從 RxJS 到 Flink:如何處理資料流?
從 RxJS 到 Flink:如何處理資料流?

當發生錯誤時,Flink 可以根據 checkpoint 存儲的 state 進行狀态恢複,保證最終結果的正确性。

冰山一角

由于篇幅的關系,今天介紹的部分隻能是冰山一角,不過:

的模型無論在響應式程式設計還是實時計算都是通用的,希望這篇文章能夠讓大家對資料流的思想有更多的思考。

相關連結:

[1]

https://github.com/vthinkxie/ng-pull-refresh
從 RxJS 到 Flink:如何處理資料流?