接上一篇
该实现方式与之前几种不同的,该实现方式仅针对Nodejs环境。在Nodejs环境中,提供了Stream类,包括Readable、Transform、Writeable等子类都是可扩展的。从字面上看,正好对应Rx中的生产者、传递者、消费者。
实现该库的起因是,一次在Nodejs中需要在koa框架里面提供event-stream功能,目前除了IE浏览器外其他浏览器都支持了服务端事件推送,这个功能可以很好的代替轮询。webpack用的热更新就是通过这个功能实现的。
言归正传,首先得实现生产者,我们先来看interval
class Interval extends Readable { constructor(period) { super({ objectMode: true }) this.period = period this.i = 0 } _read(size) { setTimeout(() => this.push(this.i++), this.period) }}exports.interval = period => new Interval(period)
说明一下,构造函数传入objectMode:true的对象是让stream处于对象模式,而不是二进制流模式。_read函数必须覆盖父类,否则出错,当有订阅者连接上来后,就会调用_read方法。我们在这个方法里面发送数据,即调用push方法,将数据发送给流的接收者。
当调用过push方法后,后面的接收者如果调用了callback回调,则表示数据消费完毕,会再次调用_read方法,直到push(null)表示生产者已经complete
FromArray也十分简单易读
class FromArray extends Readable { constructor(array) { super({ objectMode: true }) this.array = array this.pos = 0 this.size = array.length } _read(size) { if (this.pos < this.size) { this.push(this.array[this.pos++]) } else this.push(null) }}exports.fromArray = array => new FromArray(array)
下面要实现一个转换器(操作符)Filter
class Filter extends Transform { constructor(f) { super({ readableObjectMode: true, writableObjectMode: true }) this.f = f } _transform(data, encoding, callback) { const f = this.f if (f(data)) { this.push(data); } callback(); } _flush(callback) { callback() }}exports.filter = f => new Filter(f)
这时候我们需要覆盖_transform、_flush函数,同样的,push方法会让数据流到下面的流中,而callback回调会使得上一个流继续发送数据。
最后我们来实现Subscriber
class Subscriber extends Writable { constructor(n, e, c) { super({ objectMode: true }) this.n = n this.e = e this.c = c } _write(chunk, encoding, callback) { this.n(chunk) callback(null) } _final(callback) { this.c() callback() }}exports.subscribe = (n, e = noop, c = noop) => new Subscriber(n, e, c)
Subscriber是一个可写流,我们必须覆盖_write方法用于消费数据,_final方法用于complete事件处理。这里没有实现error事件。有兴趣的同学可以思考如何实现。
最后我们需要把各种stream串起来,变成一个长长的水管
exports.pipe = pipeline || ((first, ...cbs) => cbs.reduce((aac, c) => aac.pipe(c), first));
高版本的Nodejs已经提供了pipeline方法,可以直接使用,低版本的话,可以用上面的方法进行连接。
至此,我们已经使用Nodejs提供的Stream类实现了Rx的基本逻辑。(完)