Simplify the handling of shutdowns and fix terminate not waiting for the shutdown to finish #538

Merged
merged 8 commits into from
May 7, 2023
4 changes: 4 additions & 0 deletions wasm-node/CHANGELOG.md
@@ -2,6 +2,10 @@

## Unreleased

### Fixed

- The `Promise` returned by `terminate()` now correctly waits for everything to be completely shut down before yielding instead of letting the shutdown continue happening in the background. ([#538](https://github.com/smol-dot/smoldot/pull/538))

## 1.0.5 - 2023-05-05

It is now possible to run the CPU-heavy tasks of smoldot within a worker (WebWorker, worker threads, etc.). To do so, create two ports using `new MessageChannel()`, pass one of the two ports in the `ClientOptions.portToWorker` field and send the other port to a web worker, then call `run(port)` from within that worker. The `run` function can be found by importing `import { run } from 'smoldot/worker'`. If a `portToWorker` is provided, then the `cpuRateLimit` setting applies to the worker.
17 changes: 16 additions & 1 deletion wasm-node/javascript/src/internals/client.ts
@@ -256,6 +256,8 @@ export function start(options: ClientOptions, wasmModule: SmoldotBytecode | Prom
// FIFO queue. When `addChain` is called, an entry is added to this queue. When the
// instance notifies that a chain creation has succeeded or failed, an entry is popped.
addChainResults: Array<(outcome: { success: true, chainId: number } | { success: false, error: string }) => void>,
// Callback called when the `executor-shutdown` or `wasm-panic` event is received.
onExecutorShutdownOrWasmPanic: () => void,
// List of all active chains. Keys are chainIDs assigned by the instance.
chains: Map<number, {
// Callbacks woken up when a JSON-RPC response is ready or when the chain is destroyed
@@ -268,6 +270,7 @@ export function start(options: ClientOptions, wasmModule: SmoldotBytecode | Prom
currentTaskName: null,
connections: new Map(),
addChainResults: [],
onExecutorShutdownOrWasmPanic: () => {},
chains: new Map(),
};

@@ -306,8 +309,17 @@ export function start(options: ClientOptions, wasmModule: SmoldotBytecode | Prom

state.currentTaskName = null;

const cb = state.onExecutorShutdownOrWasmPanic;
state.onExecutorShutdownOrWasmPanic = () => {};
cb();
break;
}
case "executor-shutdown": {
const cb = state.onExecutorShutdownOrWasmPanic;
state.onExecutorShutdownOrWasmPanic = () => {};
cb();
break;
}
case "log": {
logCallback(event.level, event.target, event.message);
break;
@@ -570,7 +582,7 @@ export function start(options: ClientOptions, wasmModule: SmoldotBytecode | Prom
throw state.instance.error;
if (state.instance.status !== "ready")
throw new Error(); // Internal error. Never supposed to happen.
state.instance.instance.startShutdown();
state.instance.instance.shutdownExecutor();
state.instance = { status: "destroyed", error: new AlreadyDestroyedError() };
state.connections.forEach((connec) => connec.reset());
state.connections.clear();
@@ -586,6 +598,9 @@ export function start(options: ClientOptions, wasmModule: SmoldotBytecode | Prom
}
state.chains.clear();
state.currentTaskName = null;

// Wait for the `executor-shutdown` event to be generated.
await new Promise<void>((resolve) => state.onExecutorShutdownOrWasmPanic = resolve);
}
}
}
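
The one-shot callback handoff used for `onExecutorShutdownOrWasmPanic` in `client.ts` can be sketched in isolation. This is a minimal sketch rather than the smoldot implementation: the `ShutdownState` type and the `fireOnce` and `waitForShutdown` helpers are hypothetical names introduced only for illustration.

```typescript
// Hypothetical stand-in for the part of the client state that matters here.
type ShutdownState = { onExecutorShutdownOrWasmPanic: () => void };

// Takes the currently registered callback, replaces it with a no-op, then
// calls it. Swapping before calling means a second event cannot re-run it.
function fireOnce(state: ShutdownState): void {
    const cb = state.onExecutorShutdownOrWasmPanic;
    state.onExecutorShutdownOrWasmPanic = () => {};
    cb();
}

// Returns a promise that resolves the next time `fireOnce` is called on the
// same state. This mirrors what the end of `terminate()` awaits in the diff.
function waitForShutdown(state: ShutdownState): Promise<void> {
    return new Promise<void>((resolve) => {
        state.onExecutorShutdownOrWasmPanic = () => resolve();
    });
}
```

Under these assumptions, a caller would write `await waitForShutdown(state)` and the event handler for `executor-shutdown` or `wasm-panic` would call `fireOnce(state)`. Because the callback is reset before being invoked, receiving both events in sequence is harmless.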
53 changes: 37 additions & 16 deletions wasm-node/javascript/src/internals/local-instance.ts
@@ -60,7 +60,12 @@ export type Event =
{ ty: "log", level: number, target: string, message: string } |
{ ty: "json-rpc-responses-non-empty", chainId: number } |
{ ty: "current-task", taskName: string | null } |
// Smoldot has crashed. Note that the public API of the instance can technically still be
// used, as all functions will start running fallback code. Existing connections are *not*
// closed. It is the responsibility of the API user to close all connections if they stop
// using the instance.
{ ty: "wasm-panic", message: string } |
{ ty: "executor-shutdown" } |
{ ty: "new-connection", connectionId: number, address: ParsedMultiaddr } |
{ ty: "connection-reset", connectionId: number } |
{ ty: "connection-stream-open", connectionId: number } |
@@ -79,12 +84,14 @@ export interface Instance {
addChain: (chainSpec: string, databaseContent: string, potentialRelayChains: number[], disableJsonRpc: boolean) => void,
removeChain: (chainId: number) => void,
/**
* Starts the instance shutdown process in the background. After this function returns, no
* more event is generated and all further function calls might throw or have no effect.
* Notifies the background executor that it should stop. Once it has effectively stopped,
* an `executor-shutdown` event will be generated.
* Note that the instance can technically still be used, and all the functions still work, but
* in practice nothing is being run in the background and as such it won't do much.
* Existing connections are *not* closed. It is the responsibility of the API user to close
* all connections.
*/
startShutdown: () => void,
shutdownExecutor: () => void,
connectionOpened: (connectionId: number, info: { type: 'single-stream', handshake: 'multistream-select-noise-yamux', initialWritableBytes: number, writeClosable: boolean } | { type: 'multi-stream', handshake: 'webrtc', localTlsCertificateMultihash: Uint8Array, remoteTlsCertificateMultihash: Uint8Array }) => void,
connectionReset: (connectionId: number, message: string) => void,
streamWritableBytes: (connectionId: number, numExtra: number, streamId?: number) => void,
@@ -113,12 +120,14 @@ export async function startLocalInstance(config: Config, wasmModule: WebAssembly
advanceExecutionPromise: null | (() => void),
stdoutBuffer: string,
stderrBuffer: string,
onShutdownExecutorOrWasmPanic: () => void,
} = {
instance: null,
bufferIndices: new Array(),
advanceExecutionPromise: null,
stdoutBuffer: "",
stderrBuffer: "",
onShutdownExecutorOrWasmPanic: () => { }
};

const smoldotJsBindings = {
@@ -133,6 +142,8 @@ export async function startLocalInstance(config: Config, wasmModule: WebAssembly

const message = buffer.utf8BytesToString(new Uint8Array(instance.exports.memory.buffer), ptr, len);
eventCallback({ ty: "wasm-panic", message });
state.onShutdownExecutorOrWasmPanic();
state.onShutdownExecutorOrWasmPanic = () => { };
throw new Error();
},

@@ -401,6 +412,8 @@ export async function startLocalInstance(config: Config, wasmModule: WebAssembly
proc_exit: (retCode: number) => {
state.instance = null;
eventCallback({ ty: "wasm-panic", message: `proc_exit called: ${retCode}` });
state.onShutdownExecutorOrWasmPanic();
state.onShutdownExecutorOrWasmPanic = () => { };
throw new Error();
},

@@ -469,6 +482,10 @@ export async function startLocalInstance(config: Config, wasmModule: WebAssembly
// configuration.
state.instance.exports.init(config.maxLogLevel);

// Promise that is notified when the `shutdownExecutor` function is called or when a Wasm
// panic happens.
const shutdownExecutorOrWasmPanicPromise = new Promise<"stop">((resolve) => state.onShutdownExecutorOrWasmPanic = () => resolve("stop"));

(async () => {
const cpuRateLimit = config.cpuRateLimit;

@@ -479,14 +496,11 @@ export async function startLocalInstance(config: Config, wasmModule: WebAssembly
let now = config.performanceNow();

while (true) {
const whenReadyAgain = new Promise((resolve) => state.advanceExecutionPromise = resolve as () => void);
const whenReadyAgain = new Promise<"ready">((resolve) => state.advanceExecutionPromise = () => resolve("ready"));

if (!state.instance)
break;
const outcome = state.instance.exports.advance_execution();
if (outcome === 0) {
break;
}
state.instance.exports.advance_execution();

const afterExec = config.performanceNow();
const elapsed = afterExec - now;
@@ -506,10 +520,14 @@ export async function startLocalInstance(config: Config, wasmModule: WebAssembly
// really care for such extreme values.
if (missingSleep > 2147483646) // Doc says `> 2147483647`, but I don't really trust their pedanticism so let's be safe
missingSleep = 2147483646;
await new Promise((resolve) => setTimeout(resolve, missingSleep));

const sleepFinished = new Promise<"timeout" | "stop">((resolve) => setTimeout(() => resolve("timeout"), missingSleep));
if (await Promise.race([sleepFinished, shutdownExecutorOrWasmPanicPromise]) === "stop")
break;
}

await whenReadyAgain;
if (await Promise.race([whenReadyAgain, shutdownExecutorOrWasmPanicPromise]) === "stop")
break;

const afterWait = config.performanceNow();

@@ -527,6 +545,10 @@ export async function startLocalInstance(config: Config, wasmModule: WebAssembly

now = afterWait;
}

if (!state.instance)
return;
eventCallback({ ty: "executor-shutdown" });
})();

return {
@@ -596,12 +618,12 @@ export async function startLocalInstance(config: Config, wasmModule: WebAssembly
state.instance.exports.remove_chain(chainId);
},

startShutdown: (): void => {
shutdownExecutor: (): void => {
if (!state.instance)
return;
const instance = state.instance;
state.instance = null;
instance.exports.start_shutdown();
const cb = state.onShutdownExecutorOrWasmPanic;
state.onShutdownExecutorOrWasmPanic = () => { };
cb();
},

connectionOpened: (connectionId: number, info: { type: 'single-stream', handshake: 'multistream-select-noise-yamux', initialWritableBytes: number, writeClosable: boolean } | { type: 'multi-stream', handshake: 'webrtc', localTlsCertificateMultihash: Uint8Array, remoteTlsCertificateMultihash: Uint8Array }) => {
@@ -679,8 +701,7 @@ export async function startLocalInstance(config: Config, wasmModule: WebAssembly
interface SmoldotWasmExports extends WebAssembly.Exports {
memory: WebAssembly.Memory,
init: (maxLogLevel: number) => void,
advance_execution: () => number,
start_shutdown: () => void,
advance_execution: () => void,
add_chain: (chainSpecBufferIndex: number, databaseContentBufferIndex: number, jsonRpcRunning: number, potentialRelayChainsBufferIndex: number) => number;
remove_chain: (chainId: number) => void,
chain_is_ok: (chainId: number) => number,
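
The executor loop in `local-instance.ts` now races each wait against a single long-lived "stop" promise, so a shutdown request interrupts both the rate-limiting sleep and the wait for new work. The following is a minimal sketch of that idea under stated assumptions: `createStopSignal` and `sleepOrStop` are hypothetical helpers, not functions from smoldot.

```typescript
// Hypothetical helper: a promise that resolves to "stop" once `trigger` runs.
type StopSignal = { promise: Promise<"stop">; trigger: () => void };

function createStopSignal(): StopSignal {
    let trigger: () => void = () => {};
    // The executor runs synchronously, so `trigger` is set before we return.
    const promise = new Promise<"stop">((resolve) => {
        trigger = () => resolve("stop");
    });
    return { promise, trigger };
}

// Sleeps for `ms` milliseconds unless the stop promise resolves first.
// As in the diff, the timer is not cancelled; it is simply left to expire.
async function sleepOrStop(ms: number, stop: Promise<"stop">): Promise<"timeout" | "stop"> {
    const sleepFinished = new Promise<"timeout">((resolve) => {
        setTimeout(() => resolve("timeout"), ms);
    });
    return Promise.race([sleepFinished, stop]);
}
```

A loop built on this sketch would `break` as soon as `sleepOrStop` returns `"stop"`. Tagging each promise with a distinct string literal is what lets the caller tell which side of the race won.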
62 changes: 28 additions & 34 deletions wasm-node/javascript/src/internals/remote-instance.ts
@@ -85,9 +85,8 @@ export async function connectToInstanceServer(config: ConnectConfig): Promise<in

// Update some local state.
switch (message.ty) {
case "wasm-panic": {
state.jsonRpcResponses.clear();
state.connections.clear();
case "wasm-panic":
case "executor-shutdown": {
portToServer.close();
initialPort.close();
break;
@@ -185,7 +184,7 @@ export async function connectToInstanceServer(config: ConnectConfig): Promise<in
return item;
},

startShutdown() {
shutdownExecutor() {
const msg: ClientToServer = { ty: "shutdown" };
portToServer.postMessage(msg);
},
@@ -259,10 +258,11 @@ export async function startInstanceServer(config: ServerConfig, initPortToClient
initPortToClient.close();

const state: {
// Always set except at the very beginning.
instance: instance.Instance | null,
connections: Map<number, Set<number>>,
acceptedJsonRpcResponses: Map<number, number>,
onShutdown?: (() => void),
onExecutorShutdownOrWasmPanic?: (() => void),
} = {
instance: null,
connections: new Map(),
@@ -277,25 +277,24 @@ export async function startInstanceServer(config: ServerConfig, initPortToClient
}
break;
}
case "executor-shutdown":
case "wasm-panic": {
state.instance = null;
state.connections.clear();
state.acceptedJsonRpcResponses.clear();
if (state.onShutdown)
state.onShutdown();
if (state.onExecutorShutdownOrWasmPanic) {
const cb = state.onExecutorShutdownOrWasmPanic;
delete state.onExecutorShutdownOrWasmPanic;
cb();
}
break;
}
case "json-rpc-responses-non-empty": {
// Process this event asynchronously because we can't call into `instance`
// from within the events callback itself.
// TODO: do better than setTimeout?
setTimeout(() => {
if (!state.instance)
return;
const numAccepted = state.acceptedJsonRpcResponses.get(event.chainId)!;
if (numAccepted == 0)
return;
const response = state.instance.peekJsonRpcResponse(event.chainId);
const response = state.instance!.peekJsonRpcResponse(event.chainId);
if (response) {
state.acceptedJsonRpcResponses.set(event.chainId, numAccepted - 1);
const msg: ServerToClient = { ty: "json-rpc-response", chainId: event.chainId, response };
@@ -322,6 +321,10 @@ export async function startInstanceServer(config: ServerConfig, initPortToClient
portToClient.postMessage(ev);
};

// We create the `Promise` ahead of time in order to catch potential `wasm-panic` events
// as early as during initialization.
const execFinishedPromise = new Promise<void>((resolve) => state.onExecutorShutdownOrWasmPanic = resolve);

state.instance = await instance.startLocalInstance({
forbidTcp,
forbidWs,
@@ -336,24 +339,21 @@ export async function startInstanceServer(config: ServerConfig, initPortToClient
portToClient.onmessage = (messageEvent) => {
const message = messageEvent.data as ClientToServer;

if (!state.instance)
return;

switch (message.ty) {
case "add-chain": {
state.instance.addChain(message.chainSpec, message.databaseContent, message.potentialRelayChains, message.disableJsonRpc);
state.instance!.addChain(message.chainSpec, message.databaseContent, message.potentialRelayChains, message.disableJsonRpc);
break;
}
case "remove-chain": {
state.instance.removeChain(message.chainId);
state.instance!.removeChain(message.chainId);
break;
}
case "request": {
state.instance.request(message.request, message.chainId); // TODO: return value unused
state.instance!.request(message.request, message.chainId); // TODO: return value unused
break;
}
case "accept-more-json-rpc-answers": {
const response = state.instance.peekJsonRpcResponse(message.chainId);
const response = state.instance!.peekJsonRpcResponse(message.chainId);
if (response) {
const msg: ServerToClient = { ty: "json-rpc-response", chainId: message.chainId, response };
portToClient.postMessage(msg);
@@ -364,24 +364,21 @@ export async function startInstanceServer(config: ServerConfig, initPortToClient
break;
}
case "shutdown": {
if (state.onShutdown)
state.onShutdown();
state.instance.startShutdown();
portToClient.close();
state.instance!.shutdownExecutor();
break;
}
case "connection-reset": {
// The connection might have been reset locally in the past.
if (!state.connections.has(message.connectionId))
return;
state.instance.connectionReset(message.connectionId, message.message);
state.instance!.connectionReset(message.connectionId, message.message);
break;
}
case "connection-opened": {
// The connection might have been reset locally in the past.
if (!state.connections.has(message.connectionId))
return;
state.instance.connectionOpened(message.connectionId, message.info);
state.instance!.connectionOpened(message.connectionId, message.info);
break;
}
case "stream-message": {
@@ -391,15 +388,15 @@ export async function startInstanceServer(config: ServerConfig, initPortToClient
// The stream might have been reset locally in the past.
if (message.streamId && !state.connections.get(message.connectionId)!.has(message.streamId))
return;
state.instance.streamMessage(message.connectionId, message.message, message.streamId);
state.instance!.streamMessage(message.connectionId, message.message, message.streamId);
break;
}
case "stream-opened": {
// The connection might have been reset locally in the past.
if (!state.connections.has(message.connectionId))
return;
state.connections.get(message.connectionId)!.add(message.streamId);
state.instance.streamOpened(message.connectionId, message.streamId, message.direction, message.initialWritableBytes);
state.instance!.streamOpened(message.connectionId, message.streamId, message.direction, message.initialWritableBytes);
break;
}
case "stream-writable-bytes": {
Expand All @@ -409,7 +406,7 @@ export async function startInstanceServer(config: ServerConfig, initPortToClient
// The stream might have been reset locally in the past.
if (message.streamId && !state.connections.get(message.connectionId)!.has(message.streamId))
return;
state.instance.streamWritableBytes(message.connectionId, message.numExtra, message.streamId);
state.instance!.streamWritableBytes(message.connectionId, message.numExtra, message.streamId);
break;
}
case "stream-reset": {
@@ -420,16 +417,13 @@ export async function startInstanceServer(config: ServerConfig, initPortToClient
if (message.streamId && !state.connections.get(message.connectionId)!.has(message.streamId))
return;
state.connections.get(message.connectionId)!.delete(message.streamId);
state.instance.streamReset(message.connectionId, message.streamId);
state.instance!.streamReset(message.connectionId, message.streamId);
break;
}
}
};

// The instance might already have crashed. Handle this situation.
if (!state.instance)
return Promise.resolve();
return new Promise((resolve) => state.onShutdown = resolve);
return execFinishedPromise;
}

type InitialMessage = {
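
In `startInstanceServer`, the completion promise is created before the instance is started, so a panic raised during initialization still resolves it. A minimal sketch of that ordering follows, assuming a simplified state object. `ServerState`, `armShutdownPromise`, `notifyShutdown`, and `runServer` are hypothetical names, and `init` stands in for the real `startLocalInstance` call.

```typescript
// Hypothetical, reduced version of the server state from the diff.
type ServerState = { onExecutorShutdownOrWasmPanic?: () => void };

// Registers the resolver first and hands back the promise it controls.
function armShutdownPromise(state: ServerState): Promise<void> {
    return new Promise<void>((resolve) => {
        state.onExecutorShutdownOrWasmPanic = () => resolve();
    });
}

// Invokes the registered callback at most once, then forgets it.
function notifyShutdown(state: ServerState): void {
    if (state.onExecutorShutdownOrWasmPanic) {
        const cb = state.onExecutorShutdownOrWasmPanic;
        delete state.onExecutorShutdownOrWasmPanic;
        cb();
    }
}

// `init` may emit the shutdown event at any point, including before it
// returns. Arming the promise beforehand ensures that event is not lost.
async function runServer(
    state: ServerState,
    init: (emitShutdown: () => void) => Promise<void>,
): Promise<void> {
    const finished = armShutdownPromise(state);
    await init(() => notifyShutdown(state));
    return finished;
}
```

If the promise were created only after `init` returned, an event emitted during `init` would find no callback registered and the returned promise would never resolve.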