Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

child_process: create proper public API for channel #30165

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions doc/api/child_process.md
Original file line number Diff line number Diff line change
Expand Up @@ -1018,13 +1018,33 @@ See [Advanced Serialization][] for more details.
### `subprocess.channel`
<!-- YAML
added: v7.1.0
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/30165
description: The object no longer accidentally exposes native C++ bindings.
-->

* {Object} A pipe representing the IPC channel to the child process.

The `subprocess.channel` property is a reference to the child's IPC channel. If
no IPC channel currently exists, this property is `undefined`.

#### `subprocess.channel.ref()`
<!-- YAML
added: v7.1.0
-->

This method makes the IPC channel keep the event loop of the parent process
running if `.unref()` has been called before.

#### `subprocess.channel.unref()`
<!-- YAML
added: v7.1.0
-->

This method makes the IPC channel not keep the event loop of the parent process
running, and lets it finish even while the channel is open.

### `subprocess.connected`
<!-- YAML
added: v0.7.2
Expand Down
28 changes: 28 additions & 0 deletions doc/api/process.md
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,10 @@ $ bash -c 'exec -a customArgv0 ./node'
## `process.channel`
<!-- YAML
added: v7.1.0
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/30165
description: The object no longer accidentally exposes native C++ bindings.
-->

