diff --git a/consensus/consensus-types/src/block.rs b/consensus/consensus-types/src/block.rs index c36946e493e7f..b2ada4a9b474c 100644 --- a/consensus/consensus-types/src/block.rs +++ b/consensus/consensus-types/src/block.rs @@ -9,7 +9,7 @@ use crate::{ }; use anyhow::{bail, ensure, format_err}; use aptos_bitvec::BitVec; -use aptos_crypto::{bls12381, hash::CryptoHash, HashValue}; +use aptos_crypto::{bls12381::{self, Signature}, hash::CryptoHash, HashValue}; use aptos_infallible::duration_since_epoch; use aptos_types::{ account_address::AccountAddress, @@ -87,6 +87,14 @@ impl Block { self.is_opt } + pub fn set_timestamp(&mut self, timestamp: u64) { + self.block_data.set_timestamp(timestamp); + } + + pub fn set_signature(&mut self, signature: Signature) { + self.signature = Some(signature); + } + pub fn set_quorum_cert(&mut self, qc: QuorumCert) { self.block_data.set_quorum_cert(qc); } @@ -524,7 +532,7 @@ impl Block { self.epoch(), self.round(), self.author().unwrap_or(AccountAddress::ZERO), - vec![1; self.previous_bitvec().num_buckets()], // optimistic proposal hack + BitVec::default().into(), // optimistic proposal hack // For nil block, we use 0x0 which is convention for nil address in move. 
self.block_data() .failed_authors() diff --git a/consensus/consensus-types/src/block_data.rs b/consensus/consensus-types/src/block_data.rs index 82b181017f368..dbf2afc4dd0fa 100644 --- a/consensus/consensus-types/src/block_data.rs +++ b/consensus/consensus-types/src/block_data.rs @@ -112,6 +112,10 @@ impl CryptoHash for BlockData { } impl BlockData { + pub fn set_timestamp(&mut self, timestamp: u64) { + self.timestamp_usecs = timestamp; + } + pub fn set_quorum_cert(&mut self, qc: QuorumCert) { self.quorum_cert = qc; } diff --git a/consensus/consensus-types/src/proposal_msg.rs b/consensus/consensus-types/src/proposal_msg.rs index 7a6c089cf27e4..4d52f54aacfbc 100644 --- a/consensus/consensus-types/src/proposal_msg.rs +++ b/consensus/consensus-types/src/proposal_msg.rs @@ -4,6 +4,7 @@ use crate::{block::Block, common::Author, proof_of_store::ProofCache, sync_info::SyncInfo}; use anyhow::{anyhow, ensure, format_err, Context, Ok, Result}; +use aptos_crypto::bls12381::Signature; use aptos_short_hex_str::AsShortHexStr; use aptos_types::validator_verifier::ValidatorVerifier; use serde::{Deserialize, Serialize}; @@ -26,6 +27,14 @@ impl ProposalMsg { } } + pub fn set_timestamp(&mut self, timestamp: u64) { + self.proposal.set_timestamp(timestamp); + } + + pub fn set_signature(&mut self, signature: Signature) { + self.proposal.set_signature(signature); + } + pub fn epoch(&self) -> u64 { self.proposal.epoch() } diff --git a/consensus/src/pipeline/pipeline_builder.rs b/consensus/src/pipeline/pipeline_builder.rs index 3aaa3b001ead9..f8842926b166b 100644 --- a/consensus/src/pipeline/pipeline_builder.rs +++ b/consensus/src/pipeline/pipeline_builder.rs @@ -69,6 +69,7 @@ pub struct PipelineBuilder { state_sync_notifier: Arc, payload_manager: Arc, txn_notifier: Arc, + execution_lock: Arc>, } fn spawn_shared_fut< @@ -191,6 +192,7 @@ impl PipelineBuilder { state_sync_notifier, payload_manager, txn_notifier, + execution_lock: Arc::new(Mutex::new(())), } } @@ -320,6 +322,7 @@ impl 
PipelineBuilder { self.is_randomness_enabled, self.validators.clone(), self.block_executor_onchain_config.clone(), + self.execution_lock.clone(), ), &mut abort_handles, ); @@ -456,6 +459,7 @@ impl PipelineBuilder { is_randomness_enabled: bool, validator: Arc<[AccountAddress]>, onchain_execution_config: BlockExecutorConfigFromOnchain, + execution_lock: Arc>, ) -> TaskResult { let mut tracker = Tracker::new("execute", &block); parent_block_execute_phase.await?; @@ -487,6 +491,7 @@ impl PipelineBuilder { .concat(); let start = Instant::now(); tokio::task::spawn_blocking(move || { + let _guard = execution_lock.lock(); executor .execute_and_state_checkpoint( (block.id(), txns).into(), diff --git a/consensus/src/round_manager.rs b/consensus/src/round_manager.rs index da74d1bc00b7b..e3f751fc9111b 100644 --- a/consensus/src/round_manager.rs +++ b/consensus/src/round_manager.rs @@ -459,7 +459,7 @@ impl RoundManager { sync_info, network.clone(), proposal_generator, - safety_rules, + safety_rules.clone(), proposer_election, parent_id, ) @@ -474,9 +474,19 @@ impl RoundManager { { if Self::check_whether_to_inject_reconfiguration_error() { Self::attempt_to_inject_reconfiguration_error( + epoch_state.clone(), + network.clone(), + &proposal_msg, + ) + .await?; + } + + if Self::check_whether_to_equivocate() { + Self::attempt_to_equivocate( epoch_state, network.clone(), &proposal_msg, + safety_rules.clone() ) .await?; } @@ -1939,6 +1949,12 @@ impl RoundManager { false } + #[cfg(feature = "failpoints")] + fn check_whether_to_equivocate() -> bool { + fail_point!("consensus::leader_equivocation", |_| true); + false + } + /// Given R1 <- B2 if R1 has the reconfiguration txn, we inject error on B2 if R1.round + 1 = B2.round /// Direct suffix is checked by parent.has_reconfiguration && !parent.parent.has_reconfiguration /// The error is injected by sending proposals to half of the validators to force a timeout. 
@@ -1973,4 +1989,29 @@ impl RoundManager { Ok(()) } } + + #[cfg(feature = "failpoints")] + async fn attempt_to_equivocate( + epoch_state: Arc, + network: Arc, + proposal_msg: &ProposalMsg, + safety_rules: Arc>, + ) -> anyhow::Result<()> { + info!("[Test] Leader of epoch {} round {} equivocates", epoch_state.epoch, proposal_msg.proposal().round()); + + let all_peers: Vec<_> = epoch_state + .verifier + .get_ordered_account_addresses_iter() + .collect(); + let mut timestamp = proposal_msg.proposal().block_data().timestamp_usecs(); + for peer in all_peers { + timestamp += 1; + let mut modified_proposal_msg = proposal_msg.clone(); + modified_proposal_msg.set_timestamp(timestamp); + let signature = safety_rules.lock().sign_proposal(modified_proposal_msg.proposal().block_data())?; + modified_proposal_msg.set_signature(signature); + network.send_proposal(modified_proposal_msg.clone(), vec![peer]).await; + } + Err(anyhow::anyhow!("Injected leader equivocation")) + } } diff --git a/terraform/helm/genesis/files/genesis.sh b/terraform/helm/genesis/files/genesis.sh index 155ec754056ad..cb73285a30bc9 100644 --- a/terraform/helm/genesis/files/genesis.sh +++ b/terraform/helm/genesis/files/genesis.sh @@ -189,8 +189,7 @@ create_secrets() { ) if [[ "$include_genesis_blob" == "true" ]]; then - tar -C $WORKSPACE -czf ${WORKSPACE}/genesis.blob.tar.gz genesis.blob - files_to_include+=("--from-file=genesis.blob.tar.gz=${WORKSPACE}/genesis.blob.tar.gz") + files_to_include+=("--from-file=genesis.blob=${WORKSPACE}/genesis.blob") fi kubectl create secret generic "${username}-genesis-e${ERA}" "${files_to_include[@]}" diff --git a/testsuite/forge-cli/src/suites/realistic_environment.rs b/testsuite/forge-cli/src/suites/realistic_environment.rs index 958a8b5e76907..5c700fd97ca16 100644 --- a/testsuite/forge-cli/src/suites/realistic_environment.rs +++ b/testsuite/forge-cli/src/suites/realistic_environment.rs @@ -61,8 +61,9 @@ pub(crate) fn realistic_env_sweep_wrap( 
.with_initial_fullnode_count(num_fullnodes) .with_validator_override_node_config_fn(Arc::new(|config, _| { config.execution.processed_transactions_detailed_counters = true; + config.api.failpoints_enabled = true; })) - .add_network_test(test) + .add_network_test(wrap_with_realistic_env(num_validators, test)) // Test inherits the main EmitJobRequest, so update here for more precise latency measurements .with_emit_job( EmitJobRequest::default().latency_polling_interval(Duration::from_millis(100)), @@ -82,10 +83,13 @@ pub(crate) fn realistic_env_sweep_wrap( pub(crate) fn realistic_env_load_sweep_test() -> ForgeConfig { realistic_env_sweep_wrap(10, 5, LoadVsPerfBenchmark { test: Box::new(PerformanceBenchmark), - workloads: Workloads::TPS(vec![100, 10000]), + workloads: Workloads::TPS(vec![1000, 2500, 5000, 7500, 10000]), criteria: [ (95, 0.9, 1.1, 1.2, 0), - (6700, 2.5, 3.5, 5.0, 0), + (95, 0.9, 1.1, 1.2, 0), + (95, 0.9, 1.1, 1.2, 0), + (95, 0.9, 1.1, 1.2, 0), + (95, 0.9, 1.1, 1.2, 0), // TODO add 9k or 10k. 
Allow some expired transactions (high-load) ] .into_iter() diff --git a/testsuite/forge-test-runner-template.yaml b/testsuite/forge-test-runner-template.yaml index c31e864dec8ba..2a4dc3abfdc46 100644 --- a/testsuite/forge-test-runner-template.yaml +++ b/testsuite/forge-test-runner-template.yaml @@ -83,9 +83,6 @@ spec: - effect: NoExecute key: aptos.org/nodepool value: validators - - effect: NoExecute - key: AptosNodeOnly - value: "false" volumes: - name: multiregion-kubeconfig secret: diff --git a/testsuite/forge.py b/testsuite/forge.py index dcf15e3e3233b..e0a1d4f8f88b7 100644 --- a/testsuite/forge.py +++ b/testsuite/forge.py @@ -1560,7 +1560,7 @@ def test( asyncio.run(forge_cluster.write(context.shell)) # These features and profile flags are set as strings - enable_failpoints = forge_enable_failpoints == "true" + enable_failpoints = True enable_performance_profile = forge_enable_performance == "true" # In the below, assume that the image is pushed to all registries diff --git a/testsuite/forge/src/backend/k8s/cluster_helper.rs b/testsuite/forge/src/backend/k8s/cluster_helper.rs index 090ba01bb006d..93fe76c773d59 100644 --- a/testsuite/forge/src/backend/k8s/cluster_helper.rs +++ b/testsuite/forge/src/backend/k8s/cluster_helper.rs @@ -647,7 +647,7 @@ pub async fn install_testnet_resources( kube_client.clone(), kube_namespace.clone(), FORGE_TESTNET_DEPLOYER_DOCKER_IMAGE_REPO.to_string(), - Some("7ab32f3eb20e28fbc2103b0fbe82148063794b30".into()), + None, ); testnet_deployer.start(config).await?; diff --git a/testsuite/smoke-test/src/consensus/consensus_fault_tolerance.rs b/testsuite/smoke-test/src/consensus/consensus_fault_tolerance.rs index dc640d98146da..8bbda0172bd26 100644 --- a/testsuite/smoke-test/src/consensus/consensus_fault_tolerance.rs +++ b/testsuite/smoke-test/src/consensus/consensus_fault_tolerance.rs @@ -347,6 +347,47 @@ async fn test_execution_retry() { .unwrap(); } +#[tokio::test] +async fn test_fault_tolerance_of_leader_equivocation() { + let 
num_validators = 4; + + let swarm = create_swarm(num_validators, 1).await; + let (validator_clients, public_info) = { + ( + swarm.get_validator_clients_with_names(), + swarm.aptos_public_info(), + ) + }; + test_consensus_fault_tolerance( + validator_clients, + public_info, + 3, + 5.0, + 1, + Box::new(FailPointFailureInjection::new(Box::new(move |cycle, _| { + ( + vec![( + cycle % num_validators, + "consensus::leader_equivocation".to_string(), + format!("{}%return", 50), + )], + true, + ) + }))), + Box::new( + move |_, executed_epochs, executed_rounds, executed_transactions, _, _| { + successful_criteria(executed_epochs, executed_rounds, executed_transactions); + Ok(()) + }, + ), + true, + false, + ) + .await + .unwrap(); + +} + #[tokio::test] async fn test_fault_tolerance_of_network_send() { // Randomly increase network failure rate, until network halts, and check that it comes back afterwards. diff --git a/testsuite/test_framework/cluster.py b/testsuite/test_framework/cluster.py index 0f12e3d47fedb..e7e8ac7bc7db8 100644 --- a/testsuite/test_framework/cluster.py +++ b/testsuite/test_framework/cluster.py @@ -63,7 +63,7 @@ def set_kubeconf(self, kubeconf: str) -> ForgeCluster: @property def kubectl_create_context_arg(self) -> List[str]: - return [] if self.is_multiregion else [] + return ["--context=karmada-apiserver"] if self.is_multiregion else [] async def write(self, shell: Shell) -> None: assert self.kubeconf is not None, "kubeconf must be set" diff --git a/testsuite/testcases/src/performance_test.rs b/testsuite/testcases/src/performance_test.rs index 63786565d1f98..ebcdead6d6481 100644 --- a/testsuite/testcases/src/performance_test.rs +++ b/testsuite/testcases/src/performance_test.rs @@ -2,8 +2,11 @@ // Parts of the project are originally copyright © Meta Platforms, Inc. 
// SPDX-License-Identifier: Apache-2.0 +use std::{sync::Arc, time::Duration}; +use anyhow::{anyhow, bail, Context}; + use crate::NetworkLoadTest; -use aptos_forge::{NetworkContextSynchronizer, NetworkTest, Result, Test}; +use aptos_forge::{NetworkContextSynchronizer, NetworkTest, Result, Swarm, SwarmExt, Test, TestReport}; use async_trait::async_trait; pub struct PerformanceBenchmark; @@ -14,7 +17,34 @@ impl Test for PerformanceBenchmark { } } -impl NetworkLoadTest for PerformanceBenchmark {} +#[async_trait] +impl NetworkLoadTest for PerformanceBenchmark { + async fn test( + &self, + swarm: Arc>>, + _report: &mut TestReport, + _duration: Duration, + ) -> Result<()> { + let validators = { swarm.read().await.get_validator_clients_with_names() }; + let num_bad_leaders = validators.len() / 5; + for (name, validator) in validators[..num_bad_leaders].iter() { + validator + .set_failpoint( + "consensus::leader_equivocation".to_string(), + "return".to_string(), + ) + .await + .map_err(|e| { + anyhow!( + "set_failpoint to set consensus leader equivocation on {} failed, {:?}", + name, + e + ) + })?; + }; + Ok(()) + } +} #[async_trait] impl NetworkTest for PerformanceBenchmark {