Skip to content

Commit

Permalink
Merge branch 'apache:main' into add-least
Browse files Browse the repository at this point in the history
  • Loading branch information
rluvaton authored Dec 20, 2024
2 parents a28d425 + b0d7cd0 commit b82ee38
Show file tree
Hide file tree
Showing 72 changed files with 1,567 additions and 529 deletions.
14 changes: 12 additions & 2 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@ use crate::{
};

use datafusion::common::instant::Instant;
use datafusion::common::plan_datafusion_err;
use datafusion::common::{plan_datafusion_err, plan_err};
use datafusion::config::ConfigFileType;
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_expr::{DdlStatement, LogicalPlan};
use datafusion::physical_plan::execution_plan::EmissionType;
use datafusion::physical_plan::{collect, execute_stream, ExecutionPlanProperties};
use datafusion::sql::parser::{DFParser, Statement};
use datafusion::sql::sqlparser::dialect::dialect_from_str;
Expand Down Expand Up @@ -234,10 +235,19 @@ pub(super) async fn exec_and_print(
let df = ctx.execute_logical_plan(plan).await?;
let physical_plan = df.create_physical_plan().await?;

if physical_plan.execution_mode().is_unbounded() {
if physical_plan.boundedness().is_unbounded() {
if physical_plan.pipeline_behavior() == EmissionType::Final {
return plan_err!(
"The given query can generate a valid result only once \
the source finishes, but the source is unbounded"
);
}
// As the input stream comes, we can generate results.
// However, memory safety is not guaranteed.
let stream = execute_stream(physical_plan, task_ctx.clone())?;
print_options.print_stream(stream, now).await?;
} else {
// Bounded stream; collected results are printed after all input is consumed.
let schema = physical_plan.schema();
let results = collect(physical_plan, task_ctx.clone()).await?;
adjusted.into_inner().print_batches(schema, &results, now)?;
Expand Down
8 changes: 5 additions & 3 deletions datafusion-examples/examples/custom_datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ use datafusion::error::Result;
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::LogicalPlanBuilder;
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::memory::MemoryStream;
use datafusion::physical_plan::{
project_schema, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan,
Partitioning, PlanProperties, SendableRecordBatchStream,
project_schema, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
PlanProperties, SendableRecordBatchStream,
};
use datafusion::prelude::*;

Expand Down Expand Up @@ -214,7 +215,8 @@ impl CustomExec {
PlanProperties::new(
eq_properties,
Partitioning::UnknownPartitioning(1),
ExecutionMode::Bounded,
EmissionType::Incremental,
Boundedness::Bounded,
)
}
}
Expand Down
25 changes: 17 additions & 8 deletions datafusion/common/src/join_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,30 @@ use crate::{DataFusionError, Result};
/// Join type
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
pub enum JoinType {
/// Inner Join
/// Inner Join - Returns only rows where there is a matching value in both tables based on the join condition.
/// For example, if joining table A and B on A.id = B.id, only rows where A.id equals B.id will be included.
/// All columns from both tables are returned for the matching rows. Non-matching rows are excluded entirely.
Inner,
/// Left Join
/// Left Join - Returns all rows from the left table and matching rows from the right table.
/// If no match, NULL values are returned for columns from the right table.
Left,
/// Right Join
/// Right Join - Returns all rows from the right table and matching rows from the left table.
/// If no match, NULL values are returned for columns from the left table.
Right,
/// Full Join
/// Full Join (also called Full Outer Join) - Returns all rows from both tables, matching rows where possible.
/// When a row from either table has no match in the other table, the missing columns are filled with NULL values.
/// For example, if table A has row X with no match in table B, the result will contain row X with NULL values for all of table B's columns.
/// This join type preserves all records from both tables, making it useful when you need to see all data regardless of matches.
Full,
/// Left Semi Join
/// Left Semi Join - Returns rows from the left table that have matching rows in the right table.
/// Only columns from the left table are returned.
LeftSemi,
/// Right Semi Join
/// Right Semi Join - Returns rows from the right table that have matching rows in the left table.
/// Only columns from the right table are returned.
RightSemi,
/// Left Anti Join
/// Left Anti Join - Returns rows from the left table that do not have a matching row in the right table.
LeftAnti,
/// Right Anti Join
/// Right Anti Join - Returns rows from the right table that do not have a matching row in the left table.
RightAnti,
/// Left Mark join
///
Expand Down
6 changes: 4 additions & 2 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ use datafusion_common::config::ConfigOptions;
use datafusion_common::Statistics;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
use datafusion_physical_plan::{ExecutionMode, PlanProperties};
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion_physical_plan::PlanProperties;

use futures::StreamExt;
use itertools::Itertools;
Expand Down Expand Up @@ -97,7 +98,8 @@ impl ArrowExec {
PlanProperties::new(
eq_properties,
Self::output_partitioning_helper(file_scan_config), // Output Partitioning
ExecutionMode::Bounded, // Execution Mode
EmissionType::Incremental,
Boundedness::Bounded,
)
}

Expand Down
8 changes: 5 additions & 3 deletions datafusion/core/src/datasource/physical_plan/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ use super::FileScanConfig;
use crate::error::Result;
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning,
PlanProperties, SendableRecordBatchStream, Statistics,
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
SendableRecordBatchStream, Statistics,
};

use arrow::datatypes::SchemaRef;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};

/// Execution plan for scanning Avro data source
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -81,7 +82,8 @@ impl AvroExec {
PlanProperties::new(
eq_properties,
Partitioning::UnknownPartitioning(n_partitions), // Output Partitioning
ExecutionMode::Bounded, // Execution Mode
EmissionType::Incremental,
Boundedness::Bounded,
)
}
}
Expand Down
8 changes: 5 additions & 3 deletions datafusion/core/src/datasource/physical_plan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use crate::datasource::physical_plan::FileMeta;
use crate::error::{DataFusionError, Result};
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
Partitioning, PlanProperties, SendableRecordBatchStream, Statistics,
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
PlanProperties, SendableRecordBatchStream, Statistics,
};

