Rxjs 笔记

Rxjs笔记

安装

用 npm

npm install rxjs-es

或者用 unpkg 来直接添加到网页里面:

1
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>

更多其他方法 Rxjs install

教程参考网页

https://fe.ele.me/let-us-learn-rxjs/;

官方文档翻译


概要

  • Observable: (是什么)
  • Observer: (是什么)
  • Subscription: (是什么)
  • Operators:(是什么)
  • Subject: (是什么)
  • Schedulers: (是什么)

两分钟快速理解 Reactive

原文

当你有个数组:

[ 14, 9, 5, 2, 10, 13, 4 ]

你只要偶数的时候你可能会这么做:

1
2
3
4
5
[ 14, 9, 5, 2, 10, 13, 4 ]

filter( (x) -> x % 2 == 0 )

[ 14, 2, 10, 4 ]

Observable 是一个异步的数组: Obserbale = 数组 + 时间轴

下面是一个鼠标点击例子:

鼠标点击产生的 stream;

我们只要其中的

filter( (event) -> event.x < 250 )

看到这里你会说这不就是个 stream 嘛, 好吧, 完全可以这么认为. 在下面的介绍中 observable 和 stream 是同一个词.

面对 RxJS 那么多 operator ,我们要怎么学习呢?很简单:

分类别

Marble 图

Marble diagram

我们再用图来表示一下的话:

--a---b-c--d-----e--|-->

这种图叫做 marble diagram 。
我们可以把 ASCII 的 marble 图转成 SVG 的:ASCII -> SVG 。

- 表示时间轴,a ~ e 表示 emit 的值,| 则表示这个 stream 已经结束了。
比方说,click 事件用上图来表示:a 表示第 1 次点击,b 表示第 2 次点击,如此类推。


Observable

Demo

一个简单的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

// 创建一个stream
// 1-2-3-----------4--|-->

var observable = Rx.Observable.create(function (observer) {

//可以给订阅返回多个值
observer.next(1);
observer.next(2);
observer.next(3);

//也可以异步返回
setTimeout(() => {
observer.next(4);
observer.complete();
// 一旦结束后再使用 complete 或者 next 不会有反应
observer.complete();
observer.next(5);
}, 1000);
});

console.log('just before subscribe');
//订阅
observable.subscribe({
next: x => console.log('got value ' + x),
error: err => console.error('something wrong occurred: ' + err),
complete: () => console.log('done'),
});
console.log('just after subscribe');

// 取消订阅
//subscription.unsubscribe();

订阅和取消订阅

Demo

订阅可以理解成可观察对象的执行.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

//创建两个可观察对象
var observable1 = Rx.Observable.interval(400);
var observable2 = Rx.Observable.interval(300);

//两个订阅
var subscription = observable1.subscribe(x => console.log('first:' + x));
var childSubscription = observable2.subscribe(x => console.log("second:" +x));

//订阅对象也可以被放置在一起
subscription.add(childSubscription);

setTimeout(() => {
console.log("unsubscribe")
// 取消两个订阅 subscription and childSubscription
subscription.unsubscribe();
}, 1000);

Observer

什么是观察者?观察者是可观察对象所发送数据的消费者,观察者简单而言是一组回调函数 , 分别对应一种被可观察对象发送的通知的类型:next, error和complete

1
2
3
4
5
6
7
var observer={
next:x=>console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification')
}

observable.subscribe(observer)

Subject

Subject就是一个可观察对象,只不过可以被多播至多个观察者。同时Subject也类似于EventEmitter:维护者着众多事件监听器的注册表。

一个简单的例子

把它当做 eventEmitter 来用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

// new event emmiter
var subject = new Rx.Subject();

// event.bind()
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});

// event.trigger()
subject.next(1);
subject.next(2);

结合 subject 和 observable

我们有一个 observable (或stream) 但它只能在我们订阅的时候才能执行, 每次执行都是单个执行的, 如果我们要把订阅值推送到不同的地方怎么办.

在这里我们就可以用到 subject.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 创建 eventEmmiter
var subject = new Rx.Subject();

// 绑定事件 event.bind()
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});

