Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[test] paper forge #15587

Draft
wants to merge 10 commits into
base: daniel-paper
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions consensus/consensus-types/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
};
use anyhow::{bail, ensure, format_err};
use aptos_bitvec::BitVec;
use aptos_crypto::{bls12381, hash::CryptoHash, HashValue};
use aptos_crypto::{bls12381::{self, Signature}, hash::CryptoHash, HashValue};
use aptos_infallible::duration_since_epoch;
use aptos_types::{
account_address::AccountAddress,
Expand Down Expand Up @@ -87,6 +87,14 @@ impl Block {
self.is_opt
}

/// Overwrite this block's proposal timestamp (microseconds — see
/// `BlockData::timestamp_usecs`), delegating to the inner `BlockData`.
/// NOTE(review): mutating a constructed block invalidates any previously
/// computed signature over it — appears intended as a test-only forging hook.
pub fn set_timestamp(&mut self, timestamp: u64) {
self.block_data.set_timestamp(timestamp);
}

/// Replace this block's BLS signature wholesale. No verification is performed
/// here; the caller is responsible for supplying a signature that matches the
/// (possibly mutated) block data. Test-only forging hook.
pub fn set_signature(&mut self, signature: Signature) {
self.signature = Some(signature);
}

/// Replace the quorum certificate embedded in this block's `BlockData`.
pub fn set_quorum_cert(&mut self, qc: QuorumCert) {
self.block_data.set_quorum_cert(qc);
}
Expand Down Expand Up @@ -524,7 +532,7 @@ impl Block {
self.epoch(),
self.round(),
self.author().unwrap_or(AccountAddress::ZERO),
vec![1; self.previous_bitvec().num_buckets()], // optimistic proposal hack
BitVec::default().into(), // optimistic proposal hack
// For nil block, we use 0x0 which is convention for nil address in move.
self.block_data()
.failed_authors()
Expand Down
4 changes: 4 additions & 0 deletions consensus/consensus-types/src/block_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ impl CryptoHash for BlockData {
}

impl BlockData {
/// Overwrite the proposal timestamp (microseconds since the epoch, per the
/// `timestamp_usecs` field). NOTE(review): changing this alters the struct's
/// `CryptoHash`, so any signature over the old hash becomes invalid.
pub fn set_timestamp(&mut self, timestamp: u64) {
self.timestamp_usecs = timestamp;
}

/// Replace the quorum certificate carried by this block data.
pub fn set_quorum_cert(&mut self, qc: QuorumCert) {
self.quorum_cert = qc;
}
Expand Down
9 changes: 9 additions & 0 deletions consensus/consensus-types/src/proposal_msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use crate::{block::Block, common::Author, proof_of_store::ProofCache, sync_info::SyncInfo};
use anyhow::{anyhow, ensure, format_err, Context, Ok, Result};
use aptos_crypto::bls12381::Signature;
use aptos_short_hex_str::AsShortHexStr;
use aptos_types::validator_verifier::ValidatorVerifier;
use serde::{Deserialize, Serialize};
Expand All @@ -26,6 +27,14 @@ impl ProposalMsg {
}
}

/// Overwrite the wrapped proposal block's timestamp (microseconds).
/// Forwards to `Block::set_timestamp`; used by test-only equivocation code.
pub fn set_timestamp(&mut self, timestamp: u64) {
self.proposal.set_timestamp(timestamp);
}

/// Replace the wrapped proposal block's signature.
/// Forwards to `Block::set_signature`; used by test-only equivocation code.
pub fn set_signature(&mut self, signature: Signature) {
self.proposal.set_signature(signature);
}

pub fn epoch(&self) -> u64 {
self.proposal.epoch()
}
Expand Down
5 changes: 5 additions & 0 deletions consensus/src/pipeline/pipeline_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ pub struct PipelineBuilder {
state_sync_notifier: Arc<dyn ConsensusNotificationSender>,
payload_manager: Arc<dyn TPayloadManager>,
txn_notifier: Arc<dyn TxnNotifier>,
execution_lock: Arc<Mutex<()>>,
}

fn spawn_shared_fut<
Expand Down Expand Up @@ -191,6 +192,7 @@ impl PipelineBuilder {
state_sync_notifier,
payload_manager,
txn_notifier,
execution_lock: Arc::new(Mutex::new(())),
}
}

Expand Down Expand Up @@ -320,6 +322,7 @@ impl PipelineBuilder {
self.is_randomness_enabled,
self.validators.clone(),
self.block_executor_onchain_config.clone(),
self.execution_lock.clone(),
),
&mut abort_handles,
);
Expand Down Expand Up @@ -456,6 +459,7 @@ impl PipelineBuilder {
is_randomness_enabled: bool,
validator: Arc<[AccountAddress]>,
onchain_execution_config: BlockExecutorConfigFromOnchain,
execution_lock: Arc<Mutex<()>>,
) -> TaskResult<ExecuteResult> {
let mut tracker = Tracker::new("execute", &block);
parent_block_execute_phase.await?;
Expand Down Expand Up @@ -487,6 +491,7 @@ impl PipelineBuilder {
.concat();
let start = Instant::now();
tokio::task::spawn_blocking(move || {
let _guard = execution_lock.lock();
executor
.execute_and_state_checkpoint(
(block.id(), txns).into(),
Expand Down
43 changes: 42 additions & 1 deletion consensus/src/round_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ impl RoundManager {
sync_info,
network.clone(),
proposal_generator,
safety_rules,
safety_rules.clone(),
proposer_election,
parent_id,
)
Expand All @@ -474,9 +474,19 @@ impl RoundManager {
{
if Self::check_whether_to_inject_reconfiguration_error() {
Self::attempt_to_inject_reconfiguration_error(
epoch_state.clone(),
network.clone(),
&proposal_msg,
)
.await?;
}

if Self::check_whether_to_equivocate() {
Self::attempt_to_equivocate(
epoch_state,
network.clone(),
&proposal_msg,
safety_rules.clone()
)
.await?;
}
Expand Down Expand Up @@ -1939,6 +1949,12 @@ impl RoundManager {
false
}

/// Returns true iff the `consensus::leader_equivocation` failpoint is
/// currently active; otherwise falls through to `false`. Compiled only
/// when the `failpoints` feature is enabled.
#[cfg(feature = "failpoints")]
fn check_whether_to_equivocate() -> bool {
fail_point!("consensus::leader_equivocation", |_| true);
false
}

/// Given R1 <- B2 if R1 has the reconfiguration txn, we inject error on B2 if R1.round + 1 = B2.round
/// Direct suffix is checked by parent.has_reconfiguration && !parent.parent.has_reconfiguration
/// The error is injected by sending proposals to half of the validators to force a timeout.
Expand Down Expand Up @@ -1973,4 +1989,29 @@ impl RoundManager {
Ok(())
}
}

/// Test-only fault injection: make the leader equivocate by sending every
/// peer a distinct copy of its proposal, each with a unique (incremented)
/// timestamp and a fresh leader signature over the mutated block data.
/// Because the timestamps differ, no two validators receive the same
/// proposal for this round.
///
/// # Errors
/// Always returns an error after the conflicting proposals are sent, so the
/// normal (single-proposal) broadcast path is aborted.
#[cfg(feature = "failpoints")]
async fn attempt_to_equivocate(
    epoch_state: Arc<EpochState>,
    network: Arc<NetworkSender>,
    proposal_msg: &ProposalMsg,
    safety_rules: Arc<Mutex<MetricsSafetyRules>>,
) -> anyhow::Result<()> {
    info!(
        "[Test] Leader of epoch {} round {} equivocates",
        epoch_state.epoch,
        proposal_msg.proposal().round()
    );

    let all_peers: Vec<_> = epoch_state
        .verifier
        .get_ordered_account_addresses_iter()
        .collect();
    let mut timestamp = proposal_msg.proposal().block_data().timestamp_usecs();
    for peer in all_peers {
        timestamp += 1;
        let mut modified_proposal_msg = proposal_msg.clone();
        modified_proposal_msg.set_timestamp(timestamp);
        // Re-sign after mutating the timestamp so each forged proposal
        // carries a valid leader signature over its unique payload.
        let signature = safety_rules
            .lock()
            .sign_proposal(modified_proposal_msg.proposal().block_data())?;
        modified_proposal_msg.set_signature(signature);
        // Move the message into the send — the previous code cloned it here
        // even though it is never used again in this iteration.
        network.send_proposal(modified_proposal_msg, vec![peer]).await;
    }
    Err(anyhow::anyhow!("Injected leader equivocation"))
}
}
3 changes: 1 addition & 2 deletions terraform/helm/genesis/files/genesis.sh
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,7 @@ create_secrets() {
)

if [[ "$include_genesis_blob" == "true" ]]; then
tar -C $WORKSPACE -czf ${WORKSPACE}/genesis.blob.tar.gz genesis.blob
files_to_include+=("--from-file=genesis.blob.tar.gz=${WORKSPACE}/genesis.blob.tar.gz")
files_to_include+=("--from-file=genesis.blob=${WORKSPACE}/genesis.blob")
fi

kubectl create secret generic "${username}-genesis-e${ERA}" "${files_to_include[@]}"
Expand Down
10 changes: 7 additions & 3 deletions testsuite/forge-cli/src/suites/realistic_environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ pub(crate) fn realistic_env_sweep_wrap(
.with_initial_fullnode_count(num_fullnodes)
.with_validator_override_node_config_fn(Arc::new(|config, _| {
config.execution.processed_transactions_detailed_counters = true;
config.api.failpoints_enabled = true;
}))
.add_network_test(test)
.add_network_test(wrap_with_realistic_env(num_validators, test))
// Test inherits the main EmitJobRequest, so update here for more precise latency measurements
.with_emit_job(
EmitJobRequest::default().latency_polling_interval(Duration::from_millis(100)),
Expand All @@ -82,10 +83,13 @@ pub(crate) fn realistic_env_sweep_wrap(
pub(crate) fn realistic_env_load_sweep_test() -> ForgeConfig {
realistic_env_sweep_wrap(10, 5, LoadVsPerfBenchmark {
test: Box::new(PerformanceBenchmark),
workloads: Workloads::TPS(vec![100, 10000]),
workloads: Workloads::TPS(vec![1000, 2500, 5000, 7500, 10000]),
criteria: [
(95, 0.9, 1.1, 1.2, 0),
(6700, 2.5, 3.5, 5.0, 0),
(95, 0.9, 1.1, 1.2, 0),
(95, 0.9, 1.1, 1.2, 0),
(95, 0.9, 1.1, 1.2, 0),
(95, 0.9, 1.1, 1.2, 0),
// TODO add 9k or 10k. Allow some expired transactions (high-load)
]
.into_iter()
Expand Down
3 changes: 0 additions & 3 deletions testsuite/forge-test-runner-template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,6 @@ spec:
- effect: NoExecute
key: aptos.org/nodepool
value: validators
- effect: NoExecute
key: AptosNodeOnly
value: "false"
volumes:
- name: multiregion-kubeconfig
secret:
Expand Down
2 changes: 1 addition & 1 deletion testsuite/forge.py
Original file line number Diff line number Diff line change
Expand Up @@ -1560,7 +1560,7 @@ def test(
asyncio.run(forge_cluster.write(context.shell))

# These features and profile flags are set as strings
enable_failpoints = forge_enable_failpoints == "true"
enable_failpoints = True
enable_performance_profile = forge_enable_performance == "true"

# In the below, assume that the image is pushed to all registries
Expand Down
2 changes: 1 addition & 1 deletion testsuite/forge/src/backend/k8s/cluster_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ pub async fn install_testnet_resources(
kube_client.clone(),
kube_namespace.clone(),
FORGE_TESTNET_DEPLOYER_DOCKER_IMAGE_REPO.to_string(),
Some("7ab32f3eb20e28fbc2103b0fbe82148063794b30".into()),
None,
);

testnet_deployer.start(config).await?;
Expand Down
41 changes: 41 additions & 0 deletions testsuite/smoke-test/src/consensus/consensus_fault_tolerance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,47 @@ async fn test_execution_retry() {
.unwrap();
}

#[tokio::test]
async fn test_fault_tolerance_of_leader_equivocation() {
let num_validators = 4;

let swarm = create_swarm(num_validators, 1).await;
let (validator_clients, public_info) = {
(
swarm.get_validator_clients_with_names(),
swarm.aptos_public_info(),
)
};
test_consensus_fault_tolerance(
validator_clients,
public_info,
3,
5.0,
1,
Box::new(FailPointFailureInjection::new(Box::new(move |cycle, _| {
(
vec![(
cycle % num_validators,
"consensus::leader_equivocation".to_string(),
format!("{}%return", 50),
)],
true,
)
}))),
Box::new(
move |_, executed_epochs, executed_rounds, executed_transactions, _, _| {
successful_criteria(executed_epochs, executed_rounds, executed_transactions);
Ok(())
},
),
true,
false,
)
.await
.unwrap();
panic!("test_fault_tolerance_of_leader_equivocation");
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This panic! statement forces the test to fail unconditionally after completing its checks. Since the test already has success criteria and error handling, this panic should be removed to allow the test to properly report its actual results.

Spotted by Graphite Reviewer

Is this helpful? React 👍 or 👎 to let us know.

}

#[tokio::test]
async fn test_fault_tolerance_of_network_send() {
// Randomly increase network failure rate, until network halts, and check that it comes back afterwards.
Expand Down
2 changes: 1 addition & 1 deletion testsuite/test_framework/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def set_kubeconf(self, kubeconf: str) -> ForgeCluster:

@property
def kubectl_create_context_arg(self) -> List[str]:
return [] if self.is_multiregion else []
return ["--context=karmada-apiserver"] if self.is_multiregion else []

async def write(self, shell: Shell) -> None:
assert self.kubeconf is not None, "kubeconf must be set"
Expand Down
34 changes: 32 additions & 2 deletions testsuite/testcases/src/performance_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
// Parts of the project are originally copyright © Meta Platforms, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{sync::Arc, time::Duration};
use anyhow::{anyhow, bail, Context};

use crate::NetworkLoadTest;
use aptos_forge::{NetworkContextSynchronizer, NetworkTest, Result, Test};
use aptos_forge::{NetworkContextSynchronizer, NetworkTest, Result, Swarm, SwarmExt, Test, TestReport};
use async_trait::async_trait;

pub struct PerformanceBenchmark;
Expand All @@ -14,7 +17,34 @@ impl Test for PerformanceBenchmark {
}
}

impl NetworkLoadTest for PerformanceBenchmark {}
#[async_trait]
impl NetworkLoadTest for PerformanceBenchmark {
    /// Before the load phase, enable the `consensus::leader_equivocation`
    /// failpoint on the first fifth of the validators (integer division, so
    /// fewer than 5 validators means no fault injection), making the
    /// benchmark run with a fraction of equivocating leaders.
    async fn test(
        &self,
        swarm: Arc<tokio::sync::RwLock<Box<dyn Swarm>>>,
        _report: &mut TestReport,
        _duration: Duration, // renamed: not used by this implementation
    ) -> Result<()> {
        let validators = { swarm.read().await.get_validator_clients_with_names() };
        let num_bad_leaders = validators.len() / 5;
        for (name, validator) in validators[..num_bad_leaders].iter() {
            validator
                .set_failpoint(
                    "consensus::leader_equivocation".to_string(),
                    "return".to_string(),
                )
                .await
                .map_err(|e| {
                    anyhow!(
                        "set_failpoint to set consensus leader equivocation on {} failed, {:?}",
                        name,
                        e
                    )
                })?;
        } // removed stray `;` after the loop block
        Ok(())
    }
}

#[async_trait]
impl NetworkTest for PerformanceBenchmark {
Expand Down
Loading