原文連結: https://mp.weixin.qq.com/s/i14brW_Ok8JYGoBIcfhs5Q
原文标題:RxJS 源碼解析(二): Multicast Observable
上一篇,我們分析了 Oberservable 和 Subscription 的具體實作方法。這一篇,将會了解一系列不同的 Muticasted Observable(多點傳播觀察源)。這些 Observable 在 RxJS 中主要是以 Subject 命名,它們有以下幾種不同的實作:
- Subject
- AnonymousSubject
- BehaviorSubject
- ReplaySubject
- AsyncSubject
所謂 Muticasted Observable,就是這個 Observable 可以持續的發送資料給到訂閱它的訂閱者們。
Subject 是最基礎的 Muticasted Observable,訂閱者對其進行訂閱後,将會拿到 Subject 之後發送的資料。但是,如果訂閱者在資料發送後再訂閱,那麼它将永遠都拿不到這條資料。用一下例子簡單說明一下:
class Subject extends Observable {
observers: Observer[] = [];
// 省略了一些内容
next(value?: T) {
if (!this.isStopped) {
...
const { observers } = this;
const len = observers.length;
const copy = observers.slice();
for (let i = 0; i < len; i++) {
copy[i].next(value);
}
}
}
// error 類似于 next
error(err: any) {
...
this.hasError = true;
this.thrownError = err;
this.isStopped = true;
const { observers } = this;
const len = observers.length;
const copy = observers.slice();
for (let i = 0; i < len; i++) {
copy[i].error(err);
this.observers.length = 0;
// complete 類似于 next
complete() {
copy[i].complete();
}
export class AnonymousSubject extends Subject {
constructor(protected destination?: Observer, source?: Observable) {
super();
this.source = source;
next(value: T) {
const { destination } = this;
if (destination && destination.next) {
destination.next(value);
if (destination && destination.error) {
this.destination.error(err);
if (destination && destination.complete) {
this.destination.complete();
訂閱過程
ReplaySubject 的訂閱過程比較特殊,因為訂閱的時候需要發送緩沖區資料,而且在不同時間進行訂閱也會使得緩沖區中的資料變化,是以訂閱是需要考慮的問題會比較多。那麼,抓住 _infiniteTimeWindow 這個變量來看代碼會變得很容易。
// 以下源碼省略了排程器相關的代碼
_subscribe(subscriber: Subscriber): Subscription {
const _infiniteTimeWindow = this._infiniteTimeWindow;
// 視窗時間是無限的則不用考慮
// 視窗時間是有限的則更新緩沖區
const _events = _infiniteTimeWindow ? this._events : this._trimBufferThenGetEvents();
const len = _events.length;
// 建立 subscription
let subscription: Subscription;
if (this.isStopped || this.hasError) {
subscription = Subscription.EMPTY;
} else {
this.observers.push(subscriber);
subscription = new SubjectSubscription(this, subscriber);
// 分類讨論不同的限制條件
if (_infiniteTimeWindow) {
// 視窗時間不是無限的,緩沖區存儲直接就是資料
for (let i = 0; i < len && !subscriber.closed; i++) {
subscriber.next(_events[i]);
// 視窗時間不是無限的,緩沖區存儲的是 ReplayEvent
subscriber.next((>_events[i]).value);
if (this.hasError) {
subscriber.error(this.thrownError);
} else if (this.isStopped) {
subscriber.complete();
return subscription;
最後
本章我主要簡單分析了 5 種主要的 Subject,這些 Subject 實作了不同類型的 Muticasted Observable,對 Observable 進行了擴充。