随着前端框架react,angular以及vue的流行,响应式编程也开始在前端领域得以广泛应用。因此,了解并且理解响应式编程有助于更好地学习这些框架,同时利用好响应式编程的相关工具,可以让编程更加轻松。

什么是响应式编程

和平常经常听说的面向对象编程和函数式编程一样,响应式编程(Reactive Programming)就是一个编程范式,但是与其他编程范式不同的是它是基于数据流和变化传播的。我们经常在程序中这样写

1
A = B + C

A被赋值为BC的值。这时,如果我们改变B的值,A的值并不会随之改变。而如果我们运用一种机制,当B或者C的值发现变化的时候,A的值也随之改变,这样就实现了”响应式“。

而响应式编程的提出,其目的就是简化类似的操作,因此它在用户界面编程领域以及基于实时系统的动画方面都有广泛的应用。另一方面,在处理嵌套回调的异步事件,复杂的列表过滤和变换的时候也都有良好的表现。

函数响应式编程

而主要利用函数式编程(Functional Programming)的思想和方法(函数、高阶函数)来支持Reactive Programming就是所谓的Functional Reactive Programming,简称FRP。

FPR 将输入分为两个基础的部分:行为(behavior)和事件(events) 。这两个基本元素在函数响应式编程中都是第一类(first-class)值。 其中行为是随时间连续变化的数据,而事件则是基于离散的时间序列 。例如:在我们操作网页的时候,会触发很多的事件,包括点击,拖动,按键事件等。这些事件都是不连续的。对事件求值是没有意义的,所有我们一般要通过fromEventbuffer等将其变成连续的行为来做进一步处理。与RP相比,FRP更偏重于底层。由于采用了函数式编程范式,FRP也自然而然带有其特点。这其中包括了不可变性,没有副作用以及通过组合函数来构建程序等特点。

应用范围

  1. 多线程,时间处理,阻塞等场景
  2. ajax,websocket和数据加载
  3. 失败处理
  4. DOM事件和动画

观察者模式和迭代器模式

在这里简单介绍一下观察者模式和迭代器模式,便于对后续介绍的概念有所了解。

观察者模式

观察者模式(Observer Pattern):定义了对象间的一种一对多的依赖关系,当一个对象状态发生改变时,其相关依赖对象皆得到通知并被自动更新。观察者模式又叫做发布-订阅(Publish/Subscribe)模式、模型-视图(Model/View)模式、源-监听器(Source/Listener)模式或从属者(Dependents)模式。

观察者模式在事件处理中应用特别广泛,也是MVC架构模式的核心。我们来写一个简单的应用,试一试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// 监听者
class Observer {
constructor(index) {
this.index = index;
}

say() {
console.log(`我是第${this.index}个用户😁`);
}

update() {
console.log('observer : 我开始说话了');
this.say();
}
}
// 被监听者
class Observable {
constructor() {
this.observers = [];
}

addObserver(observer) {
this.observers.push(observer);
}

removeObserverByIndex(index) {
this.observers.splice(index,1);
}

notify() {
console.log('Observable : 开始通知所有监听者');
this.observers.forEach(x => x.update());
}
}
//客户端代码,注册监听
const observer1 = new Observer(1);
const observer2 = new Observer(2);
const observable = new Observable();
observable.addObserver(observer1);
observable.addObserver(observer2);
//通知所有监听者
observable.notify();

执行结果如下:

1
2
3
4
5
"Observable : 开始通知所有监听者"
"observer : 我开始说话了"
"我是第1个用户😁"
"observer : 我开始说话了"
"我是第2个用户😁"

在观察者模式中,可以分为两种模式,push和pull:

1.push“推模式“,就是被监听者将消息推送出去,进而触发监听者的相应事件。如上面的事例代码就是采用这种方式,响应式编程一般采用这种模式。

2.pull”拉模式”,就是监听者主动从被监听者处获取数据。

迭代器模式

提供一种方法顺序访问一个聚合对象中的各个元素,而又不暴露内部的表示。

迭代器模式常用的场合是在遍历集合,增删改查的时候。如今,很多的高级语言都已经将其作为自身的语言特性,如python,java,es6等都有其实现。我们可以使用es5的语法简单实现一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
function getIterator(array){
var nextIndex = 0;

return {
next: function(){
return nextIndex < array.length ?
{value: array[nextIndex++], done: false} :
{done: true};
}
}
}

