天天看点

[RxJS] Creating Observable From Scratch

/**
 * Wraps a (possibly partial) observer so that the producer can call
 * `next`, `error` and `complete` without checking which handlers exist.
 *
 * Guarantees enforced here:
 *  - nothing is forwarded to the wrapped observer once unsubscribed;
 *  - `error` and `complete` are terminal: they unsubscribe after notifying;
 *  - an assigned teardown function (`_unsubscribe`) runs at most once.
 */
class SafeObserver {
  /**
   * @param {{next?: Function, error?: Function, complete?: Function}} destination
   *   The observer to forward notifications to; any handler may be omitted.
   */
  constructor(destination) {
    this.destination = destination;
    // Initialised explicitly so the flag is always a boolean.
    this.isUnsubscribed = false;
  }

  /**
   * Forwards a value to the wrapped observer while still subscribed.
   * @param {*} value
   */
  next(value) {
    const destination = this.destination;
    if (destination.next && !this.isUnsubscribed) {
      destination.next(value);
    }
  }

  /**
   * Forwards an error to the wrapped observer, then unsubscribes.
   * @param {*} err
   */
  error(err) {
    if (this.isUnsubscribed) {
      return;
    }
    const destination = this.destination;
    try {
      if (destination.error) {
        destination.error(err);
      }
    } finally {
      // Release the producer's resources even if the handler itself throws.
      this.unsubscribe();
    }
  }

  /**
   * Signals completion to the wrapped observer, then unsubscribes.
   */
  complete() {
    if (this.isUnsubscribed) {
      return;
    }
    const destination = this.destination;
    try {
      if (destination.complete) {
        destination.complete();
      }
    } finally {
      // Release the producer's resources even if the handler itself throws.
      this.unsubscribe();
    }
  }

  /**
   * Marks this observer as unsubscribed and runs the teardown function, if
   * one has been assigned to `_unsubscribe`.
   */
  unsubscribe() {
    this.isUnsubscribed = true;
    const teardown = this._unsubscribe;
    if (teardown) {
      // Clear the reference first so repeated calls never run it twice.
      this._unsubscribe = undefined;
      teardown();
    }
  }
}

/**
 * Minimal observable: stores a subscribe function and runs it once per call
 * to `subscribe`, handing it a SafeObserver that wraps the caller's observer.
 */
class Observable {
  /**
   * @param {Function} _subscribe
   *   Called with the wrapped observer on every subscription. May return a
   *   teardown function that releases whatever the producer set up.
   */
  constructor(_subscribe) {
    this._subscribe = _subscribe;
  }

  /**
   * Starts the producer for the given observer.
   *
   * @param {{next?: Function, error?: Function, complete?: Function}} observer
   * @returns {Function} A function that cancels the subscription when called.
   *   The same function is also exposed as its own `unsubscribe` property, so
   *   both `sub()` and `sub.unsubscribe()` work.
   */
  subscribe(observer) {
    const safeObserver = new SafeObserver(observer);
    const teardown = this._subscribe(safeObserver);

    if (safeObserver.isUnsubscribed) {
      // The producer already errored/completed synchronously, i.e. before a
      // teardown could be attached. Run it now so resources are not leaked.
      if (typeof teardown === 'function') {
        teardown();
      }
    } else {
      safeObserver._unsubscribe = teardown;
    }

    const unsubscribe = () => safeObserver.unsubscribe();
    unsubscribe.unsubscribe = unsubscribe;
    return unsubscribe;
  }
}

// Demo producer: emits 0 through 9, one value every 100 ms, then completes.
const myObservable = new Observable((observer) => {
  let i = 0;
  const id = setInterval(() => {
    if (i < 10) {
      observer.next(i++);
    } else {
      observer.complete();
    }
  }, 100);

  // Teardown: stop the timer so no further ticks fire.
  return () => {
    console.log('unsubbed');
    clearInterval(id);
  };
});

const observer = {
  next(value) { console.log(`next -> ${value}`); },
  // Report errors instead of silently discarding them.
  error(err) { console.error(err); },
  complete() { console.log('complete'); },
};


// subscribe() returns a callable that cancels the subscription.
const foo = myObservable.subscribe(observer);

// Cancelling synchronously clears the interval before its first 100 ms tick.
foo();