Skip to content

Commit

Permalink
stream: need to cleanup event listeners if last stream is readable
Browse files Browse the repository at this point in the history
fix: #35452

PR-URL: #41954
Fixes: #35452
Reviewed-By: Robert Nagy <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
  • Loading branch information
meixg authored Feb 15, 2022
1 parent cc505a5 commit 9fb7ac3
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 12 deletions.
4 changes: 3 additions & 1 deletion doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -2575,7 +2575,9 @@ run().catch(console.error);

`stream.pipeline()` leaves dangling event listeners on the streams
after the `callback` has been invoked. In the case of reuse of streams after
failure, this can cause event listener leaks and swallowed errors.
failure, this can cause event listener leaks and swallowed errors. If the last
stream is readable, dangling event listeners will be removed so that the last
stream can be consumed later.

`stream.pipeline()` closes all the streams when an error is raised.
The `IncomingRequest` usage with `pipeline` could lead to an unexpected behavior
Expand Down
52 changes: 41 additions & 11 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const {

const {
isIterable,
isReadable,
isReadableNodeStream,
isNodeStream,
} = require('internal/streams/utils');
Expand All @@ -45,14 +46,17 @@ function destroyer(stream, reading, writing) {
finished = true;
});

eos(stream, { readable: reading, writable: writing }, (err) => {
const cleanup = eos(stream, { readable: reading, writable: writing }, (err) => {
finished = !err;
});

return (err) => {
if (finished) return;
finished = true;
destroyImpl.destroyer(stream, err || new ERR_STREAM_DESTROYED('pipe'));
return {
destroy: (err) => {
if (finished) return;
finished = true;
destroyImpl.destroyer(stream, err || new ERR_STREAM_DESTROYED('pipe'));
},
cleanup
};
}

Expand Down Expand Up @@ -159,6 +163,10 @@ function pipelineImpl(streams, callback, opts) {
const signal = ac.signal;
const outerSignal = opts?.signal;

// Need to cleanup event listeners if last stream is readable
// https://github.com/nodejs/node/issues/35452
const lastStreamCleanup = [];

validateAbortSignal(outerSignal, 'options.signal');

function abort() {
Expand Down Expand Up @@ -194,6 +202,9 @@ function pipelineImpl(streams, callback, opts) {
ac.abort();

if (final) {
if (!error) {
lastStreamCleanup.forEach((fn) => fn());
}
process.nextTick(callback, error, value);
}
}
Expand All @@ -204,22 +215,34 @@ function pipelineImpl(streams, callback, opts) {
const reading = i < streams.length - 1;
const writing = i > 0;
const end = reading || opts?.end !== false;
const isLastStream = i === streams.length - 1;

if (isNodeStream(stream)) {
if (end) {
destroys.push(destroyer(stream, reading, writing));
const { destroy, cleanup } = destroyer(stream, reading, writing);
destroys.push(destroy);

if (isReadable(stream) && isLastStream) {
lastStreamCleanup.push(cleanup);
}
}

// Catch stream errors that occur after pipe/pump has completed.
stream.on('error', (err) => {
function onError(err) {
if (
err &&
err.name !== 'AbortError' &&
err.code !== 'ERR_STREAM_PREMATURE_CLOSE'
) {
finish(err);
}
});
}
stream.on('error', onError);
if (isReadable(stream) && isLastStream) {
lastStreamCleanup.push(() => {
stream.removeListener('error', onError);
});
}
}

if (i === 0) {
Expand Down Expand Up @@ -285,12 +308,19 @@ function pipelineImpl(streams, callback, opts) {

ret = pt;

destroys.push(destroyer(ret, false, true));
const { destroy, cleanup } = destroyer(ret, false, true);
destroys.push(destroy);
if (isLastStream) {
lastStreamCleanup.push(cleanup);
}
}
} else if (isNodeStream(stream)) {
if (isReadableNodeStream(ret)) {
finishCount += 2;
pipe(ret, stream, finish, { end });
const cleanup = pipe(ret, stream, finish, { end });
if (isReadable(stream) && isLastStream) {
lastStreamCleanup.push(cleanup);
}
} else if (isIterable(ret)) {
finishCount++;
pump(ret, stream, finish, { end });
Expand Down Expand Up @@ -345,7 +375,7 @@ function pipe(src, dst, finish, { end }) {
finish(err);
}
});
eos(dst, { readable: false, writable: true }, finish);
return eos(dst, { readable: false, writable: true }, finish);
}

module.exports = { pipelineImpl, pipeline };
76 changes: 76 additions & 0 deletions test/parallel/test-stream-pipeline-listeners.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
'use strict';

const common = require('../common');
const { pipeline, Duplex, PassThrough, Writable } = require('stream');
const assert = require('assert');

process.on('uncaughtException', common.mustCall((err) => {
assert.strictEqual(err.message, 'no way');
}, 2));

// Ensure that listeners is removed if last stream is readble
// And other stream's listeners unchanged
const a = new PassThrough();
a.end('foobar');
const b = new Duplex({
write(chunk, encoding, callback) {
callback();
}
});
pipeline(a, b, common.mustCall((error) => {
if (error) {
assert.ifError(error);
}

assert(a.listenerCount('error') > 0);
assert.strictEqual(b.listenerCount('error'), 0);
setTimeout(() => {
assert.strictEqual(b.listenerCount('error'), 0);
b.destroy(new Error('no way'));
}, 100);
}));

// Async generators
const c = new PassThrough();
c.end('foobar');
const d = pipeline(
c,
async function* (source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase();
}
},
common.mustCall((error) => {
if (error) {
assert.ifError(error);
}

assert(c.listenerCount('error') > 0);
assert.strictEqual(d.listenerCount('error'), 0);
setTimeout(() => {
assert.strictEqual(b.listenerCount('error'), 0);
d.destroy(new Error('no way'));
}, 100);
})
);

// If last stream is not readable, will not throw and remove listeners
const e = new PassThrough();
e.end('foobar');
const f = new Writable({
write(chunk, encoding, callback) {
callback();
}
});
pipeline(e, f, common.mustCall((error) => {
if (error) {
assert.ifError(error);
}

assert(e.listenerCount('error') > 0);
assert(f.listenerCount('error') > 0);
setTimeout(() => {
assert(f.listenerCount('error') > 0);
f.destroy(new Error('no way'));
}, 100);
}));

0 comments on commit 9fb7ac3

Please sign in to comment.