Skip to content

Commit

Permalink
feat(stream): stream window join naming (risingwavelabs#9076)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Apr 10, 2023
1 parent fd38f3d commit 4929b30
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 5 deletions.
8 changes: 4 additions & 4 deletions src/frontend/planner_test/tests/testdata/watermark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,26 +52,26 @@
└─StreamAppendOnlyHashAgg { group_key: [t.ts, t.v1], aggs: [count(t.v2), count], output_watermarks: [t.ts] }
└─StreamExchange { dist: HashShard(t.ts, t.v1) }
└─StreamTableScan { table: t, columns: [t.ts, t.v1, t.v2, t._row_id], pk: [t._row_id], dist: UpstreamHashShard(t._row_id) }
- name: hash join
- name: inner window join
sql: |
create table t1 (ts timestamp with time zone, v1 int, v2 int, watermark for ts as ts - INTERVAL '1' SECOND) append only;
create table t2 (ts timestamp with time zone, v1 int, v2 int, watermark for ts as ts - INTERVAL '1' SECOND) append only;
select t1.ts as t1_ts, t2.ts as ts2, t1.v1 as t1_v1, t1.v2 as t1_v2, t2.v1 as t2_v1, t2.v2 as t2_v2 from t1, t2 where t1.ts = t2.ts;
stream_plan: |
StreamMaterialize { columns: [t1_ts, ts2, t1_v1, t1_v2, t2_v1, t2_v2, t1._row_id(hidden), t2._row_id(hidden)], stream_key: [t1._row_id, t2._row_id, t1_ts], pk_columns: [t1._row_id, t2._row_id, t1_ts], pk_conflict: "NoCheck", watermark_columns: [t1_ts, ts2] }
└─StreamAppendOnlyHashJoin { type: Inner, predicate: t1.ts = t2.ts, output_watermarks: [t1.ts, t2.ts], output: [t1.ts, t2.ts, t1.v1, t1.v2, t2.v1, t2.v2, t1._row_id, t2._row_id] }
└─StreamWindowJoin { type: Inner, predicate: t1.ts = t2.ts, output_watermarks: [t1.ts, t2.ts], output: [t1.ts, t2.ts, t1.v1, t1.v2, t2.v1, t2.v2, t1._row_id, t2._row_id] }
├─StreamExchange { dist: HashShard(t1.ts) }
| └─StreamTableScan { table: t1, columns: [t1.ts, t1.v1, t1.v2, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: HashShard(t2.ts) }
└─StreamTableScan { table: t2, columns: [t2.ts, t2.v1, t2.v2, t2._row_id], pk: [t2._row_id], dist: UpstreamHashShard(t2._row_id) }
- name: left semi hash join
- name: left semi window join
sql: |
create table t1 (ts timestamp with time zone, v1 int, v2 int, watermark for ts as ts - INTERVAL '1' SECOND) append only;
create table t2 (ts timestamp with time zone, v1 int, v2 int, watermark for ts as ts - INTERVAL '1' SECOND) append only;
select t1.ts as t1_ts, t1.v1 as t1_v1, t1.v2 as t1_v2 from t1 where exists (select * from t2 where t1.ts = t2.ts);
stream_plan: |
StreamMaterialize { columns: [t1_ts, t1_v1, t1_v2, t1._row_id(hidden)], stream_key: [t1._row_id, t1_ts], pk_columns: [t1._row_id, t1_ts], pk_conflict: "NoCheck", watermark_columns: [t1_ts] }
└─StreamHashJoin { type: LeftSemi, predicate: t1.ts = t2.ts, output_watermarks: [t1.ts], output: all }
└─StreamWindowJoin { type: LeftSemi, predicate: t1.ts = t2.ts, output_watermarks: [t1.ts], output: all }
├─StreamExchange { dist: HashShard(t1.ts) }
| └─StreamTableScan { table: t1, columns: [t1.ts, t1.v1, t1.v2, t1._row_id], pk: [t1._row_id], dist: UpstreamHashShard(t1._row_id) }
└─StreamExchange { dist: HashShard(t2.ts) }
Expand Down
13 changes: 12 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,18 @@ impl StreamHashJoin {

impl fmt::Display for StreamHashJoin {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut builder = if self.clean_left_state_conjunction_idx.is_some()
let (ljk, rjk) = self
.eq_join_predicate
.eq_indexes()
.first()
.cloned()
.expect("first join key");

let mut builder = if self.left().watermark_columns().contains(ljk)
&& self.right().watermark_columns().contains(rjk)
{
f.debug_struct("StreamWindowJoin")
} else if self.clean_left_state_conjunction_idx.is_some()
&& self.clean_right_state_conjunction_idx.is_some()
{
f.debug_struct("StreamIntervalJoin")
Expand Down

0 comments on commit 4929b30

Please sign in to comment.