Skip to content

Commit

Permalink
Refactor the execution to avoid setTimeout (#2999)
Browse files Browse the repository at this point in the history
Close #2996

This PR does many things:

- Remove the `crate::util::yield_twice` function in favor of a
platform-specific function `TPlat::yield_after_cpu_intensive`. This way,
the implementation of the yielding, which is a bit hacky due to the
requirements of the Wasm node, is specific to the Wasm node.

- Refactor the `nextJsonRpcResponse` function and the
`jsonRpcResponsesNonEmptyCallback` callback. Right now, when we wait for
a JSON-RPC response to come, we wait for smoldot to notify that it has a
response available. But because this notification is done through a
callback, we can't immediately query the content of the response, so
instead we use `setTimeout(..., 0)` to register a task that queries the
content of the response and then notify the `Promise` that was waiting.
Now we instead immediately notify a `Promise`, and this `Promise` then
queries the content of the response afterwards.

- In the JS code, add a `registerShouldPeriodicallyYield` function whose
implementation is "platform-specific" (browsers, NodeJS, Deno). This
function controls a new setting called "should periodically yield". In
NodeJS and Deno, this is always `false`. In the browser, this is `false`
if and only if the page is in the background and `true` if it is in the
foreground. Passing `false` reduces the usage of `setTimeout`, while
passing `true` promotes the usage of `setTimeout` to avoid blocking the
browser.

- In the JS <-> Rust bindings, add a `periodically_yield` parameter to
`init` and add a new `set_periodically_yield` function. See above point.

- Refactor the way the execution is done. Currently we spawn at
initialization a task (using `setTimeout(..., 0)`) that runs everything.
When something needs to be executed, this task wakes up by calling
`setTimeout(..., 0)` again. After this PR, there is no "background task"
anymore. Instead, the `Future` that drives everything is now put in the
`CLIENT` variable, and every single binding function calls a new
`advance_execution` function before returning. `advance_execution` polls
that `Future` and, depending on the `periodically_yield` setting, either
executes everything until there is nothing more to do, or executes until
the first yield.

All these changes reduce the usage of `setTimeout` to the strict
minimum, which is when we actually need a timer.
  • Loading branch information
tomaka authored Nov 14, 2022
1 parent a5d6e00 commit 5b30f5e
Show file tree
Hide file tree
Showing 18 changed files with 298 additions and 210 deletions.
2 changes: 1 addition & 1 deletion bin/light-base/src/json_rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ impl<TPlat: Platform> Background<TPlat> {

// We yield once between each request in order to politely let other tasks
// do some work and not monopolize the CPU.
crate::util::yield_twice().await;
TPlat::yield_after_cpu_intensive().await;
}
}
.boxed(),
Expand Down
6 changes: 6 additions & 0 deletions bin/light-base/src/platform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub mod async_std;
/// Access to a platform's capabilities.
pub trait Platform: Send + 'static {
type Delay: Future<Output = ()> + Unpin + Send + 'static;
type Yield: Future<Output = ()> + Unpin + Send + 'static;
type Instant: Clone
+ ops::Add<Duration, Output = Self::Instant>
+ ops::Sub<Self::Instant, Output = Duration>
Expand Down Expand Up @@ -71,6 +72,11 @@ pub trait Platform: Send + 'static {
/// Creates a future that becomes ready after the given instant has been reached.
fn sleep_until(when: Self::Instant) -> Self::Delay;

/// Should be called after a CPU-intensive operation in order to yield back control.
///
/// This function can be implemented as no-op on platforms where this is irrelevant.
fn yield_after_cpu_intensive() -> Self::Yield;

