
Commit

Simplifications and comment improvements (#2)
ozankabak authored Mar 4, 2024
1 parent 8306c9d commit c944b81
Showing 47 changed files with 663 additions and 710 deletions.
682 changes: 309 additions & 373 deletions datafusion/common/src/tree_node.rs

Large diffs are not rendered by default.
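
The bulk of this commit is the `tree_node.rs` rewrite collapsed above; every later hunk simply adopts its new calling convention, replacing `node.transform_up(&f)?.data` with `node.transform_up(&f).data()?`. The following is only a rough, self-contained sketch of that convention — the `Transformed`, `TransformedResult`, and `transform_up` names mirror the diff, but the toy `Node` type and `String` error are made up for illustration and are not DataFusion's actual API:

```rust
/// Minimal stand-in for a tree-transformation payload: the rewritten value
/// plus a flag recording whether anything actually changed.
struct Transformed<T> {
    data: T,
    transformed: bool,
}

impl<T> Transformed<T> {
    fn yes(data: T) -> Self {
        Self { data, transformed: true }
    }
    fn no(data: T) -> Self {
        Self { data, transformed: false }
    }
}

type Result<T> = std::result::Result<T, String>;

/// Convenience trait so callers can write `...transform_up(&f).data()?`
/// instead of `...transform_up(&f)?.data`.
trait TransformedResult<T> {
    fn data(self) -> Result<T>;
}

impl<T> TransformedResult<T> for Result<Transformed<T>> {
    fn data(self) -> Result<T> {
        self.map(|t| t.data)
    }
}

/// A toy "plan" node so the pattern has something to run on.
#[derive(Debug)]
struct Node {
    value: i64,
    children: Vec<Node>,
}

impl Node {
    /// Bottom-up rewrite: children are rewritten first, then the node itself.
    fn transform_up<F>(self, f: &F) -> Result<Transformed<Node>>
    where
        F: Fn(Node) -> Result<Transformed<Node>>,
    {
        let children = self
            .children
            .into_iter()
            .map(|c| c.transform_up(f).data())
            .collect::<Result<Vec<_>>>()?;
        f(Node { value: self.value, children })
    }
}

fn main() -> Result<()> {
    let plan = Node {
        value: 1,
        children: vec![Node { value: 2, children: vec![] }],
    };

    // New style from this commit: chain `.data()?` instead of `?.data`.
    let doubled = plan
        .transform_up(&|mut n| {
            n.value *= 2;
            Ok(Transformed::yes(n))
        })
        .data()?;

    println!("{doubled:?}");
    Ok(())
}
```

The point of the `TransformedResult` helper is that error propagation (`?`) moves to the end of the call chain, which is what lets the call sites in the hunks below drop the intermediate `let`/`?.data` step.
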

24 changes: 11 additions & 13 deletions datafusion/core/src/datasource/listing/helpers.rs
@@ -19,29 +19,27 @@
use std::sync::Arc;

use arrow::compute::{and, cast, prep_null_mask_filter};
use super::PartitionedFile;
use crate::datasource::listing::ListingTableUrl;
use crate::execution::context::SessionState;
use crate::{error::Result, scalar::ScalarValue};

use arrow::{
array::{ArrayRef, StringBuilder},
array::{Array, ArrayRef, AsArray, StringBuilder},
compute::{and, cast, prep_null_mask_filter},
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
};
use arrow_array::cast::AsArray;
use arrow_array::Array;
use arrow_schema::Fields;
use futures::stream::FuturesUnordered;
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use log::{debug, trace};

use crate::{error::Result, scalar::ScalarValue};

use super::PartitionedFile;
use crate::datasource::listing::ListingTableUrl;
use crate::execution::context::SessionState;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{internal_err, Column, DFField, DFSchema, DataFusionError};
use datafusion_expr::{Expr, ScalarFunctionDefinition, Volatility};
use datafusion_physical_expr::create_physical_expr;
use datafusion_physical_expr::execution_props::ExecutionProps;

use futures::stream::{BoxStream, FuturesUnordered};
use futures::{StreamExt, TryStreamExt};
use log::{debug, trace};
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};

19 changes: 9 additions & 10 deletions datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
@@ -15,28 +15,28 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::BTreeSet;
use std::sync::Arc;

use super::ParquetFileMetrics;
use crate::physical_plan::metrics;

use arrow::array::BooleanArray;
use arrow::datatypes::{DataType, Schema};
use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::record_batch::RecordBatch;
use datafusion_common::cast::as_boolean_array;
use datafusion_common::tree_node::{
Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter,
Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter,
};
use datafusion_common::{arrow_err, DataFusionError, Result, ScalarValue};
use datafusion_physical_expr::expressions::{Column, Literal};
use datafusion_physical_expr::utils::reassign_predicate_columns;
use std::collections::BTreeSet;

use datafusion_physical_expr::{split_conjunction, PhysicalExpr};

use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
use parquet::arrow::ProjectionMask;
use parquet::file::metadata::ParquetMetaData;
use std::sync::Arc;

use crate::physical_plan::metrics;

use super::ParquetFileMetrics;

/// This module contains utilities for enabling the pushdown of DataFusion filter predicates (which
/// can be any DataFusion `Expr` that evaluates to a `BooleanArray`) to the parquet decoder level in `arrow-rs`.
@@ -190,8 +190,7 @@ impl<'a> FilterCandidateBuilder<'a> {
mut self,
metadata: &ParquetMetaData,
) -> Result<Option<FilterCandidate>> {
let expr = self.expr.clone();
let expr = expr.rewrite(&mut self)?.data;
let expr = self.expr.clone().rewrite(&mut self).data()?;

if self.non_primitive_columns || self.projected_columns {
Ok(None)
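
For context on the `row_filter.rs` module comment above: the rewritten predicates are ultimately handed to the `RowFilter` machinery in `arrow-rs`, which evaluates them inside the parquet decoder so filtered rows never reach DataFusion operators. A minimal sketch of that decoder-level filtering using only the public `parquet`/`arrow` crates — the file name `data.parquet`, the choice of the first leaf column, and the `> 10` predicate are assumptions for illustration, not anything from this commit:

```rust
use std::fs::File;

use arrow::array::{AsArray, BooleanArray};
use arrow::datatypes::Int64Type;
use parquet::arrow::arrow_reader::{ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter};
use parquet::arrow::ProjectionMask;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical input: any parquet file whose first leaf column is Int64.
    let file = File::open("data.parquet")?;
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;

    // Evaluate the predicate against the first leaf column only.
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    let predicate = ArrowPredicateFn::new(mask, |batch| {
        let col = batch.column(0).as_primitive::<Int64Type>();
        // Keep rows whose value is greater than 10.
        Ok(BooleanArray::from_iter(col.iter().map(|v| v.map(|x| x > 10))))
    });

    // Rows failing the predicate are skipped inside the decoder.
    let reader = builder
        .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
        .build()?;

    for batch in reader {
        println!("decoded {} filtered rows", batch?.num_rows());
    }
    Ok(())
}
```
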
6 changes: 4 additions & 2 deletions datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -18,17 +18,19 @@
//! CoalesceBatches optimizer that groups batches together rows
//! in bigger batches to avoid overhead with small batches
use crate::config::ConfigOptions;
use std::sync::Arc;

use crate::{
config::ConfigOptions,
error::Result,
physical_optimizer::PhysicalOptimizerRule,
physical_plan::{
coalesce_batches::CoalesceBatchesExec, filter::FilterExec, joins::HashJoinExec,
repartition::RepartitionExec, Partitioning,
},
};

use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use std::sync::Arc;

/// Optimizer rule that introduces CoalesceBatchesExec to avoid overhead with small batches that
/// are produced by highly selective filters
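
As a rough illustration of what the doc comment above describes — merging many small batches into fewer larger ones — record batches can be buffered and combined with `arrow::compute::concat_batches` once a target row count is reached. The `coalesce` helper and the 4-row target below are hypothetical; `CoalesceBatchesExec` does this incrementally as a streaming operator rather than over a pre-collected `Vec`:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::compute::concat_batches;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Greedily merge small batches until each output batch has at least
/// `target_rows` rows (the last batch may be smaller).
fn coalesce(
    batches: Vec<RecordBatch>,
    schema: Arc<Schema>,
    target_rows: usize,
) -> Result<Vec<RecordBatch>, ArrowError> {
    let mut out = Vec::new();
    let mut buffer = Vec::new();
    let mut buffered_rows = 0;

    for batch in batches {
        buffered_rows += batch.num_rows();
        buffer.push(batch);
        if buffered_rows >= target_rows {
            out.push(concat_batches(&schema, &buffer)?);
            buffer.clear();
            buffered_rows = 0;
        }
    }
    if !buffer.is_empty() {
        out.push(concat_batches(&schema, &buffer)?);
    }
    Ok(out)
}

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));

    // Many tiny batches, e.g. the output of a highly selective filter.
    let tiny: Vec<RecordBatch> = (0..10)
        .map(|i| {
            RecordBatch::try_new(
                schema.clone(),
                vec![Arc::new(Int32Array::from(vec![i])) as ArrayRef],
            )
        })
        .collect::<Result<_, _>>()?;

    let coalesced = coalesce(tiny, schema, 4)?;
    for b in &coalesced {
        println!("batch with {} rows", b.num_rows());
    }
    Ok(())
}
```
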
23 changes: 13 additions & 10 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -45,7 +45,7 @@ use crate::physical_plan::windows::WindowAggExec;
use crate::physical_plan::{Distribution, ExecutionPlan, Partitioning};

use arrow::compute::SortOptions;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_expr::logical_plan::JoinType;
use datafusion_physical_expr::expressions::{Column, NoOp};
use datafusion_physical_expr::utils::map_columns_before_projection;
@@ -198,24 +198,24 @@ impl PhysicalOptimizerRule for EnforceDistribution {
// Run a top-down process to adjust input key ordering recursively
let plan_requirements = PlanWithKeyRequirements::new_default(plan);
let adjusted = plan_requirements
.transform_down(&adjust_input_keys_ordering)?
.data;
.transform_down(&adjust_input_keys_ordering)
.data()?;
adjusted.plan
} else {
// Run a bottom-up process
plan.transform_up(&|plan| {
Ok(Transformed::yes(reorder_join_keys_to_inputs(plan)?))
})?
.data
})
.data()?
};

let distribution_context = DistributionContext::new_default(adjusted);
// Distribution enforcement needs to be applied bottom-up.
let distribution_context = distribution_context
.transform_up(&|distribution_context| {
ensure_distribution(distribution_context, config)
})?
.data;
})
.data()?;
Ok(distribution_context.plan)
}

@@ -1788,22 +1788,25 @@ pub(crate) mod tests {
let plan_requirements =
PlanWithKeyRequirements::new_default($PLAN.clone());
let adjusted = plan_requirements
.transform_down(&adjust_input_keys_ordering).data()
.transform_down(&adjust_input_keys_ordering)
.data()
.and_then(check_integrity)?;
// TODO: End state payloads will be checked here.
adjusted.plan
} else {
// Run reorder_join_keys_to_inputs rule
$PLAN.clone().transform_up(&|plan| {
Ok(Transformed::yes(reorder_join_keys_to_inputs(plan)?))
})?.data
})
.data()?
};

// Then run ensure_distribution rule
DistributionContext::new_default(adjusted)
.transform_up(&|distribution_context| {
ensure_distribution(distribution_context, &config)
}).data()
})
.data()
.and_then(check_integrity)?;
// TODO: End state payloads will be checked here.
}
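
The hunks above mix the two traversal directions: `adjust_input_keys_ordering` runs top-down, while `ensure_distribution` runs bottom-up. A minimal sketch of why the direction matters — with a toy `Plan` type and hypothetical `transform_down`/`transform_up` helpers, not DataFusion's — showing that the rewrite closure sees a parent before its children in one case and after them in the other:

```rust
/// Toy plan node used only to show traversal order.
#[derive(Clone, Debug)]
struct Plan {
    name: &'static str,
    children: Vec<Plan>,
}

/// Top-down: rewrite the node first, then recurse into its (possibly new) children.
fn transform_down(plan: Plan, f: &impl Fn(Plan) -> Plan) -> Plan {
    let plan = f(plan);
    Plan {
        name: plan.name,
        children: plan.children.into_iter().map(|c| transform_down(c, f)).collect(),
    }
}

/// Bottom-up: recurse into the children first, then rewrite the node.
fn transform_up(plan: Plan, f: &impl Fn(Plan) -> Plan) -> Plan {
    let plan = Plan {
        name: plan.name,
        children: plan.children.into_iter().map(|c| transform_up(c, f)).collect(),
    };
    f(plan)
}

fn main() {
    let plan = Plan {
        name: "Aggregate",
        children: vec![Plan {
            name: "Repartition",
            children: vec![Plan { name: "Scan", children: vec![] }],
        }],
    };

    // Parents are visited before children (like adjust_input_keys_ordering).
    println!("top-down order:");
    transform_down(plan.clone(), &|p| {
        println!("  visiting {}", p.name);
        p
    });

    // Children are visited before parents (like ensure_distribution).
    println!("bottom-up order:");
    transform_up(plan, &|p| {
        println!("  visiting {}", p.name);
        p
    });
}
```
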
20 changes: 12 additions & 8 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -165,8 +165,8 @@ impl PhysicalOptimizerRule for EnforceSorting {
let plan_with_coalesce_partitions =
PlanWithCorrespondingCoalescePartitions::new_default(adjusted.plan);
let parallel = plan_with_coalesce_partitions
.transform_up(&parallelize_sorts)?
.data;
.transform_up(&parallelize_sorts)
.data()?;
parallel.plan
} else {
adjusted.plan
@@ -181,8 +181,8 @@
true,
config,
)
})?
.data;
})
.data()?;

// Execute a top-down traversal to exploit sort push-down opportunities
// missed by the bottom-up traversal:
@@ -687,15 +687,17 @@ mod tests {
{
let plan_requirements = PlanWithCorrespondingSort::new_default($PLAN.clone());
let adjusted = plan_requirements
.transform_up(&ensure_sorting).data()
.transform_up(&ensure_sorting)
.data()
.and_then(check_integrity)?;
// TODO: End state payloads will be checked here.

let new_plan = if state.config_options().optimizer.repartition_sorts {
let plan_with_coalesce_partitions =
PlanWithCorrespondingCoalescePartitions::new_default(adjusted.plan);
let parallel = plan_with_coalesce_partitions
.transform_up(&parallelize_sorts).data()
.transform_up(&parallelize_sorts)
.data()
.and_then(check_integrity)?;
// TODO: End state payloads will be checked here.
parallel.plan
@@ -712,14 +714,16 @@
true,
state.config_options(),
)
}).data()
})
.data()
.and_then(check_integrity)?;
// TODO: End state payloads will be checked here.

let mut sort_pushdown = SortPushDown::new_default(updated_plan.plan);
assign_initial_requirements(&mut sort_pushdown);
sort_pushdown
.transform_down(&pushdown_sorts).data()
.transform_down(&pushdown_sorts)
.data()
.and_then(check_integrity)?;
// TODO: End state payloads will be checked here.
}
8 changes: 4 additions & 4 deletions datafusion/core/src/physical_optimizer/join_selection.rs
@@ -237,8 +237,8 @@ impl PhysicalOptimizerRule for JoinSelection {
Box::new(hash_join_swap_subrule),
];
let new_plan = plan
.transform_up(&|p| apply_subrules(p, &subrules, config))?
.data;
.transform_up(&|p| apply_subrules(p, &subrules, config))
.data()?;
// Next, we apply another subrule that tries to optimize joins using any
// statistics their inputs might have.
// - For a hash join with partition mode [`PartitionMode::Auto`], we will
@@ -813,8 +813,8 @@ mod tests_statistical {
Box::new(hash_join_swap_subrule),
];
let new_plan = plan
.transform_up(&|p| apply_subrules(p, &subrules, &ConfigOptions::new()))?
.data;
.transform_up(&|p| apply_subrules(p, &subrules, &ConfigOptions::new()))
.data()?;
// TODO: End state payloads will be checked here.
let config = ConfigOptions::new().optimizer;
let collect_left_threshold = config.hash_join_single_partition_threshold;
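
`JoinSelection` folds each visited node through a list of subrules inside a single `transform_up` pass. A small sketch of that shape — the toy `Plan` type, the `Subrule` alias, and the two placeholder rules below are hypothetical stand-ins for `hash_join_convert_symmetric_subrule` and `hash_join_swap_subrule`, not their real logic:

```rust
/// Toy plan node.
#[derive(Debug)]
struct Plan {
    name: String,
}

/// A subrule takes a node and either rewrites it or returns it unchanged.
type Subrule = Box<dyn Fn(Plan) -> Plan>;

/// Fold one node through every subrule in order, as apply_subrules does
/// for each node visited by transform_up.
fn apply_subrules(mut plan: Plan, subrules: &[Subrule]) -> Plan {
    for rule in subrules {
        plan = rule(plan);
    }
    plan
}

fn main() {
    let subrules: Vec<Subrule> = vec![
        // Hypothetical counterpart of hash_join_convert_symmetric_subrule.
        Box::new(|p: Plan| Plan { name: format!("{} -> symmetric?", p.name) }),
        // Hypothetical counterpart of hash_join_swap_subrule.
        Box::new(|p: Plan| Plan { name: format!("{} -> swapped?", p.name) }),
    ];

    let plan = Plan { name: "HashJoinExec".to_string() };
    println!("{:?}", apply_subrules(plan, &subrules));
}
```
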
datafusion/core/src/physical_optimizer/limited_distinct_aggregation.rs
@@ -162,7 +162,7 @@ impl PhysicalOptimizerRule for LimitedDistinctAggregation {
plan: Arc<dyn ExecutionPlan>,
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
let plan = if config.optimizer.enable_distinct_aggregation_soft_limit {
if config.optimizer.enable_distinct_aggregation_soft_limit {
plan.transform_down(&|plan| {
Ok(
if let Some(plan) =
@@ -173,12 +173,11 @@
Transformed::no(plan)
},
)
})?
.data
})
.data()
} else {
plan
};
Ok(plan)
Ok(plan)
}
}

fn name(&self) -> &str {
6 changes: 3 additions & 3 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -112,9 +112,9 @@ pub fn remove_unnecessary_projections(
let maybe_unified = try_unifying_projections(projection, child_projection)?;
return if let Some(new_plan) = maybe_unified {
// To unify 3 or more sequential projections:
Ok(Transformed::yes(
remove_unnecessary_projections(new_plan)?.data,
))
remove_unnecessary_projections(new_plan)
.data()
.map(Transformed::yes)
} else {
Ok(Transformed::no(plan))
};
9 changes: 5 additions & 4 deletions datafusion/core/src/physical_optimizer/pruning.rs
@@ -29,21 +29,22 @@ use crate::{
logical_expr::Operator,
physical_plan::{ColumnarValue, PhysicalExpr},
};
use arrow::record_batch::RecordBatchOptions;

use arrow::{
array::{new_null_array, ArrayRef, BooleanArray},
datatypes::{DataType, Field, Schema, SchemaRef},
record_batch::RecordBatch,
record_batch::{RecordBatch, RecordBatchOptions},
};
use arrow_array::cast::AsArray;
use datafusion_common::tree_node::TransformedResult;
use datafusion_common::{
internal_err, plan_err,
internal_err, plan_datafusion_err, plan_err,
tree_node::{Transformed, TreeNode},
ScalarValue,
};
use datafusion_common::{plan_datafusion_err, ScalarValue};
use datafusion_physical_expr::utils::{collect_columns, Guarantee, LiteralGuarantee};
use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef};

use log::trace;

/// A source of runtime statistical information to [`PruningPredicate`]s.
11 changes: 5 additions & 6 deletions datafusion/core/src/physical_optimizer/topk_aggregation.rs
@@ -140,7 +140,7 @@ impl PhysicalOptimizerRule for TopKAggregation {
plan: Arc<dyn ExecutionPlan>,
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
let plan = if config.optimizer.enable_topk_aggregation {
if config.optimizer.enable_topk_aggregation {
plan.transform_down(&|plan| {
Ok(
if let Some(plan) = TopKAggregation::transform_sort(plan.clone()) {
@@ -149,12 +149,11 @@
Transformed::no(plan)
},
)
})?
.data
})
.data()
} else {
plan
};
Ok(plan)
Ok(plan)
}
}

fn name(&self) -> &str {
15 changes: 7 additions & 8 deletions datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
@@ -17,19 +17,12 @@

use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::array::{Array, ArrayRef, AsArray, Int64Array};
use arrow::compute::{concat_batches, SortOptions};
use arrow::datatypes::DataType;
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
use arrow_array::cast::AsArray;
use arrow_array::types::Int64Type;
use arrow_array::Array;
use hashbrown::HashMap;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use tokio::task::JoinSet;

use datafusion::common::Result;
use datafusion::datasource::MemTable;
use datafusion::physical_plan::aggregates::{
@@ -44,6 +37,11 @@ use datafusion_physical_expr::{AggregateExpr, PhysicalSortExpr};
use datafusion_physical_plan::InputOrderMode;
use test_utils::{add_empty_batches, StringBatchGenerator};

use hashbrown::HashMap;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use tokio::task::JoinSet;

/// Tests that streaming aggregate and batch (non streaming) aggregate produce
/// same results
#[tokio::test(flavor = "multi_thread")]
@@ -316,6 +314,7 @@ async fn verify_ordered_aggregate(frame: &DataFrame, expected_sort: bool) {

impl TreeNodeVisitor for Visitor {
type Node = Arc<dyn ExecutionPlan>;

fn f_down(&mut self, node: &Self::Node) -> Result<TreeNodeRecursion> {
if let Some(exec) = node.as_any().downcast_ref::<AggregateExec>() {
if self.expected_sort {
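
The fuzz test above inspects the physical plan with a `TreeNodeVisitor`, using `f_down` to look for an `AggregateExec` on the way down the tree. A rough, self-contained imitation of that visitor shape — the `Node`, `Recursion`, and `Visitor` types below are toy stand-ins for `Arc<dyn ExecutionPlan>`, `TreeNodeRecursion`, and the real trait:

```rust
/// Stand-in for TreeNodeRecursion: keep descending or stop the walk early.
#[derive(PartialEq)]
enum Recursion {
    Continue,
    Stop,
}

/// Toy tree node standing in for Arc<dyn ExecutionPlan>.
struct Node {
    name: &'static str,
    children: Vec<Node>,
}

/// Stand-in for TreeNodeVisitor: `f_down` is called on the way down the tree.
trait Visitor {
    fn f_down(&mut self, node: &Node) -> Recursion;
}

/// Pre-order walk that honors the visitor's request to stop early.
fn visit<V: Visitor>(node: &Node, visitor: &mut V) -> Recursion {
    if visitor.f_down(node) == Recursion::Stop {
        return Recursion::Stop;
    }
    for child in &node.children {
        if visit(child, visitor) == Recursion::Stop {
            return Recursion::Stop;
        }
    }
    Recursion::Continue
}

/// Mirrors the fuzz test's visitor: remember whether an aggregate was seen.
struct FindAggregate {
    found: bool,
}

impl Visitor for FindAggregate {
    fn f_down(&mut self, node: &Node) -> Recursion {
        if node.name == "AggregateExec" {
            self.found = true;
            return Recursion::Stop; // no need to look further
        }
        Recursion::Continue
    }
}

fn main() {
    let plan = Node {
        name: "CoalesceBatchesExec",
        children: vec![Node { name: "AggregateExec", children: vec![] }],
    };
    let mut visitor = FindAggregate { found: false };
    visit(&plan, &mut visitor);
    println!("aggregate found: {}", visitor.found);
}
```
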
(Diffs for the remaining changed files are not rendered here.)
