Fix incorrect OFFSET during LIMIT pushdown. #12399

Merged
13 changes: 10 additions & 3 deletions datafusion/physical-optimizer/src/limit_pushdown.rs
@@ -126,10 +126,14 @@ impl From<LimitExec> for Arc<dyn ExecutionPlan> {
 /// The helper takes an `ExecutionPlan` and a global (algorithm) state which is
 /// an instance of `GlobalRequirements` and modifies these parameters while
 /// checking if the limits can be pushed down or not.
+///
+/// If a limit is encountered, a [`TreeNodeRecursion::Stop`] is returned. Otherwise,
+/// return a [`TreeNodeRecursion::Continue`].
 pub fn pushdown_limit_helper(
     mut pushdown_plan: Arc<dyn ExecutionPlan>,
     mut global_state: GlobalRequirements,
 ) -> Result<(Transformed<Arc<dyn ExecutionPlan>>, GlobalRequirements)> {
+    // Extract limit, if exist, and return child inputs.
     if let Some(limit_exec) = extract_limit(&pushdown_plan) {
         // If we have fetch/skip info in the global state already, we need to
         // decide which one to continue with:
@@ -190,7 +194,8 @@ pub fn pushdown_limit_helper(

     let skip_and_fetch = Some(global_fetch + global_state.skip);

-    if pushdown_plan.supports_limit_pushdown() {
+    let global_skip = global_state.skip;
+    if global_skip == 0 && pushdown_plan.supports_limit_pushdown() {
         if !combines_input_partitions(&pushdown_plan) {
             // We have information in the global state and the plan pushes down,
             // continue:
@@ -223,7 +228,6 @@
     // to add the fetch info and return the plan.

     // There's no push down, change fetch & skip to default values:
-    let global_skip = global_state.skip;
     global_state.fetch = None;
     global_state.skip = 0;

@@ -256,21 +260,24 @@ pub(crate) fn pushdown_limits(
     pushdown_plan: Arc<dyn ExecutionPlan>,
     global_state: GlobalRequirements,
 ) -> Result<Arc<dyn ExecutionPlan>> {
+    // Call pushdown_limit_helper.
+    // This will either extract the limit node (returning the child), or apply the limit pushdown.
[Review comment, Contributor]: this might be a good comment to add to the pushdown_limit_helper function as well

     let (mut new_node, mut global_state) =
         pushdown_limit_helper(pushdown_plan, global_state)?;

+    // While limits exist, continue combining the global_state.
     while new_node.tnr == TreeNodeRecursion::Stop {
         (new_node, global_state) = pushdown_limit_helper(new_node.data, global_state)?;
     }

+    // Apply pushdown limits in children
     let children = new_node.data.children();
     let new_children = children
         .into_iter()
         .map(|child| {
             pushdown_limits(Arc::<dyn ExecutionPlan>::clone(child), global_state.clone())
         })
         .collect::<Result<_>>()?;

     new_node.data.with_new_children(new_children)
 }

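The crux of the fix is in the second hunk: once the global state carries a nonzero skip, the limit can no longer be handed wholesale to operators that support pushdown, because only the fetch may travel downward. A child must produce `skip + fetch` rows so that the limit node staying on top can still discard the first `skip` of them. Below is a minimal, self-contained sketch of that bookkeeping; `Requirements` and `child_fetch` are illustrative stand-ins for exposition, not DataFusion's actual API:

```rust
/// Illustrative stand-in for DataFusion's `GlobalRequirements`
/// (hypothetical names, for exposition only).
#[derive(Clone, Copy, Debug, PartialEq)]
struct Requirements {
    skip: usize,
    fetch: Option<usize>,
}

/// Only the fetch portion of a limit may be pushed below an operator:
/// the child keeps `skip + fetch` rows, and the limit node that stays
/// on top discards the first `skip` of them. Pushing the skip down as
/// well drops the wrong rows, which is the bug this PR fixes.
fn child_fetch(req: Requirements) -> Option<usize> {
    req.fetch.map(|fetch| fetch + req.skip)
}

fn main() {
    // OFFSET 1 LIMIT 2, as in the EXPLAIN tests below: the sort keeps
    // fetch = 1 + 2 = 3 rows, while the GlobalLimitExec on top retains
    // skip=1, fetch=2.
    let req = Requirements { skip: 1, fetch: Some(2) };
    assert_eq!(child_fetch(req), Some(3));
}
```

This matches `let skip_and_fetch = Some(global_fetch + global_state.skip);` in the diff above, and the `fetch=3` visible in the plans of the new tests.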
127 changes: 127 additions & 0 deletions datafusion/sqllogictest/test_files/order.slt
@@ -1196,3 +1196,130 @@ physical_plan
02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
03)----SortExec: TopK(fetch=1), expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false]
04)------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], has_header=true


####################
# Test issue: TBD
####################

# all results
query II
SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc;
----
3 25
2 25
1 0
0 0

# limit only
query II
SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc LIMIT 3;
----
3 25
2 25
1 0

# offset only
query II
SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1;
----
2 25
1 0
0 0

# offset + limit
query II
SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1 LIMIT 2;
----
2 25
1 0

# Applying offset & limit when multiple streams from groupby
query TT
EXPLAIN SELECT b, sum(a) FROM ordered_table GROUP BY b order by b desc OFFSET 1 LIMIT 2;
----
logical_plan
01)Limit: skip=1, fetch=2
02)--Sort: ordered_table.b DESC NULLS FIRST, fetch=3
03)----Aggregate: groupBy=[[ordered_table.b]], aggr=[[sum(CAST(ordered_table.a AS Int64))]]
04)------TableScan: ordered_table projection=[a, b]
physical_plan
01)GlobalLimitExec: skip=1, fetch=2
02)--SortPreservingMergeExec: [b@0 DESC], fetch=3
03)----SortExec: TopK(fetch=3), expr=[b@0 DESC], preserve_partitioning=[true]
04)------AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[sum(ordered_table.a)]
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------RepartitionExec: partitioning=Hash([b@0], 2), input_partitions=2
07)------------AggregateExec: mode=Partial, gby=[b@1 as b], aggr=[sum(ordered_table.a)]
08)--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
09)----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], has_header=true

# Applying offset & limit when multiple streams from union
query TT
explain select * FROM (
select c FROM ordered_table
UNION ALL
select d FROM ordered_table
) order by 1 desc LIMIT 10 OFFSET 4;
----
logical_plan
01)Limit: skip=4, fetch=10
02)--Sort: ordered_table.c DESC NULLS FIRST, fetch=14
03)----Union
04)------Projection: CAST(ordered_table.c AS Int64) AS c
05)--------TableScan: ordered_table projection=[c]
06)------Projection: CAST(ordered_table.d AS Int64) AS c
07)--------TableScan: ordered_table projection=[d]
physical_plan
01)GlobalLimitExec: skip=4, fetch=10
02)--SortPreservingMergeExec: [c@0 DESC], fetch=14
03)----UnionExec
04)------SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true]
05)--------ProjectionExec: expr=[CAST(c@0 AS Int64) as c]
06)----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
07)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], has_header=true
08)------SortExec: TopK(fetch=14), expr=[c@0 DESC], preserve_partitioning=[true]
09)--------ProjectionExec: expr=[CAST(d@0 AS Int64) as c]
10)----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
11)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[d], has_header=true

# Applying LIMIT & OFFSET to subquery.
query III
select t1.b, c, c2 FROM (
select b, c FROM ordered_table ORDER BY b desc, c desc OFFSET 1 LIMIT 4
) as t1 INNER JOIN (
select b, c as c2 FROM ordered_table ORDER BY b desc, d desc OFFSET 1 LIMIT 4
) as t2
ON t1.b = t2.b
ORDER BY t1.b desc, c desc, c2 desc;
----
3 98 96
3 98 89
3 98 82
3 98 79
3 97 96
3 97 89
3 97 82
3 97 79
3 96 96
3 96 89
3 96 82
3 96 79
3 95 96
3 95 89
3 95 82
3 95 79

# Apply OFFSET & LIMIT to both parent and child (subquery).
query III
select t1.b, c, c2 FROM (
select b, c FROM ordered_table ORDER BY b desc, c desc OFFSET 1 LIMIT 4
) as t1 INNER JOIN (
select b, c as c2 FROM ordered_table ORDER BY b desc, d desc OFFSET 1 LIMIT 4
) as t2
ON t1.b = t2.b
ORDER BY t1.b desc, c desc, c2 desc
OFFSET 3 LIMIT 2;
----
3 99 82
3 99 79
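
As a closing note on the `while new_node.tnr == TreeNodeRecursion::Stop` loop in `pushdown_limits` above, which keeps calling the helper while limit nodes are being extracted: when two limits stack directly, their (skip, fetch) pairs fold into a single requirement. The sketch below shows the standard composition rule for OFFSET/LIMIT pairs; it is illustrative and not copied from DataFusion's implementation, and the `combine` name is hypothetical:

```rust
/// Fold an outer (skip, fetch) limit applied on top of an inner one
/// into a single equivalent requirement. The outer skip consumes rows
/// the inner limit already kept, so it adds to the total skip and
/// shrinks the surviving fetch.
fn combine(
    inner: (usize, Option<usize>),
    outer: (usize, Option<usize>),
) -> (usize, Option<usize>) {
    let (inner_skip, inner_fetch) = inner;
    let (outer_skip, outer_fetch) = outer;
    let skip = inner_skip + outer_skip;
    let fetch = match (inner_fetch, outer_fetch) {
        (Some(i), Some(o)) => Some(o.min(i.saturating_sub(outer_skip))),
        (Some(i), None) => Some(i.saturating_sub(outer_skip)),
        (None, o) => o,
    };
    (skip, fetch)
}

fn main() {
    // Inner OFFSET 1 LIMIT 4 stacked under outer OFFSET 3 LIMIT 2:
    // the inner limit yields 4 rows, skipping 3 of them leaves 1, so
    // the combined requirement is skip = 4, fetch = 1.
    assert_eq!(combine((1, Some(4)), (3, Some(2))), (4, Some(1)));
}
```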