From 30dca71139d8f2db2f4f29044ebf6c4ea9e9fc3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tesla=20Zhang=E2=80=AE?= Date: Mon, 3 Apr 2023 10:36:46 -0400 Subject: [PATCH] refactor(plan_node): simplify `Expand` and `Filter` (#8932) --- .../src/optimizer/plan_node/batch_expand.rs | 25 +++++++----- .../src/optimizer/plan_node/batch_filter.rs | 40 ++++++++----------- .../src/optimizer/plan_node/generic/expand.rs | 15 ++++++- .../src/optimizer/plan_node/generic/filter.rs | 23 ++++++++++- .../src/optimizer/plan_node/logical_expand.rs | 18 +++------ .../src/optimizer/plan_node/logical_filter.rs | 25 +++--------- .../src/optimizer/plan_node/logical_join.rs | 2 +- .../src/optimizer/plan_node/logical_scan.rs | 2 +- .../src/optimizer/plan_node/stream_expand.rs | 27 +++++++------ .../src/optimizer/plan_node/stream_filter.rs | 40 +++++++++---------- .../src/scheduler/distributed/query.rs | 7 ++-- 11 files changed, 115 insertions(+), 109 deletions(-) diff --git a/src/frontend/src/optimizer/plan_node/batch_expand.rs b/src/frontend/src/optimizer/plan_node/batch_expand.rs index 86043cd3e1ab3..394166af0b311 100644 --- a/src/frontend/src/optimizer/plan_node/batch_expand.rs +++ b/src/frontend/src/optimizer/plan_node/batch_expand.rs @@ -20,9 +20,9 @@ use risingwave_pb::batch_plan::expand_node::Subset; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::ExpandNode; -use super::ExprRewritable; +use super::{generic, ExprRewritable}; use crate::optimizer::plan_node::{ - LogicalExpand, PlanBase, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, ToLocalBatch, + PlanBase, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, ToLocalBatch, }; use crate::optimizer::property::{Distribution, Order}; use crate::optimizer::PlanRef; @@ -30,25 +30,26 @@ use crate::optimizer::PlanRef; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct BatchExpand { pub base: PlanBase, - logical: LogicalExpand, + logical: generic::Expand, } impl BatchExpand { - pub fn new(logical: LogicalExpand) -> Self { - let ctx = logical.base.ctx.clone(); - let dist = match logical.input().distribution() { + pub fn new(logical: generic::Expand) -> Self { + let base = PlanBase::new_logical_with_core(&logical); + let ctx = base.ctx; + let dist = match logical.input.distribution() { Distribution::Single => Distribution::Single, Distribution::SomeShard | Distribution::HashShard(_) | Distribution::UpstreamHashShard(_, _) => Distribution::SomeShard, Distribution::Broadcast => unreachable!(), }; - let base = PlanBase::new_batch(ctx, logical.schema().clone(), dist, Order::any()); + let base = PlanBase::new_batch(ctx, base.schema, dist, Order::any()); BatchExpand { base, logical } } - pub fn column_subsets(&self) -> &Vec> { - self.logical.column_subsets() + pub fn column_subsets(&self) -> &[Vec] { + &self.logical.column_subsets } } @@ -60,11 +61,13 @@ impl fmt::Display for BatchExpand { impl PlanTreeNodeUnary for BatchExpand { fn input(&self) -> PlanRef { - self.logical.input() + self.logical.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { - Self::new(self.logical.clone_with_input(input)) + let mut logical = self.logical.clone(); + logical.input = input; + Self::new(logical) } } diff --git a/src/frontend/src/optimizer/plan_node/batch_filter.rs b/src/frontend/src/optimizer/plan_node/batch_filter.rs index bf37f65b19842..219bba7f5d8d0 100644 --- a/src/frontend/src/optimizer/plan_node/batch_filter.rs +++ b/src/frontend/src/optimizer/plan_node/batch_filter.rs @@ -18,9 +18,7 @@ use risingwave_common::error::Result; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::FilterNode; -use super::{ - ExprRewritable, LogicalFilter, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, -}; +use super::{generic, ExprRewritable, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch}; use crate::expr::{Expr, ExprImpl, ExprRewriter}; use crate::optimizer::plan_node::{PlanBase, ToLocalBatch}; use crate::utils::Condition; @@ -29,24 +27,25 @@ use crate::utils::Condition; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct BatchFilter { pub base: PlanBase, - logical: LogicalFilter, + logical: generic::Filter, } impl BatchFilter { - pub fn new(logical: LogicalFilter) -> Self { - let ctx = logical.base.ctx.clone(); + pub fn new(logical: generic::Filter) -> Self { + let base = PlanBase::new_logical_with_core(&logical); + let ctx = base.ctx; // TODO: derive from input let base = PlanBase::new_batch( ctx, - logical.schema().clone(), - logical.input().distribution().clone(), - logical.input().order().clone(), + base.schema, + logical.input.distribution().clone(), + logical.input.order().clone(), ); BatchFilter { base, logical } } pub fn predicate(&self) -> &Condition { - self.logical.predicate() + &self.logical.predicate } } @@ -58,11 +57,13 @@ impl fmt::Display for BatchFilter { impl PlanTreeNodeUnary for BatchFilter { fn input(&self) -> PlanRef { - self.logical.input() + self.logical.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { - Self::new(self.logical.clone_with_input(input)) + let mut logical = self.logical.clone(); + logical.input = input; + Self::new(logical) } } @@ -78,9 +79,7 @@ impl ToDistributedBatch for BatchFilter { impl ToBatchPb for BatchFilter { fn to_batch_prost_body(&self) -> NodeBody { NodeBody::Filter(FilterNode { - search_condition: Some( - ExprImpl::from(self.logical.predicate().clone()).to_expr_proto(), - ), + search_condition: Some(ExprImpl::from(self.logical.predicate.clone()).to_expr_proto()), }) } } @@ -98,13 +97,8 @@ impl ExprRewritable for BatchFilter { } fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { - Self::new( - self.logical - .rewrite_exprs(r) - .as_logical_filter() - .unwrap() - .clone(), - ) - .into() + let mut logical = self.logical.clone(); + logical.rewrite_exprs(r); + Self::new(logical).into() } } diff --git a/src/frontend/src/optimizer/plan_node/generic/expand.rs b/src/frontend/src/optimizer/plan_node/generic/expand.rs index 020d9d0277481..d80d2196ef573 100644 --- a/src/frontend/src/optimizer/plan_node/generic/expand.rs +++ b/src/frontend/src/optimizer/plan_node/generic/expand.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fmt; + use itertools::Itertools; use risingwave_common::catalog::{Field, FieldDisplay, Schema}; use risingwave_common::types::DataType; @@ -106,9 +108,18 @@ impl Expand { subset .iter() .map(|&i| FieldDisplay(self.input.schema().fields.get(i).unwrap())) - .collect_vec() + .collect() }) - .collect_vec() + .collect() + } + + pub(crate) fn fmt_with_name(&self, f: &mut fmt::Formatter<'_>, name: &str) -> fmt::Result { + write!( + f, + "{} {{ column_subsets: {:?} }}", + name, + self.column_subsets_display() + ) } pub fn i2o_col_mapping(&self) -> ColIndexMapping { diff --git a/src/frontend/src/optimizer/plan_node/generic/filter.rs b/src/frontend/src/optimizer/plan_node/generic/filter.rs index f7e3629a5435a..7d24a4939eca6 100644 --- a/src/frontend/src/optimizer/plan_node/generic/filter.rs +++ b/src/frontend/src/optimizer/plan_node/generic/filter.rs @@ -12,13 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fmt; + use risingwave_common::catalog::Schema; use super::{GenericPlanNode, GenericPlanRef}; use crate::expr::ExprRewriter; use crate::optimizer::optimizer_context::OptimizerContextRef; use crate::optimizer::property::FunctionalDependencySet; -use crate::utils::Condition; +use crate::utils::{Condition, ConditionDisplay}; /// [`Filter`] iterates over its input and returns elements for which `predicate` evaluates to /// true, filtering out the others. @@ -30,7 +32,24 @@ pub struct Filter { pub input: PlanRef, } -impl Filter {} +impl Filter { + pub(crate) fn fmt_with_name(&self, f: &mut fmt::Formatter<'_>, name: &str) -> fmt::Result { + let input_schema = self.input.schema(); + write!( + f, + "{} {{ predicate: {} }}", + name, + ConditionDisplay { + condition: &self.predicate, + input_schema + } + ) + } + + pub fn new(predicate: Condition, input: PlanRef) -> Self { + Filter { predicate, input } + } +} impl GenericPlanNode for Filter { fn schema(&self) -> Schema { self.input.schema().clone() diff --git a/src/frontend/src/optimizer/plan_node/logical_expand.rs b/src/frontend/src/optimizer/plan_node/logical_expand.rs index a387b2d14516b..b984bcd941f82 100644 --- a/src/frontend/src/optimizer/plan_node/logical_expand.rs +++ b/src/frontend/src/optimizer/plan_node/logical_expand.rs @@ -15,7 +15,6 @@ use std::fmt; use itertools::Itertools; -use risingwave_common::catalog::FieldDisplay; use risingwave_common::error::Result; use super::{ @@ -64,17 +63,8 @@ impl LogicalExpand { &self.core.column_subsets } - pub fn column_subsets_display(&self) -> Vec>> { - self.core.column_subsets_display() - } - pub(super) fn fmt_with_name(&self, f: &mut fmt::Formatter<'_>, name: &str) -> fmt::Result { - write!( - f, - "{} {{ column_subsets: {:?} }}", - name, - self.column_subsets_display() - ) + self.core.fmt_with_name(f, name) } } @@ -153,7 +143,8 @@ impl PredicatePushdown for LogicalExpand { impl ToBatch for LogicalExpand { fn to_batch(&self) -> Result { let new_input = self.input().to_batch()?; - let new_logical = self.clone_with_input(new_input); + let mut new_logical = self.core.clone(); + new_logical.input = new_input; Ok(BatchExpand::new(new_logical).into()) } } @@ -170,7 +161,8 @@ impl ToStream for LogicalExpand { fn to_stream(&self, ctx: &mut ToStreamContext) -> Result { let new_input = self.input().to_stream(ctx)?; - let new_logical = self.clone_with_input(new_input); + let mut new_logical = self.core.clone(); + new_logical.input = new_input; Ok(StreamExpand::new(new_logical).into()) } } diff --git a/src/frontend/src/optimizer/plan_node/logical_filter.rs b/src/frontend/src/optimizer/plan_node/logical_filter.rs index 35fa7094fc55d..19b70c4327476 100644 --- a/src/frontend/src/optimizer/plan_node/logical_filter.rs +++ b/src/frontend/src/optimizer/plan_node/logical_filter.rs @@ -29,7 +29,7 @@ use crate::optimizer::plan_node::{ BatchFilter, ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, StreamFilter, ToStreamContext, }; -use crate::utils::{ColIndexMapping, Condition, ConditionDisplay}; +use crate::utils::{ColIndexMapping, Condition}; /// `LogicalFilter` iterates over its input and returns elements for which `predicate` evaluates to /// true, filtering out the others. @@ -94,20 +94,6 @@ impl LogicalFilter { pub fn predicate(&self) -> &Condition { &self.core.predicate } - - pub(super) fn fmt_with_name(&self, f: &mut fmt::Formatter<'_>, name: &str) -> fmt::Result { - let input = self.input(); - let input_schema = input.schema(); - write!( - f, - "{} {{ predicate: {} }}", - name, - ConditionDisplay { - condition: self.predicate(), - input_schema - } - ) - } } impl PlanTreeNodeUnary for LogicalFilter { @@ -134,7 +120,7 @@ impl_plan_tree_node_for_unary! {LogicalFilter} impl fmt::Display for LogicalFilter { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.fmt_with_name(f, "LogicalFilter") + self.core.fmt_with_name(f, "LogicalFilter") } } @@ -210,7 +196,8 @@ impl PredicatePushdown for LogicalFilter { impl ToBatch for LogicalFilter { fn to_batch(&self) -> Result { let new_input = self.input().to_batch()?; - let new_logical = self.clone_with_input(new_input); + let mut new_logical = self.core.clone(); + new_logical.input = new_input; Ok(BatchFilter::new(new_logical).into()) } } @@ -240,7 +227,8 @@ impl ToStream for LogicalFilter { "All `now()` exprs were valid, but the condition must have at least one now expr as a lower bound." ); } - let new_logical = self.clone_with_input(new_input); + let mut new_logical = self.core.clone(); + new_logical.input = new_input; Ok(StreamFilter::new(new_logical).into()) } @@ -256,7 +244,6 @@ impl ToStream for LogicalFilter { #[cfg(test)] mod tests { - use std::collections::HashSet; use risingwave_common::catalog::{Field, Schema}; diff --git a/src/frontend/src/optimizer/plan_node/logical_join.rs b/src/frontend/src/optimizer/plan_node/logical_join.rs index 22439db9a24fa..17f07a6db6ba3 100644 --- a/src/frontend/src/optimizer/plan_node/logical_join.rs +++ b/src/frontend/src/optimizer/plan_node/logical_join.rs @@ -915,7 +915,7 @@ impl LogicalJoin { ); let logical_join = logical_join.clone_with_cond(eq_cond.eq_cond()); let hash_join = StreamHashJoin::new(logical_join.core, eq_cond).into(); - let logical_filter = LogicalFilter::new(hash_join, predicate.non_eq_cond()); + let logical_filter = generic::Filter::new(predicate.non_eq_cond(), hash_join); let plan = StreamFilter::new(logical_filter).into(); if self.output_indices() != &default_indices { let logical_project = LogicalProject::with_mapping( diff --git a/src/frontend/src/optimizer/plan_node/logical_scan.rs b/src/frontend/src/optimizer/plan_node/logical_scan.rs index 79f9d56bd60c0..52798ece2bed5 100644 --- a/src/frontend/src/optimizer/plan_node/logical_scan.rs +++ b/src/frontend/src/optimizer/plan_node/logical_scan.rs @@ -617,7 +617,7 @@ impl LogicalScan { let mut plan: PlanRef = BatchSeqScan::new(scan, scan_ranges).into(); if !predicate.always_true() { - plan = BatchFilter::new(LogicalFilter::new(plan, predicate)).into(); + plan = BatchFilter::new(generic::Filter::new(predicate, plan)).into(); } if let Some(exprs) = project_expr { plan = BatchProject::new(LogicalProject::new(plan, exprs)).into() diff --git a/src/frontend/src/optimizer/plan_node/stream_expand.rs b/src/frontend/src/optimizer/plan_node/stream_expand.rs index bc70d8d3b2b86..383c9fd72409a 100644 --- a/src/frontend/src/optimizer/plan_node/stream_expand.rs +++ b/src/frontend/src/optimizer/plan_node/stream_expand.rs @@ -19,20 +19,21 @@ use risingwave_pb::stream_plan::expand_node::Subset; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::ExpandNode; -use super::{ExprRewritable, LogicalExpand, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; +use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::optimizer::property::Distribution; use crate::stream_fragmenter::BuildFragmentGraphState; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamExpand { pub base: PlanBase, - logical: LogicalExpand, + logical: generic::Expand, } impl StreamExpand { - pub fn new(logical: LogicalExpand) -> Self { - let input = logical.input(); - let schema = logical.schema().clone(); + pub fn new(logical: generic::Expand) -> Self { + let base = PlanBase::new_logical_with_core(&logical); + let input = logical.input.clone(); + let schema = base.schema; let dist = match input.distribution() { Distribution::Single => Distribution::Single, @@ -51,10 +52,10 @@ impl StreamExpand { ); let base = PlanBase::new_stream( - logical.base.ctx.clone(), + base.ctx, schema, - logical.base.logical_pk.to_vec(), - logical.functional_dependency().clone(), + base.logical_pk, + base.functional_dependency, dist, input.append_only(), watermark_columns, @@ -62,8 +63,8 @@ impl StreamExpand { StreamExpand { base, logical } } - pub fn column_subsets(&self) -> &Vec> { - self.logical.column_subsets() + pub fn column_subsets(&self) -> &[Vec] { + &self.logical.column_subsets } } @@ -75,11 +76,13 @@ impl fmt::Display for StreamExpand { impl PlanTreeNodeUnary for StreamExpand { fn input(&self) -> PlanRef { - self.logical.input() + self.logical.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { - Self::new(self.logical.clone_with_input(input)) + let mut logical = self.logical.clone(); + logical.input = input; + Self::new(logical) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_filter.rs b/src/frontend/src/optimizer/plan_node/stream_filter.rs index 23ecfea0d4cbb..d503623f005f0 100644 --- a/src/frontend/src/optimizer/plan_node/stream_filter.rs +++ b/src/frontend/src/optimizer/plan_node/stream_filter.rs @@ -17,7 +17,7 @@ use std::fmt; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::FilterNode; -use super::{ExprRewritable, LogicalFilter, PlanRef, PlanTreeNodeUnary, StreamNode}; +use super::{generic, ExprRewritable, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::expr::{Expr, ExprImpl, ExprRewriter}; use crate::optimizer::plan_node::PlanBase; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -27,30 +27,31 @@ use crate::utils::Condition; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamFilter { pub base: PlanBase, - logical: LogicalFilter, + logical: generic::Filter, } impl StreamFilter { - pub fn new(logical: LogicalFilter) -> Self { - let ctx = logical.base.ctx.clone(); - let input = logical.input(); - let pk_indices = logical.base.logical_pk.to_vec(); + pub fn new(logical: generic::Filter) -> Self { + let base = PlanBase::new_logical_with_core(&logical); + let ctx = base.ctx; + let input = logical.input.clone(); + let pk_indices = base.logical_pk; let dist = input.distribution().clone(); // Filter executor won't change the append-only behavior of the stream. let base = PlanBase::new_stream( ctx, - logical.schema().clone(), + base.schema, pk_indices, - logical.functional_dependency().clone(), + base.functional_dependency, dist, - logical.input().append_only(), - logical.input().watermark_columns().clone(), + input.append_only(), + input.watermark_columns().clone(), ); StreamFilter { base, logical } } pub fn predicate(&self) -> &Condition { - self.logical.predicate() + &self.logical.predicate } } @@ -62,11 +63,13 @@ impl fmt::Display for StreamFilter { impl PlanTreeNodeUnary for StreamFilter { fn input(&self) -> PlanRef { - self.logical.input() + self.logical.input.clone() } fn clone_with_input(&self, input: PlanRef) -> Self { - Self::new(self.logical.clone_with_input(input)) + let mut logical = self.logical.clone(); + logical.input = input; + Self::new(logical) } } @@ -86,13 +89,8 @@ impl ExprRewritable for StreamFilter { } fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef { - Self::new( - self.logical - .rewrite_exprs(r) - .as_logical_filter() - .unwrap() - .clone(), - ) - .into() + let mut logical = self.logical.clone(); + logical.rewrite_exprs(r); + Self::new(logical).into() } } diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs index d9932066105f2..83bf6392ce81c 100644 --- a/src/frontend/src/scheduler/distributed/query.rs +++ b/src/frontend/src/scheduler/distributed/query.rs @@ -454,8 +454,7 @@ pub(crate) mod tests { use crate::catalog::root_catalog::Catalog; use crate::expr::InputRef; use crate::optimizer::plan_node::{ - generic, BatchExchange, BatchFilter, BatchHashJoin, EqJoinPredicate, LogicalFilter, - LogicalScan, ToBatch, + generic, BatchExchange, BatchFilter, BatchHashJoin, EqJoinPredicate, LogicalScan, ToBatch, }; use crate::optimizer::property::{Distribution, Order}; use crate::optimizer::{OptimizerContext, PlanRef}; @@ -539,11 +538,11 @@ pub(crate) mod tests { .unwrap() .to_distributed() .unwrap(); - let batch_filter = BatchFilter::new(LogicalFilter::new( - batch_plan_node.clone(), + let batch_filter = BatchFilter::new(generic::Filter::new( Condition { conjunctions: vec![], }, + batch_plan_node.clone(), )) .into(); let batch_exchange_node1: PlanRef = BatchExchange::new(