Skip to content

Commit

Permalink
no need to store handle
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Mar 22, 2023
1 parent df8b52c commit bd8096b
Showing 1 changed file with 4 additions and 11 deletions.
15 changes: 4 additions & 11 deletions src/tests/simulation/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,11 +315,11 @@ impl Cluster {
pub fn start_session(&mut self) -> Session {
let (query_tx, mut query_rx) = mpsc::channel::<SessionRequest>(0);

let join_handle = self.client.spawn(async move {
let mut session = RisingWave::connect("frontend".into(), "dev".into()).await?;
self.client.spawn(async move {
let mut client = RisingWave::connect("frontend".into(), "dev".into()).await?;

while let Some((sql, tx)) = query_rx.next().await {
let result = session
let result = client
.run(&sql)
.await
.map(|output| match output {
Expand All @@ -343,10 +343,7 @@ impl Cluster {
Ok::<_, anyhow::Error>(())
});

Session {
query_tx,
join_handle: Arc::new(join_handle),
}
Session { query_tx }
}

/// Run a SQL query on a **new** session of the client node.
Expand Down Expand Up @@ -544,15 +541,11 @@ type SessionRequest = (
#[derive(Debug, Clone)]
pub struct Session {
query_tx: mpsc::Sender<SessionRequest>,
join_handle: Arc<JoinHandle<Result<()>>>,
}

impl Session {
/// Run the given SQL query on the session.
pub async fn run(&mut self, sql: impl Into<String>) -> Result<String> {
if self.join_handle.is_finished() {
bail!("session is finished");
}
let (tx, rx) = oneshot::channel();
self.query_tx.send((sql.into(), tx)).await?;
rx.await?
Expand Down

0 comments on commit bd8096b

Please sign in to comment.