From 0e8d81f65331a35b1fff54f33259c88f0d866e85 Mon Sep 17 00:00:00 2001 From: Yuhao Su <31772373+yuhao-su@users.noreply.github.com> Date: Thu, 23 Mar 2023 16:02:52 +0800 Subject: [PATCH] fix: evict hash join cache every n messages. (#8731) --- src/stream/src/executor/hash_join.rs | 31 +++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/src/stream/src/executor/hash_join.rs b/src/stream/src/executor/hash_join.rs index 5fac7026ffc03..4bbca3ac74148 100644 --- a/src/stream/src/executor/hash_join.rs +++ b/src/stream/src/executor/hash_join.rs @@ -52,6 +52,10 @@ use crate::task::AtomicU64Ref; /// enum is not supported in const generic. // TODO: Use enum to replace this once [feature(adt_const_params)](https://github.com/rust-lang/rust/issues/95174) get completed. pub type JoinTypePrimitive = u8; + +/// Evict the cache every n rows. +const EVICT_EVERY_N_ROWS: u32 = 1024; + #[allow(non_snake_case, non_upper_case_globals)] pub mod JoinType { use super::JoinTypePrimitive; @@ -242,6 +246,8 @@ pub struct HashJoinExecutor, /// The maximum size of the chunk produced by executor at a time chunk_size: usize, + /// Count the rows received, clear to 0 when counted to `EVICT_EVERY_N_ROWS` + cnt_rows_received: u32, /// watermark column index -> `BufferedWatermarks` watermark_buffers: BTreeMap>, @@ -603,6 +609,7 @@ impl HashJoinExecutor HashJoinExecutor HashJoinExecutor HashJoinExecutor, + side_match: &mut JoinSide, + cnt_rows_received: &mut u32, + ) { + *cnt_rows_received += 1; + if *cnt_rows_received == EVICT_EVERY_N_ROWS { + side_update.ht.evict(); + side_match.ht.evict(); + *cnt_rows_received = 0; + } + } + fn handle_watermark( &mut self, side: SideTypePrimitive, @@ -850,6 +868,7 @@ impl HashJoinExecutor HashJoinExecutor = Self::hash_eq_match(key, &mut side_match.ht).await?; match op {