From abc1d0b642c2bc897dd36c0c04d0fadfef68b2c3 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Thu, 16 Apr 2020 21:49:41 +0200 Subject: [PATCH 1/2] stream: avoid drain for sync streams Previously a sync writable receiving chunks larger than highwatermark would unecessarily ping pong needDrain. --- benchmark/streams/writable-manywrites.js | 7 ++++--- lib/_stream_writable.js | 11 ++++++----- test/parallel/test-stream-big-packet.js | 6 +++++- test/parallel/test-stream-catch-rejections.js | 2 +- .../test-stream-pipe-await-drain-push-while-write.js | 2 +- test/parallel/test-stream-pipe-await-drain.js | 6 +++--- test/parallel/test-stream-writable-needdrain-state.js | 6 ++++-- 7 files changed, 24 insertions(+), 16 deletions(-) diff --git a/benchmark/streams/writable-manywrites.js b/benchmark/streams/writable-manywrites.js index e4ae9ab91e5f4a..025a5017ee6446 100644 --- a/benchmark/streams/writable-manywrites.js +++ b/benchmark/streams/writable-manywrites.js @@ -7,11 +7,12 @@ const bench = common.createBenchmark(main, { n: [2e6], sync: ['yes', 'no'], writev: ['yes', 'no'], - callback: ['yes', 'no'] + callback: ['yes', 'no'], + len: [1024, 32 * 1024] }); -function main({ n, sync, writev, callback }) { - const b = Buffer.allocUnsafe(1024); +function main({ n, sync, writev, callback, len }) { + const b = Buffer.allocUnsafe(len); const s = new Writable(); sync = sync === 'yes'; diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 25235166d9fa7d..f8cfe88f01d3b5 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -344,11 +344,6 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) { state.length += len; - const ret = state.length < state.highWaterMark; - // We must ensure that previous needDrain will not be reset to false. - if (!ret) - state.needDrain = true; - if (state.writing || state.corked || state.errored) { const last = state.lastBufferedRequest; state.lastBufferedRequest = { @@ -367,6 +362,12 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) { doWrite(stream, state, false, len, chunk, encoding, cb); } + const ret = state.length < state.highWaterMark; + + // We must ensure that previous needDrain will not be reset to false. + if (!ret) + state.needDrain = true; + // Return false if errored or destroyed in order to break // any synchronous while(stream.write(data)) loops. return ret && !state.errored && !state.destroyed; diff --git a/test/parallel/test-stream-big-packet.js b/test/parallel/test-stream-big-packet.js index 0dca3391961a93..fdbe3cd21145ee 100644 --- a/test/parallel/test-stream-big-packet.js +++ b/test/parallel/test-stream-big-packet.js @@ -36,7 +36,11 @@ class TestStream extends stream.Transform { } } -const s1 = new stream.PassThrough(); +const s1 = new stream.Transform({ + transform(chunk, encoding, cb) { + process.nextTick(cb, null, chunk); + } +}); const s2 = new stream.PassThrough(); const s3 = new TestStream(); s1.pipe(s3); diff --git a/test/parallel/test-stream-catch-rejections.js b/test/parallel/test-stream-catch-rejections.js index 848c2ada130e64..81427c35757ca8 100644 --- a/test/parallel/test-stream-catch-rejections.js +++ b/test/parallel/test-stream-catch-rejections.js @@ -30,7 +30,7 @@ const assert = require('assert'); captureRejections: true, highWaterMark: 1, write(chunk, enc, cb) { - cb(); + process.nextTick(cb); } }); diff --git a/test/parallel/test-stream-pipe-await-drain-push-while-write.js b/test/parallel/test-stream-pipe-await-drain-push-while-write.js index 6dbf3c669bc177..a717291cda2b03 100644 --- a/test/parallel/test-stream-pipe-await-drain-push-while-write.js +++ b/test/parallel/test-stream-pipe-await-drain-push-while-write.js @@ -19,7 +19,7 @@ const writable = new stream.Writable({ }); } - cb(); + process.nextTick(cb); }, 3) }); diff --git a/test/parallel/test-stream-pipe-await-drain.js b/test/parallel/test-stream-pipe-await-drain.js index 3ae248e08b854f..90d418a09783e3 100644 --- a/test/parallel/test-stream-pipe-await-drain.js +++ b/test/parallel/test-stream-pipe-await-drain.js @@ -19,7 +19,7 @@ reader._read = () => {}; writer1._write = common.mustCall(function(chunk, encoding, cb) { this.emit('chunk-received'); - cb(); + process.nextTick(cb); }, 1); writer1.once('chunk-received', () => { @@ -42,7 +42,7 @@ writer2._write = common.mustCall((chunk, encoding, cb) => { reader._readableState.awaitDrainWriters.size, 1, 'awaitDrain should be 1 after first push, actual is ' + - reader._readableState.awaitDrainWriters + reader._readableState.awaitDrainWriters.size ); // Not calling cb here to "simulate" slow stream. // This should be called exactly once, since the first .write() call @@ -54,7 +54,7 @@ writer3._write = common.mustCall((chunk, encoding, cb) => { reader._readableState.awaitDrainWriters.size, 2, 'awaitDrain should be 2 after second push, actual is ' + - reader._readableState.awaitDrainWriters + reader._readableState.awaitDrainWriters.size ); // Not calling cb here to "simulate" slow stream. // This should be called exactly once, since the first .write() call diff --git a/test/parallel/test-stream-writable-needdrain-state.js b/test/parallel/test-stream-writable-needdrain-state.js index ea5617d997d5ed..0e72d832bc3ff0 100644 --- a/test/parallel/test-stream-writable-needdrain-state.js +++ b/test/parallel/test-stream-writable-needdrain-state.js @@ -10,8 +10,10 @@ const transform = new stream.Transform({ }); function _transform(chunk, encoding, cb) { - assert.strictEqual(transform._writableState.needDrain, true); - cb(); + process.nextTick(() => { + assert.strictEqual(transform._writableState.needDrain, true); + cb(); + }); } assert.strictEqual(transform._writableState.needDrain, false); From ad219912f1f640c69c5b1b280772bf8f7ded0503 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 17 Apr 2020 17:00:55 +0200 Subject: [PATCH 2/2] fixup: test --- test/parallel/test-stream2-finish-pipe.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/parallel/test-stream2-finish-pipe.js b/test/parallel/test-stream2-finish-pipe.js index 1cee74063233b2..5e2969aad4f259 100644 --- a/test/parallel/test-stream2-finish-pipe.js +++ b/test/parallel/test-stream2-finish-pipe.js @@ -30,7 +30,7 @@ r._read = function(size) { const w = new stream.Writable(); w._write = function(data, encoding, cb) { - cb(null); + process.nextTick(cb, null); }; r.pipe(w);