* {Object}
Expand All @@ -635,6 +639,30 @@ If the Node.js process was spawned with an IPC channel (see the
property is a reference to the IPC channel. If no IPC channel exists, this
property is `undefined`.

### `process.channel.ref()`
<!-- YAML
added: v7.1.0
-->

This method makes the IPC channel keep the event loop of the process
running if `.unref()` has been called before.

Typically, this is managed through the number of `'disconnect'` and `'message'`
listeners on the `process` object. However, this method can be used to
explicitly request a specific behavior.

### `process.channel.unref()`
<!-- YAML
added: v7.1.0
-->

This method makes the IPC channel not keep the event loop of the process
running, and lets it finish even while the channel is open.

Typically, this is managed through the number of `'disconnect'` and `'message'`
listeners on the `process` object. However, this method can be used to
explicitly request a specific behavior.

## `process.chdir(directory)`
<!-- YAML
added: v0.1.17
Expand Down
4 changes: 2 additions & 2 deletions lib/child_process.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,10 @@ function _forkChild(fd, serializationMode) {
p.unref();
const control = setupChannel(process, p, serializationMode);
process.on('newListener', function onNewListener(name) {
if (name === 'message' || name === 'disconnect') control.ref();
if (name === 'message' || name === 'disconnect') control.refCounted();
});
process.on('removeListener', function onRemoveListener(name) {
if (name === 'message' || name === 'disconnect') control.unref();
if (name === 'message' || name === 'disconnect') control.unrefCounted();
});
}

Expand Down
3 changes: 2 additions & 1 deletion lib/internal/bootstrap/switches/is_main_thread.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ function createWritableStdioStream(fd) {
// an error when trying to use it again. In that case, create the socket
// using the existing handle instead of the fd.
if (process.channel && process.channel.fd === fd) {
const { kChannelHandle } = require('internal/child_process');
stream = new net.Socket({
handle: process.channel,
handle: process[kChannelHandle],
readable: false,
writable: true
});
Expand Down
56 changes: 41 additions & 15 deletions lib/internal/child_process.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ let freeParser;
let HTTPParser;

const MAX_HANDLE_RETRANSMISSIONS = 3;
const kChannelHandle = Symbol('kChannelHandle');
const kIsUsedAsStdio = Symbol('kIsUsedAsStdio');

// This object contain function to convert TCP objects to native handle objects
Expand Down Expand Up @@ -108,7 +109,7 @@ const handleConversion = {
// The worker should keep track of the socket
message.key = socket.server._connectionKey;

const firstTime = !this.channel.sockets.send[message.key];
const firstTime = !this[kChannelHandle].sockets.send[message.key];
const socketList = getSocketList('send', this, message.key);

// The server should no longer expose a .connection property
Expand Down Expand Up @@ -508,30 +509,55 @@ ChildProcess.prototype.unref = function() {
};

class Control extends EventEmitter {
#channel = null;
#refs = 0;
#refExplicitlySet = false;

constructor(channel) {
super();
this.channel = channel;
this.refs = 0;
this.#channel = channel;
}
ref() {
if (++this.refs === 1) {
this.channel.ref();

// The methods keeping track of the counter are being used to track the
// listener count on the child process object as well as when writes are
// in progress. Once the user has explicitly requested a certain state, these
// methods become no-ops in order to not interfere with the user's intentions.
refCounted() {
if (++this.#refs === 1 && !this.#refExplicitlySet) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is refExplicitlySet necessary? Are these *Counted methods? It's the user's responsibility to pair ref and unref calls, balancing internal ref/unref calls is our responsibility. I'm not sure I see why they're needed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bnoordhuis Yeah, I’m not a fan of this either, but without this tests are failing and I wouldn’t know a better solution … the issue isn’t so much that the calls might end up unbalanced, it’s that currently, a process.channel.unref() call for example from userland is expected to unref the channel even if there are event listeners that would otherwise keep it ref()’ed.

this.#channel.ref();
}
}
unref() {
if (--this.refs === 0) {
this.channel.unref();

unrefCounted() {
if (--this.#refs === 0 && !this.#refExplicitlySet) {
this.#channel.unref();
this.emit('unref');
}
}

ref() {
this.#refExplicitlySet = true;
this.#channel.ref();
}

unref() {
this.#refExplicitlySet = true;
this.#channel.unref();
}

get fd() {
return this.#channel ? this.#channel.fd : undefined;
}
}

const channelDeprecationMsg = '_channel is deprecated. ' +
'Use ChildProcess.channel instead.';

let serialization;
function setupChannel(target, channel, serializationMode) {
target.channel = channel;
const control = new Control(channel);
target.channel = control;
target[kChannelHandle] = channel;

ObjectDefineProperty(target, '_channel', {
get: deprecate(() => {
Expand All @@ -547,8 +573,6 @@ function setupChannel(target, channel, serializationMode) {
target._handleQueue = null;
target._pendingMessage = null;

const control = new Control(channel);

if (serialization === undefined)
serialization = require('internal/child_process/serialization');
const {
Expand Down Expand Up @@ -796,11 +820,11 @@ function setupChannel(target, channel, serializationMode) {

if (wasAsyncWrite) {
req.oncomplete = () => {
control.unref();
control.unrefCounted();
if (typeof callback === 'function')
callback(null);
};
control.ref();
control.refCounted();
} else if (typeof callback === 'function') {
process.nextTick(callback, null);
}
Expand Down Expand Up @@ -855,6 +879,7 @@ function setupChannel(target, channel, serializationMode) {

// This marks the fact that the channel is actually disconnected.
this.channel = null;
this[kChannelHandle] = null;

if (this._pendingMessage)
closePendingHandle(this);
Expand Down Expand Up @@ -1011,7 +1036,7 @@ function getValidStdio(stdio, sync) {


function getSocketList(type, worker, key) {
const sockets = worker.channel.sockets[type];
const sockets = worker[kChannelHandle].sockets[type];
let socketList = sockets[key];
if (!socketList) {
const Construct = type === 'send' ? SocketListSend : SocketListReceive;
Expand Down Expand Up @@ -1054,6 +1079,7 @@ function spawnSync(options) {

module.exports = {
ChildProcess,
kChannelHandle,
setupChannel,
getValidStdio,
stdioStringToArray,
Expand Down
9 changes: 6 additions & 3 deletions test/parallel/test-child-process-recv-handle.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ else
function master() {
// spawn() can only create one IPC channel so we use stdin/stdout as an
// ad-hoc command channel.
const proc = spawn(process.execPath, [__filename, 'worker'], {
const proc = spawn(process.execPath, [
'--expose-internals', __filename, 'worker'
], {
stdio: ['pipe', 'pipe', 'pipe', 'ipc']
});
let handle = null;
Expand All @@ -57,12 +59,13 @@ function master() {
}

function worker() {
process.channel.readStop(); // Make messages batch up.
const { kChannelHandle } = require('internal/child_process');
process[kChannelHandle].readStop(); // Make messages batch up.
process.stdout.ref();
process.stdout.write('ok\r\n');
process.stdin.once('data', common.mustCall((data) => {
assert.strictEqual(data.toString(), 'ok\r\n');
process.channel.readStart();
process[kChannelHandle].readStart();
}));
let n = 0;
process.on('message', common.mustCall((msg, handle) => {
Expand Down
3 changes: 1 addition & 2 deletions test/parallel/test-child-process-silent.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ if (process.argv[2] === 'pipe') {
const child = childProcess.fork(process.argv[1], ['pipe'], { silent: true });

// Allow child process to self terminate
child.channel.close();
child.channel = null;
child.disconnect();

child.on('exit', function() {
process.exit(0);
Expand Down