diff --git a/src/frontend/planner_test/tests/testdata/bushy_join.yaml b/src/frontend/planner_test/tests/testdata/bushy_join.yaml index 02e55e1a34f38..e0071790f212f 100644 --- a/src/frontend/planner_test/tests/testdata/bushy_join.yaml +++ b/src/frontend/planner_test/tests/testdata/bushy_join.yaml @@ -93,41 +93,6 @@ r_comment VARCHAR, PRIMARY KEY (r_regionkey) ); -- id: tpch_q1 - before: - - create_tables - sql: | - select - l_returnflag, - l_linestatus, - sum(l_quantity) as sum_qty, - sum(l_extendedprice) as sum_base_price, - sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, - sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, - round(avg(l_quantity), 4) as avg_qty, - round(avg(l_extendedprice), 4) as avg_price, - round(avg(l_discount), 4) as avg_disc, - count(*) as count_order - from - lineitem - where - l_shipdate <= date '1998-12-01' - interval '71' day - group by - l_returnflag, - l_linestatus - order by - l_returnflag, - l_linestatus; - stream_plan: | - StreamMaterialize { columns: [l_returnflag, l_linestatus, sum_qty, sum_base_price, sum_disc_price, sum_charge, avg_qty, avg_price, avg_disc, count_order], stream_key: [l_returnflag, l_linestatus], pk_columns: [l_returnflag, l_linestatus], pk_conflict: "NoCheck" } - └─StreamProject { exprs: [lineitem.l_returnflag, lineitem.l_linestatus, sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum($expr1), sum($expr2), RoundDigit((sum(lineitem.l_quantity) / count(lineitem.l_quantity)), 4:Int32) as $expr3, RoundDigit((sum(lineitem.l_extendedprice) / count(lineitem.l_extendedprice)), 4:Int32) as $expr4, RoundDigit((sum(lineitem.l_discount) / count(lineitem.l_discount)), 4:Int32) as $expr5, count] } - └─StreamHashAgg { group_key: [lineitem.l_returnflag, lineitem.l_linestatus], aggs: [sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum($expr1), sum($expr2), count(lineitem.l_quantity), count(lineitem.l_extendedprice), sum(lineitem.l_discount), count(lineitem.l_discount), count] } - └─StreamExchange { dist: HashShard(lineitem.l_returnflag, lineitem.l_linestatus) } - └─StreamProject { exprs: [lineitem.l_returnflag, lineitem.l_linestatus, lineitem.l_quantity, lineitem.l_extendedprice, (lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)) as $expr1, ((lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)) * (1:Int32 + lineitem.l_tax)) as $expr2, lineitem.l_discount, lineitem.l_orderkey, lineitem.l_linenumber] } - └─StreamFilter { predicate: (lineitem.l_shipdate <= '1998-09-21 00:00:00':Timestamp) } - └─StreamTableScan { table: lineitem, columns: [lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_tax, lineitem.l_returnflag, lineitem.l_linestatus, lineitem.l_orderkey, lineitem.l_linenumber, lineitem.l_shipdate], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } - with_config_map: - RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' - id: tpch_q2 before: - create_tables @@ -148,33 +113,33 @@ nation, region where - p_partkey = ps_partkey - and s_suppkey = ps_suppkey - and p_size = 4 - and p_type like '%TIN' - and s_nationkey = n_nationkey - and n_regionkey = r_regionkey - and r_name = 'AFRICA' - and ps_supplycost = ( - select - min(ps_supplycost) - from - partsupp, - supplier, - nation, - region - where - p_partkey = ps_partkey - and s_suppkey = ps_suppkey - and s_nationkey = n_nationkey - and n_regionkey = r_regionkey - and r_name = 'AFRICA' - ) + p_partkey = ps_partkey + and s_suppkey = ps_suppkey + and p_size = 4 + and p_type like '%TIN' + and s_nationkey = n_nationkey + and n_regionkey = r_regionkey + and r_name = 'AFRICA' + and ps_supplycost = ( + select + min(ps_supplycost) + from + partsupp, + supplier, + nation, + region + where + p_partkey = ps_partkey + and s_suppkey = ps_suppkey + and s_nationkey = n_nationkey + and n_regionkey = r_regionkey + and r_name = 'AFRICA' + ) order by - s_acctbal desc, - n_name, - s_name, - p_partkey + s_acctbal desc, + n_name, + s_name, + p_partkey limit 100; stream_plan: | StreamMaterialize { columns: [s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment, region.r_regionkey(hidden), nation.n_nationkey(hidden), supplier.s_suppkey(hidden), part.p_partkey(hidden), partsupp.ps_partkey(hidden), partsupp.ps_suppkey(hidden), min(partsupp.ps_supplycost)(hidden)], stream_key: [region.r_regionkey, nation.n_nationkey, supplier.s_suppkey, part.p_partkey, p_partkey, partsupp.ps_partkey, partsupp.ps_suppkey, min(partsupp.ps_supplycost)], pk_columns: [s_acctbal, n_name, s_name, p_partkey, region.r_regionkey, nation.n_nationkey, supplier.s_suppkey, part.p_partkey, partsupp.ps_partkey, partsupp.ps_suppkey, min(partsupp.ps_supplycost)], pk_conflict: "NoCheck" } @@ -233,98 +198,6 @@ └─StreamTableScan { table: partsupp, columns: [partsupp.ps_partkey, partsupp.ps_suppkey, partsupp.ps_supplycost], pk: [partsupp.ps_partkey, partsupp.ps_suppkey], dist: UpstreamHashShard(partsupp.ps_partkey, partsupp.ps_suppkey) } with_config_map: RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' -- id: tpch_q3 - before: - - create_tables - sql: | - select - l_orderkey, - sum(l_extendedprice * (1 - l_discount)) as revenue, - o_orderdate, - o_shippriority - from - customer, - orders, - lineitem - where - c_mktsegment = 'FURNITURE' - and c_custkey = o_custkey - and l_orderkey = o_orderkey - and o_orderdate < date '1995-03-29' - and l_shipdate > date '1995-03-29' - group by - l_orderkey, - o_orderdate, - o_shippriority - order by - revenue desc, - o_orderdate - limit 10; - stream_plan: | - StreamMaterialize { columns: [l_orderkey, revenue, o_orderdate, o_shippriority], stream_key: [l_orderkey, o_orderdate, o_shippriority], pk_columns: [revenue, o_orderdate, l_orderkey, o_shippriority], pk_conflict: "NoCheck" } - └─StreamProject { exprs: [lineitem.l_orderkey, sum($expr1), orders.o_orderdate, orders.o_shippriority] } - └─StreamTopN { order: "[sum($expr1) DESC, orders.o_orderdate ASC]", limit: 10, offset: 0 } - └─StreamExchange { dist: Single } - └─StreamGroupTopN { order: "[sum($expr1) DESC, orders.o_orderdate ASC]", limit: 10, offset: 0, group_key: [4] } - └─StreamProject { exprs: [lineitem.l_orderkey, sum($expr1), orders.o_orderdate, orders.o_shippriority, Vnode(lineitem.l_orderkey) as $expr2] } - └─StreamHashAgg { group_key: [lineitem.l_orderkey, orders.o_orderdate, orders.o_shippriority], aggs: [sum($expr1), count] } - └─StreamProject { exprs: [lineitem.l_orderkey, orders.o_orderdate, orders.o_shippriority, (lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)) as $expr1, lineitem.l_linenumber, customer.c_custkey, orders.o_orderkey] } - └─StreamHashJoin { type: Inner, predicate: lineitem.l_orderkey = orders.o_orderkey, output: [orders.o_orderdate, orders.o_shippriority, lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_linenumber, customer.c_custkey, orders.o_orderkey] } - ├─StreamExchange { dist: HashShard(lineitem.l_orderkey) } - | └─StreamProject { exprs: [lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_linenumber] } - | └─StreamFilter { predicate: (lineitem.l_shipdate > '1995-03-29':Date) } - | └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_linenumber, lineitem.l_shipdate], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } - └─StreamExchange { dist: HashShard(orders.o_orderkey) } - └─StreamHashJoin { type: Inner, predicate: customer.c_custkey = orders.o_custkey, output: [orders.o_orderkey, orders.o_orderdate, orders.o_shippriority, customer.c_custkey] } - ├─StreamExchange { dist: HashShard(customer.c_custkey) } - | └─StreamProject { exprs: [customer.c_custkey] } - | └─StreamFilter { predicate: (customer.c_mktsegment = 'FURNITURE':Varchar) } - | └─StreamTableScan { table: customer, columns: [customer.c_custkey, customer.c_mktsegment], pk: [customer.c_custkey], dist: UpstreamHashShard(customer.c_custkey) } - └─StreamExchange { dist: HashShard(orders.o_custkey) } - └─StreamFilter { predicate: (orders.o_orderdate < '1995-03-29':Date) } - └─StreamTableScan { table: orders, columns: [orders.o_orderkey, orders.o_custkey, orders.o_orderdate, orders.o_shippriority], pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } - with_config_map: - RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' -- id: tpch_q4 - before: - - create_tables - sql: | - select - o_orderpriority, - count(*) as order_count - from - orders - where - o_orderdate >= date '1997-07-01' - and o_orderdate < date '1997-07-01' + interval '3' month - and exists ( - select - * - from - lineitem - where - l_orderkey = o_orderkey - and l_commitdate < l_receiptdate - ) - group by - o_orderpriority - order by - o_orderpriority; - stream_plan: | - StreamMaterialize { columns: [o_orderpriority, order_count], stream_key: [o_orderpriority], pk_columns: [o_orderpriority], pk_conflict: "NoCheck" } - └─StreamHashAgg { group_key: [orders.o_orderpriority], aggs: [count] } - └─StreamExchange { dist: HashShard(orders.o_orderpriority) } - └─StreamHashJoin { type: LeftSemi, predicate: orders.o_orderkey = lineitem.l_orderkey, output: [orders.o_orderpriority, orders.o_orderkey] } - ├─StreamExchange { dist: HashShard(orders.o_orderkey) } - | └─StreamProject { exprs: [orders.o_orderkey, orders.o_orderpriority] } - | └─StreamFilter { predicate: (orders.o_orderdate >= '1997-07-01':Date) AND (orders.o_orderdate < '1997-10-01 00:00:00':Timestamp) } - | └─StreamTableScan { table: orders, columns: [orders.o_orderkey, orders.o_orderpriority, orders.o_orderdate], pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } - └─StreamExchange { dist: HashShard(lineitem.l_orderkey) } - └─StreamProject { exprs: [lineitem.l_orderkey, lineitem.l_linenumber] } - └─StreamFilter { predicate: (lineitem.l_commitdate < lineitem.l_receiptdate) } - └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_linenumber, lineitem.l_commitdate, lineitem.l_receiptdate], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } - with_config_map: - RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' - id: tpch_q5 before: - create_tables @@ -358,56 +231,32 @@ └─StreamProject { exprs: [nation.n_name, sum($expr1)] } └─StreamHashAgg { group_key: [nation.n_name], aggs: [sum($expr1), count] } └─StreamExchange { dist: HashShard(nation.n_name) } - └─StreamProject { exprs: [nation.n_name, (lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)) as $expr1, lineitem.l_orderkey, lineitem.l_linenumber, customer.c_custkey, orders.o_orderkey, region.r_regionkey, nation.n_nationkey, supplier.s_suppkey, lineitem.l_suppkey, customer.c_nationkey] } - └─StreamHashJoin { type: Inner, predicate: lineitem.l_suppkey = supplier.s_suppkey AND customer.c_nationkey = supplier.s_nationkey, output: [lineitem.l_extendedprice, lineitem.l_discount, nation.n_name, lineitem.l_orderkey, lineitem.l_linenumber, customer.c_custkey, orders.o_orderkey, lineitem.l_suppkey, customer.c_nationkey, region.r_regionkey, nation.n_nationkey, supplier.s_suppkey] } - ├─StreamExchange { dist: HashShard(lineitem.l_suppkey, customer.c_nationkey) } - | └─StreamHashJoin { type: Inner, predicate: lineitem.l_orderkey = orders.o_orderkey, output: [lineitem.l_suppkey, lineitem.l_extendedprice, lineitem.l_discount, customer.c_nationkey, lineitem.l_orderkey, lineitem.l_linenumber, customer.c_custkey, orders.o_orderkey] } - | ├─StreamExchange { dist: HashShard(lineitem.l_orderkey) } + └─StreamProject { exprs: [nation.n_name, (lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)) as $expr1, lineitem.l_orderkey, lineitem.l_linenumber, supplier.s_suppkey, lineitem.l_suppkey, region.r_regionkey, nation.n_nationkey, orders.o_orderkey, customer.c_custkey, orders.o_custkey, supplier.s_nationkey] } + └─StreamHashJoin { type: Inner, predicate: lineitem.l_orderkey = orders.o_orderkey AND supplier.s_nationkey = customer.c_nationkey AND supplier.s_nationkey = nation.n_nationkey, output: [lineitem.l_extendedprice, lineitem.l_discount, nation.n_name, lineitem.l_orderkey, lineitem.l_linenumber, supplier.s_suppkey, lineitem.l_suppkey, supplier.s_nationkey, region.r_regionkey, nation.n_nationkey, orders.o_orderkey, customer.c_custkey, orders.o_custkey] } + ├─StreamExchange { dist: HashShard(supplier.s_nationkey) } + | └─StreamHashJoin { type: Inner, predicate: lineitem.l_suppkey = supplier.s_suppkey, output: [lineitem.l_orderkey, lineitem.l_extendedprice, lineitem.l_discount, supplier.s_nationkey, lineitem.l_linenumber, lineitem.l_suppkey, supplier.s_suppkey] } + | ├─StreamExchange { dist: HashShard(lineitem.l_suppkey) } | | └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_suppkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_linenumber], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } - | └─StreamExchange { dist: HashShard(orders.o_orderkey) } - | └─StreamHashJoin { type: Inner, predicate: customer.c_custkey = orders.o_custkey, output: [customer.c_nationkey, orders.o_orderkey, customer.c_custkey] } - | ├─StreamExchange { dist: HashShard(customer.c_custkey) } - | | └─StreamTableScan { table: customer, columns: [customer.c_custkey, customer.c_nationkey], pk: [customer.c_custkey], dist: UpstreamHashShard(customer.c_custkey) } - | └─StreamExchange { dist: HashShard(orders.o_custkey) } - | └─StreamProject { exprs: [orders.o_orderkey, orders.o_custkey] } - | └─StreamFilter { predicate: (orders.o_orderdate >= '1994-01-01':Date) AND (orders.o_orderdate < '1995-01-01 00:00:00':Timestamp) } - | └─StreamTableScan { table: orders, columns: [orders.o_orderkey, orders.o_custkey, orders.o_orderdate], pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } - └─StreamExchange { dist: HashShard(supplier.s_suppkey, supplier.s_nationkey) } - └─StreamHashJoin { type: Inner, predicate: nation.n_nationkey = supplier.s_nationkey, output: [nation.n_name, supplier.s_suppkey, supplier.s_nationkey, region.r_regionkey, nation.n_nationkey] } - ├─StreamExchange { dist: HashShard(nation.n_nationkey) } - | └─StreamHashJoin { type: Inner, predicate: region.r_regionkey = nation.n_regionkey, output: [nation.n_nationkey, nation.n_name, region.r_regionkey] } - | ├─StreamExchange { dist: HashShard(region.r_regionkey) } - | | └─StreamProject { exprs: [region.r_regionkey] } - | | └─StreamFilter { predicate: (region.r_name = 'MIDDLE EAST':Varchar) } - | | └─StreamTableScan { table: region, columns: [region.r_regionkey, region.r_name], pk: [region.r_regionkey], dist: UpstreamHashShard(region.r_regionkey) } - | └─StreamExchange { dist: HashShard(nation.n_regionkey) } - | └─StreamTableScan { table: nation, columns: [nation.n_nationkey, nation.n_name, nation.n_regionkey], pk: [nation.n_nationkey], dist: UpstreamHashShard(nation.n_nationkey) } - └─StreamExchange { dist: HashShard(supplier.s_nationkey) } - └─StreamTableScan { table: supplier, columns: [supplier.s_suppkey, supplier.s_nationkey], pk: [supplier.s_suppkey], dist: UpstreamHashShard(supplier.s_suppkey) } - with_config_map: - RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' -- id: tpch_q6 - before: - - create_tables - sql: | - select - sum(l_extendedprice * l_discount) as revenue - from - lineitem - where - l_shipdate >= date '1994-01-01' - and l_shipdate < date '1994-01-01' + interval '1' year - and l_discount between 0.08 - 0.01 and 0.08 + 0.01 - and l_quantity < 24; - stream_plan: | - StreamMaterialize { columns: [revenue], stream_key: [], pk_columns: [], pk_conflict: "NoCheck" } - └─StreamProject { exprs: [sum(sum($expr1))] } - └─StreamGlobalSimpleAgg { aggs: [sum(sum($expr1)), count] } - └─StreamExchange { dist: Single } - └─StreamStatelessLocalSimpleAgg { aggs: [sum($expr1)] } - └─StreamProject { exprs: [(lineitem.l_extendedprice * lineitem.l_discount) as $expr1, lineitem.l_orderkey, lineitem.l_linenumber] } - └─StreamFilter { predicate: (lineitem.l_shipdate >= '1994-01-01':Date) AND (lineitem.l_shipdate < '1995-01-01 00:00:00':Timestamp) AND (lineitem.l_discount >= 0.07:Decimal) AND (lineitem.l_discount <= 0.09:Decimal) AND (lineitem.l_quantity < 24:Int32) } - └─StreamTableScan { table: lineitem, columns: [lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_orderkey, lineitem.l_linenumber, lineitem.l_quantity, lineitem.l_shipdate], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } + | └─StreamExchange { dist: HashShard(supplier.s_suppkey) } + | └─StreamFilter { predicate: (supplier.s_nationkey = supplier.s_nationkey) } + | └─StreamTableScan { table: supplier, columns: [supplier.s_suppkey, supplier.s_nationkey], pk: [supplier.s_suppkey], dist: UpstreamHashShard(supplier.s_suppkey) } + └─StreamHashJoin { type: Inner, predicate: nation.n_nationkey = customer.c_nationkey, output: [nation.n_nationkey, nation.n_name, orders.o_orderkey, customer.c_nationkey, region.r_regionkey, customer.c_custkey, orders.o_custkey] } + ├─StreamExchange { dist: HashShard(nation.n_nationkey) } + | └─StreamHashJoin { type: Inner, predicate: region.r_regionkey = nation.n_regionkey, output: [nation.n_nationkey, nation.n_name, region.r_regionkey] } + | ├─StreamExchange { dist: HashShard(region.r_regionkey) } + | | └─StreamProject { exprs: [region.r_regionkey] } + | | └─StreamFilter { predicate: (region.r_name = 'MIDDLE EAST':Varchar) } + | | └─StreamTableScan { table: region, columns: [region.r_regionkey, region.r_name], pk: [region.r_regionkey], dist: UpstreamHashShard(region.r_regionkey) } + | └─StreamExchange { dist: HashShard(nation.n_regionkey) } + | └─StreamTableScan { table: nation, columns: [nation.n_nationkey, nation.n_name, nation.n_regionkey], pk: [nation.n_nationkey], dist: UpstreamHashShard(nation.n_nationkey) } + └─StreamExchange { dist: HashShard(customer.c_nationkey) } + └─StreamHashJoin { type: Inner, predicate: orders.o_custkey = customer.c_custkey, output: [orders.o_orderkey, customer.c_nationkey, orders.o_custkey, customer.c_custkey] } + ├─StreamExchange { dist: HashShard(orders.o_custkey) } + | └─StreamProject { exprs: [orders.o_orderkey, orders.o_custkey] } + | └─StreamFilter { predicate: (orders.o_orderdate >= '1994-01-01':Date) AND (orders.o_orderdate < '1995-01-01 00:00:00':Timestamp) } + | └─StreamTableScan { table: orders, columns: [orders.o_orderkey, orders.o_custkey, orders.o_orderdate], pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } + └─StreamExchange { dist: HashShard(customer.c_custkey) } + └─StreamTableScan { table: customer, columns: [customer.c_custkey, customer.c_nationkey], pk: [customer.c_custkey], dist: UpstreamHashShard(customer.c_custkey) } with_config_map: RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' - id: tpch_q7 @@ -608,30 +457,31 @@ └─StreamProject { exprs: [nation.n_name, $expr1, RoundDigit(sum($expr2), 2:Int32) as $expr3] } └─StreamHashAgg { group_key: [nation.n_name, $expr1], aggs: [sum($expr2), count] } └─StreamExchange { dist: HashShard(nation.n_name, $expr1) } - └─StreamProject { exprs: [nation.n_name, Extract('YEAR':Varchar, orders.o_orderdate) as $expr1, ((lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)) - (partsupp.ps_supplycost * lineitem.l_quantity)) as $expr2, nation.n_nationkey, supplier.s_suppkey, orders.o_orderkey, partsupp.ps_partkey, partsupp.ps_suppkey, part.p_partkey, lineitem.l_orderkey, lineitem.l_linenumber] } - └─StreamHashJoin { type: Inner, predicate: supplier.s_suppkey = lineitem.l_suppkey, output: [lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, partsupp.ps_supplycost, orders.o_orderdate, nation.n_name, nation.n_nationkey, supplier.s_suppkey, orders.o_orderkey, partsupp.ps_partkey, partsupp.ps_suppkey, part.p_partkey, lineitem.l_orderkey, lineitem.l_linenumber] } - ├─StreamExchange { dist: HashShard(supplier.s_suppkey) } - | └─StreamHashJoin { type: Inner, predicate: nation.n_nationkey = supplier.s_nationkey, output: [nation.n_name, supplier.s_suppkey, nation.n_nationkey] } - | ├─StreamExchange { dist: HashShard(nation.n_nationkey) } - | | └─StreamTableScan { table: nation, columns: [nation.n_nationkey, nation.n_name], pk: [nation.n_nationkey], dist: UpstreamHashShard(nation.n_nationkey) } - | └─StreamExchange { dist: HashShard(supplier.s_nationkey) } - | └─StreamTableScan { table: supplier, columns: [supplier.s_suppkey, supplier.s_nationkey], pk: [supplier.s_suppkey], dist: UpstreamHashShard(supplier.s_suppkey) } - └─StreamExchange { dist: HashShard(lineitem.l_suppkey) } - └─StreamHashJoin { type: Inner, predicate: orders.o_orderkey = lineitem.l_orderkey, output: [orders.o_orderdate, partsupp.ps_supplycost, lineitem.l_suppkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, orders.o_orderkey, partsupp.ps_partkey, partsupp.ps_suppkey, part.p_partkey, lineitem.l_orderkey, lineitem.l_linenumber] } - ├─StreamExchange { dist: HashShard(orders.o_orderkey) } - | └─StreamTableScan { table: orders, columns: [orders.o_orderkey, orders.o_orderdate], pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } - └─StreamExchange { dist: HashShard(lineitem.l_orderkey) } - └─StreamHashJoin { type: Inner, predicate: partsupp.ps_suppkey = lineitem.l_suppkey AND partsupp.ps_partkey = lineitem.l_partkey, output: [partsupp.ps_supplycost, lineitem.l_orderkey, lineitem.l_suppkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, partsupp.ps_partkey, partsupp.ps_suppkey, part.p_partkey, lineitem.l_linenumber] } - ├─StreamExchange { dist: HashShard(partsupp.ps_partkey, partsupp.ps_suppkey) } - | └─StreamTableScan { table: partsupp, columns: [partsupp.ps_partkey, partsupp.ps_suppkey, partsupp.ps_supplycost], pk: [partsupp.ps_partkey, partsupp.ps_suppkey], dist: UpstreamHashShard(partsupp.ps_partkey, partsupp.ps_suppkey) } - └─StreamExchange { dist: HashShard(lineitem.l_partkey, lineitem.l_suppkey) } - └─StreamHashJoin { type: Inner, predicate: part.p_partkey = lineitem.l_partkey, output: [lineitem.l_orderkey, lineitem.l_partkey, lineitem.l_suppkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, part.p_partkey, lineitem.l_linenumber] } - ├─StreamExchange { dist: HashShard(part.p_partkey) } - | └─StreamProject { exprs: [part.p_partkey] } - | └─StreamFilter { predicate: Like(part.p_name, '%yellow%':Varchar) } - | └─StreamTableScan { table: part, columns: [part.p_partkey, part.p_name], pk: [part.p_partkey], dist: UpstreamHashShard(part.p_partkey) } - └─StreamExchange { dist: HashShard(lineitem.l_partkey) } - └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_partkey, lineitem.l_suppkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_linenumber], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } + └─StreamProject { exprs: [nation.n_name, Extract('YEAR':Varchar, orders.o_orderdate) as $expr1, ((lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)) - (partsupp.ps_supplycost * lineitem.l_quantity)) as $expr2, part.p_partkey, partsupp.ps_partkey, partsupp.ps_suppkey, nation.n_nationkey, supplier.s_suppkey, orders.o_orderkey, lineitem.l_orderkey, lineitem.l_linenumber] } + └─StreamHashJoin { type: Inner, predicate: part.p_partkey = lineitem.l_partkey AND partsupp.ps_suppkey = lineitem.l_suppkey AND partsupp.ps_partkey = lineitem.l_partkey AND partsupp.ps_suppkey = supplier.s_suppkey, output: [lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, partsupp.ps_supplycost, orders.o_orderdate, nation.n_name, part.p_partkey, partsupp.ps_partkey, partsupp.ps_suppkey, nation.n_nationkey, supplier.s_suppkey, orders.o_orderkey, lineitem.l_orderkey, lineitem.l_linenumber] } + ├─StreamExchange { dist: HashShard(partsupp.ps_suppkey) } + | └─StreamHashJoin { type: Inner, predicate: part.p_partkey = partsupp.ps_partkey, output: all } + | ├─StreamExchange { dist: HashShard(part.p_partkey) } + | | └─StreamProject { exprs: [part.p_partkey] } + | | └─StreamFilter { predicate: Like(part.p_name, '%yellow%':Varchar) } + | | └─StreamTableScan { table: part, columns: [part.p_partkey, part.p_name], pk: [part.p_partkey], dist: UpstreamHashShard(part.p_partkey) } + | └─StreamExchange { dist: HashShard(partsupp.ps_partkey) } + | └─StreamFilter { predicate: (partsupp.ps_suppkey = partsupp.ps_suppkey) } + | └─StreamTableScan { table: partsupp, columns: [partsupp.ps_partkey, partsupp.ps_suppkey, partsupp.ps_supplycost], pk: [partsupp.ps_partkey, partsupp.ps_suppkey], dist: UpstreamHashShard(partsupp.ps_partkey, partsupp.ps_suppkey) } + └─StreamHashJoin { type: Inner, predicate: supplier.s_suppkey = lineitem.l_suppkey, output: [nation.n_name, supplier.s_suppkey, orders.o_orderdate, lineitem.l_partkey, lineitem.l_suppkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, nation.n_nationkey, orders.o_orderkey, lineitem.l_orderkey, lineitem.l_linenumber] } + ├─StreamExchange { dist: HashShard(supplier.s_suppkey) } + | └─StreamHashJoin { type: Inner, predicate: nation.n_nationkey = supplier.s_nationkey, output: [nation.n_name, supplier.s_suppkey, nation.n_nationkey] } + | ├─StreamExchange { dist: HashShard(nation.n_nationkey) } + | | └─StreamTableScan { table: nation, columns: [nation.n_nationkey, nation.n_name], pk: [nation.n_nationkey], dist: UpstreamHashShard(nation.n_nationkey) } + | └─StreamExchange { dist: HashShard(supplier.s_nationkey) } + | └─StreamTableScan { table: supplier, columns: [supplier.s_suppkey, supplier.s_nationkey], pk: [supplier.s_suppkey], dist: UpstreamHashShard(supplier.s_suppkey) } + └─StreamExchange { dist: HashShard(lineitem.l_suppkey) } + └─StreamHashJoin { type: Inner, predicate: orders.o_orderkey = lineitem.l_orderkey, output: [orders.o_orderdate, lineitem.l_partkey, lineitem.l_suppkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, orders.o_orderkey, lineitem.l_orderkey, lineitem.l_linenumber] } + ├─StreamExchange { dist: HashShard(orders.o_orderkey) } + | └─StreamTableScan { table: orders, columns: [orders.o_orderkey, orders.o_orderdate], pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } + └─StreamExchange { dist: HashShard(lineitem.l_orderkey) } + └─StreamFilter { predicate: (lineitem.l_partkey = lineitem.l_partkey) } + └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_partkey, lineitem.l_suppkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_linenumber], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } with_config_map: RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' - id: tpch_q10 @@ -698,565 +548,6 @@ └─StreamTableScan { table: orders, columns: [orders.o_orderkey, orders.o_custkey, orders.o_orderdate], pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } with_config_map: RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' -- id: tpch_q11 - before: - - create_tables - sql: | - select - ps_partkey, - sum(ps_supplycost * ps_availqty) as value - from - partsupp, - supplier, - nation - where - ps_suppkey = s_suppkey - and s_nationkey = n_nationkey - and n_name = 'ARGENTINA' - group by - ps_partkey - having - sum(ps_supplycost * ps_availqty) > ( - select - sum(ps_supplycost * ps_availqty) * 0.0001000000 - from - partsupp, - supplier, - nation - where - ps_suppkey = s_suppkey - and s_nationkey = n_nationkey - and n_name = 'ARGENTINA' - ) - order by - value desc; - stream_plan: | - StreamMaterialize { columns: [ps_partkey, value], stream_key: [ps_partkey], pk_columns: [value, ps_partkey], pk_conflict: "NoCheck" } - └─StreamDynamicFilter { predicate: (sum($expr1) > $expr3), output: [partsupp.ps_partkey, sum($expr1)] } - ├─StreamProject { exprs: [partsupp.ps_partkey, sum($expr1)] } - | └─StreamHashAgg { group_key: [partsupp.ps_partkey], aggs: [sum($expr1), count] } - | └─StreamExchange { dist: HashShard(partsupp.ps_partkey) } - | └─StreamProject { exprs: [partsupp.ps_partkey, (partsupp.ps_supplycost * partsupp.ps_availqty) as $expr1, nation.n_nationkey, partsupp.ps_suppkey, supplier.s_suppkey] } - | └─StreamShare { id = 13 } - | └─StreamHashJoin { type: Inner, predicate: nation.n_nationkey = supplier.s_nationkey, output: [partsupp.ps_partkey, partsupp.ps_availqty, partsupp.ps_supplycost, nation.n_nationkey, partsupp.ps_suppkey, supplier.s_suppkey] } - | ├─StreamExchange { dist: HashShard(nation.n_nationkey) } - | | └─StreamProject { exprs: [nation.n_nationkey] } - | | └─StreamFilter { predicate: (nation.n_name = 'ARGENTINA':Varchar) } - | | └─StreamTableScan { table: nation, columns: [nation.n_nationkey, nation.n_name], pk: [nation.n_nationkey], dist: UpstreamHashShard(nation.n_nationkey) } - | └─StreamExchange { dist: HashShard(supplier.s_nationkey) } - | └─StreamHashJoin { type: Inner, predicate: partsupp.ps_suppkey = supplier.s_suppkey, output: [partsupp.ps_partkey, partsupp.ps_availqty, partsupp.ps_supplycost, supplier.s_nationkey, partsupp.ps_suppkey, supplier.s_suppkey] } - | ├─StreamExchange { dist: HashShard(partsupp.ps_suppkey) } - | | └─StreamTableScan { table: partsupp, columns: [partsupp.ps_partkey, partsupp.ps_suppkey, partsupp.ps_availqty, partsupp.ps_supplycost], pk: [partsupp.ps_partkey, partsupp.ps_suppkey], dist: UpstreamHashShard(partsupp.ps_partkey, partsupp.ps_suppkey) } - | └─StreamExchange { dist: HashShard(supplier.s_suppkey) } - | └─StreamTableScan { table: supplier, columns: [supplier.s_suppkey, supplier.s_nationkey], pk: [supplier.s_suppkey], dist: UpstreamHashShard(supplier.s_suppkey) } - └─StreamExchange { dist: Broadcast } - └─StreamProject { exprs: [(sum(sum($expr2)) * 0.0001000000:Decimal) as $expr3] } - └─StreamGlobalSimpleAgg { aggs: [sum(sum($expr2)), count] } - └─StreamExchange { dist: Single } - └─StreamStatelessLocalSimpleAgg { aggs: [sum($expr2)] } - └─StreamProject { exprs: [(partsupp.ps_supplycost * partsupp.ps_availqty) as $expr2, nation.n_nationkey, partsupp.ps_partkey, partsupp.ps_suppkey, supplier.s_suppkey] } - └─StreamShare { id = 13 } - └─StreamHashJoin { type: Inner, predicate: nation.n_nationkey = supplier.s_nationkey, output: [partsupp.ps_partkey, partsupp.ps_availqty, partsupp.ps_supplycost, nation.n_nationkey, partsupp.ps_suppkey, supplier.s_suppkey] } - ├─StreamExchange { dist: HashShard(nation.n_nationkey) } - | └─StreamProject { exprs: [nation.n_nationkey] } - | └─StreamFilter { predicate: (nation.n_name = 'ARGENTINA':Varchar) } - | └─StreamTableScan { table: nation, columns: [nation.n_nationkey, nation.n_name], pk: [nation.n_nationkey], dist: UpstreamHashShard(nation.n_nationkey) } - └─StreamExchange { dist: HashShard(supplier.s_nationkey) } - └─StreamHashJoin { type: Inner, predicate: partsupp.ps_suppkey = supplier.s_suppkey, output: [partsupp.ps_partkey, partsupp.ps_availqty, partsupp.ps_supplycost, supplier.s_nationkey, partsupp.ps_suppkey, supplier.s_suppkey] } - ├─StreamExchange { dist: HashShard(partsupp.ps_suppkey) } - | └─StreamTableScan { table: partsupp, columns: [partsupp.ps_partkey, partsupp.ps_suppkey, partsupp.ps_availqty, partsupp.ps_supplycost], pk: [partsupp.ps_partkey, partsupp.ps_suppkey], dist: UpstreamHashShard(partsupp.ps_partkey, partsupp.ps_suppkey) } - └─StreamExchange { dist: HashShard(supplier.s_suppkey) } - └─StreamTableScan { table: supplier, columns: [supplier.s_suppkey, supplier.s_nationkey], pk: [supplier.s_suppkey], dist: UpstreamHashShard(supplier.s_suppkey) } - with_config_map: - RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' -- id: tpch_q12 - before: - - create_tables - sql: | - select - l_shipmode, - sum(case - when o_orderpriority = '1-URGENT' - or o_orderpriority = '2-HIGH' - then 1 - else 0 - end) as high_line_count, - sum(case - when o_orderpriority <> '1-URGENT' - and o_orderpriority <> '2-HIGH' - then 1 - else 0 - end) as low_line_count - from - orders, - lineitem - where - o_orderkey = l_orderkey - and l_shipmode in ('FOB', 'SHIP') - and l_commitdate < l_receiptdate - and l_shipdate < l_commitdate - and l_receiptdate >= date '1994-01-01' - and l_receiptdate < date '1994-01-01' + interval '1' year - group by - l_shipmode - order by - l_shipmode; - stream_plan: | - StreamMaterialize { columns: [l_shipmode, high_line_count, low_line_count], stream_key: [l_shipmode], pk_columns: [l_shipmode], pk_conflict: "NoCheck" } - └─StreamProject { exprs: [lineitem.l_shipmode, sum($expr1), sum($expr2)] } - └─StreamHashAgg { group_key: [lineitem.l_shipmode], aggs: [sum($expr1), sum($expr2), count] } - └─StreamExchange { dist: HashShard(lineitem.l_shipmode) } - └─StreamProject { exprs: [lineitem.l_shipmode, Case(((orders.o_orderpriority = '1-URGENT':Varchar) OR (orders.o_orderpriority = '2-HIGH':Varchar)), 1:Int32, 0:Int32) as $expr1, Case(((orders.o_orderpriority <> '1-URGENT':Varchar) AND (orders.o_orderpriority <> '2-HIGH':Varchar)), 1:Int32, 0:Int32) as $expr2, orders.o_orderkey, lineitem.l_orderkey, lineitem.l_linenumber] } - └─StreamHashJoin { type: Inner, predicate: orders.o_orderkey = lineitem.l_orderkey, output: [orders.o_orderpriority, lineitem.l_shipmode, orders.o_orderkey, lineitem.l_orderkey, lineitem.l_linenumber] } - ├─StreamExchange { dist: HashShard(orders.o_orderkey) } - | └─StreamTableScan { table: orders, columns: [orders.o_orderkey, orders.o_orderpriority], pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } - └─StreamExchange { dist: HashShard(lineitem.l_orderkey) } - └─StreamProject { exprs: [lineitem.l_orderkey, lineitem.l_shipmode, lineitem.l_linenumber] } - └─StreamFilter { predicate: In(lineitem.l_shipmode, 'FOB':Varchar, 'SHIP':Varchar) AND (lineitem.l_commitdate < lineitem.l_receiptdate) AND (lineitem.l_shipdate < lineitem.l_commitdate) AND (lineitem.l_receiptdate >= '1994-01-01':Date) AND (lineitem.l_receiptdate < '1995-01-01 00:00:00':Timestamp) } - └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_shipmode, lineitem.l_linenumber, lineitem.l_shipdate, lineitem.l_commitdate, lineitem.l_receiptdate], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } - with_config_map: - RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' -- id: tpch_q13 - before: - - create_tables - sql: | - select - c_count, - count(*) as custdist - from - ( - select - c_custkey, - count(o_orderkey) as c_count - from - customer left outer join orders on - c_custkey = o_custkey - and o_comment not like '%:1%:2%' - group by - c_custkey - ) as c_orders (c_custkey, c_count) - group by - c_count - order by - custdist desc, - c_count desc; - stream_plan: | - StreamMaterialize { columns: [c_count, custdist], stream_key: [c_count], pk_columns: [custdist, c_count], pk_conflict: "NoCheck" } - └─StreamHashAgg { group_key: [count(orders.o_orderkey)], aggs: [count] } - └─StreamExchange { dist: HashShard(count(orders.o_orderkey)) } - └─StreamProject { exprs: [customer.c_custkey, count(orders.o_orderkey)] } - └─StreamHashAgg { group_key: [customer.c_custkey], aggs: [count(orders.o_orderkey), count] } - └─StreamHashJoin { type: LeftOuter, predicate: customer.c_custkey = orders.o_custkey, output: [customer.c_custkey, orders.o_orderkey] } - ├─StreamExchange { dist: HashShard(customer.c_custkey) } - | └─StreamTableScan { table: customer, columns: [customer.c_custkey], pk: [customer.c_custkey], dist: UpstreamHashShard(customer.c_custkey) } - └─StreamExchange { dist: HashShard(orders.o_custkey) } - └─StreamProject { exprs: [orders.o_orderkey, orders.o_custkey] } - └─StreamFilter { predicate: Not(Like(orders.o_comment, '%:1%:2%':Varchar)) } - └─StreamTableScan { table: orders, columns: [orders.o_orderkey, orders.o_custkey, orders.o_comment], pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } - with_config_map: - RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' -- id: tpch_q14 - before: - - create_tables - sql: | - select - 100.00 * sum(case - when p_type like 'PROMO%' - then l_extendedprice * (1 - l_discount) - else 0 - end) / sum(l_extendedprice * (1 - l_discount)) as promo_revenue - from - lineitem, - part - where - l_partkey = p_partkey - and l_shipdate >= date '1995-09-01' - and l_shipdate < date '1995-09-01' + interval '1' month; - stream_plan: | - StreamMaterialize { columns: [promo_revenue], stream_key: [], pk_columns: [], pk_conflict: "NoCheck" } - └─StreamProject { exprs: [((100.00:Decimal * sum(sum($expr1))) / sum(sum($expr2))) as $expr3] } - └─StreamGlobalSimpleAgg { aggs: [sum(sum($expr1)), sum(sum($expr2)), count] } - └─StreamExchange { dist: Single } - └─StreamStatelessLocalSimpleAgg { aggs: [sum($expr1), sum($expr2)] } - └─StreamProject { exprs: [Case(Like(part.p_type, 'PROMO%':Varchar), (lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)), 0:Decimal) as $expr1, (lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)) as $expr2, lineitem.l_orderkey, lineitem.l_linenumber, part.p_partkey, lineitem.l_partkey] } - └─StreamHashJoin { type: Inner, predicate: lineitem.l_partkey = part.p_partkey, output: [lineitem.l_extendedprice, lineitem.l_discount, part.p_type, lineitem.l_orderkey, lineitem.l_linenumber, lineitem.l_partkey, part.p_partkey] } - ├─StreamExchange { dist: HashShard(lineitem.l_partkey) } - | └─StreamProject { exprs: [lineitem.l_partkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_orderkey, lineitem.l_linenumber] } - | └─StreamFilter { predicate: (lineitem.l_shipdate >= '1995-09-01':Date) AND (lineitem.l_shipdate < '1995-10-01 00:00:00':Timestamp) } - | └─StreamTableScan { table: lineitem, columns: [lineitem.l_partkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_orderkey, lineitem.l_linenumber, lineitem.l_shipdate], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } - └─StreamExchange { dist: HashShard(part.p_partkey) } - └─StreamTableScan { table: part, columns: [part.p_partkey, part.p_type], pk: [part.p_partkey], dist: UpstreamHashShard(part.p_partkey) } - with_config_map: - RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' -- id: tpch_q15 - before: - - create_tables - sql: | - with revenue0 (supplier_no, total_revenue) as ( - select - l_suppkey, - sum(l_extendedprice * (1 - l_discount)) - from - lineitem - where - l_shipdate >= date '1993-01-01' - and l_shipdate < date '1993-01-01' + interval '3' month - group by - l_suppkey - ) - select - s_suppkey, - s_name, - s_address, - s_phone, - total_revenue - from - supplier, - revenue0 - where - s_suppkey = supplier_no - and total_revenue = ( - select - max(total_revenue) - from - revenue0 - ) - order by - s_suppkey; - stream_plan: | - StreamMaterialize { columns: [s_suppkey, s_name, s_address, s_phone, total_revenue, max(max(sum($expr1)))(hidden), lineitem.l_suppkey(hidden)], stream_key: [s_suppkey, lineitem.l_suppkey, max(max(sum($expr1)))], pk_columns: [s_suppkey, lineitem.l_suppkey, max(max(sum($expr1)))], pk_conflict: "NoCheck" } - └─StreamHashJoin { type: Inner, predicate: max(max(sum($expr1))) = sum($expr1), output: [supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_phone, sum($expr1), max(max(sum($expr1))), lineitem.l_suppkey] } - ├─StreamExchange { dist: HashShard(max(max(sum($expr1)))) } - | └─StreamProject { exprs: [max(max(sum($expr1)))] } - | └─StreamGlobalSimpleAgg { aggs: [max(max(sum($expr1))), count] } - | └─StreamExchange { dist: Single } - | └─StreamHashAgg { group_key: [$expr2], aggs: [max(sum($expr1)), count] } - | └─StreamProject { exprs: [lineitem.l_suppkey, sum($expr1), Vnode(lineitem.l_suppkey) as $expr2] } - | └─StreamShare { id = 10 } - | └─StreamProject { exprs: [lineitem.l_suppkey, sum($expr1)] } - | └─StreamHashAgg { group_key: [lineitem.l_suppkey], aggs: [sum($expr1), count] } - | └─StreamExchange { dist: HashShard(lineitem.l_suppkey) } - | └─StreamProject { exprs: [lineitem.l_suppkey, (lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)) as $expr1, lineitem.l_orderkey, lineitem.l_linenumber] } - | └─StreamFilter { predicate: (lineitem.l_shipdate >= '1993-01-01':Date) AND (lineitem.l_shipdate < '1993-04-01 00:00:00':Timestamp) } - | └─StreamTableScan { table: lineitem, columns: [lineitem.l_suppkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_orderkey, lineitem.l_linenumber, lineitem.l_shipdate], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } - └─StreamExchange { dist: HashShard(sum($expr1)) } - └─StreamHashJoin { type: Inner, predicate: supplier.s_suppkey = lineitem.l_suppkey, output: [supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_phone, sum($expr1), lineitem.l_suppkey] } - ├─StreamExchange { dist: HashShard(supplier.s_suppkey) } - | └─StreamTableScan { table: supplier, columns: [supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_phone], pk: [supplier.s_suppkey], dist: UpstreamHashShard(supplier.s_suppkey) } - └─StreamShare { id = 10 } - └─StreamProject { exprs: [lineitem.l_suppkey, sum($expr1)] } - └─StreamHashAgg { group_key: [lineitem.l_suppkey], aggs: [sum($expr1), count] } - └─StreamExchange { dist: HashShard(lineitem.l_suppkey) } - └─StreamProject { exprs: [lineitem.l_suppkey, (lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)) as $expr1, lineitem.l_orderkey, lineitem.l_linenumber] } - └─StreamFilter { predicate: (lineitem.l_shipdate >= '1993-01-01':Date) AND (lineitem.l_shipdate < '1993-04-01 00:00:00':Timestamp) } - └─StreamTableScan { table: lineitem, columns: [lineitem.l_suppkey, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_orderkey, lineitem.l_linenumber, lineitem.l_shipdate], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } - with_config_map: - RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' -- id: tpch_q16 - before: - - create_tables - sql: | - select - p_brand, - p_type, - p_size, - count(distinct ps_suppkey) as supplier_cnt - from - partsupp, - part - where - p_partkey = ps_partkey - and p_brand <> 'Brand#45' - and p_type not like 'SMALL PLATED%' - and p_size in (19, 17, 16, 23, 10, 4, 38, 11) - and ps_suppkey not in ( - select - s_suppkey - from - supplier - where - s_comment like '%Customer%Complaints%' - ) - group by - p_brand, - p_type, - p_size - order by - supplier_cnt desc, - p_brand, - p_type, - p_size; - stream_plan: | - StreamMaterialize { columns: [p_brand, p_type, p_size, supplier_cnt], stream_key: [p_brand, p_type, p_size], pk_columns: [supplier_cnt, p_brand, p_type, p_size], pk_conflict: "NoCheck" } - └─StreamProject { exprs: [part.p_brand, part.p_type, part.p_size, count(distinct partsupp.ps_suppkey)] } - └─StreamHashAgg { group_key: [part.p_brand, part.p_type, part.p_size], aggs: [count(distinct partsupp.ps_suppkey), count] } - └─StreamExchange { dist: HashShard(part.p_brand, part.p_type, part.p_size) } - └─StreamHashJoin { type: LeftAnti, predicate: partsupp.ps_suppkey = supplier.s_suppkey, output: [part.p_brand, part.p_type, part.p_size, partsupp.ps_suppkey, partsupp.ps_partkey, part.p_partkey] } - ├─StreamExchange { dist: HashShard(partsupp.ps_suppkey) } - | └─StreamHashJoin { type: Inner, predicate: partsupp.ps_partkey = part.p_partkey, output: [partsupp.ps_suppkey, part.p_brand, part.p_type, part.p_size, partsupp.ps_partkey, part.p_partkey] } - | ├─StreamExchange { dist: HashShard(partsupp.ps_partkey) } - | | └─StreamTableScan { table: partsupp, columns: [partsupp.ps_partkey, partsupp.ps_suppkey], pk: [partsupp.ps_partkey, partsupp.ps_suppkey], dist: UpstreamHashShard(partsupp.ps_partkey, partsupp.ps_suppkey) } - | └─StreamExchange { dist: HashShard(part.p_partkey) } - | └─StreamFilter { predicate: (part.p_brand <> 'Brand#45':Varchar) AND Not(Like(part.p_type, 'SMALL PLATED%':Varchar)) AND In(part.p_size, 19:Int32, 17:Int32, 16:Int32, 23:Int32, 10:Int32, 4:Int32, 38:Int32, 11:Int32) } - | └─StreamTableScan { table: part, columns: [part.p_partkey, part.p_brand, part.p_type, part.p_size], pk: [part.p_partkey], dist: UpstreamHashShard(part.p_partkey) } - └─StreamExchange { dist: HashShard(supplier.s_suppkey) } - └─StreamProject { exprs: [supplier.s_suppkey] } - └─StreamFilter { predicate: Like(supplier.s_comment, '%Customer%Complaints%':Varchar) } - └─StreamTableScan { table: supplier, columns: [supplier.s_suppkey, supplier.s_comment], pk: [supplier.s_suppkey], dist: UpstreamHashShard(supplier.s_suppkey) } - with_config_map: - RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' -- id: tpch_q17 - before: - - create_tables - sql: | - select - ROUND(sum(l_extendedprice) / 7.0, 16) as avg_yearly - from - lineitem, - part - where - p_partkey = l_partkey - and p_brand = 'Brand#13' - and p_container = 'JUMBO PKG' - and l_quantity < ( - select - 0.2 * avg(l_quantity) - from - lineitem - where - l_partkey = p_partkey - ); - stream_plan: | - StreamMaterialize { columns: [avg_yearly], stream_key: [], pk_columns: [], pk_conflict: "NoCheck" } - └─StreamProject { exprs: [RoundDigit((sum(sum(lineitem.l_extendedprice)) / 7.0:Decimal), 16:Int32) as $expr2] } - └─StreamGlobalSimpleAgg { aggs: [sum(sum(lineitem.l_extendedprice)), count] } - └─StreamExchange { dist: Single } - └─StreamStatelessLocalSimpleAgg { aggs: [sum(lineitem.l_extendedprice)] } - └─StreamProject { exprs: [lineitem.l_extendedprice, part.p_partkey, lineitem.l_orderkey, lineitem.l_linenumber, part.p_partkey, lineitem.l_partkey] } - └─StreamFilter { predicate: (lineitem.l_quantity < $expr1) } - └─StreamHashJoin { type: Inner, predicate: part.p_partkey IS NOT DISTINCT FROM part.p_partkey, output: all } - ├─StreamProject { exprs: [part.p_partkey, (0.2:Decimal * (sum(lineitem.l_quantity) / count(lineitem.l_quantity))) as $expr1] } - | └─StreamHashAgg { group_key: [part.p_partkey], aggs: [sum(lineitem.l_quantity), count(lineitem.l_quantity), count] } - | └─StreamHashJoin { type: LeftOuter, predicate: part.p_partkey IS NOT DISTINCT FROM lineitem.l_partkey, output: [part.p_partkey, lineitem.l_quantity, lineitem.l_orderkey, lineitem.l_linenumber] } - | ├─StreamExchange { dist: HashShard(part.p_partkey) } - | | └─StreamProject { exprs: [part.p_partkey] } - | | └─StreamHashAgg { group_key: [part.p_partkey], aggs: [count] } - | | └─StreamProject { exprs: [part.p_partkey] } - | | └─StreamFilter { predicate: (part.p_brand = 'Brand#13':Varchar) AND (part.p_container = 'JUMBO PKG':Varchar) } - | | └─StreamTableScan { table: part, columns: [part.p_partkey, part.p_brand, part.p_container], pk: [part.p_partkey], dist: UpstreamHashShard(part.p_partkey) } - | └─StreamExchange { dist: HashShard(lineitem.l_partkey) } - | └─StreamFilter { predicate: IsNotNull(lineitem.l_partkey) } - | └─StreamTableScan { table: lineitem, columns: [lineitem.l_partkey, lineitem.l_quantity, lineitem.l_orderkey, lineitem.l_linenumber], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } - └─StreamExchange { dist: HashShard(part.p_partkey) } - └─StreamHashJoin { type: Inner, predicate: lineitem.l_partkey = part.p_partkey, output: [lineitem.l_quantity, lineitem.l_extendedprice, part.p_partkey, lineitem.l_orderkey, lineitem.l_linenumber, lineitem.l_partkey] } - ├─StreamExchange { dist: HashShard(lineitem.l_partkey) } - | └─StreamTableScan { table: lineitem, columns: [lineitem.l_partkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_orderkey, lineitem.l_linenumber], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } - └─StreamExchange { dist: HashShard(part.p_partkey) } - └─StreamProject { exprs: [part.p_partkey] } - └─StreamFilter { predicate: (part.p_brand = 'Brand#13':Varchar) AND (part.p_container = 'JUMBO PKG':Varchar) } - └─StreamTableScan { table: part, columns: [part.p_partkey, part.p_brand, part.p_container], pk: [part.p_partkey], dist: UpstreamHashShard(part.p_partkey) } - with_config_map: - RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' -- id: tpch_q18 - before: - - create_tables - sql: | - select - c_name, - c_custkey, - o_orderkey, - o_orderdate, - o_totalprice, - sum(l_quantity) quantity - from - customer, - orders, - lineitem - where - o_orderkey in ( - select - l_orderkey - from - lineitem - group by - l_orderkey - having - sum(l_quantity) > 1 - ) - and c_custkey = o_custkey - and o_orderkey = l_orderkey - group by - c_name, - c_custkey, - o_orderkey, - o_orderdate, - o_totalprice - order by - o_totalprice desc, - o_orderdate - LIMIT 100; - stream_plan: | - StreamMaterialize { columns: [c_name, c_custkey, o_orderkey, o_orderdate, o_totalprice, quantity], stream_key: [c_name, c_custkey, o_orderkey, o_orderdate, o_totalprice], pk_columns: [o_totalprice, o_orderdate, c_name, c_custkey, o_orderkey], pk_conflict: "NoCheck" } - └─StreamProject { exprs: [customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice, sum(lineitem.l_quantity)] } - └─StreamTopN { order: "[orders.o_totalprice DESC, orders.o_orderdate ASC]", limit: 100, offset: 0 } - └─StreamExchange { dist: Single } - └─StreamGroupTopN { order: "[orders.o_totalprice DESC, orders.o_orderdate ASC]", limit: 100, offset: 0, group_key: [6] } - └─StreamProject { exprs: [customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice, sum(lineitem.l_quantity), Vnode(orders.o_orderkey) as $expr1] } - └─StreamHashAgg { group_key: [customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice], aggs: [sum(lineitem.l_quantity), count] } - └─StreamHashJoin { type: LeftSemi, predicate: orders.o_orderkey = lineitem.l_orderkey, output: [customer.c_name, customer.c_custkey, orders.o_orderkey, orders.o_orderdate, orders.o_totalprice, lineitem.l_quantity, lineitem.l_orderkey, lineitem.l_linenumber] } - ├─StreamExchange { dist: HashShard(orders.o_orderkey) } - | └─StreamHashJoin { type: Inner, predicate: lineitem.l_orderkey = orders.o_orderkey, output: [customer.c_custkey, customer.c_name, orders.o_orderkey, orders.o_totalprice, orders.o_orderdate, lineitem.l_quantity, lineitem.l_orderkey, lineitem.l_linenumber] } - | ├─StreamExchange { dist: HashShard(lineitem.l_orderkey) } - | | └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_quantity, lineitem.l_linenumber], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } - | └─StreamExchange { dist: HashShard(orders.o_orderkey) } - | └─StreamHashJoin { type: Inner, predicate: customer.c_custkey = orders.o_custkey, output: [customer.c_custkey, customer.c_name, orders.o_orderkey, orders.o_totalprice, orders.o_orderdate] } - | ├─StreamExchange { dist: HashShard(customer.c_custkey) } - | | └─StreamTableScan { table: customer, columns: [customer.c_custkey, customer.c_name], pk: [customer.c_custkey], dist: UpstreamHashShard(customer.c_custkey) } - | └─StreamExchange { dist: HashShard(orders.o_custkey) } - | └─StreamTableScan { table: orders, columns: [orders.o_orderkey, orders.o_custkey, orders.o_totalprice, orders.o_orderdate], pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } - └─StreamProject { exprs: [lineitem.l_orderkey] } - └─StreamFilter { predicate: (sum(lineitem.l_quantity) > 1:Int32) } - └─StreamProject { exprs: [lineitem.l_orderkey, sum(lineitem.l_quantity)] } - └─StreamHashAgg { group_key: [lineitem.l_orderkey], aggs: [sum(lineitem.l_quantity), count] } - └─StreamExchange { dist: HashShard(lineitem.l_orderkey) } - └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_quantity, lineitem.l_linenumber], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } - with_config_map: - RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' -- id: tpch_q19 - before: - - create_tables - sql: | - select - sum(l_extendedprice* (1 - l_discount)) as revenue - from - lineitem, - part - where - ( - p_partkey = l_partkey - and p_brand = 'Brand#52' - and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG') - and l_quantity >= 1 and l_quantity <= 11 - and p_size between 1 and 5 - and l_shipmode in ('AIR', 'AIR REG') - and l_shipinstruct = 'DELIVER IN PERSON' - ) - or - ( - p_partkey = l_partkey - and p_brand = 'Brand#24' - and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK') - and l_quantity >= 30 and l_quantity <= 40 - and p_size between 1 and 10 - and l_shipmode in ('AIR', 'AIR REG') - and l_shipinstruct = 'DELIVER IN PERSON' - ) - or - ( - p_partkey = l_partkey - and p_brand = 'Brand#32' - and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG') - and l_quantity >= 10 and l_quantity <= 20 - and p_size between 1 and 15 - and l_shipmode in ('AIR', 'AIR REG') - and l_shipinstruct = 'DELIVER IN PERSON' - ); - stream_plan: | - StreamMaterialize { columns: [revenue], stream_key: [], pk_columns: [], pk_conflict: "NoCheck" } - └─StreamProject { exprs: [sum(sum($expr1))] } - └─StreamGlobalSimpleAgg { aggs: [sum(sum($expr1)), count] } - └─StreamExchange { dist: Single } - └─StreamStatelessLocalSimpleAgg { aggs: [sum($expr1)] } - └─StreamProject { exprs: [(lineitem.l_extendedprice * (1:Int32 - lineitem.l_discount)) as $expr1, lineitem.l_orderkey, lineitem.l_linenumber, part.p_partkey, lineitem.l_partkey] } - └─StreamFilter { predicate: (((((((part.p_brand = 'Brand#52':Varchar) AND In(part.p_container, 'SM CASE':Varchar, 'SM BOX':Varchar, 'SM PACK':Varchar, 'SM PKG':Varchar)) AND (lineitem.l_quantity >= 1:Int32)) AND (lineitem.l_quantity <= 11:Int32)) AND (part.p_size <= 5:Int32)) OR (((((part.p_brand = 'Brand#24':Varchar) AND In(part.p_container, 'MED BAG':Varchar, 'MED BOX':Varchar, 'MED PKG':Varchar, 'MED PACK':Varchar)) AND (lineitem.l_quantity >= 30:Int32)) AND (lineitem.l_quantity <= 40:Int32)) AND (part.p_size <= 10:Int32))) OR (((((part.p_brand = 'Brand#32':Varchar) AND In(part.p_container, 'LG CASE':Varchar, 'LG BOX':Varchar, 'LG PACK':Varchar, 'LG PKG':Varchar)) AND (lineitem.l_quantity >= 10:Int32)) AND (lineitem.l_quantity <= 20:Int32)) AND (part.p_size <= 15:Int32))) } - └─StreamHashJoin { type: Inner, predicate: lineitem.l_partkey = part.p_partkey, output: all } - ├─StreamExchange { dist: HashShard(lineitem.l_partkey) } - | └─StreamProject { exprs: [lineitem.l_partkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_orderkey, lineitem.l_linenumber] } - | └─StreamFilter { predicate: In(lineitem.l_shipmode, 'AIR':Varchar, 'AIR REG':Varchar) AND (lineitem.l_shipinstruct = 'DELIVER IN PERSON':Varchar) } - | └─StreamTableScan { table: lineitem, columns: [lineitem.l_partkey, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_orderkey, lineitem.l_linenumber, lineitem.l_shipinstruct, lineitem.l_shipmode], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } - └─StreamExchange { dist: HashShard(part.p_partkey) } - └─StreamFilter { predicate: (part.p_size >= 1:Int32) } - └─StreamTableScan { table: part, columns: [part.p_partkey, part.p_brand, part.p_size, part.p_container], pk: [part.p_partkey], dist: UpstreamHashShard(part.p_partkey) } - with_config_map: - RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' -- id: tpch_q20 - before: - - create_tables - sql: | - select - s_name, - s_address - from - supplier, - nation - where - s_suppkey in ( - select - ps_suppkey - from - partsupp - where - ps_partkey in ( - select - p_partkey - from - part - where - p_name like 'forest%' - ) - and ps_availqty > ( - select - 0.5 * sum(l_quantity) - from - lineitem - where - l_partkey = ps_partkey - and l_suppkey = ps_suppkey - and l_shipdate >= date '1994-01-01' - and l_shipdate < date '1994-01-01' + interval '1' year - ) - ) - and s_nationkey = n_nationkey - and n_name = 'KENYA' - order by - s_name; - stream_plan: | - StreamMaterialize { columns: [s_name, s_address, supplier.s_suppkey(hidden), nation.n_nationkey(hidden), supplier.s_nationkey(hidden)], stream_key: [supplier.s_suppkey, nation.n_nationkey, supplier.s_nationkey], pk_columns: [s_name, supplier.s_suppkey, nation.n_nationkey, supplier.s_nationkey], pk_conflict: "NoCheck" } - └─StreamHashJoin { type: LeftSemi, predicate: supplier.s_suppkey = partsupp.ps_suppkey, output: [supplier.s_name, supplier.s_address, supplier.s_suppkey, nation.n_nationkey, supplier.s_nationkey] } - ├─StreamExchange { dist: HashShard(supplier.s_suppkey) } - | └─StreamHashJoin { type: Inner, predicate: supplier.s_nationkey = nation.n_nationkey, output: all } - | ├─StreamExchange { dist: HashShard(supplier.s_nationkey) } - | | └─StreamTableScan { table: supplier, columns: [supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_nationkey], pk: [supplier.s_suppkey], dist: UpstreamHashShard(supplier.s_suppkey) } - | └─StreamExchange { dist: HashShard(nation.n_nationkey) } - | └─StreamProject { exprs: [nation.n_nationkey] } - | └─StreamFilter { predicate: (nation.n_name = 'KENYA':Varchar) } - | └─StreamTableScan { table: nation, columns: [nation.n_nationkey, nation.n_name], pk: [nation.n_nationkey], dist: UpstreamHashShard(nation.n_nationkey) } - └─StreamExchange { dist: HashShard(partsupp.ps_suppkey) } - └─StreamProject { exprs: [partsupp.ps_suppkey, partsupp.ps_partkey, partsupp.ps_partkey, partsupp.ps_suppkey] } - └─StreamFilter { predicate: ($expr1 > $expr2) } - └─StreamHashJoin { type: Inner, predicate: partsupp.ps_partkey IS NOT DISTINCT FROM partsupp.ps_partkey AND partsupp.ps_suppkey IS NOT DISTINCT FROM partsupp.ps_suppkey, output: all } - ├─StreamExchange { dist: HashShard(partsupp.ps_partkey, partsupp.ps_suppkey) } - | └─StreamProject { exprs: [partsupp.ps_partkey, partsupp.ps_suppkey, partsupp.ps_availqty::Decimal as $expr1] } - | └─StreamHashJoin { type: LeftSemi, predicate: partsupp.ps_partkey = part.p_partkey, output: all } - | ├─StreamExchange { dist: HashShard(partsupp.ps_partkey) } - | | └─StreamTableScan { table: partsupp, columns: [partsupp.ps_partkey, partsupp.ps_suppkey, partsupp.ps_availqty], pk: [partsupp.ps_partkey, partsupp.ps_suppkey], dist: UpstreamHashShard(partsupp.ps_partkey, partsupp.ps_suppkey) } - | └─StreamExchange { dist: HashShard(part.p_partkey) } - | └─StreamProject { exprs: [part.p_partkey] } - | └─StreamFilter { predicate: Like(part.p_name, 'forest%':Varchar) } - | └─StreamTableScan { table: part, columns: [part.p_partkey, part.p_name], pk: [part.p_partkey], dist: UpstreamHashShard(part.p_partkey) } - └─StreamProject { exprs: [partsupp.ps_partkey, partsupp.ps_suppkey, (0.5:Decimal * sum(lineitem.l_quantity)) as $expr2] } - └─StreamHashAgg { group_key: [partsupp.ps_partkey, partsupp.ps_suppkey], aggs: [sum(lineitem.l_quantity), count] } - └─StreamHashJoin { type: LeftOuter, predicate: partsupp.ps_partkey IS NOT DISTINCT FROM lineitem.l_partkey AND partsupp.ps_suppkey IS NOT DISTINCT FROM lineitem.l_suppkey, output: [partsupp.ps_partkey, partsupp.ps_suppkey, lineitem.l_quantity, lineitem.l_orderkey, lineitem.l_linenumber] } - ├─StreamExchange { dist: HashShard(partsupp.ps_partkey, partsupp.ps_suppkey) } - | └─StreamProject { exprs: [partsupp.ps_partkey, partsupp.ps_suppkey] } - | └─StreamHashAgg { group_key: [partsupp.ps_partkey, partsupp.ps_suppkey], aggs: [count] } - | └─StreamTableScan { table: partsupp, columns: [partsupp.ps_partkey, partsupp.ps_suppkey], pk: [partsupp.ps_partkey, partsupp.ps_suppkey], dist: UpstreamHashShard(partsupp.ps_partkey, partsupp.ps_suppkey) } - └─StreamExchange { dist: HashShard(lineitem.l_partkey, lineitem.l_suppkey) } - └─StreamProject { exprs: [lineitem.l_partkey, lineitem.l_suppkey, lineitem.l_quantity, lineitem.l_orderkey, lineitem.l_linenumber] } - └─StreamFilter { predicate: IsNotNull(lineitem.l_partkey) AND IsNotNull(lineitem.l_suppkey) AND (lineitem.l_shipdate >= '1994-01-01':Date) AND (lineitem.l_shipdate < '1995-01-01 00:00:00':Timestamp) } - └─StreamTableScan { table: lineitem, columns: [lineitem.l_partkey, lineitem.l_suppkey, lineitem.l_quantity, lineitem.l_orderkey, lineitem.l_linenumber, lineitem.l_shipdate], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } - with_config_map: - RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' - id: tpch_q21 before: - create_tables @@ -1340,66 +631,106 @@ └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_suppkey, lineitem.l_linenumber, lineitem.l_commitdate, lineitem.l_receiptdate], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } with_config_map: RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' -- id: tpch_q22 +- id: large_join before: - create_tables - sql: | - select - cntrycode, - count(*) as numcust, - sum(c_acctbal) as totacctbal - from - ( - select - substring(c_phone from 1 for 2) as cntrycode, - c_acctbal - from - customer - where - substring(c_phone from 1 for 2) in - ('30', '24', '31', '38', '25', '34', '37') - and c_acctbal > ( - select - avg(c_acctbal) - from - customer - where - c_acctbal > 0.00::numeric - and substring(c_phone from 1 for 2) in - ('30', '24', '31', '38', '25', '34', '37') - ) - and not exists ( - select - * - from - orders - where - o_custkey = c_custkey - ) - ) as custsale - group by - cntrycode - order by - cntrycode; + sql: |- + select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment from supplier, part, partsupp, customer, orders, lineitem l1, nation, region where s_suppkey = ps_suppkey and p_partkey = ps_partkey and l_partkey = p_partkey and s_nationkey = n_nationkey and n_regionkey = r_regionkey and l_orderkey = o_orderkey and o_custkey = c_custkey and ps_supplycost = ( select min(ps_supplycost) from supplier, part, partsupp, customer, orders, lineitem l2, nation, region where s_suppkey = ps_suppkey and p_partkey = ps_partkey and l_partkey = p_partkey and s_nationkey = n_nationkey and n_regionkey = r_regionkey and l_orderkey = o_orderkey and o_custkey = c_custkey and exists ( select * from lineitem l3 where l3.l_orderkey = l2.l_orderkey and l3.l_suppkey <> l2.l_suppkey ) and not exists ( select * from lineitem l4 where l4.l_orderkey = l2.l_orderkey and l4.l_suppkey <> l2.l_suppkey and l4.l_receiptdate > l4.l_commitdate ) ) and exists ( select * from lineitem l5 where l5.l_orderkey = l1.l_orderkey and l5.l_suppkey <> l1.l_suppkey ) and not exists ( select * from lineitem l6 where l6.l_orderkey = l1.l_orderkey and l6.l_suppkey <> l1.l_suppkey and l6.l_receiptdate > l6.l_commitdate ) + order by s_acctbal desc, n_name, s_name, p_partkey limit 100; stream_plan: | - StreamMaterialize { columns: [cntrycode, numcust, totacctbal], stream_key: [cntrycode], pk_columns: [cntrycode], pk_conflict: "NoCheck" } - └─StreamHashAgg { group_key: [$expr2], aggs: [count, sum(customer.c_acctbal)] } - └─StreamExchange { dist: HashShard($expr2) } - └─StreamProject { exprs: [Substr(customer.c_phone, 1:Int32, 2:Int32) as $expr2, customer.c_acctbal, customer.c_custkey] } - └─StreamDynamicFilter { predicate: (customer.c_acctbal > $expr1), output: [customer.c_phone, customer.c_acctbal, customer.c_custkey] } - ├─StreamHashJoin { type: LeftAnti, predicate: customer.c_custkey = orders.o_custkey, output: [customer.c_phone, customer.c_acctbal, customer.c_custkey] } - | ├─StreamExchange { dist: HashShard(customer.c_custkey) } - | | └─StreamFilter { predicate: In(Substr(customer.c_phone, 1:Int32, 2:Int32), '30':Varchar, '24':Varchar, '31':Varchar, '38':Varchar, '25':Varchar, '34':Varchar, '37':Varchar) } - | | └─StreamTableScan { table: customer, columns: [customer.c_custkey, customer.c_phone, customer.c_acctbal], pk: [customer.c_custkey], dist: UpstreamHashShard(customer.c_custkey) } - | └─StreamExchange { dist: HashShard(orders.o_custkey) } - | └─StreamTableScan { table: orders, columns: [orders.o_custkey, orders.o_orderkey], pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } - └─StreamExchange { dist: Broadcast } - └─StreamProject { exprs: [(sum(sum(customer.c_acctbal)) / sum0(count(customer.c_acctbal))) as $expr1] } - └─StreamGlobalSimpleAgg { aggs: [sum(sum(customer.c_acctbal)), sum0(count(customer.c_acctbal)), count] } - └─StreamExchange { dist: Single } - └─StreamStatelessLocalSimpleAgg { aggs: [sum(customer.c_acctbal), count(customer.c_acctbal)] } - └─StreamProject { exprs: [customer.c_acctbal, customer.c_custkey] } - └─StreamFilter { predicate: (customer.c_acctbal > 0.00:Decimal) AND In(Substr(customer.c_phone, 1:Int32, 2:Int32), '30':Varchar, '24':Varchar, '31':Varchar, '38':Varchar, '25':Varchar, '34':Varchar, '37':Varchar) } - └─StreamTableScan { table: customer, columns: [customer.c_acctbal, customer.c_custkey, customer.c_phone], pk: [customer.c_custkey], dist: UpstreamHashShard(customer.c_custkey) } + StreamMaterialize { columns: [s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment, region.r_regionkey(hidden), nation.n_nationkey(hidden), supplier.s_suppkey(hidden), partsupp.ps_partkey(hidden), partsupp.ps_suppkey(hidden), customer.c_custkey(hidden), orders.o_orderkey(hidden), lineitem.l_orderkey(hidden), lineitem.l_linenumber(hidden), partsupp.ps_supplycost(hidden)], stream_key: [region.r_regionkey, nation.n_nationkey, supplier.s_suppkey, partsupp.ps_partkey, partsupp.ps_suppkey, p_partkey, customer.c_custkey, orders.o_orderkey, lineitem.l_orderkey, lineitem.l_linenumber, partsupp.ps_supplycost], pk_columns: [s_acctbal, n_name, s_name, p_partkey, region.r_regionkey, nation.n_nationkey, supplier.s_suppkey, partsupp.ps_partkey, partsupp.ps_suppkey, customer.c_custkey, orders.o_orderkey, lineitem.l_orderkey, lineitem.l_linenumber, partsupp.ps_supplycost], pk_conflict: "NoCheck" } + └─StreamProject { exprs: [supplier.s_acctbal, supplier.s_name, nation.n_name, part.p_partkey, part.p_mfgr, supplier.s_address, supplier.s_phone, supplier.s_comment, region.r_regionkey, nation.n_nationkey, supplier.s_suppkey, partsupp.ps_partkey, partsupp.ps_suppkey, customer.c_custkey, orders.o_orderkey, lineitem.l_orderkey, lineitem.l_linenumber, partsupp.ps_supplycost] } + └─StreamTopN { order: "[supplier.s_acctbal DESC, nation.n_name ASC, supplier.s_name ASC, part.p_partkey ASC]", limit: 100, offset: 0 } + └─StreamExchange { dist: Single } + └─StreamGroupTopN { order: "[supplier.s_acctbal DESC, nation.n_name ASC, supplier.s_name ASC, part.p_partkey ASC]", limit: 100, offset: 0, group_key: [18] } + └─StreamProject { exprs: [supplier.s_acctbal, supplier.s_name, nation.n_name, part.p_partkey, part.p_mfgr, supplier.s_address, supplier.s_phone, supplier.s_comment, region.r_regionkey, nation.n_nationkey, supplier.s_suppkey, partsupp.ps_partkey, partsupp.ps_suppkey, customer.c_custkey, orders.o_orderkey, lineitem.l_orderkey, lineitem.l_linenumber, partsupp.ps_supplycost, Vnode(partsupp.ps_supplycost) as $expr2] } + └─StreamHashJoin { type: Inner, predicate: partsupp.ps_supplycost = min(min(partsupp.ps_supplycost)), output: [supplier.s_acctbal, supplier.s_name, nation.n_name, part.p_partkey, part.p_mfgr, supplier.s_address, supplier.s_phone, supplier.s_comment, region.r_regionkey, nation.n_nationkey, supplier.s_suppkey, partsupp.ps_partkey, partsupp.ps_suppkey, customer.c_custkey, orders.o_orderkey, lineitem.l_orderkey, lineitem.l_linenumber, partsupp.ps_supplycost] } + ├─StreamExchange { dist: HashShard(partsupp.ps_supplycost) } + | └─StreamHashJoin { type: LeftAnti, predicate: lineitem.l_orderkey = lineitem.l_orderkey AND (lineitem.l_suppkey <> lineitem.l_suppkey), output: [supplier.s_name, supplier.s_address, supplier.s_phone, supplier.s_acctbal, supplier.s_comment, part.p_partkey, part.p_mfgr, partsupp.ps_supplycost, nation.n_name, region.r_regionkey, nation.n_nationkey, supplier.s_suppkey, partsupp.ps_partkey, partsupp.ps_suppkey, customer.c_custkey, orders.o_orderkey, lineitem.l_orderkey, lineitem.l_linenumber] } + | ├─StreamHashJoin { type: LeftSemi, predicate: lineitem.l_orderkey = lineitem.l_orderkey AND (lineitem.l_suppkey <> lineitem.l_suppkey), output: [supplier.s_name, supplier.s_address, supplier.s_phone, supplier.s_acctbal, supplier.s_comment, part.p_partkey, part.p_mfgr, partsupp.ps_supplycost, lineitem.l_orderkey, lineitem.l_suppkey, nation.n_name, region.r_regionkey, nation.n_nationkey, supplier.s_suppkey, partsupp.ps_partkey, partsupp.ps_suppkey, customer.c_custkey, orders.o_orderkey, lineitem.l_linenumber] } + | | ├─StreamExchange { dist: HashShard(lineitem.l_orderkey) } + | | | └─StreamFilter { predicate: (supplier.s_suppkey = partsupp.ps_suppkey) AND (part.p_partkey = partsupp.ps_partkey) AND (lineitem.l_partkey = part.p_partkey) AND (supplier.s_nationkey = nation.n_nationkey) AND (nation.n_regionkey = region.r_regionkey) AND (lineitem.l_orderkey = orders.o_orderkey) AND (orders.o_custkey = customer.c_custkey) } + | | | └─StreamShare { id = 30 } + | | | └─StreamHashJoin { type: Inner, predicate: partsupp.ps_partkey = part.p_partkey AND partsupp.ps_partkey = lineitem.l_partkey, output: [supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_nationkey, supplier.s_phone, supplier.s_acctbal, supplier.s_comment, part.p_partkey, part.p_mfgr, partsupp.ps_partkey, partsupp.ps_suppkey, partsupp.ps_supplycost, customer.c_custkey, orders.o_orderkey, orders.o_custkey, lineitem.l_orderkey, lineitem.l_partkey, lineitem.l_suppkey, nation.n_nationkey, nation.n_name, nation.n_regionkey, region.r_regionkey, lineitem.l_linenumber] } + | | | ├─StreamExchange { dist: HashShard(partsupp.ps_partkey) } + | | | | └─StreamHashJoin { type: Inner, predicate: supplier.s_suppkey = partsupp.ps_suppkey, output: all } + | | | | ├─StreamExchange { dist: HashShard(supplier.s_suppkey) } + | | | | | └─StreamHashJoin { type: Inner, predicate: nation.n_nationkey = supplier.s_nationkey, output: all } + | | | | | ├─StreamExchange { dist: HashShard(nation.n_nationkey) } + | | | | | | └─StreamHashJoin { type: Inner, predicate: region.r_regionkey = nation.n_regionkey, output: all } + | | | | | | ├─StreamExchange { dist: HashShard(region.r_regionkey) } + | | | | | | | └─StreamTableScan { table: region, columns: [region.r_regionkey], pk: [region.r_regionkey], dist: UpstreamHashShard(region.r_regionkey) } + | | | | | | └─StreamExchange { dist: HashShard(nation.n_regionkey) } + | | | | | | └─StreamTableScan { table: nation, columns: [nation.n_nationkey, nation.n_name, nation.n_regionkey], pk: [nation.n_nationkey], dist: UpstreamHashShard(nation.n_nationkey) } + | | | | | └─StreamExchange { dist: HashShard(supplier.s_nationkey) } + | | | | | └─StreamTableScan { table: supplier, columns: [supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_nationkey, supplier.s_phone, supplier.s_acctbal, supplier.s_comment], pk: [supplier.s_suppkey], dist: UpstreamHashShard(supplier.s_suppkey) } + | | | | └─StreamExchange { dist: HashShard(partsupp.ps_suppkey) } + | | | | └─StreamFilter { predicate: (partsupp.ps_partkey = partsupp.ps_partkey) } + | | | | └─StreamTableScan { table: partsupp, columns: [partsupp.ps_partkey, partsupp.ps_suppkey, partsupp.ps_supplycost], pk: [partsupp.ps_partkey, partsupp.ps_suppkey], dist: UpstreamHashShard(partsupp.ps_partkey, partsupp.ps_suppkey) } + | | | └─StreamHashJoin { type: Inner, predicate: part.p_partkey = lineitem.l_partkey, output: all } + | | | ├─StreamExchange { dist: HashShard(part.p_partkey) } + | | | | └─StreamTableScan { table: part, columns: [part.p_partkey, part.p_mfgr], pk: [part.p_partkey], dist: UpstreamHashShard(part.p_partkey) } + | | | └─StreamExchange { dist: HashShard(lineitem.l_partkey) } + | | | └─StreamHashJoin { type: Inner, predicate: orders.o_orderkey = lineitem.l_orderkey, output: all } + | | | ├─StreamExchange { dist: HashShard(orders.o_orderkey) } + | | | | └─StreamHashJoin { type: Inner, predicate: customer.c_custkey = orders.o_custkey, output: all } + | | | | ├─StreamExchange { dist: HashShard(customer.c_custkey) } + | | | | | └─StreamTableScan { table: customer, columns: [customer.c_custkey], pk: [customer.c_custkey], dist: UpstreamHashShard(customer.c_custkey) } + | | | | └─StreamExchange { dist: HashShard(orders.o_custkey) } + | | | | └─StreamTableScan { table: orders, columns: [orders.o_orderkey, orders.o_custkey], pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } + | | | └─StreamExchange { dist: HashShard(lineitem.l_orderkey) } + | | | └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_partkey, lineitem.l_suppkey, lineitem.l_linenumber], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } + | | └─StreamExchange { dist: HashShard(lineitem.l_orderkey) } + | | └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_suppkey, lineitem.l_linenumber], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } + | └─StreamExchange { dist: HashShard(lineitem.l_orderkey) } + | └─StreamProject { exprs: [lineitem.l_orderkey, lineitem.l_suppkey, lineitem.l_linenumber] } + | └─StreamFilter { predicate: (lineitem.l_receiptdate > lineitem.l_commitdate) } + | └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_suppkey, lineitem.l_linenumber, lineitem.l_commitdate, lineitem.l_receiptdate], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } + └─StreamExchange { dist: HashShard(min(min(partsupp.ps_supplycost))) } + └─StreamProject { exprs: [min(min(partsupp.ps_supplycost))] } + └─StreamGlobalSimpleAgg { aggs: [min(min(partsupp.ps_supplycost)), count] } + └─StreamExchange { dist: Single } + └─StreamHashAgg { group_key: [$expr1], aggs: [min(partsupp.ps_supplycost), count] } + └─StreamProject { exprs: [partsupp.ps_supplycost, region.r_regionkey, nation.n_nationkey, supplier.s_suppkey, partsupp.ps_partkey, partsupp.ps_suppkey, part.p_partkey, customer.c_custkey, orders.o_orderkey, lineitem.l_orderkey, lineitem.l_linenumber, Vnode(lineitem.l_orderkey) as $expr1] } + └─StreamHashJoin { type: LeftAnti, predicate: lineitem.l_orderkey = lineitem.l_orderkey AND (lineitem.l_suppkey <> lineitem.l_suppkey), output: [partsupp.ps_supplycost, region.r_regionkey, nation.n_nationkey, supplier.s_suppkey, partsupp.ps_partkey, partsupp.ps_suppkey, part.p_partkey, customer.c_custkey, orders.o_orderkey, lineitem.l_orderkey, lineitem.l_linenumber] } + ├─StreamHashJoin { type: LeftSemi, predicate: lineitem.l_orderkey = lineitem.l_orderkey AND (lineitem.l_suppkey <> lineitem.l_suppkey), output: [partsupp.ps_supplycost, lineitem.l_orderkey, lineitem.l_suppkey, region.r_regionkey, nation.n_nationkey, supplier.s_suppkey, partsupp.ps_partkey, partsupp.ps_suppkey, part.p_partkey, customer.c_custkey, orders.o_orderkey, lineitem.l_linenumber] } + | ├─StreamExchange { dist: HashShard(lineitem.l_orderkey) } + | | └─StreamFilter { predicate: (supplier.s_suppkey = partsupp.ps_suppkey) AND (part.p_partkey = partsupp.ps_partkey) AND (lineitem.l_partkey = part.p_partkey) AND (supplier.s_nationkey = nation.n_nationkey) AND (nation.n_regionkey = region.r_regionkey) AND (lineitem.l_orderkey = orders.o_orderkey) AND (orders.o_custkey = customer.c_custkey) } + | | └─StreamShare { id = 30 } + | | └─StreamHashJoin { type: Inner, predicate: partsupp.ps_partkey = part.p_partkey AND partsupp.ps_partkey = lineitem.l_partkey, output: [supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_nationkey, supplier.s_phone, supplier.s_acctbal, supplier.s_comment, part.p_partkey, part.p_mfgr, partsupp.ps_partkey, partsupp.ps_suppkey, partsupp.ps_supplycost, customer.c_custkey, orders.o_orderkey, orders.o_custkey, lineitem.l_orderkey, lineitem.l_partkey, lineitem.l_suppkey, nation.n_nationkey, nation.n_name, nation.n_regionkey, region.r_regionkey, lineitem.l_linenumber] } + | | ├─StreamExchange { dist: HashShard(partsupp.ps_partkey) } + | | | └─StreamHashJoin { type: Inner, predicate: supplier.s_suppkey = partsupp.ps_suppkey, output: all } + | | | ├─StreamExchange { dist: HashShard(supplier.s_suppkey) } + | | | | └─StreamHashJoin { type: Inner, predicate: nation.n_nationkey = supplier.s_nationkey, output: all } + | | | | ├─StreamExchange { dist: HashShard(nation.n_nationkey) } + | | | | | └─StreamHashJoin { type: Inner, predicate: region.r_regionkey = nation.n_regionkey, output: all } + | | | | | ├─StreamExchange { dist: HashShard(region.r_regionkey) } + | | | | | | └─StreamTableScan { table: region, columns: [region.r_regionkey], pk: [region.r_regionkey], dist: UpstreamHashShard(region.r_regionkey) } + | | | | | └─StreamExchange { dist: HashShard(nation.n_regionkey) } + | | | | | └─StreamTableScan { table: nation, columns: [nation.n_nationkey, nation.n_name, nation.n_regionkey], pk: [nation.n_nationkey], dist: UpstreamHashShard(nation.n_nationkey) } + | | | | └─StreamExchange { dist: HashShard(supplier.s_nationkey) } + | | | | └─StreamTableScan { table: supplier, columns: [supplier.s_suppkey, supplier.s_name, supplier.s_address, supplier.s_nationkey, supplier.s_phone, supplier.s_acctbal, supplier.s_comment], pk: [supplier.s_suppkey], dist: UpstreamHashShard(supplier.s_suppkey) } + | | | └─StreamExchange { dist: HashShard(partsupp.ps_suppkey) } + | | | └─StreamFilter { predicate: (partsupp.ps_partkey = partsupp.ps_partkey) } + | | | └─StreamTableScan { table: partsupp, columns: [partsupp.ps_partkey, partsupp.ps_suppkey, partsupp.ps_supplycost], pk: [partsupp.ps_partkey, partsupp.ps_suppkey], dist: UpstreamHashShard(partsupp.ps_partkey, partsupp.ps_suppkey) } + | | └─StreamHashJoin { type: Inner, predicate: part.p_partkey = lineitem.l_partkey, output: all } + | | ├─StreamExchange { dist: HashShard(part.p_partkey) } + | | | └─StreamTableScan { table: part, columns: [part.p_partkey, part.p_mfgr], pk: [part.p_partkey], dist: UpstreamHashShard(part.p_partkey) } + | | └─StreamExchange { dist: HashShard(lineitem.l_partkey) } + | | └─StreamHashJoin { type: Inner, predicate: orders.o_orderkey = lineitem.l_orderkey, output: all } + | | ├─StreamExchange { dist: HashShard(orders.o_orderkey) } + | | | └─StreamHashJoin { type: Inner, predicate: customer.c_custkey = orders.o_custkey, output: all } + | | | ├─StreamExchange { dist: HashShard(customer.c_custkey) } + | | | | └─StreamTableScan { table: customer, columns: [customer.c_custkey], pk: [customer.c_custkey], dist: UpstreamHashShard(customer.c_custkey) } + | | | └─StreamExchange { dist: HashShard(orders.o_custkey) } + | | | └─StreamTableScan { table: orders, columns: [orders.o_orderkey, orders.o_custkey], pk: [orders.o_orderkey], dist: UpstreamHashShard(orders.o_orderkey) } + | | └─StreamExchange { dist: HashShard(lineitem.l_orderkey) } + | | └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_partkey, lineitem.l_suppkey, lineitem.l_linenumber], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } + | └─StreamExchange { dist: HashShard(lineitem.l_orderkey) } + | └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_suppkey, lineitem.l_linenumber], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } + └─StreamExchange { dist: HashShard(lineitem.l_orderkey) } + └─StreamProject { exprs: [lineitem.l_orderkey, lineitem.l_suppkey, lineitem.l_linenumber] } + └─StreamFilter { predicate: (lineitem.l_receiptdate > lineitem.l_commitdate) } + └─StreamTableScan { table: lineitem, columns: [lineitem.l_orderkey, lineitem.l_suppkey, lineitem.l_linenumber, lineitem.l_commitdate, lineitem.l_receiptdate], pk: [lineitem.l_orderkey, lineitem.l_linenumber], dist: UpstreamHashShard(lineitem.l_orderkey, lineitem.l_linenumber) } with_config_map: RW_STREAMING_ENABLE_BUSHY_JOIN: 'true' diff --git a/src/frontend/src/optimizer/plan_node/logical_multi_join.rs b/src/frontend/src/optimizer/plan_node/logical_multi_join.rs index faab125aea73d..8bcda89272566 100644 --- a/src/frontend/src/optimizer/plan_node/logical_multi_join.rs +++ b/src/frontend/src/optimizer/plan_node/logical_multi_join.rs @@ -26,7 +26,7 @@ use super::{ PlanNodeType, PlanRef, PlanTreeNodeBinary, PlanTreeNodeUnary, PredicatePushdown, ToBatch, ToStream, }; -use crate::expr::{ExprImpl, ExprRewriter}; +use crate::expr::{ExprImpl, ExprRewriter, ExprType, FunctionCall}; use crate::optimizer::plan_node::{ ColumnPruningContext, PlanTreeNode, PredicatePushdownContext, RewriteStreamContext, ToStreamContext, @@ -485,86 +485,39 @@ impl LogicalMultiJoin { Ok(join_ordering) } + /// transform multijoin into bushy tree join. + /// + /// 1. First, use equivalent condition derivation to get derive join relation. + /// 2. Second, for every isolated node will create connection to every other nodes. + /// 3. Third, select and merge one node for a iteration, and use a bfs policy for which node the + /// selected node merged with. + /// i. The select node mentioned above is the node with least numer of relations and the + /// lowerst join tree. + /// ii. nodes with a join tree higher than the temporal optimal join tree will be pruned. pub fn as_bushy_tree_join(&self) -> Result { - // Join tree internal representation - #[derive(Clone, Default, Debug)] - struct JoinTreeNode { - idx: Option, - left: Option>, - right: Option>, - height: usize, - } - - // join graph internal representation - #[derive(Clone, Debug)] - struct GraphNode { - id: usize, - join_tree: JoinTreeNode, - // use BTreeSet for deterministic - relations: BTreeSet, - } - - let mut nodes: BTreeMap<_, _> = (0..self.inputs.len()) - .map(|idx| GraphNode { - id: idx, - relations: BTreeSet::new(), - join_tree: JoinTreeNode { - idx: Some(idx), - left: None, - right: None, - height: 0, - }, - }) - .enumerate() - .collect(); - let (eq_join_conditions, _) = self - .on - .clone() - .split_by_input_col_nums(&self.input_col_nums(), true); - - for ((src, dst), _) in eq_join_conditions { - nodes.get_mut(&src).unwrap().relations.insert(dst); - nodes.get_mut(&dst).unwrap().relations.insert(src); - } - - // isolated nodes can be joined at any where. - let iso_nodes = nodes - .iter() - .filter_map(|n| { - if n.1.relations.is_empty() { - Some(*n.0) - } else { - None - } - }) - .collect_vec(); + let (nodes, condition) = self.get_join_graph()?; - for n in iso_nodes { - for adj in 0..nodes.len() { - if adj != n { - nodes.get_mut(&n).unwrap().relations.insert(adj); - nodes.get_mut(&adj).unwrap().relations.insert(n); - } - } + if nodes.is_empty() { + return Err(RwError::from(ErrorCode::InternalError( + "empty multi-join graph".into(), + ))); } - let mut optimized_bushy_tree = None; + let mut optimized_bushy_tree: Option = None; let mut que = VecDeque::from([nodes]); let mut isolated = BTreeSet::new(); while let Some(mut nodes) = que.pop_front() { if nodes.len() == 1 { let node = nodes.into_values().next().unwrap(); - optimized_bushy_tree = Some(optimized_bushy_tree.map_or( - node.clone(), - |old_tree: GraphNode| { - if node.join_tree.height < old_tree.join_tree.height { - node - } else { - old_tree - } - }, - )); + + if let Some(old) = &optimized_bushy_tree { + if node.join_tree.height < old.join_tree.height { + optimized_bushy_tree = Some(node); + } + } else { + optimized_bushy_tree = Some(node); + } continue; } @@ -578,91 +531,86 @@ impl LogicalMultiJoin { }, ) .unwrap(); - let n = nodes.remove(&idx.clone()).unwrap(); + let n_id = *idx; + let n = nodes.get(&n_id).unwrap(); if n.relations.is_empty() { isolated.insert(n.id); + nodes.remove(&n_id).unwrap(); que.push_back(nodes); continue; } - for merge_node in &n.relations { + let mut relations = nodes.get_mut(&n_id).unwrap().relations.clone(); + relations.sort_by(|a, b| { + let a = nodes.get(a).unwrap(); + let b = nodes.get(b).unwrap(); + match a.join_tree.height.cmp(&b.join_tree.height) { + Ordering::Equal => a.id.cmp(&b.id), + other => other, + } + }); + relations.dedup(); + nodes.get_mut(&n_id).unwrap().relations = relations; + + for merge_node_id in &nodes.get(&n_id).unwrap().relations { let mut nodes = nodes.clone(); - for adjacent_node in &n.relations { - if *adjacent_node != *merge_node { - nodes - .get_mut(adjacent_node) - .unwrap() - .relations - .remove(&n.id); - nodes - .get_mut(adjacent_node) - .unwrap() - .relations - .insert(*merge_node); - nodes - .get_mut(merge_node) + let n = nodes.remove(&n_id).unwrap(); + + for adj_node_id in &n.relations { + if adj_node_id != merge_node_id { + let n_id = nodes + .get(adj_node_id) .unwrap() .relations - .insert(*adjacent_node); + .iter() + .position(|adj_rel| *adj_rel == n_id) + .unwrap(); + + let adj_node = nodes.get_mut(adj_node_id).unwrap(); + adj_node.relations.swap_remove(n_id); + adj_node.relations.push(*merge_node_id); + let merge_node = nodes.get_mut(merge_node_id).unwrap(); + merge_node.relations.push(*adj_node_id); } } - let mut merge_graph_node = nodes.get_mut(merge_node).unwrap(); - merge_graph_node.relations.remove(&n.id); + + let idx = nodes + .get(merge_node_id) + .unwrap() + .relations + .iter() + .position(|merge_rel| *merge_rel == n_id) + .unwrap(); + + let merge_node = nodes.get_mut(merge_node_id).unwrap(); + merge_node.relations.swap_remove(idx); let l_tree = n.join_tree.clone(); - let r_tree = std::mem::take(&mut merge_graph_node.join_tree); + let r_tree = std::mem::take(&mut merge_node.join_tree); let new_height = usize::max(l_tree.height, r_tree.height) + 1; if let Some(min_height) = optimized_bushy_tree.as_ref().map(|t| t.join_tree.height) && min_height < new_height { continue; } - merge_graph_node.join_tree = JoinTreeNode { + merge_node.join_tree = JoinTreeNode { idx: None, left: Some(Box::new(l_tree)), right: Some(Box::new(r_tree)), height: new_height, }; + que.push_back(nodes); } } - fn create_logical_join( - s: &LogicalMultiJoin, - mut join_tree: JoinTreeNode, - join_ordering: &mut Vec, - ) -> Result { - Ok(match (join_tree.left.take(), join_tree.right.take()) { - (Some(l), Some(r)) => LogicalJoin::new( - create_logical_join(s, *l, join_ordering)?, - create_logical_join(s, *r, join_ordering)?, - JoinType::Inner, - Condition::true_cond(), - ) - .into(), - (None, None) => { - if let Some(idx) = join_tree.idx { - join_ordering.push(idx); - s.inputs[idx].clone() - } else { - return Err(RwError::from(ErrorCode::InternalError( - "id of the leaf node not found in the join tree".into(), - ))); - } - } - (_, _) => { - return Err(RwError::from(ErrorCode::InternalError( - "only leaf node can have None subtree".into(), - ))) - } - }) - } - let isolated = isolated.into_iter().collect_vec(); + + // maintain join order to mapping columns. let mut join_ordering = vec![]; let mut output = if let Some(optimized_bushy_tree) = optimized_bushy_tree { let mut output = - create_logical_join(self, optimized_bushy_tree.join_tree, &mut join_ordering)?; + self.create_logical_join(optimized_bushy_tree.join_tree, &mut join_ordering)?; output = isolated.into_iter().fold(output, |chain, n| { join_ordering.push(n); @@ -695,6 +643,7 @@ impl LogicalMultiJoin { "no plan remain".into(), ))); }; + let total_col_num = self.inner2output.source_size(); let reorder_mapping = { let mut reorder_mapping = vec![None; total_col_num]; @@ -714,9 +663,7 @@ impl LogicalMultiJoin { LogicalProject::with_out_col_idx(output, reorder_mapping.iter().map(|i| i.unwrap())) .into(); - // We will later push down all of the filters back to the individual joins via the - // `FilterJoinRule`. - output = LogicalFilter::create(output, self.on.clone()); + output = LogicalFilter::create(output, condition); output = LogicalProject::with_out_col_idx(output, self.output_indices.iter().cloned()).into(); Ok(output) @@ -725,6 +672,143 @@ impl LogicalMultiJoin { pub(crate) fn input_col_nums(&self) -> Vec { self.inputs.iter().map(|i| i.schema().len()).collect() } + + /// get join graph from `self.on`, return the join graph and the new join condition. + fn get_join_graph(&self) -> Result<(BTreeMap, Condition)> { + let mut nodes: BTreeMap<_, _> = (0..self.inputs.len()) + .map(|idx| GraphNode { + id: idx, + relations: vec![], + join_tree: JoinTreeNode { + idx: Some(idx), + left: None, + right: None, + height: 0, + }, + }) + .enumerate() + .collect(); + + let condition = self.on.clone(); + let condition = self.eq_condition_derivation(condition)?; + let (eq_join_conditions, _) = condition + .clone() + .split_by_input_col_nums(&self.input_col_nums(), true); + + for ((src, dst), _) in eq_join_conditions { + nodes.get_mut(&src).unwrap().relations.push(dst); + nodes.get_mut(&dst).unwrap().relations.push(src); + } + + Ok((nodes, condition)) + } + + /// equivalent condition derivation by `a = b && a = c` ==> `b = c` + fn eq_condition_derivation(&self, mut condition: Condition) -> Result { + let (eq_join_conditions, _) = condition + .clone() + .split_by_input_col_nums(&self.input_col_nums(), true); + + let mut new_conj: BTreeMap> = BTreeMap::new(); + let mut input_ref_map = BTreeMap::new(); + + for con in eq_join_conditions.values() { + for conj in &con.conjunctions { + let (l, r) = conj.as_eq_cond().unwrap(); + new_conj.entry(l.index).or_default().insert(r.index); + new_conj.entry(r.index).or_default().insert(l.index); + input_ref_map.insert(l.index, Some(l)); + input_ref_map.insert(r.index, Some(r)); + } + } + + let mut new_pairs = BTreeSet::new(); + + for conjs in new_conj.values() { + if conjs.len() < 2 { + continue; + } + + let conjs = conjs.iter().copied().collect_vec(); + for i in 0..conjs.len() { + for j in i + 1..conjs.len() { + if !new_conj.get(&conjs[i]).unwrap().contains(&conjs[j]) { + if conjs[i] < conjs[j] { + new_pairs.insert((conjs[i], conjs[j])); + } else { + new_pairs.insert((conjs[j], conjs[i])); + } + } + } + } + } + for (i, j) in new_pairs { + condition + .conjunctions + .push(ExprImpl::FunctionCall(Box::new(FunctionCall::new( + ExprType::Equal, + vec![ + ExprImpl::InputRef(Box::new( + input_ref_map.get(&i).unwrap().as_ref().unwrap().clone(), + )), + ExprImpl::InputRef(Box::new( + input_ref_map.get(&j).unwrap().as_ref().unwrap().clone(), + )), + ], + )?))); + } + Ok(condition) + } + + /// create logical plan by recursively travase `JoinTreeNode` + fn create_logical_join( + &self, + mut join_tree: JoinTreeNode, + join_ordering: &mut Vec, + ) -> Result { + Ok(match (join_tree.left.take(), join_tree.right.take()) { + (Some(l), Some(r)) => LogicalJoin::new( + self.create_logical_join(*l, join_ordering)?, + self.create_logical_join(*r, join_ordering)?, + JoinType::Inner, + Condition::true_cond(), + ) + .into(), + (None, None) => { + if let Some(idx) = join_tree.idx { + join_ordering.push(idx); + self.inputs[idx].clone() + } else { + return Err(RwError::from(ErrorCode::InternalError( + "id of the leaf node not found in the join tree".into(), + ))); + } + } + (_, _) => { + return Err(RwError::from(ErrorCode::InternalError( + "only leaf node can have None subtree".into(), + ))) + } + }) + } +} + +// Join tree internal representation +#[derive(Clone, Default, Debug)] +struct JoinTreeNode { + idx: Option, + left: Option>, + right: Option>, + height: usize, +} + +// join graph internal representation +#[derive(Clone, Debug)] +struct GraphNode { + id: usize, + join_tree: JoinTreeNode, + // use BTreeSet for deterministic + relations: Vec, } impl ToStream for LogicalMultiJoin {