Skip to content

Commit

Permalink
Accumulate trie nodes for state with proof
Browse files Browse the repository at this point in the history
  • Loading branch information
liuchengxu committed Dec 12, 2024
1 parent c8f61a4 commit 81fac33
Show file tree
Hide file tree
Showing 11 changed files with 110 additions and 19 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions substrate/client/api/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use sp_state_machine::{
};
use sp_storage::{ChildInfo, StorageData, StorageKey};
pub use sp_trie::MerkleValue;
use sp_trie::PrefixedMemoryDB;

use crate::{blockchain::Backend as BlockchainBackend, UsageInfo};

Expand Down Expand Up @@ -235,6 +236,8 @@ pub trait BlockImportOperation<Block: BlockT> {

/// Configure whether to create a block gap if newly imported block is missing parent
fn set_create_gap(&mut self, create_gap: bool);

/// Set a prefixed-memory state database (raw trie nodes) to be written to the
/// backend when this operation is committed.
///
/// NOTE(review): the database backend stores this and forces a state commit;
/// the in-memory backend implements it as a no-op — confirm callers do not
/// rely on persistence there.
fn import_state_db(&mut self, state_db: PrefixedMemoryDB<HashingFor<Block>>);
}

/// Interface for performing operations on the backend.
Expand Down
2 changes: 2 additions & 0 deletions substrate/client/api/src/in_mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,8 @@ impl<Block: BlockT> backend::BlockImportOperation<Block> for BlockImportOperatio
}

fn set_create_gap(&mut self, _create_gap: bool) {}

// No-op: the in-memory backend does not persist raw trie nodes from a state db.
fn import_state_db(&mut self, _state_db: sp_trie::PrefixedMemoryDB<HashingFor<Block>>) {}
}

/// In-memory backend. Keeps all states and blocks in memory.
Expand Down
1 change: 1 addition & 0 deletions substrate/client/consensus/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ sp-consensus = { workspace = true, default-features = true }
sp-core = { workspace = true, default-features = true }
sp-runtime = { workspace = true, default-features = true }
sp-state-machine = { workspace = true, default-features = true }
sp-trie = { workspace = true, default-features = true }

[dev-dependencies]
sp-test-primitives = { workspace = true }
15 changes: 10 additions & 5 deletions substrate/client/consensus/common/src/block_import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,16 +128,21 @@ pub enum StorageChanges<Block: BlockT> {

/// Imported state data. A vector of key-value pairs that should form a trie.
#[derive(PartialEq, Eq, Clone)]
pub struct ImportedState<B: BlockT> {
/// Target block hash.
pub block: B::Hash,
pub enum ImportedState<B: BlockT> {
/// State keys and values.
pub state: sp_state_machine::KeyValueStates,
FromKeyValue(sp_state_machine::KeyValueStates),
/// State db.
FromProof { proof: sp_trie::PrefixedMemoryDB<HashingFor<B>> },
}

impl<B: BlockT> std::fmt::Debug for ImportedState<B> {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
fmt.debug_struct("ImportedState").field("block", &self.block).finish()
fmt.debug_tuple("ImportedState")
.field(&match self {
Self::FromKeyValue(_) => "FromKeyValue",
Self::FromProof { .. } => "FromProof",
})
.finish()
}
}

Expand Down
26 changes: 25 additions & 1 deletion substrate/client/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,7 @@ pub struct BlockImportOperation<Block: BlockT> {
set_head: Option<Block::Hash>,
commit_state: bool,
create_gap: bool,
import_db: Option<PrefixedMemoryDB<HashingFor<Block>>>,
index_ops: Vec<IndexOperation>,
}

Expand Down Expand Up @@ -993,6 +994,11 @@ impl<Block: BlockT> sc_client_api::backend::BlockImportOperation<Block>
fn set_create_gap(&mut self, create_gap: bool) {
self.create_gap = create_gap;
}

fn import_state_db(&mut self, state_db: PrefixedMemoryDB<HashingFor<Block>>) {
// Stash the trie-node db; it is drained into the changeset when the
// operation is committed. Any previously set db is silently replaced.
self.import_db.replace(state_db);
// Importing a state db implies the state must be committed.
self.commit_state = true;
}
}

