Skip to content

Commit

Permalink
Add LogicalPlanStats
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-toth committed Nov 20, 2024
1 parent de66be3 commit 82cbdee
Show file tree
Hide file tree
Showing 132 changed files with 5,718 additions and 3,766 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ datafusion-sql = { path = "datafusion/sql", version = "43.0.0" }
datafusion-sqllogictest = { path = "datafusion/sqllogictest", version = "43.0.0" }
datafusion-substrait = { path = "datafusion/substrait", version = "43.0.0" }
doc-comment = "0.3"
enumset = "1.1.5"
env_logger = "0.11"
futures = "0.3"
half = { version = "2.2.1", default-features = false }
Expand Down
62 changes: 62 additions & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 7 additions & 7 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,9 +274,9 @@ impl AdjustedPrintOptions {
// all rows
if matches!(
plan,
LogicalPlan::Explain(_)
LogicalPlan::Explain(_, _)
| LogicalPlan::DescribeTable(_)
| LogicalPlan::Analyze(_)
| LogicalPlan::Analyze(_, _)
) {
self.inner.maxrows = MaxRows::Unlimited;
}
Expand Down Expand Up @@ -311,7 +311,7 @@ async fn create_plan(
// Note that cmd is a mutable reference so that create_external_table function can remove all
// datafusion-cli specific options before passing through to datafusion. Otherwise, datafusion
// will raise Configuration errors.
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd), _) = &plan {
// To support custom formats, treat error as None
let format = config_file_type_from_str(&cmd.file_type);
register_object_store_and_config_extensions(
Expand All @@ -323,7 +323,7 @@ async fn create_plan(
.await?;
}

if let LogicalPlan::Copy(copy_to) = &mut plan {
if let LogicalPlan::Copy(copy_to, _) = &mut plan {
let format = config_file_type_from_str(&copy_to.file_type.get_ext());

register_object_store_and_config_extensions(
Expand Down Expand Up @@ -412,7 +412,7 @@ mod tests {
let ctx = SessionContext::new();
let plan = ctx.state().create_logical_plan(sql).await?;

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd), _) = &plan {
let format = config_file_type_from_str(&cmd.file_type);
register_object_store_and_config_extensions(
&ctx,
Expand All @@ -438,7 +438,7 @@ mod tests {

let plan = ctx.state().create_logical_plan(sql).await?;

if let LogicalPlan::Copy(cmd) = &plan {
if let LogicalPlan::Copy(cmd, _) = &plan {
let format = config_file_type_from_str(&cmd.file_type.get_ext());
register_object_store_and_config_extensions(
&ctx,
Expand Down Expand Up @@ -492,7 +492,7 @@ mod tests {
for statement in statements {
//Should not fail
let mut plan = create_plan(&ctx, statement).await?;
if let LogicalPlan::Copy(copy_to) = &mut plan {
if let LogicalPlan::Copy(copy_to, _) = &mut plan {
assert_eq!(copy_to.output_url, location);
assert_eq!(copy_to.file_type.get_ext(), "parquet".to_string());
ctx.runtime_env()
Expand Down
10 changes: 5 additions & 5 deletions datafusion-cli/src/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ mod tests {
let ctx = SessionContext::new();
let mut plan = ctx.state().create_logical_plan(&sql).await?;

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd), _) = &mut plan {
ctx.register_table_options_extension_from_scheme(scheme);
let mut table_options = ctx.state().default_table_options();
table_options.alter_with_string_hash_map(&cmd.options)?;
Expand Down Expand Up @@ -538,7 +538,7 @@ mod tests {
let ctx = SessionContext::new();
let mut plan = ctx.state().create_logical_plan(&sql).await?;

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd), _) = &mut plan {
ctx.register_table_options_extension_from_scheme(scheme);
let mut table_options = ctx.state().default_table_options();
table_options.alter_with_string_hash_map(&cmd.options)?;
Expand All @@ -564,7 +564,7 @@ mod tests {

let mut plan = ctx.state().create_logical_plan(&sql).await?;

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd), _) = &mut plan {
ctx.register_table_options_extension_from_scheme(scheme);
let mut table_options = ctx.state().default_table_options();
table_options.alter_with_string_hash_map(&cmd.options)?;
Expand Down Expand Up @@ -592,7 +592,7 @@ mod tests {
let ctx = SessionContext::new();
let mut plan = ctx.state().create_logical_plan(&sql).await?;

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd), _) = &mut plan {
ctx.register_table_options_extension_from_scheme(scheme);
let mut table_options = ctx.state().default_table_options();
table_options.alter_with_string_hash_map(&cmd.options)?;
Expand Down Expand Up @@ -629,7 +629,7 @@ mod tests {
let ctx = SessionContext::new();
let mut plan = ctx.state().create_logical_plan(&sql).await?;

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd), _) = &mut plan {
ctx.register_table_options_extension_from_scheme(scheme);
let mut table_options = ctx.state().default_table_options();
table_options.alter_with_string_hash_map(&cmd.options)?;
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/analyzer_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ impl AnalyzerRule for RowLevelAccessControl {
}

fn is_employee_table_scan(plan: &LogicalPlan) -> bool {
if let LogicalPlan::TableScan(scan) = plan {
if let LogicalPlan::TableScan(scan, _) = plan {
scan.table_name.table() == "employee"
} else {
false
Expand Down
8 changes: 4 additions & 4 deletions datafusion-examples/examples/expr_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async fn main() -> Result<()> {
let expr = col("a") + lit(5);

// The same expression can be created directly, with much more code:
let expr2 = Expr::BinaryExpr(BinaryExpr::new(
let expr2 = Expr::binary_expr(BinaryExpr::new(
Box::new(col("a")),
Operator::Plus,
Box::new(Expr::Literal(ScalarValue::Int32(Some(5)))),
Expand Down Expand Up @@ -396,20 +396,20 @@ fn type_coercion_demo() -> Result<()> {
let coerced_expr = expr
.transform(|e| {
// Only type coerces binary expressions.
let Expr::BinaryExpr(e) = e else {
let Expr::BinaryExpr(e, _) = e else {
return Ok(Transformed::no(e));
};
if let Expr::Column(ref col_expr) = *e.left {
let field = df_schema.field_with_name(None, col_expr.name())?;
let cast_to_type = field.data_type();
let coerced_right = e.right.cast_to(cast_to_type, &df_schema)?;
Ok(Transformed::yes(Expr::BinaryExpr(BinaryExpr::new(
Ok(Transformed::yes(Expr::binary_expr(BinaryExpr::new(
e.left,
e.op,
Box::new(coerced_right),
))))
} else {
Ok(Transformed::no(Expr::BinaryExpr(e)))
Ok(Transformed::no(Expr::binary_expr(e)))
}
})?
.data;
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/optimizer_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ impl MyOptimizerRule {
expr.transform_up(|expr| {
// Closure called for each sub tree
match expr {
Expr::BinaryExpr(binary_expr) if is_binary_eq(&binary_expr) => {
Expr::BinaryExpr(binary_expr, _) if is_binary_eq(&binary_expr) => {
// destructure the expression
let BinaryExpr { left, op: _, right } = binary_expr;
// rewrite to `my_eq(left, right)`
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/simplify_udaf_expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl AggregateUDFImpl for BetterAvgUdaf {
// as an example for this functionality we replace UDF function
// with built-in aggregate function to illustrate the use
let simplify = |aggregate_function: AggregateFunction, _: &dyn SimplifyInfo| {
Ok(Expr::AggregateFunction(AggregateFunction::new_udf(
Ok(Expr::aggregate_function(AggregateFunction::new_udf(
avg_udaf(),
// yes it is the same Avg, `BetterAvgUdaf` was just a
// marketing pitch :)
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/simplify_udwf_expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl WindowUDFImpl for SimplifySmoothItUdf {
/// this function will simplify `SimplifySmoothItUdf` to `SmoothItUdf`.
fn simplify(&self) -> Option<WindowFunctionSimplification> {
let simplify = |window_function: WindowFunction, _: &dyn SimplifyInfo| {
Ok(Expr::WindowFunction(WindowFunction {
Ok(Expr::window_function(WindowFunction {
fun: datafusion_expr::WindowFunctionDefinition::AggregateUDF(avg_udaf()),
args: window_function.args,
partition_by: window_function.partition_by,
Expand Down
8 changes: 4 additions & 4 deletions datafusion-examples/examples/sql_analysis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ fn total_join_count(plan: &LogicalPlan) -> usize {
// We can use the TreeNode API to walk over a LogicalPlan.
plan.apply(|node| {
// if we encounter a join we update the running count
if matches!(node, LogicalPlan::Join(_)) {
if matches!(node, LogicalPlan::Join(_, _)) {
total += 1;
}
Ok(TreeNodeRecursion::Continue)
Expand Down Expand Up @@ -89,7 +89,7 @@ fn count_trees(plan: &LogicalPlan) -> (usize, Vec<usize>) {
while let Some(node) = to_visit.pop() {
// if we encounter a join, we know we're at the root of the tree
// count this tree and recurse on its inputs
if matches!(node, LogicalPlan::Join(_)) {
if matches!(node, LogicalPlan::Join(_, _)) {
let (group_count, inputs) = count_tree(node);
total += group_count;
groups.push(group_count);
Expand Down Expand Up @@ -146,12 +146,12 @@ fn count_tree(join: &LogicalPlan) -> (usize, Vec<&LogicalPlan>) {
// / \
// B C
// we can continue the recursion in this case
if let LogicalPlan::Projection(_) = node {
if let LogicalPlan::Projection(_, _) = node {
return Ok(TreeNodeRecursion::Continue);
}

// any join we count
if matches!(node, LogicalPlan::Join(_)) {
if matches!(node, LogicalPlan::Join(_, _)) {
total += 1;
Ok(TreeNodeRecursion::Continue)
} else {
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ impl DataFrame {
group_expr: Vec<Expr>,
aggr_expr: Vec<Expr>,
) -> Result<DataFrame> {
let is_grouping_set = matches!(group_expr.as_slice(), [Expr::GroupingSet(_)]);
let is_grouping_set = matches!(group_expr.as_slice(), [Expr::GroupingSet(_, _)]);
let aggr_expr_len = aggr_expr.len();
let plan = LogicalPlanBuilder::from(self.plan)
.aggregate(group_expr, aggr_expr)?
Expand Down Expand Up @@ -1404,7 +1404,7 @@ impl DataFrame {
/// # }
/// ```
pub fn explain(self, verbose: bool, analyze: bool) -> Result<DataFrame> {
if matches!(self.plan, LogicalPlan::Explain(_)) {
if matches!(self.plan, LogicalPlan::Explain(_, _)) {
return plan_err!("Nested EXPLAINs are not supported");
}
let plan = LogicalPlanBuilder::from(self.plan)
Expand Down Expand Up @@ -2176,7 +2176,7 @@ mod tests {
async fn select_with_window_exprs() -> Result<()> {
// build plan using Table API
let t = test_table().await?;
let first_row = Expr::WindowFunction(WindowFunction::new(
let first_row = Expr::window_function(WindowFunction::new(
WindowFunctionDefinition::WindowUDF(first_value_udwf()),
vec![col("aggregate_test_100.c1")],
))
Expand Down Expand Up @@ -2742,7 +2742,7 @@ mod tests {
vec![col("c3")],
);

Expr::WindowFunction(w)
Expr::window_function(w)
.null_treatment(NullTreatment::IgnoreNulls)
.order_by(vec![col("c2").sort(true, true), col("c3").sort(true, true)])
.window_frame(WindowFrame::new_bounds(
Expand Down
Loading

0 comments on commit 82cbdee

Please sign in to comment.