天天看點

RxJs Subject, AnonymousSubject, BehaviorSubject, ReplaySubject和AsyncSubject

原文連結: https://mp.weixin.qq.com/s/i14brW_Ok8JYGoBIcfhs5Q

原文标題:RxJS 源碼解析(二): Multicast Observable

上一篇,我們分析了 Oberservable 和 Subscription 的具體實作方法。這一篇,将會了解一系列不同的 Muticasted Observable(多點傳播觀察源)。這些 Observable 在 RxJS 中主要是以 Subject 命名,它們有以下幾種不同的實作:

  • Subject
  • AnonymousSubject
  • BehaviorSubject
  • ReplaySubject
  • AsyncSubject

所謂 Muticasted Observable,就是這個 Observable 可以持續的發送資料給到訂閱它的訂閱者們。

Subject 是最基礎的 Muticasted Observable,訂閱者對其進行訂閱後,将會拿到 Subject 之後發送的資料。但是,如果訂閱者在資料發送後再訂閱,那麼它将永遠都拿不到這條資料。用一下例子簡單說明一下:

RxJs Subject, AnonymousSubject, BehaviorSubject, ReplaySubject和AsyncSubject

class Subject extends Observable {  

 observers: Observer[] = [];

 // 省略了一些内容

 next(value?: T) {

   if (!this.isStopped) {

     ...

     const { observers } = this;

     const len = observers.length;

     const copy = observers.slice();

     for (let i = 0; i < len; i++) {

       copy[i].next(value);

     }

   }

 }

 // error 類似于 next

 error(err: any) {

   ...

   this.hasError = true;

   this.thrownError = err;

   this.isStopped = true;

   const { observers } = this;

   const len = observers.length;

   const copy = observers.slice();

   for (let i = 0; i < len; i++) {

     copy[i].error(err);

   this.observers.length = 0;

 // complete 類似于 next

 complete() {

     copy[i].complete();

}

RxJs Subject, AnonymousSubject, BehaviorSubject, ReplaySubject和AsyncSubject
RxJs Subject, AnonymousSubject, BehaviorSubject, ReplaySubject和AsyncSubject

export class AnonymousSubject extends Subject {

 constructor(protected destination?: Observer, source?: Observable) {

   super();

   this.source = source;

 next(value: T) {

   const { destination } = this;

   if (destination && destination.next) {

     destination.next(value);

   if (destination && destination.error) {

     this.destination.error(err);

   if (destination && destination.complete) {

     this.destination.complete();

RxJs Subject, AnonymousSubject, BehaviorSubject, ReplaySubject和AsyncSubject
RxJs Subject, AnonymousSubject, BehaviorSubject, ReplaySubject和AsyncSubject
RxJs Subject, AnonymousSubject, BehaviorSubject, ReplaySubject和AsyncSubject
RxJs Subject, AnonymousSubject, BehaviorSubject, ReplaySubject和AsyncSubject
RxJs Subject, AnonymousSubject, BehaviorSubject, ReplaySubject和AsyncSubject
RxJs Subject, AnonymousSubject, BehaviorSubject, ReplaySubject和AsyncSubject
RxJs Subject, AnonymousSubject, BehaviorSubject, ReplaySubject和AsyncSubject
RxJs Subject, AnonymousSubject, BehaviorSubject, ReplaySubject和AsyncSubject
RxJs Subject, AnonymousSubject, BehaviorSubject, ReplaySubject和AsyncSubject
RxJs Subject, AnonymousSubject, BehaviorSubject, ReplaySubject和AsyncSubject
RxJs Subject, AnonymousSubject, BehaviorSubject, ReplaySubject和AsyncSubject
RxJs Subject, AnonymousSubject, BehaviorSubject, ReplaySubject和AsyncSubject

訂閱過程

ReplaySubject 的訂閱過程比較特殊,因為訂閱的時候需要發送緩沖區資料,而且在不同時間進行訂閱也會使得緩沖區中的資料變化,是以訂閱是需要考慮的問題會比較多。那麼,抓住 _infiniteTimeWindow 這個變量來看代碼會變得很容易。

// 以下源碼省略了排程器相關的代碼

_subscribe(subscriber: Subscriber): Subscription {

 const _infiniteTimeWindow = this._infiniteTimeWindow;

 // 視窗時間是無限的則不用考慮

 // 視窗時間是有限的則更新緩沖區

 const _events = _infiniteTimeWindow ? this._events : this._trimBufferThenGetEvents();

 const len = _events.length;

 // 建立 subscription

 let subscription: Subscription;

 if (this.isStopped || this.hasError) {

   subscription = Subscription.EMPTY;

 } else {

   this.observers.push(subscriber);

   subscription = new SubjectSubscription(this, subscriber);

 // 分類讨論不同的限制條件

 if (_infiniteTimeWindow) {

   // 視窗時間不是無限的,緩沖區存儲直接就是資料

   for (let i = 0; i < len && !subscriber.closed; i++) {

     subscriber.next(_events[i]);

   // 視窗時間不是無限的,緩沖區存儲的是 ReplayEvent

     subscriber.next((>_events[i]).value);

 if (this.hasError) {

   subscriber.error(this.thrownError);

 } else if (this.isStopped) {

   subscriber.complete();

 return subscription;

最後

本章我主要簡單分析了 5 種主要的 Subject,這些 Subject 實作了不同類型的 Muticasted Observable,對 Observable 進行了擴充。

繼續閱讀