Skip to content

Commit

Permalink
Make pinned_block_runtime_lock return an error on obsolete subscripti…
Browse files Browse the repository at this point in the history
…on (#2621)

* Make pinned_block_runtime_lock return an error on obsolete subscription

* CHANGELOG
  • Loading branch information
tomaka authored Aug 12, 2022
1 parent dca4894 commit 579b364
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 52 deletions.
17 changes: 9 additions & 8 deletions bin/light-base/src/json_rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1303,21 +1303,22 @@ impl<TPlat: Platform> Background<TPlat> {

// Try to find the block in the cache of recent blocks. Most of the time, the call target
// should be in there.
if cache_lock.recent_pinned_blocks.contains(block_hash) {
let lock = if cache_lock.recent_pinned_blocks.contains(block_hash) {
// The runtime service has the block pinned, meaning that we can ask the runtime
// service to perform the call.
let runtime_call_lock = self
.runtime_service
self.runtime_service
.pinned_block_runtime_lock(
cache_lock.subscription_id.clone().unwrap(),
block_hash,
)
.await;

// Make sure to unlock the cache, in order to not block the other requests.
drop::<futures::lock::MutexGuard<_>>(cache_lock);
.await
.ok()
} else {
None
};

runtime_call_lock
if let Some(lock) = lock {
lock
} else {
// Second situation: the block is not in the cache of recent blocks. This isn't great.
drop::<futures::lock::MutexGuard<_>>(cache_lock);
Expand Down
9 changes: 4 additions & 5 deletions bin/light-base/src/json_rpc_service/chain_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,10 @@ impl<TPlat: Platform> Background<TPlat> {
return;
}

Some(
me.runtime_service
.pinned_block_runtime_lock(runtime_service_subscribe_all, &hash.0)
.await,
)
me.runtime_service
.pinned_block_runtime_lock(runtime_service_subscribe_all, &hash.0)
.await
.ok()
} else {
None
}
Expand Down
22 changes: 12 additions & 10 deletions bin/light-base/src/json_rpc_service/state_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1157,23 +1157,25 @@ impl<TPlat: Platform> Background<TPlat> {

// Try to find the block in the cache of recent blocks. Most of the time, the call
// target should be in there.
if cache_lock.recent_pinned_blocks.contains(&block_hash) {
let spec = if cache_lock.recent_pinned_blocks.contains(&block_hash) {
// The runtime service has the block pinned, meaning that we can ask the runtime
// service for the specification.
let runtime_call_lock = self
.runtime_service
self.runtime_service
.pinned_block_runtime_lock(
cache_lock.subscription_id.clone().unwrap(),
&block_hash,
)
.await;

// Unlock the cache early. While the call to `specification` shouldn't be very
// long, it doesn't cost anything to unlock this mutex early.
drop::<futures::lock::MutexGuard<_>>(cache_lock);
.await
.ok()
.map(|rt| rt.specification())
} else {
None
};

// Obtain the specification of that runtime.
runtime_call_lock.specification()
// If the block isn't a recent block or if the subscription is obsolete, fall back
// to downloading it.
if let Some(spec) = spec {
spec
} else {
// Second situation: the block is not in the cache of recent blocks. This
// isn't great.
Expand Down
39 changes: 30 additions & 9 deletions bin/light-base/src/runtime_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,8 +395,6 @@ impl<TPlat: Platform> RuntimeService<TPlat> {
.unwrap();
*finalized_pinned_remaining += 1;
}
} else {
panic!("Invalid subscription")
}
}

Expand All @@ -406,6 +404,9 @@ impl<TPlat: Platform> RuntimeService<TPlat> {
/// to make the call. The block must be currently pinned in the context of the provided
/// [`SubscriptionId`].
///
/// Returns an error if the subscription is stale, meaning that it has been reset by the
/// runtime service.
///
/// # Panic
///
/// Panics if the given block isn't currently pinned by the given subscription.
Expand All @@ -414,7 +415,7 @@ impl<TPlat: Platform> RuntimeService<TPlat> {
&'a self,
subscription_id: SubscriptionId,
block_hash: &[u8; 32],
) -> RuntimeLock<'a, TPlat> {
) -> Result<RuntimeLock<'a, TPlat>, PinnedBlockRuntimeLockError> {
// Note: copying the hash ahead of time fixes some weird intermittent borrow checker
// issue.
let block_hash = *block_hash;
Expand All @@ -423,22 +424,35 @@ impl<TPlat: Platform> RuntimeService<TPlat> {
let guarded = &mut *guarded;

let (runtime, block_state_root_hash, block_number, _) = {
if let GuardedInner::FinalizedBlockRuntimeKnown { pinned_blocks, .. } =
&mut guarded.tree
if let GuardedInner::FinalizedBlockRuntimeKnown {
all_blocks_subscriptions,
pinned_blocks,
..
} = &mut guarded.tree
{
(*pinned_blocks.get(&(subscription_id.0, block_hash)).unwrap()).clone()
match pinned_blocks.get(&(subscription_id.0, block_hash)) {
Some(v) => v.clone(),
None => {
// Cold path.
if all_blocks_subscriptions.contains_key(&subscription_id.0) {
panic!("block already unpinned");
} else {
return Err(PinnedBlockRuntimeLockError::ObsoleteSubscription);
}
}
}
} else {
panic!("Invalid subscription")
return Err(PinnedBlockRuntimeLockError::ObsoleteSubscription);
}
};

RuntimeLock {
Ok(RuntimeLock {
service: self,
hash: block_hash,
runtime,
block_number,
block_state_root_hash,
}
})
}

/// Lock the runtime service and prepare a call to a runtime entry point.
Expand Down Expand Up @@ -694,6 +708,13 @@ async fn is_near_head_of_chain_heuristic<TPlat: Platform>(
guarded.lock().await.best_near_head_of_chain
}

/// See [`RuntimeService::pinned_block_runtime_lock`].
#[derive(Debug, derive_more::Display, Clone)]
pub enum PinnedBlockRuntimeLockError {
/// Subscription is dead.
ObsoleteSubscription,
}

/// See [`RuntimeService::pinned_block_runtime_lock`].
// TODO: rename, as it doesn't lock anything anymore
#[must_use]
Expand Down
20 changes: 18 additions & 2 deletions bin/light-base/src/sync_service/parachain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,12 @@ pub(super) async fn start_parachain<TPlat: Platform>(
relay_chain_subscribe_all.new_blocks.unpin_block(hash).await;
}
},
Err(ParaheadError::ObsoleteSubscription) => {
// The relay chain runtime service has some kind of gap or issue and
// has discarded the runtime.
// Jump to the outer loop to recreate the channel.
break;
}
Err(error) => {
// Several chains initially didn't support parachains, and have later
// been upgraded to support them. Similarly, the parachain might not
Expand Down Expand Up @@ -675,9 +681,16 @@ async fn parahead<TPlat: Platform>(
) -> Result<Vec<u8>, ParaheadError> {
// For each relay chain block, call `ParachainHost_persisted_validation_data` in
// order to know where the parachains are.
let precall = relay_chain_sync
let precall = match relay_chain_sync
.pinned_block_runtime_lock(subscription_id, block_hash)
.await;
.await
{
Ok(p) => p,
Err(runtime_service::PinnedBlockRuntimeLockError::ObsoleteSubscription) => {
return Err(ParaheadError::ObsoleteSubscription)
}
};

let (runtime_call_lock, virtual_machine) = precall
.start(
para::PERSISTED_VALIDATION_FUNCTION_NAME,
Expand Down Expand Up @@ -781,6 +794,8 @@ enum ParaheadError {
InvalidRuntimeOutput(para::Error),
/// Fetching following keys is not supported by call proofs.
NextKeyForbidden,
/// Runtime service subscription is no longer valid.
ObsoleteSubscription,
}

impl ParaheadError {
Expand All @@ -794,6 +809,7 @@ impl ParaheadError {
ParaheadError::NoCore => false,
ParaheadError::InvalidRuntimeOutput(_) => false,
ParaheadError::NextKeyForbidden => false,
ParaheadError::ObsoleteSubscription => false,
}
}
}
65 changes: 47 additions & 18 deletions bin/light-base/src/transactions_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,12 @@ enum InvalidOrError {
ValidateError(ValidateTransactionError),
}

#[derive(Debug, Clone)]
enum ValidationError {
InvalidOrError(InvalidOrError),
ObsoleteSubscription,
}

/// Message sent from the foreground service to the background.
enum ToBackground {
SubmitTransaction {
Expand Down Expand Up @@ -787,7 +793,7 @@ async fn background_task<TPlat: Platform>(
continue;
}

match &validation_result {
let validation_result = match validation_result {
Ok(result) => {
log::debug!(
target: &log_target,
Expand All @@ -809,8 +815,15 @@ async fn background_task<TPlat: Platform>(
worker.next_reannounce.push(async move {
maybe_validated_tx_id
}.boxed());

Ok(result)
}
Err(ValidationError::ObsoleteSubscription) => {
// Runtime service subscription is obsolete. Throw away everything and
// rebuild it.
continue 'channels_rebuild
}
Err(InvalidOrError::Invalid(error)) => {
Err(ValidationError::InvalidOrError(InvalidOrError::Invalid(error))) => {
log::debug!(
target: &log_target,
"TxValidations => Invalid(tx={}, block={}, error={:?})",
Expand All @@ -826,8 +839,10 @@ async fn background_task<TPlat: Platform>(
HashDisplay(&block_hash),
error,
);

Err(InvalidOrError::Invalid(error))
}
Err(InvalidOrError::ValidateError(error)) => {
Err(ValidationError::InvalidOrError(InvalidOrError::ValidateError(error))) => {
log::debug!(
target: &log_target,
"TxValidations => Error(tx={}, block={}, error={:?})",
Expand All @@ -842,8 +857,10 @@ async fn background_task<TPlat: Platform>(
HashDisplay(&tx_hash),
error
);

Err(InvalidOrError::ValidateError(error))
}
}
};

// No matter whether the validation is successful, we store the result in
// the transactions pool. This will later be picked up by the code that removes
Expand Down Expand Up @@ -1046,7 +1063,10 @@ struct PendingTransaction<TPlat: Platform> {

/// If `Some`, will receive the result of the validation of the transaction.
validation_in_progress: Option<
future::RemoteHandle<([u8; 32], Result<validate::ValidTransaction, InvalidOrError>)>,
future::RemoteHandle<(
[u8; 32],
Result<validate::ValidTransaction, ValidationError>,
)>,
>,
}

Expand Down Expand Up @@ -1085,10 +1105,16 @@ async fn validate_transaction<TPlat: Platform>(
block_scale_encoded_header: &[u8],
scale_encoded_transaction: impl AsRef<[u8]> + Clone,
source: validate::TransactionSource,
) -> Result<validate::ValidTransaction, InvalidOrError> {
let runtime_lock = relay_chain_sync
) -> Result<validate::ValidTransaction, ValidationError> {
let runtime_lock = match relay_chain_sync
.pinned_block_runtime_lock(relay_chain_sync_subscription_id, &block_hash)
.await;
.await
{
Ok(l) => l,
Err(runtime_service::PinnedBlockRuntimeLockError::ObsoleteSubscription) => {
return Err(ValidationError::ObsoleteSubscription)
}
};

log::debug!(
target: log_target,
Expand Down Expand Up @@ -1120,7 +1146,8 @@ async fn validate_transaction<TPlat: Platform>(
)
.await
.map_err(ValidateTransactionError::Call)
.map_err(InvalidOrError::ValidateError)?;
.map_err(InvalidOrError::ValidateError)
.map_err(ValidationError::InvalidOrError)?;

let mut validation_in_progress = validate::validate_transaction(validate::Config {
runtime,
Expand All @@ -1144,24 +1171,26 @@ async fn validate_transaction<TPlat: Platform>(
virtual_machine,
} => {
runtime_call_lock.unlock(virtual_machine);
break Err(InvalidOrError::Invalid(invalid));
break Err(ValidationError::InvalidOrError(InvalidOrError::Invalid(
invalid,
)));
}
validate::Query::Finished {
result: Err(error),
virtual_machine,
} => {
runtime_call_lock.unlock(virtual_machine);
break Err(InvalidOrError::ValidateError(
ValidateTransactionError::Validation(error),
break Err(ValidationError::InvalidOrError(
InvalidOrError::ValidateError(ValidateTransactionError::Validation(error)),
));
}
validate::Query::StorageGet(get) => {
let storage_value = match runtime_call_lock.storage_entry(&get.key_as_vec()) {
Ok(v) => v,
Err(err) => {
runtime_call_lock.unlock(validate::Query::StorageGet(get).into_prototype());
return Err(InvalidOrError::ValidateError(
ValidateTransactionError::Call(err),
return Err(ValidationError::InvalidOrError(
InvalidOrError::ValidateError(ValidateTransactionError::Call(err)),
));
}
};
Expand All @@ -1170,8 +1199,8 @@ async fn validate_transaction<TPlat: Platform>(
validate::Query::NextKey(nk) => {
// TODO:
runtime_call_lock.unlock(validate::Query::NextKey(nk).into_prototype());
break Err(InvalidOrError::ValidateError(
ValidateTransactionError::NextKeyForbidden,
break Err(ValidationError::InvalidOrError(
InvalidOrError::ValidateError(ValidateTransactionError::NextKeyForbidden),
));
}
validate::Query::PrefixKeys(prefix) => {
Expand All @@ -1185,8 +1214,8 @@ async fn validate_transaction<TPlat: Platform>(
Err(err) => {
runtime_call_lock
.unlock(validate::Query::PrefixKeys(prefix).into_prototype());
return Err(InvalidOrError::ValidateError(
ValidateTransactionError::Call(err),
return Err(ValidationError::InvalidOrError(
InvalidOrError::ValidateError(ValidateTransactionError::Call(err)),
));
}
}
Expand Down
1 change: 1 addition & 0 deletions bin/wasm-node/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- Fix panic that occured when connecting to a peer, then discovering it through the background discovery process, then disconnecting from it. ([#2616](https://github.com/paritytech/smoldot/pull/2616))
- Fix circular dependency between JavaScript modules. ([#2614](https://github.com/paritytech/smoldot/pull/2614))
- Fix panic when a handshake timeout or protocol error happens on a connection at the same time as the local node tries to shut it down. ([#2620](https://github.com/paritytech/smoldot/pull/2620))
- Fix panic when a runtime call is made at the same time as a warp sync succeeds or that the limit to the number of blocks in memory is exceeded. ([#2621](https://github.com/paritytech/smoldot/pull/2621))

## 0.6.29 - 2022-08-09

Expand Down

0 comments on commit 579b364

Please sign in to comment.