Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chore: Do not return empty record batches from streams #13794

Merged
merged 17 commits into from
Dec 18, 2024

Conversation

mertak-synnada
Copy link
Contributor

@mertak-synnada mertak-synnada commented Dec 16, 2024

Which issue does this PR close?

Closes #.

Rationale for this change

Some plans generate and emit empty RecordBatches. Since this behavior is unnecessary and incurs costs during stream execution, this change implements a solution for handling them in the streaming process.

The main intention is, to avoid returning empty RecordBatches, acting them as technical debt, and replacing them with Option<RecordBatch> signatures, to reflect the behavior in design.

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added the physical-expr Physical Expressions label Dec 16, 2024
None => self.compute_aggregates(),
};
None => {
let batch = concat_batches(&self.input.schema(), &self.batches)?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change moves concat_batches out of elapsed_compute

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should fix this prior to merge

@alamb alamb changed the title Chore: Remove empty batches Chore: do not return empty record batches from Aggregate operators Dec 16, 2024
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @mertak-synnada -- this looks like a good change to me

I left some small style suggestions. My only real concern with this PR is that there are no tests and thus someone may change this behavior in the future and we likely wouldn't catch it in code review.

Thus, I would recommend creating some sort of unit test that would fail if the AggregateExec is changed so that it generates empty RecordBatches again.

Comment on lines 658 to 662
let Some(batch) = extract_ok!(self.emit(to_emit, false))
else {
break 'reading_input;
};
self.exec_state = ExecutionState::ProducingOutput(batch);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I think I would find this easier to read if it it avoid a redundant break:

Suggested change
let Some(batch) = extract_ok!(self.emit(to_emit, false))
else {
break 'reading_input;
};
self.exec_state = ExecutionState::ProducingOutput(batch);
if let let Some(batch) = extract_ok!(self.emit(to_emit, false)) {
self.exec_state = ExecutionState::ProducingOutput(batch);
}

timer.done();
let Some(batch) = extract_ok!(self.emit(to_emit, false))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment as above

let schema = if spilling {
Arc::clone(&self.spill_state.spill_schema)
} else {
self.schema()
};
if self.group_values.is_empty() {
return Ok(RecordBatch::new_empty(schema));
return Ok(None);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@@ -937,7 +944,8 @@ impl GroupedHashAggregateStream {
// over the target memory size after emission, we can emit again rather than returning Err.
let _ = self.update_memory_reservation();
let batch = RecordBatch::try_new(schema, output)?;
Ok(batch)
debug_assert!(batch.num_rows() > 0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it would be good to document in comments somewhere the expectation / behavior that no empty record batches are produced

@github-actions github-actions bot added the core Core DataFusion crate label Dec 17, 2024
@mertak-synnada
Copy link
Contributor Author

Thanks for the review, @alamb and @Dandandan! I tried to implement your suggestions, including adding some unit tests and assertions for some existing tests. I hope it's in a better state now.

Copy link
Contributor

@ozankabak ozankabak left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM in general, left some minor feedback

@@ -768,6 +776,9 @@ impl Stream for GroupedHashAggregateStream {
let output = batch.slice(0, size);
(ExecutionState::ProducingOutput(remaining), output)
};
// Empty record batches should not be emitted.
// They need to be treated as [`Option<RecordBatch>`]es and handle separately
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// They need to be treated as [`Option<RecordBatch>`]es and handle separately
// They need to be treated as [`Option<RecordBatch>`]es and handled separately

@@ -407,6 +407,9 @@ impl PartialSortStream {
self.is_closed = true;
}
}
// Empty record batches should not be emitted.
// They need to be treated as [`Option<RecordBatch>`]es and handle separately
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// They need to be treated as [`Option<RecordBatch>`]es and handle separately
// They need to be treated as [`Option<RecordBatch>`]es and handled separately

self.metrics.output_rows.add(result_batch.num_rows());

// Empty record batches should not be emitted.
// They need to be treated as [`Option<RecordBatch>`]es and handle separately
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// They need to be treated as [`Option<RecordBatch>`]es and handle separately
// They need to be treated as [`Option<RecordBatch>`]es and handled separately

Comment on lines 581 to 590
let temp_result = list_unnest_at_level(
input,
list_type_columns,
&mut temp_unnested_result,
depth,
options,
)?;
if num_rows == 0 {
return Ok(RecordBatch::new_empty(Arc::clone(schema)));
}
let Some(temp_result) = temp_result else {
return Ok(None);
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use the more succinct

                let Some(temp_result) = list_unnest_at_level(
                    input,
                    list_type_columns,
                    &mut temp_unnested_result,
                    depth,
                    options,
                )? else {
                    continue;
                };

Comment on lines 990 to 993
let batch = self.compute_aggregates()?;
if let Some(batch) = batch {
return Poll::Ready(Some(Ok(batch)));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we say

Suggested change
let batch = self.compute_aggregates()?;
if let Some(batch) = batch {
return Poll::Ready(Some(Ok(batch)));
}
if let Some(batch) = self.compute_aggregates()? {
return Poll::Ready(Some(Ok(batch)));
}

Comment on lines 1002 to 1005
let batch = self.compute_aggregates()?;
if let Some(batch) = batch {
return Poll::Ready(Some(Ok(batch)));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we say

Suggested change
let batch = self.compute_aggregates()?;
if let Some(batch) = batch {
return Poll::Ready(Some(Ok(batch)));
}
if let Some(batch) = self.compute_aggregates()? {
return Poll::Ready(Some(Ok(batch)));
}

Comment on lines 392 to 394
let Some(result) = maybe_batch else {
return Poll::Ready(None);
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this go inside the None arm of the match above?


return Poll::Ready(Some(result));
// Empty record batches should not be emitted.
// They need to be treated as [`Option<RecordBatch>`]es and handle separately
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// They need to be treated as [`Option<RecordBatch>`]es and handle separately
// They need to be treated as [`Option<RecordBatch>`]es and handled separately

@mertak-synnada mertak-synnada changed the title Chore: do not return empty record batches from Aggregate operators Chore: Do not return empty record batches from streams Dec 18, 2024
@ozankabak
Copy link
Contributor

I will go ahead and merge this in the next few hours unless new feedback arrives, since the reviews has been addressed and it is a fairly straightforward change

@ozankabak ozankabak merged commit 63ce486 into apache:main Dec 18, 2024
26 checks passed
@alamb
Copy link
Contributor

alamb commented Dec 18, 2024

Thank you everyone!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate physical-expr Physical Expressions
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants