天天看點

你不容錯過的響應式程式設計介紹

呼,翻譯完後又花了些時間重新校對了一遍,删掉了原文作者一些比較“矯情”的地方,也修改了一些段落,目的是為了讓全文讀起來更加通俗易懂。以前也做過些有趣的翻譯,比如翻譯Morphia的API文檔。一來是為了鍛煉一下自己閱讀英國文檔的能力,二來是覺得響應式程式設計非常難懂,特别是它的思維。它解決問題的思路不是直接的解決,而是通過描述,讓問題在描述過程得到解決。如果你正在學習“響應式程式設計”、“函數式程式設計”或“異步程式設計”,那希望本文可以給你些啟示。

你不容錯過的響應式程式設計介紹

你或許會對學習“響應式程式設計”(函數化反應程式設計,FRP)非常感興趣。

但是學習FRP非常困難,主要是因為缺少好的學習資料。當我嘗試學習的時候,也努力的去找了很多教程。結果隻找到少量的實用性指導,而且他們大多都過于簡單的描述了一些概念,并沒有描述如何用Rx建構一個完整的應用。說實在的,當你嘗試去了解一些函數的時候,FRP的API文檔幾乎起不了什麼作用,譬如:

Rx.Observable.prototype.flatMapLatest(selector, [thisArg])Projects each element of an observable sequence into a new sequence of observable sequences by incorporating the element’s index and then transforms an observable sequence of observable sequences into an observable sequence producing values only from the most recent observable sequence.

我的天哪!

我曾經讀過兩本書,其中一本大篇幅介紹了FRP是什麼,另外一本隻是教我們怎麼使用FRP的API。在Futurice工作期間,我需要在項目中使用FRP。幸運的是,當我陷入問題的時候,同僚們給予了很大的幫助。最終通過使用FRP,我算是搞懂了它。哎,這真是一個艱難的方法。

學習FRP最艱難的地方是“FRP的思維(Thinking in FRP)”。這需要你在寫代碼的時候,放棄習慣的程式設計方法,用新的思維去思考。我沒在網上找到這方面的資料,但我覺得應該要有一個實用性的教程來說明“用FRP來思考”并幫助你解決問題。一旦在思想層面有了覺悟,FRP的API就可以為你解決以後的問題。我希望這篇文章可以幫到你。

什麼是FRP?

網上有大量糟糕的解析和定義:

  • 一如既往的抽象和理論化的Wikipedia
  • Stackoverflow權威的答案并不适合于初學者
  • Reactive Manifesto是售前拿去吹水的東西
  • 微軟的Rx terminology(Rx 公式化:Rx = Observables + LINQ + Schedulers)把我們丢在風中繼續淩亂。

說實在的,相比起你熟悉的“MV*”和程式設計語言,諸如“響應式”和“傳播改變”這類術語很具體傳達它們之間有什麼不同。當然我用的架構是“響應式”的,我說的“改變”是會傳播的。不然的話,也沒法說下去了。

好吧,讓我們開始正題吧。

FRP是異步資料流程式設計。

這不是什麼新鮮的東西了。在前端程式設計中(用Javascript),監聽某個按鈕的點選事件,并在事件被觸發以後回調一個函數做一些操作,這個過程就是異步資料流程式設計,也就是FRP。FRP的靈感來源于細胞的激素刺激,你可以回想一下國中生物學的“生物應激”。我們可以為任何東西建立資料流(Stream),不僅僅局限于click和hover事件。Stream是随處可見的,任何東西都可以成為Stream:變量、使用者的輸入、屬性、緩存、資料結構等等。舉個例子,微網誌的推薦(推薦好友,推薦好新聞)就是一種和click事件一樣的Stream,你可以監聽它的裡面(推薦事件)并按需作出響應。

