Skip to content

Commit

Permalink
Fix union_schema to merge metadatas for both fields and schema
Browse files Browse the repository at this point in the history
  • Loading branch information
itsjunetime committed Oct 22, 2024
1 parent ec9cf2d commit b229807
Showing 1 changed file with 20 additions and 13 deletions.
33 changes: 20 additions & 13 deletions datafusion/physical-plan/src/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,9 @@ pub fn can_interleave<T: Borrow<Arc<dyn ExecutionPlan>>>(
}

fn union_schema(inputs: &[Arc<dyn ExecutionPlan>]) -> SchemaRef {
let fields: Vec<Field> = (0..inputs[0].schema().fields().len())
let first_schema = inputs[0].schema();

let fields = (0..first_schema.fields().len())
.map(|i| {
inputs
.iter()
Expand All @@ -477,25 +479,30 @@ fn union_schema(inputs: &[Arc<dyn ExecutionPlan>]) -> SchemaRef {
let field = input.schema().field(i).clone();
let mut metadata = field.metadata().clone();

let other_side_metdata = inputs
.get(input_idx ^ (1 << 0))
.map(|other_input| {
other_input.schema().field(i).metadata().clone()
})
.unwrap_or_default();
let other_metadatas = inputs
.iter()
.enumerate()
.filter(|(other_idx, _)| *other_idx != input_idx)
.flat_map(|(_, other_input)| {
other_input.schema().field(i).metadata().clone().into_iter()
});

metadata.extend(other_side_metdata);
metadata.extend(other_metadatas);
field.with_metadata(metadata)
})
.find_or_first(|f| f.is_nullable())
.find_or_first(Field::is_nullable)
// We can unwrap this because if inputs was empty, this would've already panic'ed when we
// indexed into inputs[0].
.unwrap()
})
.collect::<Vec<_>>();

let all_metadata_merged = inputs
.iter()
.flat_map(|i| i.schema().metadata().clone().into_iter())
.collect();

Arc::new(Schema::new_with_metadata(
fields,
inputs[0].schema().metadata().clone(),
))
Arc::new(Schema::new_with_metadata(fields, all_metadata_merged))
}

/// CombinedRecordBatchStream can be used to combine a Vec of SendableRecordBatchStreams into one
Expand Down

0 comments on commit b229807

Please sign in to comment.