Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: JSON-RPC responses async iterable iterator #1937

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions wasm-node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

### Added

- Add `jsonRpcResponses` async iterable iterator to `Chain`, as a more convenient alternative to the `nextJsonRpcResponse` function. ([#1937](https://github.com/smol-dot/smoldot/pull/1937))

### Fixed

- Fix potential panic in parachain syncing code. ([#1912](https://github.com/smol-dot/smoldot/pull/1912))
Expand Down
3 changes: 1 addition & 2 deletions wasm-node/javascript/demo/demo-deno.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ while(true) {

(async () => {
try {
while(true) {
const response = await chain.nextJsonRpcResponse();
for await (const response of chain.jsonRpcResponses) {
socket.send(response);
}
} catch(_error) {}
Expand Down
6 changes: 2 additions & 4 deletions wasm-node/javascript/demo/demo.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,7 @@ wsServer.on('connection', function (connection, request) {

(async () => {
try {
while(true) {
const response = await para.nextJsonRpcResponse();
for await (const response of para.jsonRpcResponses) {
connection.send(response);
}
} catch(_error) {}
Expand All @@ -177,8 +176,7 @@ wsServer.on('connection', function (connection, request) {

(async () => {
try {
while(true) {
const response = await relay.nextJsonRpcResponse();
for await (const response of relay.jsonRpcResponses) {
connection.send(response);
}
} catch(_error) {}
Expand Down
50 changes: 32 additions & 18 deletions wasm-node/javascript/src/internals/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -552,27 +552,41 @@ export function start(options: ClientOptions, wasmModule: SmoldotBytecode | Prom
default: throw new Error("Internal error: unknown json_rpc_send error code: " + retVal)
}
},
jsonRpcResponses: {
next: async () => {
while (true) {
if (!state.chains.has(chainId))
return { done: true, value: undefined };
if (options.disableJsonRpc)
throw new JsonRpcDisabledError();
if (state.instance.status === "destroyed")
throw state.instance.error;
if (state.instance.status !== "ready")
throw new Error(); // Internal error. Never supposed to happen.

// Try to pop a message from the queue.
const message = state.instance.instance.peekJsonRpcResponse(chainId);
if (message)
return { done: false, value: message };

// If no message is available, wait for one to be.
await new Promise<void>((resolve) => {
state.chains.get(chainId)!.jsonRpcResponsesPromises.push(resolve)
});
}
},
[Symbol.asyncIterator]() {
return this
}
},
nextJsonRpcResponse: async () => {
while (true) {
if (!state.chains.has(chainId))
throw new AlreadyDestroyedError();
if (options.disableJsonRpc)
return Promise.reject(new JsonRpcDisabledError());
if (state.instance.status === "destroyed")
throw state.instance.error;
if (state.instance.status !== "ready")
throw new Error(); // Internal error. Never supposed to happen.
const result = await newChain.jsonRpcResponses.next();

// Try to pop a message from the queue.
const message = state.instance.instance.peekJsonRpcResponse(chainId);
if (message)
return message;

// If no message is available, wait for one to be.
await new Promise<void>((resolve) => {
state.chains.get(chainId)!.jsonRpcResponsesPromises.push(resolve)
});
if (result.done) {
throw new AlreadyDestroyedError();
}

return result.value;
},
remove: () => {
if (state.instance.status === "destroyed")
Expand Down
14 changes: 14 additions & 0 deletions wasm-node/javascript/src/public-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,20 @@ export interface Chain {
*/
nextJsonRpcResponse(): Promise<string>;

/**
* JSON-RPC responses or notifications async iterable.
*
* Each chain contains a buffer of the responses waiting to be sent out. Iterating over this
* pulls one element from the buffer. If the iteration happen at a slower rate than
* responses are generated, then the buffer will eventually become full, at which point calling
* {@link Chain.sendJsonRpc} will throw an exception. The size of this buffer can be configured
* through {@link AddChainOptions.jsonRpcMaxPendingRequests}.
*
* @throws {@link JsonRpcDisabledError} If the JSON-RPC system was disabled in the options of the chain.
* @throws {@link CrashError} If the background client has crashed.
*/
readonly jsonRpcResponses: AsyncIterableIterator<string>

/**
* Disconnects from the blockchain.
*
Expand Down
Loading