Skip to content

Commit

Permalink
stream: cleanup and fix Readable.wrap
Browse files Browse the repository at this point in the history
Cleans up Readable.wrap and also ensures destroy
is called for certain events.
  • Loading branch information
ronag committed Jul 4, 2020
1 parent 9b8d317 commit 03e4964
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 53 deletions.
77 changes: 24 additions & 53 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const {
NumberIsInteger,
NumberIsNaN,
ObjectDefineProperties,
ObjectKeys,
ObjectSetPrototypeOf,
Set,
SymbolAsyncIterator,
Expand Down Expand Up @@ -1007,83 +1008,53 @@ function flow(stream) {
// This is *not* part of the readable stream interface.
// It is an ugly unfortunate mess of history.
Readable.prototype.wrap = function(stream) {
const state = this._readableState;
let paused = false;
let paused;

stream.on('end', () => {
debug('wrapped end');
if (state.decoder && !state.ended) {
const chunk = state.decoder.end();
if (chunk && chunk.length)
this.push(chunk);
}

this.push(null);
});
// TODO (ronag): Should this.destroy(err) emit
// 'error' on the wrapped stream? Would require
// a static factory method, e.g. Readable.wrap(stream).

stream.on('data', (chunk) => {
debug('wrapped data');
if (state.decoder)
chunk = state.decoder.write(chunk);

// Don't skip over falsy values in objectMode.
if (state.objectMode && (chunk === null || chunk === undefined))
return;
else if (!state.objectMode && (!chunk || !chunk.length))
return;

const ret = this.push(chunk);
if (!ret) {
if (!this.push(chunk) && stream.pause) {
paused = true;
stream.pause();
}
});

// Proxy all the other methods. Important when wrapping filters and duplexes.
for (const i in stream) {
if (this[i] === undefined && typeof stream[i] === 'function') {
this[i] = function methodWrap(method) {
return function methodWrapReturnFunction() {
return stream[method].apply(stream, arguments);
};
}(i);
}
}
stream.on('end', () => {
this.push(null);
});

stream.on('error', (err) => {
errorOrDestroy(this, err);
});

stream.on('close', () => {
// TODO(ronag): Update readable state?
this.emit('close');
this.destroy();
});

stream.on('destroy', () => {
// TODO(ronag): this.destroy()?
this.emit('destroy');
this.destroy();
});

stream.on('pause', () => {
// TODO(ronag): this.pause()?
this.emit('pause');
});

stream.on('resume', () => {
// TODO(ronag): this.resume()?
this.emit('resume');
});

// When we try to consume some more bytes, simply unpause the
// underlying stream.
this._read = (n) => {
debug('wrapped _read', n);
if (paused) {
this._read = () => {
if (paused && stream.resume) {
paused = false;
stream.resume();
}
};

// Proxy all the other methods. Important when wrapping filters and duplexes.
for (const i of ObjectKeys(stream)) {
if (this[i] === undefined && typeof stream[i] === 'function') {
this[i] = function methodWrap(method) {
return function methodWrapReturnFunction() {
return stream[method].apply(stream, arguments);
};
}(i);
}
}

return this;
};

Expand Down
27 changes: 27 additions & 0 deletions test/parallel/test-stream2-readable-wrap-destroy.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
'use strict';
const common = require('../common');

const Readable = require('_stream_readable');
const EE = require('events').EventEmitter;

const oldStream = new EE();
oldStream.pause = () => {};
oldStream.resume = () => {};

{
new Readable({
autoDestroy: false,
destroy: common.mustCall()
})
.wrap(oldStream);
oldStream.emit('destroy');
}

{
new Readable({
autoDestroy: false,
destroy: common.mustCall()
})
.wrap(oldStream);
oldStream.emit('close');
}
10 changes: 10 additions & 0 deletions test/parallel/test-stream2-readable-wrap.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ function runTest(highWaterMark, objectMode, produce) {
flow();
};

// Make sure pause is only emitted once.
let pausing = false;
r.on('pause', () => {
assert.strictEqual(pausing, false);
pausing = true;
process.nextTick(() => {
pausing = false;
});
});

let flowing;
let chunks = 10;
let oldEnded = false;
Expand Down

0 comments on commit 03e4964

Please sign in to comment.