Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

使用 Stream #7

Open
luokuning opened this issue Dec 14, 2016 · 0 comments
Open

使用 Stream #7

luokuning opened this issue Dec 14, 2016 · 0 comments
Labels

Comments

@luokuning
Copy link
Owner

luokuning commented Dec 14, 2016

并不逐字翻译,原文请点这里

可读数据流

这篇文章会继续讲述如何建造我们的家用自动监控系统。 在我们的系统里有一个温度计,它会频繁的发出温度数据,就好像是一个数据水龙头一样,而这在 Node.js 里通常被称为可读数据流。

我们可以这样监听温度计发出的数据:

var thermometer = home.rooms.living_room.thermometer;

thermometer.on('data', function(temperature) {  
  console.log('Living room temperature is: %d C', temperature);
});

你可以看到可读数据流其实是事件发射器(event emitter)的一个特例,当有可用数据的时候不断发出 data 事件

一个可读数据流可以发出任何类型的数据: 二进制形式的 buffer 或者字符串,甚至是更复杂的 Javascript 对象。

除了能发出数据,可读数据流还能暂停和恢复:

thermometer.pause();

// in 5 seconds, resume monitoring temperature:
setTimeout(function() {  
  thermometer.resume();
}, 5000);

当暂停的时候,可读数据流将不会再发出数据,直到被恢复流动(resumed)。

很多接口和实例都实现了可读数据流,有些是 Node.js 内置实现的,有些是外部实现的:

  • 读取文件内容
  • 服务器 HTTP 请求体
  • TCP 连接
  • 客户端 HTTP 响应体
  • 数据库变更
  • 音/视频流
  • 数据库查询返回结果
  • 当然还有很多...

创建一个可读数据流

创建一个可读数据流有很多方法,其中有一个是继承 Node.js 的 stream.Readable 类,并实现 _read 方法:

var Readable = require('stream').Readable;  
var util = require('util');

module.exports = Thermometer;

function Thermometer(options) {  
  if (! (this instanceof Thermometer)) return new Thermometer(options);
  if (! options) options = {};
  options.objectMode = true;
  Readable.call(this, options);
}

util.inherits(Thermometer, Readable);

Thermometer.prototype._read = function read() {  
  var self = this;

  getTemperatureReadingFromThermometer(function(err, temperature) {
    if (err) self.emit('error', err);
    else self.push(temperature);
  });
};

Thermometer 构造函数通过调用父类的构造函数来初始化,同时设定 options.objectMode = true 来让可读数据流能够处理除字符串和 buffer 之外的数据类型。

除了继承 stream.Readable 之外,我们的自定义可读数据流类还必须实现 _read 方法,以便当数据流准备好的时候拉取数据。这个方法会从底层资源获取必要的数据,而一旦获取到数据,应该使用 stream.push(data) 将数据推到流中 (译者注:_read 方法不应该直接调用,应该是定义在子类上,并且由内部类方法自动调用)。

拉取(pull)数据流 vs 推送(push)数据流

有两种主要的可读数据流: 有一种是你必须主动去它那拉取数据,我们称之为拉取数据流,另一种是它会推送数据给你,我们可以称之为推送数据流。对于推送数据流的类比是水龙头: 一旦你打开它,就会有源源不断的水流。而对于拉取数据流的类比是一根吸管: 很明显只有你去洗的时候才会有水上来。

以现实生活中的另一个例子来类比,多人传递水桶救火的场景大家应该都熟悉,如果后面没有人来接力这个水桶,那么拿着桶的人只能原地干等着。

Node.js 核心的 stream 类这两种模式都有。如果你只是简单的监听 data 事件,那么推送模式就会被激活,并且数据流会尽可能快的流动起来,速度取决于底层资源推送的速度:

var Thermometer = require('./thermometer');

var thermomether = Thermometer();

thermomether.on('data', function(temp) {  
  console.log('temp:', temp);
});

相反,如果你主动的从 stream 读取数据,那么你使用的就是默认的拉取模式,并且读取的速率由你自己控制,比如下面这个例子:

var Thermometer = require('./thermometer');

var thermometer = Thermometer({highWaterMark: 1});

setInterval(function() {  
  var temp = thermometer.read();
  console.log('temp:', temp);
}, 1000);