var iterator = getIterator([1,2,3]);
iterator.next().value; // 1
iterator.next().value; // 2

以上的代码利用了javscript语法的闭包特性,返回了一个带状态的对象,通过调用next方法来获取集合中的下一个值。这和函数式编程中部分求值的特点是一样的。这个模式很简单,我们利用它就可以不再需要手动遍历获取集合中的数据了。

RX(Reactive Extension)

Reactive Extension 这个概念最早是出现在微软的.NET社区中的,而目前也越来越多的语言实现了自己的RX,如java,javascript,ruby等。

微软官方的解释是这样的:

Reactive Extensions (Rx) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators.

简单地来说就是利用ObservableLINQ风格的基于事件驱动的编程扩展库。它是响应式编程的一种实现,用于解决异步事件流的一种解决方案。通俗点解释,就是利用它可以很好地控制事件流的异步操作,将事件的发生和对事件的响应进行解耦。可以让开发者不再关心复杂的线程处理,锁等并发相关问题。

RxJs

RxJS是用javascript实现的一个RX类库, 官方说明里指出RxJS = Observables + Operators + Schedulers。其中Observables用于生产消息,而Subscriber则用于消费消息,这和生产者和消费者的概念有点类似。

Obversables其实是一组事件流,比如你在键盘上输入”hello”这五个字母,你就有了”h - e - l - l -o”十个keydown + keyup事件组成的一组序列,这就称为Obversables,但是它是不可更改的。如果你只想要过滤出keydown事件怎么做呢?这时候你就需要利用Operators,在响应式编程概念里可以利用其组合出新的行为和事件。在这里,你可以用它用来操作这个不可变的Obversable从而生成你想要的结果,同时,可以采用多个链式调用来进行更加复杂的操作,如:

1
obversables.filter((event) => {return event === 'keydown' });

Observable

Observable 实际上是应用了观察者模式和迭代器模式的事件或者说消息序列。在RxJs中提供了多个API来将生成Observable对象,如基本的create,of, fromEvent等。

Observable可以被订阅(subscribe),随后会将数据push给所有订阅者(subscribers)。

你可能在处理异步操作的时候,会应用到Promise这个技术。那么ObservablePromise相比,又有什么区别呢?

首先,Observable是不可变的,这也是函数式编程的思想。你每次需要获取新的序列的时候,都需要利用函数操作对其做变换,这也避免了无意中修改数据造成的Bug。其次,我们知道Promise对象一旦生成并触发后,是不可以取消的,而Observable是可以,这也提供了一些灵活性。同时,当你需要共享变量的时候,Observable是可以组合使用的。最后,还有一个特性是Promise每次只能返回一个值,而Observable可以返回多值。

Observer

Observer: is a collection of callbacks that knows how to listen to values delivered by the Observable.

Observer就是一组回调函数的集合,包括next, error, complete三个,它的值是Observable传进来的,然后在监听的时候来触发这些函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
var foo = Rx.Observable.create(function (observer) {
console.log('Hello');
observer.next(42);
observer.next(100);
observer.next(200);
setTimeout(() => {
observer.next(300); // happens asynchronously
}, 1000);
});

console.log('before');
foo.subscribe(function (x) {
console.log(x);
});
console.log('after');

输出:
"before"
"Hello"
42
100
200
"after"
300

Operator

由于Observables是不可变的,因此要根据原生的数据结构生成新的数据结构,必须要借助强大的函数组合来达到效果。Operator就是这样的一个工具箱。它不仅仅提供了我们常见的map,filter,reduce操作,也提供了如连接,条件判断,转换,聚合,操作时间等方法。

在Javascript原生的数组操作中,也经常可以看到map,filter,reduce等函数的身影,如:

1
2
var source = ['1', '2', 'hello', 'world'];
var result = source.map(x => parseInt(x)).filter(x => !isNaN(x));

而RxJs提供的Operator和对数组操作的又有什么区别呢?Operator工作和数组相比较而言,数组每次操作会直接处理整个数组,但是Operator是一个迭代器,它会在处理完一个值后才转去处理下一个值。

安装和使用

