Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add promise-based IPC #1059

Merged
merged 1 commit into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 51 additions & 14 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,37 @@ Split a `command` string into an array. For example, `'npm run build'` returns `

[More info.](escaping.md#user-defined-input)

### sendMessage(message)

`message`: [`Message`](ipc.md#message-type)\
_Returns_: `Promise<void>`

Send a `message` to the parent process.

This requires the [`ipc`](#optionsipc) option to be `true`. The [type](ipc.md#message-type) of `message` depends on the [`serialization`](#optionsserialization) option.

[More info.](ipc.md#exchanging-messages)

### getOneMessage()

_Returns_: [`Promise<Message>`](ipc.md#message-type)

Receive a single `message` from the parent process.

This requires the [`ipc`](#optionsipc) option to be `true`. The [type](ipc.md#message-type) of `message` depends on the [`serialization`](#optionsserialization) option.

[More info.](ipc.md#exchanging-messages)

### getEachMessage()

_Returns_: [`AsyncIterable<Message>`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_async_iterator_and_async_iterable_protocols)

Iterate over each `message` from the parent process.

This requires the [`ipc`](#optionsipc) option to be `true`. The [type](ipc.md#message-type) of `message` depends on the [`serialization`](#optionsserialization) option.

[More info.](ipc.md#listening-to-messages)

## Return value

_TypeScript:_ [`ResultPromise`](typescript.md)\
Expand Down Expand Up @@ -217,31 +248,37 @@ This is `undefined` if the subprocess failed to spawn.

[More info.](termination.md#inter-process-termination)

### subprocess.send(message)

`message`: `unknown`\
_Returns_: `boolean`
### subprocess.sendMessage(message)

Send a `message` to the subprocess. The type of `message` depends on the [`serialization`](#optionsserialization) option.
The subprocess receives it as a [`message` event](https://nodejs.org/api/process.html#event-message).
`message`: [`Message`](ipc.md#message-type)\
_Returns_: `Promise<void>`

This returns `true` on success.
Send a `message` to the subprocess.

This requires the [`ipc`](#optionsipc) option to be `true`.
This requires the [`ipc`](#optionsipc) option to be `true`. The [type](ipc.md#message-type) of `message` depends on the [`serialization`](#optionsserialization) option.

[More info.](ipc.md#exchanging-messages)

### subprocess.on('message', (message) => void)
### subprocess.getOneMessage()

`message`: `unknown`
_Returns_: [`Promise<Message>`](ipc.md#message-type)

Receives a `message` from the subprocess. The type of `message` depends on the [`serialization`](#optionsserialization) option.
The subprocess sends it using [`process.send(message)`](https://nodejs.org/api/process.html#processsendmessage-sendhandle-options-callback).
Receive a single `message` from the subprocess.

This requires the [`ipc`](#optionsipc) option to be `true`.
This requires the [`ipc`](#optionsipc) option to be `true`. The [type](ipc.md#message-type) of `message` depends on the [`serialization`](#optionsserialization) option.

[More info.](ipc.md#exchanging-messages)

### subprocess.getEachMessage()

_Returns_: [`AsyncIterable<Message>`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_async_iterator_and_async_iterable_protocols)

Iterate over each `message` from the subprocess.

This requires the [`ipc`](#optionsipc) option to be `true`. The [type](ipc.md#message-type) of `message` depends on the [`serialization`](#optionsserialization) option.

[More info.](ipc.md#listening-to-messages)

### subprocess.stdin

_Type:_ [`Writable | null`](https://nodejs.org/api/stream.html#class-streamwritable)
Expand Down Expand Up @@ -853,7 +890,7 @@ By default, this applies to both `stdout` and `stderr`, but [different values ca
_Type:_ `boolean`\
_Default:_ `true` if the [`node`](#optionsnode) option is enabled, `false` otherwise

Enables exchanging messages with the subprocess using [`subprocess.send(message)`](#subprocesssendmessage) and [`subprocess.on('message', (message) => {})`](#subprocessonmessage-message--void).
Enables exchanging messages with the subprocess using [`subprocess.sendMessage(message)`](#subprocesssendmessagemessage), [`subprocess.getOneMessage()`](#subprocessgetonemessage) and [`subprocess.getEachMessage()`](#subprocessgeteachmessage).

[More info.](ipc.md)

Expand Down
6 changes: 3 additions & 3 deletions docs/bash.md
Original file line number Diff line number Diff line change
Expand Up @@ -792,11 +792,11 @@ await $({detached: true})`npm run build`;

```js
// Execa
const subprocess = $({ipc: true})`node script.js`;
const subprocess = $({node: true})`script.js`;

