Skip to content

Commit

Permalink
refactor(simulation): interface of running multiple queries in the sa…
Browse files Browse the repository at this point in the history
…me session (#8697)

Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao authored Mar 22, 2023
1 parent 6313c42 commit 8648ff8
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 144 deletions.
91 changes: 63 additions & 28 deletions src/tests/simulation/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@ use std::collections::HashMap;
use std::future::Future;
use std::io::Write;
use std::path::PathBuf;
use std::sync::LazyLock;
use std::sync::{Arc, LazyLock};
use std::time::Duration;

use anyhow::{bail, Result};
use clap::Parser;
use futures::channel::{mpsc, oneshot};
use futures::future::join_all;
use futures::{SinkExt, StreamExt};
use madsim::net::ipvs::*;
use madsim::runtime::{Handle, NodeHandle};
use madsim::task::JoinHandle;
use rand::Rng;
use sqllogictest::AsyncDB;

Expand Down Expand Up @@ -308,35 +311,47 @@ impl Cluster {
})
}

/// Run a SQL query from the client.
pub async fn run(&mut self, sql: impl Into<String>) -> Result<String> {
let sql = sql.into();
/// Start a SQL session on the client node.
pub fn start_session(&mut self) -> Session {
let (query_tx, mut query_rx) = mpsc::channel::<SessionRequest>(0);

self.client.spawn(async move {
let mut client = RisingWave::connect("frontend".into(), "dev".into()).await?;

let result = self
.client
.spawn(async move {
// TODO: reuse session
let mut session = RisingWave::connect("frontend".into(), "dev".into())
while let Some((sql, tx)) = query_rx.next().await {
let result = client
.run(&sql)
.await
.expect("failed to connect to RisingWave");
let result = session.run(&sql).await?;
Ok::<_, anyhow::Error>(result)
})
.await??;

match result {
sqllogictest::DBOutput::Rows { rows, .. } => Ok(rows
.into_iter()
.map(|row| {
row.into_iter()
.map(|v| v.to_string())
.collect::<Vec<_>>()
.join(" ")
})
.collect::<Vec<_>>()
.join("\n")),
_ => Ok("".to_string()),
}
.map(|output| match output {
sqllogictest::DBOutput::Rows { rows, .. } => rows
.into_iter()
.map(|row| {
row.into_iter()
.map(|v| v.to_string())
.collect::<Vec<_>>()
.join(" ")
})
.collect::<Vec<_>>()
.join("\n"),
_ => "".to_string(),
})
.map_err(Into::into);

let _ = tx.send(result);
}

Ok::<_, anyhow::Error>(())
});

Session { query_tx }
}

/// Run a SQL query on a **new** session of the client node.
///
/// This is a convenience method that creates a new session and runs the query on it. If you
/// want to run multiple queries on the same session, use `start_session` and `Session::run`.
pub async fn run(&mut self, sql: impl Into<String>) -> Result<String> {
self.start_session().run(sql).await
}

/// Run a future on the client node.
Expand Down Expand Up @@ -517,6 +532,26 @@ impl Cluster {
}
}

type SessionRequest = (
String, // query sql
oneshot::Sender<Result<String>>, // channel to send result back
);

/// A SQL session on the simulated client node.
#[derive(Debug, Clone)]
pub struct Session {
query_tx: mpsc::Sender<SessionRequest>,
}

impl Session {
/// Run the given SQL query on the session.
pub async fn run(&mut self, sql: impl Into<String>) -> Result<String> {
let (tx, rx) = oneshot::channel();
self.query_tx.send((sql.into(), tx)).await?;
rx.await?
}
}