struct StorageDb<Block: BlockT> {
Expand Down Expand Up @@ -1536,7 +1542,7 @@ impl<Block: BlockT> Backend<Block> {
}
}

let finalized = if operation.commit_state {
let finalized = if operation.commit_state || operation.import_db.is_some() {
let mut changeset: sc_state_db::ChangeSet<Vec<u8>> =
sc_state_db::ChangeSet::default();
let mut ops: u64 = 0;
Expand Down Expand Up @@ -1568,6 +1574,23 @@ impl<Block: BlockT> Backend<Block> {
}
}
}

if let Some(mut db) = operation.import_db {
for (mut key, (val, rc)) in db.drain() {
self.storage.db.sanitize_key(&mut key);
if rc > 0 {
if rc == 1 {
changeset.inserted.push((key, val.to_vec()));
} else {
changeset.inserted.push((key.clone(), val.to_vec()));
for _ in 0..rc - 1 {
changeset.inserted.push((key.clone(), val.to_vec()));
}
}
}
}
}

self.state_usage.tally_writes_nodes(ops, bytes);
self.state_usage.tally_removed_nodes(removal, bytes_removal);

Expand Down Expand Up @@ -2119,6 +2142,7 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
set_head: None,
commit_state: false,
create_gap: true,
import_db: None,
index_ops: Default::default(),
})
}
Expand Down
4 changes: 4 additions & 0 deletions substrate/client/network/sync/src/strategy/chain_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2020,6 +2020,10 @@ where
debug!(target: LOG_TARGET, "Bad state data received from {peer_id}");
Err(BadPeer(*peer_id, rep::BAD_BLOCK))
},
ImportResult::CorruptedProofData => {
debug!(target: LOG_TARGET, "Internal error");
Ok(())
},
}
}

Expand Down
4 changes: 4 additions & 0 deletions substrate/client/network/sync/src/strategy/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,10 @@ impl<B: BlockT> StateStrategy<B> {
debug!(target: LOG_TARGET, "Bad state data received from {peer_id}");
Err(BadPeer(*peer_id, rep::BAD_STATE))
},
ImportResult::CorruptedProofData => {
debug!(target: LOG_TARGET, "Internal error");
Ok(())
},
}
}

Expand Down
33 changes: 26 additions & 7 deletions substrate/client/network/sync/src/strategy/state_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ pub enum ImportResult<B: BlockT> {
Continue,
/// Bad state chunk.
BadResponse,
/// Failed to convert the accumulated proof to `PrefixedMemoryDB`.
CorruptedProofData,
}

struct StateSyncMetadata<B: BlockT> {
Expand Down Expand Up @@ -142,6 +144,7 @@ impl<B: BlockT> StateSyncMetadata<B> {
pub struct StateSync<B: BlockT, Client> {
metadata: StateSyncMetadata<B>,
state: HashMap<Vec<u8>, (Vec<(Vec<u8>, Vec<u8>)>, Vec<Vec<u8>>)>,
trie_nodes: Vec<Vec<u8>>,
client: Arc<Client>,
}

Expand Down Expand Up @@ -170,6 +173,7 @@ where
skip_proof,
},
state: HashMap::default(),
trie_nodes: Vec::new(),
}
}