subprocess.on('message', message => {
for await (const message of subprocess.getEachMessage()) {
if (message === 'ping') {
subprocess.send('pong');
await subprocess.sendMessage('pong');
}
});
```
Expand Down
52 changes: 41 additions & 11 deletions docs/ipc.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,59 @@ When the [`ipc`](api.md#optionsipc) option is `true`, the current process and su

The `ipc` option defaults to `true` when using [`execaNode()`](node.md#run-nodejs-files) or the [`node`](node.md#run-nodejs-files) option.

The current process sends messages with [`subprocess.send(message)`](api.md#subprocesssendmessage) and receives them with [`subprocess.on('message', (message) => {})`](api.md#subprocessonmessage-message--void). The subprocess sends messages with [`process.send(message)`](https://nodejs.org/api/process.html#processsendmessage-sendhandle-options-callback) and [`process.on('message', (message) => {})`](https://nodejs.org/api/process.html#event-message).
The current process sends messages with [`subprocess.sendMessage(message)`](api.md#subprocesssendmessagemessage) and receives them with [`subprocess.getOneMessage()`](api.md#subprocessgetonemessage). The subprocess uses [`sendMessage(message)`](api.md#sendmessagemessage) and [`getOneMessage()`](api.md#getonemessage) instead.

More info on [sending](https://nodejs.org/api/child_process.html#subprocesssendmessage-sendhandle-options-callback) and [receiving](https://nodejs.org/api/child_process.html#event-message) messages.
```js
// parent.js
import {execaNode} from 'execa';

const subprocess = execaNode`child.js`;
console.log(await subprocess.getOneMessage()); // 'Hello from child'
await subprocess.sendMessage('Hello from parent');
```

```js
// child.js
import {sendMessage, getOneMessage} from 'execa';

await sendMessage('Hello from child');
console.log(await getOneMessage()); // 'Hello from parent'
```

## Listening to messages

[`subprocess.getOneMessage()`](api.md#subprocessgetonemessage) and [`getOneMessage()`](api.md#getonemessage) read a single message. To listen to multiple messages in a row, [`subprocess.getEachMessage()`](api.md#subprocessgeteachmessage) and [`getEachMessage()`](api.md#geteachmessage) should be used instead.

[`subprocess.getEachMessage()`](api.md#subprocessgeteachmessage) waits for the subprocess to end (even when using [`break`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/break) or [`return`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/return)). It throws if the subprocess [fails](api.md#result). This means you do not need to `await` the subprocess' [promise](execution.md#result).

```js
// parent.js
import {execaNode} from 'execa';

const subprocess = execaNode`child.js`;
subprocess.on('message', messageFromChild => {
/* ... */
});
subprocess.send('Hello from parent');
await subprocess.sendMessage(0);

// This loop ends when the subprocess exits.
// It throws if the subprocess fails.
for await (const message of subprocess.getEachMessage()) {
console.log(message); // 1, 3, 5, 7, 9
await subprocess.sendMessage(message + 1);
}
```

```js
// child.js
import process from 'node:process';
import {sendMessage, getEachMessage} from 'execa';

// The subprocess exits when hitting `break`
for await (const message of getEachMessage()) {
if (message === 10) {
break
}

process.on('message', messageFromParent => {
/* ... */
});
process.send('Hello from child');
console.log(message); // 0, 2, 4, 6, 8
await sendMessage(message + 1);
}
```

## Message type
Expand Down
9 changes: 8 additions & 1 deletion docs/typescript.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

## Available types

The following types can be imported: [`ResultPromise`](api.md#return-value), [`Subprocess`](api.md#subprocess), [`Result`](api.md#result), [`ExecaError`](api.md#execaerror), [`Options`](api.md#options), [`StdinOption`](api.md#optionsstdin), [`StdoutStderrOption`](api.md#optionsstdout) and [`TemplateExpression`](api.md#execacommand).
The following types can be imported: [`ResultPromise`](api.md#return-value), [`Subprocess`](api.md#subprocess), [`Result`](api.md#result), [`ExecaError`](api.md#execaerror), [`Options`](api.md#options), [`StdinOption`](api.md#optionsstdin), [`StdoutStderrOption`](api.md#optionsstdout), [`TemplateExpression`](api.md#execacommand) and [`Message`](api.md#subprocesssendmessagemessage).

```ts
import {
Expand All @@ -20,18 +20,22 @@ import {
type StdinOption,
type StdoutStderrOption,
type TemplateExpression,
type Message,
} from 'execa';

const options: Options = {
stdin: 'inherit' satisfies StdinOption,
stdout: 'pipe' satisfies StdoutStderrOption,
stderr: 'pipe' satisfies StdoutStderrOption,
timeout: 1000,
ipc: true,
};
const task: TemplateExpression = 'build';
const message: Message = 'hello world';

try {
const subprocess: ResultPromise = execa(options)`npm run ${task}`;
await subprocess.sendMessage(message);
const result: Result = await subprocess;
console.log(result.stdout);
} catch (error) {
Expand Down Expand Up @@ -94,11 +98,14 @@ const options = {
stdout: 'pipe',
stderr: 'pipe',
timeout: 1000,
ipc: true,
} as const;
const task = 'build';
const message = 'hello world';

try {
const subprocess = execa(options)`npm run ${task}`;
await subprocess.sendMessage(message);
const result = await subprocess;
printResultStdout(result);
} catch (error) {
Expand Down
11 changes: 10 additions & 1 deletion index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,21 @@ export type {
StdoutStderrSyncOption,
} from './types/stdio/type.js';
export type {Options, SyncOptions} from './types/arguments/options.js';
export type {TemplateExpression} from './types/methods/template.js';

export type {Result, SyncResult} from './types/return/result.js';
export type {ResultPromise, Subprocess} from './types/subprocess/subprocess.js';
export {ExecaError, ExecaSyncError} from './types/return/final-error.js';
export type {TemplateExpression} from './types/methods/template.js';

export {execa} from './types/methods/main-async.js';
export {execaSync} from './types/methods/main-sync.js';
export {execaCommand, execaCommandSync, parseCommandString} from './types/methods/command.js';
export {$} from './types/methods/script.js';
export {execaNode} from './types/methods/node.js';

export {
sendMessage,
getOneMessage,
getEachMessage,
type Message,
} from './types/ipc.js';
4 changes: 4 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {createExeca} from './lib/methods/create.js';
import {mapCommandAsync, mapCommandSync} from './lib/methods/command.js';
import {mapNode} from './lib/methods/node.js';
import {mapScriptAsync, setScriptSync, deepScriptOptions} from './lib/methods/script.js';
import {getIpcExport} from './lib/ipc/methods.js';

export {parseCommandString} from './lib/methods/command.js';
export {ExecaError, ExecaSyncError} from './lib/return/final-error.js';
Expand All @@ -12,3 +13,6 @@ export const execaCommand = createExeca(mapCommandAsync);
export const execaCommandSync = createExeca(mapCommandSync);
export const execaNode = createExeca(mapNode);
export const $ = createExeca(mapScriptAsync, {}, deepScriptOptions, setScriptSync);

const {sendMessage, getOneMessage, getEachMessage} = getIpcExport();
export {sendMessage, getOneMessage, getEachMessage};
4 changes: 4 additions & 0 deletions lib/ipc/array.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// The `ipc` option adds an `ipc` item to the `stdio` option
export const normalizeIpcStdioArray = (stdioArray, ipc) => ipc && !stdioArray.includes('ipc')
? [...stdioArray, 'ipc']
: stdioArray;
43 changes: 43 additions & 0 deletions lib/ipc/get-each.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import {once, on} from 'node:events';
import {validateIpcOption, validateConnection} from './validation.js';

// Like `[sub]process.on('message')` but promise-based
export const getEachMessage = function ({anyProcess, isSubprocess, ipc}) {
const methodName = 'getEachMessage';
validateIpcOption(methodName, isSubprocess, ipc);
validateConnection(methodName, isSubprocess, anyProcess.channel !== null);

const controller = new AbortController();
stopOnExit(anyProcess, isSubprocess, controller);

return iterateOnMessages(anyProcess, isSubprocess, controller);
};

const stopOnExit = async (anyProcess, isSubprocess, controller) => {
try {
const onDisconnect = once(anyProcess, 'disconnect', {signal: controller.signal});
await (isSubprocess
? onDisconnect
: Promise.race([onDisconnect, anyProcess]));
} catch {} finally {
controller.abort();
}
};

const iterateOnMessages = async function * (anyProcess, isSubprocess, controller) {
try {
for await (const [message] of on(anyProcess, 'message', {signal: controller.signal})) {
yield message;
}
} catch (error) {
if (!controller.signal.aborted) {
throw error;
}
} finally {
if (!isSubprocess) {
await anyProcess;
}

controller.abort();
}
};
16 changes: 16 additions & 0 deletions lib/ipc/get-one.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import {once} from 'node:events';
import {validateIpcOption, validateConnection} from './validation.js';

// Like `[sub]process.once('message')` but promise-based
export const getOneMessage = ({anyProcess, isSubprocess, ipc}) => {
const methodName = 'getOneMessage';
validateIpcOption(methodName, isSubprocess, ipc);
validateConnection(methodName, isSubprocess, anyProcess.channel !== null);

return onceMessage(anyProcess);
};

const onceMessage = async anyProcess => {
const [message] = await once(anyProcess, 'message');
return message;
};
30 changes: 30 additions & 0 deletions lib/ipc/methods.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import process from 'node:process';
import {promisify} from 'node:util';
import {sendMessage} from './send.js';
import {getOneMessage} from './get-one.js';
import {getEachMessage} from './get-each.js';

// Add promise-based IPC methods in current process
export const addIpcMethods = (subprocess, {ipc}) => {
Object.assign(subprocess, getIpcMethods(subprocess, false, ipc));
};

// Get promise-based IPC in the subprocess
export const getIpcExport = () => getIpcMethods(process, true, process.channel !== undefined);

// Retrieve the `ipc` shared by both the current process and the subprocess
const getIpcMethods = (anyProcess, isSubprocess, ipc) => {
const anyProcessSend = anyProcess.send === undefined
? undefined
: promisify(anyProcess.send.bind(anyProcess));
return {
sendMessage: sendMessage.bind(undefined, {
anyProcess,
anyProcessSend,
isSubprocess,
ipc,
}),
getOneMessage: getOneMessage.bind(undefined, {anyProcess, isSubprocess, ipc}),
getEachMessage: getEachMessage.bind(undefined, {anyProcess, isSubprocess, ipc}),
};
};
26 changes: 26 additions & 0 deletions lib/ipc/send.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import {
validateIpcOption,
validateConnection,
handleSerializationError,
} from './validation.js';

// Like `[sub]process.send()` but promise-based.
// We do not `await subprocess` during `.sendMessage()` nor `.getOneMessage()` since those methods are transient.
// Users would still need to `await subprocess` after the method is done.
// Also, this would prevent `unhandledRejection` event from being emitted, making it silent.
export const sendMessage = ({anyProcess, anyProcessSend, isSubprocess, ipc}, message) => {
const methodName = 'sendMessage';
validateIpcOption(methodName, isSubprocess, ipc);
validateConnection(methodName, isSubprocess, anyProcess.connected);

return sendOneMessage(anyProcessSend, isSubprocess, message);
};

const sendOneMessage = async (anyProcessSend, isSubprocess, message) => {
try {
await anyProcessSend(message);
} catch (error) {
handleSerializationError(error, isSubprocess, message);
throw error;
}
};
Loading
Loading