Skip to content

Commit

Permalink
stream: unify stream utils
Browse files Browse the repository at this point in the history
Unify stream helps into utils.

PR-URL: #39294
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Benjamin Gruenbaum <[email protected]>
Reviewed-By: James M Snell <[email protected]>
  • Loading branch information
ronag committed Jul 11, 2021
1 parent c4f8363 commit bb275ef
Show file tree
Hide file tree
Showing 10 changed files with 235 additions and 111 deletions.
2 changes: 0 additions & 2 deletions lib/_http_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ const {
prepareError,
} = require('_http_common');
const { OutgoingMessage } = require('_http_outgoing');
const { kDestroy } = require('internal/streams/destroy');
const Agent = require('_http_agent');
const { Buffer } = require('buffer');
const { defaultTriggerAsyncIdScope } = require('internal/async_hooks');
Expand Down Expand Up @@ -610,7 +609,6 @@ function parserOnIncomingClient(res, shouldKeepAlive) {
DTRACE_HTTP_CLIENT_RESPONSE(socket, req);
req.res = res;
res.req = req;
res[kDestroy] = null;

// Add our listener first, so that we guarantee socket cleanup
res.on('end', responseOnEnd);
Expand Down
6 changes: 0 additions & 6 deletions lib/_http_incoming.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ const {
} = primordials;

const { Readable, finished } = require('stream');
const { kDestroy } = require('internal/streams/destroy');

const kHeaders = Symbol('kHeaders');
const kHeadersCount = Symbol('kHeadersCount');
Expand Down Expand Up @@ -199,11 +198,6 @@ IncomingMessage.prototype._destroy = function _destroy(err, cb) {
}
};

IncomingMessage.prototype[kDestroy] = function(err) {
this.socket = null;
this.destroy(err);
};

