From 9e76369a2bd44b2ca339b14eb234b1597812424d Mon Sep 17 00:00:00 2001 From: Christian Thiel Date: Sat, 27 Jul 2024 19:00:19 +0200 Subject: [PATCH 01/14] Initial commit --- .../src/expr/visitors/expression_evaluator.rs | 3 +- .../visitors/inclusive_metrics_evaluator.rs | 3 +- .../src/expr/visitors/inclusive_projection.rs | 15 +- crates/iceberg/src/spec/partition.rs | 743 +++++++++++++++++- crates/iceberg/src/spec/table_metadata.rs | 16 +- 5 files changed, 737 insertions(+), 43 deletions(-) diff --git a/crates/iceberg/src/expr/visitors/expression_evaluator.rs b/crates/iceberg/src/expr/visitors/expression_evaluator.rs index 3700a9b3d..f7e2d9003 100644 --- a/crates/iceberg/src/expr/visitors/expression_evaluator.rs +++ b/crates/iceberg/src/expr/visitors/expression_evaluator.rs @@ -282,7 +282,8 @@ mod tests { .field_id(1) .transform(Transform::Identity) .build()]) - .build() + .unwrap() + .build(&schema) .unwrap(); Ok((Arc::new(schema), Arc::new(spec))) diff --git a/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs b/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs index 430ebfc1a..b96258f91 100644 --- a/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs +++ b/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs @@ -1664,7 +1664,8 @@ mod test { .field_id(1) .transform(Transform::Identity) .build()]) - .build() + .unwrap() + .build(&table_schema_ref) .unwrap(); let partition_spec_ref = Arc::new(partition_spec); (table_schema_ref, partition_spec_ref) diff --git a/crates/iceberg/src/expr/visitors/inclusive_projection.rs b/crates/iceberg/src/expr/visitors/inclusive_projection.rs index 9cfbb4fd8..2605726af 100644 --- a/crates/iceberg/src/expr/visitors/inclusive_projection.rs +++ b/crates/iceberg/src/expr/visitors/inclusive_projection.rs @@ -268,7 +268,8 @@ mod tests { let partition_spec = PartitionSpec::builder() .with_spec_id(1) .with_fields(vec![]) - .build() + .unwrap() + .build(&schema) .unwrap(); let arc_schema = Arc::new(schema); @@ -304,7 +305,8 @@ mod tests { .field_id(1) .transform(Transform::Identity) .build()]) - .build() + .unwrap() + .build(&schema) .unwrap(); let arc_schema = Arc::new(schema); @@ -352,7 +354,8 @@ mod tests { .transform(Transform::Day) .build(), ]) - .build() + .unwrap() + .build(&schema) .unwrap(); let arc_schema = Arc::new(schema); @@ -386,7 +389,8 @@ mod tests { .field_id(3) .transform(Transform::Truncate(4)) .build()]) - .build() + .unwrap() + .build(&schema) .unwrap(); let arc_schema = Arc::new(schema); @@ -423,7 +427,8 @@ mod tests { .field_id(1) .transform(Transform::Bucket(7)) .build()]) - .build() + .unwrap() + .build(&schema) .unwrap(); let arc_schema = Arc::new(schema); diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs index f1244e4e9..a9d72bfbf 100644 --- a/crates/iceberg/src/spec/partition.rs +++ b/crates/iceberg/src/spec/partition.rs @@ -25,7 +25,7 @@ use typed_builder::TypedBuilder; use super::transform::Transform; use super::{NestedField, Schema, StructType}; -use crate::{Error, ErrorKind}; +use crate::{Error, ErrorKind, Result}; /// Reference to [`PartitionSpec`]. pub type PartitionSpecRef = Arc; @@ -44,15 +44,25 @@ pub struct PartitionField { pub transform: Transform, } +impl PartitionField { + /// To unbound partition field + pub fn to_unbound(&self) -> UnboundPartitionField { + UnboundPartitionField { + source_id: self.source_id, + partition_id: Some(self.field_id), + name: self.name.clone(), + transform: self.transform.clone(), + } + } +} + /// Partition spec that defines how to produce a tuple of partition values from a record. -#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, Builder)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default)] #[serde(rename_all = "kebab-case")] -#[builder(setter(prefix = "with"))] pub struct PartitionSpec { /// Identifier for PartitionSpec pub spec_id: i32, /// Details of the partition spec - #[builder(setter(each(name = "with_partition_field")))] pub fields: Vec, } @@ -74,7 +84,7 @@ impl PartitionSpec { } /// Returns the partition type of this partition spec. - pub fn partition_type(&self, schema: &Schema) -> Result { + pub fn partition_type(&self, schema: &Schema) -> Result { let mut fields = Vec::with_capacity(self.fields.len()); for partition_field in &self.fields { let field = schema @@ -96,6 +106,23 @@ impl PartitionSpec { } Ok(StructType::new(fields)) } + + /// Turn this partition spec into an unbound partition spec. + pub fn to_unbound(self) -> UnboundPartitionSpec { + UnboundPartitionSpec { + spec_id: Some(self.spec_id), + fields: self + .fields + .into_iter() + .map(|f| UnboundPartitionField { + source_id: f.source_id, + partition_id: Some(f.field_id), + name: f.name, + transform: f.transform, + }) + .collect(), + } + } } /// Reference to [`UnboundPartitionSpec`]. @@ -136,6 +163,286 @@ impl UnboundPartitionSpec { } } +/// Create valid partition specs for a given schema. +#[derive(Debug, Default)] +pub struct PartitionSpecBuilder { + spec_id: i32, + last_assigned_field_id: i32, + fields: Vec, +} + +impl PartitionSpecBuilder { + pub(crate) const UNPARTITIONED_LAST_ASSIGNED_ID: i32 = 999; + + /// Create a new partition spec builder with the given schema. + pub fn new() -> Self { + Self { + spec_id: 0, + fields: vec![], + last_assigned_field_id: Self::UNPARTITIONED_LAST_ASSIGNED_ID, + } + } + + /// Accessor to the current last assigned field id. + pub fn last_assigned_field_id(&self) -> i32 { + self.last_assigned_field_id + } + + /// Set the last assigned field id for the partition spec. + /// This is useful when re-binding partition specs. + pub fn with_last_assigned_field_id(mut self, last_assigned_field_id: i32) -> Self { + self.last_assigned_field_id = last_assigned_field_id; + self + } + + /// Set the spec id for the partition spec. + pub fn with_spec_id(mut self, spec_id: i32) -> Self { + self.spec_id = spec_id; + self + } + + /// Add a new partition field to the partition spec. + pub fn with_partition_field(mut self, field: PartitionField) -> Result { + self.check_name_set_and_unique(&field.name)?; + self.check_for_redundant_partitions(field.source_id, &field.transform)?; + self.check_partition_id_unique(field.field_id)?; + + self.fields.push(field); + Ok(self) + } + + /// Add a new partition field to the partition spec. + /// Field ID is auto assigned if not set. + pub fn with_unbound_partition_field(mut self, field: UnboundPartitionField) -> Result { + self.check_name_set_and_unique(&field.name)?; + self.check_for_redundant_partitions(field.source_id, &field.transform)?; + if let Some(partition_id) = field.partition_id { + self.check_partition_id_unique(partition_id)?; + } + + let partition_id = if let Some(partition_id) = field.partition_id { + self.last_assigned_field_id = std::cmp::max(self.last_assigned_field_id, partition_id); + partition_id + } else { + self.increment_and_get_next_field_id()? + }; + + let field = PartitionField { + source_id: field.source_id, + field_id: partition_id, + name: field.name, + transform: field.transform, + }; + + self.fields.push(field); + Ok(self) + } + + /// Add multiple unbound partition fields to the partition spec. + /// Field IDs are auto assigned if not set. + pub fn with_unbound_fields( + self, + fields: impl IntoIterator, + ) -> Result { + let mut builder = self; + for field in fields { + builder = builder.with_unbound_partition_field(field)?; + } + Ok(builder) + } + + /// Add multiple partition fields to the partition spec. + pub fn with_fields(self, fields: impl IntoIterator) -> Result { + let mut builder = self; + for field in fields { + builder = builder.with_partition_field(field)?; + } + Ok(builder) + } + + /// Build the unbound partition spec. + pub fn build_unbound(self) -> UnboundPartitionSpec { + UnboundPartitionSpec { + spec_id: Some(self.spec_id), + fields: self.fields.into_iter().map(|f| f.to_unbound()).collect(), + } + } + + /// Build a bound partition spec with the given schema. + pub fn build(self, schema: &Schema) -> Result { + let mut fields = Vec::with_capacity(self.fields.len()); + for field in self.fields { + Self::check_name_does_not_collide_with_schema(&field, schema)?; + Self::check_transform_compatibility(&field, schema)?; + fields.push(PartitionField { + source_id: field.source_id, + field_id: field.field_id, + name: field.name, + transform: field.transform, + }); + } + Ok(PartitionSpec { + spec_id: self.spec_id, + fields, + }) + } + + /// Build a partition spec without validating it against a schema. + /// This can lead to invalid partition specs. Use with caution. + pub fn build_unchecked(self) -> PartitionSpec { + PartitionSpec { + spec_id: self.spec_id, + fields: self.fields, + } + } + + /// Ensure that the partition name is unique among the partition fields and is not empty. + fn check_name_set_and_unique(&self, name: &str) -> Result<()> { + if name.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Cannot use empty partition name", + )); + } + + if self.partition_names().contains(name) { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Cannot use partition name more than once: {}", name), + )); + } + Ok(()) + } + + /// Check field / partition_id unique within the partition spec if set + fn check_partition_id_unique(&self, field_id: i32) -> Result<()> { + if self.fields.iter().any(|f| f.field_id == field_id) { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot use field id more than once in one PartitionSpec: {}", + field_id + ), + )); + } + + Ok(()) + } + + /// Ensure that the partition name is unique among columns in the schema. + /// Duplicate names are allowed if: + /// 1. The column is sourced from the column with the same name. + /// 2. AND the transformation is identity + fn check_name_does_not_collide_with_schema( + field: &PartitionField, + schema: &Schema, + ) -> Result<()> { + match schema.field_by_name(field.name.as_str()) { + Some(schema_collision) => { + if field.transform == Transform::Identity { + if schema_collision.id == field.source_id { + Ok(()) + } else { + Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot create identity partition sourced from different field in schema. Field name '{}' has id `{}` in schema but partition source id is `{}`", + field.name, schema_collision.id, field.source_id + ), + )) + } + } else { + Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot create partition with name: '{}' that conflicts with schema field and is not an identity transform.", + field.name + ), + )) + } + } + None => Ok(()), + } + } + + /// For a single source-column transformations must be unique. + fn check_for_redundant_partitions(&self, source_id: i32, transform: &Transform) -> Result<()> { + let collision = self.fields.iter().find(|f| { + f.source_id == source_id && f.transform.dedup_name() == transform.dedup_name() + }); + + if let Some(collision) = collision { + Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot add redundant partition with source id `{}` and transform `{}`. A partition with the same source id and transform already exists with name `{}`", + source_id, transform.dedup_name(), collision.name + ), + )) + } else { + Ok(()) + } + } + + /// Ensure that the transformation of the field is compatible with type of the field + /// in the schema. Implicitly also checks if the source field exists in the schema. + fn check_transform_compatibility(field: &PartitionField, schema: &Schema) -> Result<()> { + let schema_field = schema.field_by_id(field.source_id).ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot find partition source field with id `{}` in schema", + field.source_id + ), + ) + })?; + + if field.transform != Transform::Void { + if !schema_field.field_type.is_primitive() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot partition by non-primitive source field: '{}'.", + schema_field.field_type + ), + )); + } + + if field + .transform + .result_type(&schema_field.field_type) + .is_err() + { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Invalid source type: '{}' for transform: '{}'.", + schema_field.field_type, + field.transform.dedup_name() + ), + )); + } + } + + Ok(()) + } + + fn increment_and_get_next_field_id(&mut self) -> Result { + if self.last_assigned_field_id == i32::MAX { + return Err(Error::new( + ErrorKind::DataInvalid, + "Cannot assign more partition fields. Field id overflow.", + )); + } + self.last_assigned_field_id += 1; + Ok(self.last_assigned_field_id) + } + + fn partition_names(&self) -> std::collections::HashSet<&str> { + self.fields.iter().map(|f| f.name.as_str()).collect() + } +} + #[cfg(test)] mod tests { use super::*; @@ -184,36 +491,28 @@ mod tests { #[test] fn test_is_unpartitioned() { - let partition_spec = PartitionSpec::builder() - .with_spec_id(1) - .with_fields(vec![]) - .build() - .unwrap(); + let partition_spec = PartitionSpec::builder().with_spec_id(1).build_unchecked(); assert!( partition_spec.is_unpartitioned(), "Empty partition spec should be unpartitioned" ); let partition_spec = PartitionSpec::builder() - .with_partition_field( - PartitionField::builder() + .with_unbound_fields(vec![ + UnboundPartitionField::builder() .source_id(1) - .field_id(1) .name("id".to_string()) .transform(Transform::Identity) .build(), - ) - .with_partition_field( - PartitionField::builder() + UnboundPartitionField::builder() .source_id(2) - .field_id(2) .name("name".to_string()) .transform(Transform::Void) .build(), - ) + ]) + .unwrap() .with_spec_id(1) - .build() - .unwrap(); + .build_unchecked(); assert!( !partition_spec.is_unpartitioned(), "Partition spec with one non void transform should not be unpartitioned" @@ -221,24 +520,20 @@ mod tests { let partition_spec = PartitionSpec::builder() .with_spec_id(1) - .with_partition_field( - PartitionField::builder() + .with_unbound_fields(vec![ + UnboundPartitionField::builder() .source_id(1) - .field_id(1) .name("id".to_string()) .transform(Transform::Void) .build(), - ) - .with_partition_field( - PartitionField::builder() + UnboundPartitionField::builder() .source_id(2) - .field_id(2) .name("name".to_string()) .transform(Transform::Void) .build(), - ) - .build() - .unwrap(); + ]) + .unwrap() + .build_unchecked(); assert!( partition_spec.is_unpartitioned(), "Partition spec with all void field should be unpartitioned" @@ -489,4 +784,392 @@ mod tests { assert!(partition_spec.partition_type(&schema).is_err()); } + + #[test] + fn test_builder_unchecked() { + let spec = PartitionSpec { + spec_id: 10, + fields: vec![ + PartitionField { + source_id: 4, + field_id: 1000, + name: "ts_day".to_string(), + transform: Transform::Day, + }, + PartitionField { + source_id: 1, + field_id: 1001, + name: "id_bucket".to_string(), + transform: Transform::Bucket(16), + }, + PartitionField { + source_id: 2, + field_id: 1002, + name: "id_truncate".to_string(), + transform: Transform::Truncate(4), + }, + ], + }; + + let build_spec = PartitionSpec::builder() + .with_spec_id(10) + .with_partition_field(spec.fields[0].clone()) + .unwrap() + .with_partition_field(spec.fields[1].clone()) + .unwrap() + .with_partition_field(spec.fields[2].clone()) + .unwrap() + .build_unchecked(); + + assert_eq!(spec, build_spec); + } + + #[test] + fn test_build_unbound() { + let unbound_spec = UnboundPartitionSpec { + spec_id: Some(10), + fields: vec![ + UnboundPartitionField { + source_id: 4, + partition_id: Some(1000), + name: "ts_day".to_string(), + transform: Transform::Day, + }, + UnboundPartitionField { + source_id: 1, + partition_id: Some(1001), + name: "id_bucket".to_string(), + transform: Transform::Bucket(16), + }, + UnboundPartitionField { + source_id: 2, + partition_id: Some(1002), + name: "id_truncate".to_string(), + transform: Transform::Truncate(4), + }, + ], + }; + + let build_spec = PartitionSpec::builder() + .with_spec_id(10) + .with_unbound_partition_field(unbound_spec.fields[0].clone()) + .unwrap() + .with_unbound_partition_field(unbound_spec.fields[1].clone()) + .unwrap() + .with_unbound_partition_field(unbound_spec.fields[2].clone()) + .unwrap() + .build_unbound(); + + assert_eq!(unbound_spec, build_spec); + } + + #[test] + fn test_builder_disallow_duplicate_names() { + PartitionSpec::builder() + .with_partition_field(PartitionField { + source_id: 1, + field_id: 1000, + name: "ts_day".to_string(), + transform: Transform::Day, + }) + .unwrap() + .with_partition_field(PartitionField { + source_id: 2, + field_id: 1001, + name: "ts_day".to_string(), + transform: Transform::Day, + }) + .unwrap_err(); + } + + #[test] + fn test_builder_disallow_duplicate_field_ids() { + PartitionSpec::builder() + .with_partition_field(PartitionField { + source_id: 1, + field_id: 1000, + name: "ts_day".to_string(), + transform: Transform::Day, + }) + .unwrap() + .with_partition_field(PartitionField { + source_id: 2, + field_id: 1000, + name: "id_bucket".to_string(), + transform: Transform::Bucket(16), + }) + .unwrap_err(); + } + + #[test] + fn test_builder_auto_assign_field_ids() { + let spec = PartitionSpec::builder() + .with_spec_id(1) + .with_unbound_partition_field(UnboundPartitionField { + source_id: 1, + name: "id".to_string(), + transform: Transform::Identity, + partition_id: Some(512), + }) + .unwrap() + .with_unbound_partition_field(UnboundPartitionField { + source_id: 2, + name: "name".to_string(), + transform: Transform::Void, + partition_id: None, + }) + .unwrap() + // Should keep its ID even if its lower + .with_unbound_partition_field(UnboundPartitionField { + source_id: 3, + name: "ts".to_string(), + transform: Transform::Year, + partition_id: Some(1), + }) + .unwrap() + .build_unchecked(); + + assert_eq!(512, spec.fields[0].field_id); + assert_eq!(513, spec.fields[1].field_id); + assert_eq!(1, spec.fields[2].field_id); + } + + #[test] + fn test_builder_valid_schema_1() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(); + + PartitionSpec::builder() + .with_spec_id(1) + .build(&schema) + .unwrap(); + + let spec = PartitionSpec::builder() + .with_spec_id(1) + .with_unbound_partition_field(UnboundPartitionField { + source_id: 1, + partition_id: None, + name: "id_bucket[16]".to_string(), + transform: Transform::Bucket(16), + }) + .unwrap() + .build(&schema) + .unwrap(); + + assert_eq!(spec, PartitionSpec { + spec_id: 1, + fields: vec![PartitionField { + source_id: 1, + field_id: 1, + name: "id_bucket[16]".to_string(), + transform: Transform::Bucket(16), + }] + }); + } + + #[test] + fn test_collision_with_schema_name() { + let schema = Schema::builder() + .with_fields(vec![NestedField::required( + 1, + "id", + Type::Primitive(crate::spec::PrimitiveType::Int), + ) + .into()]) + .build() + .unwrap(); + + PartitionSpec::builder() + .with_spec_id(1) + .build(&schema) + .unwrap(); + + let err = PartitionSpec::builder() + .with_spec_id(1) + .with_unbound_partition_field(UnboundPartitionField { + source_id: 1, + partition_id: None, + name: "id".to_string(), + transform: Transform::Bucket(16), + }) + .unwrap() + .build(&schema) + .unwrap_err(); + assert!(err.message().contains("conflicts with schema")) + } + + #[test] + fn test_builder_collision_is_ok_for_identity_transforms() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "number", + Type::Primitive(crate::spec::PrimitiveType::Int), + ) + .into(), + ]) + .build() + .unwrap(); + + PartitionSpec::builder() + .with_spec_id(1) + .build(&schema) + .unwrap(); + + PartitionSpec::builder() + .with_spec_id(1) + .with_unbound_partition_field(UnboundPartitionField { + source_id: 1, + partition_id: None, + name: "id".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .build(&schema) + .unwrap(); + + // Not OK for different source id + PartitionSpec::builder() + .with_spec_id(1) + .with_unbound_partition_field(UnboundPartitionField { + source_id: 2, + partition_id: None, + name: "id".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .build(&schema) + .unwrap_err(); + } + + #[test] + fn test_builder_all_source_ids_must_exist() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + NestedField::required( + 3, + "ts", + Type::Primitive(crate::spec::PrimitiveType::Timestamp), + ) + .into(), + ]) + .build() + .unwrap(); + + // Valid + PartitionSpec::builder() + .with_spec_id(1) + .with_unbound_fields( + vec![ + UnboundPartitionField { + source_id: 1, + partition_id: None, + name: "id_bucket".to_string(), + transform: Transform::Bucket(16), + }, + UnboundPartitionField { + source_id: 2, + partition_id: None, + name: "name".to_string(), + transform: Transform::Identity, + }, + ] + .into_iter(), + ) + .unwrap() + .build(&schema) + .unwrap(); + + // Invalid + PartitionSpec::builder() + .with_spec_id(1) + .with_unbound_fields( + vec![ + UnboundPartitionField { + source_id: 1, + partition_id: None, + name: "id_bucket".to_string(), + transform: Transform::Bucket(16), + }, + UnboundPartitionField { + source_id: 4, + partition_id: None, + name: "name".to_string(), + transform: Transform::Identity, + }, + ] + .into_iter(), + ) + .unwrap() + .build(&schema) + .unwrap_err(); + } + + #[test] + fn test_builder_disallows_redundant() { + let err = PartitionSpec::builder() + .with_spec_id(1) + .with_unbound_partition_field(UnboundPartitionField { + source_id: 1, + partition_id: None, + name: "id_bucket[16]".to_string(), + transform: Transform::Bucket(16), + }) + .unwrap() + .with_unbound_partition_field(UnboundPartitionField { + source_id: 1, + partition_id: None, + name: "id_bucket_with_other_name".to_string(), + transform: Transform::Bucket(16), + }) + .unwrap_err(); + assert!(err.message().contains("redundant partition")); + } + + #[test] + fn test_builder_incompatible_transforms_disallowed() { + let schema = Schema::builder() + .with_fields(vec![NestedField::required( + 1, + "id", + Type::Primitive(crate::spec::PrimitiveType::Int), + ) + .into()]) + .build() + .unwrap(); + + PartitionSpec::builder() + .with_spec_id(1) + .with_unbound_partition_field(UnboundPartitionField { + source_id: 1, + partition_id: None, + name: "id_year".to_string(), + transform: Transform::Year, + }) + .unwrap() + .build(&schema) + .unwrap_err(); + } } diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index d9a09d860..79d9074d6 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -1016,8 +1016,8 @@ mod tests { source_id: 4, field_id: 1000, }) - .build() - .unwrap(); + .unwrap() + .build_unchecked(); let expected = TableMetadata { format_version: FormatVersion::V2, @@ -1175,7 +1175,8 @@ mod tests { source_id: 1, field_id: 1000, }) - .build() + .unwrap() + .build(&schema) .unwrap(); let sort_order = SortOrder::builder() @@ -1288,7 +1289,8 @@ mod tests { source_id: 1, field_id: 1000, }) - .build() + .unwrap() + .build(&schema1) .unwrap(); let sort_order = SortOrder::builder() @@ -1410,7 +1412,8 @@ mod tests { source_id: 1, field_id: 1000, }) - .build() + .unwrap() + .build(&schema) .unwrap(); let sort_order = SortOrder::builder() @@ -1489,7 +1492,8 @@ mod tests { source_id: 1, field_id: 1000, }) - .build() + .unwrap() + .build(&schema) .unwrap(); let expected = TableMetadata { From f23b38a946d8c05c4f6743d64088cfb038e38281 Mon Sep 17 00:00:00 2001 From: Christian Thiel Date: Sat, 27 Jul 2024 19:00:19 +0200 Subject: [PATCH 02/14] Fixes --- crates/catalog/memory/src/catalog.rs | 4 ++-- crates/iceberg/src/spec/partition.rs | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/catalog/memory/src/catalog.rs b/crates/catalog/memory/src/catalog.rs index 1e7e77f4e..2ec2e7adb 100644 --- a/crates/catalog/memory/src/catalog.rs +++ b/crates/catalog/memory/src/catalog.rs @@ -344,8 +344,8 @@ mod tests { let expected_partition_spec = PartitionSpec::builder() .with_spec_id(0) .with_fields(vec![]) - .build() - .unwrap(); + .unwrap() + .build_unchecked(); assert_eq!( metadata diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs index a9d72bfbf..42333e5bc 100644 --- a/crates/iceberg/src/spec/partition.rs +++ b/crates/iceberg/src/spec/partition.rs @@ -46,12 +46,12 @@ pub struct PartitionField { impl PartitionField { /// To unbound partition field - pub fn to_unbound(&self) -> UnboundPartitionField { + pub fn to_unbound(self) -> UnboundPartitionField { UnboundPartitionField { source_id: self.source_id, partition_id: Some(self.field_id), - name: self.name.clone(), - transform: self.transform.clone(), + name: self.name, + transform: self.transform, } } } From cfc12034ea756e6aae6e0d7282385f696a2e3a1f Mon Sep 17 00:00:00 2001 From: Christian Thiel Date: Sat, 27 Jul 2024 19:00:19 +0200 Subject: [PATCH 03/14] Replace UnboundPartitionSpec Builder --- crates/catalog/rest/src/catalog.rs | 6 +++--- crates/iceberg/src/catalog/mod.rs | 6 ++++-- crates/iceberg/src/spec/partition.rs | 9 +++------ 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index aab615cd6..124adf852 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -1467,13 +1467,13 @@ mod tests { .properties(HashMap::from([("owner".to_string(), "testx".to_string())])) .partition_spec( UnboundPartitionSpec::builder() - .with_fields(vec![UnboundPartitionField::builder() + .with_unbound_fields(vec![UnboundPartitionField::builder() .source_id(1) .transform(Transform::Truncate(3)) .name("id".to_string()) .build()]) - .build() - .unwrap(), + .unwrap() + .build_unbound(), ) .sort_order( SortOrder::builder() diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index 5c63e1e77..b171ca1b3 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -802,6 +802,7 @@ mod tests { .transform(Transform::Day) .build(), ) + .unwrap() .with_unbound_partition_field( UnboundPartitionField::builder() .source_id(1) @@ -809,6 +810,7 @@ mod tests { .transform(Transform::Bucket(16)) .build(), ) + .unwrap() .with_unbound_partition_field( UnboundPartitionField::builder() .source_id(2) @@ -816,8 +818,8 @@ mod tests { .transform(Transform::Truncate(4)) .build(), ) - .build() - .unwrap(), + .unwrap() + .build_unbound(), }, ); } diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs index 42333e5bc..0e0a680dc 100644 --- a/crates/iceberg/src/spec/partition.rs +++ b/crates/iceberg/src/spec/partition.rs @@ -144,22 +144,19 @@ pub struct UnboundPartitionField { } /// Unbound partition spec can be built without a schema and later bound to a schema. -#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, Builder)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default)] #[serde(rename_all = "kebab-case")] -#[builder(setter(prefix = "with"))] pub struct UnboundPartitionSpec { /// Identifier for PartitionSpec - #[builder(default, setter(strip_option))] pub spec_id: Option, /// Details of the partition spec - #[builder(setter(each(name = "with_unbound_partition_field")))] pub fields: Vec, } impl UnboundPartitionSpec { /// Create unbound partition spec builer - pub fn builder() -> UnboundPartitionSpecBuilder { - UnboundPartitionSpecBuilder::default() + pub fn builder() -> PartitionSpecBuilder { + PartitionSpecBuilder::default() } } From 1a2be585a843d43f52c555a2c31e77db5189d946 Mon Sep 17 00:00:00 2001 From: Christian Thiel Date: Sat, 27 Jul 2024 19:18:27 +0200 Subject: [PATCH 04/14] Fix tests, allow year, month day partition --- .../src/expr/visitors/inclusive_projection.rs | 16 +-- crates/iceberg/src/spec/partition.rs | 111 ++++++++++++------ 2 files changed, 82 insertions(+), 45 deletions(-) diff --git a/crates/iceberg/src/expr/visitors/inclusive_projection.rs b/crates/iceberg/src/expr/visitors/inclusive_projection.rs index 2605726af..6e4084d5f 100644 --- a/crates/iceberg/src/expr/visitors/inclusive_projection.rs +++ b/crates/iceberg/src/expr/visitors/inclusive_projection.rs @@ -344,19 +344,19 @@ mod tests { PartitionField::builder() .source_id(2) .name("month".to_string()) - .field_id(2) + .field_id(3) .transform(Transform::Month) .build(), PartitionField::builder() .source_id(2) .name("day".to_string()) - .field_id(2) + .field_id(4) .transform(Transform::Day) .build(), ]) .unwrap() - .build(&schema) - .unwrap(); + // ToDo: Partition spec is not valid for schema. Should the test be fixed? + .build_unchecked(); let arc_schema = Arc::new(schema); let arc_partition_spec = Arc::new(partition_spec); @@ -390,8 +390,8 @@ mod tests { .transform(Transform::Truncate(4)) .build()]) .unwrap() - .build(&schema) - .unwrap(); + // ToDo: Schema is not valid for partition spec - `name` field collision. Should the test be fixed? + .build_unchecked(); let arc_schema = Arc::new(schema); let arc_partition_spec = Arc::new(partition_spec); @@ -428,8 +428,8 @@ mod tests { .transform(Transform::Bucket(7)) .build()]) .unwrap() - .build(&schema) - .unwrap(); + // ToDo: Schema is not valid for partition spec. Should the test be fixed? + .build_unchecked(); let arc_schema = Arc::new(schema); let arc_partition_spec = Arc::new(partition_spec); diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs index 0e0a680dc..c5d202b54 100644 --- a/crates/iceberg/src/spec/partition.rs +++ b/crates/iceberg/src/spec/partition.rs @@ -364,9 +364,13 @@ impl PartitionSpecBuilder { /// For a single source-column transformations must be unique. fn check_for_redundant_partitions(&self, source_id: i32, transform: &Transform) -> Result<()> { - let collision = self.fields.iter().find(|f| { - f.source_id == source_id && f.transform.dedup_name() == transform.dedup_name() - }); + let collision = self + .fields + .iter() + // ToDo: Should we compare transform.deduped_name instead? + // If so, deduped_name might be bugged, as it returns "time" for "day" and "month" and "year" transforms. + // If we switch to deduped_name here, we would not support year & month & day partitions on the same column. + .find(|f| f.source_id == source_id && &f.transform == transform); if let Some(collision) = collision { Err(Error::new( @@ -1078,23 +1082,20 @@ mod tests { // Valid PartitionSpec::builder() .with_spec_id(1) - .with_unbound_fields( - vec![ - UnboundPartitionField { - source_id: 1, - partition_id: None, - name: "id_bucket".to_string(), - transform: Transform::Bucket(16), - }, - UnboundPartitionField { - source_id: 2, - partition_id: None, - name: "name".to_string(), - transform: Transform::Identity, - }, - ] - .into_iter(), - ) + .with_unbound_fields(vec![ + UnboundPartitionField { + source_id: 1, + partition_id: None, + name: "id_bucket".to_string(), + transform: Transform::Bucket(16), + }, + UnboundPartitionField { + source_id: 2, + partition_id: None, + name: "name".to_string(), + transform: Transform::Identity, + }, + ]) .unwrap() .build(&schema) .unwrap(); @@ -1102,23 +1103,20 @@ mod tests { // Invalid PartitionSpec::builder() .with_spec_id(1) - .with_unbound_fields( - vec![ - UnboundPartitionField { - source_id: 1, - partition_id: None, - name: "id_bucket".to_string(), - transform: Transform::Bucket(16), - }, - UnboundPartitionField { - source_id: 4, - partition_id: None, - name: "name".to_string(), - transform: Transform::Identity, - }, - ] - .into_iter(), - ) + .with_unbound_fields(vec![ + UnboundPartitionField { + source_id: 1, + partition_id: None, + name: "id_bucket".to_string(), + transform: Transform::Bucket(16), + }, + UnboundPartitionField { + source_id: 4, + partition_id: None, + name: "name".to_string(), + transform: Transform::Identity, + }, + ]) .unwrap() .build(&schema) .unwrap_err(); @@ -1169,4 +1167,43 @@ mod tests { .build(&schema) .unwrap_err(); } + + #[test] + fn test_build_multiple_time_partitons() { + let schema = Schema::builder() + .with_fields(vec![NestedField::required( + 1, + "ts", + Type::Primitive(crate::spec::PrimitiveType::Timestamp), + ) + .into()]) + .build() + .unwrap(); + + PartitionSpec::builder() + .with_spec_id(1) + .with_unbound_partition_field(UnboundPartitionField { + source_id: 1, + partition_id: None, + name: "ts_year".to_string(), + transform: Transform::Year, + }) + .unwrap() + .with_unbound_partition_field(UnboundPartitionField { + source_id: 1, + partition_id: None, + name: "ts_month".to_string(), + transform: Transform::Month, + }) + .unwrap() + .with_unbound_partition_field(UnboundPartitionField { + source_id: 1, + partition_id: None, + name: "ts_day".to_string(), + transform: Transform::Day, + }) + .unwrap() + .build(&schema) + .unwrap(); + } } From 831389ac38ecf54cdd3dc93cad46dbd90d2b0cb2 Mon Sep 17 00:00:00 2001 From: Christian Thiel Date: Sat, 27 Jul 2024 19:22:17 +0200 Subject: [PATCH 05/14] Comments --- crates/iceberg/src/spec/partition.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs index c5d202b54..92d23eb16 100644 --- a/crates/iceberg/src/spec/partition.rs +++ b/crates/iceberg/src/spec/partition.rs @@ -368,8 +368,11 @@ impl PartitionSpecBuilder { .fields .iter() // ToDo: Should we compare transform.deduped_name instead? - // If so, deduped_name might be bugged, as it returns "time" for "day" and "month" and "year" transforms. - // If we switch to deduped_name here, we would not support year & month & day partitions on the same column. + // If we switch to deduped_name here, we would not support year & month & day partitions on the same column, as it is always "time". + // deduped_name would probably be more correct for bucket transform, to disallow different bucket sizes on the same column. + // Java seems to use deduped_name: https://github.com/apache/iceberg/blob/4dbc7f578eee7ceb9def35ebfa1a4cc236fb598f/api/src/main/java/org/apache/iceberg/PartitionSpec.java#L420 + // Does java disalow year & month partitions on the same column? If so our tests are broken: + // (test_inclusive_projection_date_transforms) .find(|f| f.source_id == source_id && &f.transform == transform); if let Some(collision) = collision { From 683a8e6c7737a6bc8b532e8815e136fe877a0452 Mon Sep 17 00:00:00 2001 From: Christian Thiel Date: Sat, 27 Jul 2024 19:27:31 +0200 Subject: [PATCH 06/14] typos --- crates/iceberg/src/spec/partition.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs index 92d23eb16..ff94c6584 100644 --- a/crates/iceberg/src/spec/partition.rs +++ b/crates/iceberg/src/spec/partition.rs @@ -371,7 +371,7 @@ impl PartitionSpecBuilder { // If we switch to deduped_name here, we would not support year & month & day partitions on the same column, as it is always "time". // deduped_name would probably be more correct for bucket transform, to disallow different bucket sizes on the same column. // Java seems to use deduped_name: https://github.com/apache/iceberg/blob/4dbc7f578eee7ceb9def35ebfa1a4cc236fb598f/api/src/main/java/org/apache/iceberg/PartitionSpec.java#L420 - // Does java disalow year & month partitions on the same column? If so our tests are broken: + // Does java disallow year & month partitions on the same column? If so our tests are broken: // (test_inclusive_projection_date_transforms) .find(|f| f.source_id == source_id && &f.transform == transform); @@ -1172,7 +1172,7 @@ mod tests { } #[test] - fn test_build_multiple_time_partitons() { + fn test_build_multiple_time_partitions() { let schema = Schema::builder() .with_fields(vec![NestedField::required( 1, From d8854b2709e5b41ad0fee413bce34d115aef5035 Mon Sep 17 00:00:00 2001 From: Christian Thiel Date: Sat, 27 Jul 2024 20:10:12 +0200 Subject: [PATCH 07/14] Fix UnboundBuild setting partition_id --- crates/iceberg/src/spec/partition.rs | 110 ++++++++++++++------------- 1 file changed, 58 insertions(+), 52 deletions(-) diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs index ff94c6584..f568ca05e 100644 --- a/crates/iceberg/src/spec/partition.rs +++ b/crates/iceberg/src/spec/partition.rs @@ -163,28 +163,26 @@ impl UnboundPartitionSpec { /// Create valid partition specs for a given schema. #[derive(Debug, Default)] pub struct PartitionSpecBuilder { - spec_id: i32, + spec_id: Option, last_assigned_field_id: i32, - fields: Vec, + fields: Vec, } impl PartitionSpecBuilder { pub(crate) const UNPARTITIONED_LAST_ASSIGNED_ID: i32 = 999; + // Default partition spec id is only used for building `PartitionSpec`. + // When building unbound partition specs, the spec id is not set by default. + pub(crate) const DEFAULT_PARTITION_SPEC_ID: i32 = 0; /// Create a new partition spec builder with the given schema. pub fn new() -> Self { Self { - spec_id: 0, + spec_id: None, fields: vec![], last_assigned_field_id: Self::UNPARTITIONED_LAST_ASSIGNED_ID, } } - /// Accessor to the current last assigned field id. - pub fn last_assigned_field_id(&self) -> i32 { - self.last_assigned_field_id - } - /// Set the last assigned field id for the partition spec. /// This is useful when re-binding partition specs. pub fn with_last_assigned_field_id(mut self, last_assigned_field_id: i32) -> Self { @@ -194,7 +192,7 @@ impl PartitionSpecBuilder { /// Set the spec id for the partition spec. pub fn with_spec_id(mut self, spec_id: i32) -> Self { - self.spec_id = spec_id; + self.spec_id = Some(spec_id); self } @@ -204,12 +202,13 @@ impl PartitionSpecBuilder { self.check_for_redundant_partitions(field.source_id, &field.transform)?; self.check_partition_id_unique(field.field_id)?; - self.fields.push(field); + self.fields.push(field.to_unbound()); Ok(self) } /// Add a new partition field to the partition spec. - /// Field ID is auto assigned if not set. + /// Field ID is auto assigned if not set on `build()`. + /// On `build_unbound()` the field ID is not auto assigned. pub fn with_unbound_partition_field(mut self, field: UnboundPartitionField) -> Result { self.check_name_set_and_unique(&field.name)?; self.check_for_redundant_partitions(field.source_id, &field.transform)?; @@ -217,20 +216,6 @@ impl PartitionSpecBuilder { self.check_partition_id_unique(partition_id)?; } - let partition_id = if let Some(partition_id) = field.partition_id { - self.last_assigned_field_id = std::cmp::max(self.last_assigned_field_id, partition_id); - partition_id - } else { - self.increment_and_get_next_field_id()? - }; - - let field = PartitionField { - source_id: field.source_id, - field_id: partition_id, - name: field.name, - transform: field.transform, - }; - self.fields.push(field); Ok(self) } @@ -260,26 +245,20 @@ impl PartitionSpecBuilder { /// Build the unbound partition spec. pub fn build_unbound(self) -> UnboundPartitionSpec { UnboundPartitionSpec { - spec_id: Some(self.spec_id), - fields: self.fields.into_iter().map(|f| f.to_unbound()).collect(), + spec_id: self.spec_id, + fields: self.fields, } } /// Build a bound partition spec with the given schema. pub fn build(self, schema: &Schema) -> Result { - let mut fields = Vec::with_capacity(self.fields.len()); - for field in self.fields { - Self::check_name_does_not_collide_with_schema(&field, schema)?; - Self::check_transform_compatibility(&field, schema)?; - fields.push(PartitionField { - source_id: field.source_id, - field_id: field.field_id, - name: field.name, - transform: field.transform, - }); + let fields = Self::set_field_ids(self.fields, self.last_assigned_field_id); + for field in &fields { + Self::check_name_does_not_collide_with_schema(field, schema)?; + Self::check_transform_compatibility(field, schema)?; } Ok(PartitionSpec { - spec_id: self.spec_id, + spec_id: self.spec_id.unwrap_or(Self::DEFAULT_PARTITION_SPEC_ID), fields, }) } @@ -287,12 +266,50 @@ impl PartitionSpecBuilder { /// Build a partition spec without validating it against a schema. /// This can lead to invalid partition specs. Use with caution. pub fn build_unchecked(self) -> PartitionSpec { + let fields = Self::set_field_ids(self.fields, self.last_assigned_field_id); PartitionSpec { - spec_id: self.spec_id, - fields: self.fields, + spec_id: self.spec_id.unwrap_or(Self::DEFAULT_PARTITION_SPEC_ID), + fields, } } + fn set_field_ids( + fields: Vec, + last_assigned_field_id: i32, + ) -> Vec { + let mut last_assigned_field_id = last_assigned_field_id; + // Already assigned partition ids. If we see one of these during iteration, + // we skip it. + let assigned_ids = fields + .iter() + .filter_map(|f| f.partition_id) + .collect::>(); + + fields + .into_iter() + .map(|field| { + let partition_id = if let Some(partition_id) = field.partition_id { + last_assigned_field_id = std::cmp::max(last_assigned_field_id, partition_id); + partition_id + } else { + // ToDo: Should we take care of overflow or accept wrap? + last_assigned_field_id += 1; + while assigned_ids.contains(&last_assigned_field_id) { + last_assigned_field_id += 1; + } + last_assigned_field_id + }; + + PartitionField { + source_id: field.source_id, + field_id: partition_id, + name: field.name, + transform: field.transform, + } + }) + .collect() + } + /// Ensure that the partition name is unique among the partition fields and is not empty. fn check_name_set_and_unique(&self, name: &str) -> Result<()> { if name.is_empty() { @@ -313,7 +330,7 @@ impl PartitionSpecBuilder { /// Check field / partition_id unique within the partition spec if set fn check_partition_id_unique(&self, field_id: i32) -> Result<()> { - if self.fields.iter().any(|f| f.field_id == field_id) { + if self.fields.iter().any(|f| f.partition_id == Some(field_id)) { return Err(Error::new( ErrorKind::DataInvalid, format!( @@ -431,17 +448,6 @@ impl PartitionSpecBuilder { Ok(()) } - fn increment_and_get_next_field_id(&mut self) -> Result { - if self.last_assigned_field_id == i32::MAX { - return Err(Error::new( - ErrorKind::DataInvalid, - "Cannot assign more partition fields. Field id overflow.", - )); - } - self.last_assigned_field_id += 1; - Ok(self.last_assigned_field_id) - } - fn partition_names(&self) -> std::collections::HashSet<&str> { self.fields.iter().map(|f| f.name.as_str()).collect() } From 6bb80d284f2048ff45cb7fc704f98159a6bc9a56 Mon Sep 17 00:00:00 2001 From: Christian Thiel Date: Sat, 27 Jul 2024 20:18:17 +0200 Subject: [PATCH 08/14] Add test for unbound spec without partition ids --- crates/iceberg/src/spec/partition.rs | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs index f568ca05e..f0f67235e 100644 --- a/crates/iceberg/src/spec/partition.rs +++ b/crates/iceberg/src/spec/partition.rs @@ -1215,4 +1215,28 @@ mod tests { .build(&schema) .unwrap(); } + + #[test] + fn test_build_unbound_specs_without_partition_id() { + let spec = PartitionSpec::builder() + .with_spec_id(1) + .with_unbound_partition_field(UnboundPartitionField { + source_id: 1, + partition_id: None, + name: "id_bucket[16]".to_string(), + transform: Transform::Bucket(16), + }) + .unwrap() + .build_unbound(); + + assert_eq!(spec, UnboundPartitionSpec { + spec_id: Some(1), + fields: vec![UnboundPartitionField { + source_id: 1, + partition_id: None, + name: "id_bucket[16]".to_string(), + transform: Transform::Bucket(16), + }] + }); + } } From fea4bc2a3ddfee51f4155968cf06610fe4c4ac01 Mon Sep 17 00:00:00 2001 From: Christian Thiel Date: Mon, 29 Jul 2024 14:35:27 +0200 Subject: [PATCH 09/14] Fix into_unbound fn name --- crates/iceberg/src/spec/partition.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs index f0f67235e..ba9074865 100644 --- a/crates/iceberg/src/spec/partition.rs +++ b/crates/iceberg/src/spec/partition.rs @@ -46,7 +46,7 @@ pub struct PartitionField { impl PartitionField { /// To unbound partition field - pub fn to_unbound(self) -> UnboundPartitionField { + pub fn into_unbound(self) -> UnboundPartitionField { UnboundPartitionField { source_id: self.source_id, partition_id: Some(self.field_id), @@ -202,7 +202,7 @@ impl PartitionSpecBuilder { self.check_for_redundant_partitions(field.source_id, &field.transform)?; self.check_partition_id_unique(field.field_id)?; - self.fields.push(field.to_unbound()); + self.fields.push(field.into_unbound()); Ok(self) } From 7159c0253d6f6ac9a40fef0ec3abebdcd8d2c4e8 Mon Sep 17 00:00:00 2001 From: Christian Thiel Date: Tue, 6 Aug 2024 11:44:57 +0200 Subject: [PATCH 10/14] Split bound & unbound Partition builder, change add_partition_fields --- crates/catalog/memory/src/catalog.rs | 7 +- crates/catalog/rest/src/catalog.rs | 4 +- crates/iceberg/src/catalog/mod.rs | 30 +- .../src/expr/visitors/expression_evaluator.rs | 13 +- .../visitors/inclusive_metrics_evaluator.rs | 12 +- .../src/expr/visitors/inclusive_projection.rs | 121 +-- crates/iceberg/src/spec/partition.rs | 745 +++++++++--------- crates/iceberg/src/spec/table_metadata.rs | 52 +- 8 files changed, 492 insertions(+), 492 deletions(-) diff --git a/crates/catalog/memory/src/catalog.rs b/crates/catalog/memory/src/catalog.rs index 2ec2e7adb..69f476189 100644 --- a/crates/catalog/memory/src/catalog.rs +++ b/crates/catalog/memory/src/catalog.rs @@ -341,11 +341,10 @@ mod tests { assert_eq!(metadata.current_schema().as_ref(), expected_schema); - let expected_partition_spec = PartitionSpec::builder() + let expected_partition_spec = PartitionSpec::builder(expected_schema) .with_spec_id(0) - .with_fields(vec![]) - .unwrap() - .build_unchecked(); + .build() + .unwrap(); assert_eq!( metadata diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 124adf852..6a1ed5b6e 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -1467,13 +1467,13 @@ mod tests { .properties(HashMap::from([("owner".to_string(), "testx".to_string())])) .partition_spec( UnboundPartitionSpec::builder() - .with_unbound_fields(vec![UnboundPartitionField::builder() + .add_partition_fields(vec![UnboundPartitionField::builder() .source_id(1) .transform(Transform::Truncate(3)) .name("id".to_string()) .build()]) .unwrap() - .build_unbound(), + .build(), ) .sort_order( SortOrder::builder() diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index b171ca1b3..473832e88 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -229,7 +229,7 @@ pub struct TableCreation { /// The schema of the table. pub schema: Schema, /// The partition spec of the table, could be None. - #[builder(default, setter(strip_option))] + #[builder(default, setter(strip_option, into))] pub partition_spec: Option, /// The sort order of the table. #[builder(default, setter(strip_option))] @@ -451,7 +451,7 @@ mod tests { use crate::spec::{ FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Summary, - TableMetadataBuilder, Transform, Type, UnboundPartitionField, UnboundPartitionSpec, + TableMetadataBuilder, Transform, Type, UnboundPartitionSpec, }; use crate::{NamespaceIdent, TableCreation, TableIdent, TableRequirement, TableUpdate}; @@ -795,31 +795,13 @@ mod tests { "#, TableUpdate::AddSpec { spec: UnboundPartitionSpec::builder() - .with_unbound_partition_field( - UnboundPartitionField::builder() - .source_id(4) - .name("ts_day".to_string()) - .transform(Transform::Day) - .build(), - ) + .add_partition_field(4, "ts_day".to_string(), Transform::Day) .unwrap() - .with_unbound_partition_field( - UnboundPartitionField::builder() - .source_id(1) - .name("id_bucket".to_string()) - .transform(Transform::Bucket(16)) - .build(), - ) + .add_partition_field(1, "id_bucket".to_string(), Transform::Bucket(16)) .unwrap() - .with_unbound_partition_field( - UnboundPartitionField::builder() - .source_id(2) - .name("id_truncate".to_string()) - .transform(Transform::Truncate(4)) - .build(), - ) + .add_partition_field(2, "id_truncate".to_string(), Transform::Truncate(4)) .unwrap() - .build_unbound(), + .build(), }, ); } diff --git a/crates/iceberg/src/expr/visitors/expression_evaluator.rs b/crates/iceberg/src/expr/visitors/expression_evaluator.rs index f7e2d9003..3eb5e960d 100644 --- a/crates/iceberg/src/expr/visitors/expression_evaluator.rs +++ b/crates/iceberg/src/expr/visitors/expression_evaluator.rs @@ -258,8 +258,9 @@ mod tests { UnaryExpression, }; use crate::spec::{ - DataContentType, DataFile, DataFileFormat, Datum, Literal, NestedField, PartitionField, - PartitionSpec, PartitionSpecRef, PrimitiveType, Schema, SchemaRef, Struct, Transform, Type, + DataContentType, DataFile, DataFileFormat, Datum, Literal, NestedField, PartitionSpec, + PartitionSpecRef, PrimitiveType, Schema, SchemaRef, Struct, Transform, Type, + UnboundPartitionField, }; use crate::Result; @@ -274,16 +275,16 @@ mod tests { ))]) .build()?; - let spec = PartitionSpec::builder() + let spec = PartitionSpec::builder(&schema) .with_spec_id(1) - .with_fields(vec![PartitionField::builder() + .add_unbound_fields(vec![UnboundPartitionField::builder() .source_id(1) .name("a".to_string()) - .field_id(1) + .partition_id(1) .transform(Transform::Identity) .build()]) .unwrap() - .build(&schema) + .build() .unwrap(); Ok((Arc::new(schema), Arc::new(spec))) diff --git a/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs b/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs index b96258f91..1e106a34b 100644 --- a/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs +++ b/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs @@ -495,8 +495,8 @@ mod test { UnaryExpression, }; use crate::spec::{ - DataContentType, DataFile, DataFileFormat, Datum, NestedField, PartitionField, - PartitionSpec, PrimitiveType, Schema, Struct, Transform, Type, + DataContentType, DataFile, DataFileFormat, Datum, NestedField, + PartitionSpec, PrimitiveType, Schema, Struct, Transform, Type, UnboundPartitionField, }; const INT_MIN_VALUE: i32 = 30; @@ -1656,16 +1656,16 @@ mod test { .unwrap(); let table_schema_ref = Arc::new(table_schema); - let partition_spec = PartitionSpec::builder() + let partition_spec = PartitionSpec::builder(&table_schema_ref) .with_spec_id(1) - .with_fields(vec![PartitionField::builder() + .add_unbound_fields(vec![UnboundPartitionField::builder() .source_id(1) .name("a".to_string()) - .field_id(1) + .partition_id(1) .transform(Transform::Identity) .build()]) .unwrap() - .build(&table_schema_ref) + .build() .unwrap(); let partition_spec_ref = Arc::new(partition_spec); (table_schema_ref, partition_spec_ref) diff --git a/crates/iceberg/src/expr/visitors/inclusive_projection.rs b/crates/iceberg/src/expr/visitors/inclusive_projection.rs index 6e4084d5f..5e5820b83 100644 --- a/crates/iceberg/src/expr/visitors/inclusive_projection.rs +++ b/crates/iceberg/src/expr/visitors/inclusive_projection.rs @@ -236,6 +236,7 @@ mod tests { use crate::expr::{Bind, Predicate, Reference}; use crate::spec::{ Datum, NestedField, PartitionField, PartitionSpec, PrimitiveType, Schema, Transform, Type, + UnboundPartitionField, }; fn build_test_schema() -> Schema { @@ -265,11 +266,9 @@ mod tests { fn test_inclusive_projection_logic_ops() { let schema = build_test_schema(); - let partition_spec = PartitionSpec::builder() + let partition_spec = PartitionSpec::builder(&schema) .with_spec_id(1) - .with_fields(vec![]) - .unwrap() - .build(&schema) + .build() .unwrap(); let arc_schema = Arc::new(schema); @@ -297,16 +296,18 @@ mod tests { fn test_inclusive_projection_identity_transform() { let schema = build_test_schema(); - let partition_spec = PartitionSpec::builder() + let partition_spec = PartitionSpec::builder(&schema) .with_spec_id(1) - .with_fields(vec![PartitionField::builder() - .source_id(1) - .name("a".to_string()) - .field_id(1) - .transform(Transform::Identity) - .build()]) + .add_unbound_field( + UnboundPartitionField::builder() + .source_id(1) + .name("a".to_string()) + .partition_id(1) + .transform(Transform::Identity) + .build(), + ) .unwrap() - .build(&schema) + .build() .unwrap(); let arc_schema = Arc::new(schema); @@ -332,31 +333,31 @@ mod tests { fn test_inclusive_projection_date_transforms() { let schema = build_test_schema(); - let partition_spec = PartitionSpec::builder() - .with_spec_id(1) - .with_fields(vec![ - PartitionField::builder() - .source_id(2) - .name("year".to_string()) - .field_id(2) - .transform(Transform::Year) - .build(), - PartitionField::builder() - .source_id(2) - .name("month".to_string()) - .field_id(3) - .transform(Transform::Month) - .build(), - PartitionField::builder() - .source_id(2) - .name("day".to_string()) - .field_id(4) - .transform(Transform::Day) - .build(), - ]) - .unwrap() - // ToDo: Partition spec is not valid for schema. Should the test be fixed? - .build_unchecked(); + // ToDo: We cannot use the builder here as having multiple transforms on + // a single field is not allowed. Should we keep this test? + let partition_spec = PartitionSpec { + spec_id: 1, + fields: vec![ + PartitionField { + source_id: 2, + name: "year".to_string(), + field_id: 1000, + transform: Transform::Year, + }, + PartitionField { + source_id: 2, + name: "month".to_string(), + field_id: 1001, + transform: Transform::Month, + }, + PartitionField { + source_id: 2, + name: "day".to_string(), + field_id: 1002, + transform: Transform::Day, + }, + ], + }; let arc_schema = Arc::new(schema); let arc_partition_spec = Arc::new(partition_spec); @@ -381,17 +382,19 @@ mod tests { fn test_inclusive_projection_truncate_transform() { let schema = build_test_schema(); - let partition_spec = PartitionSpec::builder() + let partition_spec = PartitionSpec::builder(&schema) .with_spec_id(1) - .with_fields(vec![PartitionField::builder() - .source_id(3) - .name("name".to_string()) - .field_id(3) - .transform(Transform::Truncate(4)) - .build()]) + .add_unbound_field( + UnboundPartitionField::builder() + .source_id(3) + .name("name_truncate".to_string()) + .partition_id(3) + .transform(Transform::Truncate(4)) + .build(), + ) .unwrap() - // ToDo: Schema is not valid for partition spec - `name` field collision. Should the test be fixed? - .build_unchecked(); + .build() + .unwrap(); let arc_schema = Arc::new(schema); let arc_partition_spec = Arc::new(partition_spec); @@ -402,7 +405,7 @@ mod tests { // applying InclusiveProjection to bound_predicate // should result in the 'name STARTS WITH "Testy McTest"' - // predicate being transformed to 'name STARTS WITH "Test"', + // predicate being transformed to 'name_truncate STARTS WITH "Test"', // since a `Truncate(4)` partition will map values of // name that start with "Testy McTest" into a partition // for values of name that start with the first four letters @@ -410,7 +413,7 @@ mod tests { let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec.clone()); let result = inclusive_projection.project(&bound_predicate).unwrap(); - let expected = "name STARTS WITH \"Test\"".to_string(); + let expected = "name_truncate STARTS WITH \"Test\"".to_string(); assert_eq!(result.to_string(), expected) } @@ -419,17 +422,19 @@ mod tests { fn test_inclusive_projection_bucket_transform() { let schema = build_test_schema(); - let partition_spec = PartitionSpec::builder() + let partition_spec = PartitionSpec::builder(&schema) .with_spec_id(1) - .with_fields(vec![PartitionField::builder() - .source_id(1) - .name("a".to_string()) - .field_id(1) - .transform(Transform::Bucket(7)) - .build()]) + .add_unbound_field( + UnboundPartitionField::builder() + .source_id(1) + .name("a_bucket[7]".to_string()) + .partition_id(1) + .transform(Transform::Bucket(7)) + .build(), + ) .unwrap() - // ToDo: Schema is not valid for partition spec. Should the test be fixed? - .build_unchecked(); + .build() + .unwrap(); let arc_schema = Arc::new(schema); let arc_partition_spec = Arc::new(partition_spec); @@ -445,7 +450,7 @@ mod tests { let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec.clone()); let result = inclusive_projection.project(&bound_predicate).unwrap(); - let expected = "a = 2".to_string(); + let expected = "a_bucket[7] = 2".to_string(); assert_eq!(result.to_string(), expected) } diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs index ba9074865..f5d14af23 100644 --- a/crates/iceberg/src/spec/partition.rs +++ b/crates/iceberg/src/spec/partition.rs @@ -47,12 +47,7 @@ pub struct PartitionField { impl PartitionField { /// To unbound partition field pub fn into_unbound(self) -> UnboundPartitionField { - UnboundPartitionField { - source_id: self.source_id, - partition_id: Some(self.field_id), - name: self.name, - transform: self.transform, - } + self.into() } } @@ -68,8 +63,8 @@ pub struct PartitionSpec { impl PartitionSpec { /// Create partition spec builer - pub fn builder() -> PartitionSpecBuilder { - PartitionSpecBuilder::default() + pub fn builder(schema: &Schema) -> PartitionSpecBuilder { + PartitionSpecBuilder::new(schema) } /// Returns if the partition spec is unpartitioned. @@ -108,20 +103,10 @@ impl PartitionSpec { } /// Turn this partition spec into an unbound partition spec. + /// + /// The `field_id` is retained as `partition_id` in the unbound partition spec. pub fn to_unbound(self) -> UnboundPartitionSpec { - UnboundPartitionSpec { - spec_id: Some(self.spec_id), - fields: self - .fields - .into_iter() - .map(|f| UnboundPartitionField { - source_id: f.source_id, - partition_id: Some(f.field_id), - name: f.name, - transform: f.transform, - }) - .collect(), - } + self.into() } } @@ -155,32 +140,134 @@ pub struct UnboundPartitionSpec { impl UnboundPartitionSpec { /// Create unbound partition spec builer - pub fn builder() -> PartitionSpecBuilder { - PartitionSpecBuilder::default() + pub fn builder() -> UnboundPartitionSpecBuilder { + UnboundPartitionSpecBuilder::default() } } -/// Create valid partition specs for a given schema. +impl From for UnboundPartitionField { + fn from(field: PartitionField) -> Self { + UnboundPartitionField { + source_id: field.source_id, + partition_id: Some(field.field_id), + name: field.name, + transform: field.transform, + } + } +} + +impl From for UnboundPartitionSpec { + fn from(spec: PartitionSpec) -> Self { + UnboundPartitionSpec { + spec_id: Some(spec.spec_id), + fields: spec.fields.into_iter().map(Into::into).collect(), + } + } +} + +/// Create a new UnboundPartitionSpec #[derive(Debug, Default)] -pub struct PartitionSpecBuilder { +pub struct UnboundPartitionSpecBuilder { + spec_id: Option, + fields: Vec, +} + +impl UnboundPartitionSpecBuilder { + /// Create a new partition spec builder with the given schema. + pub fn new() -> Self { + Self { + spec_id: None, + fields: vec![], + } + } + + /// Set the spec id for the partition spec. + pub fn with_spec_id(mut self, spec_id: i32) -> Self { + self.spec_id = Some(spec_id); + self + } + + /// Add a new partition field to the partition spec from an unbound partition field. + pub fn add_partition_field( + self, + source_id: i32, + target_name: impl ToString, + transformation: Transform, + ) -> Result { + let field = UnboundPartitionField { + source_id, + partition_id: None, + name: target_name.to_string(), + transform: transformation, + }; + self.add_partition_field_internal(field) + } + + /// Add multiple partition fields to the partition spec. + pub fn add_partition_fields( + self, + fields: impl IntoIterator, + ) -> Result { + let mut builder = self; + for field in fields { + builder = builder.add_partition_field_internal(field)?; + } + Ok(builder) + } + + fn add_partition_field_internal(mut self, field: UnboundPartitionField) -> Result { + self.check_name_set_and_unique(&field.name)?; + self.check_for_redundant_partitions(field.source_id, &field.transform)?; + if let Some(partition_id) = field.partition_id { + self.check_partition_id_unique(partition_id)?; + } + self.fields.push(field); + Ok(self) + } + + /// Build the unbound partition spec. + pub fn build(self) -> UnboundPartitionSpec { + UnboundPartitionSpec { + spec_id: self.spec_id, + fields: self.fields, + } + } +} + +/// Create valid partition specs for a given schema. +#[derive(Debug)] +pub struct PartitionSpecBuilder<'a> { spec_id: Option, last_assigned_field_id: i32, fields: Vec, + schema: &'a Schema, } -impl PartitionSpecBuilder { +impl<'a> PartitionSpecBuilder<'a> { pub(crate) const UNPARTITIONED_LAST_ASSIGNED_ID: i32 = 999; // Default partition spec id is only used for building `PartitionSpec`. // When building unbound partition specs, the spec id is not set by default. pub(crate) const DEFAULT_PARTITION_SPEC_ID: i32 = 0; /// Create a new partition spec builder with the given schema. - pub fn new() -> Self { + pub fn new(schema: &'a Schema) -> Self { Self { spec_id: None, fields: vec![], last_assigned_field_id: Self::UNPARTITIONED_LAST_ASSIGNED_ID, + schema, + } + } + + /// Create a new partition spec builder from an existing unbound partition spec. + pub fn new_from_unbound(unbound: UnboundPartitionSpec, schema: &'a Schema) -> Result { + let mut builder = Self::new(schema) + .with_spec_id(unbound.spec_id.unwrap_or(Self::DEFAULT_PARTITION_SPEC_ID)); + + for field in unbound.fields { + builder = builder.add_unbound_field(field)?; } + Ok(builder) } /// Set the last assigned field id for the partition spec. @@ -197,21 +284,44 @@ impl PartitionSpecBuilder { } /// Add a new partition field to the partition spec. - pub fn with_partition_field(mut self, field: PartitionField) -> Result { - self.check_name_set_and_unique(&field.name)?; - self.check_for_redundant_partitions(field.source_id, &field.transform)?; - self.check_partition_id_unique(field.field_id)?; + pub fn add_partition_field( + self, + source_name: impl ToString, + target_name: impl ToString, + transform: Transform, + ) -> Result { + let source_id = self + .schema + .field_by_name(source_name.to_string().as_str()) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot find source column with name: {} in schema", + source_name.to_string() + ), + ) + })? + .id; + let field = UnboundPartitionField { + source_id, + partition_id: None, + name: target_name.to_string(), + transform, + }; - self.fields.push(field.into_unbound()); - Ok(self) + self.add_unbound_field(field) } /// Add a new partition field to the partition spec. - /// Field ID is auto assigned if not set on `build()`. - /// On `build_unbound()` the field ID is not auto assigned. - pub fn with_unbound_partition_field(mut self, field: UnboundPartitionField) -> Result { + /// + /// If `partition_id` is set, it is used as the field id. + /// Otherwise, a new `field_id` is assigned. + pub fn add_unbound_field(mut self, field: UnboundPartitionField) -> Result { self.check_name_set_and_unique(&field.name)?; self.check_for_redundant_partitions(field.source_id, &field.transform)?; + Self::check_name_does_not_collide_with_schema(&field, self.schema)?; + Self::check_transform_compatibility(&field, self.schema)?; if let Some(partition_id) = field.partition_id { self.check_partition_id_unique(partition_id)?; } @@ -220,63 +330,31 @@ impl PartitionSpecBuilder { Ok(self) } - /// Add multiple unbound partition fields to the partition spec. - /// Field IDs are auto assigned if not set. - pub fn with_unbound_fields( + /// Wrapper around `with_unbound_fields` to add multiple partition fields. + pub fn add_unbound_fields( self, fields: impl IntoIterator, ) -> Result { let mut builder = self; for field in fields { - builder = builder.with_unbound_partition_field(field)?; + builder = builder.add_unbound_field(field)?; } Ok(builder) } - /// Add multiple partition fields to the partition spec. - pub fn with_fields(self, fields: impl IntoIterator) -> Result { - let mut builder = self; - for field in fields { - builder = builder.with_partition_field(field)?; - } - Ok(builder) - } - - /// Build the unbound partition spec. - pub fn build_unbound(self) -> UnboundPartitionSpec { - UnboundPartitionSpec { - spec_id: self.spec_id, - fields: self.fields, - } - } - /// Build a bound partition spec with the given schema. - pub fn build(self, schema: &Schema) -> Result { - let fields = Self::set_field_ids(self.fields, self.last_assigned_field_id); - for field in &fields { - Self::check_name_does_not_collide_with_schema(field, schema)?; - Self::check_transform_compatibility(field, schema)?; - } + pub fn build(self) -> Result { + let fields = Self::set_field_ids(self.fields, self.last_assigned_field_id)?; Ok(PartitionSpec { spec_id: self.spec_id.unwrap_or(Self::DEFAULT_PARTITION_SPEC_ID), fields, }) } - /// Build a partition spec without validating it against a schema. - /// This can lead to invalid partition specs. Use with caution. - pub fn build_unchecked(self) -> PartitionSpec { - let fields = Self::set_field_ids(self.fields, self.last_assigned_field_id); - PartitionSpec { - spec_id: self.spec_id.unwrap_or(Self::DEFAULT_PARTITION_SPEC_ID), - fields, - } - } - fn set_field_ids( fields: Vec, last_assigned_field_id: i32, - ) -> Vec { + ) -> Result> { let mut last_assigned_field_id = last_assigned_field_id; // Already assigned partition ids. If we see one of these during iteration, // we skip it. @@ -285,62 +363,41 @@ impl PartitionSpecBuilder { .filter_map(|f| f.partition_id) .collect::>(); - fields - .into_iter() - .map(|field| { - let partition_id = if let Some(partition_id) = field.partition_id { - last_assigned_field_id = std::cmp::max(last_assigned_field_id, partition_id); - partition_id - } else { - // ToDo: Should we take care of overflow or accept wrap? + fn _check_overflow(last_assigned_field_id: i32) -> Result<()> { + if last_assigned_field_id == i32::MAX { + Err(Error::new( + ErrorKind::DataInvalid, + "Cannot assign more partition ids. Overflow.", + )) + } else { + Ok(()) + } + } + + let mut bound_fields = Vec::with_capacity(fields.len()); + for field in fields.into_iter() { + let partition_id = if let Some(partition_id) = field.partition_id { + last_assigned_field_id = std::cmp::max(last_assigned_field_id, partition_id); + partition_id + } else { + _check_overflow(last_assigned_field_id)?; + last_assigned_field_id += 1; + while assigned_ids.contains(&last_assigned_field_id) { + _check_overflow(last_assigned_field_id)?; last_assigned_field_id += 1; - while assigned_ids.contains(&last_assigned_field_id) { - last_assigned_field_id += 1; - } - last_assigned_field_id - }; - - PartitionField { - source_id: field.source_id, - field_id: partition_id, - name: field.name, - transform: field.transform, } + last_assigned_field_id + }; + + bound_fields.push(PartitionField { + source_id: field.source_id, + field_id: partition_id, + name: field.name, + transform: field.transform, }) - .collect() - } - - /// Ensure that the partition name is unique among the partition fields and is not empty. - fn check_name_set_and_unique(&self, name: &str) -> Result<()> { - if name.is_empty() { - return Err(Error::new( - ErrorKind::DataInvalid, - "Cannot use empty partition name", - )); } - if self.partition_names().contains(name) { - return Err(Error::new( - ErrorKind::DataInvalid, - format!("Cannot use partition name more than once: {}", name), - )); - } - Ok(()) - } - - /// Check field / partition_id unique within the partition spec if set - fn check_partition_id_unique(&self, field_id: i32) -> Result<()> { - if self.fields.iter().any(|f| f.partition_id == Some(field_id)) { - return Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Cannot use field id more than once in one PartitionSpec: {}", - field_id - ), - )); - } - - Ok(()) + Ok(bound_fields) } /// Ensure that the partition name is unique among columns in the schema. @@ -348,7 +405,7 @@ impl PartitionSpecBuilder { /// 1. The column is sourced from the column with the same name. /// 2. AND the transformation is identity fn check_name_does_not_collide_with_schema( - field: &PartitionField, + field: &UnboundPartitionField, schema: &Schema, ) -> Result<()> { match schema.field_by_name(field.name.as_str()) { @@ -379,35 +436,9 @@ impl PartitionSpecBuilder { } } - /// For a single source-column transformations must be unique. - fn check_for_redundant_partitions(&self, source_id: i32, transform: &Transform) -> Result<()> { - let collision = self - .fields - .iter() - // ToDo: Should we compare transform.deduped_name instead? - // If we switch to deduped_name here, we would not support year & month & day partitions on the same column, as it is always "time". - // deduped_name would probably be more correct for bucket transform, to disallow different bucket sizes on the same column. - // Java seems to use deduped_name: https://github.com/apache/iceberg/blob/4dbc7f578eee7ceb9def35ebfa1a4cc236fb598f/api/src/main/java/org/apache/iceberg/PartitionSpec.java#L420 - // Does java disallow year & month partitions on the same column? If so our tests are broken: - // (test_inclusive_projection_date_transforms) - .find(|f| f.source_id == source_id && &f.transform == transform); - - if let Some(collision) = collision { - Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Cannot add redundant partition with source id `{}` and transform `{}`. A partition with the same source id and transform already exists with name `{}`", - source_id, transform.dedup_name(), collision.name - ), - )) - } else { - Ok(()) - } - } - /// Ensure that the transformation of the field is compatible with type of the field /// in the schema. Implicitly also checks if the source field exists in the schema. - fn check_transform_compatibility(field: &PartitionField, schema: &Schema) -> Result<()> { + fn check_transform_compatibility(field: &UnboundPartitionField, schema: &Schema) -> Result<()> { let schema_field = schema.field_by_id(field.source_id).ok_or_else(|| { Error::new( ErrorKind::DataInvalid, @@ -447,10 +478,89 @@ impl PartitionSpecBuilder { Ok(()) } +} + +/// Contains checks that are common to both PartitionSpecBuilder and UnboundPartitionSpecBuilder +trait CorePartitionSpecValidator { + /// Ensure that the partition name is unique among the partition fields and is not empty. + fn check_name_set_and_unique(&self, name: &str) -> Result<()> { + if name.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Cannot use empty partition name", + )); + } + + if self.partition_names().contains(name) { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Cannot use partition name more than once: {}", name), + )); + } + Ok(()) + } + /// For a single source-column transformations must be unique. + fn check_for_redundant_partitions(&self, source_id: i32, transform: &Transform) -> Result<()> { + let collision = self.fields().into_iter().find(|f| { + f.source_id == source_id && f.transform.dedup_name() == transform.dedup_name() + }); + + if let Some(collision) = collision { + Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot add redundant partition with source id `{}` and transform `{}`. A partition with the same source id and transform already exists with name `{}`", + source_id, transform.dedup_name(), collision.name + ), + )) + } else { + Ok(()) + } + } + + /// Check field / partition_id unique within the partition spec if set + fn check_partition_id_unique(&self, field_id: i32) -> Result<()> { + if self + .fields() + .iter() + .any(|f| f.partition_id == Some(field_id)) + { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot use field id more than once in one PartitionSpec: {}", + field_id + ), + )); + } + + Ok(()) + } + + fn partition_names(&self) -> std::collections::HashSet<&str>; + + fn fields(&self) -> Vec<&UnboundPartitionField>; +} + +impl CorePartitionSpecValidator for PartitionSpecBuilder<'_> { fn partition_names(&self) -> std::collections::HashSet<&str> { self.fields.iter().map(|f| f.name.as_str()).collect() } + + fn fields(&self) -> Vec<&UnboundPartitionField> { + self.fields.iter().collect() + } +} + +impl CorePartitionSpecValidator for UnboundPartitionSpecBuilder { + fn partition_names(&self) -> std::collections::HashSet<&str> { + self.fields.iter().map(|f| f.name.as_str()).collect() + } + + fn fields(&self) -> Vec<&UnboundPartitionField> { + self.fields.iter().collect() + } } #[cfg(test)] @@ -501,14 +611,30 @@ mod tests { #[test] fn test_is_unpartitioned() { - let partition_spec = PartitionSpec::builder().with_spec_id(1).build_unchecked(); + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(); + let partition_spec = PartitionSpec::builder(&schema) + .with_spec_id(1) + .build() + .unwrap(); assert!( partition_spec.is_unpartitioned(), "Empty partition spec should be unpartitioned" ); - let partition_spec = PartitionSpec::builder() - .with_unbound_fields(vec![ + let partition_spec = PartitionSpec::builder(&schema) + .add_unbound_fields(vec![ UnboundPartitionField::builder() .source_id(1) .name("id".to_string()) @@ -516,34 +642,36 @@ mod tests { .build(), UnboundPartitionField::builder() .source_id(2) - .name("name".to_string()) + .name("name_string".to_string()) .transform(Transform::Void) .build(), ]) .unwrap() .with_spec_id(1) - .build_unchecked(); + .build() + .unwrap(); assert!( !partition_spec.is_unpartitioned(), "Partition spec with one non void transform should not be unpartitioned" ); - let partition_spec = PartitionSpec::builder() + let partition_spec = PartitionSpec::builder(&schema) .with_spec_id(1) - .with_unbound_fields(vec![ + .add_unbound_fields(vec![ UnboundPartitionField::builder() .source_id(1) - .name("id".to_string()) + .name("id_void".to_string()) .transform(Transform::Void) .build(), UnboundPartitionField::builder() .source_id(2) - .name("name".to_string()) + .name("name_void".to_string()) .transform(Transform::Void) .build(), ]) .unwrap() - .build_unchecked(); + .build() + .unwrap(); assert!( partition_spec.is_unpartitioned(), "Partition spec with all void field should be unpartitioned" @@ -795,116 +923,41 @@ mod tests { assert!(partition_spec.partition_type(&schema).is_err()); } - #[test] - fn test_builder_unchecked() { - let spec = PartitionSpec { - spec_id: 10, - fields: vec![ - PartitionField { - source_id: 4, - field_id: 1000, - name: "ts_day".to_string(), - transform: Transform::Day, - }, - PartitionField { - source_id: 1, - field_id: 1001, - name: "id_bucket".to_string(), - transform: Transform::Bucket(16), - }, - PartitionField { - source_id: 2, - field_id: 1002, - name: "id_truncate".to_string(), - transform: Transform::Truncate(4), - }, - ], - }; - - let build_spec = PartitionSpec::builder() - .with_spec_id(10) - .with_partition_field(spec.fields[0].clone()) - .unwrap() - .with_partition_field(spec.fields[1].clone()) - .unwrap() - .with_partition_field(spec.fields[2].clone()) - .unwrap() - .build_unchecked(); - - assert_eq!(spec, build_spec); - } - - #[test] - fn test_build_unbound() { - let unbound_spec = UnboundPartitionSpec { - spec_id: Some(10), - fields: vec![ - UnboundPartitionField { - source_id: 4, - partition_id: Some(1000), - name: "ts_day".to_string(), - transform: Transform::Day, - }, - UnboundPartitionField { - source_id: 1, - partition_id: Some(1001), - name: "id_bucket".to_string(), - transform: Transform::Bucket(16), - }, - UnboundPartitionField { - source_id: 2, - partition_id: Some(1002), - name: "id_truncate".to_string(), - transform: Transform::Truncate(4), - }, - ], - }; - - let build_spec = PartitionSpec::builder() - .with_spec_id(10) - .with_unbound_partition_field(unbound_spec.fields[0].clone()) - .unwrap() - .with_unbound_partition_field(unbound_spec.fields[1].clone()) - .unwrap() - .with_unbound_partition_field(unbound_spec.fields[2].clone()) - .unwrap() - .build_unbound(); - - assert_eq!(unbound_spec, build_spec); - } - #[test] fn test_builder_disallow_duplicate_names() { - PartitionSpec::builder() - .with_partition_field(PartitionField { - source_id: 1, - field_id: 1000, - name: "ts_day".to_string(), - transform: Transform::Day, - }) + UnboundPartitionSpec::builder() + .add_partition_field(1, "ts_day".to_string(), Transform::Day) .unwrap() - .with_partition_field(PartitionField { - source_id: 2, - field_id: 1001, - name: "ts_day".to_string(), - transform: Transform::Day, - }) + .add_partition_field(2, "ts_day".to_string(), Transform::Day) .unwrap_err(); } #[test] fn test_builder_disallow_duplicate_field_ids() { - PartitionSpec::builder() - .with_partition_field(PartitionField { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(); + PartitionSpec::builder(&schema) + .add_unbound_field(UnboundPartitionField { source_id: 1, - field_id: 1000, - name: "ts_day".to_string(), - transform: Transform::Day, + partition_id: Some(1000), + name: "id".to_string(), + transform: Transform::Identity, }) .unwrap() - .with_partition_field(PartitionField { + .add_unbound_field(UnboundPartitionField { source_id: 2, - field_id: 1000, + partition_id: Some(1000), name: "id_bucket".to_string(), transform: Transform::Bucket(16), }) @@ -913,39 +966,59 @@ mod tests { #[test] fn test_builder_auto_assign_field_ids() { - let spec = PartitionSpec::builder() + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + NestedField::required( + 3, + "ts", + Type::Primitive(crate::spec::PrimitiveType::Timestamp), + ) + .into(), + ]) + .build() + .unwrap(); + let spec = PartitionSpec::builder(&schema) .with_spec_id(1) - .with_unbound_partition_field(UnboundPartitionField { + .add_unbound_field(UnboundPartitionField { source_id: 1, name: "id".to_string(), transform: Transform::Identity, - partition_id: Some(512), + partition_id: Some(1012), }) .unwrap() - .with_unbound_partition_field(UnboundPartitionField { + .add_unbound_field(UnboundPartitionField { source_id: 2, - name: "name".to_string(), + name: "name_void".to_string(), transform: Transform::Void, partition_id: None, }) .unwrap() // Should keep its ID even if its lower - .with_unbound_partition_field(UnboundPartitionField { + .add_unbound_field(UnboundPartitionField { source_id: 3, - name: "ts".to_string(), + name: "year".to_string(), transform: Transform::Year, partition_id: Some(1), }) .unwrap() - .build_unchecked(); + .build() + .unwrap(); - assert_eq!(512, spec.fields[0].field_id); - assert_eq!(513, spec.fields[1].field_id); + assert_eq!(1012, spec.fields[0].field_id); + assert_eq!(1013, spec.fields[1].field_id); assert_eq!(1, spec.fields[2].field_id); } #[test] - fn test_builder_valid_schema_1() { + fn test_builder_valid_schema() { let schema = Schema::builder() .with_fields(vec![ NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) @@ -960,28 +1033,23 @@ mod tests { .build() .unwrap(); - PartitionSpec::builder() + PartitionSpec::builder(&schema) .with_spec_id(1) - .build(&schema) + .build() .unwrap(); - let spec = PartitionSpec::builder() + let spec = PartitionSpec::builder(&schema) .with_spec_id(1) - .with_unbound_partition_field(UnboundPartitionField { - source_id: 1, - partition_id: None, - name: "id_bucket[16]".to_string(), - transform: Transform::Bucket(16), - }) + .add_partition_field("id", "id_bucket[16]", Transform::Bucket(16)) .unwrap() - .build(&schema) + .build() .unwrap(); assert_eq!(spec, PartitionSpec { spec_id: 1, fields: vec![PartitionField { source_id: 1, - field_id: 1, + field_id: 1000, name: "id_bucket[16]".to_string(), transform: Transform::Bucket(16), }] @@ -1000,21 +1068,19 @@ mod tests { .build() .unwrap(); - PartitionSpec::builder() + PartitionSpec::builder(&schema) .with_spec_id(1) - .build(&schema) + .build() .unwrap(); - let err = PartitionSpec::builder() + let err = PartitionSpec::builder(&schema) .with_spec_id(1) - .with_unbound_partition_field(UnboundPartitionField { + .add_unbound_field(UnboundPartitionField { source_id: 1, partition_id: None, name: "id".to_string(), transform: Transform::Bucket(16), }) - .unwrap() - .build(&schema) .unwrap_err(); assert!(err.message().contains("conflicts with schema")) } @@ -1035,34 +1101,32 @@ mod tests { .build() .unwrap(); - PartitionSpec::builder() + PartitionSpec::builder(&schema) .with_spec_id(1) - .build(&schema) + .build() .unwrap(); - PartitionSpec::builder() + PartitionSpec::builder(&schema) .with_spec_id(1) - .with_unbound_partition_field(UnboundPartitionField { + .add_unbound_field(UnboundPartitionField { source_id: 1, partition_id: None, name: "id".to_string(), transform: Transform::Identity, }) .unwrap() - .build(&schema) + .build() .unwrap(); // Not OK for different source id - PartitionSpec::builder() + PartitionSpec::builder(&schema) .with_spec_id(1) - .with_unbound_partition_field(UnboundPartitionField { + .add_unbound_field(UnboundPartitionField { source_id: 2, partition_id: None, name: "id".to_string(), transform: Transform::Identity, }) - .unwrap() - .build(&schema) .unwrap_err(); } @@ -1089,9 +1153,9 @@ mod tests { .unwrap(); // Valid - PartitionSpec::builder() + PartitionSpec::builder(&schema) .with_spec_id(1) - .with_unbound_fields(vec![ + .add_unbound_fields(vec![ UnboundPartitionField { source_id: 1, partition_id: None, @@ -1106,13 +1170,13 @@ mod tests { }, ]) .unwrap() - .build(&schema) + .build() .unwrap(); // Invalid - PartitionSpec::builder() + PartitionSpec::builder(&schema) .with_spec_id(1) - .with_unbound_fields(vec![ + .add_unbound_fields(vec![ UnboundPartitionField { source_id: 1, partition_id: None, @@ -1126,28 +1190,20 @@ mod tests { transform: Transform::Identity, }, ]) - .unwrap() - .build(&schema) .unwrap_err(); } #[test] fn test_builder_disallows_redundant() { - let err = PartitionSpec::builder() + let err = UnboundPartitionSpec::builder() .with_spec_id(1) - .with_unbound_partition_field(UnboundPartitionField { - source_id: 1, - partition_id: None, - name: "id_bucket[16]".to_string(), - transform: Transform::Bucket(16), - }) + .add_partition_field(1, "id_bucket[16]".to_string(), Transform::Bucket(16)) .unwrap() - .with_unbound_partition_field(UnboundPartitionField { - source_id: 1, - partition_id: None, - name: "id_bucket_with_other_name".to_string(), - transform: Transform::Bucket(16), - }) + .add_partition_field( + 1, + "id_bucket_with_other_name".to_string(), + Transform::Bucket(16), + ) .unwrap_err(); assert!(err.message().contains("redundant partition")); } @@ -1164,70 +1220,29 @@ mod tests { .build() .unwrap(); - PartitionSpec::builder() + PartitionSpec::builder(&schema) .with_spec_id(1) - .with_unbound_partition_field(UnboundPartitionField { + .add_unbound_field(UnboundPartitionField { source_id: 1, partition_id: None, name: "id_year".to_string(), transform: Transform::Year, }) - .unwrap() - .build(&schema) .unwrap_err(); } - #[test] - fn test_build_multiple_time_partitions() { - let schema = Schema::builder() - .with_fields(vec![NestedField::required( - 1, - "ts", - Type::Primitive(crate::spec::PrimitiveType::Timestamp), - ) - .into()]) - .build() - .unwrap(); - - PartitionSpec::builder() - .with_spec_id(1) - .with_unbound_partition_field(UnboundPartitionField { - source_id: 1, - partition_id: None, - name: "ts_year".to_string(), - transform: Transform::Year, - }) - .unwrap() - .with_unbound_partition_field(UnboundPartitionField { - source_id: 1, - partition_id: None, - name: "ts_month".to_string(), - transform: Transform::Month, - }) - .unwrap() - .with_unbound_partition_field(UnboundPartitionField { - source_id: 1, - partition_id: None, - name: "ts_day".to_string(), - transform: Transform::Day, - }) - .unwrap() - .build(&schema) - .unwrap(); - } - #[test] fn test_build_unbound_specs_without_partition_id() { - let spec = PartitionSpec::builder() + let spec = UnboundPartitionSpec::builder() .with_spec_id(1) - .with_unbound_partition_field(UnboundPartitionField { + .add_partition_fields(vec![UnboundPartitionField { source_id: 1, partition_id: None, name: "id_bucket[16]".to_string(), transform: Transform::Bucket(16), - }) + }]) .unwrap() - .build_unbound(); + .build(); assert_eq!(spec, UnboundPartitionSpec { spec_id: Some(1), diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 79d9074d6..86032a178 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -923,7 +923,7 @@ mod tests { use crate::spec::{ NestedField, NullOrder, Operation, PartitionField, PartitionSpec, PrimitiveType, Schema, Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, - Summary, Transform, Type, + Summary, Transform, Type, UnboundPartitionField, }; use crate::TableCreation; @@ -1008,16 +1008,15 @@ mod tests { .build() .unwrap(); - let partition_spec = PartitionSpec::builder() - .with_spec_id(1) - .with_partition_field(PartitionField { + let partition_spec = PartitionSpec { + spec_id: 1, + fields: vec![PartitionField { name: "ts_day".to_string(), transform: Transform::Day, source_id: 4, field_id: 1000, - }) - .unwrap() - .build_unchecked(); + }], + }; let expected = TableMetadata { format_version: FormatVersion::V2, @@ -1167,16 +1166,15 @@ mod tests { .build() .unwrap(); - let partition_spec = PartitionSpec::builder() + let partition_spec = PartitionSpec::builder(&schema) .with_spec_id(0) - .with_partition_field(PartitionField { - name: "vendor_id".to_string(), - transform: Transform::Identity, - source_id: 1, - field_id: 1000, - }) + .add_partition_field( + "vendor_id".to_string(), + "vendor_id".to_string(), + Transform::Identity, + ) .unwrap() - .build(&schema) + .build() .unwrap(); let sort_order = SortOrder::builder() @@ -1281,16 +1279,16 @@ mod tests { .build() .unwrap(); - let partition_spec = PartitionSpec::builder() + let partition_spec = PartitionSpec::builder(&schema1) .with_spec_id(0) - .with_partition_field(PartitionField { + .add_unbound_field(UnboundPartitionField { name: "x".to_string(), transform: Transform::Identity, source_id: 1, - field_id: 1000, + partition_id: Some(1000), }) .unwrap() - .build(&schema1) + .build() .unwrap(); let sort_order = SortOrder::builder() @@ -1404,16 +1402,16 @@ mod tests { .build() .unwrap(); - let partition_spec = PartitionSpec::builder() + let partition_spec = PartitionSpec::builder(&schema) .with_spec_id(0) - .with_partition_field(PartitionField { + .add_unbound_field(UnboundPartitionField { name: "x".to_string(), transform: Transform::Identity, source_id: 1, - field_id: 1000, + partition_id: Some(1000), }) .unwrap() - .build(&schema) + .build() .unwrap(); let sort_order = SortOrder::builder() @@ -1484,16 +1482,16 @@ mod tests { .build() .unwrap(); - let partition_spec = PartitionSpec::builder() + let partition_spec = PartitionSpec::builder(&schema) .with_spec_id(0) - .with_partition_field(PartitionField { + .add_unbound_field(UnboundPartitionField { name: "x".to_string(), transform: Transform::Identity, source_id: 1, - field_id: 1000, + partition_id: Some(1000), }) .unwrap() - .build(&schema) + .build() .unwrap(); let expected = TableMetadata { From 3baecca19382f026af0a8e720fd78cd7e284b307 Mon Sep 17 00:00:00 2001 From: Christian Thiel Date: Tue, 6 Aug 2024 11:48:13 +0200 Subject: [PATCH 11/14] Improve comment --- crates/iceberg/src/spec/partition.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs index f5d14af23..ef23a9e0c 100644 --- a/crates/iceberg/src/spec/partition.rs +++ b/crates/iceberg/src/spec/partition.rs @@ -271,7 +271,10 @@ impl<'a> PartitionSpecBuilder<'a> { } /// Set the last assigned field id for the partition spec. - /// This is useful when re-binding partition specs. + /// + /// Set this field when a new partition spec is created for an existing TableMetaData. + /// As `field_id` must be unique in V2 metadata, this should be set to + /// the highest field id used previously. pub fn with_last_assigned_field_id(mut self, last_assigned_field_id: i32) -> Self { self.last_assigned_field_id = last_assigned_field_id; self From 0b43149b321c68924da2680e61921a93ebd9b572 Mon Sep 17 00:00:00 2001 From: Christian Thiel Date: Tue, 6 Aug 2024 11:56:38 +0200 Subject: [PATCH 12/14] Fix fmt --- .../iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs b/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs index 1e106a34b..e8e7337ac 100644 --- a/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs +++ b/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs @@ -495,8 +495,8 @@ mod test { UnaryExpression, }; use crate::spec::{ - DataContentType, DataFile, DataFileFormat, Datum, NestedField, - PartitionSpec, PrimitiveType, Schema, Struct, Transform, Type, UnboundPartitionField, + DataContentType, DataFile, DataFileFormat, Datum, NestedField, PartitionSpec, + PrimitiveType, Schema, Struct, Transform, Type, UnboundPartitionField, }; const INT_MIN_VALUE: i32 = 30; From a47e26908f12168f2694a4ad717a5a61e4259f94 Mon Sep 17 00:00:00 2001 From: Christian Thiel Date: Mon, 12 Aug 2024 17:44:34 +0200 Subject: [PATCH 13/14] Review fixes --- .../src/expr/visitors/expression_evaluator.rs | 2 +- .../src/expr/visitors/inclusive_projection.rs | 4 +- crates/iceberg/src/scan.rs | 2 +- crates/iceberg/src/spec/manifest.rs | 8 +- crates/iceberg/src/spec/partition.rs | 78 ++++++++++++------- crates/iceberg/src/spec/table_metadata.rs | 40 +++++----- 6 files changed, 75 insertions(+), 59 deletions(-) diff --git a/crates/iceberg/src/expr/visitors/expression_evaluator.rs b/crates/iceberg/src/expr/visitors/expression_evaluator.rs index 3eb5e960d..d8a47ec48 100644 --- a/crates/iceberg/src/expr/visitors/expression_evaluator.rs +++ b/crates/iceberg/src/expr/visitors/expression_evaluator.rs @@ -300,7 +300,7 @@ mod tests { let partition_fields = partition_type.fields().to_owned(); let partition_schema = Schema::builder() - .with_schema_id(partition_spec.spec_id) + .with_schema_id(partition_spec.spec_id()) .with_fields(partition_fields) .build()?; diff --git a/crates/iceberg/src/expr/visitors/inclusive_projection.rs b/crates/iceberg/src/expr/visitors/inclusive_projection.rs index 5e5820b83..716f0869a 100644 --- a/crates/iceberg/src/expr/visitors/inclusive_projection.rs +++ b/crates/iceberg/src/expr/visitors/inclusive_projection.rs @@ -40,7 +40,7 @@ impl InclusiveProjection { fn get_parts_for_field_id(&mut self, field_id: i32) -> &Vec { if let std::collections::hash_map::Entry::Vacant(e) = self.cached_parts.entry(field_id) { let mut parts: Vec = vec![]; - for partition_spec_field in &self.partition_spec.fields { + for partition_spec_field in self.partition_spec.fields() { if partition_spec_field.source_id == field_id { parts.push(partition_spec_field.clone()) } @@ -333,8 +333,6 @@ mod tests { fn test_inclusive_projection_date_transforms() { let schema = build_test_schema(); - // ToDo: We cannot use the builder here as having multiple transforms on - // a single field is not allowed. Should we keep this test? let partition_spec = PartitionSpec { spec_id: 1, fields: vec![ diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 18489b721..1f5602917 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -433,7 +433,7 @@ impl PartitionFilterCache { let partition_fields = partition_type.fields().to_owned(); let partition_schema = Arc::new( Schema::builder() - .with_schema_id(partition_spec.spec_id) + .with_schema_id(partition_spec.spec_id()) .with_fields(partition_fields) .build()?, ); diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs index e08591f9e..4f51dc9cd 100644 --- a/crates/iceberg/src/spec/manifest.rs +++ b/crates/iceberg/src/spec/manifest.rs @@ -221,14 +221,14 @@ impl ManifestWriter { )?; avro_writer.add_user_metadata( "partition-spec".to_string(), - to_vec(&manifest.metadata.partition_spec.fields).map_err(|err| { + to_vec(&manifest.metadata.partition_spec.fields()).map_err(|err| { Error::new(ErrorKind::DataInvalid, "Fail to serialize partition spec") .with_source(err) })?, )?; avro_writer.add_user_metadata( "partition-spec-id".to_string(), - manifest.metadata.partition_spec.spec_id.to_string(), + manifest.metadata.partition_spec.spec_id().to_string(), )?; avro_writer.add_user_metadata( "format-version".to_string(), @@ -294,12 +294,12 @@ impl ManifestWriter { self.output.write(Bytes::from(content)).await?; let partition_summary = - self.get_field_summary_vec(&manifest.metadata.partition_spec.fields); + self.get_field_summary_vec(manifest.metadata.partition_spec.fields()); Ok(ManifestFile { manifest_path: self.output.location().to_string(), manifest_length: length as i64, - partition_spec_id: manifest.metadata.partition_spec.spec_id, + partition_spec_id: manifest.metadata.partition_spec.spec_id(), content: manifest.metadata.content, // sequence_number and min_sequence_number with UNASSIGNED_SEQUENCE_NUMBER will be replace with // real sequence number in `ManifestListWriter`. diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs index ef23a9e0c..19c558e2c 100644 --- a/crates/iceberg/src/spec/partition.rs +++ b/crates/iceberg/src/spec/partition.rs @@ -27,6 +27,9 @@ use super::transform::Transform; use super::{NestedField, Schema, StructType}; use crate::{Error, ErrorKind, Result}; +pub(crate) const UNPARTITIONED_LAST_ASSIGNED_ID: i32 = 999; +pub(crate) const DEFAULT_PARTITION_SPEC_ID: i32 = 0; + /// Reference to [`PartitionSpec`]. pub type PartitionSpecRef = Arc; /// Partition fields capture the transform from table data to partition values. @@ -56,9 +59,9 @@ impl PartitionField { #[serde(rename_all = "kebab-case")] pub struct PartitionSpec { /// Identifier for PartitionSpec - pub spec_id: i32, + pub(crate) spec_id: i32, /// Details of the partition spec - pub fields: Vec, + pub(crate) fields: Vec, } impl PartitionSpec { @@ -67,6 +70,16 @@ impl PartitionSpec { PartitionSpecBuilder::new(schema) } + /// Spec id of the partition spec + pub fn spec_id(&self) -> i32 { + self.spec_id + } + + /// Fields of the partition spec + pub fn fields(&self) -> &[PartitionField] { + &self.fields + } + /// Returns if the partition spec is unpartitioned. /// /// A [`PartitionSpec`] is unpartitioned if it has no fields or all fields are [`Transform::Void`] transform. @@ -133,9 +146,9 @@ pub struct UnboundPartitionField { #[serde(rename_all = "kebab-case")] pub struct UnboundPartitionSpec { /// Identifier for PartitionSpec - pub spec_id: Option, + pub(crate) spec_id: Option, /// Details of the partition spec - pub fields: Vec, + pub(crate) fields: Vec, } impl UnboundPartitionSpec { @@ -143,6 +156,21 @@ impl UnboundPartitionSpec { pub fn builder() -> UnboundPartitionSpecBuilder { UnboundPartitionSpecBuilder::default() } + + /// Bind this unbound partition spec to a schema. + pub fn bind(self, schema: &Schema) -> Result { + PartitionSpecBuilder::new_from_unbound(self, schema)?.build() + } + + /// Spec id of the partition spec + pub fn spec_id(&self) -> Option { + self.spec_id + } + + /// Fields of the partition spec + pub fn fields(&self) -> &[UnboundPartitionField] { + &self.fields + } } impl From for UnboundPartitionField { @@ -244,25 +272,20 @@ pub struct PartitionSpecBuilder<'a> { } impl<'a> PartitionSpecBuilder<'a> { - pub(crate) const UNPARTITIONED_LAST_ASSIGNED_ID: i32 = 999; - // Default partition spec id is only used for building `PartitionSpec`. - // When building unbound partition specs, the spec id is not set by default. - pub(crate) const DEFAULT_PARTITION_SPEC_ID: i32 = 0; - /// Create a new partition spec builder with the given schema. pub fn new(schema: &'a Schema) -> Self { Self { spec_id: None, fields: vec![], - last_assigned_field_id: Self::UNPARTITIONED_LAST_ASSIGNED_ID, + last_assigned_field_id: UNPARTITIONED_LAST_ASSIGNED_ID, schema, } } /// Create a new partition spec builder from an existing unbound partition spec. pub fn new_from_unbound(unbound: UnboundPartitionSpec, schema: &'a Schema) -> Result { - let mut builder = Self::new(schema) - .with_spec_id(unbound.spec_id.unwrap_or(Self::DEFAULT_PARTITION_SPEC_ID)); + let mut builder = + Self::new(schema).with_spec_id(unbound.spec_id.unwrap_or(DEFAULT_PARTITION_SPEC_ID)); for field in unbound.fields { builder = builder.add_unbound_field(field)?; @@ -289,19 +312,19 @@ impl<'a> PartitionSpecBuilder<'a> { /// Add a new partition field to the partition spec. pub fn add_partition_field( self, - source_name: impl ToString, - target_name: impl ToString, + source_name: impl AsRef, + target_name: impl Into, transform: Transform, ) -> Result { let source_id = self .schema - .field_by_name(source_name.to_string().as_str()) + .field_by_name(source_name.as_ref()) .ok_or_else(|| { Error::new( ErrorKind::DataInvalid, format!( "Cannot find source column with name: {} in schema", - source_name.to_string() + source_name.as_ref() ), ) })? @@ -309,7 +332,7 @@ impl<'a> PartitionSpecBuilder<'a> { let field = UnboundPartitionField { source_id, partition_id: None, - name: target_name.to_string(), + name: target_name.into(), transform, }; @@ -329,6 +352,7 @@ impl<'a> PartitionSpecBuilder<'a> { self.check_partition_id_unique(partition_id)?; } + // Non-fallible from here self.fields.push(field); Ok(self) } @@ -349,7 +373,7 @@ impl<'a> PartitionSpecBuilder<'a> { pub fn build(self) -> Result { let fields = Self::set_field_ids(self.fields, self.last_assigned_field_id)?; Ok(PartitionSpec { - spec_id: self.spec_id.unwrap_or(Self::DEFAULT_PARTITION_SPEC_ID), + spec_id: self.spec_id.unwrap_or(DEFAULT_PARTITION_SPEC_ID), fields, }) } @@ -366,15 +390,13 @@ impl<'a> PartitionSpecBuilder<'a> { .filter_map(|f| f.partition_id) .collect::>(); - fn _check_overflow(last_assigned_field_id: i32) -> Result<()> { - if last_assigned_field_id == i32::MAX { - Err(Error::new( + fn _check_add_1(prev: i32) -> Result { + prev.checked_add(1).ok_or_else(|| { + Error::new( ErrorKind::DataInvalid, "Cannot assign more partition ids. Overflow.", - )) - } else { - Ok(()) - } + ) + }) } let mut bound_fields = Vec::with_capacity(fields.len()); @@ -383,11 +405,9 @@ impl<'a> PartitionSpecBuilder<'a> { last_assigned_field_id = std::cmp::max(last_assigned_field_id, partition_id); partition_id } else { - _check_overflow(last_assigned_field_id)?; - last_assigned_field_id += 1; + last_assigned_field_id = _check_add_1(last_assigned_field_id)?; while assigned_ids.contains(&last_assigned_field_id) { - _check_overflow(last_assigned_field_id)?; - last_assigned_field_id += 1; + last_assigned_field_id = _check_add_1(last_assigned_field_id)?; } last_assigned_field_id }; diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 86032a178..087434361 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -32,12 +32,12 @@ use uuid::Uuid; use super::snapshot::{Snapshot, SnapshotReference, SnapshotRetention}; use super::{ PartitionSpec, PartitionSpecRef, SchemaId, SchemaRef, SnapshotRef, SortOrder, SortOrderRef, + DEFAULT_PARTITION_SPEC_ID, }; use crate::error::Result; use crate::{Error, ErrorKind, TableCreation}; static MAIN_BRANCH: &str = "main"; -static DEFAULT_SPEC_ID: i32 = 0; static DEFAULT_SORT_ORDER_ID: i64 = 0; pub(crate) static EMPTY_SNAPSHOT_ID: i64 = -1; @@ -181,8 +181,8 @@ impl TableMetadata { /// Get default partition spec #[inline] pub fn default_partition_spec(&self) -> Option<&PartitionSpecRef> { - if self.default_spec_id == DEFAULT_SPEC_ID { - self.partition_spec_by_id(DEFAULT_SPEC_ID) + if self.default_spec_id == DEFAULT_PARTITION_SPEC_ID { + self.partition_spec_by_id(DEFAULT_PARTITION_SPEC_ID) } else { Some( self.partition_spec_by_id(self.default_spec_id) @@ -302,9 +302,9 @@ impl TableMetadataBuilder { )) } None => HashMap::from([( - DEFAULT_SPEC_ID, + DEFAULT_PARTITION_SPEC_ID, Arc::new(PartitionSpec { - spec_id: DEFAULT_SPEC_ID, + spec_id: DEFAULT_PARTITION_SPEC_ID, fields: vec![], }), )]), @@ -341,7 +341,7 @@ impl TableMetadataBuilder { current_schema_id: schema.schema_id(), schemas: HashMap::from([(schema.schema_id(), Arc::new(schema))]), partition_specs, - default_spec_id: DEFAULT_SPEC_ID, + default_spec_id: DEFAULT_PARTITION_SPEC_ID, last_partition_id: 0, properties, current_snapshot_id: None, @@ -385,8 +385,8 @@ pub(super) mod _serde { use uuid::Uuid; use super::{ - FormatVersion, MetadataLog, SnapshotLog, TableMetadata, DEFAULT_SORT_ORDER_ID, - DEFAULT_SPEC_ID, MAIN_BRANCH, + FormatVersion, MetadataLog, SnapshotLog, TableMetadata, DEFAULT_PARTITION_SPEC_ID, + DEFAULT_SORT_ORDER_ID, MAIN_BRANCH, }; use crate::spec::schema::_serde::{SchemaV1, SchemaV2}; use crate::spec::snapshot::_serde::{SnapshotV1, SnapshotV2}; @@ -562,7 +562,7 @@ pub(super) mod _serde { value .partition_specs .into_iter() - .map(|x| (x.spec_id, Arc::new(x))), + .map(|x| (x.spec_id(), Arc::new(x))), ), default_spec_id: value.default_spec_id, last_partition_id: value.last_partition_id, @@ -637,12 +637,12 @@ pub(super) mod _serde { .partition_specs .unwrap_or_else(|| { vec![PartitionSpec { - spec_id: DEFAULT_SPEC_ID, + spec_id: DEFAULT_PARTITION_SPEC_ID, fields: value.partition_spec, }] }) .into_iter() - .map(|x| (x.spec_id, Arc::new(x))), + .map(|x| (x.spec_id(), Arc::new(x))), ); Ok(TableMetadata { format_version: FormatVersion::V1, @@ -802,7 +802,7 @@ pub(super) mod _serde { partition_spec: v .partition_specs .get(&v.default_spec_id) - .map(|x| x.fields.clone()) + .map(|x| x.fields().to_vec()) .unwrap_or_default(), partition_specs: Some( v.partition_specs @@ -1168,11 +1168,7 @@ mod tests { let partition_spec = PartitionSpec::builder(&schema) .with_spec_id(0) - .add_partition_field( - "vendor_id".to_string(), - "vendor_id".to_string(), - Transform::Identity, - ) + .add_partition_field("vendor_id", "vendor_id", Transform::Identity) .unwrap() .build() .unwrap(); @@ -1676,10 +1672,12 @@ mod tests { table_metadata.partition_specs, HashMap::from([( 0, - Arc::new(PartitionSpec { - spec_id: 0, - fields: vec![] - }) + Arc::new( + PartitionSpec::builder(table_metadata.schemas.get(&0).unwrap()) + .with_spec_id(0) + .build() + .unwrap() + ) )]) ); assert_eq!( From 0c4615cf4c186b3a972f50a68661c61be5d3560d Mon Sep 17 00:00:00 2001 From: Christian Thiel Date: Mon, 12 Aug 2024 18:22:14 +0200 Subject: [PATCH 14/14] Remove partition_names() HashSet creation --- crates/iceberg/src/spec/partition.rs | 24 +++++++----------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs index 19c558e2c..055b48e68 100644 --- a/crates/iceberg/src/spec/partition.rs +++ b/crates/iceberg/src/spec/partition.rs @@ -514,7 +514,7 @@ trait CorePartitionSpecValidator { )); } - if self.partition_names().contains(name) { + if self.fields().iter().any(|f| f.name == name) { return Err(Error::new( ErrorKind::DataInvalid, format!("Cannot use partition name more than once: {}", name), @@ -525,7 +525,7 @@ trait CorePartitionSpecValidator { /// For a single source-column transformations must be unique. fn check_for_redundant_partitions(&self, source_id: i32, transform: &Transform) -> Result<()> { - let collision = self.fields().into_iter().find(|f| { + let collision = self.fields().iter().find(|f| { f.source_id == source_id && f.transform.dedup_name() == transform.dedup_name() }); @@ -561,28 +561,18 @@ trait CorePartitionSpecValidator { Ok(()) } - fn partition_names(&self) -> std::collections::HashSet<&str>; - - fn fields(&self) -> Vec<&UnboundPartitionField>; + fn fields(&self) -> &Vec; } impl CorePartitionSpecValidator for PartitionSpecBuilder<'_> { - fn partition_names(&self) -> std::collections::HashSet<&str> { - self.fields.iter().map(|f| f.name.as_str()).collect() - } - - fn fields(&self) -> Vec<&UnboundPartitionField> { - self.fields.iter().collect() + fn fields(&self) -> &Vec { + &self.fields } } impl CorePartitionSpecValidator for UnboundPartitionSpecBuilder { - fn partition_names(&self) -> std::collections::HashSet<&str> { - self.fields.iter().map(|f| f.name.as_str()).collect() - } - - fn fields(&self) -> Vec<&UnboundPartitionField> { - self.fields.iter().collect() + fn fields(&self) -> &Vec { + &self.fields } }