Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: fix cloned webstreams not being unref correctly #51526

Merged
merged 1 commit into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,11 @@ class ReadableStream {
const transfer = lazyTransfer();
setupReadableStreamDefaultControllerFromSource(
this,
new transfer.CrossRealmTransformReadableSource(port),
// The MessagePort is set to be referenced when reading.
// After two MessagePorts are closed, there is a problem with
// lingering promise not being properly resolved.
// https://github.com/nodejs/node/issues/51486
new transfer.CrossRealmTransformReadableSource(port, true),
0, () => 1);
}
}
Expand Down
36 changes: 26 additions & 10 deletions lib/internal/webstreams/transfer.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,11 @@ function InternalCloneableDOMException() {
InternalCloneableDOMException[kDeserialize] = () => {};

class CrossRealmTransformReadableSource {
constructor(port) {
constructor(port, unref) {
this[kState] = {
port,
controller: undefined,
unref,
};

port.onmessage = ({ data }) => {
Expand Down Expand Up @@ -143,13 +144,19 @@ class CrossRealmTransformReadableSource {
error);
port.close();
};

port.unref();
}

start(controller) {
this[kState].controller = controller;
}

async pull() {
if (this[kState].unref) {
this[kState].unref = false;
this[kState].port.ref();
}
this[kState].port.postMessage({ type: 'pull' });
}

Expand All @@ -170,11 +177,12 @@ class CrossRealmTransformReadableSource {
}

class CrossRealmTransformWritableSink {
constructor(port) {
constructor(port, unref) {
this[kState] = {
port,
controller: undefined,
backpressurePromise: createDeferredPromise(),
unref,
};

port.onmessage = ({ data }) => {
Expand Down Expand Up @@ -211,13 +219,18 @@ class CrossRealmTransformWritableSink {
port.close();
};

port.unref();
}

start(controller) {
this[kState].controller = controller;
}

async write(chunk) {
if (this[kState].unref) {
this[kState].unref = false;
this[kState].port.ref();
}
if (this[kState].backpressurePromise === undefined) {
this[kState].backpressurePromise = {
promise: PromiseResolve(),
Expand Down Expand Up @@ -262,12 +275,12 @@ class CrossRealmTransformWritableSink {
}

function newCrossRealmReadableStream(writable, port) {
const readable =
new ReadableStream(
new CrossRealmTransformReadableSource(port));
// MessagePort should always be unref.
// There is a problem with the process not terminating.
// https://github.com/nodejs/node/issues/44985
const readable = new ReadableStream(new CrossRealmTransformReadableSource(port, false));

const promise =
readableStreamPipeTo(readable, writable, false, false, false);
const promise = readableStreamPipeTo(readable, writable, false, false, false);

setPromiseHandled(promise);

Expand All @@ -278,12 +291,15 @@ function newCrossRealmReadableStream(writable, port) {
}

function newCrossRealmWritableSink(readable, port) {
const writable =
new WritableStream(
new CrossRealmTransformWritableSink(port));
// MessagePort should always be unref.
// There is a problem with the process not terminating.
// https://github.com/nodejs/node/issues/44985
const writable = new WritableStream(new CrossRealmTransformWritableSink(port, false));

const promise = readableStreamPipeTo(readable, writable, false, false, false);

setPromiseHandled(promise);

return {
writable,
promise,
Expand Down
8 changes: 5 additions & 3 deletions lib/internal/webstreams/writablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,6 @@ class WritableStream {
this[kState].transfer.readable = readable;
this[kState].transfer.promise = promise;

setPromiseHandled(this[kState].transfer.promise);
tsctx marked this conversation as resolved.
Show resolved Hide resolved

return {
data: { port: this[kState].transfer.port2 },
deserializeInfo:
Expand All @@ -283,7 +281,11 @@ class WritableStream {
const transfer = lazyTransfer();
setupWritableStreamDefaultControllerFromSink(
this,
new transfer.CrossRealmTransformWritableSink(port),
// The MessagePort is set to be referenced when reading.
// After two MessagePorts are closed, there is a problem with
// lingering promise not being properly resolved.
// https://github.com/nodejs/node/issues/51486
new transfer.CrossRealmTransformWritableSink(port, true),
1,
() => 1);
}
Expand Down
16 changes: 16 additions & 0 deletions test/parallel/test-webstreams-clone-unref.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
'use strict';

require('../common');
const { ok } = require('node:assert');

// This test verifies that cloned ReadableStream and WritableStream instances
// do not keep the process alive. The test fails if it timesout (it should just
// exit immediately)

const rs1 = new ReadableStream();
const ws1 = new WritableStream();

const [rs2, ws2] = structuredClone([rs1, ws1], { transfer: [rs1, ws1] });

ok(rs2 instanceof ReadableStream);
ok(ws2 instanceof WritableStream);
11 changes: 11 additions & 0 deletions test/parallel/test-whatwg-webstreams-transfer.js
Original file line number Diff line number Diff line change
Expand Up @@ -464,12 +464,23 @@ const theData = 'hello';
tracker.verify();
});

// We create an interval to keep the event loop alive while
// we wait for the stream read to complete. The reason this is needed is because there's
// otherwise nothing to keep the worker thread event loop alive long enough to actually
// complete the read from the stream. Under the covers the ReadableStream uses an
// unref'd MessagePort to communicate with the main thread. Because the MessagePort
// is unref'd, it's existence would not keep the thread alive on its own. There was previously
// a bug where this MessagePort was ref'd which would block the thread and main thread
// from terminating at all unless the stream was consumed/closed.
const i = setInterval(() => {}, 1000);

parentPort.onmessage = tracker.calls(({ data }) => {
assert(isReadableStream(data));
const reader = data.getReader();
reader.read().then(tracker.calls((result) => {
assert(!result.done);
assert(result.value instanceof Uint8Array);
clearInterval(i);
}));
parentPort.close();
});
Expand Down