單一的一個Stream可以用來作為另一個Stream的輸入,甚至多個Stream也可以輸入給某一個Stream。假設現在你有一個超牛逼的工具集,包含了很多功能可以讓你合并、建立和過濾Stream。那麼你就可以merge(合并)多個Stream,對Stream做你隻感興趣的事件的filter(過濾),也可以把一個Streammap(映射)為另外一個新的Stream。

既然Stream是FRP的核心,那我們就從最熟悉的“click a button”來仔細的了解Stream。

你不容錯過的響應式程式設計介紹

從上面可以看出,Stream是一個不間斷的按照時間順序排列的event序列。它可以發出三樣信号:值(value,對應于某些類型)、錯誤(error)和完成(completed)。隻有當按鈕所在的視窗或者視窗被關閉的時候,才會發出“完成”信号。

既然知道Stream會發出(emit)三種信号,那麼我們就可以為其定義三個對應的執行函數異步的捕捉并處理這三種信号量。有時候,對于從Stream裡面發出的error和completed,你可以按照需要選擇捕捉處理或不捕捉處理。對Stream的“監聽”叫做“訂閱(subscribe)”,這些執行函數就是“觀察者(observeers)”,Stream就是被觀察的“主體(subject)”或者“可觀察序列(observable)”。這正是觀察者模式的設計展現。

在本教程的某些地方,我會用ASCII圖來表示Stream,就叫做StreamGraph吧:

--a---b-c---d---X---|->

a, b, c, d :事件對應的類型或者值
X :錯誤
| :完成
---> :時間軸
           

下面一起來做些有趣的嘗試:把“Stream(origin click event)”轉換為“Stream(counter click event)”。

使用FRP的時候,Stream會被拓展了很多方法,如

map

filter

scan

等。當調用其中一個方法時,如

clickStream.map(f)

,它将傳回一個“new Stream”。這意味着“origin Stream”沒有發生改變。在FRP裡,Stream具備恒定(immutability)的特性,說白了Stream是一個發生後即不可變的序列。是以

clickStream.map(f).scan(g)

這樣鍊式調用可以在Stream上操作。

clickStream:   ---c----c--c----c------c-->
               vvvvv map(c becomes 1) vvvv
               ---1----1--1----1------1-->
               vvvvvvvvv scan(+) vvvvvvvvv
counterStream: ---1----2--3----4------5-->
           

上面的例子,

map(f)

方法用傳入的

f

函數替代每個發出的信号量,把每次的點選事件映射為數值1。

map

産生的流會被

scan(g)

方法掃描并用

g(accumulated, current)

來聚合,在例子中就是簡單地相加。最後的

countStream

統計了每次點選時,目前一共産生了多少次點選事件。

為了展現FRP的強大,假設現在需要一個double-click(輕按兩下)Stream(短時間内兩次或三次的單擊認為是輕按兩下)。深呼吸并回想你是怎麼用傳統的方式來實作的。我敢打賭一定讓人非常抓狂,因為會需要大量的變量去描述每個階段的狀态,還會用到定時處理。

但用FRP會讓事情變得非常簡單。事實上,僅需要4行邏輯代碼。不過,我們先忽略代碼來看個圖:

你不容錯過的響應式程式設計介紹

不用需要了解底層是如何實作的,隻管用就是了。灰色框裡面的函數把一個Stram轉換為另外一個Stream。先用

buffer(stream.throttle(250ms))

判定出那些單擊可以歸為一次輕按兩下,進而獲得一個新的歸并後的單擊Stream。然後用

map()

做事件數量統計,獲得一個新的含有每個歸并單元中事件數量的Stream。最後用

filter(x >= 2)

來忽略數量為

1

的歸并單元。就這樣,通過3步操作獲得所需要的Stream。現在可以按照需要,通過subscribe對輕按兩下Stream進行監聽并做出響應。

我希望你會享受這種程式設計方式。這個例子僅僅是冰山一角:你可以用FRP實作的類庫如“Rx*”來做更多。

為什麼要用FRP?

