Skip to content

Commit

Permalink
generate standard results at runtime
Browse files Browse the repository at this point in the history
Signed-off-by: Runji Wang <[email protected]>
  • Loading branch information
wangrunji0408 committed Jan 31, 2023
1 parent 6faa9b1 commit 0b9c309
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 7,261 deletions.
77 changes: 44 additions & 33 deletions src/tests/simulation/tests/nexmark_recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,54 +20,65 @@ use anyhow::Result;
use madsim::time::{sleep, Instant};
use risingwave_simulation::cluster::{Configuration, KillOpts};
use risingwave_simulation::nexmark::{self, NexmarkCluster, THROUGHPUT};
use risingwave_simulation::utils::AssertResult;

/// Setup a nexmark stream, inject failures, and verify results.
#[madsim::test]
async fn nexmark_recovery() -> Result<()> {
async fn nexmark_recovery_common(create: &str, select: &str, drop: &str) -> Result<()> {
// tracing_subscriber::fmt()
// .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
// .init();

let mut cluster =
NexmarkCluster::new(Configuration::for_scale(), 2, Some(THROUGHPUT * 20)).await?;
NexmarkCluster::new(Configuration::for_scale(), 6, Some(THROUGHPUT * 20)).await?;

// note: feel free to disable queries to speed up the test
cluster.run(nexmark::queries::q3::CREATE).await.unwrap();
cluster.run(nexmark::queries::q4::CREATE).await.unwrap();
cluster.run(nexmark::queries::q5::CREATE).await.unwrap();
cluster.run(nexmark::queries::q7::CREATE).await.unwrap();
cluster.run(nexmark::queries::q8::CREATE).await.unwrap();
cluster.run(nexmark::queries::q9::CREATE).await.unwrap();
// get the output without failures as the standard result
cluster.run(create).await?;
sleep(Duration::from_secs(30)).await;
let expected = cluster.run(select).await?;
cluster.run(drop).await?;
sleep(Duration::from_secs(5)).await;

cluster.run(create).await?;

// kill nodes and trigger recovery
for _ in 0..5 {
sleep(Duration::from_secs(2)).await;
cluster.kill_node(&KillOpts::ALL).await;
}
// wait enough time to make sure the stream is end
sleep(Duration::from_secs(60)).await;

// make sure running for enough time
sleep(Duration::from_secs(30)).await;

let q3 = cluster.run(nexmark::queries::q3::SELECT).await.unwrap();
let q4 = cluster.run(nexmark::queries::q4::SELECT).await.unwrap();
let q5 = cluster.run(nexmark::queries::q5::SELECT).await.unwrap();
let q7 = cluster.run(nexmark::queries::q7::SELECT).await.unwrap();
let q8 = cluster.run(nexmark::queries::q8::SELECT).await.unwrap();
let q9 = cluster.run(nexmark::queries::q9::SELECT).await.unwrap();

// uncomment the following lines to generate results
// std::fs::write("tests/nexmark_result/q3.txt", q3).unwrap();
// std::fs::write("tests/nexmark_result/q4.txt", q4).unwrap();
// std::fs::write("tests/nexmark_result/q5.txt", q5).unwrap();
// std::fs::write("tests/nexmark_result/q7.txt", q7).unwrap();
// std::fs::write("tests/nexmark_result/q8.txt", q8).unwrap();
// std::fs::write("tests/nexmark_result/q9.txt", q9).unwrap();
cluster.run(select).await?.assert_result_eq(&expected);

assert_eq!(q3, include_str!("nexmark_result/q3.txt"));
assert_eq!(q4, include_str!("nexmark_result/q4.txt"));
assert_eq!(q5, include_str!("nexmark_result/q5.txt"));
assert_eq!(q7, include_str!("nexmark_result/q7.txt"));
assert_eq!(q8, include_str!("nexmark_result/q8.txt"));
assert_eq!(q9, include_str!("nexmark_result/q9.txt"));
Ok(())
}

macro_rules! test {
($query:ident) => {
paste::paste! {
#[madsim::test]
async fn [< nexmark_recovery_ $query >]() -> Result<()> {
use risingwave_simulation::nexmark::queries::$query::*;
nexmark_recovery_common(CREATE, SELECT, DROP)
.await
}
}
};
}

// q0, q1, q2: too trivial
test!(q3);
test!(q4);
test!(q5);
// q6: cannot plan
test!(q7);
test!(q8);
test!(q9);
// q10+: duplicated or unsupported

// Self made queries.
test!(q101);
test!(q102);
test!(q103);
test!(q104);
test!(q105);
Loading

0 comments on commit 0b9c309

Please sign in to comment.