Skip to content

Commit

Permalink
temp: adding types
Browse files Browse the repository at this point in the history
  • Loading branch information
msmouse committed Nov 27, 2024
1 parent 7b69d4c commit 0eb8d0e
Show file tree
Hide file tree
Showing 10 changed files with 298 additions and 264 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 10 additions & 8 deletions execution/executor/src/types/in_memory_state_calculator_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,14 @@ use aptos_storage_interface::state_store::{
sharded_state_update_refs::ShardedStateUpdateRefs,
sharded_state_updates::ShardedStateUpdates,
state_delta::StateDelta,
state_view::cached_state_view::{ShardedStateCache, StateCache},
state_view::cached_state_view::{ShardedStateCache, StateCache, StateCacheShard},
};
use aptos_types::{
state_store::{
state_key::StateKey, state_storage_usage::StateStorageUsage, state_value::StateValue,
},
transaction::Version,
write_set::WriteSet,
};
use dashmap::DashMap;
use itertools::{zip_eq, Itertools};
use rayon::prelude::*;
use std::{ops::Deref, sync::Arc};
Expand Down Expand Up @@ -253,12 +251,14 @@ impl InMemoryStateCalculatorV2 {
}

fn add_to_delta(
k: &StateKey,
v: &Option<&StateValue>,
state_cache: &DashMap<StateKey, (Option<Version>, Option<StateValue>)>,
items_delta: &mut i64,
bytes_delta: &mut i64,
_k: &StateKey,
_v: &Option<&StateValue>,
_state_cache: &StateCacheShard,
_items_delta: &mut i64,
_bytes_delta: &mut i64,
) {
todo!()
/* FIXME(aldenhu)
let key_size = k.size();
if let Some(value) = v {
*items_delta += 1;
Expand All @@ -272,6 +272,7 @@ impl InMemoryStateCalculatorV2 {
*items_delta -= 1;
*bytes_delta -= (key_size + old_v.size()) as i64;
}
*/
}