FRP提高了抽象的層次,是以你可以專注于業務邏輯裡面事件間的互相依賴,而不需要關心一大堆實作的細節。用FRP将會使代碼變得更加簡潔。

FRP的優勢在富“UI事件和資料事件互動”的現代Web、移動應用得到了證明。10年前,Web頁面的互動基本上就是送出一個大表單到後端,然後在前端執行簡單的渲染。應用逐漸變得更加實時:修改單個表單域能夠自動觸發儲存到背景,内容會根據個人的“喜好”比對到相關使用者等等。

現今的應用需要通過豐富多樣的實時事件來提供高水準的使用者體驗,而FRP可以很好解決。

執行個體講解FRP

讓我們來點幹貨。通過一個真實的例子一步步的了解FRP。在教程的最後會給出所有的代碼,同時了解每行代碼在做什麼。

我用Javascript和RxJS作為工具,那是因為:Javascript是目前比較熟悉的語言,同時Rx*庫系列提供了對多種語言和平台的(.NET,Java,Scala,Clojure,JavaScript,Ruby,Python,C++,Objective-C/Cocoa,Groovy等等)支援。是以無論你用什麼工具,都可以按照本教程享受FRP的好處。

實作“推薦關注”

在微網誌裡,有個專門推薦新使用者給你關注的面闆:

你不容錯過的響應式程式設計介紹

這個“推薦關注”的實作包含以下這些核心特點:

  • 啟動的時候,從API中加載并顯示3條其他賬戶資訊。
  • 點選“Refresh”按鈕時,重新加載3條其他賬戶資訊。
  • 點選每條賬戶資訊的“x”按鈕時,清掉目前這條賬戶,并顯示另外一條。

微網誌裡面對未通過認證的開發着不公開這個“推薦關注”的API,是以我們用Github的API來實作。

請求和響應

你會怎麼用FRP來解決這個問題呢?嗯,開始的時候(幾乎)把任何東西都流化。這幾乎成了FRP的魔咒。我們從最簡單的功能開始:“啟動的時候,從API中加載并顯示3條其他賬戶資訊。”。這裡沒有任何問題,就是簡單的(1)發送一個請求,(2)接收一個響應,(3)渲染這個響應。是以,我們用Stream來代表(1)的請求。起初這感覺像殺雞用牛刀,但是我們需要從基本的東西開始,對吧?

啟動的時候隻需要發送一次請求,那麼對應的StreamModel(流模型)是一個隻含有一個值的Stream。最後會發現啟動的時候會有很多請求,但目前隻有一個:

--a------|->

a :'https://api.github.com/users'這個字元串
           

這是一個請求位址(URL)Stream。這條Stream告訴了我們兩件事情:“什麼時候”和“是什麼”(When & What)。“什麼時候”意味着當産生事件(emit event)時要發送請求,而“是什麼”說明了産生的事件(emitted event)是一串請求位址字元。

用“Rx*”建立Stream是非常簡單的。Stream的術語是“Observable(可觀察序列)”,這說明它是可以被其他人觀察的。但我發現這真是個愚蠢的名詞,是以我更喜歡稱它為“Stream(流)”。

這裡現在有一個隻含一個字元串事件的Stream,但是沒有任何操作,是以我們需要添加一個處理即将到來的字元串事件的函數。下面給

requestStream

加上

subscribing

requestStream.subscribe(function(requestUrl){
	// 發送請求
	jQuery.getJSON(requestUrl, function(responseData){
		// ...
	});
});
           

注意上面我們用了jQuery的Ajax回調處理響應的結果。( ⊙ o ⊙ )!,等等,FRP不是擅長于處理異步資料流嗎?能不能把jQuery的結果交給RxJS處理?感覺好像沒什麼問題,我們來試試:

