Skip to content

Commit

Permalink
fix more rules
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-toth committed Nov 26, 2024
1 parent d490c36 commit 0233282
Show file tree
Hide file tree
Showing 12 changed files with 452 additions and 310 deletions.
25 changes: 18 additions & 7 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,12 @@ use datafusion_common::{
FunctionalDependencies, ParamValues, Result, ScalarValue, TableReference,
UnnestOptions,
};
use enumset::enum_set;
use indexmap::IndexSet;

// backwards compatibility
use crate::display::PgJsonVisitor;
use crate::logical_plan::tree_node::LogicalPlanStats;
use crate::logical_plan::tree_node::{LogicalPlanPattern, LogicalPlanStats};
pub use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
pub use datafusion_common::{JoinConstraint, JoinType};

Expand Down Expand Up @@ -2277,7 +2278,9 @@ impl LogicalPlan {
}

/// Builds a [`LogicalPlan::Filter`] node from `filter`.
///
/// The node's stats are seeded with the `LogicalPlanFilter` pattern and then
/// merged with the stats reported by `filter` itself.
pub fn filter(filter: Filter) -> Self {
    // Single binding: the earlier `filter.stats()`-only value was shadowed
    // immediately and never used.
    let stats =
        LogicalPlanStats::new(enum_set!(LogicalPlanPattern::LogicalPlanFilter))
            .merge(filter.stats());
    LogicalPlan::Filter(filter, stats)
}

Expand All @@ -2292,17 +2295,21 @@ impl LogicalPlan {
}

/// Builds a [`LogicalPlan::Aggregate`] node from `aggregate`.
///
/// The node's stats are seeded with the `LogicalPlanAggregate` pattern and
/// then merged with the stats reported by `aggregate` itself.
pub fn aggregate(aggregate: Aggregate) -> Self {
    // Single binding: the earlier `aggregate.stats()`-only value was shadowed
    // immediately and never used.
    let stats =
        LogicalPlanStats::new(enum_set!(LogicalPlanPattern::LogicalPlanAggregate))
            .merge(aggregate.stats());
    LogicalPlan::Aggregate(aggregate, stats)
}

/// Builds a [`LogicalPlan::Sort`] node from `sort`.
///
/// The node's stats are seeded with the `LogicalPlanSort` pattern and then
/// merged with the stats reported by `sort` itself.
pub fn sort(sort: Sort) -> Self {
    // Single binding: the earlier `sort.stats()`-only value was shadowed
    // immediately and never used.
    let stats = LogicalPlanStats::new(enum_set!(LogicalPlanPattern::LogicalPlanSort))
        .merge(sort.stats());
    LogicalPlan::Sort(sort, stats)
}

/// Builds a [`LogicalPlan::Join`] node from `join`.
///
/// The node's stats are seeded with the `LogicalPlanJoin` pattern and then
/// merged with the stats reported by `join` itself.
pub fn join(join: Join) -> Self {
    // Single binding: the earlier `join.stats()`-only value was shadowed
    // immediately and never used.
    let stats = LogicalPlanStats::new(enum_set!(LogicalPlanPattern::LogicalPlanJoin))
        .merge(join.stats());
    LogicalPlan::Join(join, stats)
}

Expand All @@ -2312,7 +2319,9 @@ impl LogicalPlan {
}

/// Builds a [`LogicalPlan::Union`] node from the given `Union`.
///
/// The node's stats are seeded with the `LogicalPlanUnion` pattern and then
/// merged with the stats reported by the union itself.
///
/// The parameter keeps its existing name (`projection`) so the signature is
/// unchanged, even though it carries a `Union`.
pub fn union(projection: Union) -> Self {
    // Single binding: the earlier `projection.stats()`-only value was shadowed
    // immediately and never used.
    let stats =
        LogicalPlanStats::new(enum_set!(LogicalPlanPattern::LogicalPlanUnion))
            .merge(projection.stats());
    LogicalPlan::Union(projection, stats)
}

Expand All @@ -2332,7 +2341,9 @@ impl LogicalPlan {
}

/// Builds a [`LogicalPlan::Limit`] node from `limit`.
///
/// The node's stats are seeded with the `LogicalPlanLimit` pattern and then
/// merged with the stats reported by `limit` itself.
pub fn limit(limit: Limit) -> Self {
    // Single binding: the earlier `limit.stats()`-only value was shadowed
    // immediately and never used.
    let stats =
        LogicalPlanStats::new(enum_set!(LogicalPlanPattern::LogicalPlanLimit))
            .merge(limit.stats());
    LogicalPlan::Limit(limit, stats)
}

Expand Down
12 changes: 6 additions & 6 deletions datafusion/expr/src/logical_plan/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,19 +95,19 @@ pub enum LogicalPlanPattern {

// /// [`LogicalPlan`]
// LogicalPlanProjection,
// LogicalPlanFilter,
LogicalPlanFilter,
// LogicalPlanWindow,
// LogicalPlanAggregate,
// LogicalPlanSort,
// LogicalPlanJoin,
LogicalPlanAggregate,
LogicalPlanSort,
LogicalPlanJoin,
// LogicalPlanCrossJoin,
// LogicalPlanRepartition,
// LogicalPlanUnion,
LogicalPlanUnion,
// LogicalPlanTableScan,
// LogicalPlanEmptyRelation,
// LogicalPlanSubquery,
// LogicalPlanSubqueryAlias,
// LogicalPlanLimit,
LogicalPlanLimit,
// LogicalPlanStatement,
// LogicalPlanValues,
// LogicalPlanExplain,
Expand Down
95 changes: 52 additions & 43 deletions datafusion/optimizer/src/eliminate_duplicated_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@ use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::tree_node::Transformed;
use datafusion_common::Result;
use datafusion_expr::logical_plan::tree_node::LogicalPlanPattern;
use datafusion_expr::logical_plan::LogicalPlan;
use datafusion_expr::{Aggregate, Expr, Sort, SortExpr};
use enumset::enum_set;
use indexmap::IndexSet;
use std::cell::Cell;
use std::hash::{Hash, Hasher};

/// Optimization rule that eliminates duplicated expressions.
///
/// Duplicate sort expressions are removed from `Sort` nodes and duplicate
/// group-by expressions are removed from `Aggregate` nodes.
#[derive(Default, Debug)]
pub struct EliminateDuplicatedExpr;
Expand All @@ -49,10 +53,6 @@ impl Hash for SortExprWrapper {
}
}
impl OptimizerRule for EliminateDuplicatedExpr {
/// Reports [`ApplyOrder::TopDown`] as this rule's apply order.
fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::TopDown)
}

/// Always returns `true`.
// NOTE(review): presumably signals that `rewrite` is implemented for this
// rule — confirm against the `OptimizerRule` trait.
fn supports_rewrite(&self) -> bool {
true
}
Expand All @@ -62,51 +62,60 @@ impl OptimizerRule for EliminateDuplicatedExpr {
plan: LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
match plan {
LogicalPlan::Sort(sort, _) => {
let len = sort.expr.len();
let unique_exprs: Vec<_> = sort
.expr
.into_iter()
.map(SortExprWrapper)
.collect::<IndexSet<_>>()
.into_iter()
.map(|wrapper| wrapper.0)
.collect();
plan.transform_down_with_subqueries(|plan| {
if !plan.stats().contains_any_patterns(enum_set!(
LogicalPlanPattern::LogicalPlanSort
| LogicalPlanPattern::LogicalPlanAggregate
)) {
return Ok(Transformed::jump(plan));
}

let transformed = if len != unique_exprs.len() {
Transformed::yes
} else {
Transformed::no
};
match plan {
LogicalPlan::Sort(sort, _) => {
let len = sort.expr.len();
let unique_exprs: Vec<_> = sort
.expr
.into_iter()
.map(SortExprWrapper)
.collect::<IndexSet<_>>()
.into_iter()
.map(|wrapper| wrapper.0)
.collect();

Ok(transformed(LogicalPlan::sort(Sort {
expr: unique_exprs,
input: sort.input,
fetch: sort.fetch,
})))
}
LogicalPlan::Aggregate(agg, _) => {
let len = agg.group_expr.len();
let transformed = if len != unique_exprs.len() {
Transformed::yes
} else {
Transformed::no
};

Ok(transformed(LogicalPlan::sort(Sort {
expr: unique_exprs,
input: sort.input,
fetch: sort.fetch,
})))
}
LogicalPlan::Aggregate(agg, _) => {
let len = agg.group_expr.len();

let unique_exprs: Vec<Expr> = agg
.group_expr
.into_iter()
.collect::<IndexSet<_>>()
.into_iter()
.collect();
let unique_exprs: Vec<Expr> = agg
.group_expr
.into_iter()
.collect::<IndexSet<_>>()
.into_iter()
.collect();

let transformed = if len != unique_exprs.len() {
Transformed::yes
} else {
Transformed::no
};
let transformed = if len != unique_exprs.len() {
Transformed::yes
} else {
Transformed::no
};

Aggregate::try_new(agg.input, unique_exprs, agg.aggr_expr)
.map(|f| transformed(LogicalPlan::aggregate(f)))
Aggregate::try_new(agg.input, unique_exprs, agg.aggr_expr)
.map(|f| transformed(LogicalPlan::aggregate(f)))
}
_ => Ok(Transformed::no(plan)),
}
_ => Ok(Transformed::no(plan)),
}
})
}
fn name(&self) -> &str {
"eliminate_duplicated_expr"
Expand Down
55 changes: 30 additions & 25 deletions datafusion/optimizer/src/eliminate_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@

//! [`EliminateFilter`] replaces `where false` or `where null` with an empty relation.
use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::tree_node::Transformed;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::logical_plan::tree_node::LogicalPlanPattern;
use datafusion_expr::{EmptyRelation, Expr, Filter, LogicalPlan};
use enumset::enum_set;
use std::sync::Arc;

use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};

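/// Optimization rule that eliminates filters whose predicate is a scalar
/// boolean value: a `true` filter is replaced by its input, while a `false`
/// or `null` filter is replaced with an [LogicalPlan::EmptyRelation]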
///
Expand All @@ -45,10 +46,6 @@ impl OptimizerRule for EliminateFilter {
"eliminate_filter"
}

/// Reports [`ApplyOrder::TopDown`] as this rule's apply order.
fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::TopDown)
}

/// Always returns `true`.
// NOTE(review): presumably signals that `rewrite` is implemented for this
// rule — confirm against the `OptimizerRule` trait.
fn supports_rewrite(&self) -> bool {
true
}
Expand All @@ -58,25 +55,33 @@ impl OptimizerRule for EliminateFilter {
plan: LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
match plan {
LogicalPlan::Filter(
Filter {
predicate: Expr::Literal(ScalarValue::Boolean(v), _),
input,
..
},
_,
) => match v {
Some(true) => Ok(Transformed::yes(Arc::unwrap_or_clone(input))),
Some(false) | None => Ok(Transformed::yes(LogicalPlan::empty_relation(
EmptyRelation {
produce_one_row: false,
schema: Arc::clone(input.schema()),
plan.transform_down_with_subqueries(|plan| {
if !plan.stats().contains_any_patterns(enum_set!(
LogicalPlanPattern::LogicalPlanFilter | LogicalPlanPattern::ExprLiteral
)) {
return Ok(Transformed::jump(plan));
}

match plan {
LogicalPlan::Filter(
Filter {
predicate: Expr::Literal(ScalarValue::Boolean(v), _),
input,
..
},
))),
},
_ => Ok(Transformed::no(plan)),
}
_,
) => match v {
Some(true) => Ok(Transformed::yes(Arc::unwrap_or_clone(input))),
Some(false) | None => Ok(Transformed::yes(
LogicalPlan::empty_relation(EmptyRelation {
produce_one_row: false,
schema: Arc::clone(input.schema()),
}),
)),
},
_ => Ok(Transformed::no(plan)),
}
})
}
}

Expand Down
Loading

0 comments on commit 0233282

Please sign in to comment.