Skip to content

Commit

Permalink
stream: refactor Writable buffering
Browse files Browse the repository at this point in the history
Refactors buffering in Writable to use an array
instead of a linked list.

PR-URL: #31046
Reviewed-By: Ruben Bridgewater <[email protected]>
Reviewed-By: Denys Otrishko <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
  • Loading branch information
ronag authored and BethGriggs committed Apr 28, 2020
1 parent 180b935 commit 7abc61f
Showing 1 changed file with 88 additions and 132 deletions.
220 changes: 88 additions & 132 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
'use strict';

const {
Array,
FunctionPrototype,
ObjectDefineProperty,
ObjectDefineProperties,
Expand Down Expand Up @@ -150,8 +149,7 @@ function WritableState(options, stream, isDuplex) {
// synchronous _write() completion.
this.afterWriteTickInfo = null;

this.bufferedRequest = null;
this.lastBufferedRequest = null;
resetBuffer(this);

// Number of pending user-supplied write callbacks
// this must be 0 before 'finish' can be emitted
Expand All @@ -177,27 +175,25 @@ function WritableState(options, stream, isDuplex) {

// Indicates whether the stream has finished destroying.
this.closed = false;
}

// Count buffered requests
this.bufferedRequestCount = 0;

// Allocate the first CorkedRequest, there is always
// one allocated and free to use, and we maintain at most two
const corkReq = { next: null, entry: null, finish: undefined };
corkReq.finish = onCorkedFinish.bind(undefined, corkReq, this);
this.corkedRequestsFree = corkReq;
function resetBuffer(state) {
state.buffered = [];
state.bufferedIndex = 0;
state.allBuffers = true;
state.allNoop = true;
}

WritableState.prototype.getBuffer = function getBuffer() {
let current = this.bufferedRequest;
const out = [];
while (current) {
out.push(current);
current = current.next;
}
return out;
return this.buffered.slice(this.bufferedIndex);
};

ObjectDefineProperty(WritableState.prototype, 'bufferedRequestCount', {
get() {
return this.buffered.length - this.bufferedIndex;
}
});

// Test _writableState for inheritance to account for Duplex streams,
// whose prototype chain only points to Readable.
let realHasInstance;
Expand Down Expand Up @@ -318,10 +314,7 @@ Writable.prototype.uncork = function() {
if (state.corked) {
state.corked--;

if (!state.writing &&
!state.corked &&
!state.bufferProcessing &&
state.bufferedRequest)
if (!state.writing)
clearBuffer(this, state);
}
};
Expand All @@ -339,7 +332,7 @@ Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
// If we're already writing something, then just put this
// in the queue, and wait our turn. Otherwise, call _write
// If we return false, then we need a drain event, so set that flag.
function writeOrBuffer(stream, state, chunk, encoding, cb) {
function writeOrBuffer(stream, state, chunk, encoding, callback) {
const len = state.objectMode ? 1 : chunk.length;

state.length += len;
Expand All @@ -350,22 +343,16 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) {
state.needDrain = true;

if (state.writing || state.corked || state.errored) {
const last = state.lastBufferedRequest;
state.lastBufferedRequest = {
chunk,
encoding,
callback: cb,
next: null
};
if (last) {
last.next = state.lastBufferedRequest;
} else {
state.bufferedRequest = state.lastBufferedRequest;
state.buffered.push({ chunk, encoding, callback });
if (state.allBuffers && encoding !== 'buffer') {
state.allBuffers = false;
}
if (state.allNoop && callback !== nop) {
state.allNoop = false;
}
state.bufferedRequestCount += 1;
} else {
state.writelen = len;
state.writecb = cb;
state.writecb = callback;
state.writing = true;
state.sync = true;
stream._write(chunk, encoding, state.onwrite);
Expand Down Expand Up @@ -427,30 +414,27 @@ function onwrite(stream, er) {
onwriteError(stream, state, er, cb);
}
} else {
// Check if we're actually ready to finish, but don't emit yet
const finished = needFinish(state) || stream.destroyed;

if (!finished &&
!state.corked &&
!state.bufferProcessing &&
state.bufferedRequest) {
if (!state.destroyed) {
clearBuffer(stream, state);
}

if (sync) {
// It is a common case that the callback passed to .write() is always
// the same. In that case, we do not schedule a new nextTick(), but rather
// just increase a counter, to improve performance and avoid memory
// allocations.
if (state.afterWriteTickInfo !== null &&
state.afterWriteTickInfo.cb === cb) {
state.afterWriteTickInfo.count++;
if (state.needDrain || cb !== nop || state.ending || state.destroyed) {
if (sync) {
// It is a common case that the callback passed to .write() is always
// the same. In that case, we do not schedule a new nextTick(), but
// rather just increase a counter, to improve performance and avoid
// memory allocations.
if (state.afterWriteTickInfo !== null &&
state.afterWriteTickInfo.cb === cb) {
state.afterWriteTickInfo.count++;
} else {
state.afterWriteTickInfo = { count: 1, cb, stream, state };
process.nextTick(afterWriteTick, state.afterWriteTickInfo);
}
} else {
state.afterWriteTickInfo = { count: 1, cb, stream, state };
process.nextTick(afterWriteTick, state.afterWriteTickInfo);
afterWrite(stream, state, 1, cb);
}
} else {
afterWrite(stream, state, 1, cb);
state.pendingcb--;
}
}
}
Expand Down Expand Up @@ -482,83 +466,69 @@ function afterWrite(stream, state, count, cb) {

// If there's something in the buffer waiting, then invoke callbacks.
function errorBuffer(state, err) {
if (state.writing || !state.bufferedRequest) {
if (state.writing) {
return;
}

for (let entry = state.bufferedRequest; entry; entry = entry.next) {
const len = state.objectMode ? 1 : entry.chunk.length;
for (let n = state.bufferedIndex; n < state.buffered.length; ++n) {
const { chunk, callback } = state.buffered[n];
const len = state.objectMode ? 1 : chunk.length;
state.length -= len;
entry.callback(err);
callback(err);
}
state.bufferedRequest = null;
state.lastBufferedRequest = null;
state.bufferedRequestCount = 0;

resetBuffer(state);
}

// If there's something in the buffer waiting, then process it
function clearBuffer(stream, state) {
if (state.corked || state.bufferProcessing) {
return;
}

const { buffered, bufferedIndex, objectMode } = state;
const bufferedLength = buffered.length - bufferedIndex;

if (!bufferedLength) {
return;
}

let i = bufferedIndex;

state.bufferProcessing = true;
let entry = state.bufferedRequest;

if (stream._writev && entry && entry.next) {
// Fast case, write everything using _writev()
const l = state.bufferedRequestCount;
const buffer = new Array(l);
const holder = state.corkedRequestsFree;
holder.entry = entry;

let count = 0;
let allBuffers = true;
while (entry) {
buffer[count] = entry;
if (entry.encoding !== 'buffer')
allBuffers = false;
entry = entry.next;
count += 1;
}
buffer.allBuffers = allBuffers;
if (bufferedLength > 1 && stream._writev) {
state.pendingcb -= bufferedLength - 1;

const callback = state.allNoop ? nop : (err) => {
for (let n = i; n < buffered.length; ++n) {
buffered[n].callback(err);
}
};
// Make a copy of `buffered` if it's going to be used by `callback` above,
// since `doWrite` will mutate the array.
const chunks = state.allNoop && i === 0 ? buffered : buffered.slice(i);
chunks.allBuffers = state.allBuffers;

doWrite(stream, state, true, state.length, buffer, '', holder.finish);
doWrite(stream, state, true, state.length, chunks, '', callback);

// doWrite is almost always async, defer these to save a bit of time
// as the hot path ends with doWrite
state.pendingcb++;
state.lastBufferedRequest = null;
if (holder.next) {
state.corkedRequestsFree = holder.next;
holder.next = null;
} else {
const corkReq = { next: null, entry: null, finish: undefined };
corkReq.finish = onCorkedFinish.bind(undefined, corkReq, state);
state.corkedRequestsFree = corkReq;
}
state.bufferedRequestCount = 0;
resetBuffer(state);
} else {
// Slow case, write chunks one-by-one
while (entry) {
const chunk = entry.chunk;
const encoding = entry.encoding;
const cb = entry.callback;
const len = state.objectMode ? 1 : chunk.length;

doWrite(stream, state, false, len, chunk, encoding, cb);
entry = entry.next;
state.bufferedRequestCount--;
// If we didn't call the onwrite immediately, then
// it means that we need to wait until it does.
// also, that means that the chunk and cb are currently
// being processed, so move the buffer counter past them.
if (state.writing) {
break;
}
do {
const { chunk, encoding, callback } = buffered[i];
buffered[i++] = null;
const len = objectMode ? 1 : chunk.length;
doWrite(stream, state, false, len, chunk, encoding, callback);
} while (i < buffered.length && !state.writing);

if (i === buffered.length) {
resetBuffer(state);
} else if (i > 256) {
buffered.splice(0, i);
state.bufferedIndex = 0;
} else {
state.bufferedIndex = i;
}

if (entry === null)
state.lastBufferedRequest = null;
}

state.bufferedRequest = entry;
state.bufferProcessing = false;
}

Expand Down Expand Up @@ -622,7 +592,7 @@ function needFinish(state) {
return (state.ending &&
state.length === 0 &&
!state.errored &&
state.bufferedRequest === null &&
state.buffered.length === 0 &&
!state.finished &&
!state.writing);
}
Expand Down Expand Up @@ -693,20 +663,6 @@ function finish(stream, state) {
}
}

function onCorkedFinish(corkReq, state, err) {
let entry = corkReq.entry;
corkReq.entry = null;
while (entry) {
const cb = entry.callback;
state.pendingcb--;
cb(err);
entry = entry.next;
}

// Reuse the free corkReq.
state.corkedRequestsFree.next = corkReq;
}

// TODO(ronag): Avoid using events to implement internal logic.
function onFinished(stream, state, cb) {
function onerror(err) {
Expand Down

0 comments on commit 7abc61f

Please sign in to comment.