Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TableMetadataBuilder #587

Merged
merged 19 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion crates/catalog/glue/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,9 @@ impl Catalog for GlueCatalog {
}
};

let metadata = TableMetadataBuilder::from_table_creation(creation)?.build()?;
let metadata = TableMetadataBuilder::from_table_creation(creation)?
.build()?
.metadata;
let metadata_location = create_metadata_location(&location, 0)?;

self.file_io
Expand Down
4 changes: 3 additions & 1 deletion crates/catalog/glue/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,9 @@ mod tests {
.location("my_location".to_string())
.schema(schema)
.build();
let metadata = TableMetadataBuilder::from_table_creation(table_creation)?.build()?;
let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
.build()?
.metadata;

Ok(metadata)
}
Expand Down
4 changes: 3 additions & 1 deletion crates/catalog/glue/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,9 @@ mod tests {
.location("my_location".to_string())
.schema(schema)
.build();
let metadata = TableMetadataBuilder::from_table_creation(table_creation)?.build()?;
let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
.build()?
.metadata;

Ok(metadata)
}
Expand Down
4 changes: 3 additions & 1 deletion crates/catalog/hms/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,9 @@ impl Catalog for HmsCatalog {
}
};

let metadata = TableMetadataBuilder::from_table_creation(creation)?.build()?;
let metadata = TableMetadataBuilder::from_table_creation(creation)?
.build()?
.metadata;
let metadata_location = create_metadata_location(&location, 0)?;

self.file_io
Expand Down
4 changes: 3 additions & 1 deletion crates/catalog/memory/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,9 @@ impl Catalog for MemoryCatalog {
}
};

let metadata = TableMetadataBuilder::from_table_creation(table_creation)?.build()?;
let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
.build()?
.metadata;
let metadata_location = format!(
"{}/metadata/{}-{}.metadata.json",
&location,
Expand Down
4 changes: 3 additions & 1 deletion crates/catalog/sql/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,9 @@ impl Catalog for SqlCatalog {
}
};

let tbl_metadata = TableMetadataBuilder::from_table_creation(tbl_creation)?.build()?;
let tbl_metadata = TableMetadataBuilder::from_table_creation(tbl_creation)?
.build()?
.metadata;
let tbl_metadata_location = format!(
"{}/metadata/0-{}.metadata.json",
location.clone(),
Expand Down
88 changes: 75 additions & 13 deletions crates/iceberg/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,8 +448,46 @@ impl TableUpdate {
/// Applies the update to the table metadata builder.
pub fn apply(self, builder: TableMetadataBuilder) -> Result<TableMetadataBuilder> {
match self {
TableUpdate::AssignUuid { uuid } => builder.assign_uuid(uuid),
_ => unimplemented!(),
TableUpdate::AssignUuid { uuid } => Ok(builder.assign_uuid(uuid)),
TableUpdate::AddSchema {
schema,
last_column_id,
} => {
if let Some(last_column_id) = last_column_id {
if builder.last_column_id() > last_column_id {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Invalid last column ID: {last_column_id} < {} (previous last column ID)",
builder.last_column_id()
),
));
}
};
Ok(builder.add_schema(schema))
}
TableUpdate::SetCurrentSchema { schema_id } => builder.set_current_schema(schema_id),
TableUpdate::AddSpec { spec } => builder.add_partition_spec(spec),
TableUpdate::SetDefaultSpec { spec_id } => builder.set_default_partition_spec(spec_id),
TableUpdate::AddSortOrder { sort_order } => builder.add_sort_order(sort_order),
TableUpdate::SetDefaultSortOrder { sort_order_id } => {
builder.set_default_sort_order(sort_order_id)
}
TableUpdate::AddSnapshot { snapshot } => builder.add_snapshot(snapshot),
TableUpdate::SetSnapshotRef {
ref_name,
reference,
} => builder.set_ref(&ref_name, reference),
TableUpdate::RemoveSnapshots { snapshot_ids } => {
Ok(builder.remove_snapshots(&snapshot_ids))
}
TableUpdate::RemoveSnapshotRef { ref_name } => Ok(builder.remove_ref(&ref_name)),
TableUpdate::SetLocation { location } => Ok(builder.set_location(location)),
TableUpdate::SetProperties { updates } => builder.set_properties(updates),
TableUpdate::RemoveProperties { removals } => builder.remove_properties(&removals),
TableUpdate::UpgradeFormatVersion { format_version } => {
builder.upgrade_format_version(format_version)
}
}
}
}
Expand Down Expand Up @@ -745,7 +783,7 @@ mod tests {
SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder,
SqlViewRepresentation, Summary, TableMetadata, TableMetadataBuilder, Transform, Type,
UnboundPartitionSpec, ViewFormatVersion, ViewRepresentation, ViewRepresentations,
ViewVersion,
ViewVersion, MAIN_BRANCH,
};
use crate::{NamespaceIdent, TableCreation, TableIdent, TableRequirement, TableUpdate};

Expand Down Expand Up @@ -799,9 +837,9 @@ mod tests {
TableMetadataBuilder::from_table_creation(tbl_creation)
.unwrap()
.assign_uuid(uuid::Uuid::nil())
.unwrap()
.build()
.unwrap()
.metadata
}