客户端使用 stream.read 方法来从 stream 里读取最新的数据。注意我们实例化 thermomete 的时候传入了参数 highWaterMark 等于 1,当 stream 的 objectMode 等于 true 的时候,这个值表示的是 stream 最多能缓存多少个对象。由于我们不想获得老旧的温度数据,所以这里定义了最多只缓存一个数据。

当我们后面介绍能串接多个 stream 的 stream.pipe 方法时,你会清楚这些值以及数据流向是怎么结合在一起的。

可写数据流

相较于发出数据的数据流,我们还有一种能接收数据的数据流: 可写数据流。Node.js 里有一些实例就是可写数据流:

  • 处于添加模式的可写文件
  • TCP 连接
  • 标准输出 (stdout)
  • 服务器 HTTP 响应体
  • 数据库中的库或者表
  • HTML 解析器
  • 远程的日志输出 (remote logger)
  • 当然还有很多其他的...

要往可写数据流中输入数据,可以简单的调用 stream.write(o),传入你想写入数据即可。你也可以传入一个回调函数,当数据被写入的时候调用: stream.write(payload, callback)

与实现自定义的可读数据流类似,实现自定义的可写数据流需要继承自 stream.Writable,并且实现受保护的 stream._write 方法。

现在假如你想把读取到的每一个温度数据都存进数据库中,但是你使用的数据库连接模块并没有提供流式 API,那么我们就来自己实现一个:

var Writable = require('stream').Writable;  
var util = require('util');

module.exports = DatabaseWriteStream;

function DatabaseWriteStream(options) {  
  if (! (this instanceof DatabaseWriteStream))
    return new DatabaseWriteStream(options);
  if (! options) options = {};
  options.objectMode = true;
  Writable.call(this, options);
}

util.inherits(DatabaseWriteStream, Writable);

DatabaseWriteStream.prototype._write = function write(doc, encoding, callback) {  
  insertIntoDatabase(JSON.stringify(doc), callback);
};

实现这样一个可写数据流真的很简单: 除了继承的套路和开启 ObjectMode 之外,只需要再定义一个 _write 方法,这个方法会把数据写入底层资源(这个例子是写进数据库中)。_write 方法必须接受三个参数:

  • chunk: 需要写入的数据;
  • encoding: 如果数据是字符串,则定义其编码;
  • callback: 当数据写入完成或发生错误时的回调函数;

现在我们可以使用这个自定义的类来把温度数据写进数据库中:

var DbWriteStream = require('./db_write_stream');  
var db = DbWriteStream();

var Thermometer = require('./thermometer');

var thermomether = Thermometer();

thermomether.on('data', function(temp) {  
  db.write({when: Date.now(), temperature: temp});
});

看起来不错,但是回过头来想想,我们为什么要费劲为数据库写一个可写的数据流类?为什么不直接使用本来的数据库连接模块?因为 stream 能通过 stream.pipe 方法组合串接起来。

使用管道串接流

相比于手动的链接两个流,我们可以使用 stream.pipe 方法把可读数据流串接到可写数据流上,比如这样:

var DbWriteStream = require('./db_write_stream');  
var db = DbWriteStream();

var Thermometer = require('./thermometer');  
var thermomether = Thermometer();

thermomether.pipe(db); 

上面的代码会保持 thermomether 与数据库之间的数据流动。假如后面我们想停止数据流动,只需要调用 unpipe 方法解除彼此的连接即可:

// 10 秒后解除连接

setTimeout(function() {  
  thermometer.unpipe(db);
}, 10e3);

pipe 方法的好处不仅可以让我们少写数据从一个流转移到另一个流的冗余代码,还能随时让我们控制数据的流动性(暂定/恢复)。

数据流中的流控制 (Flow controll in streams)

当你使用 readable.pipe(writable) 把一个可读数据流跟另一个可写数据流串联起来的时候,数据流动的速率会根据消费者(consumer)的吸收的速率来调整(译者注: 这里的消费者就是可写数据流)。底层的机制是,pipe 会使用可写数据流的 write 方法,并且根据这个方法的返回值来决定是否暂定可读数据流的方法: 如果 write 方法返回 true,表示数据已经被写入底层的数据目的地了(在这个例子里数据目的地就是数据库),而如果返回 false,表示要被写入的数据正在被缓存,等待被写入,也就意味着数据源需要暂停发送数据。一旦可写数据流中的数据已经被排干净了(drained),它就会适当的发出 drained 事件,从而通知管道回复数据流动。

