feat: optimize CoalesceBatches in limit #11983

Closed

Changes from 5 commits
153 changes: 122 additions & 31 deletions datafusion/core/src/physical_optimizer/coalesce_batches.rs
@@ -18,7 +18,7 @@
//! CoalesceBatches optimizer that groups batches together rows
//! in bigger batches to avoid overhead with small batches

-use std::sync::Arc;
+use std::{any::Any, sync::Arc};

use crate::{
config::ConfigOptions,
@@ -29,8 +29,15 @@ use crate::{
},
};

-use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use crate::arrow::util::bit_util::ceil;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::{
+    aggregates::AggregateExec,
+    joins::SortMergeJoinExec,
+    limit::{GlobalLimitExec, LocalLimitExec},
+    sorts::{sort::SortExec, sort_preserving_merge::SortPreservingMergeExec},
+    windows::WindowAggExec,
+};

/// Optimizer rule that introduces CoalesceBatchesExec to avoid overhead with small batches that
/// are produced by highly selective filters
@@ -43,45 +50,129 @@ impl CoalesceBatches {
Self::default()
}
}
-impl PhysicalOptimizerRule for CoalesceBatches {
-    fn optimize(
-        &self,
-        plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
-        config: &ConfigOptions,
-    ) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
-        if !config.execution.coalesce_batches {
-            return Ok(plan);
-        }
-
-        let target_batch_size = config.execution.batch_size;
-        plan.transform_up(|plan| {
-            let plan_any = plan.as_any();
-            // The goal here is to detect operators that could produce small batches and only
-            // wrap those ones with a CoalesceBatchesExec operator. An alternate approach here
-            // would be to build the coalescing logic directly into the operators
-            // See https://github.com/apache/datafusion/issues/139
-            let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
-                || plan_any.downcast_ref::<HashJoinExec>().is_some()
+#[inline]
+fn get_limit(plan: &dyn Any) -> Option<usize> {
+    if let Some(limit_exec) = plan.downcast_ref::<GlobalLimitExec>() {
+        limit_exec.fetch().map(|fetch| limit_exec.skip() + fetch)
+    } else {
+        plan.downcast_ref::<LocalLimitExec>()
+            .map(|limit_exec| limit_exec.fetch())
+    }
+}
+
+#[inline]
+fn need_scan_all(plan: &dyn Any) -> bool {
+    plan.downcast_ref::<SortMergeJoinExec>().is_some()
+        || plan.downcast_ref::<AggregateExec>().is_some()
+        || plan.downcast_ref::<SortExec>().is_some()
+        || plan.downcast_ref::<SortPreservingMergeExec>().is_some()
+        || plan.downcast_ref::<WindowAggExec>().is_some()
+}

+#[inline]
+fn need_wrap_in_coalesce(plan: &dyn Any) -> bool {
+    // The goal here is to detect operators that could produce small batches and only
+    // wrap those ones with a CoalesceBatchesExec operator. An alternate approach here
+    // would be to build the coalescing logic directly into the operators
+    // See https://github.com/apache/arrow-datafusion/issues/139
+    plan.downcast_ref::<FilterExec>().is_some()
+        || plan.downcast_ref::<HashJoinExec>().is_some()
        // Don't need to add CoalesceBatchesExec after a round robin RepartitionExec
-                || plan_any
+        || plan
            .downcast_ref::<RepartitionExec>()
            .map(|repart_exec| {
                !matches!(
                    repart_exec.partitioning().clone(),
                    Partitioning::RoundRobinBatch(_)
                )
            })
-                    .unwrap_or(false);
-            if wrap_in_coalesce {
-                Ok(Transformed::yes(Arc::new(CoalesceBatchesExec::new(
-                    plan,
-                    target_batch_size,
-                ))))
-            } else {
-                Ok(Transformed::no(plan))
-            }
+            .unwrap_or(false)
+}

+fn wrap_in_coalesce_rewrite_inner(
+    mut limit: Option<usize>,
+    partition: usize,
+    default_batch_size: usize,
+    plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
+) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
+    // If the entire table needs to be scanned, the limit at the upper level does not take effect
+    if need_scan_all(plan.as_any()) {

Review comment (Contributor):

I think we should turn this around: allow any approved plan nodes instead of disallowing some.
Otherwise this will be wrong for any added/forgotten nodes or user defined nodes.
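
For illustration, a minimal standalone sketch of the allowlist idea (node names are hypothetical stand-ins, not DataFusion APIs): the limit keeps propagating only through operators explicitly known to be safe, so any unlisted or user-defined node conservatively drops it instead of being wrongly assumed safe.

```rust
/// Allowlist sketch: `op` stands in for a plan node's type name.
fn propagate_limit(op: &str, limit: Option<usize>) -> Option<usize> {
    // Only operators known to be able to stop early keep the limit alive.
    const LIMIT_PRESERVING: &[&str] = &["FilterExec", "ProjectionExec", "RepartitionExec"];
    if LIMIT_PRESERVING.contains(&op) {
        limit
    } else {
        // Unlisted nodes (including user-defined ones) fall on the safe side.
        None
    }
}

fn main() {
    assert_eq!(propagate_limit("FilterExec", Some(100)), Some(100));
    // A node nobody thought about resets the limit rather than keeping it:
    assert_eq!(propagate_limit("MyCustomExec", Some(100)), None);
}
```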

Reply (Contributor Author):

Ok, I get it. I'll give it a try.

Reply (Contributor Author, @acking-you, Aug 15, 2024):

> I think we should turn this around: allow any approved plan nodes instead of disallowing some. Otherwise this will be wrong for any added/forgotten nodes or user defined nodes.

After carefully considering the revised plan, I realized that identifying operators requiring a full table scan is still necessary regardless of the changes, because this optimization always depends on two things: whether the plan contains a full-table-scan operator and whether it contains a limit operator.
Here are the new changes: https://github.com/acking-you/arrow-datafusion/blob/feat/optimize_coalesce_batches/datafusion/core/src/physical_optimizer/coalesce_batches.rs#L101-L134
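
To make that interplay concrete, here is a minimal standalone model (an editorial sketch, not DataFusion code) of how the rewrite propagates its limit hint: the hint survives streaming operators such as filters, but is reset by operators that must consume all input, mirroring the PR's `get_limit` and `need_scan_all`.

```rust
// Toy plan tree: just enough to show where an upstream limit stops applying.
enum Node {
    Limit { skip: usize, fetch: usize, input: Box<Node> },
    Sort(Box<Node>),   // blocking: must consume all input before emitting a row
    Filter(Box<Node>), // streaming: passes the hint through
    Scan,
}

// The limit hint still in force once the walk reaches the leaf.
fn limit_hint_at_leaf(node: &Node, limit: Option<usize>) -> Option<usize> {
    match node {
        // `skip + fetch` mirrors `get_limit` in the PR
        Node::Limit { skip, fetch, input } => limit_hint_at_leaf(input, Some(skip + fetch)),
        // a sort needs every input row, so the hint is void below it (`need_scan_all`)
        Node::Sort(input) => limit_hint_at_leaf(input, None),
        Node::Filter(input) => limit_hint_at_leaf(input, limit),
        Node::Scan => limit,
    }
}

fn main() {
    // LIMIT 10 over a filter: the hint survives down to the scan
    let streaming = Node::Limit {
        skip: 0,
        fetch: 10,
        input: Box::new(Node::Filter(Box::new(Node::Scan))),
    };
    assert_eq!(limit_hint_at_leaf(&streaming, None), Some(10));

    // LIMIT 10 over a sort (ORDER BY): everything below must still be scanned
    let blocking = Node::Limit {
        skip: 0,
        fetch: 10,
        input: Box::new(Node::Sort(Box::new(Node::Scan))),
    };
    assert_eq!(limit_hint_at_leaf(&blocking, None), None);
}
```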

+        limit = None
+    }
+    let children = plan
+        .children()
+        .iter()
+        .map(|&child| {
+            // Use the child's own limit if it has one; otherwise inherit the current one
+            let limit = match get_limit(child.as_any()) {
+                None => limit,
+                v => v,
+            };
+            wrap_in_coalesce_rewrite_inner(
+                limit,
+                partition,
+                default_batch_size,
+                child.clone(),
+            )
+        })
-        .data()
+        .collect::<Result<Vec<_>>>()?;

+    let mut wrap_in_coalesce = need_wrap_in_coalesce(plan.as_any());

+    // Take `limit / partition` as CoalesceBatchesExec's fetch; it caps the maximum number of rows to fetch
+    let fetch = limit.map(|limit| {
+        // If the limit is small enough, this optimization is not performed
+        if limit < partition * 16 {
+            wrap_in_coalesce = false;
+        }
+        ceil(limit, partition)
+    });

+    let plan = if children.is_empty() {
+        plan
+    } else {
+        plan.with_new_children(children)?
+    };

+    Ok(if wrap_in_coalesce {
+        Arc::new(CoalesceBatchesExec::new(plan, default_batch_size).with_fetch(fetch))
+    } else {
+        plan
+    })
+}

+fn wrap_in_coalesce_rewrite(
+    mut partition: usize,
+    default_batch_size: usize,
+    plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
+) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
+    // The partition count is at least 1
+    if partition == 0 {
+        partition = 1;
+    }
+    wrap_in_coalesce_rewrite_inner(
+        get_limit(plan.as_any()),
+        partition,
+        default_batch_size,
+        plan,
+    )
+}

+impl PhysicalOptimizerRule for CoalesceBatches {
+    fn optimize(
+        &self,
+        plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
+        config: &ConfigOptions,
+    ) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
+        if !config.execution.coalesce_batches {
+            return Ok(plan);
+        }
+        wrap_in_coalesce_rewrite(
+            config.execution.target_partitions,
+            config.execution.batch_size,
+            plan,
+        )
+    }

    fn name(&self) -> &str {
        [remaining diff lines collapsed]
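
As a concrete check of the fetch heuristic in `wrap_in_coalesce_rewrite_inner`, here is a standalone sketch (editorial, not part of the PR; `div_ceil` stands in for `arrow`'s `bit_util::ceil`, assumed here to be plain ceiling division). The PR computes `fetch = Some(ceil(limit, partition))` but disables the wrapping when `limit < partition * 16`; the sketch folds both into one function:

```rust
/// Per-partition fetch the rule would hand to CoalesceBatchesExec,
/// or None when the limit is too small for the optimization to pay off.
fn coalesce_fetch(limit: usize, partitions: usize) -> Option<usize> {
    let partitions = partitions.max(1); // the rule clamps the partition count to at least 1
    if limit < partitions * 16 {
        return None; // fewer than 16 rows per partition: wrapping is skipped
    }
    // ceil(limit / partitions): the most rows any one partition must emit
    Some(limit.div_ceil(partitions))
}

fn main() {
    // LIMIT 1000 across 8 target partitions: each coalesce fetches at most 125 rows
    assert_eq!(coalesce_fetch(1000, 8), Some(125));
    // LIMIT 100 across 8 partitions: 100 < 8 * 16 = 128, so no fetch-limited wrapping
    assert_eq!(coalesce_fetch(100, 8), None);
}
```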