Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move Pruning into physical-optimizer crate #13485

Merged
merged 4 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 33 additions & 31 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion datafusion/core/src/physical_optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ pub mod enforce_sorting;
pub mod join_selection;
pub mod optimizer;
pub mod projection_pushdown;
pub mod pruning;
pub mod replace_with_order_preserving_variants;
pub mod sanity_checker;
#[cfg(test)]
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-optimizer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,15 @@ arrow-schema = { workspace = true }
datafusion-common = { workspace = true, default-features = true }
datafusion-execution = { workspace = true }
datafusion-expr-common = { workspace = true, default-features = true }
datafusion-optimizer = { workspace = true }
datafusion-physical-expr = { workspace = true }
datafusion-physical-plan = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
recursive = { workspace = true }

[dev-dependencies]
datafusion-expr = { workspace = true }
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needed in test.

datafusion-functions-aggregate = { workspace = true }
datafusion-functions-nested = { workspace = true }
tokio = { workspace = true }
1 change: 1 addition & 0 deletions datafusion/physical-optimizer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub mod limit_pushdown;
pub mod limited_distinct_aggregation;
mod optimizer;
pub mod output_requirements;
pub mod pruning;
pub mod topk_aggregation;
pub mod update_aggr_exprs;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,30 @@
//! [`PruningPredicate`] to apply filter [`Expr`] to prune "containers"
//! based on statistics (e.g. Parquet Row Groups)
//!
//! [`Expr`]: crate::prelude::Expr
//! [`Expr`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.Expr.html
use std::collections::HashSet;
Copy link
Contributor Author

@irenjj irenjj Nov 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

datafusion-expr dependency is needed by //! [Expr]: crate::prelude::Expr, should we remove the comment to avoid introducing new dependencies? @alamb

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In other places, we have changed the link to docs.rs directly. So in this case, change the link to

https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.Expr.html

use std::sync::Arc;

use crate::{
common::{Column, DFSchema},
error::{DataFusionError, Result},
logical_expr::Operator,
physical_plan::{ColumnarValue, PhysicalExpr},
};

use arrow::array::AsArray;
use arrow::{
array::{new_null_array, ArrayRef, BooleanArray},
datatypes::{DataType, Field, Schema, SchemaRef},
record_batch::{RecordBatch, RecordBatchOptions},
};
use arrow_array::cast::AsArray;
use log::trace;

use datafusion_common::error::{DataFusionError, Result};
use datafusion_common::tree_node::TransformedResult;
use datafusion_common::{
internal_err, plan_datafusion_err, plan_err,
tree_node::{Transformed, TreeNode},
ScalarValue,
};
use datafusion_common::{Column, DFSchema};
use datafusion_expr_common::operator::Operator;
use datafusion_physical_expr::utils::{collect_columns, Guarantee, LiteralGuarantee};
use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef};

use log::trace;
use datafusion_physical_plan::{ColumnarValue, PhysicalExpr};

/// A source of runtime statistical information to [`PruningPredicate`]s.
///
Expand Down Expand Up @@ -567,7 +564,7 @@ impl PruningPredicate {
/// expressions like `b = false`, but it does handle the
/// simplified version `b`. See [`ExprSimplifier`] to simplify expressions.
///
/// [`ExprSimplifier`]: crate::optimizer::simplify_expressions::ExprSimplifier
/// [`ExprSimplifier`]: https://docs.rs/datafusion/latest/datafusion/optimizer/simplify_expressions/struct.ExprSimplifier.html
pub fn prune<S: PruningStatistics>(&self, statistics: &S) -> Result<Vec<bool>> {
let mut builder = BoolVecBuilder::new(statistics.num_containers());

Expand Down Expand Up @@ -653,7 +650,7 @@ impl PruningPredicate {

// this is only used by `parquet` feature right now
#[allow(dead_code)]
pub(crate) fn required_columns(&self) -> &RequiredColumns {
pub fn required_columns(&self) -> &RequiredColumns {
&self.required_columns
}

Expand Down Expand Up @@ -762,7 +759,7 @@ fn is_always_true(expr: &Arc<dyn PhysicalExpr>) -> bool {
/// Handles creating references to the min/max statistics
/// for columns as well as recording which statistics are needed
#[derive(Debug, Default, Clone)]
pub(crate) struct RequiredColumns {
pub struct RequiredColumns {
/// The statistics required to evaluate this predicate:
/// * The unqualified column in the input schema
/// * Statistics type (e.g. Min or Max or Null_Count)
Expand All @@ -786,7 +783,7 @@ impl RequiredColumns {
/// * `true` returns None
#[allow(dead_code)]
// this fn is only used by `parquet` feature right now, thus the `allow(dead_code)`
pub(crate) fn single_column(&self) -> Option<&phys_expr::Column> {
pub fn single_column(&self) -> Option<&phys_expr::Column> {
if self.columns.windows(2).all(|w| {
// check if all columns are the same (ignoring statistics and field)
let c1 = &w[0].0;
Expand Down Expand Up @@ -1664,15 +1661,14 @@ mod tests {
use std::ops::{Not, Rem};

use super::*;
use crate::assert_batches_eq;
use crate::logical_expr::{col, lit};
use datafusion_common::assert_batches_eq;
use datafusion_expr::{col, lit};

use arrow::array::Decimal128Array;
use arrow::{
array::{BinaryArray, Int32Array, Int64Array, StringArray},
array::{BinaryArray, Int32Array, Int64Array, StringArray, UInt64Array},
datatypes::TimeUnit,
};
use arrow_array::UInt64Array;
use datafusion_expr::expr::InList;
use datafusion_expr::{cast, is_null, try_cast, Expr};
use datafusion_functions_nested::expr_fn::{array_has, make_array};
Expand Down Expand Up @@ -3536,7 +3532,7 @@ mod tests {
// more complex case with unknown column
let input = known_expression.clone().and(input.clone());
let expected = phys_expr::BinaryExpr::new(
known_expression_transformed.clone(),
Arc::<dyn PhysicalExpr>::clone(&known_expression_transformed),
Operator::And,
logical2physical(&lit(42), &schema),
);
Expand All @@ -3552,7 +3548,7 @@ mod tests {
// more complex case with unknown expression
let input = known_expression.and(input);
let expected = phys_expr::BinaryExpr::new(
known_expression_transformed.clone(),
Arc::<dyn PhysicalExpr>::clone(&known_expression_transformed),
Operator::And,
logical2physical(&lit(42), &schema),
);
Expand Down Expand Up @@ -4038,7 +4034,7 @@ mod tests {
) {
println!("Pruning with expr: {}", expr);
let expr = logical2physical(&expr, schema);
let p = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let p = PruningPredicate::try_new(expr, Arc::<Schema>::clone(schema)).unwrap();
let result = p.prune(statistics).unwrap();
assert_eq!(result, expected);
}
Expand Down