我们之前也说过,你可以通过定义 options.highWaterMark 的值来控制可读数据流最大的缓存值。如果是二进制流,那么这个值的单位是字节(byte), 如果 options.objectModetrue,那么这个值表示的是最多能缓存的对象的数量。

事实上你不用担心这个问题,当你创建一个可写数据流的时候,你只需要在数据已经被写入完成的时候调用回调函数就行了,而当你使用可读数据流的时候,可以通过设定 options.highWaterMark 来设定最大的缓存值,剩下的其他事情 Node.js 都会帮你搞定。

转换数据流

我们已经看过可读和可写数据流了,除了这两种数据流之外,还有第三种数据流: 组合了这两种数据流的数据流,有时候也被称为转换数据流(transform stream)。

数据流其实并不一定成对使用,例如我们上面说的可写数据流通过管道串接至可写数据流,它们也能通过转换数据流来组合使用: 数据从可读数据流流向一个或多个转换数据流,最后写入可写数据流。

在上面给数据库写入数据所创建的可写数据流例子中,我们接收 Javascript 对象,然后把它们转换成 JSON 字符串,最后再插入数据库中。其实我们也可以创建一个通用的转换数据流来做这个:

var Transform = require('stream').Transform;  
var inherits = require('util').inherits;

module.exports = JSONEncode;

function JSONEncode(options) {  
  if ( ! (this instanceof JSONEncode))
    return new JSONEncode(options);

  if (! options) options = {};
  options.objectMode = true;
  Transform.call(this, options);
}

inherits(JSONEncode, Transform);

JSONEncode.prototype._transform = function _transform(obj, encoding, callback) {  
  try {
    obj = JSON.stringify(obj);
  } catch(err) {
    return callback(err);
  }

  this.push(obj);
  callback();
};

为了构建一个自定义的准换数据流,我们需要继承 Node.js 的 stream.Transform 伪类,并且实现受保护的 _transform 方法,而这个方法正是起到实际转换作用的方法。

你可能已经注意到当我们介绍管道(pipe)的时候,我们只是把温度存到了数据库中,并没有把时间戳也一并存进去。现在我们可以自定义一个转换数据流来实现这个方案:

var Transform = require('stream').Transform;  
var inherits = require('util').inherits;

module.exports = ToTimestampedDocTransform;

function ToTimestampedDocTransform(options) {  
  if ( ! (this instanceof JSONTransform))
    return new JSONTransform(options);

  if (! options) options = {};
  options.objectMode = true;
  Transform.call(this, options);
}

inherits(ToTimestampedDocTransform, Transform);

ToTimestampedDocTransform.prototype._transform = function _transform(temperature, encoding, callback) {  
  this.push({when: Date.now(), temperature: temperature});
  callback();
};

做完这个我们就没有必要去创建文档(document, 译者注: 指的是之前存到数据库中的对象)了, 简化了数据库可写数据流:

var Writable = require('stream').Writable;  
var util = require('util');

module.exports = DatabaseWriteStream;

function DatabaseWriteStream(options) {  
  if (! (this instanceof DatabaseWriteStream))
    return new DatabaseWriteStream(options);
  if (! options) options = {};
  options.objectMode = true;
  Writable.call(this, options);
}

util.inherits(DatabaseWriteStream, Writable);

DatabaseWriteStream.prototype._write = function write(doc, encoding, callback) {  
  insertIntoDatabase(doc, callback);
};

function insertIntoDatabase(doc, cb) {  
  setTimeout(cb, 10);
}

最终我们可以实例化并且使用管道串接所有这些数据流:

var DbWriteStream = require('./db_write_stream');  
var db = DbWriteStream();

var JSONEncodeStream = require('./json_encode_stream');  
var json = JSONEncodeStream();

var ToTimestampedDocumentStream = require('./to_timestamped_document_stream');  
var doc = ToTimestampedDocumentStream();

var Thermometer = require('../thermometer');

var thermometer = Thermometer();

thermometer.pipe(doc).pipe(json).pipe(db); 

因为 pipe 方法会返回目标数据流,所以这里我们可以使用链式调用。

第三方数据流

除了 Node.js 内置的数据流之外,NPM 上还有许多实现了转换数据流的包,你可以自行使用这些包来编码,解码,过滤或者对你的数据做任意的转换。

NPM 上还有许多包是导出流式 API 的(比如数据库,websocket 服务器,websocket 客户端等等),你可以安装,创建并且使用管道串接它们,就像搭乐高积木一样。

来吧,快活吧!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

1 participant