Unlimited orphan, max inflight blocks, BlockFetch::fetch's end to best_known #27

Open · wants to merge 1 commit into base: exec/async.unverified.db.no-ticker
15 changes: 13 additions & 2 deletions chain/src/chain_service.rs
@@ -2,7 +2,7 @@
#![allow(missing_docs)]

use crate::consume_orphan::ConsumeOrphan;
use crate::{LonelyBlock, ProcessBlockRequest};
use crate::{LonelyBlock, LonelyBlockHash, ProcessBlockRequest};
use ckb_channel::{select, Receiver};
use ckb_error::{Error, InternalErrorKind};
use ckb_logger::{self, debug, error, info, warn};
@@ -126,7 +126,18 @@ impl ChainService {
return;
}
}
self.consume_orphan.process_lonely_block(lonely_block);
let db_txn = self.shared.store().begin_transaction();
if let Err(err) = db_txn.insert_block(&lonely_block.block()) {
error!("insert block failed: {:?}", err);
return;
}
if let Err(err) = db_txn.commit() {
error!("commit block failed: {:?}", err);
return;
}

let lonely_block_hash: LonelyBlockHash = lonely_block.into();
self.consume_orphan.process_lonely_block(lonely_block_hash);

debug!(
"processing block: {}-{}, (tip:unverified_tip):({}:{})",
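
The hunk above moves block persistence into ChainService itself: the full block is written to the store up front, and only a lightweight LonelyBlockHash is handed to ConsumeOrphan. Below is a minimal, self-contained sketch of that "persist first, pass a handle" pattern; Block, BlockHandle, and Store are simplified stand-ins for the real BlockView, LonelyBlockHash, and ckb_store transaction types, not the actual API.

use std::collections::HashMap;

#[derive(Clone)]
struct Block {
    hash: [u8; 32],
    number: u64,
    parent_hash: [u8; 32],
    body: Vec<u8>, // full payload stays behind in the store
}

// Lightweight handle that downstream stages receive instead of the full block.
#[derive(Clone)]
struct BlockHandle {
    hash: [u8; 32],
    number: u64,
    parent_hash: [u8; 32],
}

impl From<&Block> for BlockHandle {
    fn from(b: &Block) -> Self {
        BlockHandle {
            hash: b.hash,
            number: b.number,
            parent_hash: b.parent_hash,
        }
    }
}

#[derive(Default)]
struct Store {
    blocks: HashMap<[u8; 32], Block>,
}

impl Store {
    // Stand-in for begin_transaction / insert_block / commit.
    fn insert_block(&mut self, block: &Block) -> Result<(), String> {
        self.blocks.insert(block.hash, block.clone());
        Ok(())
    }
}

// Persist the block first; hand only the handle to the next stage.
fn process_block(store: &mut Store, block: Block) -> Option<BlockHandle> {
    if let Err(err) = store.insert_block(&block) {
        eprintln!("insert block failed: {err}");
        return None;
    }
    Some(BlockHandle::from(&block))
}

fn main() {
    let mut store = Store::default();
    let block = Block {
        hash: [1; 32],
        number: 42,
        parent_hash: [0; 32],
        body: vec![0xab],
    };
    if let Some(handle) = process_block(&mut store, block) {
        println!("stored block #{}, hash {:02x?}", handle.number, &handle.hash[..4]);
    }
}

The design consequence is that the orphan machinery never holds full blocks in memory again; anything it needs later can be re-read from the store by hash.
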
118 changes: 11 additions & 107 deletions chain/src/consume_orphan.rs
@@ -15,77 +15,6 @@ use ckb_types::U256;
use ckb_verification::InvalidParentError;
use std::sync::Arc;

// Store an unverified block to the database. We usually do this
// for an orphan block with an unknown parent, but this function is also useful in testing.
pub fn store_unverified_block(
shared: &Shared,
block: Arc<BlockView>,
) -> Result<(HeaderView, U256), Error> {
let (block_number, block_hash) = (block.number(), block.hash());

let parent_header = shared
.store()
.get_block_header(&block.data().header().raw().parent_hash())
.expect("parent already store");

if let Some(ext) = shared.store().get_block_ext(&block.hash()) {
debug!("block {}-{} has stored BlockExt", block_number, block_hash);
return Ok((parent_header, ext.total_difficulty));
}

trace!("begin accept block: {}-{}", block.number(), block.hash());

let parent_ext = shared
.store()
.get_block_ext(&block.data().header().raw().parent_hash())
.expect("parent already store");

if parent_ext.verified == Some(false) {
return Err(InvalidParentError {
parent_hash: parent_header.hash(),
}
.into());
}

let cannon_total_difficulty =
parent_ext.total_difficulty.to_owned() + block.header().difficulty();

let db_txn = Arc::new(shared.store().begin_transaction());

db_txn.insert_block(block.as_ref())?;

let next_block_epoch = shared
.consensus()
.next_epoch_ext(&parent_header, &db_txn.borrow_as_data_loader())
.expect("epoch should be stored");
let new_epoch = next_block_epoch.is_head();
let epoch = next_block_epoch.epoch();

db_txn.insert_block_epoch_index(
&block.header().hash(),
&epoch.last_block_hash_in_previous_epoch(),
)?;
if new_epoch {
db_txn.insert_epoch_ext(&epoch.last_block_hash_in_previous_epoch(), &epoch)?;
}

let ext = BlockExt {
received_at: unix_time_as_millis(),
total_difficulty: cannon_total_difficulty.clone(),
total_uncles_count: parent_ext.total_uncles_count + block.data().uncles().len() as u64,
verified: None,
txs_fees: vec![],
cycles: None,
txs_sizes: None,
};

db_txn.insert_block_ext(&block.header().hash(), &ext)?;

db_txn.commit()?;

Ok((parent_header, cannon_total_difficulty))
}

pub(crate) struct ConsumeOrphan {
shared: Shared,

@@ -117,7 +46,7 @@ impl ConsumeOrphan {
continue;
}

let descendants: Vec<LonelyBlock> = self
let descendants: Vec<LonelyBlockHash> = self
.orphan_blocks_broker
.remove_blocks_by_parent(&leader_hash);
if descendants.is_empty() {
@@ -131,7 +60,7 @@
}
}

pub(crate) fn process_lonely_block(&self, lonely_block: LonelyBlock) {
pub(crate) fn process_lonely_block(&self, lonely_block: LonelyBlockHash) {
let parent_hash = lonely_block.block().parent_hash();
let parent_status = self
.shared
@@ -164,7 +93,7 @@
.set(self.orphan_blocks_broker.len() as i64)
});
}
fn send_unverified_block(&self, lonely_block: LonelyBlockHash, total_difficulty: U256) {
fn send_unverified_block(&self, lonely_block: LonelyBlockHash) {
let block_number = lonely_block.block_number_and_hash.number();
let block_hash = lonely_block.block_number_and_hash.hash();
if let Some(metrics) = ckb_metrics::handle() {
@@ -185,12 +114,11 @@
return;
}
};

if total_difficulty.gt(self.shared.get_unverified_tip().total_difficulty()) {
if lonely_block.block_number_and_hash.number() > self.shared.snapshot().tip_number() {
self.shared.set_unverified_tip(ckb_shared::HeaderIndex::new(
block_number,
block_hash.clone(),
total_difficulty,
0.into(),
));
self.shared
.get_unverified_index()
@@ -205,42 +133,18 @@
block_hash.clone(),
block_number.saturating_sub(self.shared.snapshot().tip_number())
)
} else {
debug!(
"received a block {}-{} with lower or equal difficulty than unverified_tip {}-{}",
block_number,
block_hash,
self.shared.get_unverified_tip().number(),
self.shared.get_unverified_tip().hash(),
);
}
}

pub(crate) fn process_descendant(&self, lonely_block: LonelyBlock) {
match store_unverified_block(&self.shared, lonely_block.block().to_owned()) {
Ok((_parent_header, total_difficulty)) => {
self.shared
.insert_block_status(lonely_block.block().hash(), BlockStatus::BLOCK_STORED);
self.shared.remove_header_view(&lonely_block.block().hash());
pub(crate) fn process_descendant(&self, lonely_block: LonelyBlockHash) {
self.shared
.insert_block_status(lonely_block.block().hash(), BlockStatus::BLOCK_STORED);
self.shared.remove_header_view(&lonely_block.block().hash());

let lonely_block_hash: LonelyBlockHash = lonely_block.into();

self.send_unverified_block(lonely_block_hash, total_difficulty)
}

Err(err) => {
error!(
"accept block {} failed: {}",
lonely_block.block().hash(),
err
);

lonely_block.execute_callback(Err(err));
}
}
self.send_unverified_block(lonely_block)
}

fn accept_descendants(&self, descendants: Vec<LonelyBlock>) {
fn accept_descendants(&self, descendants: Vec<LonelyBlockHash>) {
for descendant_block in descendants {
self.process_descendant(descendant_block);
}
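
The consume_orphan.rs changes drop the stored-difficulty bookkeeping: store_unverified_block is removed, process_descendant now takes a LonelyBlockHash, and send_unverified_block advances the unverified tip whenever the block number exceeds the verified snapshot tip, recording a placeholder total difficulty of zero. A rough sketch of that number-based tip update follows; HeaderIndex and SharedState are simplified stand-ins for ckb_shared::HeaderIndex and the shared snapshot state, not the real API.

#[derive(Clone)]
struct HeaderIndex {
    number: u64,
    hash: [u8; 32],
    total_difficulty: u128, // the patch now stores 0 here instead of a real value
}

struct SharedState {
    verified_tip_number: u64,
    unverified_tip: HeaderIndex,
}

impl SharedState {
    // After the change: any stored block beyond the verified tip becomes the
    // unverified tip, with no accumulated-difficulty comparison.
    fn maybe_advance_unverified_tip(&mut self, number: u64, hash: [u8; 32]) {
        if number > self.verified_tip_number {
            self.unverified_tip = HeaderIndex {
                number,
                hash,
                total_difficulty: 0,
            };
        }
    }
}

fn main() {
    let mut shared = SharedState {
        verified_tip_number: 100,
        unverified_tip: HeaderIndex { number: 100, hash: [0; 32], total_difficulty: 0 },
    };
    shared.maybe_advance_unverified_tip(150, [7; 32]);
    println!("unverified tip is now #{}", shared.unverified_tip.number);
}

Note that comparing by number rather than total difficulty only tracks the longest stored chain, not the heaviest one; the final tip selection still happens after verification.
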
2 changes: 1 addition & 1 deletion chain/src/init.rs
@@ -17,7 +17,7 @@ use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::thread;

const ORPHAN_BLOCK_SIZE: usize = (BLOCK_DOWNLOAD_WINDOW * 2) as usize;
const ORPHAN_BLOCK_SIZE: usize = (BLOCK_DOWNLOAD_WINDOW * 200) as usize;

pub fn start_chain_services(builder: ChainServicesBuilder) -> ChainController {
let orphan_blocks_broker = Arc::new(OrphanBlockPool::with_capacity(ORPHAN_BLOCK_SIZE));
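
Since the orphan pool now holds only block hashes rather than full blocks, its capacity can be raised cheaply. The arithmetic below assumes CKB's BLOCK_DOWNLOAD_WINDOW is 1024; the authoritative value lives in the sync constants.

const BLOCK_DOWNLOAD_WINDOW: u64 = 1024; // assumed value
const OLD_ORPHAN_BLOCK_SIZE: usize = (BLOCK_DOWNLOAD_WINDOW * 2) as usize; // 2_048
const NEW_ORPHAN_BLOCK_SIZE: usize = (BLOCK_DOWNLOAD_WINDOW * 200) as usize; // 204_800

fn main() {
    println!(
        "orphan pool capacity: {} -> {} entries (hashes only)",
        OLD_ORPHAN_BLOCK_SIZE, NEW_ORPHAN_BLOCK_SIZE
    );
}
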
40 changes: 20 additions & 20 deletions chain/src/utils/orphan_block_pool.rs
@@ -1,7 +1,7 @@
#![allow(dead_code)]
use crate::LonelyBlock;
use crate::LonelyBlockHash;
use ckb_logger::debug;
use ckb_types::core::{BlockView, EpochNumber};
use ckb_types::core::EpochNumber;
use ckb_types::packed;
use ckb_util::{parking_lot::RwLock, shrink_to_fit};
use std::collections::{HashMap, HashSet, VecDeque};
@@ -15,7 +15,7 @@ const EXPIRED_EPOCH: u64 = 6;
#[derive(Default)]
struct InnerPool {
// Group by blocks in the pool by the parent hash.
blocks: HashMap<ParentHash, HashMap<packed::Byte32, LonelyBlock>>,
blocks: HashMap<ParentHash, HashMap<packed::Byte32, LonelyBlockHash>>,
// The map tells the parent hash when given the hash of a block in the pool.
//
// The block is in the orphan pool if and only if the block hash exists as a key in this map.
@@ -33,7 +33,7 @@ impl InnerPool {
}
}

fn insert(&mut self, lonely_block: LonelyBlock) {
fn insert(&mut self, lonely_block: LonelyBlockHash) {
let hash = lonely_block.block().header().hash();
let parent_hash = lonely_block.block().data().header().raw().parent_hash();
self.blocks
@@ -53,7 +53,7 @@
self.parents.insert(hash, parent_hash);
}

pub fn remove_blocks_by_parent(&mut self, parent_hash: &ParentHash) -> Vec<LonelyBlock> {
pub fn remove_blocks_by_parent(&mut self, parent_hash: &ParentHash) -> Vec<LonelyBlockHash> {
// try remove leaders first
if !self.leaders.remove(parent_hash) {
return Vec::new();
@@ -62,7 +62,7 @@
let mut queue: VecDeque<packed::Byte32> = VecDeque::new();
queue.push_back(parent_hash.to_owned());

let mut removed: Vec<LonelyBlock> = Vec::new();
let mut removed: Vec<LonelyBlockHash> = Vec::new();
while let Some(parent_hash) = queue.pop_front() {
if let Some(orphaned) = self.blocks.remove(&parent_hash) {
let (hashes, blocks): (Vec<_>, Vec<_>) = orphaned.into_iter().unzip();
@@ -87,15 +87,15 @@
removed
}

pub fn get_block(&self, hash: &packed::Byte32) -> Option<Arc<BlockView>> {
self.parents.get(hash).and_then(|parent_hash| {
self.blocks.get(parent_hash).and_then(|blocks| {
blocks
.get(hash)
.map(|lonely_block| Arc::clone(lonely_block.block()))
})
})
}
// pub fn get_block(&self, hash: &packed::Byte32) -> Option<Arc<BlockView>> {
// self.parents.get(hash).and_then(|parent_hash| {
// self.blocks.get(parent_hash).and_then(|blocks| {
// blocks
// .get(hash)
// .map(|lonely_block| Arc::clone(lonely_block.block()))
// })
// })
// }

pub fn contains_block(&self, hash: &packed::Byte32) -> bool {
self.parents.contains_key(hash)
@@ -148,17 +148,17 @@ impl OrphanBlockPool {
}

/// Insert orphaned block, for which we have already requested its parent block
pub fn insert(&self, lonely_block: LonelyBlock) {
pub fn insert(&self, lonely_block: LonelyBlockHash) {
self.inner.write().insert(lonely_block);
}

pub fn remove_blocks_by_parent(&self, parent_hash: &ParentHash) -> Vec<LonelyBlock> {
pub fn remove_blocks_by_parent(&self, parent_hash: &ParentHash) -> Vec<LonelyBlockHash> {
self.inner.write().remove_blocks_by_parent(parent_hash)
}

pub fn get_block(&self, hash: &packed::Byte32) -> Option<Arc<BlockView>> {
self.inner.read().get_block(hash)
}
// pub fn get_block(&self, hash: &packed::Byte32) -> Option<Arc<BlockView>> {
// self.inner.read().get_block(hash)
// }

pub fn contains_block(&self, hash: &packed::Byte32) -> bool {
self.inner.read().contains_block(hash)
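
After the switch to LonelyBlockHash, the orphan pool keeps per-parent maps of lightweight handles and releases whole descendant chains once a missing parent arrives. The sketch below models that bookkeeping with simplified types; Hash, BlockHandle, and OrphanPool are stand-ins, not the crate's real structs, and the leader handling is reduced to the essentials.

use std::collections::{HashMap, HashSet, VecDeque};

type Hash = [u8; 32];

// Lightweight stand-in for LonelyBlockHash: just enough to link child to parent.
#[derive(Clone)]
struct BlockHandle {
    hash: Hash,
    parent_hash: Hash,
}

#[derive(Default)]
struct OrphanPool {
    // children grouped by their parent hash
    blocks: HashMap<Hash, HashMap<Hash, BlockHandle>>,
    // child hash -> parent hash; doubles as the pool membership test
    parents: HashMap<Hash, Hash>,
    // roots whose own parent is not in the pool ("leaders")
    leaders: HashSet<Hash>,
}

impl OrphanPool {
    fn insert(&mut self, handle: BlockHandle) {
        self.blocks
            .entry(handle.parent_hash)
            .or_default()
            .insert(handle.hash, handle.clone());
        // If the parent is not pooled either, this subtree starts a new leader chain.
        if !self.parents.contains_key(&handle.parent_hash) {
            self.leaders.insert(handle.parent_hash);
        }
        self.parents.insert(handle.hash, handle.parent_hash);
    }

    // Breadth-first release of every descendant once `parent_hash` has arrived.
    fn remove_blocks_by_parent(&mut self, parent_hash: &Hash) -> Vec<BlockHandle> {
        if !self.leaders.remove(parent_hash) {
            return Vec::new();
        }
        let mut queue: VecDeque<Hash> = VecDeque::new();
        queue.push_back(*parent_hash);
        let mut removed = Vec::new();
        while let Some(parent) = queue.pop_front() {
            if let Some(children) = self.blocks.remove(&parent) {
                for (hash, handle) in children {
                    self.parents.remove(&hash);
                    queue.push_back(hash);
                    removed.push(handle);
                }
            }
        }
        removed
    }
}

fn main() {
    let mut pool = OrphanPool::default();
    pool.insert(BlockHandle { hash: [2; 32], parent_hash: [1; 32] });
    pool.insert(BlockHandle { hash: [3; 32], parent_hash: [2; 32] });
    // Parent [1; 32] shows up: both pooled descendants are released.
    let released = pool.remove_blocks_by_parent(&[1; 32]);
    println!("released {} descendants", released.len());
}

Because only hashes are pooled, get_block can no longer return a BlockView from memory, which is why it is commented out above; callers would have to reload the block from the store.
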
3 changes: 2 additions & 1 deletion sync/src/synchronizer/block_fetcher.rs
@@ -174,7 +174,8 @@ impl BlockFetcher {
IBDState::Out => last_common.number() + 1,
}
};
let mut end = min(best_known.number(), start + BLOCK_DOWNLOAD_WINDOW);
// let mut end = min(best_known.number(), start + BLOCK_DOWNLOAD_WINDOW);
let mut end = best_known.number();
let n_fetch = min(
end.saturating_sub(start) as usize + 1,
state.read_inflight_blocks().peer_can_fetch_count(self.peer),
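
With the window clamp commented out, the fetch range now extends all the way to the peer's best-known block, so throughput is governed only by the per-peer inflight-block accounting further down. A small sketch of the before/after range arithmetic, using an assumed window size rather than the crate's constant:

const BLOCK_DOWNLOAD_WINDOW: u64 = 1024; // assumed window size

// Before: the end of the request range was clamped to a sliding window above `start`.
fn fetch_range_old(start: u64, best_known: u64) -> (u64, u64) {
    (start, best_known.min(start + BLOCK_DOWNLOAD_WINDOW))
}

// After: the range reaches the peer's best-known block; how much is actually
// requested is still bounded by peer_can_fetch_count on the inflight state.
fn fetch_range_new(start: u64, best_known: u64) -> (u64, u64) {
    (start, best_known)
}

fn main() {
    let (start, best_known) = (10_000u64, 50_000u64);
    println!("old range: {:?}", fetch_range_old(start, best_known));
    println!("new range: {:?}", fetch_range_new(start, best_known));
}
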