知识依赖
基本的js语法
基本的nodejs用法
Promise用法
准备工作 我想用nodejs直接跑rxjs,然而它并不支持es6的import,尝试了--experimental-modules
依然有报错
方案1 在线编辑器 https://rxjs-dev.firebaseapp.com/guide/observable
在上面代码点击右上角的Edit in StackBlitx
方案2 nodejs + rxjs + ES6 https://github.com/standard-things/esm
安装:npm install esm rxjs
带上esm
运行
node -r esm index.js
方案3 webpack + rxjs + ES6 创建文件夹 并初始化
1 2 mkdir rxjsdemo && cd rxjsdemonpx webpack-cli init
这样选择
1 2 3 4 5 ? Will your application have multiple bundles? No ? Which will be your application entry point? index ? In which folder do you want to store your generated bundles? dist ? Will you use one of the below JS solutions? ES6 ? Will you use one of the below CSS solutions? No
然后运行
1 npm install --save-dev rxjs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import { Subject } from 'rxjs' ;import { interval } from 'rxjs' ;const subject = new Subject ();subject.subscribe ({ next : (v ) => console .log (`observerA: ${v} ` ) }); subject.subscribe ({ next : (v ) => console .log (`observerB: ${v} ` ) }); subject.next (1 ); subject.next (2 ); const observable = interval (1000 );const subscription = observable.subscribe (x => console .log (x));
运行npm start
在打开的网页查看console
是否正常输出, 忽略红字,能输出如下即可
1 2 3 4 5 6 7 observerA: 1 observerB: 1 observerA: 2 observerB: 2 0 1 ...
是个啥 众所周知Promise
Observable
可以看作一个更高级的Promise
Observable 返回值的个数为非负整数个,而Promise返回的个数为0或1个
它说 Observables 不像 事件emitters也不像多个值的Promise, [虽然在某些使用下像
Observables are like functions with zero arguments, but generalize those to allow multiple values.
来源https://subscription.packtpub.com/book/application_development/9781787120426/1/01lvl1sec7/a-brief-history-of-reactivex-and-rxjava
据说是2010年 微软一个大佬写的 .NET
的一个框架,然后现在除了RxJs,还有Rx.Java,Rx.C++等等多语言支持
Observables 基本使用和对比 1 2 3 4 5 6 7 8 9 10 11 12 13 import { Observable } from 'rxjs' ; const foo = new Observable (subscriber => { console .log ('Hello' ); subscriber.next (42 ); }); foo.subscribe (x => { console .log (x); }); foo.subscribe (y => { console .log (y); });
输出
lazy计算
订阅 类似于调用函数,它们是分开的,不同的订阅的执行不共享(相对而言 事件emitters并不会根据 订阅者个数而决定事件个数)
同步 1 2 3 4 5 console .log ('before' );foo.subscribe (x => { console .log (x); }); console .log ('after' );
ok 它是同步的
其实Observable有能力 同步分发值,和异步分发值
和函数区别,能return很多次值 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import { Observable } from 'rxjs' ; const foo = new Observable (subscriber => { console .log ('Hello' ); subscriber.next (42 ); subscriber.next (100 ); subscriber.next (200 ); setTimeout (() => { subscriber.next (300 ); }, 1000 ); }); console .log ('before' );foo.subscribe (x => { console .log (x); }); console .log ('after' );
func.call()
同步给我一个返回值observable.subscribe()
异步或同步 给我任意数量的值
拆解 Observable 创建 Observables 1 2 3 4 5 6 7 import { Observable } from 'rxjs' ;const observable = new Observable (function subscribe (subscriber ) { const id = setInterval (() => { subscriber.next ('hi' ) }, 1000 ); });
以上是一个每秒向订阅者 发送hi
的Observable
更常见的创建方式,是用创建函数of
,from
,interval
订阅 Observables observable.subscribe(x => console.log(x)); // 相当于call了observable的函数
执行 Observable 可以调用的方法
Next 通知: 发送 数值,字符串,结构体
Error 通知: 发送一个 JavaScript Error or exception.
Complete 通知: 不带值.
正则描述next*(error|complete)?
, 任意多个next,但当error或complete 其中一个执行后,就没有其它的可以分发了。
然后 建议了
1 2 3 4 5 6 7 8 9 try { subscriber.next (...) subscriber.next (...) subscriber.next (...) subscriber.next (...) subscriber.complete (); }catch (e){ subscriber.error (e) }
Disposing Observables 订阅者的退订
1 2 3 4 5 6 import { from } from 'rxjs' ;const observable = from ([10 , 20 , 30 ]);const subscription = observable.subscribe (x => console .log (x));subscription.unsubscribe ();
发布者的关闭
1 2 3 4 5 6 7 8 9 10 11 12 const observable = new Observable (function subscribe (subscriber ) { const intervalId = setInterval (() => { subscriber.next ('hi' ); }, 1000 ); return function unsubscribe ( ) { clearInterval (intervalId); }; });
Operators piping op4()(op3()(op2()(op1()(obs))))
可读性差改为
1 2 3 4 5 6 obs.pipe ( op1 (), op2 (), op3 (), op4 (), )
按照这种格式,即使只有一层 也不应该用op()(obs)
而是obs.pipe(op())
pipeable operators: 用 observableInstance.pipe(operator())
的语法,如filter(...)
和mergeMap(...)
它们不会改变原来的Observable
实例,而是返回新的Observable
Creation Operators Creation Operators: 创建新的Observable,如of,interval
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import { of } from 'rxjs' ;import { map } from 'rxjs/operators' ;map (x => x * x)(of (1 , 2 , 3 )).subscribe ((v ) => console .log (`value: ${v} ` ));first ()(of (1 , 2 , 3 ).subscribe ((v ) => console .log (`value: ${v} ` ));
Higher-order Observables 高阶Observables 处理 Observables of Observables
1 2 3 4 const fileObservable = urlObservable.pipe( map(url => http.get(url)), concatAll(), );
concatAll()
subscribes to each “inner” Observable that comes out of the “outer” Observable, and copies all the emitted values until that Observable completes, and goes on to the next one. All of the values are in that way concatenated,顺序 调用每个Observable
mergeAll()
— subscribes to each inner Observable as it arrives, then emits each value as it arrives,并行 调用每个Observable,获得了就输出
switchAll()
— subscribes to the first inner Observable when it arrives, and emits each value as it arrives, but when the next inner Observable arrives, unsubscribes to the previous one, and subscribes to the new one. 每当接受到新的 Observable 会舍弃老的Observable,当一个Observable 返回值,会立即emit
exhaust()
— subscribes to the first inner Observable when it arrives, and emits each value as it arrives, discarding all newly arriving inner Observables until that first one completes, then waits for the next inner Observable. 当前Observable 没有结束时,会抛弃后来的所有Observable,结束以后,接受一个Observables
可以看到 1
,3
,4
一个时刻 只有一个Observable在运行,1是串行办法,3是很可能半途而废 尽可能新,4是做就做完不被打断
就像很多 和map
融合的方法这也有对应的,concatMap()
, mergeMap()
, switchMap()
, and exhaustMap()
Marble diagrams
圆是值
上面的箭头是输入
下面的是输出
竖着的线是 完成态
叉是 抛出错误
正中是描述
所有Observables 按分类的: https://rxjs-dev.firebaseapp.com/guide/operators
自定义Observables 组合已有的
1 2 3 4 5 6 7 8 9 import { pipe } from 'rxjs' ;import { filter, map } from 'rxjs' ;function discardOddDoubleEven ( ) { return pipe ( filter (v => ! (v % 2 )), map (v => v + v), ); }
注意这里的pipe
和.pipe
是不同的
从零开始写一个Observable
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 import { Observable } from 'rxjs' ; function delay (delayInMillis ) { return (observable ) => new Observable (observer => { const allTimerIDs = new Set (); const subscription = observable.subscribe ({ next (value ) { const timerID = setTimeout (() => { observer.next (value); allTimerIDs.delete (timerID); }, delayInMillis); allTimerIDs.add (timerID); }, error (err ) { observer.error (err); }, complete ( ) { observer.complete (); } }); return () => { subscription.unsubscribe (); allTimerIDs.forEach (timerID => { clearTimeout (timerID); }); } }); }
你需要实现3个Observer functions, next()
,error()
,complete()
实现teardown
函数(清理结束内部的方法等) ,返回该函数
Subscription 父子关系 + unsubscribe()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import { interval } from 'rxjs' ; const observable1 = interval (400 );const observable2 = interval (300 ); const subscription = observable1.subscribe (x => console .log ('first: ' + x));const childSubscription = observable2.subscribe (x => console .log ('second: ' + x)); subscription.add (childSubscription); setTimeout (() => { subscription.unsubscribe (); }, 1000 );
Subject plain Observables 是一个单向广播
的
Subject 是一种多向广播
的Observable
A Subject is like an Observable, but can multicast to many Observers. Subjects are like EventEmitters: they maintain a registry of many listeners.
对于订阅者来说,它无法区别它订阅的是Subject还是plain Observable
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import { Subject , from } from 'rxjs' ;const subject = new Subject <number>();subject.subscribe ({ next : (v ) => console .log (`observerA: ${v} ` ) }); subject.subscribe ({ next : (v ) => console .log (`observerB: ${v} ` ) }); subject.next (1 ); subject.next (2 ); const observable = from ([9 , 5 , 4 ]);observable.subscribe (subject);
以上我们可以看到,Subject既可以看作 订阅者,也可以当作发送者
Multicasted Observables 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import { from , Subject } from 'rxjs' ;import { multicast } from 'rxjs/operators' ;const source = from ([1 , 2 , 3 ]); const subject = new Subject ();const multicasted = source.pipe (multicast (subject)); multicasted.subscribe ({ next : (v ) => console .log (`observerA: ${v} ` ) }); multicasted.subscribe ({ next : (v ) => console .log (`observerB: ${v} ` ) }); multicasted.connect ();
multicast
: Observers 订阅一个潜在的 Subject, Subject 订阅 源Observable
. 返回一个ConnectableObservable, 可以看作一个简单的 有 connect()
方法的 Observable
connect()
方法在后台执行source.subscribe(subject)
方法,返回一个Subscription
,对返回的变量可以 unsubscribe来结束执行
考虑下面情景
A 订阅了 multicasted Observable
multicasted Observable 开始 connected
next 值 0 分发给A
B 订阅了 multicasted Observable
next 值 1 分发给A
next 值 1 分发给B
A 取消订阅
next 值 2 分发给B
B 取消订阅
multicasted Observable 的 connection 被 unsubscribed
可以如下实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 import { interval, Subject } from 'rxjs' ;import { multicast } from 'rxjs/operators' ; const source = interval (500 );const subject = new Subject ();const multicasted = source.pipe (multicast (subject));let subscription1, subscription2, subscriptionConnect; subscription1 = multicasted.subscribe ({ next : (v ) => console .log (`observerA: ${v} ` ) }); subscriptionConnect = multicasted.connect (); setTimeout (() => { subscription2 = multicasted.subscribe ({ next : (v ) => console .log (`observerB: ${v} ` ) }); }, 600 ); setTimeout (() => { subscription1.unsubscribe (); }, 1200 ); setTimeout (() => { subscription2.unsubscribe (); subscriptionConnect.unsubscribe (); }, 2000 );
手动调用connect
有时很笨重,通常我们希望在第一个Observer
到达时调用,在最后一个Observer
unsubscribes 时, refCount
让 multicasted Observable
第一个subscriber
到达时自动执行,在最后一个subscriber
结束时自动 结束执行.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 import { interval, Subject } from 'rxjs' ;import { multicast, refCount } from 'rxjs/operators' ; const source = interval (500 );const subject = new Subject ();const refCounted = source.pipe (multicast (subject), refCount ());let subscription1, subscription2; console .log ('observerA subscribed' );subscription1 = refCounted.subscribe ({ next : (v ) => console .log (`observerA: ${v} ` ) }); setTimeout (() => { console .log ('observerB subscribed' ); subscription2 = refCounted.subscribe ({ next : (v ) => console .log (`observerB: ${v} ` ) }); }, 600 ); setTimeout (() => { console .log ('observerA unsubscribed' ); subscription1.unsubscribe (); }, 1200 ); setTimeout (() => { console .log ('observerB unsubscribed' ); subscription2.unsubscribe (); }, 2000 );
注意 只有ConnectableObservable
上面有refCount()
方法
BehaviorSubject 当前的值
它存储最后 给它consumers的值,当有新的订阅,则立即发送给它的订阅者
demo:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import { BehaviorSubject } from 'rxjs' ;const subject = new BehaviorSubject (0 ); subject.subscribe ({ next : (v ) => console .log (`observerA: ${v} ` ) }); subject.next (1 ); subject.next (2 ); subject.subscribe ({ next : (v ) => console .log (`observerB: ${v} ` ) }); subject.next (3 );
ReplaySubject 和BehaviorSubject类似,但是不是记录最后一个,而是buffer大小来决定记录最后的多少个,初始化传入buffer的大小
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import { ReplaySubject } from 'rxjs' ;const subject = new ReplaySubject (3 ); subject.subscribe ({ next : (v ) => console .log (`observerA: ${v} ` ) }); subject.next (1 ); subject.next (2 ); subject.next (3 ); subject.next (4 ); subject.subscribe ({ next : (v ) => console .log (`observerB: ${v} ` ) }); subject.next (5 );
除了可以指定buffer的size,也可以指定buffer的过期时间 new ReplaySubject(buffer size, buffer time)
,单位毫秒
AsyncSubject 只有在complete
时才向它的订阅者发送最后一个 值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import { AsyncSubject } from 'rxjs' ;const subject = new AsyncSubject (); subject.subscribe ({ next : (v ) => console .log (`observerA: ${v} ` ) }); subject.next (1 ); subject.next (2 ); subject.next (3 ); subject.next (4 ); subject.subscribe ({ next : (v ) => console .log (`observerB: ${v} ` ) }); subject.next (5 ); subject.complete ();
只会接受到5
Scheduler 调度!
关于Promise
能有的和调度沾边的 只有Promise.all()
,和Promise.race()
它是个数据结构,储存,基于 优先级或其它表转 执行队列任务
是一个执行环境,它表示 一个任务在何时何地被执行, 立即 或 回调
它有一个虚拟时钟, 通过调度器的now()
方法,会回一个时间 标识,task得到的时间标识仅与其所在的调度器有关。
A Scheduler lets you define in what execution context will an Observable deliver notifications to its Observer.
调度器,让你定义 在什么执行上下文下,一个Observale会向它的Observer分发同志
串行 emit 1 2 3
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import { Observable , asyncScheduler } from 'rxjs' ;import { observeOn } from 'rxjs/operators' ; var observable = new Observable ((proxyObserver ) => { proxyObserver.next (1 ); proxyObserver.next (2 ); proxyObserver.next (3 ); proxyObserver.complete (); }).pipe ( observeOn (asyncScheduler) ); var finalObserver = { next (x ) { console .log ('got value ' + x) }, error (err ) { console .error ('something wrong occurred: ' + err); }, complete ( ) { console .log ('done' ); } }; console .log ('just before subscribe' );observable.subscribe (finalObserver); console .log ('just after subscribe' );
proxyObserver
是 observeOn(asyncScheduler)
创建出的,其中next(val)
函数大约是
1 2 3 4 5 6 7 8 9 10 const proxyObserver = { next (val ) { asyncScheduler.schedule ( (x ) => finalObserver.next (x), 0 , val ); }, }
除了asyncScheduler
还有queueScheduler
(遍历 当前事件frame),asapScheduler
(微任务队列调度,promise有用,在当前任务后,在下一个任务前用于异步交流),animationFrameScheduler
(在下一个 浏览器内容绘制前,可用于创建流畅的浏览器动画)
然后 你其实已经使用了调度器,只是隐式的
如,你在创建少量有限返回值的Observale时 用的是null或undefined调度器
对于返回大量或无限数据的用的是queue调度器
对于用到了timer的 用的是async调度器
对于一些性能优化的目的,你可以指定具体的调度器,如 creation operator
的from([10,20,30],asyncScheduler)
以下这些都支持传递 调度器 参数
bindCallback
bindNodeCallback
combineLatest
concat
empty
from
fromPromise
interval
merge
of
range
throw
timer
subscribeOn 可以把 上面 第二个参数传递调度器,改成, obs.pipe(subscribeOn(调度器))
的写法
https://rxjs-dev.firebaseapp.com/api/operators/subscribeOn
实例 输入事件 每100ms的限制 来进行搜索
1 2 3 4 5 6 7 8 9 10 Observable .fromEvent (input, 'input' ) .map (e => e.target .value ) .filter (value => value.length >= 1 ) .throttleTime (100 ) .distinctUntilChanged () .switchMap ((value ) => "message is " + value) .subscribe ( x => renderSearchResult (x), err => console .error (err) )
同时调用 多个 异步方法,等待3个全回调
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import { Observable , pipe } from 'rxjs' ;import { skip, take } from 'rxjs/operators' ;function fetchdata (url,cb ){ console .log ('start fetch' +url); setTimeout (()=> { console .log ('fetched' +url); cb (); },1000 ); } let ob = Observable .create ((observer )=> { fetchdata ('/url1' ,()=> { observer.next (); }); fetchdata ('/url2' ,()=> { observer.next (); }); fetchdata ('/url3' ,()=> { observer.next (); }); }).pipe (skip (2 ),take (1 )).subscribe (()=> { console .log ('finish' ); });
生命周期实例 需求: angular 内页面上有一些observables。
我们希望在页面销毁时 能结束这些。(否则会有内存泄漏)
方案1 手动逐个unsubscrible 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Component ({...})export class AppComponent implements OnInit , OnDestroy { subscription1$ : Subscription subscription2$ : Subscription ngOnInit () { var observable1$ = Rx .Observable .interval (1000 ); var observable2$ = Rx .Observable .interval (400 ); this .subscription1$ = observable.subscribe (x => console .log ("From interval 1000" x)); this .subscription2$ = observable.subscribe (x => console .log ("From interval 400" x)); } ngOnDestroy ( ) { this .subscription1$ .unsubscribe () this .subscription2$ .unsubscribe () } }
方案2 用数组,循环unsubscrible 优点,新增时,只需要 新增和添加到数组,不需要修改OnDestroy的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Component ({...})export class AppComponent implements OnInit , OnDestroy { subscription1$ : Subscription subscription2$ : Subscription subscriptions : Subscription [] = [] ngOnInit () { var observable1$ = Rx .Observable .interval (1000 ); var observable2$ = Rx .Observable .interval (400 ); this .subscription1$ = observable.subscribe (x => console .log ("From interval 1000" x)); this .subscription2$ = observable.subscribe (x => console .log ("From interval 400" x)); this .subscriptions .push (this .subscription1$ ) this .subscriptions .push (this .subscription2$ ) } ngOnDestroy ( ) { this .subscriptions .forEach ((subscription ) => subscription.unsubscribe ()) } }
方案3 用Subscription 优点,用自带的Subscription 替代手工数组(对写法影响不大。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Component ({...})export class AppComponent implements OnInit , OnDestroy { subscription : Subscription ngOnInit () { var observable1$ = Rx .Observable .interval (1000 ); var observable2$ = Rx .Observable .interval (400 ); var subscription1$ = observable.subscribe (x => console .log ("From interval 1000" x)); var subscription2$ = observable.subscribe (x => console .log ("From interval 400" x)); this .subscription .add (subscription1$) this .subscription .add (subscription2$) } ngOnDestroy ( ) { this .subscription .unsubscribe () } }
方案4 async pipe 对于直接的observable,直接上async,不需要手动unsubscribe,依赖angular内部自动unsubscribe。
缺点,对于复杂的并不适用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Component ({ ..., template : ` <div> Interval: {{observable$ | async}} </div> ` }) export class AppComponent implements OnInit { observable$ ngOnInit () { this .observable$ = Rx .Observable .interval (1000 ); } }
方案5 take 1 2 3 take(n) takeUntil(notifier) takeWhile(predicate)
take依然需要unsubscribe
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Component ({ ... }) export class AppComponent implements OnInit , OnDestroy { subscription$ ngOnInit () { var observable$ = Rx .Observable .interval (1000 ); this .subscription$ = observable$.pipe (take (1 )). subscribe (x => console .log (x)) } ngOnDestroy ( ) { this .subscription$ .unsubscribe () } }
takeUntil目前感觉是最好的,3行完成初始化和关闭,在需要的部分用pipe接上即可。易于代码插拔
1 2 3 4 5 6 7 8 9 10 11 12 13 @Component ({...})export class AppComponent implements OnInit , OnDestroy { notifier = new Subject () ngOnInit () { var observable$ = Rx .Observable .interval (1000 ); observable$.pipe (takeUntil (this .notifier )) .subscribe (x => console .log (x)); } ngOnDestroy ( ) { this .notifier .next () this .notifier .complete () } }
takeWhile依然需要unsubscrible
1 2 3 4 5 6 7 8 9 10 11 12 @Component ({...})export class AppComponent implements OnInit , OnDestroy { subscription$ ngOnInit () { var observable$ = Rx .Observable .interval (1000 ); this .subscription$ = observable$.pipe (takeWhile (value => value < 10 )) .subscribe (x => console .log (x)); } ngOnDestroy ( ) { this .subscription$ .unsubscribe () } }
方案6 first 不确定页面是否先destroy, 还是建议在ngOnDestroy 里显示取消
方案7 利用注解 自动取消 typescript decorators
实现(和 参考的文章不同,我们调用apply会传入原始的this,并且让用户的apply先于 unsubscribe, 关注所有的unsubscribe)
缺点是,所有的 需要一个放在this上的变量,才能被自动取消订阅
1 2 3 4 5 6 7 8 9 10 11 12 13 const AutoUnsub = ( ) => { return (constructor ) => { const orig = constructor.prototype .ngOnDestroy ; constructor.prototype .ngOnDestroy = function ( ): void { orig.apply (this ); for (const element of Object .values (this )) { if ((element as Unsubscribable ) && (element as Unsubscribable ).unsubscribe ) { (element as Unsubscribable ).unsubscribe (); } } }; }; };
使用
1 2 3 4 5 6 7 8 9 10 11 @Component ({ ... }) @AutoUnsub export class AppComponent implements OnInit { observable$ ngOnInit () { this .observable$ = Rx .Observable .interval (1000 ); this .observable$ .subscribe (x => console .log (x)) } }
作者有提到tslint来检查是否有Destroy,但看起来并不够优雅了。
综上,感觉最科学的是takeUntil
参考 stackoverflow: es6 + node
webpack-cli init
esm
observable
angular.io:observables
rxmarbles 宝石图
cn.rx.js.org
webpack + typescript
wikipedia ReactiveX
unsubscribe from observables