Simplifications and comment improvements #2

Merged
682 changes: 309 additions & 373 deletions datafusion/common/src/tree_node.rs

Large diffs are not rendered by default.
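The tree_node.rs diff itself is collapsed, but the call sites in the files below tell the story: they import a new `TransformedResult` helper and replace the two-step `node.rewrite(...)?.data` with a single chained `node.rewrite(...).data()?`. A minimal, self-contained sketch of how such an extension trait can work — toy types for illustration, not the real definitions in tree_node.rs:

```rust
// Toy stand-ins for DataFusion's `Transformed` / `TransformedResult`; the
// real definitions live in datafusion/common/src/tree_node.rs.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

/// A rewritten value plus a flag recording whether anything actually changed.
struct Transformed<T> {
    data: T,
    transformed: bool,
}

impl<T> Transformed<T> {
    fn yes(data: T) -> Self {
        Self { data, transformed: true }
    }
    fn no(data: T) -> Self {
        Self { data, transformed: false }
    }
}

/// Extension trait on `Result<Transformed<T>>`, so fallible rewrites can be
/// unwrapped in one call: `rewrite(..).data()?` instead of `rewrite(..)?.data`.
trait TransformedResult<T> {
    fn data(self) -> Result<T>;
}

impl<T> TransformedResult<T> for Result<Transformed<T>> {
    fn data(self) -> Result<T> {
        self.map(|t| t.data)
    }
}

/// A fallible transformation in the style of the optimizer rules below.
fn double_if_even(n: i64) -> Result<Transformed<i64>> {
    Ok(if n % 2 == 0 {
        Transformed::yes(n * 2)
    } else {
        Transformed::no(n)
    })
}

fn main() -> Result<()> {
    // Old style: unwrap the Result first, then take the payload field.
    assert_eq!(double_if_even(4)?.data, 8);
    // New style: one chained call, as in the diffs below.
    assert_eq!(double_if_even(3).data()?, 3);
    // The `transformed` flag is still there when a caller needs it.
    assert!(double_if_even(4)?.transformed);
    Ok(())
}
```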

24 changes: 11 additions & 13 deletions datafusion/core/src/datasource/listing/helpers.rs
```diff
@@ -19,29 +19,27 @@

 use std::sync::Arc;

-use arrow::compute::{and, cast, prep_null_mask_filter};
+use super::PartitionedFile;
+use crate::datasource::listing::ListingTableUrl;
+use crate::execution::context::SessionState;
+use crate::{error::Result, scalar::ScalarValue};
+
 use arrow::{
-    array::{ArrayRef, StringBuilder},
+    array::{Array, ArrayRef, AsArray, StringBuilder},
+    compute::{and, cast, prep_null_mask_filter},
     datatypes::{DataType, Field, Schema},
     record_batch::RecordBatch,
 };
-use arrow_array::cast::AsArray;
-use arrow_array::Array;
 use arrow_schema::Fields;
-use futures::stream::FuturesUnordered;
-use futures::{stream::BoxStream, StreamExt, TryStreamExt};
-use log::{debug, trace};
-
-use crate::{error::Result, scalar::ScalarValue};
-
-use super::PartitionedFile;
-use crate::datasource::listing::ListingTableUrl;
-use crate::execution::context::SessionState;
 use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
 use datafusion_common::{internal_err, Column, DFField, DFSchema, DataFusionError};
 use datafusion_expr::{Expr, ScalarFunctionDefinition, Volatility};
 use datafusion_physical_expr::create_physical_expr;
 use datafusion_physical_expr::execution_props::ExecutionProps;
+
+use futures::stream::{BoxStream, FuturesUnordered};
+use futures::{StreamExt, TryStreamExt};
+use log::{debug, trace};
 use object_store::path::Path;
 use object_store::{ObjectMeta, ObjectStore};
```
19 changes: 9 additions & 10 deletions datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
```diff
@@ -15,28 +15,28 @@
 // specific language governing permissions and limitations
 // under the License.

+use std::collections::BTreeSet;
+use std::sync::Arc;
+
+use super::ParquetFileMetrics;
+use crate::physical_plan::metrics;
+
 use arrow::array::BooleanArray;
 use arrow::datatypes::{DataType, Schema};
 use arrow::error::{ArrowError, Result as ArrowResult};
 use arrow::record_batch::RecordBatch;
 use datafusion_common::cast::as_boolean_array;
 use datafusion_common::tree_node::{
-    Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter,
+    Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter,
 };
 use datafusion_common::{arrow_err, DataFusionError, Result, ScalarValue};
 use datafusion_physical_expr::expressions::{Column, Literal};
 use datafusion_physical_expr::utils::reassign_predicate_columns;
-use std::collections::BTreeSet;
-
 use datafusion_physical_expr::{split_conjunction, PhysicalExpr};

 use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
 use parquet::arrow::ProjectionMask;
 use parquet::file::metadata::ParquetMetaData;
-use std::sync::Arc;
-
-use crate::physical_plan::metrics;
-
-use super::ParquetFileMetrics;

 /// This module contains utilities for enabling the pushdown of DataFusion filter predicates (which
 /// can be any DataFusion `Expr` that evaluates to a `BooleanArray`) to the parquet decoder level in `arrow-rs`.
@@ -190,8 +190,7 @@ impl<'a> FilterCandidateBuilder<'a> {
         mut self,
         metadata: &ParquetMetaData,
     ) -> Result<Option<FilterCandidate>> {
-        let expr = self.expr.clone();
-        let expr = expr.rewrite(&mut self)?.data;
+        let expr = self.expr.clone().rewrite(&mut self).data()?;

         if self.non_primitive_columns || self.projected_columns {
             Ok(None)
```
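`FilterCandidateBuilder` is a rewriter: `rewrite` threads it through the predicate tree, mutating its bookkeeping fields (`non_primitive_columns`, `projected_columns`) as a side effect before the candidate is accepted or rejected. A rough, self-contained sketch of that pattern under toy types — this is not DataFusion's actual `TreeNodeRewriter` trait:

```rust
// Toy expression tree and rewriter, illustrating the `rewrite(...).data()?`
// chain used above; names are illustrative, not DataFusion's API.
type Result<T> = std::result::Result<T, String>;

#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Literal(i64),
    And(Box<Expr>, Box<Expr>),
}

struct Transformed<T> {
    data: T,
}

trait TransformedResult<T> {
    fn data(self) -> Result<T>;
}

impl<T> TransformedResult<T> for Result<Transformed<T>> {
    fn data(self) -> Result<T> {
        self.map(|t| t.data)
    }
}

/// Rewrites every column the rewriter does not know into a placeholder
/// literal, recording that a column was projected away - loosely what
/// `FilterCandidateBuilder` does for columns missing from the file schema.
struct ColumnRewriter {
    known_columns: Vec<String>,
    projected_columns: bool,
}

impl ColumnRewriter {
    fn rewrite(&mut self, expr: Expr) -> Result<Transformed<Expr>> {
        let data = match expr {
            Expr::Column(name) if !self.known_columns.contains(&name) => {
                self.projected_columns = true;
                Expr::Literal(0) // stand-in for a null literal
            }
            Expr::And(l, r) => Expr::And(
                Box::new(self.rewrite(*l).data()?),
                Box::new(self.rewrite(*r).data()?),
            ),
            other => other,
        };
        Ok(Transformed { data })
    }
}

fn main() -> Result<()> {
    let mut rewriter = ColumnRewriter {
        known_columns: vec!["a".to_string()],
        projected_columns: false,
    };
    let expr = Expr::And(
        Box::new(Expr::Column("a".into())),
        Box::new(Expr::Column("b".into())),
    );
    // Single chain, as in the diff: rewrite, then unwrap the payload.
    let rewritten = rewriter.rewrite(expr).data()?;
    assert!(rewriter.projected_columns);
    assert_eq!(
        rewritten,
        Expr::And(Box::new(Expr::Column("a".into())), Box::new(Expr::Literal(0)))
    );
    Ok(())
}
```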
6 changes: 4 additions & 2 deletions datafusion/core/src/physical_optimizer/coalesce_batches.rs
```diff
@@ -18,17 +18,19 @@

 //! CoalesceBatches optimizer that groups batches together rows
 //! in bigger batches to avoid overhead with small batches

-use crate::config::ConfigOptions;
+use std::sync::Arc;
+
 use crate::{
+    config::ConfigOptions,
     error::Result,
     physical_optimizer::PhysicalOptimizerRule,
     physical_plan::{
         coalesce_batches::CoalesceBatchesExec, filter::FilterExec, joins::HashJoinExec,
         repartition::RepartitionExec, Partitioning,
     },
 };
+
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
-use std::sync::Arc;

 /// Optimizer rule that introduces CoalesceBatchesExec to avoid overhead with small batches that
 /// are produced by highly selective filters
```
23 changes: 13 additions & 10 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
```diff
@@ -45,7 +45,7 @@ use crate::physical_plan::windows::WindowAggExec;
 use crate::physical_plan::{Distribution, ExecutionPlan, Partitioning};

 use arrow::compute::SortOptions;
-use datafusion_common::tree_node::{Transformed, TreeNode};
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_expr::logical_plan::JoinType;
 use datafusion_physical_expr::expressions::{Column, NoOp};
 use datafusion_physical_expr::utils::map_columns_before_projection;
@@ -198,24 +198,24 @@ impl PhysicalOptimizerRule for EnforceDistribution {
             // Run a top-down process to adjust input key ordering recursively
             let plan_requirements = PlanWithKeyRequirements::new_default(plan);
             let adjusted = plan_requirements
-                .transform_down(&adjust_input_keys_ordering)?
-                .data;
+                .transform_down(&adjust_input_keys_ordering)
+                .data()?;
             adjusted.plan
         } else {
             // Run a bottom-up process
             plan.transform_up(&|plan| {
                 Ok(Transformed::yes(reorder_join_keys_to_inputs(plan)?))
-            })?
-            .data
+            })
+            .data()?
         };

         let distribution_context = DistributionContext::new_default(adjusted);
         // Distribution enforcement needs to be applied bottom-up.
         let distribution_context = distribution_context
             .transform_up(&|distribution_context| {
                 ensure_distribution(distribution_context, config)
-            })?
-            .data;
+            })
+            .data()?;
         Ok(distribution_context.plan)
     }
@@ -1788,22 +1788,25 @@ pub(crate) mod tests {
                 let plan_requirements =
                     PlanWithKeyRequirements::new_default($PLAN.clone());
                 let adjusted = plan_requirements
-                    .transform_down(&adjust_input_keys_ordering).data()
+                    .transform_down(&adjust_input_keys_ordering)
+                    .data()
                     .and_then(check_integrity)?;
                 // TODO: End state payloads will be checked here.
                 adjusted.plan
             } else {
                 // Run reorder_join_keys_to_inputs rule
                 $PLAN.clone().transform_up(&|plan| {
                     Ok(Transformed::yes(reorder_join_keys_to_inputs(plan)?))
-                })?.data
+                })
+                .data()?
             };

             // Then run ensure_distribution rule
             DistributionContext::new_default(adjusted)
                 .transform_up(&|distribution_context| {
                     ensure_distribution(distribution_context, &config)
-                }).data()
+                })
+                .data()
                 .and_then(check_integrity)?;
             // TODO: End state payloads will be checked here.
         }
```
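As the code above shows, `EnforceDistribution` mixes both traversal directions: key-ordering requirements are pushed from the root toward the leaves (`transform_down`), while distribution enforcement must see finalized children first (`transform_up`). A toy, self-contained sketch of the two visit orders — illustrative names, not DataFusion's `TreeNode` API:

```rust
/// A miniature plan-like tree; the real rules operate on Arc<dyn ExecutionPlan>.
struct Node {
    name: &'static str,
    children: Vec<Node>,
}

impl Node {
    /// Apply `f` to a node first, then recurse into its (possibly rewritten)
    /// children: parents can impose requirements on children.
    fn transform_down(self, f: &impl Fn(Node) -> Node) -> Node {
        let mut node = f(self);
        node.children = node
            .children
            .into_iter()
            .map(|c| c.transform_down(f))
            .collect();
        node
    }

    /// Rewrite children first, then apply `f`: parents see finalized children.
    fn transform_up(mut self, f: &impl Fn(Node) -> Node) -> Node {
        self.children = self
            .children
            .into_iter()
            .map(|c| c.transform_up(f))
            .collect();
        f(self)
    }
}

fn plan() -> Node {
    Node {
        name: "sort",
        children: vec![Node {
            name: "join",
            children: vec![
                Node { name: "scan_a", children: vec![] },
                Node { name: "scan_b", children: vec![] },
            ],
        }],
    }
}

fn main() {
    // Prints sort, join, scan_a, scan_b: requirements flow root -> leaves.
    plan().transform_down(&|n| {
        println!("down: {}", n.name);
        n
    });
    // Prints scan_a, scan_b, join, sort: each parent sees fixed-up children.
    plan().transform_up(&|n| {
        println!("up: {}", n.name);
        n
    });
}
```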
20 changes: 12 additions & 8 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
```diff
@@ -165,8 +165,8 @@ impl PhysicalOptimizerRule for EnforceSorting {
             let plan_with_coalesce_partitions =
                 PlanWithCorrespondingCoalescePartitions::new_default(adjusted.plan);
             let parallel = plan_with_coalesce_partitions
-                .transform_up(&parallelize_sorts)?
-                .data;
+                .transform_up(&parallelize_sorts)
+                .data()?;
             parallel.plan
         } else {
             adjusted.plan
@@ -181,8 +181,8 @@ impl PhysicalOptimizerRule for EnforceSorting {
                     true,
                     config,
                 )
-            })?
-            .data;
+            })
+            .data()?;

         // Execute a top-down traversal to exploit sort push-down opportunities
         // missed by the bottom-up traversal:
@@ -687,15 +687,17 @@ mod tests {
         {
             let plan_requirements = PlanWithCorrespondingSort::new_default($PLAN.clone());
             let adjusted = plan_requirements
-                .transform_up(&ensure_sorting).data()
+                .transform_up(&ensure_sorting)
+                .data()
                 .and_then(check_integrity)?;
             // TODO: End state payloads will be checked here.

             let new_plan = if state.config_options().optimizer.repartition_sorts {
                 let plan_with_coalesce_partitions =
                     PlanWithCorrespondingCoalescePartitions::new_default(adjusted.plan);
                 let parallel = plan_with_coalesce_partitions
-                    .transform_up(&parallelize_sorts).data()
+                    .transform_up(&parallelize_sorts)
+                    .data()
                     .and_then(check_integrity)?;
                 // TODO: End state payloads will be checked here.
                 parallel.plan
@@ -712,14 +712,16 @@ mod tests {
                     true,
                     state.config_options(),
                 )
-            }).data()
+            })
+            .data()
             .and_then(check_integrity)?;
             // TODO: End state payloads will be checked here.

             let mut sort_pushdown = SortPushDown::new_default(updated_plan.plan);
             assign_initial_requirements(&mut sort_pushdown);
             sort_pushdown
-                .transform_down(&pushdown_sorts).data()
+                .transform_down(&pushdown_sorts)
+                .data()
                 .and_then(check_integrity)?;
             // TODO: End state payloads will be checked here.
         }
```
8 changes: 4 additions & 4 deletions datafusion/core/src/physical_optimizer/join_selection.rs
```diff
@@ -237,8 +237,8 @@ impl PhysicalOptimizerRule for JoinSelection {
             Box::new(hash_join_swap_subrule),
         ];
         let new_plan = plan
-            .transform_up(&|p| apply_subrules(p, &subrules, config))?
-            .data;
+            .transform_up(&|p| apply_subrules(p, &subrules, config))
+            .data()?;
         // Next, we apply another subrule that tries to optimize joins using any
         // statistics their inputs might have.
         // - For a hash join with partition mode [`PartitionMode::Auto`], we will
@@ -813,8 +813,8 @@ mod tests_statistical {
             Box::new(hash_join_swap_subrule),
         ];
         let new_plan = plan
-            .transform_up(&|p| apply_subrules(p, &subrules, &ConfigOptions::new()))?
-            .data;
+            .transform_up(&|p| apply_subrules(p, &subrules, &ConfigOptions::new()))
+            .data()?;
         // TODO: End state payloads will be checked here.
         let config = ConfigOptions::new().optimizer;
         let collect_left_threshold = config.hash_join_single_partition_threshold;
```
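`JoinSelection`'s first pass bundles several independent subrules into one `transform_up` closure, applying each in sequence to every node. A tiny, self-contained sketch of that idiom — toy types; the real subrules take and return plans wrapped in `Transformed`:

```rust
// Toy sketch of the "apply a list of subrules inside one transform" idiom
// used by JoinSelection above; names are illustrative, not DataFusion's API.
type Plan = String;

type Subrule = fn(Plan) -> Plan;

fn apply_subrules(mut plan: Plan, subrules: &[Subrule]) -> Plan {
    // Each subrule either rewrites the node or leaves it unchanged.
    for rule in subrules {
        plan = rule(plan);
    }
    plan
}

fn swap_hash_join(plan: Plan) -> Plan {
    if plan == "hash_join" {
        "hash_join_swapped".to_string()
    } else {
        plan
    }
}

fn main() {
    let subrules: Vec<Subrule> = vec![swap_hash_join];
    assert_eq!(apply_subrules("hash_join".into(), &subrules), "hash_join_swapped");
    assert_eq!(apply_subrules("scan".into(), &subrules), "scan");
}
```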
datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs

```diff
@@ -162,7 +162,7 @@ impl PhysicalOptimizerRule for LimitedDistinctAggregation {
         plan: Arc<dyn ExecutionPlan>,
         config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let plan = if config.optimizer.enable_distinct_aggregation_soft_limit {
+        if config.optimizer.enable_distinct_aggregation_soft_limit {
             plan.transform_down(&|plan| {
                 Ok(
                     if let Some(plan) =
@@ -173,12 +173,11 @@ impl PhysicalOptimizerRule for LimitedDistinctAggregation {
                         Transformed::no(plan)
                     },
                 )
-            })?
-            .data
+            })
+            .data()
         } else {
-            plan
-        };
-        Ok(plan)
+            Ok(plan)
+        }
     }

     fn name(&self) -> &str {
```
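The change above (and the identical one in `TopKAggregation` below) is a small Rust idiom: when every branch of the `if` can produce the final `Result` itself, return it directly instead of unwrapping into a binding and re-wrapping with `Ok` at the end. A self-contained before/after sketch under toy types:

```rust
type Result<T> = std::result::Result<T, String>;

// Stand-in for `plan.transform_down(...).data()` - any fallible rewrite.
fn transform(plan: String) -> Result<String> {
    Ok(plan.to_uppercase())
}

// Before: bind the unwrapped value, then wrap it again.
fn optimize_before(plan: String, enabled: bool) -> Result<String> {
    let plan = if enabled { transform(plan)? } else { plan };
    Ok(plan)
}

// After: each branch already yields a `Result`, so the `if` is the tail expression.
fn optimize_after(plan: String, enabled: bool) -> Result<String> {
    if enabled {
        transform(plan)
    } else {
        Ok(plan)
    }
}

fn main() -> Result<()> {
    assert_eq!(
        optimize_before("scan".into(), true)?,
        optimize_after("scan".into(), true)?
    );
    assert_eq!(optimize_after("scan".into(), false)?, "scan");
    Ok(())
}
```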
6 changes: 3 additions & 3 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
```diff
@@ -112,9 +112,9 @@ pub fn remove_unnecessary_projections(
         let maybe_unified = try_unifying_projections(projection, child_projection)?;
         return if let Some(new_plan) = maybe_unified {
             // To unify 3 or more sequential projections:
-            Ok(Transformed::yes(
-                remove_unnecessary_projections(new_plan)?.data,
-            ))
+            remove_unnecessary_projections(new_plan)
+                .data()
+                .map(Transformed::yes)
         } else {
             Ok(Transformed::no(plan))
         };
```
9 changes: 5 additions & 4 deletions datafusion/core/src/physical_optimizer/pruning.rs
```diff
@@ -29,21 +29,22 @@ use crate::{
     logical_expr::Operator,
     physical_plan::{ColumnarValue, PhysicalExpr},
 };
-use arrow::record_batch::RecordBatchOptions;

 use arrow::{
     array::{new_null_array, ArrayRef, BooleanArray},
     datatypes::{DataType, Field, Schema, SchemaRef},
-    record_batch::RecordBatch,
+    record_batch::{RecordBatch, RecordBatchOptions},
 };
 use arrow_array::cast::AsArray;
+use datafusion_common::tree_node::TransformedResult;
 use datafusion_common::{
-    internal_err, plan_err,
+    internal_err, plan_datafusion_err, plan_err,
     tree_node::{Transformed, TreeNode},
+    ScalarValue,
 };
-use datafusion_common::{plan_datafusion_err, ScalarValue};
 use datafusion_physical_expr::utils::{collect_columns, Guarantee, LiteralGuarantee};
 use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef};
+
 use log::trace;

 /// A source of runtime statistical information to [`PruningPredicate`]s.
```
11 changes: 5 additions & 6 deletions datafusion/core/src/physical_optimizer/topk_aggregation.rs
```diff
@@ -140,7 +140,7 @@ impl PhysicalOptimizerRule for TopKAggregation {
         plan: Arc<dyn ExecutionPlan>,
         config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let plan = if config.optimizer.enable_topk_aggregation {
+        if config.optimizer.enable_topk_aggregation {
             plan.transform_down(&|plan| {
                 Ok(
                     if let Some(plan) = TopKAggregation::transform_sort(plan.clone()) {
@@ -149,12 +149,11 @@ impl PhysicalOptimizerRule for TopKAggregation {
                         Transformed::no(plan)
                     },
                 )
-            })?
-            .data
+            })
+            .data()
         } else {
-            plan
-        };
-        Ok(plan)
+            Ok(plan)
+        }
     }

     fn name(&self) -> &str {
```
15 changes: 7 additions & 8 deletions datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
```diff
@@ -17,19 +17,12 @@

 use std::sync::Arc;

-use arrow::array::{ArrayRef, Int64Array};
+use arrow::array::{Array, ArrayRef, AsArray, Int64Array};
 use arrow::compute::{concat_batches, SortOptions};
 use arrow::datatypes::DataType;
 use arrow::record_batch::RecordBatch;
 use arrow::util::pretty::pretty_format_batches;
-use arrow_array::cast::AsArray;
 use arrow_array::types::Int64Type;
-use arrow_array::Array;
-use hashbrown::HashMap;
-use rand::rngs::StdRng;
-use rand::{Rng, SeedableRng};
-use tokio::task::JoinSet;
-
 use datafusion::common::Result;
 use datafusion::datasource::MemTable;
 use datafusion::physical_plan::aggregates::{
@@ -44,6 +37,11 @@ use datafusion_physical_expr::{AggregateExpr, PhysicalSortExpr};
 use datafusion_physical_plan::InputOrderMode;
 use test_utils::{add_empty_batches, StringBatchGenerator};

+use hashbrown::HashMap;
+use rand::rngs::StdRng;
+use rand::{Rng, SeedableRng};
+use tokio::task::JoinSet;
+
 /// Tests that streaming aggregate and batch (non streaming) aggregate produce
 /// same results
 #[tokio::test(flavor = "multi_thread")]
@@ -316,6 +314,7 @@ async fn verify_ordered_aggregate(frame: &DataFrame, expected_sort: bool) {

 impl TreeNodeVisitor for Visitor {
     type Node = Arc<dyn ExecutionPlan>;
+
     fn f_down(&mut self, node: &Self::Node) -> Result<TreeNodeRecursion> {
         if let Some(exec) = node.as_any().downcast_ref::<AggregateExec>() {
             if self.expected_sort {
```
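The `Visitor` above walks the physical plan without mutating it, steering the traversal with `TreeNodeRecursion`. As a rough, self-contained illustration of that control-flow idea — toy types, a simplified stand-in for DataFusion's actual `TreeNodeVisitor` trait and `TreeNodeRecursion` enum (which also has a `Jump` variant):

```rust
/// How a visitor steers the traversal; mirrors the spirit of DataFusion's
/// `TreeNodeRecursion`, though this is a simplified stand-in.
#[derive(PartialEq)]
enum Recursion {
    Continue, // keep descending
    Stop,     // abort the whole walk
}

struct Node {
    name: &'static str,
    children: Vec<Node>,
}

/// Pre-order walk: call `f` on a node, then on its children unless told to stop.
fn visit(node: &Node, f: &mut impl FnMut(&Node) -> Recursion) -> Recursion {
    if f(node) == Recursion::Stop {
        return Recursion::Stop;
    }
    for child in &node.children {
        if visit(child, f) == Recursion::Stop {
            return Recursion::Stop;
        }
    }
    Recursion::Continue
}

fn main() {
    let plan = Node {
        name: "sort",
        children: vec![Node { name: "aggregate", children: vec![] }],
    };
    // Stop as soon as an aggregate is found, like the fuzz test's Visitor
    // looking for an AggregateExec.
    let mut found = false;
    visit(&plan, &mut |n| {
        if n.name == "aggregate" {
            found = true;
            Recursion::Stop
        } else {
            Recursion::Continue
        }
    });
    assert!(found);
}
```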