requestStream.subscribe(function(requestUrl){
	// 響應也是個流
	var responseStream = Rx.Observable.create(function(observer){
		jQuery.getJSON(requestUrl)
    // 當jQuery成功調用以後,就把結果交給RxJS處理
		.done(function(response){ observer.onNext(response); })
    // 當jQuery失敗時,把失敗交給RxJS處理
		.fail(function(jqXHR, status, error){ observer.onError(error); })
    // 當jQuery完成時,告知RxJS調用完成的處理
		.always(function(){ observer.onCompleted(); })
	});

	responseStream.subscribe(function(response){
		// 為響應做處理
	});
});
           

因為我們需要發送一個Ajax請求,是以我們就用jQuery包裝一下我們的RxJS。上面看起來非常直覺,而

Rx.Observable.create()

通過傳入一個包含observer參數的函數,會傳回一個自定義的Stream。當Stream産生任何事件的時候,都會調用這個傳入的方法,并傳入目前observer。打擾一下,這是不是意味着Promise也是一個“Observable(可觀察序列)”?【注:作者這裡不用Stream,是為了更加官方的描述Promise。】

你不容錯過的響應式程式設計介紹

是的!

Observable是Promise++。在RxJS裡面,你可以很容易的把Promise轉換為Observable通過調用

var stream = Rx.Observable.fromPromise(promise)

,是以我們來用用它。值得一提的是,Observable和Promises/A+是不相容的,但概念上并沒有沖突。你可以這樣了解,Promise就是Observable的單值版本。

可以看到RxJS比起jQuery這類架構實作的Promise要強大多了。當别人大肆吹捧Promises的時候,你給給他說說RxJS。

好吧,回到我們的例子來。你注意到下面這些問題了嗎?

  • 把一個

    subscribe()

    調用嵌入了另外一個

    subscribe()

    裡面,這可能會陷入“callback hell”。
  • resposneStream

    緊密依賴于

    requestStream

    。【注:這裡涉及“關注點分離”】

哎呀,那麼多問題。幸虧,FRP提供了大量的操作函數來解決上面的問題。

現在相信你已經很清楚基礎函數

map(f)

了。這是一個把生産流(Stream A)裡面的所有值你拿出來執行

f

轉換,把轉換的結果放入到消費流(Stream B)中。例如,我們正好需要把請求位址(URL)對應的轉成一個響應的Stream(Promise可以包裝成Stream)。

var responseMetastream = requestStream.map(function(requestUrl){
	return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
});
           

不過,上面的代碼建立了一個怪獸:“metastream”。“metastream”的每個值是一個Stream的指針:每個值

指向

另外一個Stream【注:map轉換以後是流,但是流裡面的東西是指向Promise的指針】。在我們的例子中,每個請求URL都被映射為一個指針指向對應包含響應的promise流。

你不容錯過的響應式程式設計介紹

響應的“metastream”讓人看起來非常困惑,而且實際上我們需要的是一個包含Promise【注:Promise是流】的Stream,而不是一條包含Stream指針的“metastream”。向Flatmap先生說“你好”吧。

flatmap()

map()

的一個“扁平化”處理版本,就像是從“主幹”流裡分出“支流”,然後對“支流”處理。【注:flatmap和map的對比可以看這裡,可以這樣了解:map就是在源流的每個事件上用一個“傳回值的函數”做了計算并傳回值,然後組合再傳回新的流。而flatmap是在源流的每個事件上用一個“會回流的函數”做了計算并傳回流,然後把傳回的流(子流)組合再傳回新的流。】值得注意的時候,

flatmap()

 不是在修複

map()

,“metastream”也不是一個錯誤,它們都是真實的工具用于在FRP中解決異步響應的問題。

var responseStream = requestStream.flatmap(function(requestUrl){
	return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
});
           
你不容錯過的響應式程式設計介紹

很好。因為響應的Stream是基于請求的Stream而定義的,是以如果以後我們有更多的事件在請求的Stream中産生,就會有對應的事件在響應的Stream中産生。