#[test]
Expand Down Expand Up @@ -851,7 +889,7 @@ mod tests {
{
"snapshot-id": 3051729675574597004,
"sequence-number": 10,
"timestamp-ms": 1515100955770,
"timestamp-ms": 9992191116217,
"summary": {
"operation": "append"
},
Expand All @@ -861,8 +899,28 @@ mod tests {
"#;

let snapshot = serde_json::from_str::<Snapshot>(record).unwrap();
let mut metadata = metadata;
metadata.append_snapshot(snapshot);
let builder = metadata.into_builder(None);
let builder = TableUpdate::AddSnapshot {
snapshot: snapshot.clone(),
}
.apply(builder)
.unwrap();
let metadata = TableUpdate::SetSnapshotRef {
ref_name: MAIN_BRANCH.to_string(),
reference: SnapshotReference {
snapshot_id: snapshot.snapshot_id(),
retention: SnapshotRetention::Branch {
min_snapshots_to_keep: Some(10),
max_snapshot_age_ms: None,
max_ref_age_ms: None,
},
},
}
.apply(builder)
.unwrap()
.build()
.unwrap()
.metadata;

// Ref exists and should matches
let requirement = TableRequirement::RefSnapshotIdMatch {
Expand Down Expand Up @@ -912,14 +970,13 @@ mod tests {
#[test]
fn test_check_last_assigned_partition_id() {
let metadata = metadata();

let requirement = TableRequirement::LastAssignedPartitionIdMatch {
last_assigned_partition_id: 1,
last_assigned_partition_id: 0,
};
assert!(requirement.check(Some(&metadata)).is_err());

let requirement = TableRequirement::LastAssignedPartitionIdMatch {
last_assigned_partition_id: 0,
last_assigned_partition_id: 999,
};
assert!(requirement.check(Some(&metadata)).is_ok());
}
Expand Down Expand Up @@ -1578,16 +1635,21 @@ mod tests {
let table_metadata = TableMetadataBuilder::from_table_creation(table_creation)
.unwrap()
.build()
.unwrap();
let table_metadata_builder = TableMetadataBuilder::new(table_metadata);
.unwrap()
.metadata;
let table_metadata_builder = TableMetadataBuilder::new_from_metadata(
table_metadata,
Some("s3://db/table/metadata/metadata1.gz.json".to_string()),
);

let uuid = uuid::Uuid::new_v4();
let update = TableUpdate::AssignUuid { uuid };
let updated_metadata = update
.apply(table_metadata_builder)
.unwrap()
.build()
.unwrap();
.unwrap()
.metadata;
assert_eq!(updated_metadata.uuid(), uuid);
}

Expand Down
1 change: 1 addition & 0 deletions crates/iceberg/src/spec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ mod schema;
mod snapshot;
mod sort;
mod table_metadata;
mod table_metadata_builder;
mod transform;
mod values;
mod view_metadata;
Expand Down
6 changes: 5 additions & 1 deletion crates/iceberg/src/spec/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ impl BoundPartitionSpec {
}

/// Get the highest field id in the partition spec.
/// If the partition spec is unpartitioned, it returns the last unpartitioned last assigned id (999).
pub fn highest_field_id(&self) -> Option<i32> {
self.fields.iter().map(|f| f.field_id).max()
}
Expand Down Expand Up @@ -182,6 +181,11 @@ impl BoundPartitionSpec {

true
}

/// Change the spec id of the partition spec
pub fn with_spec_id(self, spec_id: i32) -> Self {
Self { spec_id, ..self }
}
}

impl SchemalessPartitionSpec {
Expand Down
19 changes: 18 additions & 1 deletion crates/iceberg/src/spec/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ impl SchemaBuilder {
/// Reassignment starts from the field-id specified in `start_from` (inclusive).
///
/// All specified aliases and identifier fields will be updated to the new field-ids.
#[allow(dead_code)] // Will be needed in TableMetadataBuilder
pub(crate) fn with_reassigned_field_ids(mut self, start_from: u32) -> Self {
self.reassign_field_ids_from = Some(start_from.try_into().unwrap_or(i32::MAX));
self
Expand Down Expand Up @@ -376,6 +375,24 @@ impl Schema {
pub fn accessor_by_field_id(&self, field_id: i32) -> Option<Arc<StructAccessor>> {
self.field_id_to_accessor.get(&field_id).cloned()
}

/// Check if this schema is identical to another schema semantically - excluding schema id.
pub(crate) fn is_same_schema(&self, other: &SchemaRef) -> bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not implement PartialEq, Eq trait?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about this as well, but opted for a method with documentation.
This method excludes the schema_id which I can describe here in the docstring. Using Eq I would expect the schema_id to be equal too - especially because its used in tests.

self.as_struct().eq(other.as_struct())
&& self.identifier_field_ids().eq(other.identifier_field_ids())
}

/// Change the schema id of this schema.
// This is redundant with the `with_schema_id` method on the builder, but useful
// as it is infallible in contrast to the builder `build()` method.
pub(crate) fn with_schema_id(self, schema_id: SchemaId) -> Self {
Self { schema_id, ..self }
}

/// Return A HashMap matching field ids to field names.
pub(crate) fn field_id_to_name_map(&self) -> &HashMap<i32, String> {
&self.id_to_name
}
}

impl Display for Schema {
Expand Down
7 changes: 7 additions & 0 deletions crates/iceberg/src/spec/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,13 @@ pub struct SnapshotReference {
pub retention: SnapshotRetention,
}

impl SnapshotReference {
/// Returns true if the snapshot reference is a branch.
pub fn is_branch(&self) -> bool {
matches!(self.retention, SnapshotRetention::Branch { .. })
}
}

impl SnapshotReference {
/// Create new snapshot reference
pub fn new(snapshot_id: i64, retention: SnapshotRetention) -> Self {
Expand Down
Loading
Loading