From 45c27c6b17fa71df682a86a50cf5abab207bdf6d Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 28 Apr 2020 19:36:23 +0200 Subject: [PATCH 01/11] stream: fix stream.finished on Duplex finished would incorrectly believe that a Duplex is already closed if either the readable or writable side has completed. Fixes: https://github.com/nodejs/node/issues/33130 --- lib/internal/streams/end-of-stream.js | 16 ++++-- test/parallel/test-stream-finished.js | 73 ++++++++++++++++++++++++++- 2 files changed, 84 insertions(+), 5 deletions(-) diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 7d5689ddadb7fc..bc2d0ccfcaf142 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -147,10 +147,17 @@ function eos(stream, opts, callback) { if (opts.error !== false) stream.on('error', onerror); stream.on('close', onclose); - const closed = (wState && wState.closed) || (rState && rState.closed) || - (wState && wState.errorEmitted) || (rState && rState.errorEmitted) || - (wState && wState.finished) || (rState && rState.endEmitted) || - (rState && stream.req && stream.aborted); + const closed = ( + (wState && wState.closed) || + (rState && rState.closed) || + (wState && wState.errorEmitted) || + (rState && rState.errorEmitted) || + (rState && stream.req && stream.aborted) || + ( + (!writable || (wState && wState.finished)) && + (!readable || (rState && rState.endEmitted)) + ) + ); if (closed) { // TODO(ronag): Re-throw error if errorEmitted? @@ -158,6 +165,7 @@ function eos(stream, opts, callback) { // before being closed? i.e. if closed but not errored, ended or finished. // TODO(ronag): Throw some kind of error? Does it make sense // to call finished() on a "finished" stream? + // TODO(ronag): willEmitClose? process.nextTick(() => { callback(); }); diff --git a/test/parallel/test-stream-finished.js b/test/parallel/test-stream-finished.js index d4336e84db35d6..14493d368fdef3 100644 --- a/test/parallel/test-stream-finished.js +++ b/test/parallel/test-stream-finished.js @@ -1,7 +1,14 @@ 'use strict'; const common = require('../common'); -const { Writable, Readable, Transform, finished, Duplex } = require('stream'); +const { + Writable, + Readable, + Transform, + finished, + Duplex, + PassThrough +} = require('stream'); const assert = require('assert'); const EE = require('events'); const fs = require('fs'); @@ -396,3 +403,67 @@ testClosed((opts) => new Writable({ write() {}, ...opts })); r.destroyed = true; r.push(null); } + +{ + // Regression https://github.com/nodejs/node/issues/33130 + + const response = new PassThrough(); + setTimeout(() => response.write('chunk 1'), 500); + setTimeout(() => response.write('chunk 2'), 1000); + setTimeout(() => response.write('chunk 3'), 1500); + setTimeout(() => response.end(), 2000); + + class HelloWorld extends Duplex { + constructor(response) { + super(); + + this.response = response; + this.readMore = false; + + response.once('end', () => { + this.push(null); + }); + + response.on('readable', () => { + if (this.readMore) { + this._read(); + } + }); + } + + _read() { + const { response } = this; + + this.readMore = true; + + if (response.readableLength) { + this.readMore = false; + } + + let data; + while ((data = response.read()) !== null) { + this.push(data); + } + } + } + + const instance = new HelloWorld(response); + instance.end(); + + (async () => { + await new Promise((resolve) => setTimeout(resolve, 100)); + + let res = ''; + for await (const data of instance) { + res += data.toString(); + } + + assert.strictEqual(res, 'chunk 1chunk 2chunk 3'); + })(); +} + +{ + const p = new PassThrough(); + p.end(); + finished(p, common.mustNotCall()); +} From bfd17cdfe30f09bab87de54651f0208b5e0da063 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 28 Apr 2020 19:43:25 +0200 Subject: [PATCH 02/11] fixup: test --- test/parallel/test-stream-finished.js | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/test/parallel/test-stream-finished.js b/test/parallel/test-stream-finished.js index 14493d368fdef3..af48cbdbaceb5c 100644 --- a/test/parallel/test-stream-finished.js +++ b/test/parallel/test-stream-finished.js @@ -408,10 +408,10 @@ testClosed((opts) => new Writable({ write() {}, ...opts })); // Regression https://github.com/nodejs/node/issues/33130 const response = new PassThrough(); - setTimeout(() => response.write('chunk 1'), 500); - setTimeout(() => response.write('chunk 2'), 1000); - setTimeout(() => response.write('chunk 3'), 1500); - setTimeout(() => response.end(), 2000); + response.write('chunk 1'); + response.write('chunk 2'); + response.write('chunk 3'); + response.end(); class HelloWorld extends Duplex { constructor(response) { @@ -453,9 +453,11 @@ testClosed((opts) => new Writable({ write() {}, ...opts })); (async () => { await new Promise((resolve) => setTimeout(resolve, 100)); + instance.setEncoding('utf8'); + let res = ''; for await (const data of instance) { - res += data.toString(); + res += data; } assert.strictEqual(res, 'chunk 1chunk 2chunk 3'); From 5bd8e7cc209e6383bd28a2dd5022389b289995ff Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 28 Apr 2020 19:48:37 +0200 Subject: [PATCH 03/11] fixup: test --- test/parallel/test-stream-finished.js | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/test/parallel/test-stream-finished.js b/test/parallel/test-stream-finished.js index af48cbdbaceb5c..920567e0d6c7d8 100644 --- a/test/parallel/test-stream-finished.js +++ b/test/parallel/test-stream-finished.js @@ -451,8 +451,6 @@ testClosed((opts) => new Writable({ write() {}, ...opts })); instance.end(); (async () => { - await new Promise((resolve) => setTimeout(resolve, 100)); - instance.setEncoding('utf8'); let res = ''; @@ -461,7 +459,7 @@ testClosed((opts) => new Writable({ write() {}, ...opts })); } assert.strictEqual(res, 'chunk 1chunk 2chunk 3'); - })(); + })().then(common.mustCall()); } { From 02be7f370b4bb94cd7c06910261f331d53bb7ffa Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 28 Apr 2020 19:49:07 +0200 Subject: [PATCH 04/11] fixup: test --- test/parallel/test-stream-finished.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/parallel/test-stream-finished.js b/test/parallel/test-stream-finished.js index 920567e0d6c7d8..0b8e8e69226abf 100644 --- a/test/parallel/test-stream-finished.js +++ b/test/parallel/test-stream-finished.js @@ -448,10 +448,10 @@ testClosed((opts) => new Writable({ write() {}, ...opts })); } const instance = new HelloWorld(response); + instance.setEncoding('utf8'); instance.end(); (async () => { - instance.setEncoding('utf8'); let res = ''; for await (const data of instance) { From 9b83b7eeaaf63e43e8a7c827eb84f1ca562a53e7 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 28 Apr 2020 19:50:47 +0200 Subject: [PATCH 05/11] fixup: test --- test/parallel/test-stream-finished.js | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/parallel/test-stream-finished.js b/test/parallel/test-stream-finished.js index 0b8e8e69226abf..305c6272816c38 100644 --- a/test/parallel/test-stream-finished.js +++ b/test/parallel/test-stream-finished.js @@ -451,7 +451,8 @@ testClosed((opts) => new Writable({ write() {}, ...opts })); instance.setEncoding('utf8'); instance.end(); - (async () => { + instance.on('finish', common.mustCall(async () => { + assert.strictEqual(instance.writableFinished, true); let res = ''; for await (const data of instance) { @@ -459,7 +460,7 @@ testClosed((opts) => new Writable({ write() {}, ...opts })); } assert.strictEqual(res, 'chunk 1chunk 2chunk 3'); - })().then(common.mustCall()); + })); } { From 327dcb3f9bbf667728a713fd372f8a3473b39269 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 28 Apr 2020 19:51:48 +0200 Subject: [PATCH 06/11] fixup: test --- test/parallel/test-stream-finished.js | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/test/parallel/test-stream-finished.js b/test/parallel/test-stream-finished.js index 305c6272816c38..20b36300113961 100644 --- a/test/parallel/test-stream-finished.js +++ b/test/parallel/test-stream-finished.js @@ -468,3 +468,11 @@ testClosed((opts) => new Writable({ write() {}, ...opts })); p.end(); finished(p, common.mustNotCall()); } + +{ + const p = new PassThrough(); + p.end(); + p.on('finish', common.mustCall(() => { + finished(p, common.mustNotCall()); + })); +} From 1fc6c3e54a2f4d81e8394986e128678872d7f8ae Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 28 Apr 2020 19:56:28 +0200 Subject: [PATCH 07/11] fixup: test --- test/parallel/test-stream-finished.js | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/test/parallel/test-stream-finished.js b/test/parallel/test-stream-finished.js index 20b36300113961..b9c41779d5b68e 100644 --- a/test/parallel/test-stream-finished.js +++ b/test/parallel/test-stream-finished.js @@ -451,15 +451,17 @@ testClosed((opts) => new Writable({ write() {}, ...opts })); instance.setEncoding('utf8'); instance.end(); - instance.on('finish', common.mustCall(async () => { - assert.strictEqual(instance.writableFinished, true); + instance.on('finish', common.mustCall(() => { + (async () => { + assert.strictEqual(instance.writableFinished, true); - let res = ''; - for await (const data of instance) { - res += data; - } + let res = ''; + for await (const data of instance) { + res += data; + } - assert.strictEqual(res, 'chunk 1chunk 2chunk 3'); + assert.strictEqual(res, 'chunk 1chunk 2chunk 3'); + })().then(common.mustCall()); })); } From 20fe402464c3af07cc35312dfccd45497462d475 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 28 Apr 2020 20:05:09 +0200 Subject: [PATCH 08/11] fixup: test --- test/parallel/test-stream-finished.js | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/test/parallel/test-stream-finished.js b/test/parallel/test-stream-finished.js index b9c41779d5b68e..35749aafd15ede 100644 --- a/test/parallel/test-stream-finished.js +++ b/test/parallel/test-stream-finished.js @@ -408,10 +408,6 @@ testClosed((opts) => new Writable({ write() {}, ...opts })); // Regression https://github.com/nodejs/node/issues/33130 const response = new PassThrough(); - response.write('chunk 1'); - response.write('chunk 2'); - response.write('chunk 3'); - response.end(); class HelloWorld extends Duplex { constructor(response) { @@ -452,6 +448,12 @@ testClosed((opts) => new Writable({ write() {}, ...opts })); instance.end(); instance.on('finish', common.mustCall(() => { + setImmediate(common.mustCall(() => { + response.write('chunk 1'); + response.write('chunk 2'); + response.write('chunk 3'); + response.end(); + })); (async () => { assert.strictEqual(instance.writableFinished, true); From 8cab4dd7c79ea1c0fae8ded3c85d29c084448871 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 28 Apr 2020 20:07:11 +0200 Subject: [PATCH 09/11] fixup: test --- test/parallel/test-stream-finished.js | 28 +++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/test/parallel/test-stream-finished.js b/test/parallel/test-stream-finished.js index 35749aafd15ede..4622d2c695e1fa 100644 --- a/test/parallel/test-stream-finished.js +++ b/test/parallel/test-stream-finished.js @@ -406,12 +406,13 @@ testClosed((opts) => new Writable({ write() {}, ...opts })); { // Regression https://github.com/nodejs/node/issues/33130 - const response = new PassThrough(); class HelloWorld extends Duplex { constructor(response) { - super(); + super({ + autoDestroy: false + }); this.response = response; this.readMore = false; @@ -447,24 +448,23 @@ testClosed((opts) => new Writable({ write() {}, ...opts })); instance.setEncoding('utf8'); instance.end(); - instance.on('finish', common.mustCall(() => { - setImmediate(common.mustCall(() => { + (async () => { + await EE.once(instance, 'finish'); + + setImmediate(() => { response.write('chunk 1'); response.write('chunk 2'); response.write('chunk 3'); response.end(); - })); - (async () => { - assert.strictEqual(instance.writableFinished, true); + }); - let res = ''; - for await (const data of instance) { - res += data; - } + let res = ''; + for await (const data of instance) { + res += data; + } - assert.strictEqual(res, 'chunk 1chunk 2chunk 3'); - })().then(common.mustCall()); - })); + assert.strictEqual(res, 'chunk 1chunk 2chunk 3'); + })().then(common.mustCall()); } { From 882f704231fd9df2584a09e85b553f570023e78f Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 29 Apr 2020 18:22:38 +0200 Subject: [PATCH 10/11] fixup: simplify according to suggestion --- lib/internal/streams/end-of-stream.js | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index bc2d0ccfcaf142..973c99d4746e57 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -166,9 +166,7 @@ function eos(stream, opts, callback) { // TODO(ronag): Throw some kind of error? Does it make sense // to call finished() on a "finished" stream? // TODO(ronag): willEmitClose? - process.nextTick(() => { - callback(); - }); + process.nextTick(callback); } return function() { From c7899a92a326fbaf360b1de5d312bd4487627ed1 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Thu, 30 Apr 2020 11:39:56 +0200 Subject: [PATCH 11/11] Revert "fixup: simplify according to suggestion" This reverts commit 882f704231fd9df2584a09e85b553f570023e78f. --- lib/internal/streams/end-of-stream.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 973c99d4746e57..bc2d0ccfcaf142 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -166,7 +166,9 @@ function eos(stream, opts, callback) { // TODO(ronag): Throw some kind of error? Does it make sense // to call finished() on a "finished" stream? // TODO(ronag): willEmitClose? - process.nextTick(callback); + process.nextTick(() => { + callback(); + }); } return function() {