Skip to content

Commit

Permalink
doc: esm examples /w imports for process, Buffer
Browse files Browse the repository at this point in the history
PR-URL: #39043
Reviewed-By: Bradley Farias <[email protected]>
  • Loading branch information
guybedford authored and targos committed Jul 11, 2021
1 parent 412b101 commit 81cebec
Show file tree
Hide file tree
Showing 13 changed files with 3,659 additions and 405 deletions.
8 changes: 8 additions & 0 deletions .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,14 @@ module.exports = {
name: 'require',
message: 'Use import instead',
},
{
name: 'Buffer',
message: 'Import Buffer instead of using the global'
},
{
name: 'process',
message: 'Import process instead of using the global'
},
] },
},
],
Expand Down
1 change: 1 addition & 0 deletions doc/api/assert.md
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ for the verification to take place. The usual pattern would be to call it in a

```mjs
import assert from 'assert';
import process from 'process';

const tracker = new assert.CallTracker();

Expand Down
205 changes: 198 additions & 7 deletions doc/api/async_context.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ in other languages.
The `AsyncLocalStorage` and `AsyncResource` classes are part of the
`async_hooks` module:

```js
```mjs
import async_hooks from 'async_hooks';
```

```cjs
const async_hooks = require('async_hooks');
```

Expand All @@ -40,7 +44,39 @@ The following example uses `AsyncLocalStorage` to build a simple logger
that assigns IDs to incoming HTTP requests and includes them in messages
logged within each request.

```js
```mjs
import http from 'http';
import { AsyncLocalStorage } from 'async_hooks';

const asyncLocalStorage = new AsyncLocalStorage();

function logWithId(msg) {
const id = asyncLocalStorage.getStore();
console.log(`${id !== undefined ? id : '-'}:`, msg);
}

let idSeq = 0;
http.createServer((req, res) => {
asyncLocalStorage.run(idSeq++, () => {
logWithId('start');
// Imagine any chain of async operations here
setImmediate(() => {
logWithId('finish');
res.end();
});
});
}).listen(8080);

http.get('http://localhost:8080');
http.get('http://localhost:8080');
// Prints:
// 0: start
// 1: start
// 0: finish
// 1: finish
```

```cjs
const http = require('http');
const { AsyncLocalStorage } = require('async_hooks');

Expand Down Expand Up @@ -299,7 +335,35 @@ The `init` hook will trigger when an `AsyncResource` is instantiated.

The following is an overview of the `AsyncResource` API.

```js
```mjs
import { AsyncResource, executionAsyncId } from 'async_hooks';

// AsyncResource() is meant to be extended. Instantiating a
// new AsyncResource() also triggers init. If triggerAsyncId is omitted then
// async_hook.executionAsyncId() is used.
const asyncResource = new AsyncResource(
type, { triggerAsyncId: executionAsyncId(), requireManualDestroy: false }
);

// Run a function in the execution context of the resource. This will
// * establish the context of the resource
// * trigger the AsyncHooks before callbacks
// * call the provided function `fn` with the supplied arguments
// * trigger the AsyncHooks after callbacks
// * restore the original execution context
asyncResource.runInAsyncScope(fn, thisArg, ...args);

// Call AsyncHooks destroy callbacks.
asyncResource.emitDestroy();

// Return the unique ID assigned to the AsyncResource instance.
asyncResource.asyncId();

// Return the trigger ID for the AsyncResource instance.
asyncResource.triggerAsyncId();
```

```cjs
const { AsyncResource, executionAsyncId } = require('async_hooks');

// AsyncResource() is meant to be extended. Instantiating a
Expand Down Expand Up @@ -446,7 +510,14 @@ database connection pools, can follow a similar model.
Assuming that the task is adding two numbers, using a file named
`task_processor.js` with the following content:

