Skip to content

Commit

Permalink
stream: fix stream.finished on Duplex
Browse files Browse the repository at this point in the history
finished would incorrectly believe that a Duplex is already
closed if either the readable or writable side has completed.

Fixes: #33130

PR-URL: #33133
Reviewed-By: Anna Henningsen <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
  • Loading branch information
ronag authored and targos committed May 4, 2020
1 parent d39254a commit 45032a3
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 5 deletions.
16 changes: 12 additions & 4 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -147,17 +147,25 @@ function eos(stream, opts, callback) {
if (opts.error !== false) stream.on('error', onerror);
stream.on('close', onclose);

const closed = (wState && wState.closed) || (rState && rState.closed) ||
(wState && wState.errorEmitted) || (rState && rState.errorEmitted) ||
(wState && wState.finished) || (rState && rState.endEmitted) ||
(rState && stream.req && stream.aborted);
const closed = (
(wState && wState.closed) ||
(rState && rState.closed) ||
(wState && wState.errorEmitted) ||
(rState && rState.errorEmitted) ||
(rState && stream.req && stream.aborted) ||
(
(!writable || (wState && wState.finished)) &&
(!readable || (rState && rState.endEmitted))
)
);

if (closed) {
// TODO(ronag): Re-throw error if errorEmitted?
// TODO(ronag): Throw premature close as if finished was called?
// before being closed? i.e. if closed but not errored, ended or finished.
// TODO(ronag): Throw some kind of error? Does it make sense
// to call finished() on a "finished" stream?
// TODO(ronag): willEmitClose?
process.nextTick(() => {
callback();
});
Expand Down
86 changes: 85 additions & 1 deletion test/parallel/test-stream-finished.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
'use strict';

const common = require('../common');
const { Writable, Readable, Transform, finished, Duplex } = require('stream');
const {
Writable,
Readable,
Transform,
finished,
Duplex,
PassThrough
} = require('stream');
const assert = require('assert');
const EE = require('events');
const fs = require('fs');
Expand Down Expand Up @@ -396,3 +403,80 @@ testClosed((opts) => new Writable({ write() {}, ...opts }));
r.destroyed = true;
r.push(null);
}

{
// Regression https://github.com/nodejs/node/issues/33130
const response = new PassThrough();

class HelloWorld extends Duplex {
constructor(response) {
super({
autoDestroy: false
});

this.response = response;
this.readMore = false;

response.once('end', () => {
this.push(null);
});

response.on('readable', () => {
if (this.readMore) {
this._read();
}
});
}

_read() {
const { response } = this;

this.readMore = true;

if (response.readableLength) {
this.readMore = false;
}

let data;
while ((data = response.read()) !== null) {
this.push(data);
}
}
}

const instance = new HelloWorld(response);
instance.setEncoding('utf8');
instance.end();

(async () => {
await EE.once(instance, 'finish');

setImmediate(() => {
response.write('chunk 1');
response.write('chunk 2');
response.write('chunk 3');
response.end();
});

let res = '';
for await (const data of instance) {
res += data;
}

assert.strictEqual(res, 'chunk 1chunk 2chunk 3');
})().then(common.mustCall());
}

{
const p = new PassThrough();
p.end();
finished(p, common.mustNotCall());
}

{
const p = new PassThrough();
p.end();
p.on('finish', common.mustCall(() => {
finished(p, common.mustNotCall());
}));
}

0 comments on commit 45032a3

Please sign in to comment.