diff --git a/benchmark/streams/writable-manywrites.js b/benchmark/streams/writable-manywrites.js index e4ae9ab91e5f4a..025a5017ee6446 100644 --- a/benchmark/streams/writable-manywrites.js +++ b/benchmark/streams/writable-manywrites.js @@ -7,11 +7,12 @@ const bench = common.createBenchmark(main, { n: [2e6], sync: ['yes', 'no'], writev: ['yes', 'no'], - callback: ['yes', 'no'] + callback: ['yes', 'no'], + len: [1024, 32 * 1024] }); -function main({ n, sync, writev, callback }) { - const b = Buffer.allocUnsafe(1024); +function main({ n, sync, writev, callback, len }) { + const b = Buffer.allocUnsafe(len); const s = new Writable(); sync = sync === 'yes'; diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 7c40929d73c180..2dc949316a2dce 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -337,11 +337,6 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) { state.length += len; - const ret = state.length < state.highWaterMark; - // We must ensure that previous needDrain will not be reset to false. - if (!ret) - state.needDrain = true; - if (state.writing || state.corked || state.errored) { state.buffered.push({ chunk, encoding, callback }); if (state.allBuffers && encoding !== 'buffer') { @@ -359,6 +354,12 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) { state.sync = false; } + const ret = state.length < state.highWaterMark; + + // We must ensure that previous needDrain will not be reset to false. + if (!ret) + state.needDrain = true; + // Return false if errored or destroyed in order to break // any synchronous while(stream.write(data)) loops. return ret && !state.errored && !state.destroyed; diff --git a/test/parallel/test-stream-big-packet.js b/test/parallel/test-stream-big-packet.js index 0dca3391961a93..fdbe3cd21145ee 100644 --- a/test/parallel/test-stream-big-packet.js +++ b/test/parallel/test-stream-big-packet.js @@ -36,7 +36,11 @@ class TestStream extends stream.Transform { } } -const s1 = new stream.PassThrough(); +const s1 = new stream.Transform({ + transform(chunk, encoding, cb) { + process.nextTick(cb, null, chunk); + } +}); const s2 = new stream.PassThrough(); const s3 = new TestStream(); s1.pipe(s3); diff --git a/test/parallel/test-stream-catch-rejections.js b/test/parallel/test-stream-catch-rejections.js index 848c2ada130e64..81427c35757ca8 100644 --- a/test/parallel/test-stream-catch-rejections.js +++ b/test/parallel/test-stream-catch-rejections.js @@ -30,7 +30,7 @@ const assert = require('assert'); captureRejections: true, highWaterMark: 1, write(chunk, enc, cb) { - cb(); + process.nextTick(cb); } }); diff --git a/test/parallel/test-stream-pipe-await-drain-push-while-write.js b/test/parallel/test-stream-pipe-await-drain-push-while-write.js index 6dbf3c669bc177..a717291cda2b03 100644 --- a/test/parallel/test-stream-pipe-await-drain-push-while-write.js +++ b/test/parallel/test-stream-pipe-await-drain-push-while-write.js @@ -19,7 +19,7 @@ const writable = new stream.Writable({ }); } - cb(); + process.nextTick(cb); }, 3) }); diff --git a/test/parallel/test-stream-pipe-await-drain.js b/test/parallel/test-stream-pipe-await-drain.js index 3ae248e08b854f..90d418a09783e3 100644 --- a/test/parallel/test-stream-pipe-await-drain.js +++ b/test/parallel/test-stream-pipe-await-drain.js @@ -19,7 +19,7 @@ reader._read = () => {}; writer1._write = common.mustCall(function(chunk, encoding, cb) { this.emit('chunk-received'); - cb(); + process.nextTick(cb); }, 1); writer1.once('chunk-received', () => { @@ -42,7 +42,7 @@ writer2._write = common.mustCall((chunk, encoding, cb) => { reader._readableState.awaitDrainWriters.size, 1, 'awaitDrain should be 1 after first push, actual is ' + - reader._readableState.awaitDrainWriters + reader._readableState.awaitDrainWriters.size ); // Not calling cb here to "simulate" slow stream. // This should be called exactly once, since the first .write() call @@ -54,7 +54,7 @@ writer3._write = common.mustCall((chunk, encoding, cb) => { reader._readableState.awaitDrainWriters.size, 2, 'awaitDrain should be 2 after second push, actual is ' + - reader._readableState.awaitDrainWriters + reader._readableState.awaitDrainWriters.size ); // Not calling cb here to "simulate" slow stream. // This should be called exactly once, since the first .write() call diff --git a/test/parallel/test-stream-writable-needdrain-state.js b/test/parallel/test-stream-writable-needdrain-state.js index ea5617d997d5ed..0e72d832bc3ff0 100644 --- a/test/parallel/test-stream-writable-needdrain-state.js +++ b/test/parallel/test-stream-writable-needdrain-state.js @@ -10,8 +10,10 @@ const transform = new stream.Transform({ }); function _transform(chunk, encoding, cb) { - assert.strictEqual(transform._writableState.needDrain, true); - cb(); + process.nextTick(() => { + assert.strictEqual(transform._writableState.needDrain, true); + cb(); + }); } assert.strictEqual(transform._writableState.needDrain, false); diff --git a/test/parallel/test-stream2-finish-pipe.js b/test/parallel/test-stream2-finish-pipe.js index 1cee74063233b2..5e2969aad4f259 100644 --- a/test/parallel/test-stream2-finish-pipe.js +++ b/test/parallel/test-stream2-finish-pipe.js @@ -30,7 +30,7 @@ r._read = function(size) { const w = new stream.Writable(); w._write = function(data, encoding, cb) { - cb(null); + process.nextTick(cb, null); }; r.pipe(w);