RxJS文档 阅读笔记

知识依赖

  1. 基本的js语法
  2. 基本的nodejs用法
  3. Promise用法

准备工作

我想用nodejs直接跑rxjs,然而它并不支持es6的import,尝试了--experimental-modules依然有报错

方案1 在线编辑器

https://rxjs-dev.firebaseapp.com/guide/observable 在上面代码点击右上角的Edit in StackBlitx

方案2 nodejs + rxjs + ES6

https://github.com/standard-things/esm

安装:npm install esm rxjs

带上esm运行

node -r esm index.js

方案3 webpack + rxjs + ES6

创建文件夹 并初始化

1
2
mkdir rxjsdemo && cd rxjsdemo
npx webpack-cli init

这样选择

1
2
3
4
5
? Will your application have multiple bundles? No
? Which will be your application entry point? index
? In which folder do you want to store your generated bundles? dist
? Will you use one of the below JS solutions? ES6
? Will you use one of the below CSS solutions? No

然后运行

1
npm install --save-dev rxjs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import { Subject } from 'rxjs';
import { interval } from 'rxjs';

const subject = new Subject();

subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});

subject.next(1);
subject.next(2);

const observable = interval(1000);
const subscription = observable.subscribe(x => console.log(x));

运行npm start在打开的网页查看console是否正常输出, 忽略红字,能输出如下即可

1
2
3
4
5
6
7
observerA: 1
observerB: 1
observerA: 2
observerB: 2
0
1
...

是个啥

众所周知Promise

Observable可以看作一个更高级的Promise

  • Observable 返回值的个数为非负整数个,而Promise返回的个数为0或1个