Expand Down Expand Up @@ -257,11 +261,11 @@ where
fn import(&mut self, response: StateResponse) -> ImportResult<B> {
if response.entries.is_empty() && response.proof.is_empty() {
debug!(target: LOG_TARGET, "Bad state response");
return ImportResult::BadResponse
return ImportResult::BadResponse;
}
if !self.metadata.skip_proof && response.proof.is_empty() {
debug!(target: LOG_TARGET, "Missing proof");
return ImportResult::BadResponse
return ImportResult::BadResponse;
}
let complete = if !self.metadata.skip_proof {
debug!(target: LOG_TARGET, "Importing state from {} trie nodes", response.proof.len());
Expand All @@ -270,12 +274,12 @@ where
Ok(proof) => proof,
Err(e) => {
debug!(target: LOG_TARGET, "Error decoding proof: {:?}", e);
return ImportResult::BadResponse
return ImportResult::BadResponse;
},
};
let (values, completed) = match self.client.verify_range_proof(
self.metadata.target_root(),
proof,
proof.clone(),
self.metadata.last_key.as_slice(),
) {
Err(e) => {
Expand All @@ -284,7 +288,7 @@ where
"StateResponse failed proof verification: {}",
e,
);
return ImportResult::BadResponse
return ImportResult::BadResponse;
},
Ok(values) => values,
};
Expand All @@ -295,7 +299,7 @@ where
debug!(target: LOG_TARGET, "Error updating key cursor, depth: {}", completed);
};

self.process_state_verified(values);
self.trie_nodes.extend(proof.encoded_nodes);
self.metadata.imported_bytes += proof_size;
complete
} else {
Expand All @@ -304,10 +308,25 @@ where
if complete {
self.metadata.complete = true;
let target_hash = self.metadata.target_hash();
let imported_state = if self.metadata.skip_proof {
ImportedState::FromKeyValue(std::mem::take(&mut self.state).into())
} else {
let compact_proof =
CompactProof { encoded_nodes: std::mem::take(&mut self.trie_nodes) };
let state_db =
match compact_proof.to_prefixed_memory_db(Some(&self.metadata.target_root())) {
Ok((state_db, _root)) => state_db,
Err(err) => {
debug!(target: LOG_TARGET, "Error converting CompactProof to PrefixedMemoryDB");
return ImportResult::CorruptedProofData;
},
};
ImportedState::FromProof { proof: state_db }
};
ImportResult::Import(
target_hash,
self.metadata.target_header.clone(),
ImportedState { block: target_hash, state: std::mem::take(&mut self.state).into() },
imported_state,
self.metadata.target_body.clone(),
self.metadata.target_justifications.clone(),
)
Expand Down
19 changes: 13 additions & 6 deletions substrate/client/service/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -665,12 +665,19 @@ where
Some((main_sc, child_sc))
},
sc_consensus::StorageChanges::Import(changes) => {
import_key_value_states(
operation,
changes.state,
&self.executor,
*import_headers.post().state_root(),
)?;
match changes {
ImportedState::FromKeyValue(state) => {
import_key_value_states(
operation,
state,
&self.executor,
*import_headers.post().state_root(),
)?;
},
ImportedState::FromProof { proof } => {
operation.op.import_state_db(proof);
},
}
None
},
};
Expand Down
21 changes: 21 additions & 0 deletions substrate/primitives/trie/src/storage_proof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,27 @@ impl CompactProof {

Ok((db, root))
}

/// Decode this compact proof into a [`PrefixedMemoryDB`](crate::PrefixedMemoryDB).
///
/// `expected_root` is the root the decoded trie is expected to have; pass
/// `Some` to have decoding verified against it.
///
/// Returns the populated memory db together with the trie root.
pub fn to_prefixed_memory_db<H: Hasher>(
	&self,
	expected_root: Option<&H::Out>,
) -> Result<
	(crate::PrefixedMemoryDB<H>, H::Out),
	crate::CompactProofError<H::Out, crate::Error<H::Out>>,
> {
	let mut memory_db = crate::PrefixedMemoryDB::<H>::new(&[]);
	let encoded_nodes = self.iter_compact_encoded_nodes();
	let trie_root =
		crate::decode_compact::<Layout<H>, _, _>(&mut memory_db, encoded_nodes, expected_root)?;
	Ok((memory_db, trie_root))
}
}

#[cfg(test)]
Expand Down

0 comments on commit 81fac33

Please sign in to comment.