Skip to content

Commit

Permalink
stream: lazy allocate back pressure buffer
Browse files Browse the repository at this point in the history
PR-URL: nodejs#50013
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Benjamin Gruenbaum <[email protected]>
Reviewed-By: Luigi Pinca <[email protected]>
  • Loading branch information
ronag authored and alexfernandez committed Nov 1, 2023
1 parent 6d366a9 commit 98ddb05
Showing 1 changed file with 41 additions and 23 deletions.
64 changes: 41 additions & 23 deletions lib/internal/streams/writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ const kErroredValue = Symbol('kErroredValue');
const kDefaultEncodingValue = Symbol('kDefaultEncodingValue');
const kWriteCbValue = Symbol('kWriteCbValue');
const kAfterWriteTickInfoValue = Symbol('kAfterWriteTickInfoValue');
const kBufferedValue = Symbol('kBufferedValue');

const kObjectMode = 1 << 0;
const kEnded = 1 << 1;
Expand Down Expand Up @@ -108,7 +109,7 @@ const kWriteCb = 1 << 26;
const kExpectWriteCb = 1 << 27;
const kAfterWriteTickInfo = 1 << 28;
const kAfterWritePending = 1 << 29;
const kHasBuffer = 1 << 30;
const kBuffered = 1 << 30;

// TODO(benjamingr) it is likely slower to do it this way than with free functions
function makeBitMapDescriptor(bit) {
Expand Down Expand Up @@ -270,6 +271,21 @@ ObjectDefineProperties(WritableState.prototype, {
}
},
},

buffered: {
__proto__: null,
enumerable: false,
get() { return (this.state & kBuffered) !== 0 ? this[kBufferedValue] : []; },
set(value) {
this[kBufferedValue] = value;
if (value) {
this.state |= kBuffered;
} else {
this.state &= ~kBuffered;
}
},
},

});

function WritableState(options, stream, isDuplex) {
Expand Down Expand Up @@ -338,20 +354,20 @@ function WritableState(options, stream, isDuplex) {
}

function resetBuffer(state) {
state.buffered = [];
state[kBufferedValue] = null;
state.bufferedIndex = 0;
state.state |= kAllBuffers | kAllNoop;
state.state &= ~kHasBuffer;
state.state &= ~kBuffered;
}

WritableState.prototype.getBuffer = function getBuffer() {
return ArrayPrototypeSlice(this.buffered, this.bufferedIndex);
return (this.state & kBuffered) === 0 ? [] : ArrayPrototypeSlice(this.buffered, this.bufferedIndex);
};

ObjectDefineProperty(WritableState.prototype, 'bufferedRequestCount', {
__proto__: null,
get() {
return this.buffered.length - this.bufferedIndex;
return (this.state & kBuffered) === 0 ? 0 : this[kBufferedValue].length - this.bufferedIndex;
},
});

Expand Down Expand Up @@ -518,8 +534,12 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
state.length += len;

if ((state.state & (kWriting | kErrored | kCorked | kConstructed)) !== kConstructed) {
state.buffered.push({ chunk, encoding, callback });
state.state |= kHasBuffer;
if ((state.state & kBuffered) === 0) {
state.state |= kBuffered;
state[kBufferedValue] = [];
}

state[kBufferedValue].push({ chunk, encoding, callback });
if ((state.state & kAllBuffers) !== 0 && encoding !== 'buffer') {
state.state &= ~kAllBuffers;
}
Expand Down Expand Up @@ -611,7 +631,7 @@ function onwrite(stream, er) {
onwriteError(stream, state, er, cb);
}
} else {
if ((state.state & kHasBuffer) !== 0) {
if ((state.state & kBuffered) !== 0) {
clearBuffer(stream, state);
}

Expand Down Expand Up @@ -687,11 +707,13 @@ function errorBuffer(state) {
return;
}

for (let n = state.bufferedIndex; n < state.buffered.length; ++n) {
const { chunk, callback } = state.buffered[n];
const len = (state.state & kObjectMode) !== 0 ? 1 : chunk.length;
state.length -= len;
callback(state.errored ?? new ERR_STREAM_DESTROYED('write'));
if ((state.state & kBuffered) !== 0) {
for (let n = state.bufferedIndex; n < state.buffered.length; ++n) {
const { chunk, callback } = state[kBufferedValue][n];
const len = (state.state & kObjectMode) !== 0 ? 1 : chunk.length;
state.length -= len;
callback(state.errored ?? new ERR_STREAM_DESTROYED('write'));
}
}


Expand All @@ -702,13 +724,12 @@ function errorBuffer(state) {

// If there's something in the buffer waiting, then process it.
function clearBuffer(stream, state) {
if ((state.state & (kDestroyed | kBufferProcessing | kCorked | kHasBuffer)) !== kHasBuffer ||
(state.state & kConstructed) === 0) {
if ((state.state & (kDestroyed | kBufferProcessing | kCorked | kBuffered)) !== kBuffered) {
return;
}

const objectMode = (state.state & kObjectMode) !== 0;
const { buffered, bufferedIndex } = state;
const { [kBufferedValue]: buffered, bufferedIndex } = state;
const bufferedLength = buffered.length - bufferedIndex;

if (!bufferedLength) {
Expand Down Expand Up @@ -838,10 +859,9 @@ function needFinish(state) {
kWriting |
kErrorEmitted |
kCloseEmitted |
kErrored
)) === (kEnding | kConstructed) &&
state.length === 0 &&
state.buffered.length === 0);
kErrored |
kBuffered
)) === (kEnding | kConstructed) && state.length === 0);
}

function callFinal(stream, state) {
Expand Down Expand Up @@ -1083,9 +1103,7 @@ Writable.prototype.destroy = function(err, cb) {
const state = this._writableState;

// Invoke pending callbacks.
if ((state.state & kDestroyed) === 0 &&
(state.bufferedIndex < state.buffered.length ||
(state.state & kOnFinished) !== 0)) {
if ((state.state & (kBuffered | kOnFinished | kDestroyed)) !== kDestroyed) {
process.nextTick(errorBuffer, state);
}

Expand Down

0 comments on commit 98ddb05

Please sign in to comment.