Skip to content

Commit

Permalink
stream: cleanup async handling
Browse files Browse the repository at this point in the history
Cleanup async stream method handling.

PR-URL: #39329
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: James M Snell <[email protected]>
  • Loading branch information
ronag authored and targos committed Jul 13, 2021
1 parent 1fc6382 commit cb32f69
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 113 deletions.
130 changes: 36 additions & 94 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,16 @@ function destroy(err, cb) {

function _destroy(self, err, cb) {
let called = false;
const result = self._destroy(err || null, (err) => {
const r = self._readableState;
const w = self._writableState;

function onDestroy(err) {
if (called) {
return;
}
called = true;

const r = self._readableState;
const w = self._writableState;

checkError(err, w, r);

if (w) {
Expand All @@ -93,64 +97,24 @@ function _destroy(self, err, cb) {
} else {
process.nextTick(emitCloseNT, self);
}
});
if (result !== undefined && result !== null) {
try {
}
try {
const result = self._destroy(err || null, onDestroy);
if (result != null) {
const then = result.then;
if (typeof then === 'function') {
then.call(
result,
function() {
if (called)
return;

const r = self._readableState;
const w = self._writableState;

if (w) {
w.closed = true;
}
if (r) {
r.closed = true;
}

if (typeof cb === 'function') {
process.nextTick(cb);
}

process.nextTick(emitCloseNT, self);
process.nextTick(onDestroy, null);
},
function(err) {
const r = self._readableState;
const w = self._writableState;
err.stack; // eslint-disable-line no-unused-expressions

called = true;

if (w && !w.errored) {
w.errored = err;
}
if (r && !r.errored) {
r.errored = err;
}

if (w) {
w.closed = true;
}
if (r) {
r.closed = true;
}

if (typeof cb === 'function') {
process.nextTick(cb, err);
}

process.nextTick(emitErrorCloseNT, self, err);
process.nextTick(onDestroy, err);
});
}
} catch (err) {
process.nextTick(emitErrorNT, self, err);
}
} catch (err) {
onDestroy(err);
}
}

Expand Down Expand Up @@ -284,74 +248,52 @@ function construct(stream, cb) {
}

function constructNT(stream) {
const r = stream._readableState;
const w = stream._writableState;
// With duplex streams we use the writable side for state.
const s = w || r;

let called = false;
const result = stream._construct((err) => {

function onConstruct(err) {
if (called) {
errorOrDestroy(stream, err ?? new ERR_MULTIPLE_CALLBACK());
return;
}
called = true;

const r = stream._readableState;
const w = stream._writableState;
const s = w || r;

if (r) {
r.constructed = true;
}
if (w) {
w.constructed = true;
}

if (called) {
err = new ERR_MULTIPLE_CALLBACK();
} else {
called = true;
}

if (s.destroyed) {
stream.emit(kDestroy, err);
} else if (err) {
errorOrDestroy(stream, err, true);
} else {
process.nextTick(emitConstructNT, stream);
}
});
if (result !== undefined && result !== null) {
try {
}

try {
const result = stream._construct(onConstruct);
if (result != null) {
const then = result.then;
if (typeof then === 'function') {
then.call(
result,
function() {
// If the callback was invoked, do nothing further.
if (called)
return;
if (r) {
r.constructed = true;
}
if (w) {
w.constructed = true;
}
if (s.destroyed) {
process.nextTick(() => stream.emit(kDestroy));
} else {
process.nextTick(emitConstructNT, stream);
}
process.nextTick(onConstruct, null);
},
function(err) {
if (r) {
r.constructed = true;
}
if (w) {
w.constructed = true;
}
called = true;
if (s.destroyed) {
process.nextTick(() => stream.emit(kDestroy, err));
} else {
process.nextTick(errorOrDestroy, stream, err);
}
process.nextTick(onConstruct, err);
});
}
} catch (err) {
process.nextTick(emitErrorNT, stream, err);
}
} catch (err) {
onConstruct(err);
}
}

Expand Down
2 changes: 2 additions & 0 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -479,8 +479,10 @@ Readable.prototype.read = function(n) {
// If the length is currently zero, then we *need* a readable event.
if (state.length === 0)
state.needReadable = true;

// Call internal read method
this._read(state.highWaterMark);

state.sync = false;
// If _read pushed data synchronously, then `reading` will be false,
// and we need to re-evaluate how much data we can return to the user.
Expand Down
41 changes: 22 additions & 19 deletions lib/internal/streams/writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -660,9 +660,15 @@ function needFinish(state) {
}

function callFinal(stream, state) {
state.sync = true;
state.pendingcb++;
const result = stream._final((err) => {
let called = false;

function onFinish(err) {
if (called) {
errorOrDestroy(stream, err ?? ERR_MULTIPLE_CALLBACK());
return;
}
called = true;

state.pendingcb--;
if (err) {
const onfinishCallbacks = state[kOnFinished].splice(0);
Expand All @@ -679,33 +685,30 @@ function callFinal(stream, state) {
state.pendingcb++;
process.nextTick(finish, stream, state);
}
});
if (result !== undefined && result !== null) {
try {
}

state.sync = true;
state.pendingcb++;

try {
const result = stream._final(onFinish);
if (result != null) {
const then = result.then;
if (typeof then === 'function') {
then.call(
result,
function() {
if (state.prefinished || !needFinish(state))
return;
state.prefinish = true;
process.nextTick(() => stream.emit('prefinish'));
state.pendingcb++;
process.nextTick(finish, stream, state);
process.nextTick(onFinish, null);
},
function(err) {
const onfinishCallbacks = state[kOnFinished].splice(0);
for (let i = 0; i < onfinishCallbacks.length; i++) {
process.nextTick(onfinishCallbacks[i], err);
}
process.nextTick(errorOrDestroy, stream, err, state.sync);
process.nextTick(onFinish, err);
});
}
} catch (err) {
process.nextTick(errorOrDestroy, stream, err, state.sync);
}
} catch (err) {
onFinish(stream, state, err);
}

state.sync = false;
}

Expand Down
3 changes: 3 additions & 0 deletions test/parallel/test-stream-construct-async-error.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ const assert = require('assert');

const foo = new Foo();
foo.write('test', common.mustCall());
foo.on('error', common.expectsError({
code: 'ERR_MULTIPLE_CALLBACK'
}));
}

{
Expand Down

0 comments on commit cb32f69

Please sign in to comment.