Skip to content

Commit

Permalink
Provide field and schema metadata missing on distinct aggregations. (#…
Browse files Browse the repository at this point in the history
…12691)

* test(12687): reproducer of missing metadata bug

* fix(12687): minimum change needed to fix the missing metadata
  • Loading branch information
wiedld authored Oct 1, 2024
1 parent 0242767 commit 84ac4f9
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 10 deletions.
25 changes: 16 additions & 9 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::aggregates::{
topk_stream::GroupedTopKAggregateStream,
};
use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::projection::get_field_metadata;
use crate::windows::get_ordered_partition_by_indices;
use crate::{
DisplayFormatType, Distribution, ExecutionPlan, InputOrderMode,
Expand Down Expand Up @@ -795,14 +796,17 @@ fn create_schema(
) -> Result<Schema> {
let mut fields = Vec::with_capacity(group_expr.len() + aggr_expr.len());
for (index, (expr, name)) in group_expr.iter().enumerate() {
fields.push(Field::new(
name,
expr.data_type(input_schema)?,
// In cases where we have multiple grouping sets, we will use NULL expressions in
// order to align the grouping sets. So the field must be nullable even if the underlying
// schema field is not.
group_expr_nullable[index] || expr.nullable(input_schema)?,
))
fields.push(
Field::new(
name,
expr.data_type(input_schema)?,
// In cases where we have multiple grouping sets, we will use NULL expressions in
// order to align the grouping sets. So the field must be nullable even if the underlying
// schema field is not.
group_expr_nullable[index] || expr.nullable(input_schema)?,
)
.with_metadata(get_field_metadata(expr, input_schema).unwrap_or_default()),
)
}

match mode {
Expand All @@ -823,7 +827,10 @@ fn create_schema(
}
}

Ok(Schema::new(fields))
Ok(Schema::new_with_metadata(
fields,
input_schema.metadata().clone(),
))
}

fn group_schema(schema: &Schema, group_count: usize) -> SchemaRef {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ impl ExecutionPlan for ProjectionExec {

/// If e is a direct column reference, returns the field level
/// metadata for that field, if any. Otherwise returns None
fn get_field_metadata(
pub(crate) fn get_field_metadata(
e: &Arc<dyn PhysicalExpr>,
input_schema: &Schema,
) -> Option<HashMap<String, String>> {
Expand Down
38 changes: 38 additions & 0 deletions datafusion/sqllogictest/test_files/metadata.slt
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,43 @@ WHERE "data"."id" = "samples"."id";
1
3



# Regression test: prevent field metadata loss per https://github.com/apache/datafusion/issues/12687
query I
select count(distinct name) from table_with_metadata;
----
2

# Regression test: prevent field metadata loss per https://github.com/apache/datafusion/issues/12687
query I
select approx_median(distinct id) from table_with_metadata;
----
2

# Regression test: prevent field metadata loss per https://github.com/apache/datafusion/issues/12687
statement ok
select array_agg(distinct id) from table_with_metadata;

query I
select distinct id from table_with_metadata order by id;
----
1
3
NULL

query I
select count(id) from table_with_metadata;
----
2

query I
select count(id) cnt from table_with_metadata group by name order by cnt;
----
0
1
1


statement ok
drop table table_with_metadata;

0 comments on commit 84ac4f9

Please sign in to comment.