Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework runtime service to directly perform runtime calls #1570

Merged
merged 24 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
228 changes: 40 additions & 188 deletions light-base/src/json_rpc_service/background.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ use core::{
};
use futures_channel::oneshot;
use smoldot::{
executor::{host, runtime_call},
json_rpc::{self, methods, service},
libp2p::{multiaddr, PeerId},
};
Expand Down Expand Up @@ -83,7 +82,7 @@ struct Background<TPlat: PlatformRef> {

/// Channel where to send requests that concern the legacy JSON-RPC API that are handled by
/// a dedicated task.
to_legacy: Mutex<async_channel::Sender<legacy_state_sub::Message<TPlat>>>,
to_legacy: Mutex<async_channel::Sender<legacy_state_sub::Message>>,

/// When `state_getKeysPaged` is called and the response is truncated, the response is
/// inserted in this cache. The API user is likely to call `state_getKeysPaged` again with
Expand Down Expand Up @@ -776,28 +775,29 @@ impl<TPlat: PlatformRef> Background<TPlat> {
Ok(result)
}

/// Obtain a lock to the runtime of the given block against the runtime service.
/// Obtain a pin of the runtime of the given block against the runtime service, plus the
/// block hash and number.
// TODO: return better error?
async fn runtime_access(
async fn pinned_runtime_and_block_info(
self: &Arc<Self>,
block_hash: &[u8; 32],
) -> Result<runtime_service::RuntimeAccess<TPlat>, RuntimeCallError> {
) -> Result<(runtime_service::PinnedRuntime, [u8; 32], u64), RuntimeCallError> {
// Try to find the block in the cache of recent blocks. Most of the time, the call target
// should be in there.
if let Some(runtime_access) = {
if let Some((pinned_runtime, state_trie_root_hash, block_number)) = {
let (tx, rx) = oneshot::channel();
self.to_legacy
.lock()
.await
.send(legacy_state_sub::Message::RecentBlockRuntimeAccess {
.send(legacy_state_sub::Message::RecentBlockPinnedRuntime {
block_hash: *block_hash,
result_tx: tx,
})
.await
.unwrap();
rx.await.unwrap()
} {
return Ok(runtime_access);
return Ok((pinned_runtime, state_trie_root_hash, block_number));
};

// Second situation: the block is not in the cache of recent blocks. This isn't great.
Expand Down Expand Up @@ -857,8 +857,7 @@ impl<TPlat: PlatformRef> Background<TPlat> {
NonZeroU32::new(1).unwrap(),
)
.await
.map_err(runtime_service::RuntimeCallError::StorageQuery)
.map_err(RuntimeCallError::Call)?;
.map_err(RuntimeCallError::StorageQuery)?;
// TODO: not elegant
let heap_pages = entries
.iter()
Expand Down Expand Up @@ -911,7 +910,7 @@ impl<TPlat: PlatformRef> Background<TPlat> {

// Give the code and heap pages to the runtime service. The runtime service will
// try to find any similar runtime it might have, and if not will compile it.
let pinned_runtime_id = self
let pinned_runtime = self
.runtime_service
.compile_and_pin_runtime(
storage_code,
Expand All @@ -922,42 +921,30 @@ impl<TPlat: PlatformRef> Background<TPlat> {
.await
.map_err(|err| match err {
runtime_service::CompileAndPinRuntimeError::Crash => {
RuntimeCallError::RuntimeServiceCrash
RuntimeCallError::Call(runtime_service::RuntimeCallError::Crash)
}
})?;

let precall = self
.runtime_service
.pinned_runtime_access(
pinned_runtime_id.clone(),
*block_hash,
block_number,
state_trie_root_hash,
)
.await;

// TODO: consider keeping pinned runtimes in a cache instead
self.runtime_service.unpin_runtime(pinned_runtime_id).await;

Ok(precall)
Ok((pinned_runtime, state_trie_root_hash, block_number))
}

/// Performs a runtime call to a random block.
async fn runtime_call(
self: &Arc<Self>,
block_hash: &[u8; 32],
runtime_api: &str,
required_api_version_range: impl ops::RangeBounds<u32>,
function_to_call: &str,
call_parameters: impl Iterator<Item = impl AsRef<[u8]>> + Clone,
runtime_api: String,
required_api_version: ops::RangeInclusive<u32>,
function_to_call: String,
call_parameters: Vec<u8>,
total_attempts: u32,
timeout_per_request: Duration,
max_parallel: NonZeroU32,
) -> Result<RuntimeCallResult, RuntimeCallError> {
let (return_value, api_version) = self
.runtime_call_inner(
block_hash,
Some((runtime_api, required_api_version_range)),
Some((runtime_api, required_api_version)),
function_to_call,
call_parameters,
total_attempts,
Expand All @@ -977,16 +964,16 @@ impl<TPlat: PlatformRef> Background<TPlat> {
async fn runtime_call_no_api_check(
self: &Arc<Self>,
block_hash: &[u8; 32],
function_to_call: &str,
call_parameters: impl Iterator<Item = impl AsRef<[u8]>> + Clone,
function_to_call: String,
call_parameters: Vec<u8>,
total_attempts: u32,
timeout_per_request: Duration,
max_parallel: NonZeroU32,
) -> Result<Vec<u8>, RuntimeCallError> {
let (return_value, _api_version) = self
.runtime_call_inner(
block_hash,
None::<(&str, ops::RangeFull)>,
None,
function_to_call,
call_parameters,
total_attempts,
Expand All @@ -1002,157 +989,37 @@ impl<TPlat: PlatformRef> Background<TPlat> {
async fn runtime_call_inner(
self: &Arc<Self>,
block_hash: &[u8; 32],
runtime_api_check: Option<(&str, impl ops::RangeBounds<u32>)>,
function_to_call: &str,
call_parameters: impl Iterator<Item = impl AsRef<[u8]>> + Clone,
runtime_api_check: Option<(String, ops::RangeInclusive<u32>)>,
function_to_call: String,
parameters_vectored: Vec<u8>,
total_attempts: u32,
timeout_per_request: Duration,
max_parallel: NonZeroU32,
) -> Result<(Vec<u8>, Option<u32>), RuntimeCallError> {
// This function contains two steps: obtaining the runtime of the block in question,
// then performing the actual call. The first step is the longest and most difficult.
let precall = self.runtime_access(block_hash).await?;
let (pinned_runtime, block_state_trie_root_hash, block_number) =
self.pinned_runtime_and_block_info(block_hash).await?;

let (runtime_call_lock, virtual_machine) = precall
.start(
match self
.runtime_service
.runtime_call(
pinned_runtime,
*block_hash,
block_number,
block_state_trie_root_hash,
function_to_call,
call_parameters.clone(),
runtime_api_check,
parameters_vectored,
total_attempts,
timeout_per_request,
max_parallel,
)
.await
.unwrap(); // TODO: don't unwrap

// Check that the runtime version is correct.
let runtime_api_version = if let Some((api_name, version_range)) = runtime_api_check {
let version = virtual_machine
.runtime_version()
.decode()
.apis
.find_version(api_name);
match version {
None => {
runtime_call_lock.unlock(virtual_machine);
return Err(RuntimeCallError::ApiNotFound);
}
Some(v) if version_range.contains(&v) => Some(v),
Some(v) => {
runtime_call_lock.unlock(virtual_machine);
return Err(RuntimeCallError::ApiVersionUnknown { actual_version: v });
}
}
} else {
None
};

// Now that we have obtained the virtual machine, we can perform the call.
// This is a CPU-only operation that executes the virtual machine.
// The virtual machine might access the storage.
// TODO: finish doc

let mut runtime_call = match runtime_call::run(runtime_call::Config {
virtual_machine,
function_to_call,
parameter: call_parameters,
storage_main_trie_changes: Default::default(),
max_log_level: 0,
calculate_trie_changes: false,
}) {
Ok(vm) => vm,
Err((err, prototype)) => {
runtime_call_lock.unlock(prototype);
return Err(RuntimeCallError::StartError(err));
}
};

loop {
match runtime_call {
runtime_call::RuntimeCall::Finished(Ok(success)) => {
let output = success.virtual_machine.value().as_ref().to_vec();
runtime_call_lock.unlock(success.virtual_machine.into_prototype());
break Ok((output, runtime_api_version));
}
runtime_call::RuntimeCall::Finished(Err(error)) => {
runtime_call_lock.unlock(error.prototype);
break Err(RuntimeCallError::RuntimeError(error.detail));
}
runtime_call::RuntimeCall::StorageGet(get) => {
let storage_value = {
let child_trie = get.child_trie();
runtime_call_lock.storage_entry(
child_trie.as_ref().map(|c| c.as_ref()),
get.key().as_ref(),
)
};
let storage_value = match storage_value {
Ok(v) => v,
Err(err) => {
runtime_call_lock.unlock(
runtime_call::RuntimeCall::StorageGet(get).into_prototype(),
);
break Err(RuntimeCallError::Call(err));
}
};
runtime_call =
get.inject_value(storage_value.map(|(val, vers)| (iter::once(val), vers)));
}
runtime_call::RuntimeCall::ClosestDescendantMerkleValue(mv) => {
let merkle_value = {
let child_trie = mv.child_trie();
runtime_call_lock.closest_descendant_merkle_value(
child_trie.as_ref().map(|c| c.as_ref()),
mv.key(),
)
};
let merkle_value = match merkle_value {
Ok(v) => v,
Err(err) => {
runtime_call_lock.unlock(
runtime_call::RuntimeCall::ClosestDescendantMerkleValue(mv)
.into_prototype(),
);
break Err(RuntimeCallError::Call(err));
}
};
runtime_call = mv.inject_merkle_value(merkle_value);
}
runtime_call::RuntimeCall::NextKey(nk) => {
let next_key = {
let child_trie = nk.child_trie();
runtime_call_lock.next_key(
child_trie.as_ref().map(|c| c.as_ref()),
nk.key(),
nk.or_equal(),
nk.prefix(),
nk.branch_nodes(),
)
};
let next_key = match next_key {
Ok(v) => v,
Err(err) => {
runtime_call_lock
.unlock(runtime_call::RuntimeCall::NextKey(nk).into_prototype());
break Err(RuntimeCallError::Call(err));
}
};
runtime_call = nk.inject_key(next_key);
}
runtime_call::RuntimeCall::OffchainStorageSet(req) => {
runtime_call = req.resume();
}
runtime_call::RuntimeCall::SignatureVerification(sig) => {
runtime_call = sig.verify_and_resume();
}
runtime_call::RuntimeCall::Offchain(ctx) => {
runtime_call_lock
.unlock(runtime_call::RuntimeCall::Offchain(ctx).into_prototype());
break Err(RuntimeCallError::ForbiddenHostCall);
}
runtime_call::RuntimeCall::LogEmit(log) => {
// Logs are ignored.
runtime_call = log.resume();
}
{
Ok(output) => Ok((output.output, output.api_version)),
Err(error) => {
return Err(RuntimeCallError::Call(error));
}
}
}
Expand All @@ -1174,25 +1041,10 @@ enum RuntimeCallError {
/// Error while finding the storage root hash of the requested block.
#[display(fmt = "Failed to obtain block state trie root: {_0}")]
FindStorageRootHashError(legacy_state_sub::StateTrieRootHashError),
/// Error while downloading the runtime from the network.
StorageQuery(sync_service::StorageQueryError),
#[display(fmt = "{_0}")]
Call(runtime_service::RuntimeCallError),
#[display(fmt = "{_0}")]
StartError(host::StartErr),
#[display(fmt = "{_0}")]
RuntimeError(runtime_call::ErrorDetail),
/// Required runtime API isn't supported by the runtime.
#[display(fmt = "Required runtime API isn't supported by the runtime")]
ApiNotFound,
/// Version requirement of runtime API isn't supported.
#[display(fmt = "Version {actual_version} of the runtime API not supported")]
ApiVersionUnknown {
/// Version that the runtime supports.
actual_version: u32,
},
/// Runtime called a forbidden host function.
ForbiddenHostCall,
/// Runtime service has crashed while compiling the runtime.
RuntimeServiceCrash,
}

#[derive(Debug)]
Expand Down
Loading
Loading