Skip to content

Commit

Permalink
TableMetadataBuilder (#587)
Browse files Browse the repository at this point in the history
* Squash builder

* Address comments

* Address comments

* Match on FormatVersion to fail for V3

* Fix examples

* Fix tests

* Address comments

* Address comments

* Update crates/iceberg/src/spec/table_metadata_builder.rs

Co-authored-by: Renjie Liu <[email protected]>

* Remove ReferenceType

* Fix import

* Remove current_schema and last_updated_ms accessors

* Ensure main branch is not removed

* Address comments

* Fix tests

* Do not ensure ensure_main_branch_not_removed

* set_branch_snapshot create branch if not exists

---------

Co-authored-by: Renjie Liu <[email protected]>
  • Loading branch information
c-thiel and liurenjie1024 authored Nov 26, 2024
1 parent 16f9411 commit 1138364
Show file tree
Hide file tree
Showing 13 changed files with 2,305 additions and 117 deletions.
4 changes: 3 additions & 1 deletion crates/catalog/glue/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,9 @@ impl Catalog for GlueCatalog {
}
};

let metadata = TableMetadataBuilder::from_table_creation(creation)?.build()?;
let metadata = TableMetadataBuilder::from_table_creation(creation)?
.build()?
.metadata;
let metadata_location = create_metadata_location(&location, 0)?;

self.file_io
Expand Down
4 changes: 3 additions & 1 deletion crates/catalog/glue/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,9 @@ mod tests {
.location("my_location".to_string())
.schema(schema)
.build();
let metadata = TableMetadataBuilder::from_table_creation(table_creation)?.build()?;
let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
.build()?
.metadata;

Ok(metadata)
}
Expand Down
4 changes: 3 additions & 1 deletion crates/catalog/glue/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,9 @@ mod tests {
.location("my_location".to_string())
.schema(schema)
.build();
let metadata = TableMetadataBuilder::from_table_creation(table_creation)?.build()?;
let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
.build()?
.metadata;

Ok(metadata)
}
Expand Down
4 changes: 3 additions & 1 deletion crates/catalog/hms/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,9 @@ impl Catalog for HmsCatalog {
}
};

let metadata = TableMetadataBuilder::from_table_creation(creation)?.build()?;
let metadata = TableMetadataBuilder::from_table_creation(creation)?
.build()?
.metadata;
let metadata_location = create_metadata_location(&location, 0)?;

self.file_io
Expand Down
4 changes: 3 additions & 1 deletion crates/catalog/memory/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,9 @@ impl Catalog for MemoryCatalog {
}
};

let metadata = TableMetadataBuilder::from_table_creation(table_creation)?.build()?;
let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
.build()?
.metadata;
let metadata_location = format!(
"{}/metadata/{}-{}.metadata.json",
&location,
Expand Down
4 changes: 3 additions & 1 deletion crates/catalog/sql/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,9 @@ impl Catalog for SqlCatalog {
}
};