requestStream:  --a-----b--c------------|->
responseStream: -----A--------B-----C---|->

(小寫的是請求,大寫的是響應)
           

既然我們好不容易擁有了響應的Stream,那麼我們就可以渲染所接收的資料:

responseStream.subscribe(function(response){
	// 按照你的意願在DOM樹裡面渲染response對象
});
           

我們把前面所有的代碼合在一起,那樣就是:

var requestStream = Rx.Observable.returnValue('https://api.github.com/users');

var responseStream = requestStream
  .flatMap(function(requestUrl) {
    return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
  });

responseStream.subscribe(function(response) {
  // 按照你的意願在DOM樹裡面渲染response對象
});
           

重新整理按鈕

我沒有提及一件事情就是上面的響應傳回的JSON制式的使用者資訊有100條。這個API僅僅允許我們傳頁偏移值,而不允許傳頁限制數,是以導緻我們隻能用3條資料對象而浪費97條。我們現在先忽略這些問題,後面将會看到如何緩存這些響應。

每次重新整理按鈕被點選的時候,請求的Stream就會産生一個String事件。我們需要兩樣東西:

  1. 重新整理按鈕上産生點選事件Stream;
  2. 上述的重新整理按鈕的點選事件Stream可以改變請求的Stream。

可喜的是,RxJS具備相應的工具給DOM元素建構指定的事件的Stream:

var refreshButton = document.querySelector('.refresh');
var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click');
           

接下來,讓重新整理按鈕的點選事件Stream改變請求的Stream。通過傳一個每次都随機産生的參數作為偏移值發送請求給Github:

var requestStream = refreshClickStream
	.map(function(){
		var randomOffset = Math.floor(Math.random() * 500);
		return 'https://api.github.com/users?since=' + randomOffset;
	});
           

不過現在有個問題,就是請求在啟動的時候并不會馬上被發送,隻會在重新整理按鈕被點選時才會執行。如何才能在啟動的時候馬上發送請求并且點選重新整理按鈕的時候也能發送請求?

首先,我們都知道如何為上面說的兩種情況建立對應的Stream:

var requestOnRefreshStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });

var startupRequestStream = Rx.Observable.returnValue('https://api.github.com/users');
           

但是如何“合并”上面這兩個Stream為一個Stream呢?不用擔心,這裡有

merge()

。用StreamGraph來描述:

stream A: ---a--------e-----o----->
stream B: -----B---C-----D-------->
          vvvvvvvvv merge vvvvvvvvv
          ---a-B---C--e--D--o----->
           

現在事情就變得簡單了:

var requestOnRefreshStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });

var startupRequestStream = Rx.Observable.returnValue('https://api.github.com/users');

var requestStream = Rx.Observable.merge(
  requestOnRefreshStream, startupRequestStream
);
           

這裡有另外一個幹淨簡單的方式去書寫上面的代碼:

var requestStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  })
  .merge(Rx.Observable.returnValue('https://api.github.com/users'));
           

甚至更短,可讀性更強:

var requestStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  })
  .startWith('https://api.github.com/users');
           

這個

startWith()

方法恰好精準的反映了你想要做的事情。無論你傳入的Stream是什麼樣子的,但最後調用

startWith(x)

,就會以

x

作為開始。不過這不夠DRY,我重複了通路Github的請求位址。解決這個問題的方法就是通過移動

startWith()

refreshClickStream

後,然後在啟動時“模拟”一次重新整理點選:

var requestStream = refreshClickStream.startWith('startup click')
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });
           

給“推薦關注”的每項模組化

現在,我們隻能在

responseStream

subscribe()

裡,才能夠對每項推薦的UI元素做渲染操作。可是,如果你用最快的速度點選重新整理按鈕的時候,目前的3條推薦都沒有被清掉,而新的推薦隻有在請求到達以後才會到達。這就看起來好像是點了重新整理和不點重新整理沒有兩樣似的。但為了讓UI看起來更舒服點,我們需要在點選重新整理時清除目前的3條推薦。

