Skip to content

Commit

Permalink
reinstate max_txns_to_execute
Browse files Browse the repository at this point in the history
  • Loading branch information
bchocho committed Nov 4, 2024
1 parent 96987c3 commit fe4a6e6
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 14 deletions.
27 changes: 17 additions & 10 deletions consensus/src/block_preparer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ impl BlockPreparer {
loop {
info!("get_transactions waiting for next: {}", idx);
match futures.next().await {
// TODO: we are turning off the max txns from block to execute feature for now
Some(Ok((block_txns, _max_txns))) => {
Some(Ok((block_txns, max_txns))) => {
txns.extend(block_txns);
max_txns_from_block_to_execute = None;
// We only care about max_txns from the current block, which is the last future
max_txns_from_block_to_execute = max_txns;
},
Some(Err(e)) => {
return Err(e);
Expand All @@ -94,7 +94,7 @@ impl BlockPreparer {
&self,
block: &Block,
block_window: &OrderedBlockWindow,
) -> ExecutorResult<Vec<SignedTransaction>> {
) -> ExecutorResult<(Vec<SignedTransaction>, Option<u64>)> {
fail_point!("consensus::prepare_block", |_| {
use aptos_executor_types::ExecutorError;
use std::{thread, time::Duration};
Expand Down Expand Up @@ -155,7 +155,12 @@ impl BlockPreparer {
let txn_deduper = self.txn_deduper.clone();
let block_id = block.id();
let block_timestamp_usecs = block.timestamp_usecs();
let max_prepared_block_txns = self.max_block_txns as usize * 2;
let max_prepared_block_txns =
if let Some(max_txns_from_block_to_execute) = max_txns_from_block_to_execute {
max_txns_from_block_to_execute
} else {
self.max_block_txns * 2
};
// Transaction filtering, deduplication and shuffling are CPU intensive tasks, so we run them in a blocking task.
let result = tokio::task::spawn_blocking(move || {
// stable sort to ensure batches with same gas are in the same order
Expand All @@ -178,18 +183,20 @@ impl BlockPreparer {
batched_txns
.into_iter()
.flatten()
.take(max_prepared_block_txns)
.take(max_prepared_block_txns as usize)
.collect()
);
let filtered_txns = monitor!("filter_transactions", {
txn_filter.filter(block_id, block_timestamp_usecs, txns)
});
let mut deduped_txns = monitor!("dedup_transactions", txn_deduper.dedup(filtered_txns));
let deduped_txns = monitor!("dedup_transactions", txn_deduper.dedup(filtered_txns));
// TODO: cannot truncate here, need to pass it to execution
let mut num_txns_to_execute = deduped_txns.len() as u64;
if let Some(max_txns_from_block_to_execute) = max_txns_from_block_to_execute {
deduped_txns.truncate(max_txns_from_block_to_execute as usize);
num_txns_to_execute = num_txns_to_execute.min(max_txns_from_block_to_execute);
}
MAX_TXNS_FROM_BLOCK_TO_EXECUTE.observe(deduped_txns.len() as f64);
Ok(deduped_txns)
MAX_TXNS_FROM_BLOCK_TO_EXECUTE.observe(num_txns_to_execute as f64);
Ok((deduped_txns, max_txns_from_block_to_execute))
})
.await
.expect("Failed to spawn blocking task for transaction generation");
Expand Down
14 changes: 10 additions & 4 deletions consensus/src/execution_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,17 +164,17 @@ impl ExecutionPipeline {
} = command;
counters::PREPARE_BLOCK_WAIT_TIME.observe_duration(command_creation_time.elapsed());
debug!("prepare_block received block {}.", block.id());
let input_txns = block_preparer
let prepare_result = block_preparer
.prepare_block(block.block(), &block_window)
.await;
if let Err(e) = input_txns {
if let Err(e) = prepare_result {
result_tx
.send(Err(e))
.unwrap_or_else(log_failed_to_send_result("prepare_block", block.id()));
return;
}
let validator_txns = block.validator_txns().cloned().unwrap_or_default();
let input_txns = input_txns.expect("input_txns must be Some.");
let (input_txns, max_txns_to_execute) = prepare_result.expect("input_txns must be Some.");
tokio::task::spawn_blocking(move || {
let txns_to_execute =
Block::combine_to_input_transactions(validator_txns, input_txns.clone(), metadata);
Expand All @@ -194,6 +194,7 @@ impl ExecutionPipeline {
execute_block_tx
.send(ExecuteBlockCommand {
input_txns,
max_txns_to_execute,
pipelined_block: block,
block: (block_id, sig_verified_txns).into(),
block_window,
Expand Down Expand Up @@ -230,6 +231,7 @@ impl ExecutionPipeline {
) {
'outer: while let Some(ExecuteBlockCommand {
input_txns: _,
max_txns_to_execute,
pipelined_block,
block,
block_window,
Expand Down Expand Up @@ -311,7 +313,9 @@ impl ExecutionPipeline {
let transactions_len = transactions.len();
(
transactions,
Arc::new(BlockingTxnsProvider::new(transactions_len)),
Arc::new(BlockingTxnsProvider::new(
max_txns_to_execute.unwrap_or(transactions_len as u64) as usize,
)),
)
},
ExecutableTransactions::UnshardedBlocking(_) => {
Expand Down Expand Up @@ -339,6 +343,7 @@ impl ExecutionPipeline {
"Execution: Split validator txns from user txns in {} micros",
timer.elapsed().as_micros()
);
// TODO: we could probably constrain this too with max_txns_to_execute
let shuffle_iterator = crate::transaction_shuffler::use_case_aware::iterator::ShuffledTransactionIterator::new(crate::transaction_shuffler::use_case_aware::Config {
sender_spread_factor: 32,
platform_use_case_spread_factor: 0,
Expand Down Expand Up @@ -586,6 +591,7 @@ struct PrepareBlockCommand {

struct ExecuteBlockCommand {
input_txns: Vec<SignedTransaction>,
max_txns_to_execute: Option<u64>,
pipelined_block: PipelinedBlock,
block: ExecutableBlock,
block_window: OrderedBlockWindow,
Expand Down

0 comments on commit fe4a6e6

Please sign in to comment.