它说 Observables 不像 事件emitters也不像多个值的Promise, [虽然在某些使用下像

Observables are like functions with zero arguments, but generalize those to allow multiple values.

来源https://subscription.packtpub.com/book/application_development/9781787120426/1/01lvl1sec7/a-brief-history-of-reactivex-and-rxjava

据说是2010年 微软一个大佬写的 .NET的一个框架,然后现在除了RxJs,还有Rx.Java,Rx.C++等等多语言支持

Observables

基本使用和对比

1
2
3
4
5
6
7
8
9
10
11
12
13
import { Observable } from 'rxjs';

const foo = new Observable(subscriber => {
console.log('Hello');
subscriber.next(42);
});

foo.subscribe(x => {
console.log(x);
});
foo.subscribe(y => {
console.log(y);
});

输出

1
2
3
4
Hello
42
Hello
42
  1. lazy计算
  2. 订阅 类似于调用函数,它们是分开的,不同的订阅的执行不共享(相对而言 事件emitters并不会根据 订阅者个数而决定事件个数)

同步

1
2
3
4
5
console.log('before');
foo.subscribe(x => {
console.log(x);
});
console.log('after');

ok 它是同步的

其实Observable有能力 同步分发值,和异步分发值

和函数区别,能return很多次值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import { Observable } from 'rxjs';

const foo = new Observable(subscriber => {
console.log('Hello');
subscriber.next(42);
subscriber.next(100); // "return" another value
subscriber.next(200); // "return" yet another
// 异步返回
setTimeout(() => {
subscriber.next(300); // happens asynchronously
}, 1000);
});

console.log('before');
foo.subscribe(x => {
console.log(x);
});
console.log('after');

func.call() 同步给我一个返回值
observable.subscribe() 异步或同步 给我任意数量的值

拆解 Observable

创建 Observables

1
2
3
4
5
6
7
import { Observable } from 'rxjs';

const observable = new Observable(function subscribe(subscriber) {
const id = setInterval(() => {
subscriber.next('hi')
}, 1000);
});

以上是一个每秒向订阅者 发送hiObservable

更常见的创建方式,是用创建函数of,from,interval

订阅 Observables

observable.subscribe(x => console.log(x)); // 相当于call了observable的函数

执行 Observable

可以调用的方法

Next 通知: 发送 数值,字符串,结构体

Error 通知: 发送一个 JavaScript Error or exception.

Complete 通知: 不带值.

正则描述next*(error|complete)? , 任意多个next,但当error或complete 其中一个执行后,就没有其它的可以分发了。

然后 建议了

1
2
3
4
5
6
7
8
9
try{
subscriber.next(...)// ...
subscriber.next(...)// ...
subscriber.next(...)// ...
subscriber.next(...)// ...
subscriber.complete();
}catch(e){
subscriber.error(e)
}

Disposing Observables

订阅者的退订

1
2
3
4
5
6
import { from } from 'rxjs';

const observable = from([10, 20, 30]);
const subscription = observable.subscribe(x => console.log(x));
// Later:
subscription.unsubscribe();

发布者的关闭

1
2
3
4
5
6
7
8
9
10
11
12
const observable = new Observable(function subscribe(subscriber) {
// Keep track of the interval resource
const intervalId = setInterval(() => {
subscriber.next('hi');
}, 1000);

// Provide a way of canceling and disposing the interval resource
// 提供关闭发布者的方法
return function unsubscribe() {
clearInterval(intervalId);
};
});

Operators

piping

op4()(op3()(op2()(op1()(obs)))) 可读性差改为

1
2
3
4
5
6
obs.pipe(
op1(),
op2(),
op3(),
op4(),
)

按照这种格式,即使只有一层 也不应该用op()(obs)而是obs.pipe(op())

pipeable operators: 用 observableInstance.pipe(operator())的语法,如filter(...)mergeMap(...)它们不会改变原来的Observable实例,而是返回新的Observable

Creation Operators

Creation Operators: 创建新的Observable,如of,interval

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import { of } from 'rxjs';
import { map } from 'rxjs/operators';

map(x => x * x)(of(1, 2, 3)).subscribe((v) => console.log(`value: ${v}`));

// Logs:
// value: 1
// value: 4
// value: 9


first()(of(1, 2, 3).subscribe((v) => console.log(`value: ${v}`));

// Logs:
// value: 1

Higher-order Observables 高阶Observables

处理 Observables of Observables

1
2
3
4
const fileObservable = urlObservable.pipe(
map(url => http.get(url)),
concatAll(),
);
  1. concatAll() subscribes to each “inner” Observable that comes out of the “outer” Observable, and copies all the emitted values until that Observable completes, and goes on to the next one. All of the values are in that way concatenated,顺序 调用每个Observable
  2. mergeAll() — subscribes to each inner Observable as it arrives, then emits each value as it arrives,并行 调用每个Observable,获得了就输出
  3. switchAll() — subscribes to the first inner Observable when it arrives, and emits each value as it arrives, but when the next inner Observable arrives, unsubscribes to the previous one, and subscribes to the new one. 每当接受到新的 Observable 会舍弃老的Observable,当一个Observable 返回值,会立即emit
  4. exhaust() — subscribes to the first inner Observable when it arrives, and emits each value as it arrives, discarding all newly arriving inner Observables until that first one completes, then waits for the next inner Observable. 当前Observable 没有结束时,会抛弃后来的所有Observable,结束以后,接受一个Observables

可以看到 1,3,4一个时刻 只有一个Observable在运行,1是串行办法,3是很可能半途而废 尽可能新,4是做就做完不被打断

就像很多 和map融合的方法这也有对应的,concatMap(), mergeMap(), switchMap(), and exhaustMap()

Marble diagrams

marble diagrams

  1. 圆是值
  2. 上面的箭头是输入
  3. 下面的是输出
  4. 竖着的线是 完成态
  5. 叉是 抛出错误
  6. 正中是描述

所有Observables

按分类的: https://rxjs-dev.firebaseapp.com/guide/operators

自定义Observables

组合已有的

1
2
3
4
5
6
7
8
9
import { pipe } from 'rxjs';
import { filter, map } from 'rxjs';

function discardOddDoubleEven() {
return pipe(
filter(v => ! (v % 2)),
map(v => v + v),
);
}

注意这里的pipe.pipe是不同的

从零开始写一个Observable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import { Observable } from 'rxjs';

function delay(delayInMillis) {
return (observable) => new Observable(observer => {
// this function will called each time this
// Observable is subscribed to.
const allTimerIDs = new Set();
const subscription = observable.subscribe({
next(value) {
const timerID = setTimeout(() => {
observer.next(value);
allTimerIDs.delete(timerID);
}, delayInMillis);
allTimerIDs.add(timerID);
},
error(err) {
observer.error(err);
},
complete() {
observer.complete();
}
});
// the return value is the teardown function,
// which will be invoked when the new
// Observable is unsubscribed from.
return () => {
subscription.unsubscribe();
allTimerIDs.forEach(timerID => {
clearTimeout(timerID);
});
}
});
}
  1. 你需要实现3个Observer functions, next(),error(),complete()
  2. 实现teardown函数(清理结束内部的方法等) ,返回该函数

Subscription

父子关系 + unsubscribe()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import { interval } from 'rxjs';

const observable1 = interval(400);
const observable2 = interval(300);

const subscription = observable1.subscribe(x => console.log('first: ' + x));
const childSubscription = observable2.subscribe(x => console.log('second: ' + x));

subscription.add(childSubscription);

setTimeout(() => {
// Unsubscribes BOTH subscription and childSubscription
subscription.unsubscribe();
}, 1000);

Subject

plain Observables 是一个单向广播

Subject 是一种多向广播的Observable

A Subject is like an Observable, but can multicast to many Observers. Subjects are like EventEmitters: they maintain a registry of many listeners.

对于订阅者来说,它无法区别它订阅的是Subject还是plain Observable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import { Subject, from } from 'rxjs';

const subject = new Subject<number>();

subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});

