Skip to content

Commit

Permalink
Add BestBlockChanged notif in sync and runtime services (#2457)
Browse files Browse the repository at this point in the history
* Add BestBlockChanged notif in sync and runtime services

* CHANGELOG entry

* Remove incorrect TODO

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
tomaka and mergify[bot] authored Jul 4, 2022
1 parent 68eb96d commit 84745ac
Show file tree
Hide file tree
Showing 10 changed files with 357 additions and 45 deletions.
3 changes: 2 additions & 1 deletion bin/light-base/src/json_rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,8 @@ impl<TPlat: Platform> Background<TPlat> {
.recent_pinned_blocks
.put(hash, block.scale_encoded_header);
}
Some(runtime_service::Notification::Finalized { .. }) => {}
Some(runtime_service::Notification::Finalized { .. })
| Some(runtime_service::Notification::BestBlockChanged { .. }) => {}
None => break,
}
}
Expand Down
20 changes: 20 additions & 0 deletions bin/light-base/src/json_rpc_service/chain_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,26 @@ impl<TPlat: Platform> Background<TPlat> {
break;
}
}
either::Left(Some(runtime_service::Notification::BestBlockChanged {
hash,
}))
| either::Right(Some(sync_service::Notification::BestBlockChanged {
hash,
})) => {
let _ = me
.requests_subscriptions
.try_push_notification(
&state_machine_subscription,
methods::ServerToClient::chainHead_unstable_followEvent {
subscription: (&subscription_id).into(),
result: methods::FollowEvent::BestBlockChanged {
best_block_hash: methods::HashHexString(hash),
},
}
.to_json_call_object_parameters(None),
)
.await;
}
either::Left(Some(runtime_service::Notification::Block(block))) => {
let hash =
header::hash_from_scale_encoded_header(&block.scale_encoded_header);
Expand Down
3 changes: 2 additions & 1 deletion bin/light-base/src/json_rpc_service/state_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,8 @@ impl<TPlat: Platform> Background<TPlat> {
)
.await;
}
Some(runtime_service::Notification::Finalized { .. }) => {}
Some(runtime_service::Notification::BestBlockChanged { .. })
| Some(runtime_service::Notification::Finalized { .. }) => {}
None => {
// TODO: must recreate the channel
return;
Expand Down
108 changes: 74 additions & 34 deletions bin/light-base/src/json_rpc_service/state_chain/sub_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ pub async fn subscribe_runtime_version<TPlat: Platform>(
.subscribe_all(16, NonZeroUsize::new(24).unwrap())
.await;

// Map of runtimes by hash. Contains all non-finalized blocks runtimes.
let mut non_finalized_headers = hashbrown::HashMap::<
// Map of runtimes by hash. Contains all non-finalized blocks, plus the current finalized
// block.
let mut headers = hashbrown::HashMap::<
[u8; 32],
Arc<Result<executor::CoreVersion, RuntimeError>>,
fnv::FnvBuildHasher,
Expand All @@ -62,7 +63,7 @@ pub async fn subscribe_runtime_version<TPlat: Platform>(
.unpin_block(&current_finalized_hash)
.await;

non_finalized_headers.insert(
headers.insert(
current_finalized_hash,
Arc::new(subscribe_all.finalized_block_runtime),
);
Expand All @@ -73,13 +74,13 @@ pub async fn subscribe_runtime_version<TPlat: Platform>(
subscribe_all.new_blocks.unpin_block(&hash).await;

if let Some(new_runtime) = block.new_runtime {
non_finalized_headers.insert(hash, Arc::new(new_runtime));
headers.insert(hash, Arc::new(new_runtime));
} else {
let parent_runtime = non_finalized_headers
let parent_runtime = headers
.get(&block.parent_hash)
.unwrap()
.clone();
non_finalized_headers.insert(hash, parent_runtime);
headers.insert(hash, parent_runtime);
}

if block.is_new_best {
Expand All @@ -88,19 +89,19 @@ pub async fn subscribe_runtime_version<TPlat: Platform>(
}
}
let current_best = current_best.unwrap_or(current_finalized_hash);
let current_best_runtime = (**non_finalized_headers.get(&current_best).unwrap()).clone();
let current_best_runtime = (**headers.get(&current_best).unwrap()).clone();

// Turns `subscribe_all.new_blocks` into a stream of headers.
let substream = stream::unfold(
(
subscribe_all.new_blocks,
non_finalized_headers,
headers,
current_finalized_hash,
current_best,
),
|(
mut new_blocks,
mut non_finalized_headers,
mut headers,
mut current_finalized_hash,
mut current_best,
)| async move {
Expand All @@ -112,19 +113,19 @@ pub async fn subscribe_runtime_version<TPlat: Platform>(
new_blocks.unpin_block(&hash).await;

if let Some(new_runtime) = block.new_runtime {
non_finalized_headers.insert(hash, Arc::new(new_runtime));
headers.insert(hash, Arc::new(new_runtime));
} else {
let parent_runtime = non_finalized_headers
let parent_runtime = headers
.get(&block.parent_hash)
.unwrap()
.clone();
non_finalized_headers.insert(hash, parent_runtime);
headers.insert(hash, parent_runtime);
}

if block.is_new_best {
let current_best_runtime =
non_finalized_headers.get(&current_best).unwrap();
let new_best_runtime = non_finalized_headers.get(&hash).unwrap();
headers.get(&current_best).unwrap();
let new_best_runtime = headers.get(&hash).unwrap();
current_best = hash;

if !Arc::ptr_eq(&current_best_runtime, new_best_runtime) {
Expand All @@ -133,7 +134,7 @@ pub async fn subscribe_runtime_version<TPlat: Platform>(
runtime,
(
new_blocks,
non_finalized_headers,
headers,
current_finalized_hash,
current_best,
),
Expand All @@ -147,17 +148,17 @@ pub async fn subscribe_runtime_version<TPlat: Platform>(
best_block_hash,
} => {
let current_best_runtime =
non_finalized_headers.get(&current_best).unwrap().clone();
headers.get(&current_best).unwrap().clone();
let new_best_runtime =
non_finalized_headers.get(&best_block_hash).unwrap().clone();
headers.get(&best_block_hash).unwrap().clone();

// Clean up the headers we won't need anymore.
for pruned_block in pruned_blocks {
let _was_in = non_finalized_headers.remove(&pruned_block);
let _was_in = headers.remove(&pruned_block);
debug_assert!(_was_in.is_some());
}

let _ = non_finalized_headers
let _ = headers
.remove(&current_finalized_hash)
.unwrap();
current_finalized_hash = hash;
Expand All @@ -169,7 +170,28 @@ pub async fn subscribe_runtime_version<TPlat: Platform>(
runtime,
(
new_blocks,
non_finalized_headers,
headers,
current_finalized_hash,
current_best,
),
));
}
}
Notification::BestBlockChanged { hash } => {
let current_best_runtime =
headers.get(&current_best).unwrap().clone();
let new_best_runtime =
headers.get(&hash).unwrap().clone();

current_best = hash;

if !Arc::ptr_eq(&current_best_runtime, &new_best_runtime) {
let runtime = (*new_best_runtime).clone();
break Some((
runtime,
(
new_blocks,
headers,
current_finalized_hash,
current_best,
),
Expand Down Expand Up @@ -254,6 +276,7 @@ pub async fn subscribe_finalized<TPlat: Platform>(
let header = non_finalized_headers.remove(&hash).unwrap();
break Some((header, (new_blocks, non_finalized_headers)));
}
Notification::BestBlockChanged { .. } => {}
}
}
},
Expand Down Expand Up @@ -290,8 +313,9 @@ pub async fn subscribe_best<TPlat: Platform>(
.subscribe_all(16, NonZeroUsize::new(32).unwrap())
.await;

// Map of block headers by hash. Contains all non-finalized blocks headers.
let mut non_finalized_headers =
// Map of block headers by hash. Contains all non-finalized blocks headers, plus the
// current finalized block header.
let mut headers =
hashbrown::HashMap::<[u8; 32], Vec<u8>, fnv::FnvBuildHasher>::with_capacity_and_hasher(
16,
Default::default(),
Expand All @@ -306,7 +330,7 @@ pub async fn subscribe_best<TPlat: Platform>(
.unpin_block(&current_finalized_hash)
.await;

non_finalized_headers.insert(
headers.insert(
current_finalized_hash,
subscribe_all.finalized_block_scale_encoded_header,
);
Expand All @@ -315,27 +339,27 @@ pub async fn subscribe_best<TPlat: Platform>(
for block in subscribe_all.non_finalized_blocks_ancestry_order {
let hash = header::hash_from_scale_encoded_header(&block.scale_encoded_header);
subscribe_all.new_blocks.unpin_block(&hash).await;
non_finalized_headers.insert(hash, block.scale_encoded_header);
headers.insert(hash, block.scale_encoded_header);

if block.is_new_best {
debug_assert!(current_best.is_none());
current_best = Some(hash);
}
}
let current_best = current_best.unwrap_or(current_finalized_hash);
let current_best_header = non_finalized_headers.get(&current_best).unwrap().clone();
let current_best_header = headers.get(&current_best).unwrap().clone();

// Turns `subscribe_all.new_blocks` into a stream of headers.
let substream = stream::unfold(
(
subscribe_all.new_blocks,
non_finalized_headers,
headers,
current_finalized_hash,
current_best,
),
|(
mut new_blocks,
mut non_finalized_headers,
mut headers,
mut current_finalized_hash,
mut current_best,
)| async move {
Expand All @@ -345,17 +369,17 @@ pub async fn subscribe_best<TPlat: Platform>(
let hash =
header::hash_from_scale_encoded_header(&block.scale_encoded_header);
new_blocks.unpin_block(&hash).await;
non_finalized_headers.insert(hash, block.scale_encoded_header);
headers.insert(hash, block.scale_encoded_header);

if block.is_new_best {
current_best = hash;
let header =
non_finalized_headers.get(&current_best).unwrap().clone();
headers.get(&current_best).unwrap().clone();
break Some((
header,
(
new_blocks,
non_finalized_headers,
headers,
current_finalized_hash,
current_best,
),
Expand All @@ -369,24 +393,40 @@ pub async fn subscribe_best<TPlat: Platform>(
} => {
// Clean up the headers we won't need anymore.
for pruned_block in pruned_blocks {
let _was_in = non_finalized_headers.remove(&pruned_block);
let _was_in = headers.remove(&pruned_block);
debug_assert!(_was_in.is_some());
}

let _ = non_finalized_headers
let _ = headers
.remove(&current_finalized_hash)
.unwrap();
current_finalized_hash = hash;

if best_block_hash != current_best {
current_best = best_block_hash;
let header =
non_finalized_headers.get(&current_best).unwrap().clone();
headers.get(&current_best).unwrap().clone();
break Some((
header,
(
new_blocks,
headers,
current_finalized_hash,
current_best,
),
));
}
}
Notification::BestBlockChanged { hash } => {
if hash != current_best {
current_best = hash;
let header =
headers.get(&current_best).unwrap().clone();
break Some((
header,
(
new_blocks,
non_finalized_headers,
headers,
current_finalized_hash,
current_best,
),
Expand Down
Loading

0 comments on commit 84745ac

Please sign in to comment.