From 5d8e1acbf30f4a4eba485631922f794975ecd76f Mon Sep 17 00:00:00 2001 From: aldenhu Date: Tue, 3 Dec 2024 00:36:29 +0000 Subject: [PATCH] LatestState, LatestStateSummary --- Cargo.lock | 1 + .../executor-types/src/execution_output.rs | 28 ++--- .../src/state_checkpoint_output.rs | 20 ++-- execution/executor/src/block_executor/mod.rs | 5 +- .../src/chunk_executor/chunk_commit_queue.rs | 12 +-- execution/executor/src/chunk_executor/mod.rs | 10 +- .../src/chunk_executor/transaction_chunk.rs | 15 ++- execution/executor/src/db_bootstrapper/mod.rs | 3 +- execution/executor/src/lib.rs | 4 + execution/executor/src/tests/mod.rs | 9 ++ .../src/types/partial_state_compute_result.rs | 6 +- .../src/workflow/do_get_execution_output.rs | 90 +++++++++------- .../src/workflow/do_state_checkpoint.rs | 4 +- execution/executor/src/workflow/mod.rs | 2 +- storage/aptosdb/src/backup/restore_utils.rs | 102 +++++++++--------- .../aptosdb/src/db/include/aptosdb_writer.rs | 19 ++-- .../aptosdb/src/state_store/buffered_state.rs | 94 +++++++--------- .../aptosdb/src/state_store/current_state.rs | 14 +-- storage/aptosdb/src/state_store/mod.rs | 39 +++---- storage/storage-interface/Cargo.toml | 1 + .../storage-interface/src/chunk_to_commit.rs | 9 +- .../storage-interface/src/ledger_summary.rs | 24 +++-- .../src/state_store/state.rs | 43 ++++++++ .../src/state_store/state_summary.rs | 50 ++++++++- .../state_view/cached_state_view.rs | 2 +- 25 files changed, 346 insertions(+), 260 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bf366fdd9ba27f..a9ab477111f8c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3923,6 +3923,7 @@ dependencies = [ "bcs 0.1.4", "crossbeam-channel", "dashmap", + "derive_more", "once_cell", "parking_lot 0.12.1", "proptest", diff --git a/execution/executor-types/src/execution_output.rs b/execution/executor-types/src/execution_output.rs index fbfba3c3c521de..1f7c2abc7c88be 100644 --- a/execution/executor-types/src/execution_output.rs +++ b/execution/executor-types/src/execution_output.rs @@ -10,7 +10,7 @@ use crate::{ }; use aptos_drop_helper::DropHelper; use aptos_storage_interface::state_store::{ - state::State, state_view::cached_state_view::ShardedStateCache, + state::LatestState, state_view::cached_state_view::ShardedStateCache, }; use aptos_types::{ contract_event::ContractEvent, @@ -36,23 +36,18 @@ impl ExecutionOutput { to_commit: TransactionsToKeep, to_discard: TransactionsWithOutput, to_retry: TransactionsWithOutput, - last_checkpoint_state: Option, - result_state: State, + result_state: LatestState, state_reads: ShardedStateCache, block_end_info: Option, next_epoch_state: Option, subscribable_events: Planned>, ) -> Self { let next_version = first_version + to_commit.len() as Version; - assert_eq!(next_version, result_state.next_version()); + assert_eq!(next_version, result_state.state().next_version()); if is_block { // If it's a block, ensure it ends with state checkpoint. assert!(to_commit.is_empty() || to_commit.ends_with_sole_checkpoint()); - assert!(last_checkpoint_state.is_some()); - assert!(last_checkpoint_state - .as_ref() - .unwrap() - .is_the_same(&result_state)); + assert!(result_state.is_checkpoint()); } else { // If it's not, there shouldn't be any transaction to be discarded or retried. assert!(to_discard.is_empty() && to_retry.is_empty()); @@ -65,7 +60,6 @@ impl ExecutionOutput { to_commit, to_discard, to_retry, - last_checkpoint_state, result_state, state_reads, block_end_info, @@ -74,16 +68,15 @@ impl ExecutionOutput { }) } - pub fn new_empty(parent_state: State) -> Self { + pub fn new_empty(state: LatestState) -> Self { Self::new_impl(Inner { is_block: false, - first_version: parent_state.next_version(), + first_version: state.next_version(), statuses_for_input_txns: vec![], to_commit: TransactionsToKeep::new_empty(), to_discard: TransactionsWithOutput::new_empty(), to_retry: TransactionsWithOutput::new_empty(), - last_checkpoint_state: None, - result_state: parent_state, + result_state: state, state_reads: ShardedStateCache::default(), block_end_info: None, next_epoch_state: None, @@ -101,8 +94,7 @@ impl ExecutionOutput { to_commit: TransactionsToKeep::new_dummy_success(txns), to_discard: TransactionsWithOutput::new_empty(), to_retry: TransactionsWithOutput::new_empty(), - last_checkpoint_state: None, - result_state: State::new_empty(), + result_state: LatestState::new_empty(), state_reads: ShardedStateCache::default(), block_end_info: None, next_epoch_state: None, @@ -122,7 +114,6 @@ impl ExecutionOutput { to_commit: TransactionsToKeep::new_empty(), to_discard: TransactionsWithOutput::new_empty(), to_retry: TransactionsWithOutput::new_empty(), - last_checkpoint_state: None, result_state: self.result_state.clone(), state_reads: ShardedStateCache::default(), block_end_info: None, @@ -163,8 +154,7 @@ pub struct Inner { pub to_discard: TransactionsWithOutput, pub to_retry: TransactionsWithOutput, - pub last_checkpoint_state: Option, - pub result_state: State, + pub result_state: LatestState, /// State items read during execution, useful for calculating the state storge usage and /// indices used by the db pruner. pub state_reads: ShardedStateCache, diff --git a/execution/executor-types/src/state_checkpoint_output.rs b/execution/executor-types/src/state_checkpoint_output.rs index 0803405d16c364..209c7215b1079f 100644 --- a/execution/executor-types/src/state_checkpoint_output.rs +++ b/execution/executor-types/src/state_checkpoint_output.rs @@ -5,7 +5,7 @@ use aptos_crypto::HashValue; use aptos_drop_helper::DropHelper; -use aptos_storage_interface::state_store::state_summary::StateSummary; +use aptos_storage_interface::state_store::state_summary::LatestStateSummary; use derive_more::Deref; use std::sync::Arc; @@ -17,27 +17,24 @@ pub struct StateCheckpointOutput { impl StateCheckpointOutput { pub fn new( - last_state_checkpoint_summary: Option, - result_state_summary: StateSummary, + state_summary: LatestStateSummary, state_checkpoint_hashes: Vec>, ) -> Self { Self::new_impl(Inner { - last_state_checkpoint_summary, - result_state_summary, + state_summary, state_checkpoint_hashes, }) } - pub fn new_empty(parent_state_summary: StateSummary) -> Self { + pub fn new_empty(parent_state_summary: LatestStateSummary) -> Self { Self::new_impl(Inner { - last_state_checkpoint_summary: None, - result_state_summary: parent_state_summary, + state_summary: parent_state_summary, state_checkpoint_hashes: vec![], }) } pub fn new_dummy() -> Self { - Self::new_empty(StateSummary::new_empty()) + Self::new_empty(LatestStateSummary::new_empty()) } fn new_impl(inner: Inner) -> Self { @@ -47,13 +44,12 @@ impl StateCheckpointOutput { } pub fn reconfig_suffix(&self) -> Self { - Self::new_empty(self.result_state_summary.clone()) + Self::new_empty(self.state_summary.clone()) } } #[derive(Debug)] pub struct Inner { - pub last_state_checkpoint_summary: Option, - pub result_state_summary: StateSummary, + pub state_summary: LatestStateSummary, pub state_checkpoint_hashes: Vec>, } diff --git a/execution/executor/src/block_executor/mod.rs b/execution/executor/src/block_executor/mod.rs index 90e2e2518487de..c2c1cccab0d6ec 100644 --- a/execution/executor/src/block_executor/mod.rs +++ b/execution/executor/src/block_executor/mod.rs @@ -215,7 +215,7 @@ where CachedStateView::new( StateViewId::BlockExecution { block_id }, Arc::clone(&self.db.reader), - parent_output.execution_output.result_state.clone(), + parent_output.execution_output.result_state.state().clone(), )? }; @@ -229,6 +229,7 @@ where DoGetExecutionOutput::by_transaction_execution( &self.block_executor, transactions, + &parent_output.execution_output.result_state, state_view, onchain_config.clone(), TransactionSliceMetadata::block(parent_block_id, block_id), @@ -293,7 +294,7 @@ where }); output.set_state_checkpoint_output(DoStateCheckpoint::run( &output.execution_output, - parent_block.output.expect_result_state_summary().clone(), + parent_block.output.expect_result_state_summary(), Option::>::None, )?); output.set_ledger_update_output(DoLedgerUpdate::run( diff --git a/execution/executor/src/chunk_executor/chunk_commit_queue.rs b/execution/executor/src/chunk_executor/chunk_commit_queue.rs index d1ea2fc2515357..9d4ba45468ca03 100644 --- a/execution/executor/src/chunk_executor/chunk_commit_queue.rs +++ b/execution/executor/src/chunk_executor/chunk_commit_queue.rs @@ -14,7 +14,7 @@ use crate::{ use anyhow::{anyhow, ensure, Result}; use aptos_metrics_core::TimerHelper; use aptos_storage_interface::{ - state_store::{state::State, state_summary::StateSummary}, + state_store::{state::LatestState, state_summary::LatestStateSummary}, DbReader, LedgerSummary, }; use aptos_types::{proof::accumulator::InMemoryTransactionAccumulator, transaction::Version}; @@ -39,8 +39,8 @@ pub(crate) struct ChunkToUpdateLedger { /// pub struct ChunkCommitQueue { /// Notice that latest_state and latest_txn_accumulator are at different versions. - latest_state: State, - latest_state_summary: StateSummary, + latest_state: LatestState, + latest_state_summary: LatestStateSummary, latest_txn_accumulator: Arc, to_commit: VecDeque>, to_update_ledger: VecDeque>, @@ -63,7 +63,7 @@ impl ChunkCommitQueue { }) } - pub(crate) fn latest_state(&self) -> &State { + pub(crate) fn latest_state(&self) -> &LatestState { &self.latest_state } @@ -91,7 +91,7 @@ impl ChunkCommitQueue { pub(crate) fn next_chunk_to_update_ledger( &mut self, ) -> Result<( - StateSummary, + LatestStateSummary, Arc, ChunkToUpdateLedger, )> { @@ -125,7 +125,7 @@ impl ChunkCommitQueue { self.latest_state_summary = chunk .output .expect_state_checkpoint_output() - .result_state_summary + .state_summary .clone(); self.latest_txn_accumulator = chunk .output diff --git a/execution/executor/src/chunk_executor/mod.rs b/execution/executor/src/chunk_executor/mod.rs index 0d6aa943f40a76..4808fe72f2f3b6 100644 --- a/execution/executor/src/chunk_executor/mod.rs +++ b/execution/executor/src/chunk_executor/mod.rs @@ -301,8 +301,8 @@ impl ChunkExecutorInner { let num_txns = chunk.len(); - let state_view = self.state_view(&parent_state)?; - let execution_output = chunk.into_output::(state_view)?; + let state_view = self.state_view(parent_state.state())?; + let execution_output = chunk.into_output::(&parent_state, state_view)?; let output = PartialStateComputeResult::new(execution_output); // Enqueue for next stage. @@ -336,7 +336,7 @@ impl ChunkExecutorInner { output.set_state_checkpoint_output(DoStateCheckpoint::run( &output.execution_output, - parent_state_summary, + &parent_state_summary, Some( chunk_verifier .transaction_infos() @@ -580,7 +580,8 @@ impl ChunkExecutorInner { verify_execution_mode: &VerifyExecutionMode, ) -> Result { // Execute transactions. - let state_view = self.state_view(self.commit_queue.lock().latest_state())?; + let parent_state = self.commit_queue.lock().latest_state().clone(); + let state_view = self.state_view(parent_state.state())?; let txns = transactions .iter() .take((end_version - begin_version) as usize) @@ -592,6 +593,7 @@ impl ChunkExecutorInner { let execution_output = DoGetExecutionOutput::by_transaction_execution::( &V::new(), txns.into(), + &parent_state, state_view, BlockExecutorConfigFromOnchain::new_no_block_limit(), TransactionSliceMetadata::chunk(begin_version, end_version), diff --git a/execution/executor/src/chunk_executor/transaction_chunk.rs b/execution/executor/src/chunk_executor/transaction_chunk.rs index c1958bfb666474..6396fbec32bbcb 100644 --- a/execution/executor/src/chunk_executor/transaction_chunk.rs +++ b/execution/executor/src/chunk_executor/transaction_chunk.rs @@ -9,7 +9,9 @@ use anyhow::Result; use aptos_executor_types::execution_output::ExecutionOutput; use aptos_experimental_runtimes::thread_manager::optimal_min_len; use aptos_metrics_core::TimerHelper; -use aptos_storage_interface::state_store::state_view::cached_state_view::CachedStateView; +use aptos_storage_interface::state_store::{ + state::LatestState, state_view::cached_state_view::CachedStateView, +}; use aptos_types::{ block_executor::{ config::BlockExecutorConfigFromOnchain, @@ -43,6 +45,7 @@ pub trait TransactionChunk { fn into_output( self, + parent_state: &LatestState, state_view: CachedStateView, ) -> Result; } @@ -63,6 +66,7 @@ impl TransactionChunk for ChunkToExecute { fn into_output( self, + parent_state: &LatestState, state_view: CachedStateView, ) -> Result { let ChunkToExecute { @@ -89,6 +93,7 @@ impl TransactionChunk for ChunkToExecute { DoGetExecutionOutput::by_transaction_execution::( &V::new(), sig_verified_txns.into(), + parent_state, state_view, BlockExecutorConfigFromOnchain::new_no_block_limit(), TransactionSliceMetadata::unknown(), @@ -113,6 +118,7 @@ impl TransactionChunk for ChunkToApply { fn into_output( self, + parent_state: &LatestState, state_view: CachedStateView, ) -> Result { let Self { @@ -121,6 +127,11 @@ impl TransactionChunk for ChunkToApply { first_version: _, } = self; - DoGetExecutionOutput::by_transaction_output(transactions, transaction_outputs, state_view) + DoGetExecutionOutput::by_transaction_output( + transactions, + transaction_outputs, + parent_state, + state_view, + ) } } diff --git a/execution/executor/src/db_bootstrapper/mod.rs b/execution/executor/src/db_bootstrapper/mod.rs index 19d7782b14cf7f..dae4164bfd5025 100644 --- a/execution/executor/src/db_bootstrapper/mod.rs +++ b/execution/executor/src/db_bootstrapper/mod.rs @@ -139,6 +139,7 @@ pub fn calculate_genesis( let execution_output = DoGetExecutionOutput::by_transaction_execution::( &V::new(), vec![genesis_txn.clone().into()].into(), + &ledger_summary.state, base_state_view, BlockExecutorConfigFromOnchain::new_no_block_limit(), TransactionSliceMetadata::unknown(), @@ -160,7 +161,7 @@ pub fn calculate_genesis( let state_view = CachedStateView::new( StateViewId::Miscellaneous, Arc::clone(&db.reader), - output.execution_output.result_state.clone(), + output.execution_output.result_state.state().clone(), )?; let next_epoch = epoch .checked_add(1) diff --git a/execution/executor/src/lib.rs b/execution/executor/src/lib.rs index e03c9443c573a5..15d84c0464b648 100644 --- a/execution/executor/src/lib.rs +++ b/execution/executor/src/lib.rs @@ -3,6 +3,10 @@ // SPDX-License-Identifier: Apache-2.0 #![forbid(unsafe_code)] +// FIXME(aldenhu) +#![allow(dead_code)] +#![allow(unused_imports)] +#![allow(unused_variables)] #[cfg(any(test, feature = "fuzzing"))] pub mod fuzzing; diff --git a/execution/executor/src/tests/mod.rs b/execution/executor/src/tests/mod.rs index 8c331115f2749a..5a5b5f9d690d34 100644 --- a/execution/executor/src/tests/mod.rs +++ b/execution/executor/src/tests/mod.rs @@ -457,6 +457,7 @@ fn apply_transaction_by_writeset( db: &DbReaderWriter, transactions_and_writesets: Vec<(Transaction, WriteSet)>, ) { + /* let ledger_summary: LedgerSummary = db.reader.get_pre_committed_ledger_summary().unwrap(); let (txns, txn_outs) = transactions_and_writesets @@ -505,6 +506,10 @@ fn apply_transaction_by_writeset( true, /* sync_commit */ ) .unwrap(); + FIXME(aldenhu) + */ + + todo!() } #[test] @@ -677,6 +682,7 @@ fn run_transactions_naive( transactions: Vec, block_executor_onchain_config: BlockExecutorConfigFromOnchain, ) -> HashValue { + /* let executor = TestExecutor::new(); let db = &executor.db; @@ -710,6 +716,9 @@ fn run_transactions_naive( .unwrap() .transaction_accumulator .root_hash() + FIXME(aldenhu) + */ + todo!() } proptest! { diff --git a/execution/executor/src/types/partial_state_compute_result.rs b/execution/executor/src/types/partial_state_compute_result.rs index 7b8f4f56742f08..e25d732db4768c 100644 --- a/execution/executor/src/types/partial_state_compute_result.rs +++ b/execution/executor/src/types/partial_state_compute_result.rs @@ -8,7 +8,7 @@ use aptos_executor_types::{ state_compute_result::StateComputeResult, LedgerUpdateOutput, }; use aptos_storage_interface::{ - state_store::{state::State, state_summary::StateSummary}, + state_store::{state::LatestState, state_summary::LatestStateSummary}, LedgerSummary, }; use once_cell::sync::OnceCell; @@ -67,11 +67,11 @@ impl PartialStateComputeResult { .expect("StateCheckpointOutput not set") } - pub fn result_state(&self) -> &State { + pub fn result_state(&self) -> &LatestState { &self.execution_output.result_state } - pub fn expect_result_state_summary(&self) -> &StateSummary { + pub fn expect_result_state_summary(&self) -> &LatestStateSummary { // FIXME(aldenhu): // &self.expect_state_checkpoint_output().result_state_summary todo!() diff --git a/execution/executor/src/workflow/do_get_execution_output.rs b/execution/executor/src/workflow/do_get_execution_output.rs index 896d4238f544a6..599661799ae643 100644 --- a/execution/executor/src/workflow/do_get_execution_output.rs +++ b/execution/executor/src/workflow/do_get_execution_output.rs @@ -24,7 +24,7 @@ use aptos_experimental_runtimes::thread_manager::THREAD_MANAGER; use aptos_logger::prelude::*; use aptos_metrics_core::TimerHelper; use aptos_storage_interface::state_store::{ - state::State, + state::LatestState, state_view::cached_state_view::{CachedStateView, ShardedStateCache}, }; #[cfg(feature = "consensus-only-perf-test")] @@ -58,6 +58,7 @@ impl DoGetExecutionOutput { pub fn by_transaction_execution( executor: &V, transactions: ExecutableTransactions, + parent_state: &LatestState, state_view: CachedStateView, onchain_config: BlockExecutorConfigFromOnchain, transaction_slice_metadata: TransactionSliceMetadata, @@ -67,6 +68,7 @@ impl DoGetExecutionOutput { Self::by_transaction_execution_unsharded::( executor, txns, + parent_state, state_view, onchain_config, transaction_slice_metadata, @@ -74,6 +76,7 @@ impl DoGetExecutionOutput { }, ExecutableTransactions::Sharded(txns) => Self::by_transaction_execution_sharded::( txns, + parent_state, state_view, onchain_config, transaction_slice_metadata.append_state_checkpoint_to_block(), @@ -98,6 +101,7 @@ impl DoGetExecutionOutput { fn by_transaction_execution_unsharded( executor: &V, transactions: Vec, + parent_state: &LatestState, state_view: CachedStateView, onchain_config: BlockExecutorConfigFromOnchain, transaction_slice_metadata: TransactionSliceMetadata, @@ -122,6 +126,7 @@ impl DoGetExecutionOutput { .map(|t| t.into_inner()) .collect(), transaction_outputs, + parent_state, state_view, block_end_info, append_state_checkpoint_to_block, @@ -130,6 +135,7 @@ impl DoGetExecutionOutput { pub fn by_transaction_execution_sharded( transactions: PartitionedTransactions, + parent_state: &LatestState, state_view: CachedStateView, onchain_config: BlockExecutorConfigFromOnchain, append_state_checkpoint_to_block: Option, @@ -153,6 +159,7 @@ impl DoGetExecutionOutput { .map(|t| t.into_txn().into_inner()) .collect(), transaction_outputs, + parent_state, state_view, None, // block end info append_state_checkpoint_to_block, @@ -162,6 +169,7 @@ impl DoGetExecutionOutput { pub fn by_transaction_output( transactions: Vec, transaction_outputs: Vec, + parent_state: &LatestState, state_view: CachedStateView, ) -> Result { // collect all accounts touched and dedup @@ -177,6 +185,7 @@ impl DoGetExecutionOutput { state_view.next_version(), transactions, transaction_outputs, + parent_state, state_view, None, // block end info None, // append state checkpoint to block @@ -288,6 +297,7 @@ impl Parser { first_version: Version, mut transactions: Vec, mut transaction_outputs: Vec, + parent_state: &LatestState, base_state_view: CachedStateView, block_end_info: Option, append_state_checkpoint_to_block: Option, @@ -334,9 +344,8 @@ impl Parser { .transpose()? }; - let (base_state, state_reads) = base_state_view.finish(); - let (last_checkpoint_state, result_state) = - Self::update_state(&to_commit, &base_state, &state_reads); + let state_reads = base_state_view.finish(); + let result_state = Self::update_state(&to_commit, parent_state, &state_reads); let out = ExecutionOutput::new( is_block, @@ -345,7 +354,6 @@ impl Parser { to_commit, to_discard, to_retry, - last_checkpoint_state, result_state, state_reads, block_end_info, @@ -486,9 +494,9 @@ impl Parser { fn update_state( _to_commit: &TransactionsToKeep, - _base_state: &State, + _base_state: &LatestState, _state_cache: &ShardedStateCache, - ) -> (Option, State) { + ) -> LatestState { // FIXME(aldenhu): todo!() } @@ -530,37 +538,41 @@ mod tests { #[test] fn should_filter_subscribable_events() { - let event_0 = - ContractEvent::new_v2_with_type_tag_str("0x1::dkg::DKGStartEvent", b"dkg_1".to_vec()); - let event_1 = ContractEvent::new_v2_with_type_tag_str( - "0x2345::random_module::RandomEvent", - b"random_x".to_vec(), - ); - let event_2 = - ContractEvent::new_v2_with_type_tag_str("0x1::dkg::DKGStartEvent", b"dkg_2".to_vec()); - - let txns = vec![Transaction::dummy(), Transaction::dummy()]; - let txn_outs = vec![ - TransactionOutput::new( - WriteSet::default(), - vec![event_0.clone()], - 0, - TransactionStatus::Keep(ExecutionStatus::Success), - TransactionAuxiliaryData::default(), - ), - TransactionOutput::new( - WriteSet::default(), - vec![event_1.clone(), event_2.clone()], - 0, - TransactionStatus::Keep(ExecutionStatus::Success), - TransactionAuxiliaryData::default(), - ), - ]; - let execution_output = - Parser::parse(0, txns, txn_outs, CachedStateView::new_dummy(), None, None).unwrap(); - assert_eq!( - vec![event_0, event_2], - *execution_output.subscribable_events - ); + /* + let event_0 = + ContractEvent::new_v2_with_type_tag_str("0x1::dkg::DKGStartEvent", b"dkg_1".to_vec()); + let event_1 = ContractEvent::new_v2_with_type_tag_str( + "0x2345::random_module::RandomEvent", + b"random_x".to_vec(), + ); + let event_2 = + ContractEvent::new_v2_with_type_tag_str("0x1::dkg::DKGStartEvent", b"dkg_2".to_vec()); + + let txns = vec![Transaction::dummy(), Transaction::dummy()]; + let txn_outs = vec![ + TransactionOutput::new( + WriteSet::default(), + vec![event_0.clone()], + 0, + TransactionStatus::Keep(ExecutionStatus::Success), + TransactionAuxiliaryData::default(), + ), + TransactionOutput::new( + WriteSet::default(), + vec![event_1.clone(), event_2.clone()], + 0, + TransactionStatus::Keep(ExecutionStatus::Success), + TransactionAuxiliaryData::default(), + ), + ]; + let execution_output = + Parser::parse(0, txns, txn_outs, CachedStateView::new_dummy(), None, None).unwrap(); + assert_eq!( + vec![event_0, event_2], + *execution_output.subscribable_events + ); + FIXME(aldenhu) + */ + todo!() } } diff --git a/execution/executor/src/workflow/do_state_checkpoint.rs b/execution/executor/src/workflow/do_state_checkpoint.rs index e2b91b9e599ac5..69af323075a49a 100644 --- a/execution/executor/src/workflow/do_state_checkpoint.rs +++ b/execution/executor/src/workflow/do_state_checkpoint.rs @@ -8,14 +8,14 @@ use aptos_executor_types::{ execution_output::ExecutionOutput, state_checkpoint_output::StateCheckpointOutput, }; use aptos_metrics_core::TimerHelper; -use aptos_storage_interface::state_store::state_summary::StateSummary; +use aptos_storage_interface::state_store::state_summary::LatestStateSummary; pub struct DoStateCheckpoint; impl DoStateCheckpoint { pub fn run( _execution_output: &ExecutionOutput, - _parent_state_summary: StateSummary, + _parent_state_summary: &LatestStateSummary, _known_state_checkpoints: Option>>, ) -> Result { let _timer = OTHER_TIMERS.timer_with(&["do_state_checkpoint"]); diff --git a/execution/executor/src/workflow/mod.rs b/execution/executor/src/workflow/mod.rs index 322359c17ededd..a09cd51375d833 100644 --- a/execution/executor/src/workflow/mod.rs +++ b/execution/executor/src/workflow/mod.rs @@ -24,7 +24,7 @@ impl ApplyExecutionOutput { ) -> Result { let state_checkpoint_output = DoStateCheckpoint::run( &execution_output, - base_view.state_summary, + &base_view.state_summary, Option::>::None, // known_state_checkpoint_hashes )?; let ledger_update_output = DoLedgerUpdate::run( diff --git a/storage/aptosdb/src/backup/restore_utils.rs b/storage/aptosdb/src/backup/restore_utils.rs index 4abb1e3050e43f..2f15b9625c0253 100644 --- a/storage/aptosdb/src/backup/restore_utils.rs +++ b/storage/aptosdb/src/backup/restore_utils.rs @@ -131,60 +131,60 @@ pub(crate) fn save_transactions( ) -> Result<()> { todo!() /* - if let Some((ledger_db_batch, state_kv_batches, _state_kv_metadata_batch)) = existing_batch { - save_transactions_impl( - state_store, - ledger_db, - first_version, - txns, - txn_infos, - events, - write_sets.as_ref(), - ledger_db_batch, - state_kv_batches, - kv_replay, - )?; - } else { - let mut ledger_db_batch = LedgerDbSchemaBatches::new(); - let mut sharded_kv_schema_batch = new_sharded_kv_schema_batch(); - let state_kv_metadata_batch = SchemaBatch::new(); - save_transactions_impl( - Arc::clone(&state_store), - Arc::clone(&ledger_db), - first_version, - txns, - txn_infos, - events, - write_sets.as_ref(), - &mut ledger_db_batch, - &mut sharded_kv_schema_batch, - kv_replay, - )?; - // get the last version and commit to the state kv db - // commit the state kv before ledger in case of failure happens - let last_version = first_version + txns.len() as u64 - 1; - state_store.state_db.state_kv_db.commit( - last_version, - state_kv_metadata_batch, - sharded_kv_schema_batch, - )?; + if let Some((ledger_db_batch, state_kv_batches, _state_kv_metadata_batch)) = existing_batch { + save_transactions_impl( + state_store, + ledger_db, + first_version, + txns, + txn_infos, + events, + write_sets.as_ref(), + ledger_db_batch, + state_kv_batches, + kv_replay, + )?; + } else { + let mut ledger_db_batch = LedgerDbSchemaBatches::new(); + let mut sharded_kv_schema_batch = new_sharded_kv_schema_batch(); + let state_kv_metadata_batch = SchemaBatch::new(); + save_transactions_impl( + Arc::clone(&state_store), + Arc::clone(&ledger_db), + first_version, + txns, + txn_infos, + events, + write_sets.as_ref(), + &mut ledger_db_batch, + &mut sharded_kv_schema_batch, + kv_replay, + )?; + // get the last version and commit to the state kv db + // commit the state kv before ledger in case of failure happens + let last_version = first_version + txns.len() as u64 - 1; + state_store.state_db.state_kv_db.commit( + last_version, + state_kv_metadata_batch, + sharded_kv_schema_batch, + )?; - ledger_db.write_schemas(ledger_db_batch)?; + ledger_db.write_schemas(ledger_db_batch)?; -<<<<<<< HEAD - state_store - .current_state() - .set(StateDelta::new_empty_with_version(Some(last_version))); -======= - *state_store.current_state().store( - // FIXME(aldenhu) - StateDelta::new_empty_with_version(Some(last_version)) - ); ->>>>>>> 4f5660e3ec (DB: kv write path) - } + <<<<<<< HEAD + state_store + .current_state() + .set(StateDelta::new_empty_with_version(Some(last_version))); + ======= + *state_store.current_state().store( + // FIXME(aldenhu) + StateDelta::new_empty_with_version(Some(last_version)) + ); + >>>>>>> 4f5660e3ec (DB: kv write path) + } - Ok(()) - */ + Ok(()) + */ } /// A helper function that saves the ledger infos to the given change set diff --git a/storage/aptosdb/src/db/include/aptosdb_writer.rs b/storage/aptosdb/src/db/include/aptosdb_writer.rs index 806be42d1b0b26..717cd9324d13ef 100644 --- a/storage/aptosdb/src/db/include/aptosdb_writer.rs +++ b/storage/aptosdb/src/db/include/aptosdb_writer.rs @@ -21,7 +21,7 @@ impl DbWriter for AptosDB { .expect("Concurrent committing detected."); let _timer = OTHER_TIMERS_SECONDS.timer_with(&["pre_commit_ledger"]); - chunk.state_summary.global_state_summary.log_generation("db_save"); + chunk.state_summary.state_summary().global_state_summary.log_generation("db_save"); self.pre_commit_validation(&chunk)?; let _new_root_hash = self.calculate_and_commit_ledger_and_state_kv( @@ -34,8 +34,7 @@ impl DbWriter for AptosDB { // n.b make sure buffered_state.update() is called after all other commits are done, since // internally it updates state_store.current_state which indicates the "pre-committed version" self.state_store.buffered_state().lock().update( - chunk.last_state_checkpoint, - chunk.state, + chunk.state_summary, sync_commit || chunk.is_reconfig, )?; @@ -222,13 +221,14 @@ impl AptosDB { ensure!(!chunk.is_empty(), "chunk is empty, nothing to save."); - let current_state = self.state_store.current_state().current.clone(); + let next_version = self.state_store.current_state().next_version(); // Ensure the incoming committing requests are always consecutive and the version in // buffered state is consistent with that in db. - ensure!(chunk.first_version == current_state.next_version(), + ensure!( + chunk.first_version == next_version, "The first version passed in ({}), and the next version expected by db ({}) are inconsistent.", chunk.first_version, - current_state.next_version(), + next_version, ); Ok(()) @@ -305,7 +305,6 @@ impl AptosDB { let state_kv_metadata_batch = SchemaBatch::new(); self.state_store.put_value_sets( - chunk.last_state_checkpoint, chunk.state, chunk.state_update_refs, chunk.state_reads, @@ -313,10 +312,6 @@ impl AptosDB { &sharded_state_kv_batches, // TODO(grao): remove after APIs migrated off the DB to the indexer. self.state_store.state_kv_db.enabled_sharding(), - // FIXME(aldenhu): remove or implement differently - chunk.transaction_infos - .iter() - .rposition(|t| t.state_checkpoint_hash().is_some()), )?; // Write block index if event index is skipped. @@ -509,7 +504,7 @@ impl AptosDB { version_to_commit: Version, ) -> Result> { let old_committed_ver = self.ledger_db.metadata_db().get_synced_version()?; - let pre_committed_ver = self.state_store.current_state().current.version(); + let pre_committed_ver = self.state_store.current_state().version(); ensure!( old_committed_ver.is_none() || version_to_commit >= old_committed_ver.unwrap(), "Version too old to commit. Committed: {:?}; Trying to commit with LI: {}", diff --git a/storage/aptosdb/src/state_store/buffered_state.rs b/storage/aptosdb/src/state_store/buffered_state.rs index feb2a0dc7b0f83..056794fda8ee9a 100644 --- a/storage/aptosdb/src/state_store/buffered_state.rs +++ b/storage/aptosdb/src/state_store/buffered_state.rs @@ -22,6 +22,7 @@ use aptos_storage_interface::{ db_ensure as ensure, state_store::{ sharded_state_updates::ShardedStateUpdates, state::State, state_delta::StateDelta, + state_summary::LatestStateSummary, }, AptosDbError, Result, }; @@ -45,12 +46,7 @@ pub(crate) const TARGET_SNAPSHOT_INTERVAL_IN_VERSION: u64 = 100_000; /// state_until_checkpoint.current = state_after_checkpoint.base, same for their versions. #[derive(Debug)] pub struct BufferedState { - /// state until the latest checkpoint. The `base` is the newest persisted state. - state_until_checkpoint: Option>, - /// state after the latest checkpoint. The `current` is the latest speculative state. - /// n.b. this is an `Arc` shared with the StateStore so that merely querying the latest state - /// does not require locking the buffered state. - state_after_checkpoint: Arc>, + current_state: Arc>, state_commit_sender: SyncSender>>, target_items: usize, join_handle: Option>, @@ -163,66 +159,48 @@ impl BufferedState { } fn report_latest_committed_version(&self) { - /* - LATEST_CHECKPOINT_VERSION.set( - self.state_after_checkpoint - .lock() - .base_version - .map_or(-1, |v| v as i64), - ); - FIXME(aldenhu) - */ - todo!() + LATEST_CHECKPOINT_VERSION.set(self.current_state.lock().version().map_or(-1, |v| v as i64)); } /// This method updates the buffered state with new data. - pub fn update( - &mut self, - latest_state_checkpoint: Option<&State>, - state: &State, - sync_commit: bool, - ) -> Result<()> { - /* + pub fn update(&mut self, state_summary: &LatestStateSummary, sync_commit: bool) -> Result<()> { + /* FIXME(aldenhu) let _timer = OTHER_TIMERS_SECONDS.timer_with(&["buffered_state___update"]); - { - let _timer = OTHER_TIMERS_SECONDS.timer_with(&["update_current_state"]); - let mut state_after_checkpoint = self.state_after_checkpoint.lock(); - assert!(new_state_after_checkpoint - .current - .is_family(&state_after_checkpoint.current)); + assert!(new_state_after_checkpoint + .current + .is_family(&state_after_checkpoint.current)); + ensure!( + new_state_after_checkpoint.base_version >= state_after_checkpoint.base_version, + "new state base version smaller than state after checkpoint base version", + ); + if let Some(updates_until_next_checkpoint_since_current) = + updates_until_next_checkpoint_since_current_option + { ensure!( - new_state_after_checkpoint.base_version >= state_after_checkpoint.base_version, - "new state base version smaller than state after checkpoint base version", + new_state_after_checkpoint.base_version > state_after_checkpoint.base_version, + "Diff between base and latest checkpoints provided, while they are the same.", ); - if let Some(updates_until_next_checkpoint_since_current) = - updates_until_next_checkpoint_since_current_option - { - ensure!( - new_state_after_checkpoint.base_version > state_after_checkpoint.base_version, - "Diff between base and latest checkpoints provided, while they are the same.", - ); - state_after_checkpoint - .updates_since_base - .clone_merge(updates_until_next_checkpoint_since_current); - - let mut old_state = - state_after_checkpoint.replace_with(new_state_after_checkpoint.clone()); - old_state.current = state_after_checkpoint.base.clone(); - old_state.current_version = state_after_checkpoint.base_version; - - if let Some(ref mut delta) = self.state_until_checkpoint { - delta.merge(old_state); - } else { - self.state_until_checkpoint = Some(Box::new(old_state)); - } + state_after_checkpoint + .updates_since_base + .clone_merge(updates_until_next_checkpoint_since_current); + + let mut old_state = + state_after_checkpoint.replace_with(new_state_after_checkpoint.clone()); + old_state.current = state_after_checkpoint.base.clone(); + old_state.current_version = state_after_checkpoint.base_version; + + if let Some(ref mut delta) = self.state_until_checkpoint { + delta.merge(old_state); } else { - ensure!( - new_state_after_checkpoint.base_version == state_after_checkpoint.base_version, - "Diff between base and latest checkpoints not provided.", - ); - state_after_checkpoint.set(new_state_after_checkpoint.clone()); + self.state_until_checkpoint = Some(Box::new(old_state)); } + } else { + ensure!( + new_state_after_checkpoint.base_version == state_after_checkpoint.base_version, + "Diff between base and latest checkpoints not provided.", + ); + state_after_checkpoint.set(new_state_after_checkpoint.clone()); } // n.b. make sure these are called after self.state_after_checkpoint is unlocked. @@ -231,7 +209,7 @@ impl BufferedState { self.maybe_commit(sync_commit); self.report_latest_committed_version(); Ok(()) - FIXME(aldenhu) + */ todo!() } diff --git a/storage/aptosdb/src/state_store/current_state.rs b/storage/aptosdb/src/state_store/current_state.rs index 6b251c9dc4f2f5..b5f902ecaa8ce9 100644 --- a/storage/aptosdb/src/state_store/current_state.rs +++ b/storage/aptosdb/src/state_store/current_state.rs @@ -1,28 +1,28 @@ // Copyright (c) Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use aptos_storage_interface::state_store::state_delta::StateDelta; +use aptos_storage_interface::state_store::{state::LatestState, state_delta::StateDelta}; use derive_more::{Deref, DerefMut}; #[derive(Clone, Debug, Deref, DerefMut)] pub(crate) struct CurrentState { #[deref] #[deref_mut] - from_latest_checkpoint_to_current: StateDelta, + latest_state: LatestState, } impl CurrentState { pub fn new_dummy() -> Self { Self { - from_latest_checkpoint_to_current: StateDelta::new_empty(), + latest_state: LatestState::new_empty(), } } - pub fn set(&mut self, from_latest_checkpoint_to_current: StateDelta) { - self.from_latest_checkpoint_to_current = from_latest_checkpoint_to_current; + pub fn set(&mut self, latest_state: LatestState) { + self.latest_state = latest_state } - pub fn get(&self) -> &StateDelta { - &self.from_latest_checkpoint_to_current + pub fn get(&self) -> &LatestState { + &self.latest_state } } diff --git a/storage/aptosdb/src/state_store/mod.rs b/storage/aptosdb/src/state_store/mod.rs index 8cdea243338e01..c6c4bf8d787470 100644 --- a/storage/aptosdb/src/state_store/mod.rs +++ b/storage/aptosdb/src/state_store/mod.rs @@ -60,7 +60,7 @@ use aptos_storage_interface::{ db_ensure as ensure, db_other_bail as bail, state_store::{ sharded_state_update_refs::{ShardedStateUpdateRefs, StateUpdateRefWithOffset}, - state::State, + state::{LatestState, State}, state_delta::StateDelta, state_update::StateValueWithVersionOpt, state_view::{ @@ -665,15 +665,10 @@ impl StateStore { &self.buffered_state } - // FIXME(aldenhu): make current_state_cloned() this pub fn current_state(&self) -> MutexGuard { self.current_state.lock() } - pub fn current_state_cloned(&self) -> State { - self.current_state().get().current.clone() - } - pub fn persisted_state(&self) -> MutexGuard { self.persisted_state.lock() } @@ -734,27 +729,23 @@ impl StateStore { /// Put the `value_state_sets` into its own CF. pub fn put_value_sets( &self, - last_checkpoint_state: Option<&State>, - state: &State, + state: &LatestState, state_update_refs: &ShardedStateUpdateRefs, sharded_state_cache: Option<&ShardedStateCache>, ledger_batch: &SchemaBatch, sharded_state_kv_batches: &ShardedStateKvSchemaBatch, enable_sharding: bool, - last_checkpoint_index: Option, ) -> Result<()> { let _timer = OTHER_TIMERS_SECONDS.timer_with(&["put_value_sets"]); - let current_state = self.current_state().current.clone(); + let current_state = self.current_state().state().clone(); self.put_stats_and_indices( ¤t_state, - last_checkpoint_state, state, state_update_refs, sharded_state_cache, ledger_batch, sharded_state_kv_batches, - last_checkpoint_index, enable_sharding, )?; @@ -813,15 +804,13 @@ impl StateStore { pub fn put_stats_and_indices( &self, current_state: &State, - last_checkpoint_state: Option<&State>, - state: &State, + latest_state: &LatestState, state_update_refs: &ShardedStateUpdateRefs, // If not None, it must contains all keys in the value_state_sets. // TODO(grao): Restructure this function. sharded_state_cache: Option<&ShardedStateCache>, batch: &SchemaBatch, sharded_state_kv_batches: &ShardedStateKvSchemaBatch, - last_checkpoint_index: Option, enable_sharding: bool, ) -> Result<()> { let _timer = OTHER_TIMERS_SECONDS.timer_with(&["put_stats_and_indices"]); @@ -832,7 +821,7 @@ impl StateStore { } else { // If no cache is provided, we load the old values of all keys inline. _state_cache = ShardedStateCache::default(); - self.prime_state_cache(current_state, state, &_state_cache); + self.prime_state_cache(current_state, latest_state, &_state_cache); &_state_cache }; @@ -842,17 +831,19 @@ impl StateStore { sharded_state_kv_batches, enable_sharding, primed_state_cache, - state.usage().is_untracked() || current_state.version().is_none(), // ignore_state_cache_miss + latest_state.usage().is_untracked() || current_state.version().is_none(), // ignore_state_cache_miss ); { let _timer = OTHER_TIMERS_SECONDS.timer_with(&["put_stats_and_indices__put_usage"]); - if let Some(last_checkpoint_state) = last_checkpoint_state { - if !last_checkpoint_state.is_the_same(state) { - Self::put_usage(last_checkpoint_state, batch)?; - } + if !latest_state.is_checkpoint() + && latest_state.last_checkpoint_state().next_version() + > current_state.next_version() + { + // a state checkpoint in the middle of the chunk + Self::put_usage(latest_state.last_checkpoint_state(), batch)?; } - Self::put_usage(state, batch)?; + Self::put_usage(latest_state, batch)?; } Ok(()) @@ -861,14 +852,14 @@ impl StateStore { fn prime_state_cache( &self, current_state: &State, - state: &State, + latest_state: &State, state_cache: &ShardedStateCache, ) { if let Some(base_version) = current_state.version() { let _timer = OTHER_TIMERS_SECONDS.timer_with(&["put_stats_and_indices__prime_state_cache"]); - state + latest_state .clone() .into_delta(current_state.clone()) .updates diff --git a/storage/storage-interface/Cargo.toml b/storage/storage-interface/Cargo.toml index d478b057a49913..8d89dce94091a3 100644 --- a/storage/storage-interface/Cargo.toml +++ b/storage/storage-interface/Cargo.toml @@ -28,6 +28,7 @@ arr_macro = { workspace = true } bcs = { workspace = true } crossbeam-channel = { workspace = true } dashmap = { workspace = true } +derive_more = { workspace = true } once_cell = { workspace = true } parking_lot = { workspace = true } proptest = { workspace = true } diff --git a/storage/storage-interface/src/chunk_to_commit.rs b/storage/storage-interface/src/chunk_to_commit.rs index 610f425dedcd78..5c486f7d6385db 100644 --- a/storage/storage-interface/src/chunk_to_commit.rs +++ b/storage/storage-interface/src/chunk_to_commit.rs @@ -2,7 +2,9 @@ // SPDX-License-Identifier: Apache-2.0 use crate::state_store::{ - sharded_state_update_refs::ShardedStateUpdateRefs, state::State, state_summary::StateSummary, + sharded_state_update_refs::ShardedStateUpdateRefs, + state::{LatestState, State}, + state_summary::LatestStateSummary, state_view::cached_state_view::ShardedStateCache, }; use aptos_types::transaction::{Transaction, TransactionInfo, TransactionOutput, Version}; @@ -14,9 +16,8 @@ pub struct ChunkToCommit<'a> { pub transaction_outputs: &'a [TransactionOutput], pub transaction_infos: &'a [TransactionInfo], pub last_state_checkpoint: Option<&'a State>, - pub state: &'a State, - pub last_state_checkpoint_summary: Option<&'a StateSummary>, - pub state_summary: &'a StateSummary, + pub state: &'a LatestState, + pub state_summary: &'a LatestStateSummary, pub state_update_refs: &'a ShardedStateUpdateRefs<'a>, pub state_reads: Option<&'a ShardedStateCache>, pub is_reconfig: bool, diff --git a/storage/storage-interface/src/ledger_summary.rs b/storage/storage-interface/src/ledger_summary.rs index b0f3d7adad5e78..012ba8fcd79881 100644 --- a/storage/storage-interface/src/ledger_summary.rs +++ b/storage/storage-interface/src/ledger_summary.rs @@ -3,8 +3,8 @@ use crate::{ state_store::{ - state::State, - state_summary::StateSummary, + state::LatestState, + state_summary::LatestStateSummary, state_view::{async_proof_fetcher::AsyncProofFetcher, cached_state_view::CachedStateView}, }, DbReader, @@ -20,19 +20,19 @@ use std::sync::Arc; #[derive(Clone, Debug)] pub struct LedgerSummary { - pub state: State, - pub state_summary: StateSummary, + pub state: LatestState, + pub state_summary: LatestStateSummary, pub transaction_accumulator: Arc, } impl LedgerSummary { pub fn new( - state: State, - state_summary: StateSummary, + state: LatestState, + state_summary: LatestStateSummary, transaction_accumulator: Arc, ) -> Self { - assert_eq!(state.next_version(), state_summary.next_version()); - assert_eq!(state.next_version(), transaction_accumulator.num_leaves()); + state_summary.assert_versions_match(&state); + Self { state, state_summary, @@ -41,7 +41,7 @@ impl LedgerSummary { } pub fn next_version(&self) -> Version { - self.state.next_version() + self.transaction_accumulator.num_leaves() } pub fn version(&self) -> Option { @@ -71,9 +71,11 @@ impl LedgerSummary { } pub fn new_empty() -> Self { + let state = LatestState::new_empty(); + let state_summary = LatestStateSummary::new_empty(); Self::new( - State::new_empty(), - StateSummary::new_empty(), + state, + state_summary, Arc::new(InMemoryAccumulator::new_empty()), ) } diff --git a/storage/storage-interface/src/state_store/state.rs b/storage/storage-interface/src/state_store/state.rs index 6fbcd5ebfe0415..0f8be548223289 100644 --- a/storage/storage-interface/src/state_store/state.rs +++ b/storage/storage-interface/src/state_store/state.rs @@ -7,6 +7,7 @@ use aptos_types::{ state_store::{state_key::StateKey, state_storage_usage::StateStorageUsage}, transaction::Version, }; +use derive_more::Deref; use std::sync::Arc; /// Represents the blockchain state at a given version. @@ -58,4 +59,46 @@ impl State { // FIXME(aldenhu) todo!() } + + pub fn is_family(&self, _rhs: &State) -> bool { + // FIXME(aldenhu) + todo!() + } +} + +/// At a given version, the state and the last checkpoint state at or before the version. +#[derive(Clone, Debug, Deref)] +pub struct LatestState { + last_checkpoint_state: State, + #[deref] + state: State, +} + +impl LatestState { + pub fn new(last_checkpoint_state: State, state: State) -> Self { + assert!(last_checkpoint_state.next_version() <= state.next_version()); + assert!(last_checkpoint_state.is_family(&state)); + + Self { + last_checkpoint_state, + state, + } + } + + pub fn new_empty() -> Self { + let state = State::new_empty(); + Self::new(state.clone(), state) + } + + pub fn state(&self) -> &State { + &self.state + } + + pub fn last_checkpoint_state(&self) -> &State { + &self.last_checkpoint_state + } + + pub fn is_checkpoint(&self) -> bool { + self.state.is_the_same(&self.last_checkpoint_state) + } } diff --git a/storage/storage-interface/src/state_store/state_summary.rs b/storage/storage-interface/src/state_store/state_summary.rs index 9660e17ba93994..6b83507dff32b2 100644 --- a/storage/storage-interface/src/state_store/state_summary.rs +++ b/storage/storage-interface/src/state_store/state_summary.rs @@ -1,7 +1,7 @@ // Copyright (c) Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::state_store::state_delta::StateDelta; +use crate::state_store::{state::LatestState, state_delta::StateDelta}; use aptos_crypto::HashValue; use aptos_scratchpad::SparseMerkleTree; use aptos_types::{state_store::state_value::StateValue, transaction::Version}; @@ -47,4 +47,52 @@ impl StateSummary { // FIXME(aldenhu) todo!() } + + pub fn is_family(&self, _rhs: &Self) -> bool { + // FIXME(aldenhu) + todo!() + } +} + +/// At a given version, the summaries of the state and the last checkpoint state at or before the version. +#[derive(Clone, Debug)] +pub struct LatestStateSummary { + last_checkpoint_summary: StateSummary, + state_summary: StateSummary, +} + +impl LatestStateSummary { + pub fn new(last_checkpoint_summary: StateSummary, state_summary: StateSummary) -> Self { + assert!(last_checkpoint_summary.next_version() <= state_summary.next_version()); + + Self { + last_checkpoint_summary, + state_summary, + } + } + + pub fn new_empty() -> Self { + let state_summary = StateSummary::new_empty(); + Self::new(state_summary.clone(), state_summary) + } + + pub fn next_version(&self) -> Version { + self.state_summary.next_version() + } + + pub fn assert_versions_match(&self, latest_state: &LatestState) { + assert_eq!(self.next_version(), latest_state.next_version()); + assert_eq!( + self.last_checkpoint_summary.next_version(), + latest_state.last_checkpoint_state().next_version() + ); + } + + pub fn last_checkpoint_summary(&self) -> &StateSummary { + &self.last_checkpoint_summary + } + + pub fn state_summary(&self) -> &StateSummary { + &self.state_summary + } } diff --git a/storage/storage-interface/src/state_store/state_view/cached_state_view.rs b/storage/storage-interface/src/state_store/state_view/cached_state_view.rs index 1396794211b2bd..5bce923312de3f 100644 --- a/storage/storage-interface/src/state_store/state_view/cached_state_view.rs +++ b/storage/storage-interface/src/state_store/state_view/cached_state_view.rs @@ -149,7 +149,7 @@ impl CachedStateView { } /// Consumes `Self` and returns the state and all the memorized state reads. - pub fn finish(self) -> (State, ShardedStateCache) { + pub fn finish(self) -> ShardedStateCache { todo!() /* FIXME(aldenhu) let Self {