use arrow::csv;
Expand All @@ -43,6 +43,7 @@ use datafusion_common::config::ConfigOptions;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};

use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use futures::{StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::{GetOptions, GetResultPayload, ObjectStore};
Expand Down Expand Up @@ -327,7 +328,8 @@ impl CsvExec {
PlanProperties::new(
eq_properties,
Self::output_partitioning_helper(file_scan_config), // Output Partitioning
ExecutionMode::Bounded, // Execution Mode
EmissionType::Incremental,
Boundedness::Bounded,
)
}

Expand Down
8 changes: 5 additions & 3 deletions datafusion/core/src/datasource/physical_plan/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@ use crate::datasource::physical_plan::FileMeta;
use crate::error::{DataFusionError, Result};
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties,
Partitioning, PlanProperties, SendableRecordBatchStream, Statistics,
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
PlanProperties, SendableRecordBatchStream, Statistics,
};

use arrow::json::ReaderBuilder;
use arrow::{datatypes::SchemaRef, json};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};

use futures::{StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
Expand Down Expand Up @@ -107,7 +108,8 @@ impl NdJsonExec {
PlanProperties::new(
eq_properties,
Self::output_partitioning_helper(file_scan_config), // Output Partitioning
ExecutionMode::Bounded, // Execution Mode
EmissionType::Incremental,
Boundedness::Bounded,
)
}

Expand Down
11 changes: 5 additions & 6 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@ use crate::{
physical_optimizer::pruning::PruningPredicate,
physical_plan::{
metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties,
DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
SendableRecordBatchStream, Statistics,
},
};

use arrow::datatypes::SchemaRef;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr};
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};

use itertools::Itertools;
use log::debug;
Expand Down Expand Up @@ -654,13 +655,11 @@ impl ParquetExec {
orderings: &[LexOrdering],
file_config: &FileScanConfig,
) -> PlanProperties {
// Equivalence Properties
let eq_properties = EquivalenceProperties::new_with_orderings(schema, orderings);

PlanProperties::new(
eq_properties,
EquivalenceProperties::new_with_orderings(schema, orderings),
Self::output_partitioning_helper(file_config), // Output Partitioning
ExecutionMode::Bounded, // Execution Mode
EmissionType::Incremental,
Boundedness::Bounded,
)
}

Expand Down
15 changes: 11 additions & 4 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,13 @@ use datafusion_physical_expr::utils::map_columns_before_projection;
use datafusion_physical_expr::{
physical_exprs_equal, EquivalenceProperties, PhysicalExpr, PhysicalExprRef,
};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::execution_plan::EmissionType;
use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAggExec};
use datafusion_physical_plan::ExecutionPlanProperties;

use datafusion_physical_expr_common::sort_expr::LexOrdering;
use itertools::izip;

