Skip to content

Commit

Permalink
refactor(expr): make evaluation async (risingwavelabs#8229)
Browse files Browse the repository at this point in the history
Signed-off-by: Runji Wang <[email protected]>
  • Loading branch information
wangrunji0408 authored Mar 15, 2023
1 parent 1a11c3f commit 03cc2ae
Show file tree
Hide file tree
Showing 77 changed files with 1,099 additions and 1,013 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/batch/src/executor/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl FilterExecutor {
#[for_await]
for data_chunk in self.child.execute() {
let data_chunk = data_chunk?.compact();
let vis_array = self.expr.eval(&data_chunk)?;
let vis_array = self.expr.eval(&data_chunk).await?;

if let Bool(vis) = vis_array.as_ref() {
// TODO: should we yield masked data chunk directly?
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ impl<K: HashKey + Send + Sync> HashAggExecutor<K> {

// TODO: currently not a vectorized implementation
for state in states {
state.update_single(&chunk, row_id)?
state.update_single(&chunk, row_id).await?
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,12 @@ impl HopWindowExecutor {
let len = data_chunk.cardinality();
for i in 0..units {
let window_start_col = if output_indices.contains(&window_start_col_index) {
Some(self.window_start_exprs[i].eval(&data_chunk)?)
Some(self.window_start_exprs[i].eval(&data_chunk).await?)
} else {
None
};
let window_end_col = if output_indices.contains(&window_end_col_index) {
Some(self.window_end_exprs[i].eval(&data_chunk)?)
Some(self.window_end_exprs[i].eval(&data_chunk).await?)
} else {
None
};
Expand Down
73 changes: 50 additions & 23 deletions src/batch/src/executor/join/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ impl<K: HashKey> HashJoinExecutor<K> {
#[for_await]
for chunk in Self::do_inner_join(params) {
let mut chunk = chunk?;
chunk.set_visibility(cond.eval(&chunk)?.as_bool().iter().collect());
chunk.set_visibility(cond.eval(&chunk).await?.as_bool().iter().collect());
yield chunk
}
}
Expand Down Expand Up @@ -473,7 +473,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
spilled,
cond.as_ref(),
&mut non_equi_state,
)?
)
.await?
}
}
} else {
Expand All @@ -494,7 +495,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
spilled,
cond.as_ref(),
&mut non_equi_state,
)?
)
.await?
}
}

Expand Down Expand Up @@ -593,7 +595,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
spilled,
cond.as_ref(),
&mut non_equi_state,
)?
)
.await?
}
}
}
Expand All @@ -606,7 +609,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
spilled,
cond.as_ref(),
&mut non_equi_state,
)?
)
.await?
}
}

