Skip to content

Commit

Permalink
Add LogicalPlanStats
Browse files Browse the repository at this point in the history
# Conflicts:
#	datafusion/sql/src/unparser/utils.rs
#	datafusion/substrait/src/logical_plan/consumer.rs
#	datafusion/substrait/src/logical_plan/producer.rs
  • Loading branch information
peter-toth committed Nov 29, 2024
1 parent d86b582 commit 4d4c7be
Show file tree
Hide file tree
Showing 154 changed files with 6,228 additions and 4,410 deletions.
16 changes: 8 additions & 8 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,9 +274,9 @@ impl AdjustedPrintOptions {
// all rows
if matches!(
plan,
LogicalPlan::Explain(_)
| LogicalPlan::DescribeTable(_)
| LogicalPlan::Analyze(_)
LogicalPlan::Explain(_, _)
| LogicalPlan::DescribeTable(_, _)
| LogicalPlan::Analyze(_, _)
) {
self.inner.maxrows = MaxRows::Unlimited;
}
Expand Down Expand Up @@ -311,7 +311,7 @@ async fn create_plan(
// Note that cmd is a mutable reference so that create_external_table function can remove all
// datafusion-cli specific options before passing through to datafusion. Otherwise, datafusion
// will raise Configuration errors.
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd), _) = &plan {
// To support custom formats, treat error as None
let format = config_file_type_from_str(&cmd.file_type);
register_object_store_and_config_extensions(
Expand All @@ -323,7 +323,7 @@ async fn create_plan(
.await?;
}

if let LogicalPlan::Copy(copy_to) = &mut plan {
if let LogicalPlan::Copy(copy_to, _) = &mut plan {
let format = config_file_type_from_str(&copy_to.file_type.get_ext());

register_object_store_and_config_extensions(
Expand Down Expand Up @@ -412,7 +412,7 @@ mod tests {
let ctx = SessionContext::new();
let plan = ctx.state().create_logical_plan(sql).await?;

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd), _) = &plan {
let format = config_file_type_from_str(&cmd.file_type);
register_object_store_and_config_extensions(
&ctx,
Expand All @@ -438,7 +438,7 @@ mod tests {

let plan = ctx.state().create_logical_plan(sql).await?;

if let LogicalPlan::Copy(cmd) = &plan {
if let LogicalPlan::Copy(cmd, _) = &plan {
let format = config_file_type_from_str(&cmd.file_type.get_ext());
register_object_store_and_config_extensions(
&ctx,
Expand Down Expand Up @@ -492,7 +492,7 @@ mod tests {
for statement in statements {
//Should not fail
let mut plan = create_plan(&ctx, statement).await?;
if let LogicalPlan::Copy(copy_to) = &mut plan {
if let LogicalPlan::Copy(copy_to, _) = &mut plan {
assert_eq!(copy_to.output_url, location);
assert_eq!(copy_to.file_type.get_ext(), "parquet".to_string());
ctx.runtime_env()
Expand Down
4 changes: 2 additions & 2 deletions datafusion-cli/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,8 @@ pub struct ParquetMetadataFunc {}
impl TableFunctionImpl for ParquetMetadataFunc {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
let filename = match exprs.first() {
Some(Expr::Literal(ScalarValue::Utf8(Some(s)))) => s, // single quote: parquet_metadata('x.parquet')
Some(Expr::Column(Column { name, .. })) => name, // double quote: parquet_metadata("x.parquet")
Some(Expr::Literal(ScalarValue::Utf8(Some(s)), _)) => s, // single quote: parquet_metadata('x.parquet')
Some(Expr::Column(Column { name, .. }, _)) => name, // double quote: parquet_metadata("x.parquet")
_ => {
return plan_err!(
"parquet_metadata requires string argument as its input"
Expand Down
10 changes: 5 additions & 5 deletions datafusion-cli/src/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ mod tests {
let ctx = SessionContext::new();
let mut plan = ctx.state().create_logical_plan(&sql).await?;

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd), _) = &mut plan {
ctx.register_table_options_extension_from_scheme(scheme);
let mut table_options = ctx.state().default_table_options();
table_options.alter_with_string_hash_map(&cmd.options)?;
Expand Down Expand Up @@ -538,7 +538,7 @@ mod tests {
let ctx = SessionContext::new();
let mut plan = ctx.state().create_logical_plan(&sql).await?;

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd), _) = &mut plan {
ctx.register_table_options_extension_from_scheme(scheme);
let mut table_options = ctx.state().default_table_options();
table_options.alter_with_string_hash_map(&cmd.options)?;
Expand All @@ -564,7 +564,7 @@ mod tests {

let mut plan = ctx.state().create_logical_plan(&sql).await?;

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd), _) = &mut plan {
ctx.register_table_options_extension_from_scheme(scheme);
let mut table_options = ctx.state().default_table_options();
table_options.alter_with_string_hash_map(&cmd.options)?;
Expand Down Expand Up @@ -592,7 +592,7 @@ mod tests {
let ctx = SessionContext::new();
let mut plan = ctx.state().create_logical_plan(&sql).await?;

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd), _) = &mut plan {
ctx.register_table_options_extension_from_scheme(scheme);
let mut table_options = ctx.state().default_table_options();
table_options.alter_with_string_hash_map(&cmd.options)?;
Expand Down Expand Up @@ -629,7 +629,7 @@ mod tests {
let ctx = SessionContext::new();
let mut plan = ctx.state().create_logical_plan(&sql).await?;

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd), _) = &mut plan {
ctx.register_table_options_extension_from_scheme(scheme);
let mut table_options = ctx.state().default_table_options();
table_options.alter_with_string_hash_map(&cmd.options)?;
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/analyzer_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ impl AnalyzerRule for RowLevelAccessControl {
}

fn is_employee_table_scan(plan: &LogicalPlan) -> bool {
if let LogicalPlan::TableScan(scan) = plan {
if let LogicalPlan::TableScan(scan, _) = plan {
scan.table_name.table() == "employee"
} else {
false
Expand Down
12 changes: 6 additions & 6 deletions datafusion-examples/examples/expr_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ async fn main() -> Result<()> {
let expr = col("a") + lit(5);

// The same expression can be created directly, with much more code:
let expr2 = Expr::BinaryExpr(BinaryExpr::new(
let expr2 = Expr::binary_expr(BinaryExpr::new(
Box::new(col("a")),
Operator::Plus,
Box::new(Expr::Literal(ScalarValue::Int32(Some(5)))),
Box::new(Expr::literal(ScalarValue::Int32(Some(5)))),
));
assert_eq!(expr, expr2);

Expand Down Expand Up @@ -396,20 +396,20 @@ fn type_coercion_demo() -> Result<()> {
let coerced_expr = expr
.transform(|e| {
// Only type coerces binary expressions.
let Expr::BinaryExpr(e) = e else {
let Expr::BinaryExpr(e, _) = e else {
return Ok(Transformed::no(e));
};
if let Expr::Column(ref col_expr) = *e.left {
if let Expr::Column(ref col_expr, _) = *e.left {
let field = df_schema.field_with_name(None, col_expr.name())?;
let cast_to_type = field.data_type();
let coerced_right = e.right.cast_to(cast_to_type, &df_schema)?;
Ok(Transformed::yes(Expr::BinaryExpr(BinaryExpr::new(
Ok(Transformed::yes(Expr::binary_expr(BinaryExpr::new(
e.left,
e.op,
Box::new(coerced_right),
))))
} else {
Ok(Transformed::no(Expr::BinaryExpr(e)))
Ok(Transformed::no(Expr::binary_expr(e)))
}
})?
.data;
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/function_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ impl ScalarFunctionWrapper {
fn replacement(expr: &Expr, args: &[Expr]) -> Result<Expr> {
let result = expr.clone().transform(|e| {
let r = match e {
Expr::Placeholder(placeholder) => {
Expr::Placeholder(placeholder, _) => {
let placeholder_position =
Self::parse_placeholder_identifier(&placeholder.id)?;
if placeholder_position < args.len() {
Expand Down
4 changes: 2 additions & 2 deletions datafusion-examples/examples/optimizer_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ impl MyOptimizerRule {
expr.transform_up(|expr| {
// Closure called for each sub tree
match expr {
Expr::BinaryExpr(binary_expr) if is_binary_eq(&binary_expr) => {
Expr::BinaryExpr(binary_expr, _) if is_binary_eq(&binary_expr) => {
// destructure the expression
let BinaryExpr { left, op: _, right } = binary_expr;
// rewrite to `my_eq(left, right)`
Expand All @@ -171,7 +171,7 @@ fn is_binary_eq(binary_expr: &BinaryExpr) -> bool {

/// Return true if the expression is a literal or column reference
fn is_lit_or_col(expr: &Expr) -> bool {
matches!(expr, Expr::Column(_) | Expr::Literal(_))
matches!(expr, Expr::Column(_, _) | Expr::Literal(_, _))
}

/// A simple user defined filter function
Expand Down
5 changes: 3 additions & 2 deletions datafusion-examples/examples/simple_udtf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ struct LocalCsvTableFunc {}

impl TableFunctionImpl for LocalCsvTableFunc {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
let Some(Expr::Literal(ScalarValue::Utf8(Some(ref path)))) = exprs.first() else {
let Some(Expr::Literal(ScalarValue::Utf8(Some(ref path)), _)) = exprs.first()
else {
return plan_err!("read_csv requires at least one string argument");
};

Expand All @@ -145,7 +146,7 @@ impl TableFunctionImpl for LocalCsvTableFunc {
let info = SimplifyContext::new(&execution_props);
let expr = ExprSimplifier::new(info).simplify(expr.clone())?;

if let Expr::Literal(ScalarValue::Int64(Some(limit))) = expr {
if let Expr::Literal(ScalarValue::Int64(Some(limit)), _) = expr {
Ok(limit as usize)
} else {
plan_err!("Limit must be an integer")
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/simplify_udaf_expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl AggregateUDFImpl for BetterAvgUdaf {
// as an example for this functionality we replace UDF function
// with build-in aggregate function to illustrate the use
let simplify = |aggregate_function: AggregateFunction, _: &dyn SimplifyInfo| {
Ok(Expr::AggregateFunction(AggregateFunction::new_udf(
Ok(Expr::aggregate_function(AggregateFunction::new_udf(
avg_udaf(),
// yes it is the same Avg, `BetterAvgUdaf` was just a
// marketing pitch :)
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/simplify_udwf_expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl WindowUDFImpl for SimplifySmoothItUdf {
/// this function will simplify `SimplifySmoothItUdf` to `SmoothItUdf`.
fn simplify(&self) -> Option<WindowFunctionSimplification> {
let simplify = |window_function: WindowFunction, _: &dyn SimplifyInfo| {
Ok(Expr::WindowFunction(WindowFunction {
Ok(Expr::window_function(WindowFunction {
fun: datafusion_expr::WindowFunctionDefinition::AggregateUDF(avg_udaf()),
args: window_function.args,
partition_by: window_function.partition_by,
Expand Down
8 changes: 4 additions & 4 deletions datafusion-examples/examples/sql_analysis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ fn total_join_count(plan: &LogicalPlan) -> usize {
// We can use the TreeNode API to walk over a LogicalPlan.
plan.apply(|node| {
// if we encounter a join we update the running count
if matches!(node, LogicalPlan::Join(_)) {
if matches!(node, LogicalPlan::Join(_, _)) {
total += 1;
}
Ok(TreeNodeRecursion::Continue)
Expand Down Expand Up @@ -89,7 +89,7 @@ fn count_trees(plan: &LogicalPlan) -> (usize, Vec<usize>) {
while let Some(node) = to_visit.pop() {
// if we encounter a join, we know we're at the root of the tree
// count this tree and recurse on its inputs
if matches!(node, LogicalPlan::Join(_)) {
if matches!(node, LogicalPlan::Join(_, _)) {
let (group_count, inputs) = count_tree(node);
total += group_count;
groups.push(group_count);
Expand Down Expand Up @@ -146,12 +146,12 @@ fn count_tree(join: &LogicalPlan) -> (usize, Vec<&LogicalPlan>) {
// / \
// B C
// we can continue the recursion in this case
if let LogicalPlan::Projection(_) = node {
if let LogicalPlan::Projection(_, _) = node {
return Ok(TreeNodeRecursion::Continue);
}

// any join we count
if matches!(node, LogicalPlan::Join(_)) {
if matches!(node, LogicalPlan::Join(_, _)) {
total += 1;
Ok(TreeNodeRecursion::Continue)
} else {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ pub trait TableProvider: Debug + Sync + Send {
/// let support: Vec<_> = filters.iter().map(|expr| {
/// match expr {
/// // This example only supports a between expr with a single column named "c1".
/// Expr::Between(between_expr) => {
/// Expr::Between(between_expr, _) => {
/// between_expr.expr
/// .try_as_col()
/// .map(|column| {
Expand Down
5 changes: 5 additions & 0 deletions datafusion/common/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,11 @@ impl<T> Transformed<T> {
Self::new(data, false, TreeNodeRecursion::Continue)
}

/// Wraps unchanged `data` and pairs it with the [`TreeNodeRecursion::Jump`]
/// directive.
pub fn jump(data: T) -> Self {
    let recursion = TreeNodeRecursion::Jump;
    Self::new(data, false, recursion)
}

/// Applies an infallible `f` to the data of this [`Transformed`] object,
/// without modifying the `transformed` flag.
pub fn update_data<U, F: FnOnce(T) -> U>(self, f: F) -> Transformed<U> {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/benches/map_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ fn criterion_benchmark(c: &mut Criterion) {
let mut value_buffer = Vec::new();

for i in 0..1000 {
key_buffer.push(Expr::Literal(ScalarValue::Utf8(Some(keys[i].clone()))));
value_buffer.push(Expr::Literal(ScalarValue::Int32(Some(values[i]))));
key_buffer.push(Expr::literal(ScalarValue::Utf8(Some(keys[i].clone()))));
value_buffer.push(Expr::literal(ScalarValue::Int32(Some(values[i]))));
}
c.bench_function("map_1000_1", |b| {
b.iter(|| {
Expand Down
18 changes: 9 additions & 9 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ impl DataFrame {
.collect::<Result<Vec<_>>>()?;
let expr: Vec<Expr> = fields
.into_iter()
.map(|(qualifier, field)| Expr::Column(Column::from((qualifier, field))))
.map(|(qualifier, field)| Expr::column(Column::from((qualifier, field))))
.collect();
self.select(expr)
}
Expand Down Expand Up @@ -369,7 +369,7 @@ impl DataFrame {
.enumerate()
.map(|(idx, _)| self.plan.schema().qualified_field(idx))
.filter(|(qualifier, f)| !fields_to_drop.contains(&(*qualifier, f)))
.map(|(qualifier, field)| Expr::Column(Column::from((qualifier, field))))
.map(|(qualifier, field)| Expr::column(Column::from((qualifier, field))))
.collect();
self.select(expr)
}
Expand Down Expand Up @@ -513,7 +513,7 @@ impl DataFrame {
group_expr: Vec<Expr>,
aggr_expr: Vec<Expr>,
) -> Result<DataFrame> {
let is_grouping_set = matches!(group_expr.as_slice(), [Expr::GroupingSet(_)]);
let is_grouping_set = matches!(group_expr.as_slice(), [Expr::GroupingSet(_, _)]);
let aggr_expr_len = aggr_expr.len();
let plan = LogicalPlanBuilder::from(self.plan)
.aggregate(group_expr, aggr_expr)?
Expand All @@ -527,7 +527,7 @@ impl DataFrame {
.into_iter()
.enumerate()
.filter(|(idx, _)| *idx != grouping_id_pos)
.map(|(_, column)| Expr::Column(column))
.map(|(_, column)| Expr::column(column))
.collect::<Vec<_>>();
LogicalPlanBuilder::from(plan).project(exprs)?.build()?
} else {
Expand Down Expand Up @@ -1164,7 +1164,7 @@ impl DataFrame {
/// ```
pub async fn count(self) -> Result<usize> {
let rows = self
.aggregate(vec![], vec![count(Expr::Literal(COUNT_STAR_EXPANSION))])?
.aggregate(vec![], vec![count(Expr::literal(COUNT_STAR_EXPANSION))])?
.collect()
.await?;
let len = *rows
Expand Down Expand Up @@ -1403,7 +1403,7 @@ impl DataFrame {
/// # }
/// ```
pub fn explain(self, verbose: bool, analyze: bool) -> Result<DataFrame> {
if matches!(self.plan, LogicalPlan::Explain(_)) {
if matches!(self.plan, LogicalPlan::Explain(_, _)) {
return plan_err!("Nested EXPLAINs are not supported");
}
let plan = LogicalPlanBuilder::from(self.plan)
Expand Down Expand Up @@ -2175,7 +2175,7 @@ mod tests {
async fn select_with_window_exprs() -> Result<()> {
// build plan using Table API
let t = test_table().await?;
let first_row = Expr::WindowFunction(WindowFunction::new(
let first_row = Expr::window_function(WindowFunction::new(
WindowFunctionDefinition::WindowUDF(first_value_udwf()),
vec![col("aggregate_test_100.c1")],
))
Expand Down Expand Up @@ -2741,7 +2741,7 @@ mod tests {
vec![col("c3")],
);

Expr::WindowFunction(w)
Expr::window_function(w)
.null_treatment(NullTreatment::IgnoreNulls)
.order_by(vec![col("c2").sort(true, true), col("c3").sort(true, true)])
.window_frame(WindowFrame::new_bounds(
Expand Down Expand Up @@ -3007,7 +3007,7 @@ mod tests {
let join = left.clone().join_on(
right.clone(),
JoinType::Inner,
Some(Expr::Literal(ScalarValue::Null)),
Some(Expr::literal(ScalarValue::Null)),
)?;
let expected_plan = "EmptyRelation";
assert_eq!(expected_plan, format!("{}", join.into_optimized_plan()?));
Expand Down
Loading

0 comments on commit 4d4c7be

Please sign in to comment.