From 35ec93115df6721ec359d07774eece084f756072 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 29 Sep 2023 20:13:44 +0200 Subject: [PATCH] stream: writable state bitmap PR-URL: https://github.com/nodejs/node/pull/49899 Reviewed-By: Benjamin Gruenbaum Reviewed-By: Matteo Collina Reviewed-By: Raz Luvaton Reviewed-By: Yagiz Nizipli --- benchmark/streams/writable-manywrites.js | 4 +- lib/internal/streams/writable.js | 267 ++++++++++++++++------- 2 files changed, 192 insertions(+), 79 deletions(-) diff --git a/benchmark/streams/writable-manywrites.js b/benchmark/streams/writable-manywrites.js index e6ab65162c366c..d244c7d606479e 100644 --- a/benchmark/streams/writable-manywrites.js +++ b/benchmark/streams/writable-manywrites.js @@ -4,7 +4,7 @@ const common = require('../common'); const Writable = require('stream').Writable; const bench = common.createBenchmark(main, { - n: [2e6], + n: [1e5], sync: ['yes', 'no'], writev: ['yes', 'no'], callback: ['yes', 'no'], @@ -13,7 +13,7 @@ const bench = common.createBenchmark(main, { function main({ n, sync, writev, callback, len }) { const b = Buffer.allocUnsafe(len); - const s = new Writable(); + const s = new Writable({ highWaterMark: 16 * 1024 }); sync = sync === 'yes'; const writecb = (cb) => { diff --git a/lib/internal/streams/writable.js b/lib/internal/streams/writable.js index 595aadc23c8bec..7b1896baeb47c2 100644 --- a/lib/internal/streams/writable.js +++ b/lib/internal/streams/writable.js @@ -72,7 +72,11 @@ ObjectSetPrototypeOf(Writable, Stream); function nop() {} -const kOnFinished = Symbol('kOnFinished'); +const kOnFinishedValue = Symbol('kOnFinishedValue'); +const kErroredValue = Symbol('kErroredValue'); +const kDefaultEncodingValue = Symbol('kDefaultEncodingValue'); +const kWriteCbValue = Symbol('kWriteCbValue'); +const kAfterWriteTickInfoValue = Symbol('kAfterWriteTickInfoValue'); const kObjectMode = 1 << 0; const kEnded = 1 << 1; @@ -94,6 +98,16 @@ const kBufferProcessing = 1 << 16; const kPrefinished = 1 << 17; const kAllBuffers = 1 << 18; const kAllNoop = 1 << 19; +const kOnFinished = 1 << 20; +const kErrored = 1 << 21; +const kHasWritable = 1 << 22; +const kWritable = 1 << 23; +const kCorked = 1 << 24; +const kDefaultUTF8Encoding = 1 << 25; +const kWriteCb = 1 << 26; +const kExpectWriteCb = 1 << 27; +const kAfterWriteTickInfo = 1 << 28; +const kAfterWritePending = 1 << 29; // TODO(benjamingr) it is likely slower to do it this way than with free functions function makeBitMapDescriptor(bit) { @@ -176,6 +190,85 @@ ObjectDefineProperties(WritableState.prototype, { allBuffers: makeBitMapDescriptor(kAllBuffers), allNoop: makeBitMapDescriptor(kAllNoop), + + // Indicates whether the stream has errored. When true all write() calls + // should return false. This is needed since when autoDestroy + // is disabled we need a way to tell whether the stream has failed. + // This is/should be a cold path. + errored: { + __proto__: null, + enumerable: false, + get() { return (this.state & kErrored) !== 0 ? this[kErroredValue] : null; }, + set(value) { + if (value) { + this[kErroredValue] = value; + this.state |= kErrored; + } else { + this.state &= ~kErrored; + } + }, + }, + + writable: { + __proto__: null, + enumerable: false, + get() { return (this.state & kHasWritable) !== 0 ? (this.state & kWritable) !== 0 : undefined; }, + set(value) { + if (value == null) { + this.state &= ~(kHasWritable | kWritable); + } else if (value) { + this.state |= (kHasWritable | kWritable); + } else { + this.state |= kHasWritable; + this.state &= ~kWritable; + } + }, + }, + + defaultEncoding: { + __proto__: null, + enumerable: false, + get() { return (this.state & kDefaultUTF8Encoding) !== 0 ? 'utf8' : this[kDefaultEncodingValue]; }, + set(value) { + if (value === 'utf8' || value === 'utf-8') { + this.state |= kDefaultUTF8Encoding; + } else { + this.state &= ~kDefaultUTF8Encoding; + this[kDefaultEncodingValue] = value; + } + }, + }, + + // The callback that the user supplies to write(chunk, encoding, cb). + writecb: { + __proto__: null, + enumerable: false, + get() { return (this.state & kWriteCb) !== 0 ? this[kWriteCbValue] : nop; }, + set(value) { + if (value) { + this[kWriteCbValue] = value; + this.state |= kWriteCb; + } else { + this.state &= ~kWriteCb; + } + }, + }, + + // Storage for data passed to the afterWrite() callback in case of + // synchronous _write() completion. + afterWriteTickInfo: { + __proto__: null, + enumerable: false, + get() { return (this.state & kAfterWriteTickInfo) !== 0 ? this[kAfterWriteTickInfoValue] : null; }, + set(value) { + if (value) { + this[kAfterWriteTickInfoValue] = value; + this.state |= kAfterWriteTickInfo; + } else { + this.state &= ~kAfterWriteTickInfo; + } + }, + }, }); function WritableState(options, stream, isDuplex) { @@ -213,10 +306,11 @@ function WritableState(options, stream, isDuplex) { // encoding is 'binary' so we have to make this configurable. // Everything else in the universe uses 'utf8', though. const defaultEncoding = options?.defaultEncoding; - if (defaultEncoding == null) { - this.defaultEncoding = 'utf8'; + if (defaultEncoding == null || defaultEncoding === 'utf8' || defaultEncoding === 'utf-8') { + this.state |= kDefaultUTF8Encoding; } else if (Buffer.isEncoding(defaultEncoding)) { - this.defaultEncoding = defaultEncoding; + this.state &= ~kDefaultUTF8Encoding; + this[kDefaultEncodingValue] = defaultEncoding; } else { throw new ERR_UNKNOWN_ENCODING(defaultEncoding); } @@ -232,28 +326,14 @@ function WritableState(options, stream, isDuplex) { // The callback that's passed to _write(chunk, cb). this.onwrite = onwrite.bind(undefined, stream); - // The callback that the user supplies to write(chunk, encoding, cb). - this.writecb = null; - // The amount that is being written when _write is called. this.writelen = 0; - // Storage for data passed to the afterWrite() callback in case of - // synchronous _write() completion. - this.afterWriteTickInfo = null; - resetBuffer(this); // Number of pending user-supplied write callbacks // this must be 0 before 'finish' can be emitted. this.pendingcb = 0; - - // Indicates whether the stream has errored. When true all write() calls - // should return false. This is needed since when autoDestroy - // is disabled we need a way to tell whether the stream has failed. - this.errored = null; - - this[kOnFinished] = []; } function resetBuffer(state) { @@ -344,10 +424,10 @@ function _write(stream, chunk, encoding, cb) { if (typeof encoding === 'function') { cb = encoding; - encoding = state.defaultEncoding; + encoding = (state.state & kDefaultUTF8Encoding) !== 0 ? 'utf8' : state.defaultEncoding; } else { if (!encoding) - encoding = state.defaultEncoding; + encoding = (state.state & kDefaultUTF8Encoding) !== 0 ? 'utf8' : state.defaultEncoding; else if (encoding !== 'buffer' && !Buffer.isEncoding(encoding)) throw new ERR_UNKNOWN_ENCODING(encoding); if (typeof cb !== 'function') @@ -394,7 +474,10 @@ Writable.prototype.write = function(chunk, encoding, cb) { }; Writable.prototype.cork = function() { - this._writableState.corked++; + const state = this._writableState; + + state.state |= kCorked; + state.corked++; }; Writable.prototype.uncork = function() { @@ -403,6 +486,10 @@ Writable.prototype.uncork = function() { if (state.corked) { state.corked--; + if (!state.corked) { + state.state &= ~kCorked; + } + if ((state.state & kWriting) === 0) clearBuffer(this, state); } @@ -428,11 +515,13 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) { // stream._write resets state.length const ret = state.length < state.highWaterMark; + // We must ensure that previous needDrain will not be reset to false. - if (!ret) + if (!ret) { state.state |= kNeedDrain; + } - if ((state.state & kWriting) !== 0 || state.corked || state.errored || (state.state & kConstructed) === 0) { + if ((state.state & (kWriting | kErrored | kCorked | kConstructed)) !== kConstructed) { state.buffered.push({ chunk, encoding, callback }); if ((state.state & kAllBuffers) !== 0 && encoding !== 'buffer') { state.state &= ~kAllBuffers; @@ -442,21 +531,25 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) { } } else { state.writelen = len; - state.writecb = callback; - state.state |= kWriting | kSync; + if (callback !== nop) { + state.writecb = callback; + } + state.state |= kWriting | kSync | kExpectWriteCb; stream._write(chunk, encoding, state.onwrite); state.state &= ~kSync; } // Return false if errored or destroyed in order to break // any synchronous while(stream.write(data)) loops. - return ret && !state.errored && (state.state & kDestroyed) === 0; + return ret && (state.state & (kDestroyed | kErrored)) === 0; } function doWrite(stream, state, writev, len, chunk, encoding, cb) { state.writelen = len; - state.writecb = cb; - state.state |= kWriting | kSync; + if (cb !== nop) { + state.writecb = cb; + } + state.state |= kWriting | kSync | kExpectWriteCb; if ((state.state & kDestroyed) !== 0) state.onwrite(new ERR_STREAM_DESTROYED('write')); else if (writev) @@ -481,16 +574,16 @@ function onwriteError(stream, state, er, cb) { function onwrite(stream, er) { const state = stream._writableState; - const sync = (state.state & kSync) !== 0; - const cb = state.writecb; - if (typeof cb !== 'function') { + if ((state.state & kExpectWriteCb) === 0) { errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK()); return; } - state.state &= ~kWriting; - state.writecb = null; + const sync = (state.state & kSync) !== 0; + const cb = (state.state & kWriteCb) !== 0 ? state[kWriteCbValue] : nop; + + state.state &= ~(kWriting | kExpectWriteCb | kWriteCb); state.length -= state.writelen; state.writelen = 0; @@ -523,12 +616,20 @@ function onwrite(stream, er) { // the same. In that case, we do not schedule a new nextTick(), but // rather just increase a counter, to improve performance and avoid // memory allocations. - if (state.afterWriteTickInfo !== null && - state.afterWriteTickInfo.cb === cb) { + if (cb === nop) { + if ((state.state & kAfterWritePending) === 0) { + process.nextTick(afterWrite, stream, state, 1, cb); + state.state |= kAfterWritePending; + } else { + state.pendingcb -= 1; + } + } else if (state.afterWriteTickInfo !== null && + state.afterWriteTickInfo.cb === cb) { state.afterWriteTickInfo.count++; } else { state.afterWriteTickInfo = { count: 1, cb, stream, state }; process.nextTick(afterWriteTick, state.afterWriteTickInfo); + state.state |= kAfterWritePending; } } else { afterWrite(stream, state, 1, cb); @@ -542,7 +643,9 @@ function afterWriteTick({ stream, state, count, cb }) { } function afterWrite(stream, state, count, cb) { - const needDrain = (state.state & (kEnding | kNeedDrain)) === kNeedDrain && !stream.destroyed && state.length === 0; + state.state &= ~kAfterWritePending; + + const needDrain = (state.state & (kEnding | kNeedDrain | kDestroyed)) === kNeedDrain && state.length === 0; if (needDrain) { state.state &= ~kNeedDrain; stream.emit('drain'); @@ -573,19 +676,16 @@ function errorBuffer(state) { callback(state.errored ?? new ERR_STREAM_DESTROYED('write')); } - const onfinishCallbacks = state[kOnFinished].splice(0); - for (let i = 0; i < onfinishCallbacks.length; i++) { - onfinishCallbacks[i](state.errored ?? new ERR_STREAM_DESTROYED('end')); - } + + callFinishedCallbacks(state, state.errored ?? new ERR_STREAM_DESTROYED('end')); resetBuffer(state); } // If there's something in the buffer waiting, then process it. function clearBuffer(stream, state) { - if (state.corked || - (state.state & (kDestroyed | kBufferProcessing)) !== 0 || - (state.state & kConstructed) === 0) { + if ((state.state & (kDestroyed | kBufferProcessing | kCorked)) !== 0 || + (state.state & kConstructed) === 0) { return; } @@ -661,7 +761,7 @@ Writable.prototype.end = function(chunk, encoding, cb) { let err; - if (chunk !== null && chunk !== undefined) { + if (chunk != null) { const ret = _write(this, chunk, encoding); if (ret instanceof Error) { err = ret; @@ -669,14 +769,14 @@ Writable.prototype.end = function(chunk, encoding, cb) { } // .end() fully uncorks. - if (state.corked) { + if ((state.state & kCorked) !== 0) { state.corked = 1; this.uncork(); } if (err) { // Do nothing... - } else if (!state.errored && (state.state & kEnding) === 0) { + } else if ((state.state & (kEnding | kErrored)) === 0) { // This is forgiving in terms of unnecessary calls to end() and can hide // logic errors. However, usually such errors are harmless and causing a // hard error can be disproportionately destructive. It is not always @@ -698,7 +798,9 @@ Writable.prototype.end = function(chunk, encoding, cb) { } else if ((state.state & kFinished) !== 0) { process.nextTick(cb, null); } else { - state[kOnFinished].push(cb); + state.state |= kOnFinished; + state[kOnFinishedValue] ??= []; + state[kOnFinishedValue].push(cb); } } @@ -715,10 +817,10 @@ function needFinish(state) { kFinished | kWriting | kErrorEmitted | - kCloseEmitted + kCloseEmitted | + kErrored )) === (kEnding | kConstructed) && state.length === 0 && - !state.errored && state.buffered.length === 0); } @@ -734,10 +836,7 @@ function callFinal(stream, state) { state.pendingcb--; if (err) { - const onfinishCallbacks = state[kOnFinished].splice(0); - for (let i = 0; i < onfinishCallbacks.length; i++) { - onfinishCallbacks[i](err); - } + callFinishedCallbacks(state, err); errorOrDestroy(stream, err, (state.state & kSync) !== 0); } else if (needFinish(state)) { state.state |= kPrefinished; @@ -799,10 +898,7 @@ function finish(stream, state) { state.pendingcb--; state.state |= kFinished; - const onfinishCallbacks = state[kOnFinished].splice(0); - for (let i = 0; i < onfinishCallbacks.length; i++) { - onfinishCallbacks[i](null); - } + callFinishedCallbacks(state, null); stream.emit('finish'); @@ -822,8 +918,20 @@ function finish(stream, state) { } } -ObjectDefineProperties(Writable.prototype, { +function callFinishedCallbacks(state, err) { + if ((state.state & kOnFinished) === 0) { + return; + } + + const onfinishCallbacks = state[kOnFinishedValue]; + state[kOnFinishedValue] = null; + state.state &= ~kOnFinished; + for (let i = 0; i < onfinishCallbacks.length; i++) { + onfinishCallbacks[i](err); + } +} +ObjectDefineProperties(Writable.prototype, { closed: { __proto__: null, get() { @@ -867,60 +975,64 @@ ObjectDefineProperties(Writable.prototype, { writableFinished: { __proto__: null, get() { - return this._writableState ? (this._writableState.state & kFinished) !== 0 : false; + const state = this._writableState; + return state ? (state.state & kFinished) !== 0 : false; }, }, writableObjectMode: { __proto__: null, get() { - return this._writableState ? (this._writableState.state & kObjectMode) !== 0 : false; + const state = this._writableState; + return state ? (state.state & kObjectMode) !== 0 : false; }, }, writableBuffer: { __proto__: null, get() { - return this._writableState && this._writableState.getBuffer(); + const state = this._writableState; + return state && state.getBuffer(); }, }, writableEnded: { __proto__: null, get() { - return this._writableState ? (this._writableState.state & kEnding) !== 0 : false; + const state = this._writableState; + return state ? (state.state & kEnding) !== 0 : false; }, }, writableNeedDrain: { __proto__: null, get() { - const wState = this._writableState; - if (!wState) return false; - - // !destroyed && !ending && needDrain - return (wState.state & (kDestroyed | kEnding | kNeedDrain)) === kNeedDrain; + const state = this._writableState; + return state ? (state.state & (kDestroyed | kEnding | kNeedDrain)) === kNeedDrain : false; }, }, writableHighWaterMark: { __proto__: null, get() { - return this._writableState && this._writableState.highWaterMark; + const state = this._writableState; + return state && state.highWaterMark; }, }, writableCorked: { __proto__: null, get() { - return this._writableState ? this._writableState.corked : 0; + const state = this._writableState; + return state ? state.corked : 0; }, }, writableLength: { __proto__: null, get() { - return this._writableState && this._writableState.length; + const state = this._writableState; + return state && state.length; }, }, @@ -928,18 +1040,19 @@ ObjectDefineProperties(Writable.prototype, { __proto__: null, enumerable: false, get() { - return this._writableState ? this._writableState.errored : null; + const state = this._writableState; + return state ? state.errored : null; }, }, writableAborted: { __proto__: null, - enumerable: false, get: function() { - return !!( - this._writableState.writable !== false && - ((this._writableState.state & kDestroyed) !== 0 || this._writableState.errored) && - (this._writableState.state & kFinished) === 0 + const state = this._writableState; + return ( + (state.state & (kHasWritable | kWritable)) !== kHasWritable && + (state.state & (kDestroyed | kErrored)) !== 0 && + (state.state & kFinished) === 0 ); }, }, @@ -952,7 +1065,7 @@ Writable.prototype.destroy = function(err, cb) { // Invoke pending callbacks. if ((state.state & kDestroyed) === 0 && (state.bufferedIndex < state.buffered.length || - state[kOnFinished].length)) { + (state.state & kOnFinished) !== 0)) { process.nextTick(errorBuffer, state); }