Skip to content

Commit

Permalink
Deprecate OptimizerRule::try_optimize (apache#11022)
Browse files Browse the repository at this point in the history
* Deprecate OptimizerRule::try_optimize

* optimize_children

* Apply review suggestions

* Fix clippy lint
  • Loading branch information
lewiszlw authored and xinlifoobar committed Jun 22, 2024
1 parent aee6e4d commit 0658959
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 42 deletions.
48 changes: 26 additions & 22 deletions datafusion-examples/examples/rewrite_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{plan_err, Result, ScalarValue};
use datafusion_common::{plan_err, DataFusionError, Result, ScalarValue};
use datafusion_expr::{
AggregateUDF, Between, Expr, Filter, LogicalPlan, ScalarUDF, TableSource, WindowUDF,
};
use datafusion_optimizer::analyzer::{Analyzer, AnalyzerRule};
use datafusion_optimizer::optimizer::Optimizer;
use datafusion_optimizer::{utils, OptimizerConfig, OptimizerContext, OptimizerRule};
use datafusion_optimizer::optimizer::{ApplyOrder, Optimizer};
use datafusion_optimizer::{OptimizerConfig, OptimizerContext, OptimizerRule};
use datafusion_sql::planner::{ContextProvider, SqlToRel};
use datafusion_sql::sqlparser::dialect::PostgreSqlDialect;
use datafusion_sql::sqlparser::parser::Parser;
Expand Down Expand Up @@ -133,30 +133,34 @@ impl OptimizerRule for MyOptimizerRule {

fn try_optimize(
&self,
plan: &LogicalPlan,
config: &dyn OptimizerConfig,
_plan: &LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
// recurse down and optimize children first
let optimized_plan = utils::optimize_children(self, plan, config)?;
match optimized_plan {
Some(LogicalPlan::Filter(filter)) => {
unreachable!()
}

/// Asks the optimizer to drive the traversal of the plan for this rule,
/// using `ApplyOrder::BottomUp`.
///
/// NOTE(review): presumably bottom-up means inputs are rewritten before their
/// parent, matching the former "optimize children first" behaviour — confirm
/// against the `ApplyOrder` documentation.
fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::BottomUp)
}

/// Reports that this rule implements `rewrite`, so callers that check this
/// flag use `rewrite` rather than the deprecated `try_optimize`.
fn supports_rewrite(&self) -> bool {
true
}

fn rewrite(
&self,
plan: LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>, DataFusionError> {
match plan {
LogicalPlan::Filter(filter) => {
let predicate = my_rewrite(filter.predicate.clone())?;
Ok(Some(LogicalPlan::Filter(Filter::try_new(
Ok(Transformed::yes(LogicalPlan::Filter(Filter::try_new(
predicate,
filter.input,
filter.input.clone(),
)?)))
}
Some(optimized_plan) => Ok(Some(optimized_plan)),
None => match plan {
LogicalPlan::Filter(filter) => {
let predicate = my_rewrite(filter.predicate.clone())?;
Ok(Some(LogicalPlan::Filter(Filter::try_new(
predicate,
filter.input.clone(),
)?)))
}
_ => Ok(None),
},
_ => Ok(Transformed::no(plan)),
}
}
}
Expand Down
44 changes: 29 additions & 15 deletions datafusion/core/tests/user_defined/user_defined_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ use datafusion::{
Expr, Extension, Limit, LogicalPlan, Sort, UserDefinedLogicalNode,
UserDefinedLogicalNodeCore,
},
optimizer::{optimize_children, OptimizerConfig, OptimizerRule},
optimizer::{OptimizerConfig, OptimizerRule},
physical_expr::EquivalenceProperties,
physical_plan::{
DisplayAs, DisplayFormatType, Distribution, ExecutionMode, ExecutionPlan,
Expand All @@ -92,6 +92,8 @@ use datafusion::{
};

use async_trait::async_trait;
use datafusion_common::tree_node::Transformed;
use datafusion_optimizer::optimizer::ApplyOrder;
use futures::{Stream, StreamExt};

/// Execute the specified sql and return the resulting record batches
Expand Down Expand Up @@ -282,17 +284,37 @@ impl OptimizerRule for TopKOptimizerRule {
// Example rewrite pass to insert a user defined LogicalPlanNode
fn try_optimize(
&self,
plan: &LogicalPlan,
config: &dyn OptimizerConfig,
_plan: &LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
unreachable!()
}

/// Returns the name of this rule: `"topk"`.
fn name(&self) -> &str {
"topk"
}

/// Asks the optimizer to drive the traversal of the plan for this rule,
/// using `ApplyOrder::TopDown`.
///
/// NOTE(review): presumably top-down means a node is offered to `rewrite`
/// before its inputs, so the outer Limit is seen before the Sort beneath it —
/// confirm against the `ApplyOrder` documentation.
fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::TopDown)
}

/// Reports that this rule implements `rewrite`, so callers that check this
/// flag use `rewrite` rather than the deprecated `try_optimize`.
fn supports_rewrite(&self) -> bool {
true
}

fn rewrite(
&self,
plan: LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>, DataFusionError> {
// Note: this code simply looks for the pattern of a Limit followed by a
// Sort and replaces it with a TopK node. It does not handle many
// edge cases (e.g., multiple sort columns, sort ASC / DESC).
if let LogicalPlan::Limit(Limit {
fetch: Some(fetch),
input,
..
}) = plan
}) = &plan
{
if let LogicalPlan::Sort(Sort {
ref expr,
Expand All @@ -302,26 +324,18 @@ impl OptimizerRule for TopKOptimizerRule {
{
if expr.len() == 1 {
// we found a sort with a single sort expr, replace it with a TopK
return Ok(Some(LogicalPlan::Extension(Extension {
return Ok(Transformed::yes(LogicalPlan::Extension(Extension {
node: Arc::new(TopKPlanNode {
k: *fetch,
input: self
.try_optimize(input.as_ref(), config)?
.unwrap_or_else(|| input.as_ref().clone()),
input: input.as_ref().clone(),
expr: expr[0].clone(),
}),
})));
}
}
}

// If we didn't find the Limit/Sort combination, recurse as
// normal and build the result.
optimize_children(self, plan, config)
}

fn name(&self) -> &str {
"topk"
Ok(Transformed::no(plan))
}
}

Expand Down
1 change: 1 addition & 0 deletions datafusion/optimizer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub mod test;

pub use analyzer::{Analyzer, AnalyzerRule};
pub use optimizer::{Optimizer, OptimizerConfig, OptimizerContext, OptimizerRule};
#[allow(deprecated)]
pub use utils::optimize_children;

pub(crate) mod join_key_set;
Expand Down
21 changes: 19 additions & 2 deletions datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ pub trait OptimizerRule {
/// Note this API will be deprecated in the future as it requires `clone`ing
/// the input plan, which can be expensive. OptimizerRules should implement
/// [`Self::rewrite`] instead.
#[deprecated(
since = "40.0.0",
note = "please implement supports_rewrite and rewrite instead"
)]
fn try_optimize(
&self,
plan: &LogicalPlan,
Expand Down Expand Up @@ -332,6 +336,7 @@ fn optimize_plan_node(
return rule.rewrite(plan, config);
}

#[allow(deprecated)]
rule.try_optimize(&plan, config).map(|maybe_plan| {
match maybe_plan {
Some(new_plan) => {
Expand Down Expand Up @@ -483,7 +488,7 @@ mod tests {
use std::sync::{Arc, Mutex};

use datafusion_common::tree_node::Transformed;
use datafusion_common::{plan_err, DFSchema, DFSchemaRef, Result};
use datafusion_common::{plan_err, DFSchema, DFSchemaRef, DataFusionError, Result};
use datafusion_expr::logical_plan::EmptyRelation;
use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder, Projection};

Expand Down Expand Up @@ -667,12 +672,24 @@ mod tests {
_: &LogicalPlan,
_: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
plan_err!("rule failed")
unreachable!()
}

/// Returns the name of this test rule: `"bad rule"`.
fn name(&self) -> &str {
"bad rule"
}

/// Reports that this rule implements `rewrite`, so callers that check this
/// flag use `rewrite` rather than the deprecated `try_optimize`.
fn supports_rewrite(&self) -> bool {
true
}

/// Always fails: ignores both arguments and returns the error produced by
/// `plan_err!("rule failed")`.
///
/// # Errors
///
/// Unconditionally returns an error carrying the message `"rule failed"`.
fn rewrite(
&self,
_plan: LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>, DataFusionError> {
plan_err!("rule failed")
}
}

/// Replaces whatever plan with a single table scan
Expand Down
17 changes: 14 additions & 3 deletions datafusion/optimizer/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ use log::{debug, trace};
/// This also handles the case when the `plan` is a [`LogicalPlan::Explain`].
///
/// Returning `Ok(None)` indicates that the plan can't be optimized by the `optimizer`.
#[deprecated(
since = "40.0.0",
note = "please use OptimizerRule::apply_order with ApplyOrder::BottomUp instead"
)]
pub fn optimize_children(
optimizer: &impl OptimizerRule,
plan: &LogicalPlan,
Expand All @@ -43,9 +47,16 @@ pub fn optimize_children(
let mut new_inputs = Vec::with_capacity(plan.inputs().len());
let mut plan_is_changed = false;
for input in plan.inputs() {
let new_input = optimizer.try_optimize(input, config)?;
plan_is_changed = plan_is_changed || new_input.is_some();
new_inputs.push(new_input.unwrap_or_else(|| input.clone()))
if optimizer.supports_rewrite() {
let new_input = optimizer.rewrite(input.clone(), config)?;
plan_is_changed = plan_is_changed || new_input.transformed;
new_inputs.push(new_input.data);
} else {
#[allow(deprecated)]
let new_input = optimizer.try_optimize(input, config)?;
plan_is_changed = plan_is_changed || new_input.is_some();
new_inputs.push(new_input.unwrap_or_else(|| input.clone()))
}
}
if plan_is_changed {
let exprs = plan.expressions();
Expand Down

0 comments on commit 0658959

Please sign in to comment.