
Fix incorrect OFFSET during LIMIT pushdown. #12399

Merged

Conversation

wiedld
Contributor

@wiedld wiedld commented Sep 9, 2024

Which issue does this PR close?

Fixes #12423

First commit demonstrates the bug.

Rationale for this change

The first commit demonstrates the current, incorrect behavior, in which the offset is not applied correctly during limit pushdown.

Followup commits add the fix, as well as a few doc comments.

What changes are included in this PR?

A slight change to offset handling in one of the limit pushdown helper functions.
Also added some doc comments to help explain the existing code.

Are these changes tested?

Yes.

Are there any user-facing changes?

No.
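The bug can be illustrated with a small standalone sketch (plain Rust over `Vec`s, not actual DataFusion operators; both function names here are hypothetical): if a limit with a non-zero skip is pushed below a multi-partition operator, each partition applies the offset independently, discarding `skip * num_partitions` rows instead of skipping `skip` rows once, globally, after the partitions are combined.

```rust
// Hypothetical sketch of the limit-pushdown bug, using Vec<i32> in place
// of partitioned record batch streams.

/// Incorrect: push skip+fetch into every partition, then combine.
/// Each partition drops `skip` rows, so `skip * partitions.len()` rows
/// are lost overall.
fn per_partition_push(partitions: &[Vec<i32>], skip: usize, fetch: usize) -> Vec<i32> {
    partitions
        .iter()
        .flat_map(|p| p.iter().skip(skip).take(fetch).copied())
        .collect()
}

/// Correct: combine the partitions first, then apply skip+fetch once.
fn global_limit(partitions: &[Vec<i32>], skip: usize, fetch: usize) -> Vec<i32> {
    partitions
        .iter()
        .flatten()
        .skip(skip)
        .take(fetch)
        .copied()
        .collect()
}

fn main() {
    let parts = [vec![1, 2, 3], vec![4, 5, 6]];
    // OFFSET 2 LIMIT 2 over the combined input keeps rows 3 and 4.
    assert_eq!(global_limit(&parts, 2, 2), vec![3, 4]);
    // Pushing the skip into each partition instead drops rows 4 and 5.
    assert_eq!(per_partition_push(&parts, 2, 2), vec![3, 6]);
}
```

This is why the fix keeps a global limit above the combining operator whenever a skip is present, rather than pushing the whole limit down.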

@github-actions github-actions bot added the sqllogictest SQL Logic Tests (.slt) label Sep 9, 2024
@wiedld wiedld force-pushed the iox-12102/fix-limit-pushdown-with-offsey branch from fd0f609 to b6fd751 Compare September 9, 2024 18:23
@alamb
Contributor

alamb commented Sep 9, 2024

cc @itsjunetime

@github-actions github-actions bot added the optimizer Optimizer rules label Sep 9, 2024
@wiedld
Contributor Author

wiedld commented Sep 9, 2024

@mertak-synnada this is a fix to what we believe is a bug from this (very excellent) change. We would appreciate your review 🙏🏼 .

@wiedld wiedld marked this pull request as ready for review September 9, 2024 21:34
Contributor

@itsjunetime itsjunetime left a comment


I think this may negate some of the performance improvements gained by the initial PR that introduced these bugs. However, once this is merged, we can refactor the pushdown_limit_helper function slightly to keep the correct behavior while pulling the improvements back in. It's more important to get a fix merged first, since this did break existing behavior.

@wiedld wiedld force-pushed the iox-12102/fix-limit-pushdown-with-offsey branch from d125bc2 to 34b94f0 Compare September 9, 2024 22:03
@alamb
Contributor

alamb commented Sep 10, 2024

I have filed #12423 to track this issue and updated this PR description

Contributor

@alamb alamb left a comment


Thank you @wiedld and @itsjunetime and @mertak-synnada

I think this code is looking good to me. I had a few suggestions about comments, but the code and testing seems 👍 to me

@@ -256,21 +265,24 @@ pub(crate) fn pushdown_limits(
pushdown_plan: Arc<dyn ExecutionPlan>,
global_state: GlobalRequirements,
) -> Result<Arc<dyn ExecutionPlan>> {
// Call pushdown_limit_helper.
// This will either extract the limit node (returning the child), or apply the limit pushdown.

this might be a good comment to add to the pushdown_limit_helper function as well

@alamb
Contributor

alamb commented Sep 11, 2024

I am going to make the comment suggestions to this PR so we can merge it in.

@alamb
Contributor

alamb commented Sep 11, 2024

> I think this may negate some of the performance improvements gained by the initial PR that introduced these bugs, but I think once this is merged, we can refactor the pushdown_limit_helper function slightly to keep the correct behavior while pulling in the improvements again

@itsjunetime I wonder if you can elaborate on this or file a ticket. In order to avoid the final GlobalLimitExec I believe we would have to add offset support into SortPreservingMerge and Sort(TopK) -- which we could do, but I think the benefit might be relatively low

The implementation of Limit is pretty straightforward:

pub struct LimitStream {
    /// The remaining number of rows to skip
    skip: usize,
    /// The remaining number of rows to produce
    fetch: usize,
    /// The input to read from. This is set to None once the limit is
    /// reached to enable early termination
    input: Option<SendableRecordBatchStream>,
    /// Copy of the input schema
    schema: SchemaRef,
    /// Execution time metrics
    baseline_metrics: BaselineMetrics,
}

impl LimitStream {
    pub fn new(
        input: SendableRecordBatchStream,
        skip: usize,
        fetch: Option<usize>,
        baseline_metrics: BaselineMetrics,
    ) -> Self {
        let schema = input.schema();
        Self {
            skip,
            fetch: fetch.unwrap_or(usize::MAX),
            input: Some(input),
            schema,
            baseline_metrics,
        }
    }

    fn poll_and_skip(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<RecordBatch>>> {
        let input = self.input.as_mut().unwrap();
        loop {
            let poll = input.poll_next_unpin(cx);
            let poll = poll.map_ok(|batch| {
                if batch.num_rows() <= self.skip {
                    self.skip -= batch.num_rows();
                    RecordBatch::new_empty(input.schema())
                } else {
                    let new_batch = batch.slice(self.skip, batch.num_rows() - self.skip);
                    self.skip = 0;
                    new_batch
                }
            });
            match &poll {
                Poll::Ready(Some(Ok(batch))) => {
                    if batch.num_rows() > 0 {
                        break poll;
                    } else {
                        // continue to poll input stream
                    }
                }
                Poll::Ready(Some(Err(_e))) => break poll,
                Poll::Ready(None) => break poll,
                Poll::Pending => break poll,
            }
        }
    }
}
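The skip arithmetic in `poll_and_skip` can be modeled synchronously over plain `Vec`s (a sketch for illustration, not DataFusion code; `skip_rows` is a hypothetical name): batches that fall entirely within the remaining skip count are consumed whole, and the first batch that straddles the boundary is sliced.

```rust
// Simplified, synchronous model of LimitStream's skip logic, using
// Vec<i32> in place of RecordBatch and no async polling.
fn skip_rows(batches: Vec<Vec<i32>>, mut skip: usize) -> Vec<Vec<i32>> {
    let mut out = Vec::new();
    for batch in batches {
        if batch.len() <= skip {
            // Entire batch is skipped; decrement the remaining skip count.
            skip -= batch.len();
        } else {
            // Slice off the remaining rows to skip; keep the rest.
            out.push(batch[skip..].to_vec());
            skip = 0;
        }
    }
    out
}

fn main() {
    let batches = vec![vec![1, 2], vec![3, 4, 5], vec![6]];
    // Skip 3 rows total: all of the first batch, one row of the second.
    assert_eq!(skip_rows(batches, 3), vec![vec![4, 5], vec![6]]);
}
```

Note that per-batch slicing like this is cheap for Arrow data, since `RecordBatch::slice` is zero-copy.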

@mertak-synnada
Contributor

> I think this may negate some of the performance improvements gained by the initial PR that introduced these bugs, but I think once this is merged, we can refactor the pushdown_limit_helper function slightly to keep the correct behavior while pulling in the improvements again


I believe @itsjunetime's point applies to the first commit, but after my suggested change the performance gains should be preserved, imo.

@alamb alamb merged commit 9025c1c into apache:main Sep 11, 2024
24 checks passed
@alamb
Contributor

alamb commented Sep 11, 2024

Thanks everyone for your help getting this done!

@alamb alamb deleted the iox-12102/fix-limit-pushdown-with-offsey branch September 11, 2024 19:56
itsjunetime pushed a commit to influxdata/arrow-datafusion that referenced this pull request Sep 23, 2024
* test: demonstrate offset not applied correctly with limit pushdown on multiple input streams

* fix: do not pushdown when skip is applied

* test: update tests after fix

* chore: more doc cleanup

* chore: move LIMIT+OFFSET tests to proper sqllogic test case

* refactor: add global limit back (if there is a skip) during limit pushdown

* Apply suggestions from code review

* Add comment explaining why

---------

Co-authored-by: Andrew Lamb <[email protected]>
wiedld added a commit to influxdata/arrow-datafusion that referenced this pull request Sep 26, 2024
appletreeisyellow pushed a commit to influxdata/arrow-datafusion that referenced this pull request Oct 1, 2024
appletreeisyellow pushed a commit to influxdata/arrow-datafusion that referenced this pull request Oct 1, 2024
wiedld added a commit to influxdata/arrow-datafusion that referenced this pull request Oct 4, 2024
wiedld added a commit to influxdata/arrow-datafusion that referenced this pull request Oct 4, 2024
wiedld added a commit to influxdata/arrow-datafusion that referenced this pull request Oct 9, 2024
wiedld added a commit to influxdata/arrow-datafusion that referenced this pull request Oct 15, 2024
wiedld added a commit to influxdata/arrow-datafusion that referenced this pull request Oct 16, 2024
Labels
optimizer Optimizer rules sqllogictest SQL Logic Tests (.slt)

Successfully merging this pull request may close these issues.

Incorrectly losing OFFSETs for LIMIT queries
4 participants