Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor the execution to avoid setTimeout #2999

Merged
merged 4 commits into from
Nov 14, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bin/light-base/src/json_rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ impl<TPlat: Platform> Background<TPlat> {

// We yield once between each request in order to politely let other tasks
// do some work and not monopolize the CPU.
crate::util::yield_twice().await;
TPlat::yield_after_cpu_intensive().await;
}
}
.boxed(),
Expand Down
6 changes: 6 additions & 0 deletions bin/light-base/src/platform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub mod async_std;
/// Access to a platform's capabilities.
pub trait Platform: Send + 'static {
type Delay: Future<Output = ()> + Unpin + Send + 'static;
type Yield: Future<Output = ()> + Unpin + Send + 'static;
type Instant: Clone
+ ops::Add<Duration, Output = Self::Instant>
+ ops::Sub<Self::Instant, Output = Duration>
Expand Down Expand Up @@ -71,6 +72,11 @@ pub trait Platform: Send + 'static {
/// Creates a future that becomes ready after the given instant has been reached.
fn sleep_until(when: Self::Instant) -> Self::Delay;

/// Should be called after a CPU-intensive operation in order to yield back control.
///
/// This function can be implemented as no-op on platforms where this is irrelevant.
fn yield_after_cpu_intensive() -> Self::Yield;

