diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 7d5689ddadb7fc..bc2d0ccfcaf142 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -147,10 +147,17 @@ function eos(stream, opts, callback) { if (opts.error !== false) stream.on('error', onerror); stream.on('close', onclose); - const closed = (wState && wState.closed) || (rState && rState.closed) || - (wState && wState.errorEmitted) || (rState && rState.errorEmitted) || - (wState && wState.finished) || (rState && rState.endEmitted) || - (rState && stream.req && stream.aborted); + const closed = ( + (wState && wState.closed) || + (rState && rState.closed) || + (wState && wState.errorEmitted) || + (rState && rState.errorEmitted) || + (rState && stream.req && stream.aborted) || + ( + (!writable || (wState && wState.finished)) && + (!readable || (rState && rState.endEmitted)) + ) + ); if (closed) { // TODO(ronag): Re-throw error if errorEmitted? @@ -158,6 +165,7 @@ function eos(stream, opts, callback) { // before being closed? i.e. if closed but not errored, ended or finished. // TODO(ronag): Throw some kind of error? Does it make sense // to call finished() on a "finished" stream? + // TODO(ronag): willEmitClose? process.nextTick(() => { callback(); }); diff --git a/test/parallel/test-stream-finished.js b/test/parallel/test-stream-finished.js index d4336e84db35d6..4622d2c695e1fa 100644 --- a/test/parallel/test-stream-finished.js +++ b/test/parallel/test-stream-finished.js @@ -1,7 +1,14 @@ 'use strict'; const common = require('../common'); -const { Writable, Readable, Transform, finished, Duplex } = require('stream'); +const { + Writable, + Readable, + Transform, + finished, + Duplex, + PassThrough +} = require('stream'); const assert = require('assert'); const EE = require('events'); const fs = require('fs'); @@ -396,3 +403,80 @@ testClosed((opts) => new Writable({ write() {}, ...opts })); r.destroyed = true; r.push(null); } + +{ + // Regression https://github.com/nodejs/node/issues/33130 + const response = new PassThrough(); + + class HelloWorld extends Duplex { + constructor(response) { + super({ + autoDestroy: false + }); + + this.response = response; + this.readMore = false; + + response.once('end', () => { + this.push(null); + }); + + response.on('readable', () => { + if (this.readMore) { + this._read(); + } + }); + } + + _read() { + const { response } = this; + + this.readMore = true; + + if (response.readableLength) { + this.readMore = false; + } + + let data; + while ((data = response.read()) !== null) { + this.push(data); + } + } + } + + const instance = new HelloWorld(response); + instance.setEncoding('utf8'); + instance.end(); + + (async () => { + await EE.once(instance, 'finish'); + + setImmediate(() => { + response.write('chunk 1'); + response.write('chunk 2'); + response.write('chunk 3'); + response.end(); + }); + + let res = ''; + for await (const data of instance) { + res += data; + } + + assert.strictEqual(res, 'chunk 1chunk 2chunk 3'); + })().then(common.mustCall()); +} + +{ + const p = new PassThrough(); + p.end(); + finished(p, common.mustNotCall()); +} + +{ + const p = new PassThrough(); + p.end(); + p.on('finish', common.mustCall(() => { + finished(p, common.mustNotCall()); + })); +}