Skip to content

Commit

Permalink
fix: evict hash join cache every n messages. (risingwavelabs#8731)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuhao-su authored Mar 23, 2023
1 parent 8261a30 commit 0e8d81f
Showing 1 changed file with 26 additions and 5 deletions.
31 changes: 26 additions & 5 deletions src/stream/src/executor/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ use crate::task::AtomicU64Ref;
/// enum is not supported in const generic.
// TODO: Use enum to replace this once [feature(adt_const_params)](https://github.com/rust-lang/rust/issues/95174) get completed.
pub type JoinTypePrimitive = u8;

/// Evict the cache every n rows.
const EVICT_EVERY_N_ROWS: u32 = 1024;

#[allow(non_snake_case, non_upper_case_globals)]
pub mod JoinType {
use super::JoinTypePrimitive;
Expand Down Expand Up @@ -242,6 +246,8 @@ pub struct HashJoinExecutor<K: HashKey, S: StateStore, const T: JoinTypePrimitiv
metrics: Arc<StreamingMetrics>,
/// The maximum size of the chunk produced by executor at a time
chunk_size: usize,
/// Count the messages received, clear to 0 when counted to `EVICT_EVERY_N_MESSAGES`
cnt_rows_received: u32,

/// watermark column index -> `BufferedWatermarks`
watermark_buffers: BTreeMap<usize, BufferedWatermarks<SideTypePrimitive>>,
Expand Down Expand Up @@ -603,6 +609,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
append_only_optimize,
metrics,
chunk_size,
cnt_rows_received: 0,
watermark_buffers,
}
}
Expand Down Expand Up @@ -662,6 +669,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
chunk,
self.append_only_optimize,
self.chunk_size,
&mut self.cnt_rows_received,
) {
left_time += left_start_time.elapsed();
yield Message::Chunk(chunk?);
Expand All @@ -687,6 +695,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
chunk,
self.append_only_optimize,
self.chunk_size,
&mut self.cnt_rows_received,
) {
right_time += right_start_time.elapsed();
yield Message::Chunk(chunk?);
Expand Down Expand Up @@ -752,14 +761,23 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
// `commit` them here.
self.side_l.ht.flush(epoch).await?;
self.side_r.ht.flush(epoch).await?;

// We need to manually evict the cache to the target capacity.
self.side_l.ht.evict();
self.side_r.ht.evict();

Ok(())
}

// We need to manually evict the cache.
fn evict_cache(
side_update: &mut JoinSide<K, S>,
side_match: &mut JoinSide<K, S>,
cnt_rows_received: &mut u32,
) {
*cnt_rows_received += 1;
if *cnt_rows_received == EVICT_EVERY_N_ROWS {
side_update.ht.evict();
side_match.ht.evict();
*cnt_rows_received = 0;
}
}

fn handle_watermark(
&mut self,
side: SideTypePrimitive,
Expand Down Expand Up @@ -850,6 +868,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
chunk: StreamChunk,
append_only_optimize: bool,
chunk_size: usize,
cnt_rows_received: &'a mut u32,
) {
let chunk = chunk.compact();

Expand All @@ -870,6 +889,8 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,

let keys = K::build(&side_update.join_key_indices, chunk.data_chunk())?;
for ((op, row), key) in chunk.rows().zip_eq_debug(keys.iter()) {
Self::evict_cache(side_update, side_match, cnt_rows_received);

let matched_rows: Option<HashValueType> =
Self::hash_eq_match(key, &mut side_match.ht).await?;
match op {
Expand Down

0 comments on commit 0e8d81f

Please sign in to comment.