refreshClickStream.subscribe(function(){
	// 清除3條推薦的DOM元素
});
           

如果你這麼幹,現在就會有兩個訂閱者(一個是

refreshClickStream.subscribe()

,另外一個是

responseStream.subscribe()

)關聯着這3條推薦的DOM元素,事情會變得很糟糕。因為這不是“關注點分離”【注:關注點分離是指隻對與“特定概念、目标”(關注點)相關聯的軟體組成部分進行“辨別、封裝和操縱”的能力。這是處理複雜性的一個原則。因為關注點混雜在一起将會加大軟體的複雜度,而分離開關注點進行處理能夠降解複雜度。面向切面程式設計的核心就是關注點分離。】。

你不容錯過的響應式程式設計介紹

是以我們需要給每項推薦做Stream處理,并使得每個事件都包含響應JSON值。我們将會針對3條推薦中的每條做分離。第1條推薦分離後的樣子:

var suggestion1Stream = responseStream
  .map(function(listUsers) {
    // 随機從清單中擷取一個
    return listUsers[Math.floor(Math.random()*listUsers.length)];
  });
           

對此,

suggestion2Stream

suggestion2Stream

可以簡單的從

suggestion1Stream

裡面複制過來。雖然這不夠DRY,不過保證了我們的例子足夠的簡單。

取代

resposneStream.subscribe()

裡面的渲染操作,我們可以這樣做:

suggestion1Stream.subscribe(function(){
	// 渲染第1個推薦到DOM
});
           

回到“在重新整理的時候,清除所有推薦”,我們可以通過映射重新整理點選到

null

的推薦資料,那麼在

suggestion1Stream

裡面,就像:

var suggestion1Stream = responseStream.map(function(listUsers){
	return listUsers[Math.floor(Math.random() * listUsers.length)];
})
.merge(
	refreshClickStream.map(function(){ return null; })
);
           

在渲染的時候,我們把

null

解析為“沒有資料”,進而隐藏對應的UI元素。

suggestion1Stream.subscribe(function(suggestion){
	if (suggestion === null) {
		// 隐藏第1個推薦的DOM元素
	} else {
		// 或者展示第1個推薦的DOM元素并渲染對應的資料
	}
});
           

那麼現在對應的流圖:

refreshClickStream: ----------o--------o---->
     requestStream: -r--------r--------r---->
    responseStream: ----R---------R------R-->
 suggestion1Stream: ----s-----N---s----N-s-->
 suggestion2Stream: ----q-----N---q----N-q-->
 suggestion3Stream: ----t-----N---t----N-t-->

 N :代表null
           

為了更完善,我們也可以在開始的時候渲染“空”推薦。通過添加

startWith(null)

到第一條推薦的Stream:

var suggestion1Stream = responseStream
  .map(function(listUsers) {
    return listUsers[Math.floor(Math.random()*listUsers.length)];
  })
  .merge(
    refreshClickStream.map(function(){ return null; })
  )
  .startWith(null);
           

最終的StreamGraph如下:

refreshClickStream: ----------o---------o---->
     requestStream: -r--------r---------r---->
    responseStream: ----R----------R------R-->
 suggestion1Stream: -N--s-----N----s----N-s-->
 suggestion2Stream: -N--q-----N----q----N-q-->
 suggestion3Stream: -N--t-----N----t----N-t-->

 N :代表null
           

關閉一個推薦以及使用緩存結果集

還有最後一個功能還未實作:每個推薦都會有對應的“x”按鈕用于清除目前的推薦并加載新的推薦。剛開始弄的時候,你可能會選擇在關閉某個推薦的時候發起一個新的請求:

