Skip to content

Commit

Permalink
Use LogicalPlanStats
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-toth committed Nov 29, 2024
1 parent 13eca9f commit 7711964
Show file tree
Hide file tree
Showing 37 changed files with 2,127 additions and 1,606 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ datafusion-proto = { path = "datafusion/proto", version = "43.0.0" }
datafusion-proto-common = { path = "datafusion/proto-common", version = "43.0.0" }
datafusion-sql = { path = "datafusion/sql", version = "43.0.0" }
doc-comment = "0.3"
enumset = "1.1.5"
env_logger = "0.11"
futures = "0.3"
half = { version = "2.2.1", default-features = false }
Expand Down
53 changes: 27 additions & 26 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ datafusion-physical-expr-common = { workspace = true }
datafusion-physical-optimizer = { workspace = true }
datafusion-physical-plan = { workspace = true }
datafusion-sql = { workspace = true }
enumset = { workspace = true }
flate2 = { version = "1.0.24", optional = true }
futures = { workspace = true }
glob = "0.3.0"
Expand Down
74 changes: 40 additions & 34 deletions datafusion/core/tests/user_defined/user_defined_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ use arrow::{
util::pretty::pretty_format_batches,
};
use async_trait::async_trait;
use enumset::enum_set;
use futures::{Stream, StreamExt};

use datafusion::execution::session_state::SessionStateBuilder;
Expand Down Expand Up @@ -97,8 +98,8 @@ use datafusion::{
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::ScalarValue;
use datafusion_expr::logical_plan::tree_node::LogicalPlanPattern;
use datafusion_expr::{FetchType, Projection, SortExpr};
use datafusion_optimizer::optimizer::ApplyOrder;
use datafusion_optimizer::AnalyzerRule;

/// Execute the specified sql and return the resulting record batches
Expand Down Expand Up @@ -343,10 +344,6 @@ impl OptimizerRule for TopKOptimizerRule {
"topk"
}

fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::TopDown)
}

fn supports_rewrite(&self) -> bool {
true
}
Expand All @@ -357,38 +354,47 @@ impl OptimizerRule for TopKOptimizerRule {
plan: LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>, DataFusionError> {
// Note: this code simply looks for the pattern of a Limit followed by a
// Sort and replaces it with a TopK node. It does not handle many
// edge cases (e.g. multiple sort columns, sort ASC / DESC, etc.).
let LogicalPlan::Limit(ref limit, _) = plan else {
return Ok(Transformed::no(plan));
};
let FetchType::Literal(Some(fetch)) = limit.get_fetch_type()? else {
return Ok(Transformed::no(plan));
};
plan.transform_down_with_subqueries(|plan| {
if !plan.stats().contains_all_patterns(enum_set!(
LogicalPlanPattern::LogicalPlanLimit
| LogicalPlanPattern::LogicalPlanSort
)) {
return Ok(Transformed::jump(plan));
}

if let LogicalPlan::Sort(
Sort {
ref expr,
ref input,
..
},
_,
) = limit.input.as_ref()
{
if expr.len() == 1 {
// we found a sort with a single sort expr, replace it with a TopK
return Ok(Transformed::yes(LogicalPlan::extension(Extension {
node: Arc::new(TopKPlanNode {
k: fetch,
input: input.as_ref().clone(),
expr: expr[0].clone(),
}),
})));
// Note: this code simply looks for the pattern of a Limit followed by a
// Sort and replaces it with a TopK node. It does not handle many
// edge cases (e.g. multiple sort columns, sort ASC / DESC, etc.).
let LogicalPlan::Limit(ref limit, _) = plan else {
return Ok(Transformed::no(plan));
};
let FetchType::Literal(Some(fetch)) = limit.get_fetch_type()? else {
return Ok(Transformed::no(plan));
};

if let LogicalPlan::Sort(
Sort {
ref expr,
ref input,
..
},
_,
) = limit.input.as_ref()
{
if expr.len() == 1 {
// we found a sort with a single sort expr, replace it with a TopK
return Ok(Transformed::yes(LogicalPlan::extension(Extension {
node: Arc::new(TopKPlanNode {
k: fetch,
input: input.as_ref().clone(),
expr: expr[0].clone(),
}),
})));
}
}
}

Ok(Transformed::no(plan))
Ok(Transformed::no(plan))
})
}
}

Expand Down
1 change: 1 addition & 0 deletions datafusion/expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ datafusion-expr-common = { workspace = true }
datafusion-functions-aggregate-common = { workspace = true }
datafusion-functions-window-common = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
enumset = { workspace = true }
indexmap = { workspace = true }
paste = "^1.0"
recursive = { workspace = true }
Expand Down
Loading

0 comments on commit 7711964

Please sign in to comment.