let tbl_metadata = TableMetadataBuilder::from_table_creation(tbl_creation)?.build()?;
let tbl_metadata = TableMetadataBuilder::from_table_creation(tbl_creation)?
.build()?
.metadata;
let tbl_metadata_location = format!(
"{}/metadata/0-{}.metadata.json",
location.clone(),
Expand Down
88 changes: 75 additions & 13 deletions crates/iceberg/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,8 +452,46 @@ impl TableUpdate {
/// Applies the update to the table metadata builder.
pub fn apply(self, builder: TableMetadataBuilder) -> Result<TableMetadataBuilder> {
match self {
TableUpdate::AssignUuid { uuid } => builder.assign_uuid(uuid),
_ => unimplemented!(),
TableUpdate::AssignUuid { uuid } => Ok(builder.assign_uuid(uuid)),
TableUpdate::AddSchema {
schema,
last_column_id,
} => {
if let Some(last_column_id) = last_column_id {
if builder.last_column_id() > last_column_id {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Invalid last column ID: {last_column_id} < {} (previous last column ID)",
builder.last_column_id()
),
));
}
};
Ok(builder.add_schema(schema))
}
TableUpdate::SetCurrentSchema { schema_id } => builder.set_current_schema(schema_id),
TableUpdate::AddSpec { spec } => builder.add_partition_spec(spec),
TableUpdate::SetDefaultSpec { spec_id } => builder.set_default_partition_spec(spec_id),
TableUpdate::AddSortOrder { sort_order } => builder.add_sort_order(sort_order),
TableUpdate::SetDefaultSortOrder { sort_order_id } => {
builder.set_default_sort_order(sort_order_id)
}
TableUpdate::AddSnapshot { snapshot } => builder.add_snapshot(snapshot),
TableUpdate::SetSnapshotRef {
ref_name,
reference,
} => builder.set_ref(&ref_name, reference),
TableUpdate::RemoveSnapshots { snapshot_ids } => {
Ok(builder.remove_snapshots(&snapshot_ids))
}
TableUpdate::RemoveSnapshotRef { ref_name } => Ok(builder.remove_ref(&ref_name)),
TableUpdate::SetLocation { location } => Ok(builder.set_location(location)),
TableUpdate::SetProperties { updates } => builder.set_properties(updates),
TableUpdate::RemoveProperties { removals } => builder.remove_properties(&removals),
TableUpdate::UpgradeFormatVersion { format_version } => {
builder.upgrade_format_version(format_version)
}
}
}
}
Expand Down Expand Up @@ -749,7 +787,7 @@ mod tests {
SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder,
SqlViewRepresentation, Summary, TableMetadata, TableMetadataBuilder, Transform, Type,
UnboundPartitionSpec, ViewFormatVersion, ViewRepresentation, ViewRepresentations,
ViewVersion,
ViewVersion, MAIN_BRANCH,
};
use crate::{NamespaceIdent, TableCreation, TableIdent, TableRequirement, TableUpdate};

Expand Down Expand Up @@ -803,9 +841,9 @@ mod tests {
TableMetadataBuilder::from_table_creation(tbl_creation)
.unwrap()
.assign_uuid(uuid::Uuid::nil())
.unwrap()
.build()
.unwrap()
.metadata
}