subject.next(1);
subject.next(2);

const observable = from([9, 5, 4]);

observable.subscribe(subject); // You can subscribe providing a Subject

以上我们可以看到,Subject既可以看作 订阅者,也可以当作发送者

Multicasted Observables

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import { from, Subject } from 'rxjs';
import { multicast } from 'rxjs/operators';

const source = from([1, 2, 3]); // 数据源
const subject = new Subject();
const multicasted = source.pipe(multicast(subject)); // multicast(subject)(source) 见上面pipe用法

// These are, under the hood, `subject.subscribe({...})`:
multicasted.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
multicasted.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});

// This is, under the hood, `source.subscribe(subject)`:
multicasted.connect();

multicast: Observers 订阅一个潜在的 Subject, Subject 订阅 源Observable. 返回一个ConnectableObservable, 可以看作一个简单的 有 connect()方法的 Observable

connect()方法在后台执行source.subscribe(subject)方法,返回一个Subscription,对返回的变量可以 unsubscribe来结束执行

考虑下面情景

  1. A 订阅了 multicasted Observable
  2. multicasted Observable 开始 connected
  3. next 值 0 分发给A
  4. B 订阅了 multicasted Observable
  5. next 值 1 分发给A
  6. next 值 1 分发给B
  7. A 取消订阅
  8. next 值 2 分发给B
  9. B 取消订阅
  10. multicasted Observable 的 connection 被 unsubscribed

可以如下实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import { interval, Subject } from 'rxjs';
import { multicast } from 'rxjs/operators';

const source = interval(500);
const subject = new Subject();
const multicasted = source.pipe(multicast(subject));
let subscription1, subscription2, subscriptionConnect;

subscription1 = multicasted.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
// We should call `connect()` here, because the first
// subscriber to `multicasted` is interested in consuming values
subscriptionConnect = multicasted.connect();

setTimeout(() => {
subscription2 = multicasted.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
}, 600);

setTimeout(() => {
subscription1.unsubscribe();
}, 1200);