/// Options for killing nodes.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct KillOpts {
Expand Down
46 changes: 24 additions & 22 deletions src/tests/simulation/tests/it/cascade_materialized_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ const MV5: &str = "create materialized view m5 as select * from m4;";
#[madsim::test]
async fn test_simple_cascade_materialized_view() -> Result<()> {
let mut cluster = Cluster::start(Configuration::for_scale()).await?;
let mut session = cluster.start_session();

cluster.run(ROOT_TABLE_CREATE).await?;
cluster.run(MV1).await?;
session.run(ROOT_TABLE_CREATE).await?;
session.run(MV1).await?;

let fragment = cluster
.locate_one_fragment([
Expand All @@ -62,17 +63,17 @@ async fn test_simple_cascade_materialized_view() -> Result<()> {
fragment.inner.actors.len()
);

cluster
session
.run(&format!(
"insert into t1 values {}",
(1..=10).map(|x| format!("({x})")).join(",")
))
.await?;

cluster.run("flush").await?;
session.run("flush").await?;

// v1 > 5, result is [6, 7, 8, 9, 10]
cluster
session
.run("select count(*) from m1")
.await?
.assert_result_eq("5");
Expand All @@ -92,21 +93,21 @@ async fn test_simple_cascade_materialized_view() -> Result<()> {
fragment.inner.actors.len()
);

cluster
session
.run("select count(*) from m1")
.await?
.assert_result_eq("5");

cluster
session
.run(&format!(
"insert into t1 values {}",
(11..=20).map(|x| format!("({x})")).join(",")
))
.await?;

cluster.run("flush").await?;
session.run("flush").await?;
// 10 < v1 < 15, result is [11, 12, 13, 14]
cluster
session
.run("select count(*) from m1")
.await?
.assert_result_eq("15");
Expand All @@ -117,13 +118,14 @@ async fn test_simple_cascade_materialized_view() -> Result<()> {
#[madsim::test]
async fn test_diamond_cascade_materialized_view() -> Result<()> {
let mut cluster = Cluster::start(Configuration::for_scale()).await?;
let mut session = cluster.start_session();

cluster.run(ROOT_TABLE_CREATE).await?;
cluster.run(MV1).await?;
cluster.run(MV2).await?;
cluster.run(MV3).await?;
cluster.run(MV4).await?;
cluster.run(MV5).await?;
session.run(ROOT_TABLE_CREATE).await?;
session.run(MV1).await?;
session.run(MV2).await?;
session.run(MV3).await?;
session.run(MV4).await?;
session.run(MV5).await?;

let fragment = cluster
.locate_one_fragment([
Expand All @@ -141,15 +143,15 @@ async fn test_diamond_cascade_materialized_view() -> Result<()> {
let fragment = cluster.locate_fragment_by_id(id).await?;
assert_eq!(fragment.inner.actors.len(), 1);

cluster
session
.run(&format!(
"insert into t1 values {}",
(1..=10).map(|x| format!("({x})")).join(",")
))
.await?;

cluster.run("flush").await?;
cluster
session.run("flush").await?;
session
.run("select count(*) from m5")
.await?
.assert_result_eq("0");
Expand All @@ -160,20 +162,20 @@ async fn test_diamond_cascade_materialized_view() -> Result<()> {
let fragment = cluster.locate_fragment_by_id(id).await?;
assert_eq!(fragment.inner.actors.len(), 6);

cluster
session
.run("select count(*) from m5")
.await?
.assert_result_eq("0");

cluster
session
.run(&format!(
"insert into t1 values {}",
(11..=20).map(|x| format!("({x})")).join(",")
))
.await?;

cluster.run("flush").await?;
cluster
session.run("flush").await?;
session
.run("select count(*) from m5")
.await?
.assert_result_eq("4");
Expand Down
51 changes: 26 additions & 25 deletions src/tests/simulation/tests/it/dynamic_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ const SELECT: &str = "select * from mv1 order by v1;";
#[madsim::test]
async fn test_dynamic_filter() -> Result<()> {
let mut cluster = Cluster::start(Configuration::for_scale()).await?;
let mut session = cluster.start_session();

cluster.run("create table t1 (v1 int);").await?;
cluster.run("create table t2 (v2 int);").await?;
cluster.run("create materialized view mv1 as with max_v2 as (select max(v2) max from t2) select v1 from t1, max_v2 where v1 > max;").await?;
cluster.run("insert into t1 values (1), (2), (3)").await?;
cluster.run("flush").await?;
session.run("create table t1 (v1 int);").await?;
session.run("create table t2 (v2 int);").await?;
session.run("create materialized view mv1 as with max_v2 as (select max(v2) max from t2) select v1 from t1, max_v2 where v1 > max;").await?;
session.run("insert into t1 values (1), (2), (3)").await?;
session.run("flush").await?;
sleep(Duration::from_secs(5)).await;

let dynamic_filter_fragment = cluster
Expand All @@ -60,53 +61,53 @@ async fn test_dynamic_filter() -> Result<()> {
cluster.reschedule(format!("{id}-[1,2,3]")).await?;
sleep(Duration::from_secs(3)).await;

cluster.run(SELECT).await?.assert_result_eq("");
cluster.run("insert into t2 values (0)").await?;
cluster.run("flush").await?;
session.run(SELECT).await?.assert_result_eq("");
session.run("insert into t2 values (0)").await?;
session.run("flush").await?;
sleep(Duration::from_secs(5)).await;
cluster.run(SELECT).await?.assert_result_eq("1\n2\n3");
session.run(SELECT).await?.assert_result_eq("1\n2\n3");
// 1
// 2
// 3

cluster.reschedule(format!("{id}-[4,5]+[1,2,3]")).await?;
sleep(Duration::from_secs(3)).await;
cluster.run(SELECT).await?.assert_result_eq("1\n2\n3");
session.run(SELECT).await?.assert_result_eq("1\n2\n3");

cluster.run("insert into t2 values (2)").await?;
cluster.run("flush").await?;
session.run("insert into t2 values (2)").await?;
session.run("flush").await?;
sleep(Duration::from_secs(5)).await;
cluster.run(SELECT).await?.assert_result_eq("3");
session.run(SELECT).await?.assert_result_eq("3");
// 3

cluster.reschedule(format!("{id}-[1,2,3]+[4,5]")).await?;
sleep(Duration::from_secs(3)).await;
cluster.run(SELECT).await?.assert_result_eq("3");
session.run(SELECT).await?.assert_result_eq("3");

cluster.run("update t2 set v2 = 1 where v2 = 2").await?;
cluster.run("flush").await?;
session.run("update t2 set v2 = 1 where v2 = 2").await?;
session.run("flush").await?;
sleep(Duration::from_secs(5)).await;
cluster.run(SELECT).await?.assert_result_eq("2\n3");
session.run(SELECT).await?.assert_result_eq("2\n3");
// 2
// 3
//
cluster.reschedule(format!("{id}+[1,2,3]")).await?;
sleep(Duration::from_secs(3)).await;
cluster.run(SELECT).await?.assert_result_eq("2\n3");
session.run(SELECT).await?.assert_result_eq("2\n3");

cluster.run("delete from t2 where true").await?;
cluster.run("flush").await?;
session.run("delete from t2 where true").await?;
session.run("flush").await?;
sleep(Duration::from_secs(5)).await;
cluster.run(SELECT).await?.assert_result_eq("");
session.run(SELECT).await?.assert_result_eq("");

cluster.reschedule(format!("{id}-[1]")).await?;
sleep(Duration::from_secs(3)).await;
cluster.run(SELECT).await?.assert_result_eq("");
session.run(SELECT).await?.assert_result_eq("");

cluster.run("insert into t2 values (1)").await?;
cluster.run("flush").await?;
session.run("insert into t2 values (1)").await?;
session.run("flush").await?;
sleep(Duration::from_secs(5)).await;
cluster.run(SELECT).await?.assert_result_eq("2\n3");
session.run(SELECT).await?.assert_result_eq("2\n3");

Ok(())
}
18 changes: 10 additions & 8 deletions src/tests/simulation/tests/it/nexmark_chaos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,17 @@ async fn nexmark_chaos_common_inner(
) -> Result<()> {
let mut cluster =
NexmarkCluster::new(Configuration::for_scale(), 6, Some(20 * THROUGHPUT), false).await?;
cluster.run(create).await?;
let mut session = cluster.start_session();
session.run(create).await?;
sleep(Duration::from_secs(30)).await;
let final_result = cluster.run(select).await?;
cluster.run(drop).await?;
let final_result = session.run(select).await?;
session.run(drop).await?;
sleep(Duration::from_secs(5)).await;

println!("Reference run done.");

cluster.run(create).await?;
// Create a new session for the chaos run.
let mut session = cluster.start_session();
session.run(create).await?;

let _initial_result = cluster
.wait_until_non_empty(select, initial_interval, initial_timeout)
Expand All @@ -68,7 +70,7 @@ async fn nexmark_chaos_common_inner(
cluster.reschedule(join_plans(fragments)).await?;

sleep(after_scale_duration).await;
cluster.run(select).await?.assert_result_ne(&final_result);
session.run(select).await?.assert_result_ne(&final_result);

let fragments = cluster.locate_random_fragments().await?;
cluster.reschedule(join_plans(fragments)).await?;
Expand All @@ -78,15 +80,15 @@ async fn nexmark_chaos_common_inner(
cluster.reschedule(fragment.random_reschedule()).await?;

sleep(after_scale_duration).await;
cluster.run(select).await?.assert_result_ne(&final_result);
session.run(select).await?.assert_result_ne(&final_result);

let fragment = cluster.locate_fragment_by_id(id).await?;
cluster.reschedule(fragment.random_reschedule()).await?;
}

sleep(Duration::from_secs(50)).await;

cluster.run(select).await?.assert_result_eq(&final_result);
session.run(select).await?.assert_result_eq(&final_result);

Ok(())
}
Expand Down
Loading

0 comments on commit 8648ff8

Please sign in to comment.