#[test]
Expand Down Expand Up @@ -855,7 +893,7 @@ mod tests {
{
"snapshot-id": 3051729675574597004,
"sequence-number": 10,
"timestamp-ms": 1515100955770,
"timestamp-ms": 9992191116217,
"summary": {
"operation": "append"
},
Expand All @@ -865,8 +903,28 @@ mod tests {
"#;

let snapshot = serde_json::from_str::<Snapshot>(record).unwrap();
let mut metadata = metadata;
metadata.append_snapshot(snapshot);
let builder = metadata.into_builder(None);
let builder = TableUpdate::AddSnapshot {
snapshot: snapshot.clone(),
}
.apply(builder)
.unwrap();
let metadata = TableUpdate::SetSnapshotRef {
ref_name: MAIN_BRANCH.to_string(),
reference: SnapshotReference {
snapshot_id: snapshot.snapshot_id(),
retention: SnapshotRetention::Branch {
min_snapshots_to_keep: Some(10),
max_snapshot_age_ms: None,
max_ref_age_ms: None,
},
},
}
.apply(builder)
.unwrap()
.build()
.unwrap()
.metadata;

// Ref exists and should matches
let requirement = TableRequirement::RefSnapshotIdMatch {
Expand Down Expand Up @@ -916,14 +974,13 @@ mod tests {
#[test]
fn test_check_last_assigned_partition_id() {
let metadata = metadata();

let requirement = TableRequirement::LastAssignedPartitionIdMatch {
last_assigned_partition_id: 1,
last_assigned_partition_id: 0,
};
assert!(requirement.check(Some(&metadata)).is_err());

let requirement = TableRequirement::LastAssignedPartitionIdMatch {
last_assigned_partition_id: 0,
last_assigned_partition_id: 999,
};
assert!(requirement.check(Some(&metadata)).is_ok());
}
Expand Down Expand Up @@ -1582,16 +1639,21 @@ mod tests {
let table_metadata = TableMetadataBuilder::from_table_creation(table_creation)
.unwrap()
.build()
.unwrap();
let table_metadata_builder = TableMetadataBuilder::new(table_metadata);
.unwrap()
.metadata;
let table_metadata_builder = TableMetadataBuilder::new_from_metadata(
table_metadata,
Some("s3://db/table/metadata/metadata1.gz.json".to_string()),
);

let uuid = uuid::Uuid::new_v4();
let update = TableUpdate::AssignUuid { uuid };
let updated_metadata = update
.apply(table_metadata_builder)
.unwrap()
.build()
.unwrap();
.unwrap()
.metadata;
assert_eq!(updated_metadata.uuid(), uuid);
}

Expand Down
1 change: 1 addition & 0 deletions crates/iceberg/src/spec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ mod schema;
mod snapshot;
mod sort;
mod table_metadata;
mod table_metadata_builder;
mod transform;
mod values;
mod view_metadata;
Expand Down
6 changes: 5 additions & 1 deletion crates/iceberg/src/spec/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ impl BoundPartitionSpec {
}

/// Get the highest field id in the partition spec.
/// If the partition spec is unpartitioned, it returns the last unpartitioned last assigned id (999).
pub fn highest_field_id(&self) -> Option<i32> {
self.fields.iter().map(|f| f.field_id).max()
}
Expand Down Expand Up @@ -182,6 +181,11 @@ impl BoundPartitionSpec {

true
}

/// Change the spec id of the partition spec
pub fn with_spec_id(self, spec_id: i32) -> Self {
Self { spec_id, ..self }
}
}

impl SchemalessPartitionSpec {
Expand Down
19 changes: 18 additions & 1 deletion crates/iceberg/src/spec/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ impl SchemaBuilder {
/// Reassignment starts from the field-id specified in `start_from` (inclusive).
///
/// All specified aliases and identifier fields will be updated to the new field-ids.
#[allow(dead_code)] // Will be needed in TableMetadataBuilder
pub(crate) fn with_reassigned_field_ids(mut self, start_from: u32) -> Self {
self.reassign_field_ids_from = Some(start_from.try_into().unwrap_or(i32::MAX));
self
Expand Down Expand Up @@ -376,6 +375,24 @@ impl Schema {
pub fn accessor_by_field_id(&self, field_id: i32) -> Option<Arc<StructAccessor>> {
self.field_id_to_accessor.get(&field_id).cloned()
}

/// Check if this schema is identical to another schema semantically - excluding schema id.
pub(crate) fn is_same_schema(&self, other: &SchemaRef) -> bool {
self.as_struct().eq(other.as_struct())
&& self.identifier_field_ids().eq(other.identifier_field_ids())
}

/// Change the schema id of this schema.
// This is redundant with the `with_schema_id` method on the builder, but useful
// as it is infallible in contrast to the builder `build()` method.
pub(crate) fn with_schema_id(self, schema_id: SchemaId) -> Self {
Self { schema_id, ..self }
}

/// Return A HashMap matching field ids to field names.
pub(crate) fn field_id_to_name_map(&self) -> &HashMap<i32, String> {
&self.id_to_name
}
}

impl Display for Schema {
Expand Down
7 changes: 7 additions & 0 deletions crates/iceberg/src/spec/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,13 @@ pub struct SnapshotReference {
pub retention: SnapshotRetention,
}

impl SnapshotReference {
/// Returns true if the snapshot reference is a branch.
pub fn is_branch(&self) -> bool {
matches!(self.retention, SnapshotRetention::Branch { .. })
}
}

impl SnapshotReference {
/// Create new snapshot reference
pub fn new(snapshot_id: i64, retention: SnapshotRetention) -> Self {
Expand Down
Loading

0 comments on commit 1138364

Please sign in to comment.