```js
```mjs
import { parentPort } from 'worker_threads';
parentPort.on('message', (task) => {
parentPort.postMessage(task.a + task.b);
});
```

```cjs
const { parentPort } = require('worker_threads');
parentPort.on('message', (task) => {
parentPort.postMessage(task.a + task.b);
Expand All @@ -455,7 +526,95 @@ parentPort.on('message', (task) => {

a Worker pool around it could use the following structure:

```js
```mjs
import { AsyncResource } from 'async_hooks';
import { EventEmitter } from 'events';
import path from 'path';
import { Worker } from 'worker_threads';

const kTaskInfo = Symbol('kTaskInfo');
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');

class WorkerPoolTaskInfo extends AsyncResource {
constructor(callback) {
super('WorkerPoolTaskInfo');
this.callback = callback;
}

done(err, result) {
this.runInAsyncScope(this.callback, null, err, result);
this.emitDestroy(); // `TaskInfo`s are used only once.
}
}

export default class WorkerPool extends EventEmitter {
constructor(numThreads) {
super();
this.numThreads = numThreads;
this.workers = [];
this.freeWorkers = [];
this.tasks = [];

for (let i = 0; i < numThreads; i++)
this.addNewWorker();

// Any time the kWorkerFreedEvent is emitted, dispatch
// the next task pending in the queue, if any.
this.on(kWorkerFreedEvent, () => {
if (this.tasks.length > 0) {
const { task, callback } = this.tasks.shift();
this.runTask(task, callback);
}
});
}

addNewWorker() {
const worker = new Worker(new URL('task_processer.js', import.meta.url));
worker.on('message', (result) => {
// In case of success: Call the callback that was passed to `runTask`,
// remove the `TaskInfo` associated with the Worker, and mark it as free
// again.
worker[kTaskInfo].done(null, result);
worker[kTaskInfo] = null;
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
});
worker.on('error', (err) => {
// In case of an uncaught exception: Call the callback that was passed to
// `runTask` with the error.
if (worker[kTaskInfo])
worker[kTaskInfo].done(err, null);
else
this.emit('error', err);
// Remove the worker from the list and start a new Worker to replace the
// current one.
this.workers.splice(this.workers.indexOf(worker), 1);
this.addNewWorker();
});
this.workers.push(worker);
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
}

runTask(task, callback) {
if (this.freeWorkers.length === 0) {
// No free threads, wait until a worker thread becomes free.
this.tasks.push({ task, callback });
return;
}

const worker = this.freeWorkers.pop();
worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
worker.postMessage(task);
}

close() {
for (const worker of this.workers) worker.terminate();
}
}
```
```cjs
const { AsyncResource } = require('async_hooks');
const { EventEmitter } = require('events');
const path = require('path');
Expand Down Expand Up @@ -553,7 +712,23 @@ were scheduled.
This pool could be used as follows:
```js
```mjs
import WorkerPool from './worker_pool.js';
import os from 'os';

const pool = new WorkerPool(os.cpus().length);

let finished = 0;
for (let i = 0; i < 10; i++) {
pool.runTask({ a: 42, b: 100 }, (err, result) => {
console.log(i, err, result);
if (++finished === 10)
pool.close();
});
}
```
```cjs
const WorkerPool = require('./worker_pool.js');
const os = require('os');

Expand All @@ -579,7 +754,22 @@ The following example shows how to use the `AsyncResource` class to properly
associate an event listener with the correct execution context. The same
approach can be applied to a [`Stream`][] or a similar event-driven class.
```js
```mjs
import { createServer } from 'http';
import { AsyncResource, executionAsyncId } from 'async_hooks';

const server = createServer((req, res) => {
req.on('close', AsyncResource.bind(() => {
// Execution context is bound to the current outer scope.
}));
req.on('close', () => {
// Execution context is bound to the scope that caused 'close' to emit.
});
res.end();
}).listen(3000);
```
```cjs
const { createServer } = require('http');
const { AsyncResource, executionAsyncId } = require('async_hooks');

Expand All @@ -593,6 +783,7 @@ const server = createServer((req, res) => {
res.end();
}).listen(3000);
```
[`AsyncResource`]: #async_context_class_asyncresource
[`EventEmitter`]: events.md#events_class_eventemitter
[`Stream`]: stream.md#stream_stream
Expand Down
Loading

0 comments on commit 81cebec

Please sign in to comment.