/// Starts a connection attempt to the given multiaddress.
///
/// The multiaddress is passed as a string. If the string can't be parsed, an error should be
Expand Down
6 changes: 6 additions & 0 deletions bin/light-base/src/platform/async_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub struct AsyncStdTcpWebSocket;
// TODO: this trait implementation was written before GATs were stable in Rust; now that the associated types have lifetimes, it should be possible to considerably simplify this code
impl Platform for AsyncStdTcpWebSocket {
type Delay = future::BoxFuture<'static, ()>;
type Yield = future::Ready<()>;
type Instant = std::time::Instant;
type Connection = std::convert::Infallible;
type Stream = Stream;
Expand Down Expand Up @@ -68,6 +69,11 @@ impl Platform for AsyncStdTcpWebSocket {
Self::sleep(duration)
}

fn yield_after_cpu_intensive() -> Self::Yield {
// No-op.
future::ready(())
}

fn connect(multiaddr: &str) -> Self::ConnectFuture {
// We simply copy the address to own it. We could be more zero-cost here, but doing so
// would considerably complicate the implementation.
Expand Down
10 changes: 6 additions & 4 deletions bin/light-base/src/runtime_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,8 @@ impl<TPlat: Platform> RuntimeService<TPlat> {
existing_runtime
} else {
// No identical runtime was found. Try compiling the new runtime.
let runtime = SuccessfulRuntime::from_storage(&storage_code, &storage_heap_pages).await;
let runtime =
SuccessfulRuntime::from_storage::<TPlat>(&storage_code, &storage_heap_pages).await;
let runtime = Arc::new(Runtime {
heap_pages: storage_heap_pages,
runtime_code: storage_code,
Expand Down Expand Up @@ -1543,7 +1544,8 @@ impl<TPlat: Platform> Background<TPlat> {
let runtime = if let Some(existing_runtime) = existing_runtime {
existing_runtime
} else {
let runtime = SuccessfulRuntime::from_storage(&storage_code, &storage_heap_pages).await;
let runtime =
SuccessfulRuntime::from_storage::<TPlat>(&storage_code, &storage_heap_pages).await;
match &runtime {
Ok(runtime) => {
log::info!(
Expand Down Expand Up @@ -2013,12 +2015,12 @@ struct SuccessfulRuntime {
}

impl SuccessfulRuntime {
async fn from_storage(
async fn from_storage<TPlat: Platform>(
code: &Option<Vec<u8>>,
heap_pages: &Option<Vec<u8>>,
) -> Result<Self, RuntimeError> {
// Since compiling the runtime is a CPU-intensive operation, we yield once before.
crate::util::yield_twice().await;
TPlat::yield_after_cpu_intensive().await;

// Parameters for `HostVmPrototype::new`.
let module = code.as_ref().ok_or(RuntimeError::CodeNotFound)?;
Expand Down
8 changes: 3 additions & 5 deletions bin/light-base/src/sync_service/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,9 @@ pub(super) async fn start_standalone_chain<TPlat: Platform>(
if has_done_verif {
queue_empty = false;

// Since JavaScript/Wasm is single-threaded, executing many CPU-heavy operations
// in a row would prevent all the other tasks in the background from running.
// In order to provide a better granularity, we force a yield after each
// verification.
crate::util::yield_twice().await;
// As explained in the documentation of `yield_after_cpu_intensive`, we should
// yield after a CPU-intensive operation. This helps provide a better granularity.
TPlat::yield_after_cpu_intensive().await;
}

queue_empty
Expand Down
29 changes: 0 additions & 29 deletions bin/light-base/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,35 +15,6 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

/// Use in an asynchronous context to interrupt the current task execution and schedule it back.
/// Twice.
///
/// This function is useful in order to guarantee a fine granularity of tasks execution time in
/// situations where a CPU-heavy task is being performed.
///
/// We do not yield once, but twice.
/// The reason is that, at the time of writing, `FuturesUnordered` yields to the outside after
/// one of its futures has yielded twice. We use `FuturesUnordered` in the Wasm node.
/// Yielding to the outside is important in the context of the browser node because it gives
/// time to the browser to run its own events loop.
/// See <https://github.com/rust-lang/futures-rs/blob/7a98cf0bbeb397dcfaf5f020b371ab9e836d33d4/futures-util/src/stream/futures_unordered/mod.rs#L531>
/// See <https://github.com/rust-lang/futures-rs/issues/2053> for a discussion about a proper
/// solution.
// TODO: this is a complete hack ^
pub async fn yield_twice() {
let mut num_pending_remain = 2;
core::future::poll_fn(move |cx| {
if num_pending_remain > 0 {
num_pending_remain -= 1;
cx.waker().wake_by_ref();
core::task::Poll::Pending
} else {
core::task::Poll::Ready(())
}
})
.await
}

/// Iterator combinator. Truncates the given `char`-yielding iterator to the given number of
/// elements, and if the limit is reached adds a `…` at the end.
pub fn truncate_str_iter(
Expand Down
4 changes: 4 additions & 0 deletions bin/wasm-node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

### Changed

- In earlier versions of smoldot, `setTimeout(callback, 0)` was frequently used in order to split the execution of CPU-intensive tasks into multiple smaller ones while still giving back control to the execution environment (such as NodeJS or the browser). Unfortunately, when a web page is in the background, browsers set a minimum delay of one second for `setTimeout`. For this reason, the usage of `setTimeout` has now been reduced to the strict minimum, except when the environment is a browser and `document.visibilityState` is equal to `visible`. ([#2999](https://github.com/paritytech/smoldot/pull/2999))

## 0.7.7 - 2022-11-11

### Added
Expand Down
4 changes: 1 addition & 3 deletions bin/wasm-node/javascript/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -428,9 +428,7 @@ export function start(options: ClientOptions, platformBindings: PlatformBindings
return Promise.reject(new AlreadyDestroyedError());
if (options.disableJsonRpc)
return Promise.reject(new JsonRpcDisabledError());
return new Promise((resolve, reject) => {
instance.nextJsonRpcResponse(chainId, resolve, reject)
});
return instance.nextJsonRpcResponse(chainId);
},
remove: () => {
if (alreadyDestroyedError)
Expand Down
5 changes: 5 additions & 0 deletions bin/wasm-node/javascript/src/index-browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ export function start(options?: ClientOptions): Client {
trustedBase64DecodeAndZlibInflate: (input) => {
return Promise.resolve(inflate(classicDecode(input)))
},
registerShouldPeriodicallyYield: (callback) => {
const wrappedCallback = () => callback(document.visibilityState === 'visible');
document.addEventListener('visibilitychange', wrappedCallback);
return [document.visibilityState === 'visible', () => { document.removeEventListener('visibilitychange', wrappedCallback) }]
},
performanceNow: () => {
return performance.now()
},
Expand Down
3 changes: 3 additions & 0 deletions bin/wasm-node/javascript/src/index-deno.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ export function start(options?: ClientOptions): Client {
}
return concatenated;
},
registerShouldPeriodicallyYield: (_callback) => {
return [true, () => {}]
},
performanceNow: () => {
return performance.now()
},
Expand Down
3 changes: 3 additions & 0 deletions bin/wasm-node/javascript/src/index-nodejs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ export function start(options?: ClientOptions): Client {
trustedBase64DecodeAndZlibInflate: (input) => {
return Promise.resolve(inflate(Buffer.from(input, 'base64')))
},
registerShouldPeriodicallyYield: (_callback) => {
return [true, () => {}]
},
performanceNow: () => {
return performance.now()
},
Expand Down
3 changes: 2 additions & 1 deletion bin/wasm-node/javascript/src/instance/bindings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
*/
export interface SmoldotWasmExports extends WebAssembly.Exports {
memory: WebAssembly.Memory,
init: (maxLogLevel: number, enableCurrentTask: number, cpuRateLimit: number) => void,
init: (maxLogLevel: number, enableCurrentTask: number, cpuRateLimit: number, periodicallyYield: number) => void,
set_periodically_yield: (periodicallyYield: number) => void,
start_shutdown: () => void,
alloc: (len: number) => number,
add_chain: (chainSpecPointer: number, chainSpecLen: number, databaseContentPointer: number, databaseContentLen: number, jsonRpcRunning: number, potentialRelayChainsPtr: number, potentialRelayChainsLen: number) => number;
Expand Down
108 changes: 44 additions & 64 deletions bin/wasm-node/javascript/src/instance/instance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export interface Config {

export interface Instance {
request: (request: string, chainId: number) => void
nextJsonRpcResponse: (chainId: number, resolve: (response: string) => void, reject: (error: Error) => void) => void
nextJsonRpcResponse: (chainId: number) => Promise<string>
addChain: (chainSpec: string, databaseContent: string, potentialRelayChains: number[], disableJsonRpc: boolean) => Promise<{ success: true, chainId: number } | { success: false, error: string }>
removeChain: (chainId: number) => void
startShutdown: () => void
Expand All @@ -76,7 +76,7 @@ export function start(configMessage: Config, platformBindings: instance.Platform
// - At initialization, it contains a Promise, as the Wasm VM is still initializing.
// - After the Wasm VM has finished initialization, contains the `WebAssembly.Instance` object.
//
let state: { initialized: false, promise: Promise<SmoldotWasmInstance> } | { initialized: true, instance: SmoldotWasmInstance };
let state: { initialized: false, promise: Promise<SmoldotWasmInstance> } | { initialized: true, instance: SmoldotWasmInstance, unregisterCallback: () => void };

const crashError: { error?: CrashError } = {};

Expand Down Expand Up @@ -114,44 +114,10 @@ export function start(configMessage: Config, platformBindings: instance.Platform
configMessage.logCallback(level, target, message)
},
jsonRpcResponsesNonEmptyCallback: (chainId) => {
// We shouldn't call back into the Wasm virtual machine from a callback called by the virtual
// machine itself. For this reason, we setup a closure to be called immediately after.
const update = () => {
try {
if (!state.initialized)
throw new Error("Internal error");

const promises = chains.get(chainId)?.jsonRpcResponsesPromises;
if (!promises)
return;
const mem = new Uint8Array(state.instance.exports.memory.buffer);

// Immediately read all the elements of the queue and remove them.
// `json_rpc_responses_non_empty` is only guaranteed to be called if the queue is
// empty.
while (promises.length !== 0) {
const responseInfo = state.instance.exports.json_rpc_responses_peek(chainId) >>> 0;
const ptr = buffer.readUInt32LE(mem, responseInfo) >>> 0;
const len = buffer.readUInt32LE(mem, responseInfo + 4) >>> 0;
// `len === 0` means "queue is empty" according to the API.
if (len === 0)
break;

const message = buffer.utf8BytesToString(mem, ptr, len);
state.instance.exports.json_rpc_responses_pop(chainId);
promises.shift()!.resolve(message);
}

} catch(_error) {}
};

// In browsers, `setTimeout` works as expected when `ms` equals 0. However, NodeJS requires
// a minimum of 1 millisecond (if `0` is passed, it is automatically replaced with `1`) and
// wants you to use `setImmediate` instead.
if (typeof setImmediate === "function") {
setImmediate(update)
} else {
setTimeout(update, 0)
// Notify every single promise found in `jsonRpcResponsesPromises`.
const promises = chains.get(chainId)!.jsonRpcResponsesPromises;
while (promises.length !== 0) {
promises.shift()!.resolve();
}
},
currentTaskCallback: (taskName) => {
Expand All @@ -173,9 +139,16 @@ export function start(configMessage: Config, platformBindings: instance.Platform

// Smoldot requires an initial call to the `init` function in order to do its internal
// configuration.
instance.exports.init(configMessage.maxLogLevel, configMessage.enableCurrentTask ? 1 : 0, cpuRateLimit);
const [periodicallyYield, unregisterCallback] = platformBindings.registerShouldPeriodicallyYield((newValue) => {
if (state.initialized && !crashError.error) {
try {
state.instance.exports.set_periodically_yield(newValue ? 1 : 0)
} catch(_error) {}
}
});
instance.exports.init(configMessage.maxLogLevel, configMessage.enableCurrentTask ? 1 : 0, cpuRateLimit, periodicallyYield ? 1 : 0);

state = { initialized: true, instance };
state = { initialized: true, instance, unregisterCallback };
return instance;
})
};
Expand Down Expand Up @@ -223,35 +196,40 @@ export function start(configMessage: Config, platformBindings: instance.Platform
}
},

nextJsonRpcResponse: (chainId: number, resolve: (response: string) => void, reject: (error: Error) => void) => {
nextJsonRpcResponse: async (chainId: number): Promise<string> => {
// Because `nextJsonRpcResponse` is passed as parameter an identifier returned by `addChain`,
// it is always the case that the Wasm instance is already initialized. The only possibility
// for it to not be the case is if the user completely invented the `chainId`.
if (!state.initialized)
throw new Error("Internal error");
if (crashError.error)
throw crashError.error;

try {
const mem = new Uint8Array(state.instance.exports.memory.buffer);
const responseInfo = state.instance.exports.json_rpc_responses_peek(chainId) >>> 0;
const ptr = buffer.readUInt32LE(mem, responseInfo) >>> 0;
const len = buffer.readUInt32LE(mem, responseInfo + 4) >>> 0;

// `len === 0` means "queue is empty" according to the API.
// In that situation, queue the resolve/reject.
if (len === 0) {
chains.get(chainId)!.jsonRpcResponsesPromises.push({ resolve, reject })
return;
}
while (true) {
if (crashError.error)
throw crashError.error;

const message = buffer.utf8BytesToString(mem, ptr, len);
resolve(message);
// Try to pop a message from the queue.
try {
const mem = new Uint8Array(state.instance.exports.memory.buffer);
const responseInfo = state.instance.exports.json_rpc_responses_peek(chainId) >>> 0;
const ptr = buffer.readUInt32LE(mem, responseInfo) >>> 0;
const len = buffer.readUInt32LE(mem, responseInfo + 4) >>> 0;

// `len === 0` means "queue is empty" according to the API.
// In that situation, queue the resolve/reject.
if (len !== 0) {
const message = buffer.utf8BytesToString(mem, ptr, len);
state.instance.exports.json_rpc_responses_pop(chainId);
return message;
}
} catch (_error) {
console.assert(crashError.error);
throw crashError.error
}

state.instance.exports.json_rpc_responses_pop(chainId);
} catch (_error) {
console.assert(crashError.error);
throw crashError.error
// If no message is available, wait for one to be.
await new Promise((resolve, reject) => {
chains.get(chainId)!.jsonRpcResponsesPromises.push({ resolve: () => resolve(undefined), reject })
});
}
},

Expand Down Expand Up @@ -345,6 +323,8 @@ export function start(configMessage: Config, platformBindings: instance.Platform
// exception when the user wants the shutdown to happen.
if (crashError.error)
return;
if (state.initialized)
state.unregisterCallback();
try {
printError.printError = false
instance.exports.start_shutdown()
Expand All @@ -357,6 +337,6 @@ export function start(configMessage: Config, platformBindings: instance.Platform
}

interface JsonRpcResponsesPromise {
resolve: (data: string) => void,
resolve: () => void,
reject: (error: Error) => void,
}
13 changes: 13 additions & 0 deletions bin/wasm-node/javascript/src/instance/raw-instance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,19 @@ export interface PlatformBindings {
*/
getRandomValues: (buffer: Uint8Array) => void,

/**
* The "should periodically yield" setting indicates whether the execution should periodically
* yield back control by using `setTimeout(..., 0)`.
*
* On platforms such as browsers where `setTimeout(..., 0)` takes way more than 0 milliseconds,
* this setting should be set to `false`.
*
* This function registers a callback that is called when the setting should be updated with
* a new value. It returns the initial value of the setting and a function used to unregister
* the callback.
*/
registerShouldPeriodicallyYield: (callback: (newValue: boolean) => void) => [boolean, () => void],

/**
* Tries to open a new connection using the given configuration.
*
Expand Down
Loading

0 comments on commit 5b30f5e

Please sign in to comment.