fn calculate_usage(
Expand All @@ -286,6 +287,7 @@ impl InMemoryStateCalculatorV2 {
}

let (items_delta, bytes_delta) = sharded_state_cache
.shards
.par_iter()
.zip_eq(updates.shards.par_iter())
.map(|(cache, updates)| {
Expand Down
74 changes: 18 additions & 56 deletions storage/aptosdb/src/state_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use crate::{
ShardedStateKvSchemaBatch,
},
};
use anyhow::Context;
use aptos_crypto::{
hash::{CryptoHash, SPARSE_MERKLE_PLACEHOLDER_HASH},
HashValue,
Expand All @@ -55,6 +54,7 @@ use aptos_storage_interface::{
state_store::{
sharded_state_update_refs::ShardedStateUpdateRefs,
state_delta::StateDelta,
state_update::StateValueWithVersionOpt,
state_view::{
async_proof_fetcher::AsyncProofFetcher,
cached_state_view::{CachedStateView, ShardedStateCache},
Expand Down Expand Up @@ -798,9 +798,6 @@ impl StateStore {
if let Some(base_version) = base_version {
let _timer = OTHER_TIMERS_SECONDS.timer_with(&["put_stats_and_indices__total_get"]);
if let Some(sharded_state_cache) = sharded_state_cache {
// For some entries the base value version is None, here is to fiil those in.
// See `ShardedStateCache`.
self.prepare_version_in_cache(base_version, sharded_state_cache)?;
state_cache_with_version = sharded_state_cache;
} else {
// TODO(aldenhu): get all updates from StateDelta directly
Expand All @@ -821,15 +818,14 @@ impl StateStore {
s.spawn(move |_| {
let _timer = OTHER_TIMERS_SECONDS
.timer_with(&["put_stats_and_indices__get_state_value"]);
let version_and_value = self
let tuple_opt = self
.state_db
.get_state_value_with_version_by_version(key, base_version)
.expect("Must succeed.");
if let Some((version, value)) = version_and_value {
cache.insert((*key).clone(), (Some(version), Some(value)));
} else {
cache.insert((*key).clone(), (Some(base_version), None));
}
cache.insert(
(*key).clone(),
StateValueWithVersionOpt::from_tuple_opt(tuple_opt),
);
});
}
});
Expand Down Expand Up @@ -898,6 +894,7 @@ impl StateStore {
let num_versions = state_update_refs.num_versions;
// calculate total state size in bytes
let usage_deltas: Vec<Vec<_>> = sharded_state_cache
.shards
.par_iter()
.zip_eq(state_update_refs.shards.par_iter())
.zip_eq(sharded_state_kv_batches.par_iter())
Expand Down Expand Up @@ -948,22 +945,24 @@ impl StateStore {
}
}

let old_version_and_value_opt = if let Some((old_version, old_value_opt)) =
cache.insert((*key).clone(), (Some(version), value.cloned()))
{
old_value_opt.map(|value| (old_version, value))
let old_state_value_with_version_opt = if let Some(old) = cache.insert(
(*key).clone(),
StateValueWithVersionOpt::from_state_write_ref(version, *value),
) {
old
} else {
// n.b. all updated state items must be read and recorded in the state cache,
// otherwise we can't calculate the correct usage. The is_untracked() hack
// is to allow some db tests without real execution layer to pass.
assert!(ignore_state_cache_miss, "Must cache read.");
None
StateValueWithVersionOpt::NonExistent
};

if let Some((old_version, old_value)) = old_version_and_value_opt {
let old_version = old_version
.context("Must have old version in cache.")
.unwrap();
if let StateValueWithVersionOpt::Value {
version: old_version,
value: old_value,
} = old_state_value_with_version_opt
{
items_delta -= 1;
bytes_delta -= (key.size() + old_value.size()) as i64;
// stale index of the old value at its version.
Expand Down Expand Up @@ -1173,43 +1172,6 @@ impl StateStore {
}
Ok(keys)
}

fn prepare_version_in_cache(
&self,
base_version: Version,
sharded_state_cache: &ShardedStateCache,
) -> Result<()> {
THREAD_MANAGER.get_high_pri_io_pool().scope(|s| {
sharded_state_cache.par_iter().for_each(|shard| {
shard.iter_mut().for_each(|mut entry| {
match entry.value() {
(None, Some(_)) => s.spawn(move |_| {
let _timer = OTHER_TIMERS_SECONDS
.with_label_values(&["put_stats_and_indices__get_state_value"])
.start_timer();
let version_and_value = self
.state_db
.get_state_value_with_version_by_version(entry.key(), base_version)
.expect("Must succeed.");
if let Some((version, _)) = version_and_value {
entry.0 = Some(version);
} else {
unreachable!();
}
}),
_ => {
// I just want a counter.
let _timer = OTHER_TIMERS_SECONDS
.with_label_values(&["put_stats_and_indices__skip"])
.start_timer();
},
};
})
});
});

Ok(())
}
}

impl StateValueWriter<StateKey, StateValue> for StateStore {
Expand Down
1 change: 1 addition & 0 deletions storage/storage-interface/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ rust-version = { workspace = true }
anyhow = { workspace = true }
aptos-crypto = { workspace = true }
aptos-drop-helper = { workspace = true }
aptos-experimental-layered-map = { workspace = true }
aptos-experimental-runtimes = { workspace = true }
aptos-logger = { workspace = true }
aptos-metrics-core = { workspace = true }
Expand Down
3 changes: 3 additions & 0 deletions storage/storage-interface/src/state_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@

pub mod sharded_state_update_refs;
pub mod sharded_state_updates;
pub mod state;
pub mod state_delta;
pub mod state_summary;
pub mod state_update;
pub mod state_view;

pub const NUM_STATE_SHARDS: usize = 16;
49 changes: 49 additions & 0 deletions storage/storage-interface/src/state_store/state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright (c) Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::state_store::{state_delta::StateDelta, state_update::StateWrite, NUM_STATE_SHARDS};
use aptos_experimental_layered_map::MapLayer;
use aptos_types::{
state_store::{state_key::StateKey, state_storage_usage::StateStorageUsage},
transaction::Version,
};
use std::sync::Arc;

/// Represents the blockchain state at a given version.
/// n.b. the state can be either persisted or speculative.
#[derive(Clone, Debug)]
pub struct State {
/// The next version. If this is 0, the state is the "pre-genesis" empty state.
next_version: Version,
/// The updates made to the state at the current version.
/// N.b. this is not directly iteratable, one needs to make a `StateDelta`
/// between this and a `base_version` to list the updates or create a
/// new `State` at a descendant version.
shards: Arc<[MapLayer<StateKey, StateWrite>; NUM_STATE_SHARDS]>,
/// The total usage of the state at the current version.
usage: StateStorageUsage,
}

impl State {
pub fn new_empty() -> Self {
// FIXME(aldenhu): check call site and implement
todo!()
}

pub fn next_version(&self) -> Version {
self.next_version
}

pub fn usage(&self) -> StateStorageUsage {
self.usage
}

pub fn shards(&self) -> &[MapLayer<StateKey, StateWrite>; NUM_STATE_SHARDS] {
&self.shards
}

pub fn into_delta(self, _base: State) -> StateDelta {
// FIXME(aldnehu)
todo!()
}
}
23 changes: 21 additions & 2 deletions storage/storage-interface/src/state_store/state_delta.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::state_store::sharded_state_updates::ShardedStateUpdates;
use crate::state_store::{sharded_state_updates::ShardedStateUpdates, state_update::StateWrite};
use aptos_crypto::HashValue;
use aptos_drop_helper::DropHelper;
use aptos_scratchpad::SparseMerkleTree;
use aptos_types::{
state_store::{state_storage_usage::StateStorageUsage, state_value::StateValue},
state_store::{
state_key::StateKey, state_storage_usage::StateStorageUsage, state_value::StateValue,
},
transaction::Version,
};

Expand Down Expand Up @@ -109,6 +111,23 @@ impl StateDelta {
std::mem::swap(self, &mut rhs);
rhs
}

pub fn parent_version(&self) -> Option<Version> {
// FIXME(aldenhu): update
self.base_version
}

/// Get the state update for a given state key.
/// `None` indicates the key is not updated in the delta.
pub fn get_state_update(&self, _state_key: &StateKey) -> Option<&StateWrite> {
// FIXME(aldenhu)
todo!()
}

pub fn usage(&self) -> StateStorageUsage {
// FIXME(aldenhu):
todo!()
}
}

impl Default for StateDelta {
Expand Down
42 changes: 42 additions & 0 deletions storage/storage-interface/src/state_store/state_summary.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright (c) Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::state_store::state_delta::StateDelta;
use aptos_crypto::HashValue;
use aptos_scratchpad::SparseMerkleTree;
use aptos_types::{state_store::state_value::StateValue, transaction::Version};

/// The data structure through which the entire state at a given
/// version can be summarized to a concise digest (the root hash).
pub struct StateSummary {
/// The next version. If this is 0, the state is the "pre-genesis" empty state.
next_version: Version,
pub global_state_summary: SparseMerkleTree<StateValue>,
}

impl StateSummary {
pub fn new(next_version: Version, global_state_summary: SparseMerkleTree<StateValue>) -> Self {
Self {
next_version,
global_state_summary,
}
}

pub fn update(&self, _persisted: &StateSummary, _state_delta: &StateDelta) -> Self {
// FIXME(aldenhu)
todo!()
}

pub fn root_hash(&self) -> HashValue {
self.global_state_summary.root_hash()
}

pub fn next_version(&self) -> Version {
self.next_version
}

pub fn is_the_same(&self, _rhs: &Self) -> bool {
// FIXME(aldenhu)
todo!()
}
}
Loading

0 comments on commit 0eb8d0e

Please sign in to comment.