Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make pinned_block_runtime_lock return an error on obsolete subscription #2621

Merged
merged 2 commits into from
Aug 12, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions bin/light-base/src/json_rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1303,21 +1303,22 @@ impl<TPlat: Platform> Background<TPlat> {

// Try to find the block in the cache of recent blocks. Most of the time, the call target
// should be in there.
if cache_lock.recent_pinned_blocks.contains(block_hash) {
let lock = if cache_lock.recent_pinned_blocks.contains(block_hash) {
// The runtime service has the block pinned, meaning that we can ask the runtime
// service to perform the call.
let runtime_call_lock = self
.runtime_service
self.runtime_service
.pinned_block_runtime_lock(
cache_lock.subscription_id.clone().unwrap(),
block_hash,
)
.await;

// Make sure to unlock the cache, in order to not block the other requests.
drop::<futures::lock::MutexGuard<_>>(cache_lock);
.await
.ok()
} else {
None
};

runtime_call_lock
if let Some(lock) = lock {
lock
} else {
// Second situation: the block is not in the cache of recent blocks. This isn't great.
drop::<futures::lock::MutexGuard<_>>(cache_lock);
Expand Down
9 changes: 4 additions & 5 deletions bin/light-base/src/json_rpc_service/chain_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,10 @@ impl<TPlat: Platform> Background<TPlat> {
return;
}

Some(
me.runtime_service
.pinned_block_runtime_lock(runtime_service_subscribe_all, &hash.0)
.await,
)
me.runtime_service
.pinned_block_runtime_lock(runtime_service_subscribe_all, &hash.0)
.await
.ok()
} else {
None
}
Expand Down
22 changes: 12 additions & 10 deletions bin/light-base/src/json_rpc_service/state_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1157,23 +1157,25 @@ impl<TPlat: Platform> Background<TPlat> {

// Try to find the block in the cache of recent blocks. Most of the time, the call
// target should be in there.
if cache_lock.recent_pinned_blocks.contains(&block_hash) {
let spec = if cache_lock.recent_pinned_blocks.contains(&block_hash) {
// The runtime service has the block pinned, meaning that we can ask the runtime
// service for the specification.
let runtime_call_lock = self
.runtime_service
self.runtime_service
.pinned_block_runtime_lock(
cache_lock.subscription_id.clone().unwrap(),
&block_hash,
)
.await;

// Unlock the cache early. While the call to `specification` shouldn't be very
// long, it doesn't cost anything to unlock this mutex early.
drop::<futures::lock::MutexGuard<_>>(cache_lock);
.await
.ok()
.map(|rt| rt.specification())
} else {
None
};

// Obtain the specification of that runtime.
runtime_call_lock.specification()
// If the block isn't a recent block or if the subscription is obsolete, fall back
// to downloading it.
if let Some(spec) = spec {
spec
} else {
// Second situation: the block is not in the cache of recent blocks. This
// isn't great.
Expand Down
39 changes: 30 additions & 9 deletions bin/light-base/src/runtime_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,8 +395,6 @@ impl<TPlat: Platform> RuntimeService<TPlat> {
.unwrap();
*finalized_pinned_remaining += 1;
}
} else {
panic!("Invalid subscription")
}
}

Expand All @@ -406,6 +404,9 @@ impl<TPlat: Platform> RuntimeService<TPlat> {
/// to make the call. The block must be currently pinned in the context of the provided
/// [`SubscriptionId`].
///
/// Returns an error if the subscription is stale, meaning that it has been reset by the
/// runtime service.
///
/// # Panic
///
/// Panics if the given block isn't currently pinned by the given subscription.
Expand All @@ -414,7 +415,7 @@ impl<TPlat: Platform> RuntimeService<TPlat> {
&'a self,
subscription_id: SubscriptionId,
block_hash: &[u8; 32],
) -> RuntimeLock<'a, TPlat> {
) -> Result<RuntimeLock<'a, TPlat>, PinnedBlockRuntimeLockError> {
// Note: copying the hash ahead of time fixes some weird intermittent borrow checker
// issue.
let block_hash = *block_hash;
Expand All @@ -423,22 +424,35 @@ impl<TPlat: Platform> RuntimeService<TPlat> {
let guarded = &mut *guarded;

let (runtime, block_state_root_hash, block_number, _) = {
if let GuardedInner::FinalizedBlockRuntimeKnown { pinned_blocks, .. } =
&mut guarded.tree
if let GuardedInner::FinalizedBlockRuntimeKnown {
all_blocks_subscriptions,
pinned_blocks,
..
} = &mut guarded.tree
{
(*pinned_blocks.get(&(subscription_id.0, block_hash)).unwrap()).clone()
match pinned_blocks.get(&(subscription_id.0, block_hash)) {
Some(v) => v.clone(),
None => {
// Cold path.
if all_blocks_subscriptions.contains_key(&subscription_id.0) {
panic!("block already unpinned");
} else {
return Err(PinnedBlockRuntimeLockError::ObsoleteSubscription);
}
}
}
} else {
panic!("Invalid subscription")
return Err(PinnedBlockRuntimeLockError::ObsoleteSubscription);
}
};

RuntimeLock {
Ok(RuntimeLock {
service: self,
hash: block_hash,
runtime,
block_number,
block_state_root_hash,
}
})
}

/// Lock the runtime service and prepare a call to a runtime entry point.
Expand Down Expand Up @@ -694,6 +708,13 @@ async fn is_near_head_of_chain_heuristic<TPlat: Platform>(
guarded.lock().await.best_near_head_of_chain
}

/// See [`RuntimeService::pinned_block_runtime_lock`].
#[derive(Debug, derive_more::Display, Clone)]
pub enum PinnedBlockRuntimeLockError {
/// Subscription is dead.
ObsoleteSubscription,
}

/// See [`RuntimeService::pinned_block_runtime_lock`].
// TODO: rename, as it doesn't lock anything anymore
#[must_use]
Expand Down
20 changes: 18 additions & 2 deletions bin/light-base/src/sync_service/parachain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,12 @@ pub(super) async fn start_parachain<TPlat: Platform>(
relay_chain_subscribe_all.new_blocks.unpin_block(hash).await;
}
},
Err(ParaheadError::ObsoleteSubscription) => {
// The relay chain runtime service has some kind of gap or issue and
// has discarded the runtime.
// Jump to the outer loop to recreate the channel.
break;
}
Err(error) => {
// Several chains initially didn't support parachains, and have later
// been upgraded to support them. Similarly, the parachain might not
Expand Down Expand Up @@ -675,9 +681,16 @@ async fn parahead<TPlat: Platform>(
) -> Result<Vec<u8>, ParaheadError> {
// For each relay chain block, call `ParachainHost_persisted_validation_data` in
// order to know where the parachains are.
let precall = relay_chain_sync
let precall = match relay_chain_sync
.pinned_block_runtime_lock(subscription_id, block_hash)
.await;
.await
{
Ok(p) => p,
Err(runtime_service::PinnedBlockRuntimeLockError::ObsoleteSubscription) => {
return Err(ParaheadError::ObsoleteSubscription)
}
};

let (runtime_call_lock, virtual_machine) = precall
.start(
para::PERSISTED_VALIDATION_FUNCTION_NAME,
Expand Down Expand Up @@ -781,6 +794,8 @@ enum ParaheadError {
InvalidRuntimeOutput(para::Error),
/// Fetching following keys is not supported by call proofs.
NextKeyForbidden,
/// Runtime service subscription is no longer valid.
ObsoleteSubscription,
}

impl ParaheadError {
Expand All @@ -794,6 +809,7 @@ impl ParaheadError {
ParaheadError::NoCore => false,
ParaheadError::InvalidRuntimeOutput(_) => false,
ParaheadError::NextKeyForbidden => false,
ParaheadError::ObsoleteSubscription => false,
}
}
}
65 changes: 47 additions & 18 deletions bin/light-base/src/transactions_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,12 @@ enum InvalidOrError {
ValidateError(ValidateTransactionError),
}

#[derive(Debug, Clone)]
enum ValidationError {
InvalidOrError(InvalidOrError),
ObsoleteSubscription,
}

/// Message sent from the foreground service to the background.
enum ToBackground {
SubmitTransaction {
Expand Down Expand Up @@ -787,7 +793,7 @@ async fn background_task<TPlat: Platform>(
continue;
}

match &validation_result {
let validation_result = match validation_result {
Ok(result) => {
log::debug!(
target: &log_target,
Expand All @@ -809,8 +815,15 @@ async fn background_task<TPlat: Platform>(
worker.next_reannounce.push(async move {
maybe_validated_tx_id
}.boxed());

Ok(result)
}
Err(ValidationError::ObsoleteSubscription) => {
// Runtime service subscription is obsolete. Throw away everything and
// rebuild it.
continue 'channels_rebuild
}
Err(InvalidOrError::Invalid(error)) => {
Err(ValidationError::InvalidOrError(InvalidOrError::Invalid(error))) => {
log::debug!(
target: &log_target,
"TxValidations => Invalid(tx={}, block={}, error={:?})",
Expand All @@ -826,8 +839,10 @@ async fn background_task<TPlat: Platform>(
HashDisplay(&block_hash),
error,
);

Err(InvalidOrError::Invalid(error))
}
Err(InvalidOrError::ValidateError(error)) => {
Err(ValidationError::InvalidOrError(InvalidOrError::ValidateError(error))) => {
log::debug!(
target: &log_target,
"TxValidations => Error(tx={}, block={}, error={:?})",
Expand All @@ -842,8 +857,10 @@ async fn background_task<TPlat: Platform>(
HashDisplay(&tx_hash),
error
);

Err(InvalidOrError::ValidateError(error))
}
}
};

// No matter whether the validation is successful, we store the result in
// the transactions pool. This will later be picked up by the code that removes
Expand Down Expand Up @@ -1046,7 +1063,10 @@ struct PendingTransaction<TPlat: Platform> {

/// If `Some`, will receive the result of the validation of the transaction.
validation_in_progress: Option<
future::RemoteHandle<([u8; 32], Result<validate::ValidTransaction, InvalidOrError>)>,
future::RemoteHandle<(
[u8; 32],
Result<validate::ValidTransaction, ValidationError>,
)>,
>,
}

Expand Down Expand Up @@ -1085,10 +1105,16 @@ async fn validate_transaction<TPlat: Platform>(
block_scale_encoded_header: &[u8],
scale_encoded_transaction: impl AsRef<[u8]> + Clone,
source: validate::TransactionSource,
) -> Result<validate::ValidTransaction, InvalidOrError> {
let runtime_lock = relay_chain_sync
) -> Result<validate::ValidTransaction, ValidationError> {
let runtime_lock = match relay_chain_sync
.pinned_block_runtime_lock(relay_chain_sync_subscription_id, &block_hash)
.await;
.await
{
Ok(l) => l,
Err(runtime_service::PinnedBlockRuntimeLockError::ObsoleteSubscription) => {
return Err(ValidationError::ObsoleteSubscription)
}
};

log::debug!(
target: log_target,
Expand Down Expand Up @@ -1120,7 +1146,8 @@ async fn validate_transaction<TPlat: Platform>(
)
.await
.map_err(ValidateTransactionError::Call)
.map_err(InvalidOrError::ValidateError)?;
.map_err(InvalidOrError::ValidateError)
.map_err(ValidationError::InvalidOrError)?;

let mut validation_in_progress = validate::validate_transaction(validate::Config {
runtime,
Expand All @@ -1144,24 +1171,26 @@ async fn validate_transaction<TPlat: Platform>(
virtual_machine,
} => {
runtime_call_lock.unlock(virtual_machine);
break Err(InvalidOrError::Invalid(invalid));
break Err(ValidationError::InvalidOrError(InvalidOrError::Invalid(
invalid,
)));
}
validate::Query::Finished {
result: Err(error),
virtual_machine,
} => {
runtime_call_lock.unlock(virtual_machine);
break Err(InvalidOrError::ValidateError(
ValidateTransactionError::Validation(error),
break Err(ValidationError::InvalidOrError(
InvalidOrError::ValidateError(ValidateTransactionError::Validation(error)),
));
}
validate::Query::StorageGet(get) => {
let storage_value = match runtime_call_lock.storage_entry(&get.key_as_vec()) {
Ok(v) => v,
Err(err) => {
runtime_call_lock.unlock(validate::Query::StorageGet(get).into_prototype());
return Err(InvalidOrError::ValidateError(
ValidateTransactionError::Call(err),
return Err(ValidationError::InvalidOrError(
InvalidOrError::ValidateError(ValidateTransactionError::Call(err)),
));
}
};
Expand All @@ -1170,8 +1199,8 @@ async fn validate_transaction<TPlat: Platform>(
validate::Query::NextKey(nk) => {
// TODO:
runtime_call_lock.unlock(validate::Query::NextKey(nk).into_prototype());
break Err(InvalidOrError::ValidateError(
ValidateTransactionError::NextKeyForbidden,
break Err(ValidationError::InvalidOrError(
InvalidOrError::ValidateError(ValidateTransactionError::NextKeyForbidden),
));
}
validate::Query::PrefixKeys(prefix) => {
Expand All @@ -1185,8 +1214,8 @@ async fn validate_transaction<TPlat: Platform>(
Err(err) => {
runtime_call_lock
.unlock(validate::Query::PrefixKeys(prefix).into_prototype());
return Err(InvalidOrError::ValidateError(
ValidateTransactionError::Call(err),
return Err(ValidationError::InvalidOrError(
InvalidOrError::ValidateError(ValidateTransactionError::Call(err)),
));
}
}
Expand Down