var close1Button = document.querySelector('.close1');
var close1ClickStream = Rx.Observable.fromEvent(close1Button, 'click');
// 同理于 close2Button和close3Button
var requestStream = refreshClickStream.startWith('startup click')
  .merge(close1ClickStream) // 我們添加了這個,使得點選close1的時候,會觸發新的請求
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });
           

但如果我們點選任意一個關閉按鈕的時候,它會清除目前所有的推薦并重新加載。有很多方法可以解決,為了讓事情更有趣,我們将會重用上次請求後響應的資料去解決這個問題。Github的響應中每頁的大小為100個使用者資訊,然而我們隻需要使用到其中的3個,是以會存在大量有效的新資料。不需要再發起新的請求。

我們再把它想象成為Stream。當第一條推薦的“關閉”按鈕被點選時,我們需要在

resposneStream

中上一個的響應資料中随機擷取一個使用者資料。就像:

requestStream:     --r--------------->
responseStream:    ------R----------->
close1ClickStream: ------------c----->
suggestion1Stream: ------s-----s----->

 c :代表關閉
           

在“Rx*”裡面有一個聯合函數

combineLatest

看起來可以實作我們的需求。它把兩個不同Stream作為輸入,無論其中哪個Stream産生一個事件,

combineLatest

會組合兩個Stream的“上一個”事件,以參數

a

b

的形式然後輸出值

c = f(x,y)

,而

f

是你所定義的函數。用StreamGraph解析:

stream A: --a-----------e--------i-------->
stream B: -----b----c--------d-------q---->
          vvvvvvvv combineLatest(f) vvvvvvv
          ----AB---AC--EC---ED--ID--IQ---->

f :大寫函數
           

我們可以在

close1ClickStream

上調用

combineLatest()

,傳入

responseStream

。這樣當點選“關閉”按鈕1時,我們都會獲得

responseStream

的上一個事件并計算出新值給

suggestion1Stream

。另外一方面

combineLatest()

函數是對稱的:

responseStream

産生新的事件會組合“關閉”按鈕1的上一個事件,計算出新值傳給

suggestion1Stream

。這樣我們就可以簡化之前

suggestion1Stream

的代碼:

var suggestion1Stream = close1ClickStream
  .combineLatest(responseStream,
    function(click, listUsers) {
      return listUsers[Math.floor(Math.random()*listUsers.length)];
    }
  )
  .merge(
    refreshClickStream.map(function(){ return null; })
  )
  .startWith(null);
           

但這裡還有一個問題:

combineLatest()

使用最近兩個源,但是如果其中一個沒有産生事件,那麼組合的Stream(

suggestion1Stream

)是不會産生事件的。認真觀察前面的StreamGraph,你會發現當A流産生a事件時,

suggestion1Stream

不會産生事件。隻有在B流産生b事件的時候,組合的Stream才會産生事件。

我們用最簡單的方法來解決這個問題,就是在啟動時“模拟”點選了’關閉’按鈕1:

var suggestion1Stream = close1ClickStream.startWith('startup click') // 我們增加了這個
  .combineLatest(responseStream,
    function(click, listUsers) {l
      return listUsers[Math.floor(Math.random()*listUsers.length)];
    }
  )
  .merge(
    refreshClickStream.map(function(){ return null; })
  )
  .startWith(null);
           

總結

終于弄完了。下面是全部代碼:

var refreshButton = document.querySelector('.refresh');
var closeButton1 = document.querySelector('.close1');
var closeButton2 = document.querySelector('.close2');
var closeButton3 = document.querySelector('.close3');

var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click');
var close1ClickStream = Rx.Observable.fromEvent(closeButton1, 'click');
var close2ClickStream = Rx.Observable.fromEvent(closeButton2, 'click');
var close3ClickStream = Rx.Observable.fromEvent(closeButton3, 'click');

var requestStream = refreshClickStream.startWith('startup click')
    .map(function() {
        var randomOffset = Math.floor(Math.random()*500);
        return 'https://api.github.com/users?since=' + randomOffset;
    });

