Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move DoStateCheckpoint off critical path #15411

Open
wants to merge 33 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
e28071f
temp: adding types
msmouse Nov 26, 2024
849ef4f
add result_state to ExecutionOutput
msmouse Nov 27, 2024
06902a0
Move DoStateCheckpoint to the ledger_udpate stage for blocks
msmouse Nov 29, 2024
48292c5
Move DoStateCheckpoint to the ledger_udpate stage for chunks
msmouse Nov 30, 2024
2aa0f22
DB: kv write path
msmouse Nov 30, 2024
702871c
LedgerState, LedgerStateSummary
msmouse Dec 3, 2024
42cd827
temp
msmouse Dec 3, 2024
50825fc
BufferedState
msmouse Dec 4, 2024
e30e19d
LedgerState::update
msmouse Dec 4, 2024
17140fc
LedgerStateSummary::update
msmouse Dec 8, 2024
2091798
integrate
msmouse Dec 8, 2024
8230fce
integrate further
msmouse Dec 10, 2024
392ed5f
fix
msmouse Dec 10, 2024
c198944
par_collect
msmouse Dec 10, 2024
444ef78
fix tests
msmouse Dec 11, 2024
6a3ce68
minor
msmouse Dec 11, 2024
0fb68a4
fix restore
msmouse Dec 11, 2024
506dadd
fix tests 2
msmouse Dec 12, 2024
e662501
fix tests 3
msmouse Dec 12, 2024
26a250a
temp
msmouse Dec 12, 2024
7c77656
fix tests 4
msmouse Dec 15, 2024
cabdc90
refine
msmouse Dec 17, 2024
4af69b1
remove usage from smt
msmouse Dec 17, 2024
99cf945
partial proof
msmouse Dec 17, 2024
e2b97fd
fix
msmouse Dec 17, 2024
bb994bb
lint
msmouse Dec 17, 2024
2002617
fix tests 5
msmouse Dec 17, 2024
7dd07f0
par count
msmouse Dec 17, 2024
592d1d9
fixme: adhoc
msmouse Dec 17, 2024
3f1bda6
refine
msmouse Dec 18, 2024
9a16478
sample proof check
msmouse Dec 18, 2024
95fe6da
cheap estimate
msmouse Dec 18, 2024
f8d4bef
high concurrency proof read
msmouse Dec 19, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 17 additions & 25 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,7 @@ num-derive = "0.3.3"
num-integer = "0.1.42"
num-traits = "0.2.15"
once_cell = "1.10.0"
once_map = "0.4.21"
open = "5.3.1"
ordered-float = "3.9.1"
ouroboros = "0.15.6"
Expand Down Expand Up @@ -778,6 +779,7 @@ shadow-rs = "0.16.2"
simplelog = "0.9.0"
smallbitvec = "2.5.1"
smallvec = "1.8.0"
stable_deref_trait = "1.2.0"
static_assertions = "1.1.0"
stats_alloc = "0.1.8"
status-line = "0.2.0"
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/execution_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ impl ExecutionPipeline {
});
let start = Instant::now();
executor
.execute_and_state_checkpoint(
.execute_and_update_state(
block,
parent_block_id,
block_executor_onchain_config,
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/pipeline/pipeline_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ impl PipelineBuilder {
let start = Instant::now();
tokio::task::spawn_blocking(move || {
executor
.execute_and_state_checkpoint(
.execute_and_update_state(
(block.id(), txns).into(),
block.parent_id(),
onchain_execution_config,
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/state_computer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ async fn test_commit_sync_race() {
Ok(StateComputeResult::new_dummy())
}

fn execute_and_state_checkpoint(
fn execute_and_update_state(
&self,
_block: ExecutableBlock,
_parent_block_id: HashValue,
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/state_computer_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl BlockExecutorTrait for DummyBlockExecutor {
Ok(())
}

fn execute_and_state_checkpoint(
fn execute_and_update_state(
&self,
block: ExecutableBlock,
_parent_block_id: HashValue,
Expand Down
5 changes: 5 additions & 0 deletions crates/aptos-crypto/src/hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,11 @@ pub static ACCUMULATOR_PLACEHOLDER_HASH: Lazy<HashValue> =
pub static SPARSE_MERKLE_PLACEHOLDER_HASH: Lazy<HashValue> =
Lazy::new(|| create_literal_hash("SPARSE_MERKLE_PLACEHOLDER_HASH"));

/// Useful at places where we have to set a hash value for placeholder before
/// knowing the actual hash.
pub static CORRUPTION_SENTINEL: Lazy<HashValue> =
Lazy::new(|| create_literal_hash("CORRUPTION_SENTINEL"));

/// Placeholder hash of hot state tier Merkle Tree.
pub static HOT_STATE_PLACE_HOLDER_HASH: Lazy<HashValue> =
Lazy::new(|| create_literal_hash("HOT_STATE_PLACEHOLDER_HASH"));
Expand Down
4 changes: 2 additions & 2 deletions execution/executor-benchmark/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -909,7 +909,7 @@ mod tests {
let parent_block_id = vm_executor.committed_block_id();
let block_id = HashValue::random();
vm_executor
.execute_and_state_checkpoint(
.execute_and_update_state(
(block_id, vec![txn.clone()]).into(),
parent_block_id,
BENCHMARKS_BLOCK_EXECUTOR_ONCHAIN_CONFIG,
Expand All @@ -930,7 +930,7 @@ mod tests {
let parent_block_id = other_executor.committed_block_id();
let block_id = HashValue::random();
other_executor
.execute_and_state_checkpoint(
.execute_and_update_state(
(block_id, vec![txn]).into(),
parent_block_id,
BENCHMARKS_BLOCK_EXECUTOR_ONCHAIN_CONFIG,
Expand Down
2 changes: 1 addition & 1 deletion execution/executor-benchmark/src/transaction_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ where
{
let _timer = TIMER.with_label_values(&["execute"]).start_timer();
self.executor
.execute_and_state_checkpoint(
.execute_and_update_state(
executable_block,
self.parent_block_id,
BENCHMARKS_BLOCK_EXECUTOR_ONCHAIN_CONFIG,
Expand Down
31 changes: 20 additions & 11 deletions execution/executor-types/src/execution_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
};
use aptos_drop_helper::DropHelper;
use aptos_storage_interface::state_store::{
state_delta::StateDelta, state_view::cached_state_view::StateCache,
state::LedgerState, state_view::cached_state_view::ShardedStateCache,
};
use aptos_types::{
contract_event::ContractEvent,
Expand All @@ -36,14 +36,18 @@ impl ExecutionOutput {
to_commit: TransactionsToKeep,
to_discard: TransactionsWithOutput,
to_retry: TransactionsWithOutput,
state_cache: StateCache,
result_state: LedgerState,
state_reads: ShardedStateCache,
block_end_info: Option<BlockEndInfo>,
next_epoch_state: Option<EpochState>,
subscribable_events: Planned<Vec<ContractEvent>>,
) -> Self {
let next_version = first_version + to_commit.len() as Version;
assert_eq!(next_version, result_state.latest().next_version());
if is_block {
// If it's a block, ensure it ends with state checkpoint.
assert!(to_commit.is_empty() || to_commit.ends_with_sole_checkpoint());
assert!(result_state.is_checkpoint());
} else {
// If it's not, there shouldn't be any transaction to be discarded or retried.
assert!(to_discard.is_empty() && to_retry.is_empty());
Expand All @@ -56,22 +60,24 @@ impl ExecutionOutput {
to_commit,
to_discard,
to_retry,
state_cache,
result_state,
state_reads,
block_end_info,
next_epoch_state,
subscribable_events,
})
}

pub fn new_empty(state: Arc<StateDelta>) -> Self {
pub fn new_empty(state: LedgerState) -> Self {
Self::new_impl(Inner {
is_block: false,
first_version: state.next_version(),
statuses_for_input_txns: vec![],
to_commit: TransactionsToKeep::new_empty(),
to_discard: TransactionsWithOutput::new_empty(),
to_retry: TransactionsWithOutput::new_empty(),
state_cache: StateCache::new_empty(state.current.clone()),
state_reads: ShardedStateCache::new_empty(state.version()),
result_state: state,
block_end_info: None,
next_epoch_state: None,
subscribable_events: Planned::ready(vec![]),
Expand All @@ -88,7 +94,8 @@ impl ExecutionOutput {
to_commit: TransactionsToKeep::new_dummy_success(txns),
to_discard: TransactionsWithOutput::new_empty(),
to_retry: TransactionsWithOutput::new_empty(),
state_cache: StateCache::new_dummy(),
result_state: LedgerState::new_empty(),
state_reads: ShardedStateCache::new_empty(None),
block_end_info: None,
next_epoch_state: None,
subscribable_events: Planned::ready(vec![]),
Expand All @@ -107,7 +114,8 @@ impl ExecutionOutput {
to_commit: TransactionsToKeep::new_empty(),
to_discard: TransactionsWithOutput::new_empty(),
to_retry: TransactionsWithOutput::new_empty(),
state_cache: StateCache::new_dummy(),
result_state: self.result_state.clone(),
state_reads: ShardedStateCache::new_empty(self.next_version().checked_sub(1)),
block_end_info: None,
next_epoch_state: self.next_epoch_state.clone(),
subscribable_events: Planned::ready(vec![]),
Expand Down Expand Up @@ -146,10 +154,11 @@ pub struct Inner {
pub to_discard: TransactionsWithOutput,
pub to_retry: TransactionsWithOutput,

/// Carries the frozen base state view, so all in-mem nodes involved won't drop before the
/// execution result is processed; as well as all the accounts touched during execution, together
/// with their proofs.
pub state_cache: StateCache,
pub result_state: LedgerState,
/// State items read during execution, useful for calculating the state storge usage and
/// indices used by the db pruner.
pub state_reads: ShardedStateCache,

/// Optional StateCheckpoint payload
pub block_end_info: Option<BlockEndInfo>,
/// Optional EpochState payload.
Expand Down
31 changes: 4 additions & 27 deletions execution/executor-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@

use anyhow::Result;
use aptos_crypto::HashValue;
use aptos_scratchpad::{ProofRead, SparseMerkleTree};
use aptos_scratchpad::SparseMerkleTree;
use aptos_types::{
account_config::{NEW_EPOCH_EVENT_MOVE_TYPE_TAG, NEW_EPOCH_EVENT_V2_MOVE_TYPE_TAG},
block_executor::{config::BlockExecutorConfigFromOnchain, partitioner::ExecutableBlock},
contract_event::ContractEvent,
dkg::DKG_START_EVENT_MOVE_TYPE_TAG,
jwks::OBSERVED_JWK_UPDATED_MOVE_TYPE_TAG,
ledger_info::LedgerInfoWithSignatures,
proof::SparseMerkleProofExt,
state_store::{state_key::StateKey, state_value::StateValue},
transaction::{
Transaction, TransactionInfo, TransactionListWithProof, TransactionOutputListWithProof,
Expand All @@ -25,7 +24,7 @@ pub use error::{ExecutorError, ExecutorResult};
pub use ledger_update_output::LedgerUpdateOutput;
use state_compute_result::StateComputeResult;
use std::{
collections::{BTreeSet, HashMap},
collections::BTreeSet,
ops::Deref,
sync::{
atomic::{AtomicBool, Ordering},
Expand Down Expand Up @@ -135,12 +134,12 @@ pub trait BlockExecutorTrait: Send + Sync {
onchain_config: BlockExecutorConfigFromOnchain,
) -> ExecutorResult<StateComputeResult> {
let block_id = block.block_id;
self.execute_and_state_checkpoint(block, parent_block_id, onchain_config)?;
self.execute_and_update_state(block, parent_block_id, onchain_config)?;
self.ledger_update(block_id, parent_block_id)
}

/// Executes a block and returns the state checkpoint output.
fn execute_and_state_checkpoint(
fn execute_and_update_state(
&self,
block: ExecutableBlock,
parent_block_id: HashValue,
Expand Down Expand Up @@ -266,28 +265,6 @@ pub struct ChunkCommitNotification {
pub reconfiguration_occurred: bool,
}

pub struct ProofReader<'a> {
proofs: Option<&'a HashMap<HashValue, SparseMerkleProofExt>>,
}

impl<'a> ProofReader<'a> {
pub fn new(proofs: &'a HashMap<HashValue, SparseMerkleProofExt>) -> Self {
Self {
proofs: Some(proofs),
}
}

pub fn new_empty() -> Self {
Self { proofs: None }
}
}

impl<'a> ProofRead for ProofReader<'a> {
fn get_proof(&self, key: HashValue) -> Option<&SparseMerkleProofExt> {
self.proofs.and_then(|proofs| proofs.get(&key))
}
}

/// Used in both state sync and consensus to filter the txn events that should be subscribable by node components.
pub fn should_forward_to_subscription_service(event: &ContractEvent) -> bool {
let type_tag = event.type_tag();
Expand Down
Loading
Loading