// 绑定事件 event.bind()
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});

var observable = Rx.Observable.from([1, 2, 3]);

// 触发订阅事件 event.trigger();
observable.subscribe(subject);

多播的可观察对象 (multicast)

一个多播的可观察对象使用一个Subject,使得多个观察者可以看到同一个可观察对象的执行 (和上面的结果一样, 就是方法有区别)

1
2
3
4
5
6
7
8
9
10
var source=Rx.Observable.from([1,2,3]);
var subject=new Rx.Subject();
var multicasted=source.multicast(subject);
multicasted.subscribe({
next:(v)=>console.log('observerA:' +v);
});
multicasted.subscribe({
next: (v) => console.log('observerB: ' + v)
});
multicasted.connect();

multicast方法返回一个看起来很像普通的可观察对象的可观察对象,但是在订阅时却有着和Subject一样的行为,multicast返回一个ConnectableObservable,它只是一个具有connect()方法的Observable。

connect()方法对于在决定何时开始分享可观察对象的执行是非常重要的。 因为connect()在source下面有source.subscribe(subject),connect()返回一个Subscription,你可以取消订阅,以取消共享的Observable执行。

BehaviorSubject

Demo

BehaviorSubject 和 subject 一样, 区别在于:

  • 可以初始值.
  • 保存当前的值.
  • 在中途可以获取值 用 getValue();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 初始值 "a"
let bSubject = new Rx.BehaviorSubject("a");

// 保存新的值
bSubject.next("b");

bSubject.subscribe((value) => {
console.log("BehaviorSubject Subscription got", value);
// 订阅并获取值 "b"
// 这个是在其他 Observable 或者 Subject 里面没有的
});

bSubject.next("c"); // 获取值 c
bSubject.next("d"); // 获取值 d

对比普通的 Subject:

1
2
3
4
5
6
7
8
9
10
11
// Regular Subject

let subject = new Rx.Subject();

subject.next("b");
subject.subscribe((value) => {
console.log("Subject Subscription got", value); //在这里没有获取任何值
});

subject.next("c"); // 获取值 c
subject.next("d"); // 获取值 d

ReplaySubject

一个ReplaySubject类似于一个BehaviorSubject,因为它可以发送一个过去的值(old values)给一个新的订阅者,但是它也可以记录可观察对象的一部分执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
var subject = new Rx.ReplaySubject(3); 
// buffer 3 values for new subscribers ,注:缓存了三个值。

subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});

subject.next(5);

输出:

1
2
3
4
5
6
7
8
9
observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5

AsyncSubject

AsyncSubject是另一个变体,它只发送给观察者可观察对象执行的最新值,并且仅在执行结束时。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
var subject = new Rx.AsyncSubject();

subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});

subject.next(5);
subject.complete(); // 只有在结束时候订阅才获取值

输出:

1
2
observerA: 5
observerB: 5

asObservable()

Demo;

隐藏 序列 方法 比如: next(), complete()…

hiden the indentity of a observable sequence.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
var data = [1,2,3,4,5,6,7,8];

var myApi = {
getData(){
var subject = new Rx.BehaviorSubject();
subject.next({data});
return subject.asObservable();
}
}

var result = myApi.getData();
result.subscribe({
next: (data)=>{console.log(data)}
})
result.next("new date"); //错误

Subscription 对象

创建 subscribe 看起来很简单, 但是我们需要在软件中添加 unsubscribe 来取消订阅以防这些代码继续在跑. 一旦多了就不好控制.

打个比方 我们创建了一个 组件(web custom element), 在这个组件里面加载了很多 rxjs的订阅, 当这个组件被删除时候我们需要吧 全部的订阅给取消了. 一个个取消比较麻烦所以我们需要这个 subscription 对象来创建一个 订阅数组.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
var group = new suscription();

subject.next("b");
var a = subject.subscribe((value) => {
console.log("Subject A Subscription got", value); //在这里没有获取任何值
});

group.add(a);

var b = subject.subscribe((value) => {
console.log("Subject B Subscription got", value); //在这里没有获取任何值
});

group.add(b);

group.unsubscibe(); //取消全部订阅