IncomingMessage.prototype._addHeaderLines = _addHeaderLines;
function _addHeaderLines(headers, n) {
if (headers && headers.length) {
Expand Down
4 changes: 2 additions & 2 deletions lib/internal/streams/add-abort-signal.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ const validateAbortSignal = (signal, name) => {
}
};

function isStream(obj) {
function isNodeStream(obj) {
return !!(obj && typeof obj.pipe === 'function');
}

module.exports.addAbortSignal = function addAbortSignal(signal, stream) {
validateAbortSignal(signal, 'signal');
if (!isStream(stream)) {
if (!isNodeStream(stream)) {
throw new ERR_INVALID_ARG_TYPE('stream', 'stream.Stream', stream);
}
return module.exports.addAbortSignalNoValidate(signal, stream);
Expand Down
29 changes: 10 additions & 19 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ const {
const {
Symbol,
} = primordials;
const {
kDestroyed,
isDestroyed,
isFinished,
isServerRequest
} = require('internal/streams/utils');

const kDestroy = Symbol('kDestroy');
const kConstruct = Symbol('kConstruct');
Expand Down Expand Up @@ -364,8 +370,6 @@ function isRequest(stream) {
return stream && stream.setHeader && typeof stream.abort === 'function';
}

const kDestroyed = Symbol('kDestroyed');

function emitCloseLegacy(stream) {
stream.emit('close');
}
Expand All @@ -375,31 +379,20 @@ function emitErrorCloseLegacy(stream, err) {
process.nextTick(emitCloseLegacy, stream);
}

function isDestroyed(stream) {
return stream.destroyed || stream[kDestroyed];
}

function isReadable(stream) {
return stream.readable && !stream.readableEnded && !isDestroyed(stream);
}

function isWritable(stream) {
return stream.writable && !stream.writableEnded && !isDestroyed(stream);
}

// Normalize destroy for legacy.
function destroyer(stream, err) {
if (isDestroyed(stream)) {
return;
}

if (!err && (isReadable(stream) || isWritable(stream))) {
if (!err && !isFinished(stream)) {
err = new AbortError();
}

// TODO: Remove isRequest branches.
if (typeof stream[kDestroy] === 'function') {
stream[kDestroy](err);
if (isServerRequest(stream)) {
stream.socket = null;
stream.destroy(err);
} else if (isRequest(stream)) {
stream.abort();
} else if (isRequest(stream.req)) {
Expand All @@ -421,8 +414,6 @@ function destroyer(stream, err) {
}

module.exports = {
kDestroy,
isDestroyed,
construct,
destroyer,
destroy,
Expand Down
90 changes: 29 additions & 61 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,48 +17,23 @@ const {
validateObject,
} = require('internal/validators');

const {
isClosed,
isReadable,
isReadableNodeStream,
isReadableFinished,
isWritable,
isWritableNodeStream,
isWritableFinished,
willEmitClose: _willEmitClose,
} = require('internal/streams/utils');

function isRequest(stream) {
return stream.setHeader && typeof stream.abort === 'function';
}

function isServerResponse(stream) {
return (
typeof stream._sent100 === 'boolean' &&
typeof stream._removedConnection === 'boolean' &&
typeof stream._removedContLen === 'boolean' &&
typeof stream._removedTE === 'boolean' &&
typeof stream._closed === 'boolean'
);
}

function isReadable(stream) {
return typeof stream.readable === 'boolean' ||
typeof stream.readableEnded === 'boolean' ||
!!stream._readableState;
}

function isWritable(stream) {
return typeof stream.writable === 'boolean' ||
typeof stream.writableEnded === 'boolean' ||
!!stream._writableState;
}

function isWritableFinished(stream) {
if (stream.writableFinished) return true;
const wState = stream._writableState;
if (!wState || wState.errored) return false;
return wState.finished || (wState.ended && wState.length === 0);
}

const nop = () => {};

function isReadableEnded(stream) {
if (stream.readableEnded) return true;
const rState = stream._readableState;
if (!rState || rState.errored) return false;
return rState.endEmitted || (rState.ended && rState.length === 0);
}

function eos(stream, options, callback) {
if (arguments.length === 2) {
callback = options;
Expand All @@ -74,13 +49,12 @@ function eos(stream, options, callback) {
callback = once(callback);

const readable = options.readable ||
(options.readable !== false && isReadable(stream));
(options.readable !== false && isReadableNodeStream(stream));
const writable = options.writable ||
(options.writable !== false && isWritable(stream));
(options.writable !== false && isWritableNodeStream(stream));

const wState = stream._writableState;
const rState = stream._readableState;
const state = wState || rState;

const onlegacyfinish = () => {
if (!stream.writable) onfinish();
Expand All @@ -89,16 +63,13 @@ function eos(stream, options, callback) {
// TODO (ronag): Improve soft detection to include core modules and
// common ecosystem modules that do properly emit 'close' but fail
// this generic check.
let willEmitClose = isServerResponse(stream) || (
state &&
state.autoDestroy &&
state.emitClose &&
state.closed === false &&
isReadable(stream) === readable &&
isWritable(stream) === writable
let willEmitClose = (
_willEmitClose(stream) &&
isReadableNodeStream(stream) === readable &&
isWritableNodeStream(stream) === writable
);

let writableFinished = stream.writableFinished || wState?.finished;
let writableFinished = isWritableFinished(stream, false);
const onfinish = () => {
writableFinished = true;
// Stream should not be destroyed here. If it is that
Expand All @@ -107,12 +78,12 @@ function eos(stream, options, callback) {
if (stream.destroyed) willEmitClose = false;

if (willEmitClose && (!stream.readable || readable)) return;
if (!readable || readableEnded) callback.call(stream);
if (!readable || readableFinished) callback.call(stream);
};

let readableEnded = stream.readableEnded || rState?.endEmitted;
let readableFinished = isReadableFinished(stream, false);
const onend = () => {
readableEnded = true;
readableFinished = true;
// Stream should not be destroyed here. If it is that
// means that user space is doing something differently and
// we cannot trust willEmitClose.
Expand All @@ -126,7 +97,7 @@ function eos(stream, options, callback) {
callback.call(stream, err);
};

let closed = wState?.closed || rState?.closed;
let closed = isClosed(stream);

const onclose = () => {
closed = true;
Expand All @@ -137,13 +108,13 @@ function eos(stream, options, callback) {
return callback.call(stream, errored);
}

if (readable && !readableEnded) {
if (!isReadableEnded(stream))
if (readable && !readableFinished) {
if (!isReadableFinished(stream, false))
return callback.call(stream,
new ERR_STREAM_PREMATURE_CLOSE());
}
if (writable && !writableFinished) {
if (!isWritableFinished(stream))
if (!isWritableFinished(stream, false))
return callback.call(stream,
new ERR_STREAM_PREMATURE_CLOSE());
}
Expand Down Expand Up @@ -185,19 +156,16 @@ function eos(stream, options, callback) {
}
} else if (
!readable &&
(!willEmitClose || stream.readable) &&
writableFinished
(!willEmitClose || isReadable(stream)) &&
(writableFinished || !isWritable(stream))
) {
process.nextTick(onclose);
} else if (
!writable &&
(!willEmitClose || stream.writable) &&
readableEnded
(!willEmitClose || isWritable(stream)) &&
(readableFinished || !isReadable(stream))
) {
process.nextTick(onclose);
} else if (!wState && !rState && stream._closed === true) {
// _closed is for OutgoingMessage which is not a proper Writable.
process.nextTick(onclose);
} else if ((rState && stream.req && stream.aborted)) {
process.nextTick(onclose);
}
Expand Down
14 changes: 7 additions & 7 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ const { validateCallback } = require('internal/validators');

const {
isIterable,
isReadable,
isStream,
isReadableNodeStream,
isNodeStream,
} = require('internal/streams/utils');

let PassThrough;
Expand Down Expand Up @@ -87,7 +87,7 @@ function popCallback(streams) {
function makeAsyncIterable(val) {
if (isIterable(val)) {
return val;
} else if (isReadable(val)) {
} else if (isReadableNodeStream(val)) {
// Legacy streams are not Iterable.
return fromReadable(val);
}
Expand Down Expand Up @@ -204,7 +204,7 @@ function pipeline(...streams) {
const reading = i < streams.length - 1;
const writing = i > 0;

if (isStream(stream)) {
if (isNodeStream(stream)) {
finishCount++;
destroys.push(destroyer(stream, reading, writing, finish));
}
Expand All @@ -216,7 +216,7 @@ function pipeline(...streams) {
throw new ERR_INVALID_RETURN_VALUE(
'Iterable, AsyncIterable or Stream', 'source', ret);
}
} else if (isIterable(stream) || isReadable(stream)) {
} else if (isIterable(stream) || isReadableNodeStream(stream)) {
ret = stream;
} else {
throw new ERR_INVALID_ARG_TYPE(
Expand Down Expand Up @@ -271,8 +271,8 @@ function pipeline(...streams) {
finishCount++;
destroys.push(destroyer(ret, false, true, finish));
}
} else if (isStream(stream)) {
if (isReadable(ret)) {
} else if (isNodeStream(stream)) {
if (isReadableNodeStream(ret)) {
ret.pipe(stream);

// Compat. Before node v10.12.0 stdio used to throw an error so
Expand Down
Loading

0 comments on commit bb275ef

Please sign in to comment.