/// Starts a connection attempt to the given multiaddress.
///
/// The multiaddress is passed as a string. If the string can't be parsed, an error should be
Expand Down
6 changes: 6 additions & 0 deletions bin/light-base/src/platform/async_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub struct AsyncStdTcpWebSocket;
// TODO: this trait implementation was written before GATs were stable in Rust; now that the associated types have lifetimes, it should be possible to considerably simplify this code
impl Platform for AsyncStdTcpWebSocket {
type Delay = future::BoxFuture<'static, ()>;
type Yield = future::Ready<()>;
type Instant = std::time::Instant;
type Connection = std::convert::Infallible;
type Stream = Stream;
Expand Down Expand Up @@ -68,6 +69,11 @@ impl Platform for AsyncStdTcpWebSocket {
Self::sleep(duration)
}

fn yield_after_cpu_intensive() -> Self::Yield {
// No-op.
future::ready(())
}

fn connect(multiaddr: &str) -> Self::ConnectFuture {
// We simply copy the address to own it. We could be more zero-cost here, but doing so
// would considerably complicate the implementation.
Expand Down
10 changes: 6 additions & 4 deletions bin/light-base/src/runtime_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,8 @@ impl<TPlat: Platform> RuntimeService<TPlat> {
existing_runtime
} else {
// No identical runtime was found. Try compiling the new runtime.
let runtime = SuccessfulRuntime::from_storage(&storage_code, &storage_heap_pages).await;
let runtime =
SuccessfulRuntime::from_storage::<TPlat>(&storage_code, &storage_heap_pages).await;
let runtime = Arc::new(Runtime {
heap_pages: storage_heap_pages,
runtime_code: storage_code,
Expand Down Expand Up @@ -1543,7 +1544,8 @@ impl<TPlat: Platform> Background<TPlat> {
let runtime = if let Some(existing_runtime) = existing_runtime {
existing_runtime
} else {
let runtime = SuccessfulRuntime::from_storage(&storage_code, &storage_heap_pages).await;
let runtime =
SuccessfulRuntime::from_storage::<TPlat>(&storage_code, &storage_heap_pages).await;
match &runtime {
Ok(runtime) => {
log::info!(
Expand Down Expand Up @@ -2013,12 +2015,12 @@ struct SuccessfulRuntime {
}

impl SuccessfulRuntime {
async fn from_storage(
async fn from_storage<TPlat: Platform>(
code: &Option<Vec<u8>>,
heap_pages: &Option<Vec<u8>>,
) -> Result<Self, RuntimeError> {
// Since compiling the runtime is a CPU-intensive operation, we yield once before.
crate::util::yield_twice().await;
TPlat::yield_after_cpu_intensive().await;

// Parameters for `HostVmPrototype::new`.
let module = code.as_ref().ok_or(RuntimeError::CodeNotFound)?;
Expand Down
8 changes: 3 additions & 5 deletions bin/light-base/src/sync_service/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,9 @@ pub(super) async fn start_standalone_chain<TPlat: Platform>(
if has_done_verif {
queue_empty = false;

// Since JavaScript/Wasm is single-threaded, executing many CPU-heavy operations
// in a row would prevent all the other tasks in the background from running.
// In order to provide a better granularity, we force a yield after each
// verification.
crate::util::yield_twice().await;
// As explained in the documentation of `yield_after_cpu_intensive`, we should
// yield after a CPU-intensive operation. This helps provide a better granularity.
TPlat::yield_after_cpu_intensive().await;
}

queue_empty
Expand Down
29 changes: 0 additions & 29 deletions bin/light-base/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,35 +15,6 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

/// Use in an asynchronous context to interrupt the current task execution and schedule it back.
/// Twice.
///
/// This function is useful in order to guarantee a fine granularity of tasks execution time in
/// situations where a CPU-heavy task is being performed.
///
/// We do not yield once, but twice.
/// The reason is that, at the time of writing, `FuturesUnordered` yields to the outside after
/// one of its futures has yielded twice. We use `FuturesUnordered` in the Wasm node.
/// Yielding to the outside is important in the context of the browser node because it gives
/// time to the browser to run its own events loop.
/// See <https://github.com/rust-lang/futures-rs/blob/7a98cf0bbeb397dcfaf5f020b371ab9e836d33d4/futures-util/src/stream/futures_unordered/mod.rs#L531>
/// See <https://github.com/rust-lang/futures-rs/issues/2053> for a discussion about a proper
/// solution.
// TODO: this is a complete hack ^
pub async fn yield_twice() {
let mut num_pending_remain = 2;
core::future::poll_fn(move |cx| {
if num_pending_remain > 0 {
num_pending_remain -= 1;
cx.waker().wake_by_ref();
core::task::Poll::Pending
} else {
core::task::Poll::Ready(())
}
})
.await
}

/// Iterator combinator. Truncates the given `char`-yielding iterator to the given number of
/// elements, and if the limit is reached adds a `…` at the end.
pub fn truncate_str_iter(
Expand Down
4 changes: 1 addition & 3 deletions bin/wasm-node/javascript/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -429,9 +429,7 @@ export function start(options: ClientOptions, platformBindings: PlatformBindings
return Promise.reject(new AlreadyDestroyedError());
if (options.disableJsonRpc)
return Promise.reject(new JsonRpcDisabledError());
return new Promise((resolve, reject) => {
instance.nextJsonRpcResponse(chainId, resolve, reject)
});
return instance.nextJsonRpcResponse(chainId);
},
remove: () => {
if (alreadyDestroyedError)
Expand Down
5 changes: 5 additions & 0 deletions bin/wasm-node/javascript/src/index-browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ export function start(options?: ClientOptions): Client {
trustedBase64DecodeAndZlibInflate: (input) => {
return Promise.resolve(inflate(classicDecode(input)))
},
registerShouldPeriodicallyYield: (callback) => {
const wrappedCallback = () => callback(document.visibilityState === 'visible');
document.addEventListener('visibilitychange', wrappedCallback);
return [document.visibilityState === 'visible', () => { document.removeEventListener('visibilitychange', wrappedCallback) }]
},
performanceNow: () => {
return performance.now()
},
Expand Down
3 changes: 3 additions & 0 deletions bin/wasm-node/javascript/src/index-deno.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ export function start(options?: ClientOptions): Client {
}
return concatenated;
},
registerShouldPeriodicallyYield: (_callback) => {
return [true, () => {}]
},
performanceNow: () => {
return performance.now()
},
Expand Down
3 changes: 3 additions & 0 deletions bin/wasm-node/javascript/src/index-nodejs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ export function start(options?: ClientOptions): Client {
trustedBase64DecodeAndZlibInflate: (input) => {
return Promise.resolve(inflate(Buffer.from(input, 'base64')))
},
registerShouldPeriodicallyYield: (_callback) => {
return [true, () => {}]
},
performanceNow: () => {
return performance.now()
},
Expand Down
3 changes: 2 additions & 1 deletion bin/wasm-node/javascript/src/instance/bindings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
*/
export interface SmoldotWasmExports extends WebAssembly.Exports {
memory: WebAssembly.Memory,
init: (maxLogLevel: number, enableCurrentTask: number, cpuRateLimit: number) => void,
init: (maxLogLevel: number, enableCurrentTask: number, cpuRateLimit: number, periodicallyYield: number) => void,
set_periodically_yield: (periodicallyYield: number) => void,
start_shutdown: () => void,
alloc: (len: number) => number,
add_chain: (chainSpecPointer: number, chainSpecLen: number, databaseContentPointer: number, databaseContentLen: number, jsonRpcRunning: number, potentialRelayChainsPtr: number, potentialRelayChainsLen: number) => number;
Expand Down
108 changes: 44 additions & 64 deletions bin/wasm-node/javascript/src/instance/instance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export interface Config {

export interface Instance {
request: (request: string, chainId: number) => void
nextJsonRpcResponse: (chainId: number, resolve: (response: string) => void, reject: (error: Error) => void) => void
nextJsonRpcResponse: (chainId: number) => Promise<string>
addChain: (chainSpec: string, databaseContent: string, potentialRelayChains: number[], disableJsonRpc: boolean) => Promise<{ success: true, chainId: number } | { success: false, error: string }>
removeChain: (chainId: number) => void
startShutdown: () => void
Expand All @@ -76,7 +76,7 @@ export function start(configMessage: Config, platformBindings: instance.Platform
// - At initialization, it is a Promise containing the Wasm VM is still initializing.
// - After the Wasm VM has finished initialization, contains the `WebAssembly.Instance` object.
//
let state: { initialized: false, promise: Promise<SmoldotWasmInstance> } | { initialized: true, instance: SmoldotWasmInstance };
let state: { initialized: false, promise: Promise<SmoldotWasmInstance> } | { initialized: true, instance: SmoldotWasmInstance, unregisterCallback: () => void };

const crashError: { error?: CrashError } = {};

Expand Down Expand Up @@ -114,44 +114,10 @@ export function start(configMessage: Config, platformBindings: instance.Platform
configMessage.logCallback(level, target, message)
},
jsonRpcResponsesNonEmptyCallback: (chainId) => {
// We shouldn't call back into the Wasm virtual machine from a callback called by the virtual
// machine itself. For this reason, we setup a closure to be called immediately after.
const update = () => {
try {
if (!state.initialized)
throw new Error("Internal error");

const promises = chains.get(chainId)?.jsonRpcResponsesPromises;
if (!promises)
return;
const mem = new Uint8Array(state.instance.exports.memory.buffer);

// Immediately read all the elements of the queue and remove them.
// `json_rpc_responses_non_empty` is only guaranteed to be called if the queue is
// empty.
while (promises.length !== 0) {
const responseInfo = state.instance.exports.json_rpc_responses_peek(chainId) >>> 0;
const ptr = buffer.readUInt32LE(mem, responseInfo) >>> 0;
const len = buffer.readUInt32LE(mem, responseInfo + 4) >>> 0;
// `len === 0` means "queue is empty" according to the API.
if (len === 0)
break;

const message = buffer.utf8BytesToString(mem, ptr, len);
state.instance.exports.json_rpc_responses_pop(chainId);
promises.shift()!.resolve(message);
}

} catch(_error) {}
};

// In browsers, `setTimeout` works as expected when `ms` equals 0. However, NodeJS requires
// a minimum of 1 millisecond (if `0` is passed, it is automatically replaced with `1`) and
// wants you to use `setImmediate` instead.
if (typeof setImmediate === "function") {
setImmediate(update)
} else {
setTimeout(update, 0)
// Notify every single promise found in `jsonRpcResponsesPromises`.
const promises = chains.get(chainId)!.jsonRpcResponsesPromises;
while (promises.length !== 0) {
promises.shift()!.resolve();
}
},
currentTaskCallback: (taskName) => {
Expand All @@ -173,9 +139,16 @@ export function start(configMessage: Config, platformBindings: instance.Platform

// Smoldot requires an initial call to the `init` function in order to do its internal
// configuration.
instance.exports.init(configMessage.maxLogLevel, configMessage.enableCurrentTask ? 1 : 0, cpuRateLimit);
const [periodicallyYield, unregisterCallback] = platformBindings.registerShouldPeriodicallyYield((newValue) => {
if (state.initialized && !crashError.error) {
try {
state.instance.exports.set_periodically_yield(newValue ? 1 : 0)
} catch(_error) {}
}
});
instance.exports.init(configMessage.maxLogLevel, configMessage.enableCurrentTask ? 1 : 0, cpuRateLimit, periodicallyYield ? 1 : 0);

state = { initialized: true, instance };
state = { initialized: true, instance, unregisterCallback };
return instance;
})
};
Expand Down Expand Up @@ -223,35 +196,40 @@ export function start(configMessage: Config, platformBindings: instance.Platform
}
},

nextJsonRpcResponse: (chainId: number, resolve: (response: string) => void, reject: (error: Error) => void) => {
nextJsonRpcResponse: async (chainId: number): Promise<string> => {
// Because `nextJsonRpcResponse` is passed as parameter an identifier returned by `addChain`,
// it is always the case that the Wasm instance is already initialized. The only possibility
// for it to not be the case is if the user completely invented the `chainId`.
if (!state.initialized)
throw new Error("Internal error");
if (crashError.error)
throw crashError.error;

try {
const mem = new Uint8Array(state.instance.exports.memory.buffer);
const responseInfo = state.instance.exports.json_rpc_responses_peek(chainId) >>> 0;
const ptr = buffer.readUInt32LE(mem, responseInfo) >>> 0;
const len = buffer.readUInt32LE(mem, responseInfo + 4) >>> 0;

// `len === 0` means "queue is empty" according to the API.
// In that situation, queue the resolve/reject.
if (len === 0) {
chains.get(chainId)!.jsonRpcResponsesPromises.push({ resolve, reject })
return;
}
while (true) {
if (crashError.error)
throw crashError.error;

const message = buffer.utf8BytesToString(mem, ptr, len);
resolve(message);
// Try to pop a message from the queue.
try {
const mem = new Uint8Array(state.instance.exports.memory.buffer);
const responseInfo = state.instance.exports.json_rpc_responses_peek(chainId) >>> 0;
const ptr = buffer.readUInt32LE(mem, responseInfo) >>> 0;
const len = buffer.readUInt32LE(mem, responseInfo + 4) >>> 0;

// `len === 0` means "queue is empty" according to the API.
// In that situation, queue the resolve/reject.
if (len !== 0) {
const message = buffer.utf8BytesToString(mem, ptr, len);
state.instance.exports.json_rpc_responses_pop(chainId);
return message;
}
} catch (_error) {
console.assert(crashError.error);
throw crashError.error
}

state.instance.exports.json_rpc_responses_pop(chainId);
} catch (_error) {
console.assert(crashError.error);
throw crashError.error
// If no message is available, wait for one to be.
await new Promise((resolve, reject) => {
chains.get(chainId)!.jsonRpcResponsesPromises.push({ resolve: () => resolve(undefined), reject })
});
}
},

Expand Down Expand Up @@ -345,6 +323,8 @@ export function start(configMessage: Config, platformBindings: instance.Platform
// exception when the user wants the shutdown to happen.
if (crashError.error)
return;
if (state.initialized)
state.unregisterCallback();
try {
printError.printError = false
instance.exports.start_shutdown()
Expand All @@ -357,6 +337,6 @@ export function start(configMessage: Config, platformBindings: instance.Platform
}

interface JsonRpcResponsesPromise {
resolve: (data: string) => void,
resolve: () => void,
reject: (error: Error) => void,
}
13 changes: 13 additions & 0 deletions bin/wasm-node/javascript/src/instance/raw-instance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,19 @@ export interface PlatformBindings {
*/
getRandomValues: (buffer: Uint8Array) => void,

/**
* The "should periodically yield" setting indicates whether the execution should periodically
* yield back control by using `setTimeout(..., 0)`.
*
* On platforms such as browsers where `setTimeout(..., 0)` takes way more than 0 milliseconds,
* this setting should be set to `false`.
*
* This function registers a callback that is called when the setting should be updated with
* a new value. It returns the initial value of the setting and a function used to unregister
* the callback.
*/
registerShouldPeriodicallyYield: (callback: (newValue: boolean) => void) => [boolean, () => void],

/**
* Tries to open a new connection using the given configuration.
*
Expand Down
Loading