现在官方提供的RxJs有两个仓库,RxJs5以及RxJS,你可以自己选择

安装rxjs用

1
npm install rx

而安装rxjs5就用

1
npm install rxjs

然后导入项目中应用就可以了。

如果你嫌麻烦的话,我在github上创建了新的初始项目,可以直接上手应用RxJS,仓库地址https://github.com/scq000/rxjs-quick-starter用来练手写个小Demo还是很方便的。

动手写一写

下面我们来写一个小的Demo。任务是通过查询GitHub的API, 获取用户列表,然后当点击特定用户名的时候,获取这个用户的详细信息。基于Github官方的提供的两个API:

  1. https://api.github.com/users
  2. https://api.github.com/users/username

其实,利用原生的Javascript我们也能很好地实现这样的需求。不过我们通常会依赖前一次的回调状态,因此它不适用于模块化或者修改要传递给下一个回调的数据)。当我们要回调的层数比较多的时候,我们就陷入了“回调地狱”中去了。现在就让我们看看RxJs怎么实现这样一个需求吧。

  1. 先创建HTML页面结构:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    <button id="getAllBtn">Get All Users</button>

    <form onsubmit="return false;">
    <input id="search-input" type="text" placeholder="search">
    </form>

    <div>
    <ul id="user-lists">
    </ul>
    </div>

    <div id="user-info">
    </div>

  2. 写JS代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    //导入依赖
    const $ = require('jquery');
    const Rx = require('rxjs/Rx');

    //获取页面元素
    const getAllBtn = $('#getAllBtn');
    const searchInput = $('#search-input');
    let keyword = '';

    //定义事件流
    const clickEventStream = Rx.Observable.fromEvent(getAllBtn, 'click');
    const inputEventStream = Rx.Observable.fromEvent(searchInput, 'keyup').filter(event => event.keyCode !== 13);
    const clickUserItemStream = Rx.Observable.fromEvent($('#user-lists'), 'click');

    //将用户触发的事件流转换成API请求流
    const getUserListStream = clickEventStream.flatMap(() => {
    return Rx.Observable.fromPromise($.getJSON('https://api.github.com/users'));
    });

    const filterUserStream = inputEventStream.flatMap(event => {
    return Rx.Observable.fromPromise($.getJSON('https://api.github.com/users'));
    });

    const getUserInformation = clickUserItemStream.flatMap(event => {
    console.log(event.target.innerText);
    return Rx.Observable.fromPromise($.getJSON('https://api.github.com/users/' + event.target.innerText));
    });

    //当响应到达时触发
    getUserInformation.subscribe(user => {
    console.log(user);
    renderUserInfo(user);
    });

    filterUserStream.subscribe(users => {
    console.log(users);
    renderUserLists(users.filter(user => user.login.includes(keyword)));
    });

    clickEventStream.subscribe(
    value => console.log('GetUsers btn click!')
    );

    inputEventStream.subscribe(event => {
    console.log(searchInput.val());
    keyword = searchInput.val();
    });

    clickUserItemStream.subscribe(event => {
    console.log(event.target);
    });

    getUserListStream.catch(err => {
    Rx.Observable.of(err); //使用catch函数避免错误被中断
    }).subscribe(users => {
    console.log(users);
    renderUserLists(users)
    });

    //将数据渲染到DOM元素上
    function renderUserLists(users) {
    $('#user-lists').html('');
    users.forEach((user) => {
    $('#user-lists').append(`<li>${user.login}</li>`);
    });
    }

    function renderUserInfo(user) {
    $('#user-info').html('');
    for (var key in user) {
    $('#user-info').append(`<div>${key} ---> ${user[key]}</div>`);
    }
    }

代码已经放在Github,如果你感兴趣的话,可以clone下来跑跑看。

总结

响应式编程的思想比较不好理解,我在学习过程中,也查阅了很多的资料,只能算是刚刚入门。所以,这篇文章也算是对整个学习过程的一次总结吧。

Read More

React Programming
An introduction to reactive programming
Slidershare: introduction to functional reactive programming
Functional Reactive Programming from First Principles
A survey of functional reactive programming

响应式编程一览

C#设计模式系列(16)-迭代器模式
学习教程
RxJs GitBook
RxJs官方文档
Introduction to reactive programming 视频