/// The `EnforceDistribution` rule ensures that distribution requirements are
Expand Down Expand Up @@ -1161,12 +1162,17 @@ fn ensure_distribution(
let should_use_estimates = config
.execution
.use_row_number_estimates_to_optimize_partitioning;
let is_unbounded = dist_context.plan.execution_mode().is_unbounded();
let unbounded_and_pipeline_friendly = dist_context.plan.boundedness().is_unbounded()
&& matches!(
dist_context.plan.pipeline_behavior(),
EmissionType::Incremental | EmissionType::Both
);
// Use order-preserving variants if either of the following conditions holds:
// - it is desired according to config
// - the plan is unbounded and pipeline friendly
//   (i.e. it can incrementally produce results)
let order_preserving_variants_desirable =
is_unbounded || config.optimizer.prefer_existing_sort;
unbounded_and_pipeline_friendly || config.optimizer.prefer_existing_sort;

// Remove unnecessary repartition from the physical plan if any
let DistributionContext {
Expand Down Expand Up @@ -1459,7 +1465,8 @@ pub(crate) mod tests {
PlanProperties::new(
input.equivalence_properties().clone(), // Equivalence Properties
input.output_partitioning().clone(), // Output Partitioning
input.execution_mode(), // Execution Mode
input.pipeline_behavior(), // Pipeline Behavior
input.boundedness(), // Boundedness
)
}
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ fn replace_with_partial_sort(
let plan_any = plan.as_any();
if let Some(sort_plan) = plan_any.downcast_ref::<SortExec>() {
let child = Arc::clone(sort_plan.children()[0]);
if !child.execution_mode().is_unbounded() {
if !child.boundedness().is_unbounded() {
return Ok(plan);
}

Expand Down
28 changes: 19 additions & 9 deletions datafusion/core/src/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::execution_plan::EmissionType;

/// The [`JoinSelection`] rule tries to modify a given plan so that it can
/// accommodate infinite sources and optimize joins in the plan according to
Expand Down Expand Up @@ -516,7 +517,8 @@ fn statistical_join_selection_subrule(
pub type PipelineFixerSubrule =
dyn Fn(Arc<dyn ExecutionPlan>, &ConfigOptions) -> Result<Arc<dyn ExecutionPlan>>;

/// Converts a hash join to a symmetric hash join in the case of infinite inputs on both sides.
/// Converts a hash join to a symmetric hash join if both its inputs are
/// unbounded and incremental.
///
/// This subrule checks if a hash join can be replaced with a symmetric hash join when dealing
/// with unbounded (infinite) inputs on both sides. This replacement avoids pipeline breaking and
Expand All @@ -537,10 +539,18 @@ fn hash_join_convert_symmetric_subrule(
) -> Result<Arc<dyn ExecutionPlan>> {
// Check if the current plan node is a HashJoinExec.
if let Some(hash_join) = input.as_any().downcast_ref::<HashJoinExec>() {
let left_unbounded = hash_join.left.execution_mode().is_unbounded();
let right_unbounded = hash_join.right.execution_mode().is_unbounded();
// Process only if both left and right sides are unbounded.
if left_unbounded && right_unbounded {
let left_unbounded = hash_join.left.boundedness().is_unbounded();
let left_incremental = matches!(
hash_join.left.pipeline_behavior(),
EmissionType::Incremental | EmissionType::Both
);
let right_unbounded = hash_join.right.boundedness().is_unbounded();
let right_incremental = matches!(
hash_join.right.pipeline_behavior(),
EmissionType::Incremental | EmissionType::Both
);
// Process only if both the left and right sides are unbounded and emit results incrementally.
if left_unbounded && right_unbounded & left_incremental & right_incremental {
// Determine the partition mode based on configuration.
let mode = if config_options.optimizer.repartition_joins {
StreamJoinPartitionMode::Partitioned
Expand Down Expand Up @@ -669,8 +679,8 @@ fn hash_join_swap_subrule(
_config_options: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
if let Some(hash_join) = input.as_any().downcast_ref::<HashJoinExec>() {
if hash_join.left.execution_mode().is_unbounded()
&& !hash_join.right.execution_mode().is_unbounded()
if hash_join.left.boundedness().is_unbounded()
&& !hash_join.right.boundedness().is_unbounded()
&& matches!(
*hash_join.join_type(),
JoinType::Inner
Expand Down Expand Up @@ -2025,12 +2035,12 @@ mod hash_join_tests {
assert_eq!(
(
t.case.as_str(),
if left.execution_mode().is_unbounded() {
if left.boundedness().is_unbounded() {
SourceType::Unbounded
} else {
SourceType::Bounded
},
if right.execution_mode().is_unbounded() {
if right.boundedness().is_unbounded() {
SourceType::Unbounded
} else {
SourceType::Bounded
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;

use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::Transformed;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::execution_plan::EmissionType;
use datafusion_physical_plan::tree_node::PlanContext;
use datafusion_physical_plan::ExecutionPlanProperties;

use datafusion_physical_expr_common::sort_expr::LexOrdering;
use itertools::izip;

/// For a given `plan`, this object carries the information one needs from its
Expand Down Expand Up @@ -246,7 +247,8 @@ pub(crate) fn replace_with_order_preserving_variants(
// For unbounded plans whose emission type is `Final`, we replace with the
// order-preserving variant in any case, as doing so helps fix the pipeline.
// Also replace if config allows.
let use_order_preserving_variant = config.optimizer.prefer_existing_sort
|| !requirements.plan.execution_mode().pipeline_friendly();
|| (requirements.plan.boundedness().is_unbounded()
&& requirements.plan.pipeline_behavior() == EmissionType::Final);

// Create an alternate plan with order-preserving variants:
let mut alternate_plan = plan_with_order_preserving_variants(
Expand Down
Loading

0 comments on commit b82ee38

Please sign in to comment.