From 11145537550fbb8a5121b0e67a8f83ce32d864b7 Mon Sep 17 00:00:00 2001 From: Eval EXEC Date: Fri, 9 Feb 2024 18:24:27 +0800 Subject: [PATCH] unlimited orphan Signed-off-by: Eval EXEC --- chain/src/chain_service.rs | 15 ++- chain/src/consume_orphan.rs | 145 +++++++------------------ chain/src/init.rs | 2 +- chain/src/utils/orphan_block_pool.rs | 40 +++---- sync/src/synchronizer/block_fetcher.rs | 3 +- 5 files changed, 73 insertions(+), 132 deletions(-) diff --git a/chain/src/chain_service.rs b/chain/src/chain_service.rs index b30c7ac14a9..4fa41b19e20 100644 --- a/chain/src/chain_service.rs +++ b/chain/src/chain_service.rs @@ -2,7 +2,7 @@ #![allow(missing_docs)] use crate::consume_orphan::ConsumeOrphan; -use crate::{LonelyBlock, ProcessBlockRequest}; +use crate::{LonelyBlock, LonelyBlockHash, ProcessBlockRequest}; use ckb_channel::{select, Receiver}; use ckb_error::{Error, InternalErrorKind}; use ckb_logger::{self, debug, error, info, warn}; @@ -126,7 +126,18 @@ impl ChainService { return; } } - self.consume_orphan.process_lonely_block(lonely_block); + let db_txn = self.shared.store().begin_transaction(); + if let Err(err) = db_txn.insert_block(&lonely_block.block()) { + error!("insert block failed: {:?}", err); + return; + } + if let Err(err) = db_txn.commit() { + error!("commit block failed: {:?}", err); + return; + } + + let lonely_block_hash: LonelyBlockHash = lonely_block.into(); + self.consume_orphan.process_lonely_block(lonely_block_hash); debug!( "processing block: {}-{}, (tip:unverified_tip):({}:{})", diff --git a/chain/src/consume_orphan.rs b/chain/src/consume_orphan.rs index f3cac4b810f..a0fd86bb088 100644 --- a/chain/src/consume_orphan.rs +++ b/chain/src/consume_orphan.rs @@ -15,77 +15,6 @@ use ckb_types::U256; use ckb_verification::InvalidParentError; use std::sync::Arc; -// Store the an unverified block to the database. We may usually do this -// for an orphan block with unknown parent. But this function is also useful in testing. -pub fn store_unverified_block( - shared: &Shared, - block: Arc, -) -> Result<(HeaderView, U256), Error> { - let (block_number, block_hash) = (block.number(), block.hash()); - - let parent_header = shared - .store() - .get_block_header(&block.data().header().raw().parent_hash()) - .expect("parent already store"); - - if let Some(ext) = shared.store().get_block_ext(&block.hash()) { - debug!("block {}-{} has stored BlockExt", block_number, block_hash); - return Ok((parent_header, ext.total_difficulty)); - } - - trace!("begin accept block: {}-{}", block.number(), block.hash()); - - let parent_ext = shared - .store() - .get_block_ext(&block.data().header().raw().parent_hash()) - .expect("parent already store"); - - if parent_ext.verified == Some(false) { - return Err(InvalidParentError { - parent_hash: parent_header.hash(), - } - .into()); - } - - let cannon_total_difficulty = - parent_ext.total_difficulty.to_owned() + block.header().difficulty(); - - let db_txn = Arc::new(shared.store().begin_transaction()); - - db_txn.insert_block(block.as_ref())?; - - let next_block_epoch = shared - .consensus() - .next_epoch_ext(&parent_header, &db_txn.borrow_as_data_loader()) - .expect("epoch should be stored"); - let new_epoch = next_block_epoch.is_head(); - let epoch = next_block_epoch.epoch(); - - db_txn.insert_block_epoch_index( - &block.header().hash(), - &epoch.last_block_hash_in_previous_epoch(), - )?; - if new_epoch { - db_txn.insert_epoch_ext(&epoch.last_block_hash_in_previous_epoch(), &epoch)?; - } - - let ext = BlockExt { - received_at: unix_time_as_millis(), - total_difficulty: cannon_total_difficulty.clone(), - total_uncles_count: parent_ext.total_uncles_count + block.data().uncles().len() as u64, - verified: None, - txs_fees: vec![], - cycles: None, - txs_sizes: None, - }; - - db_txn.insert_block_ext(&block.header().hash(), &ext)?; - - db_txn.commit()?; - - Ok((parent_header, cannon_total_difficulty)) -} - pub(crate) struct ConsumeOrphan { shared: Shared, @@ -117,7 +46,7 @@ impl ConsumeOrphan { continue; } - let descendants: Vec = self + let descendants: Vec = self .orphan_blocks_broker .remove_blocks_by_parent(&leader_hash); if descendants.is_empty() { @@ -131,7 +60,7 @@ impl ConsumeOrphan { } } - pub(crate) fn process_lonely_block(&self, lonely_block: LonelyBlock) { + pub(crate) fn process_lonely_block(&self, lonely_block: LonelyBlockHash) { let parent_hash = lonely_block.block().parent_hash(); let parent_status = self .shared @@ -164,7 +93,7 @@ impl ConsumeOrphan { .set(self.orphan_blocks_broker.len() as i64) }); } - fn send_unverified_block(&self, lonely_block: LonelyBlockHash, total_difficulty: U256) { + fn send_unverified_block(&self, lonely_block: LonelyBlockHash) { let block_number = lonely_block.block_number_and_hash.number(); let block_hash = lonely_block.block_number_and_hash.hash(); if let Some(metrics) = ckb_metrics::handle() { @@ -185,12 +114,11 @@ impl ConsumeOrphan { return; } }; - - if total_difficulty.gt(self.shared.get_unverified_tip().total_difficulty()) { + if lonely_block.block_number_and_hash.number() > self.shared.snapshot().tip_number() { self.shared.set_unverified_tip(ckb_shared::HeaderIndex::new( block_number, block_hash.clone(), - total_difficulty, + 0.into(), )); self.shared .get_unverified_index() @@ -205,42 +133,43 @@ impl ConsumeOrphan { block_hash.clone(), block_number.saturating_sub(self.shared.snapshot().tip_number()) ) - } else { - debug!( - "received a block {}-{} with lower or equal difficulty than unverified_tip {}-{}", - block_number, - block_hash, - self.shared.get_unverified_tip().number(), - self.shared.get_unverified_tip().hash(), - ); } - } - - pub(crate) fn process_descendant(&self, lonely_block: LonelyBlock) { - match store_unverified_block(&self.shared, lonely_block.block().to_owned()) { - Ok((_parent_header, total_difficulty)) => { - self.shared - .insert_block_status(lonely_block.block().hash(), BlockStatus::BLOCK_STORED); - self.shared.remove_header_view(&lonely_block.block().hash()); - let lonely_block_hash: LonelyBlockHash = lonely_block.into(); - - self.send_unverified_block(lonely_block_hash, total_difficulty) - } - - Err(err) => { - error!( - "accept block {} failed: {}", - lonely_block.block().hash(), - err - ); + // if total_difficulty.gt(self.shared.get_unverified_tip().total_difficulty()) { + // } else { + // debug!( + // "received a block {}-{} with lower or equal difficulty than unverified_tip {}-{}", + // block_number, + // block_hash, + // self.shared.get_unverified_tip().number(), + // self.shared.get_unverified_tip().hash(), + // ); + // } + } - lonely_block.execute_callback(Err(err)); - } - } + pub(crate) fn process_descendant(&self, lonely_block: LonelyBlockHash) { + self.shared + .insert_block_status(lonely_block.block().hash(), BlockStatus::BLOCK_STORED); + self.shared.remove_header_view(&lonely_block.block().hash()); + + self.send_unverified_block(lonely_block) + // match store_unverified_block(&self.shared, lonely_block.block().to_owned()) { + // Ok((_parent_header, total_difficulty)) => { + // } + // + // Err(err) => { + // error!( + // "accept block {} failed: {}", + // lonely_block.block().hash(), + // err + // ); + // + // lonely_block.execute_callback(Err(err)); + // } + // } } - fn accept_descendants(&self, descendants: Vec) { + fn accept_descendants(&self, descendants: Vec) { for descendant_block in descendants { self.process_descendant(descendant_block); } diff --git a/chain/src/init.rs b/chain/src/init.rs index 8ba88f85e24..cc4d09083fd 100644 --- a/chain/src/init.rs +++ b/chain/src/init.rs @@ -17,7 +17,7 @@ use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::thread; -const ORPHAN_BLOCK_SIZE: usize = (BLOCK_DOWNLOAD_WINDOW * 2) as usize; +const ORPHAN_BLOCK_SIZE: usize = (BLOCK_DOWNLOAD_WINDOW * 200) as usize; pub fn start_chain_services(builder: ChainServicesBuilder) -> ChainController { let orphan_blocks_broker = Arc::new(OrphanBlockPool::with_capacity(ORPHAN_BLOCK_SIZE)); diff --git a/chain/src/utils/orphan_block_pool.rs b/chain/src/utils/orphan_block_pool.rs index 7556f6d6c77..1513c7b4f39 100644 --- a/chain/src/utils/orphan_block_pool.rs +++ b/chain/src/utils/orphan_block_pool.rs @@ -1,7 +1,7 @@ #![allow(dead_code)] -use crate::LonelyBlock; +use crate::LonelyBlockHash; use ckb_logger::debug; -use ckb_types::core::{BlockView, EpochNumber}; +use ckb_types::core::EpochNumber; use ckb_types::packed; use ckb_util::{parking_lot::RwLock, shrink_to_fit}; use std::collections::{HashMap, HashSet, VecDeque}; @@ -15,7 +15,7 @@ const EXPIRED_EPOCH: u64 = 6; #[derive(Default)] struct InnerPool { // Group by blocks in the pool by the parent hash. - blocks: HashMap>, + blocks: HashMap>, // The map tells the parent hash when given the hash of a block in the pool. // // The block is in the orphan pool if and only if the block hash exists as a key in this map. @@ -33,7 +33,7 @@ impl InnerPool { } } - fn insert(&mut self, lonely_block: LonelyBlock) { + fn insert(&mut self, lonely_block: LonelyBlockHash) { let hash = lonely_block.block().header().hash(); let parent_hash = lonely_block.block().data().header().raw().parent_hash(); self.blocks @@ -53,7 +53,7 @@ impl InnerPool { self.parents.insert(hash, parent_hash); } - pub fn remove_blocks_by_parent(&mut self, parent_hash: &ParentHash) -> Vec { + pub fn remove_blocks_by_parent(&mut self, parent_hash: &ParentHash) -> Vec { // try remove leaders first if !self.leaders.remove(parent_hash) { return Vec::new(); @@ -62,7 +62,7 @@ impl InnerPool { let mut queue: VecDeque = VecDeque::new(); queue.push_back(parent_hash.to_owned()); - let mut removed: Vec = Vec::new(); + let mut removed: Vec = Vec::new(); while let Some(parent_hash) = queue.pop_front() { if let Some(orphaned) = self.blocks.remove(&parent_hash) { let (hashes, blocks): (Vec<_>, Vec<_>) = orphaned.into_iter().unzip(); @@ -87,15 +87,15 @@ impl InnerPool { removed } - pub fn get_block(&self, hash: &packed::Byte32) -> Option> { - self.parents.get(hash).and_then(|parent_hash| { - self.blocks.get(parent_hash).and_then(|blocks| { - blocks - .get(hash) - .map(|lonely_block| Arc::clone(lonely_block.block())) - }) - }) - } + // pub fn get_block(&self, hash: &packed::Byte32) -> Option> { + // self.parents.get(hash).and_then(|parent_hash| { + // self.blocks.get(parent_hash).and_then(|blocks| { + // blocks + // .get(hash) + // .map(|lonely_block| Arc::clone(lonely_block.block())) + // }) + // }) + // } pub fn contains_block(&self, hash: &packed::Byte32) -> bool { self.parents.contains_key(hash) @@ -148,17 +148,17 @@ impl OrphanBlockPool { } /// Insert orphaned block, for which we have already requested its parent block - pub fn insert(&self, lonely_block: LonelyBlock) { + pub fn insert(&self, lonely_block: LonelyBlockHash) { self.inner.write().insert(lonely_block); } - pub fn remove_blocks_by_parent(&self, parent_hash: &ParentHash) -> Vec { + pub fn remove_blocks_by_parent(&self, parent_hash: &ParentHash) -> Vec { self.inner.write().remove_blocks_by_parent(parent_hash) } - pub fn get_block(&self, hash: &packed::Byte32) -> Option> { - self.inner.read().get_block(hash) - } + // pub fn get_block(&self, hash: &packed::Byte32) -> Option> { + // self.inner.read().get_block(hash) + // } pub fn contains_block(&self, hash: &packed::Byte32) -> bool { self.inner.read().contains_block(hash) diff --git a/sync/src/synchronizer/block_fetcher.rs b/sync/src/synchronizer/block_fetcher.rs index c2c4ce0eb00..968cfc0aa37 100644 --- a/sync/src/synchronizer/block_fetcher.rs +++ b/sync/src/synchronizer/block_fetcher.rs @@ -174,7 +174,8 @@ impl BlockFetcher { IBDState::Out => last_common.number() + 1, } }; - let mut end = min(best_known.number(), start + BLOCK_DOWNLOAD_WINDOW); + // let mut end = min(best_known.number(), start + BLOCK_DOWNLOAD_WINDOW); + let mut end = best_known.number(); let n_fetch = min( end.saturating_sub(start) as usize + 1, state.read_inflight_blocks().peer_can_fetch_count(self.peer),