// We should unsubscribe the shared Observable execution here,
// because `multicasted` would have no more subscribers after this
setTimeout(() => {
subscription2.unsubscribe();
subscriptionConnect.unsubscribe(); // for the shared Observable execution
}, 2000);

手动调用connect有时很笨重,通常我们希望在第一个Observer到达时调用,在最后一个Observer unsubscribes 时, refCountmulticasted Observable 第一个subscriber到达时自动执行,在最后一个subscriber结束时自动 结束执行.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import { interval, Subject } from 'rxjs';
import { multicast, refCount } from 'rxjs/operators';

const source = interval(500);
const subject = new Subject();
const refCounted = source.pipe(multicast(subject), refCount());
let subscription1, subscription2;

// This calls `connect()`, because
// it is the first subscriber to `refCounted`
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});

setTimeout(() => {
console.log('observerB subscribed');
subscription2 = refCounted.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
}, 600);

setTimeout(() => {
console.log('observerA unsubscribed');
subscription1.unsubscribe();
}, 1200);

// This is when the shared Observable execution will stop, because
// `refCounted` would have no more subscribers after this
setTimeout(() => {
console.log('observerB unsubscribed');
subscription2.unsubscribe();
}, 2000);

注意 只有ConnectableObservable上面有refCount()方法

BehaviorSubject

当前的值

它存储最后 给它consumers的值,当有新的订阅,则立即发送给它的订阅者

demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject(0); // 0 is the initial value

subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});

subject.next(1);
subject.next(2);

subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});

subject.next(3);

ReplaySubject

和BehaviorSubject类似,但是不是记录最后一个,而是buffer大小来决定记录最后的多少个,初始化传入buffer的大小

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(3); // buffer 3 values for new subscribers

subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});

subject.next(1);
subject.next(2); // 上面 buffer大小设置为3,所以 observerB 只能接收到 [2,3,4]
subject.next(3);
subject.next(4);

subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});

subject.next(5);

除了可以指定buffer的size,也可以指定buffer的过期时间 new ReplaySubject(buffer size, buffer time),单位毫秒

AsyncSubject

只有在complete时才向它的订阅者发送最后一个 值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import { AsyncSubject } from 'rxjs';
const subject = new AsyncSubject();

subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});

subject.next(5);
subject.complete();

只会接受到5

Scheduler

调度!

关于Promise能有的和调度沾边的 只有Promise.all(),和Promise.race()

  1. 它是个数据结构,储存,基于 优先级或其它表转 执行队列任务
  2. 是一个执行环境,它表示 一个任务在何时何地被执行, 立即 或 回调
  3. 它有一个虚拟时钟, 通过调度器的now()方法,会回一个时间 标识,task得到的时间标识仅与其所在的调度器有关。

A Scheduler lets you define in what execution context will an Observable deliver notifications to its Observer.

调度器,让你定义 在什么执行上下文下,一个Observale会向它的Observer分发同志

