博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RxJS的另外四种实现方式(六)——使用Stream类实现
阅读量:6117 次
发布时间:2019-06-21

本文共 2367 字,大约阅读时间需要 7 分钟。

hot3.png

接上一篇

该实现方式与之前几种不同的,该实现方式仅针对Nodejs环境。在Nodejs环境中,提供了Stream类,包括Readable、Transform、Writeable等子类都是可扩展的。从字面上看,正好对应Rx中的生产者、传递者、消费者。

实现该库的起因是,一次在Nodejs中需要在koa框架里面提供event-stream功能,目前除了IE浏览器外其他浏览器都支持了服务端事件推送,这个功能可以很好的代替轮询。webpack用的热更新就是通过这个功能实现的。

言归正传,首先得实现生产者,我们先来看interval

class Interval extends Readable {    constructor(period) {        super({ objectMode: true })        this.period = period        this.i = 0    }    _read(size) {        setTimeout(() => this.push(this.i++), this.period)    }}exports.interval = period => new Interval(period)

说明一下,构造函数传入objectMode:true的对象是让stream处于对象模式,而不是二进制流模式。_read函数必须覆盖父类,否则出错,当有订阅者连接上来后,就会调用_read方法。我们在这个方法里面发送数据,即调用push方法,将数据发送给流的接收者。

当调用过push方法后,后面的接收者如果调用了callback回调,则表示数据消费完毕,会再次调用_read方法,直到push(null)表示生产者已经complete

FromArray也十分简单易读

class FromArray extends Readable {    constructor(array) {        super({ objectMode: true })        this.array = array        this.pos = 0        this.size = array.length    }    _read(size) {        if (this.pos < this.size) {            this.push(this.array[this.pos++])        } else            this.push(null)    }}exports.fromArray = array => new FromArray(array)

下面要实现一个转换器(操作符)Filter

class Filter extends Transform {    constructor(f) {        super({ readableObjectMode: true, writableObjectMode: true })        this.f = f    }    _transform(data, encoding, callback) {        const f = this.f        if (f(data)) {            this.push(data);        }        callback();    }    _flush(callback) {        callback()    }}exports.filter = f => new Filter(f)

这时候我们需要覆盖_transform、_flush函数,同样的,push方法会让数据流到下面的流中,而callback回调会使得上一个流继续发送数据。

最后我们来实现Subscriber

class Subscriber extends Writable {    constructor(n, e, c) {        super({ objectMode: true })        this.n = n        this.e = e        this.c = c    }    _write(chunk, encoding, callback) {        this.n(chunk)        callback(null)    }    _final(callback) {        this.c()        callback()    }}exports.subscribe = (n, e = noop, c = noop) => new Subscriber(n, e, c)

Subscriber是一个可写流,我们必须覆盖_write方法用于消费数据,_final方法用于complete事件处理。这里没有实现error事件。有兴趣的同学可以思考如何实现。

最后我们需要把各种stream串起来,变成一个长长的水管

exports.pipe = pipeline || ((first, ...cbs) => cbs.reduce((aac, c) => aac.pipe(c), first));

高版本的Nodejs已经提供了pipeline方法,可以直接使用,低版本的话,可以用上面的方法进行连接。

至此,我们已经使用Nodejs提供的Stream类实现了Rx的基本逻辑。(完)

转载于:https://my.oschina.net/langhuihui/blog/2208141

你可能感兴趣的文章
充分利用HTML标签元素 – 简单的xtyle前端框架
查看>>
设计模式(十一):FACADE外观模式 -- 结构型模式
查看>>
iOS xcodebuile 自动编译打包ipa
查看>>
程序员眼中的 SQL Server-执行计划教会我如何创建索引?
查看>>
【BZOJ】1624: [Usaco2008 Open] Clear And Present Danger 寻宝之路(floyd)
查看>>
cmake总结
查看>>
数据加密插件
查看>>
linux后台运行程序
查看>>
win7 vs2012/2013 编译boost 1.55
查看>>
IIS7如何显示详细错误信息
查看>>
ViewPager切换动画PageTransformer使用
查看>>
coco2d-x 基于视口的地图设计
查看>>
C++文件读写详解(ofstream,ifstream,fstream)
查看>>
Android打包常见错误之Export aborted because fatal lint errors were found
查看>>
Tar打包、压缩与解压缩到指定目录的方法
查看>>
新手如何学习 jQuery?
查看>>
配置spring上下文
查看>>
Python异步IO --- 轻松管理10k+并发连接
查看>>
mysql-python模块编译问题解决
查看>>
Oracle中drop user和drop user cascade的区别
查看>>