var responseStream = requestStream
    .flatMap(function (requestUrl) {
        return Rx.Observable.fromPromise($.getJSON(requestUrl));
    });

function createSuggestionStream(closeClickStream) {
    return closeClickStream.startWith('startup click')
        .combineLatest(responseStream,
            function(click, listUsers) {
                return listUsers[Math.floor(Math.random()*listUsers.length)];
            }
        )
        .merge(
            refreshClickStream.map(function(){ 
                return null;
            })
        )
        .startWith(null);
}

var suggestion1Stream = createSuggestionStream(close1ClickStream);
var suggestion2Stream = createSuggestionStream(close2ClickStream);
var suggestion3Stream = createSuggestionStream(close3ClickStream);


//渲染 ---------------------------------------------------
function renderSuggestion(suggestedUser, selector) {
    var suggestionEl = document.querySelector(selector);
    if (suggestedUser === null) {
        suggestionEl.style.visibility = 'hidden';
    } else {
        suggestionEl.style.visibility = 'visible';
        var usernameEl = suggestionEl.querySelector('.username');
        usernameEl.href = suggestedUser.html_url;
        usernameEl.textContent = suggestedUser.login;
        var imgEl = suggestionEl.querySelector('img');
        imgEl.src = "";
        imgEl.src = suggestedUser.avatar_url;
    }
}

suggestion1Stream.subscribe(function (suggestedUser) {
    renderSuggestion(suggestedUser, '.suggestion1');
});

suggestion2Stream.subscribe(function (suggestedUser) {
    renderSuggestion(suggestedUser, '.suggestion2');
});

suggestion3Stream.subscribe(function (suggestedUser) {
    renderSuggestion(suggestedUser, '.suggestion3');
});
           

你可以到這裡檢視例子

上面的代碼非常簡短,但是十分緊湊:它展現了使用适當的關注點分離甚至響應捕獲可以控制符合的事件。函數化的代碼風格使得其看起來像描述而不是編碼:我們從來沒有給出一系列的執行過程,僅僅是通過定義流之間的關系來描述這是什麼。例如,通過FRP,我們告訴計算機

suggestion1Stream

是“關閉”按鈕1的Stream和上一次請求的響應的Stream做組合,當重新整理發生或者程式啟動的時候會變成

null

值得一提的是,上面的代碼既沒有諸如

if

for

while

這類流程控制元素,也沒有經典的回調控制流。通過使用

subscribe()

filter()

,你終于可以擺脫

if

else

了。

接下來

如果你認為“Rx*”會成為FRP的首選庫,那麼花些時間去看看如何用函數來轉換、組合和建立Observables。如果你想通過StreamGraph的形式來了解這些函數,你可以通路這個位址。這就是我開發的經驗總結:當你使用FRP時有任何疑問,可以先畫些圖再思考如何解決。

一旦你着手開始用“Rx”進行程式設計的時候,非常有必要了解“冷和熱的可觀察序列”【注:“冷”是指隻有訂閱者需要的時候才從Stream裡面産生一個事件,而且訂閱者之間沒有任何關聯。“熱”是指Stream會自動産生事件,而訂閱者之間存在關聯。而我們上面的例子中,Stream是個“冷”序列】。如果你忽略這個,後面可能會遇到很多奇怪的問題。你已經被提醒了。在日後通過學習真實的函數程式設計來磨練你的能力,并且了解“Rx”所帶來的其他副作用。

不僅僅隻有“Rx*”可以實作FRP。還有Bacon.js,Elm等。

FRP在富事件的Web前端和移動應用中有着不俗的問題解決能力。但它不僅僅隻适用于用戶端,它也可在後端工作和通路資料庫。事實上,RxJava是Netflix後端服務裡面一個非常重要的元件。請記住,FRP不是一個基于某種語言或者應用的架構,它是一種應用于事件驅動的程式設計範式。

如果這篇指南對你有所幫助,請分享吧。

frp