串行 emit 1 2 3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import { Observable, asyncScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';

var observable = new Observable((proxyObserver) => {
proxyObserver.next(1);
proxyObserver.next(2);
proxyObserver.next(3);
proxyObserver.complete();
}).pipe(
observeOn(asyncScheduler)
);

var finalObserver = {
next(x) {
console.log('got value ' + x)
},
error(err) {
console.error('something wrong occurred: ' + err);
},
complete() {
console.log('done');
}
};

console.log('just before subscribe');
observable.subscribe(finalObserver);
console.log('just after subscribe');

proxyObserverobserveOn(asyncScheduler)创建出的,其中next(val)函数大约是

1
2
3
4
5
6
7
8
9
10
const proxyObserver = {
next(val) {
asyncScheduler.schedule(
(x) => finalObserver.next(x),
0 /* delay */, // 仅管 这里设置为0, async 依然用的是setTimeout 或 setInterval来进行操作
val /* will be the x for the function above */
);
},
// ...
}

除了asyncScheduler还有queueScheduler(遍历 当前事件frame),asapScheduler(微任务队列调度,promise有用,在当前任务后,在下一个任务前用于异步交流),animationFrameScheduler(在下一个 浏览器内容绘制前,可用于创建流畅的浏览器动画)

然后 你其实已经使用了调度器,只是隐式的

如,你在创建少量有限返回值的Observale时 用的是null或undefined调度器

对于返回大量或无限数据的用的是queue调度器

对于用到了timer的 用的是async调度器

对于一些性能优化的目的,你可以指定具体的调度器,如 creation operatorfrom([10,20,30],asyncScheduler)

以下这些都支持传递 调度器 参数

  • bindCallback
  • bindNodeCallback
  • combineLatest
  • concat
  • empty
  • from
  • fromPromise
  • interval
  • merge
  • of
  • range
  • throw
  • timer

subscribeOn 可以把 上面 第二个参数传递调度器,改成, obs.pipe(subscribeOn(调度器))的写法

https://rxjs-dev.firebaseapp.com/api/operators/subscribeOn

实例

输入事件 每100ms的限制 来进行搜索

1
2
3
4
5
6
7
8
9
10
Observable.fromEvent(input, 'input')
.map(e => e.target.value)
.filter(value => value.length >= 1)
.throttleTime(100) // https://rxjs-dev.firebaseapp.com/api/operators/throttleTime
.distinctUntilChanged() // https://rxjs-dev.firebaseapp.com/api/operators/distinctUntilChanged
.switchMap((value) => "message is " + value) // 或者替换为搜索 函数
.subscribe(
x => renderSearchResult(x), // 渲染搜索结果
err => console.error(err)
)

同时调用 多个 异步方法,等待3个全回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import { Observable, pipe } from 'rxjs';
import { skip, take } from 'rxjs/operators';

function fetchdata(url,cb){
console.log('start fetch'+url);
setTimeout(()=>{
console.log('fetched'+url);
cb();
},1000);
}

let ob = Observable.create((observer)=>{
fetchdata('/url1',()=>{
observer.next();
});
fetchdata('/url2',()=>{
observer.next();
});
fetchdata('/url3',()=>{
observer.next();
});
}).pipe(skip(2),take(1)).subscribe(()=>{
console.log('finish');
});

生命周期实例

需求: angular 内页面上有一些observables。

我们希望在页面销毁时 能结束这些。(否则会有内存泄漏)

方案1 手动逐个unsubscrible

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Component({...})
export class AppComponent implements OnInit, OnDestroy {
subscription1$: Subscription
subscription2$: Subscription
ngOnInit () {
var observable1$ = Rx.Observable.interval(1000);
var observable2$ = Rx.Observable.interval(400);
this.subscription1$ = observable.subscribe(x => console.log("From interval 1000" x));
this.subscription2$ = observable.subscribe(x => console.log("From interval 400" x));
}
ngOnDestroy() {
this.subscription1$.unsubscribe()
this.subscription2$.unsubscribe()
}
}

方案2 用数组,循环unsubscrible

优点,新增时,只需要 新增和添加到数组,不需要修改OnDestroy的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Component({...})
export class AppComponent implements OnInit, OnDestroy {
subscription1$: Subscription
subscription2$: Subscription
subscriptions: Subscription[] = []
ngOnInit () {
var observable1$ = Rx.Observable.interval(1000);
var observable2$ = Rx.Observable.interval(400);
this.subscription1$ = observable.subscribe(x => console.log("From interval 1000" x));
this.subscription2$ = observable.subscribe(x => console.log("From interval 400" x));
this.subscriptions.push(this.subscription1$)
this.subscriptions.push(this.subscription2$)
}
ngOnDestroy() {
this.subscriptions.forEach((subscription) => subscription.unsubscribe())
}
}

方案3 用Subscription

优点,用自带的Subscription 替代手工数组(对写法影响不大。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Component({...})
export class AppComponent implements OnInit, OnDestroy {

subscription: Subscription
ngOnInit () {
var observable1$ = Rx.Observable.interval(1000);
var observable2$ = Rx.Observable.interval(400);
var subscription1$ = observable.subscribe(x => console.log("From interval 1000" x));
var subscription2$ = observable.subscribe(x => console.log("From interval 400" x));
this.subscription.add(subscription1$)
this.subscription.add(subscription2$)
}
ngOnDestroy() {
this.subscription.unsubscribe()
}
}

方案4 async pipe

对于直接的observable,直接上async,不需要手动unsubscribe,依赖angular内部自动unsubscribe。

缺点,对于复杂的并不适用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Component({
...,
template: `
<div>
Interval: {{observable$ | async}}
</div>
`
})
export class AppComponent implements OnInit {
observable$
ngOnInit () {
this.observable$ = Rx.Observable.interval(1000);
}
}

方案5 take

1
2
3
take(n)
takeUntil(notifier)
takeWhile(predicate)

take依然需要unsubscribe

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Component({
...
})
export class AppComponent implements OnInit, OnDestroy {
subscription$
ngOnInit () {
var observable$ = Rx.Observable.interval(1000);
this.subscription$ = observable$.pipe(take(1)).
subscribe(x => console.log(x))
}
ngOnDestroy() {
this.subscription$.unsubscribe()
}
}

takeUntil目前感觉是最好的,3行完成初始化和关闭,在需要的部分用pipe接上即可。易于代码插拔

1
2
3
4
5
6
7
8
9
10
11
12
13
@Component({...})
export class AppComponent implements OnInit, OnDestroy {
notifier = new Subject()
ngOnInit () {
var observable$ = Rx.Observable.interval(1000);
observable$.pipe(takeUntil(this.notifier))
.subscribe(x => console.log(x));
}
ngOnDestroy() {
this.notifier.next()
this.notifier.complete()
}
}

takeWhile依然需要unsubscrible

1
2
3
4
5
6
7
8
9
10
11
12
@Component({...})
export class AppComponent implements OnInit, OnDestroy {
subscription$
ngOnInit () {
var observable$ = Rx.Observable.interval(1000);
this.subscription$ = observable$.pipe(takeWhile(value => value < 10))
.subscribe(x => console.log(x));
}
ngOnDestroy() {
this.subscription$.unsubscribe()
}
}

方案6 first

不确定页面是否先destroy, 还是建议在ngOnDestroy 里显示取消

方案7 利用注解 自动取消

typescript decorators

实现(和 参考的文章不同,我们调用apply会传入原始的this,并且让用户的apply先于 unsubscribe, 关注所有的unsubscribe)

缺点是,所有的 需要一个放在this上的变量,才能被自动取消订阅

1
2
3
4
5
6
7
8
9
10
11
12
13
const AutoUnsub = () => {
return (constructor) => {
const orig = constructor.prototype.ngOnDestroy;
constructor.prototype.ngOnDestroy = function(): void {
orig.apply(this);
for (const element of Object.values(this)) {
if ((element as Unsubscribable) && (element as Unsubscribable).unsubscribe) {
(element as Unsubscribable).unsubscribe();
}
}
};
};
};

使用

1
2
3
4
5
6
7
8
9
10
11
@Component({
...
})
@AutoUnsub
export class AppComponent implements OnInit {
observable$
ngOnInit () {
this.observable$ = Rx.Observable.interval(1000);
this.observable$.subscribe(x => console.log(x))
}
}

作者有提到tslint来检查是否有Destroy,但看起来并不够优雅了。

综上,感觉最科学的是takeUntil

参考

stackoverflow: es6 + node

webpack-cli init

esm

observable

angular.io:observables

rxmarbles 宝石图

cn.rx.js.org

webpack + typescript

wikipedia ReactiveX

unsubscribe from observables