Expand Down Expand Up @@ -657,7 +661,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
spilled,
cond.as_ref(),
&mut non_equi_state,
)?
)
.await?
}
}
} else if let Some(spilled) = Self::append_one_probe_row(
Expand All @@ -675,7 +680,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
spilled,
cond.as_ref(),
&mut non_equi_state,
)?
)
.await?
}
if let Some(spilled) = remaining_chunk_builder.consume_all() {
yield spilled
Expand Down Expand Up @@ -777,7 +783,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
spilled,
cond.as_ref(),
&mut non_equi_state,
)?
)
.await?
}
}
}
Expand All @@ -787,7 +794,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
spilled,
cond.as_ref(),
&mut non_equi_state,
)?
)
.await?
}
#[for_await]
for spilled in Self::handle_remaining_build_rows_for_right_outer_join(
Expand Down Expand Up @@ -884,7 +892,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
spilled,
cond.as_ref(),
&mut non_equi_state,
)?
)
.await?
}
}
}
Expand All @@ -894,7 +903,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
spilled,
cond.as_ref(),
&mut non_equi_state,
)?
)
.await?
}
#[for_await]
for spilled in Self::handle_remaining_build_rows_for_right_semi_anti_join::<ANTI_JOIN>(
Expand Down Expand Up @@ -1028,7 +1038,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
cond.as_ref(),
&mut left_non_equi_state,
&mut right_non_equi_state,
)?
)
.await?
}
}
} else {
Expand All @@ -1050,7 +1061,8 @@ impl<K: HashKey> HashJoinExecutor<K> {
cond.as_ref(),
&mut left_non_equi_state,
&mut right_non_equi_state,
)?
)
.await?
}
#[for_await]
for spilled in Self::handle_remaining_build_rows_for_right_outer_join(
Expand Down Expand Up @@ -1199,7 +1211,7 @@ impl<K: HashKey> HashJoinExecutor<K> {
///
/// For more information about how `process_*_join_non_equi_condition` work, see their unit
/// tests.
fn process_left_outer_join_non_equi_condition(
async fn process_left_outer_join_non_equi_condition(
chunk: DataChunk,
cond: &dyn Expression,
LeftNonEquiJoinState {
Expand All @@ -1209,7 +1221,7 @@ impl<K: HashKey> HashJoinExecutor<K> {
found_matched,
}: &mut LeftNonEquiJoinState,
) -> Result<DataChunk> {
let filter = cond.eval(&chunk)?.as_bool().iter().collect();
let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
Ok(DataChunkMutator(chunk)
.nullify_build_side_for_non_equi_condition(&filter, *probe_column_count)
.remove_duplicate_rows_for_left_outer_join(
Expand All @@ -1223,7 +1235,7 @@ impl<K: HashKey> HashJoinExecutor<K> {

/// Filters for candidate rows which satisfy `non_equi` predicate.
/// Removes duplicate rows.
fn process_left_semi_anti_join_non_equi_condition<const ANTI_JOIN: bool>(
async fn process_left_semi_anti_join_non_equi_condition<const ANTI_JOIN: bool>(
chunk: DataChunk,
cond: &dyn Expression,
LeftNonEquiJoinState {
Expand All @@ -1233,7 +1245,7 @@ impl<K: HashKey> HashJoinExecutor<K> {
..
}: &mut LeftNonEquiJoinState,
) -> Result<DataChunk> {
let filter = cond.eval(&chunk)?.as_bool().iter().collect();
let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
Ok(DataChunkMutator(chunk)
.remove_duplicate_rows_for_left_semi_anti_join::<ANTI_JOIN>(
&filter,
Expand All @@ -1244,29 +1256,29 @@ impl<K: HashKey> HashJoinExecutor<K> {
.take())
}

fn process_right_outer_join_non_equi_condition(
async fn process_right_outer_join_non_equi_condition(
chunk: DataChunk,
cond: &dyn Expression,
RightNonEquiJoinState {
build_row_ids,
build_row_matched,
}: &mut RightNonEquiJoinState,
) -> Result<DataChunk> {
let filter = cond.eval(&chunk)?.as_bool().iter().collect();
let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
Ok(DataChunkMutator(chunk)
.remove_duplicate_rows_for_right_outer_join(&filter, build_row_ids, build_row_matched)
.take())
}

fn process_right_semi_anti_join_non_equi_condition(
async fn process_right_semi_anti_join_non_equi_condition(
chunk: DataChunk,
cond: &dyn Expression,
RightNonEquiJoinState {
build_row_ids,
build_row_matched,
}: &mut RightNonEquiJoinState,
) -> Result<()> {
let filter = cond.eval(&chunk)?.as_bool().iter().collect();
let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
DataChunkMutator(chunk).remove_duplicate_rows_for_right_semi_anti_join(
&filter,
build_row_ids,
Expand All @@ -1275,13 +1287,13 @@ impl<K: HashKey> HashJoinExecutor<K> {
Ok(())
}

fn process_full_outer_join_non_equi_condition(
async fn process_full_outer_join_non_equi_condition(
chunk: DataChunk,
cond: &dyn Expression,
left_non_equi_state: &mut LeftNonEquiJoinState,
right_non_equi_state: &mut RightNonEquiJoinState,
) -> Result<DataChunk> {
let filter = cond.eval(&chunk)?.as_bool().iter().collect();
let filter = cond.eval(&chunk).await?.as_bool().iter().collect();
Ok(DataChunkMutator(chunk)
.nullify_build_side_for_non_equi_condition(
&filter,
Expand Down Expand Up @@ -2609,6 +2621,7 @@ mod tests {
cond.as_ref(),
&mut state
)
.await
.unwrap()
.compact(),
&expect
Expand Down Expand Up @@ -2638,6 +2651,7 @@ mod tests {
cond.as_ref(),
&mut state
)
.await
.unwrap()
.compact(),
&expect
Expand Down Expand Up @@ -2667,6 +2681,7 @@ mod tests {
cond.as_ref(),
&mut state
)
.await
.unwrap()
.compact(),
&expect
Expand Down Expand Up @@ -2706,6 +2721,7 @@ mod tests {
cond.as_ref(),
&mut state
)
.await
.unwrap()
.compact(),
&expect
Expand All @@ -2732,6 +2748,7 @@ mod tests {
cond.as_ref(),
&mut state
)
.await
.unwrap()
.compact(),
&expect
Expand All @@ -2758,6 +2775,7 @@ mod tests {
cond.as_ref(),
&mut state
)
.await
.unwrap()
.compact(),
&expect
Expand Down Expand Up @@ -2799,6 +2817,7 @@ mod tests {
cond.as_ref(),
&mut state
)
.await
.unwrap()
.compact(),
&expect
Expand Down Expand Up @@ -2827,6 +2846,7 @@ mod tests {
cond.as_ref(),
&mut state
)
.await
.unwrap()
.compact(),
&expect
Expand Down Expand Up @@ -2855,6 +2875,7 @@ mod tests {
cond.as_ref(),
&mut state
)
.await
.unwrap()
.compact(),
&expect
Expand Down Expand Up @@ -2918,6 +2939,7 @@ mod tests {
cond.as_ref(),
&mut state
)
.await
.unwrap()
.compact(),
&expect
Expand Down Expand Up @@ -2958,6 +2980,7 @@ mod tests {
cond.as_ref(),
&mut state
)
.await
.unwrap()
.compact(),
&expect
Expand Down Expand Up @@ -3010,6 +3033,7 @@ mod tests {
cond.as_ref(),
&mut state
)
.await
.is_ok()
);
assert_eq!(state.build_row_ids, Vec::new());
Expand Down Expand Up @@ -3044,6 +3068,7 @@ mod tests {
cond.as_ref(),
&mut state
)
.await
.is_ok()
);
assert_eq!(state.build_row_ids, Vec::new());
Expand Down Expand Up @@ -3105,6 +3130,7 @@ mod tests {
&mut left_state,
&mut right_state,
)
.await
.unwrap()
.compact(),
&expect
Expand Down Expand Up @@ -3152,6 +3178,7 @@ mod tests {
&mut left_state,
&mut right_state,
)
.await
.unwrap()
.compact(),
&expect
Expand Down
Loading

0 comments on commit 03cc2ae

Please sign in to comment.