Skip to content

Commit

Permalink
Core,Open-API: Don't expose the last-column-id (#11514)
Browse files Browse the repository at this point in the history
* Core,Open-API: Don't expose the `last-column-id`

Okay, I added this to the spec a while ago:

#7445

But I think this was a mistake, and we should not expose this
to the public APIs, as it is much better to track this internally.

I noticed this while reviewing apache/iceberg-rust#587

Removing this from the Java APIs, together with the Open-API
update, makes it much more resilient and doesn't require the
clients to compute this value. For example, when there are two conflicting
schema changes, the last-column-id must be recomputed correctly when
retrying the operation.

* Update the tests as well

* Add `deprecation` flag

* Wording

Co-authored-by: Eduard Tudenhoefner <[email protected]>

* Wording

Co-authored-by: Eduard Tudenhoefner <[email protected]>

* Wording

* Thanks Ryan!

* Remove `LOG`

---------

Co-authored-by: Eduard Tudenhoefner <[email protected]>
  • Loading branch information
Fokko and nastra authored Nov 25, 2024
1 parent 1f23dcd commit 4b52dbd
Show file tree
Hide file tree
Showing 13 changed files with 67 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public void testSetTableInputInformationWithRemovedColumns() {

Schema newSchema =
new Schema(Types.NestedField.required(1, "x", Types.StringType.get(), "comment1"));
tableMetadata = tableMetadata.updateSchema(newSchema, 3);
tableMetadata = tableMetadata.updateSchema(newSchema);
IcebergToGlueConverter.setTableInputInformation(actualTableInputBuilder, tableMetadata);
TableInput actualTableInput = actualTableInputBuilder.build();

Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/org/apache/iceberg/MetadataUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,16 @@ class AddSchema implements MetadataUpdate {
private final Schema schema;
private final int lastColumnId;

public AddSchema(Schema schema) {
this(schema, schema.highestFieldId());
}

/**
* Set the schema
*
* @deprecated since 1.8.0, will be removed in 1.9.0 or 2.0.0, use AddSchema(schema).
*/
@Deprecated
public AddSchema(Schema schema, int lastColumnId) {
this.schema = schema;
this.lastColumnId = lastColumnId;
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/SchemaUpdate.java
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ public Schema apply() {

@Override
public void commit() {
TableMetadata update = applyChangesToMetadata(base.updateSchema(apply(), lastColumnId));
TableMetadata update = applyChangesToMetadata(base.updateSchema(apply()));
ops.commit(base, update);
}

Expand Down
25 changes: 24 additions & 1 deletion core/src/main/java/org/apache/iceberg/TableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -563,10 +563,23 @@ public TableMetadata withUUID() {
return new Builder(this).assignUUID().build();
}

/**
* Updates the schema
*
* @deprecated since 1.8.0, will be removed in 1.9.0 or 2.0.0, use updateSchema(schema).
*/
@Deprecated
public TableMetadata updateSchema(Schema newSchema, int newLastColumnId) {
return new Builder(this).setCurrentSchema(newSchema, newLastColumnId).build();
}

/** Updates the schema */
public TableMetadata updateSchema(Schema newSchema) {
return new Builder(this)
.setCurrentSchema(newSchema, Math.max(this.lastColumnId, newSchema.highestFieldId()))
.build();
}

// The caller is responsible to pass a newPartitionSpec with correct partition field IDs
public TableMetadata updatePartitionSpec(PartitionSpec newPartitionSpec) {
return new Builder(this).setDefaultPartitionSpec(newPartitionSpec).build();
Expand Down Expand Up @@ -1082,8 +1095,18 @@ public Builder setCurrentSchema(int schemaId) {
return this;
}

public Builder addSchema(Schema schema) {
addSchemaInternal(schema, Math.max(lastColumnId, schema.highestFieldId()));
return this;
}

/**
* Add a new schema.
*
* @deprecated since 1.8.0, will be removed in 1.9.0 or 2.0.0, use addSchema(schema).
*/
@Deprecated
public Builder addSchema(Schema schema, int newLastColumnId) {
// TODO: remove requirement for newLastColumnId
addSchemaInternal(schema, newLastColumnId);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -954,7 +954,7 @@ private static List<MetadataUpdate> createChanges(TableMetadata meta) {
changes.add(new MetadataUpdate.UpgradeFormatVersion(meta.formatVersion()));

Schema schema = meta.schema();
changes.add(new MetadataUpdate.AddSchema(schema, schema.highestFieldId()));
changes.add(new MetadataUpdate.AddSchema(schema));
changes.add(new MetadataUpdate.SetCurrentSchema(-1));

PartitionSpec spec = meta.spec();
Expand Down
7 changes: 1 addition & 6 deletions core/src/main/java/org/apache/iceberg/view/ViewMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -372,20 +372,15 @@ private int addSchemaInternal(Schema schema) {
newSchema = schema;
}

int highestFieldId = Math.max(highestFieldId(), newSchema.highestFieldId());
schemas.add(newSchema);
schemasById.put(newSchema.schemaId(), newSchema);
changes.add(new MetadataUpdate.AddSchema(newSchema, highestFieldId));
changes.add(new MetadataUpdate.AddSchema(newSchema));

this.lastAddedSchemaId = newSchemaId;

return newSchemaId;
}

private int highestFieldId() {
return schemas.stream().map(Schema::highestFieldId).max(Integer::compareTo).orElse(0);
}

private int reuseOrCreateNewSchemaId(Schema newSchema) {
// if the schema already exists, use its id; otherwise use the highest id + 1
int newSchemaId = INITIAL_SCHEMA_ID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,23 +112,9 @@ public void testUpgradeFormatVersionFromJson() {
public void testAddSchemaFromJson() {
String action = MetadataUpdateParser.ADD_SCHEMA;
Schema schema = ID_DATA_SCHEMA;
int lastColumnId = schema.highestFieldId();
String json =
String.format(
"{\"action\":\"add-schema\",\"schema\":%s,\"last-column-id\":%d}",
SchemaParser.toJson(schema), lastColumnId);
MetadataUpdate actualUpdate = new MetadataUpdate.AddSchema(schema, lastColumnId);
assertEquals(action, actualUpdate, MetadataUpdateParser.fromJson(json));
}

@Test
public void testAddSchemaFromJsonWithoutLastColumnId() {
String action = MetadataUpdateParser.ADD_SCHEMA;
Schema schema = ID_DATA_SCHEMA;
int lastColumnId = schema.highestFieldId();
String json =
String.format("{\"action\":\"add-schema\",\"schema\":%s}", SchemaParser.toJson(schema));
MetadataUpdate actualUpdate = new MetadataUpdate.AddSchema(schema, lastColumnId);
MetadataUpdate actualUpdate = new MetadataUpdate.AddSchema(schema);
assertEquals(action, actualUpdate, MetadataUpdateParser.fromJson(json));
}

Expand All @@ -140,7 +126,7 @@ public void testAddSchemaToJson() {
String.format(
"{\"action\":\"add-schema\",\"schema\":%s,\"last-column-id\":%d}",
SchemaParser.toJson(schema), lastColumnId);
MetadataUpdate update = new MetadataUpdate.AddSchema(schema, lastColumnId);
MetadataUpdate update = new MetadataUpdate.AddSchema(schema);
String actual = MetadataUpdateParser.toJson(update);
assertThat(actual)
.as("Add schema should convert to the correct JSON value")
Expand Down
16 changes: 8 additions & 8 deletions core/src/test/java/org/apache/iceberg/TestTableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -1425,7 +1425,7 @@ public void testUpdateSchemaIdentifierFields() {
new Schema(
Lists.newArrayList(Types.NestedField.required(1, "x", Types.StringType.get())),
Sets.newHashSet(1));
TableMetadata newMeta = meta.updateSchema(newSchema, 1);
TableMetadata newMeta = meta.updateSchema(newSchema);
assertThat(newMeta.schemas()).hasSize(2);
assertThat(newMeta.schema().identifierFieldIds()).containsExactly(1);
}
Expand All @@ -1447,7 +1447,7 @@ public void testUpdateSchema() {
new Schema(
Types.NestedField.required(1, "y", Types.LongType.get(), "comment"),
Types.NestedField.required(2, "x", Types.StringType.get()));
TableMetadata twoSchemasTable = freshTable.updateSchema(schema2, 2);
TableMetadata twoSchemasTable = freshTable.updateSchema(schema2);
assertThat(twoSchemasTable.currentSchemaId()).isEqualTo(1);
assertSameSchemaList(
ImmutableList.of(schema, new Schema(1, schema2.columns())), twoSchemasTable.schemas());
Expand All @@ -1459,34 +1459,34 @@ public void testUpdateSchema() {
new Schema(
Types.NestedField.required(1, "y", Types.LongType.get(), "comment"),
Types.NestedField.required(2, "x", Types.StringType.get()));
TableMetadata sameSchemaTable = twoSchemasTable.updateSchema(sameSchema2, 2);
TableMetadata sameSchemaTable = twoSchemasTable.updateSchema(sameSchema2);
assertThat(sameSchemaTable).isSameAs(twoSchemasTable);

// update schema with the same schema and different last column ID as current should create
// a new table
TableMetadata differentColumnIdTable = sameSchemaTable.updateSchema(sameSchema2, 3);
TableMetadata differentColumnIdTable = sameSchemaTable.updateSchema(sameSchema2);
assertThat(differentColumnIdTable.currentSchemaId()).isEqualTo(1);
assertSameSchemaList(
ImmutableList.of(schema, new Schema(1, schema2.columns())),
differentColumnIdTable.schemas());
assertThat(differentColumnIdTable.schema().asStruct()).isEqualTo(schema2.asStruct());
assertThat(differentColumnIdTable.lastColumnId()).isEqualTo(3);
assertThat(differentColumnIdTable.lastColumnId()).isEqualTo(2);

// update schema with old schema does not change schemas
TableMetadata revertSchemaTable = differentColumnIdTable.updateSchema(schema, 3);
TableMetadata revertSchemaTable = differentColumnIdTable.updateSchema(schema);
assertThat(revertSchemaTable.currentSchemaId()).isEqualTo(0);
assertSameSchemaList(
ImmutableList.of(schema, new Schema(1, schema2.columns())), revertSchemaTable.schemas());
assertThat(revertSchemaTable.schema().asStruct()).isEqualTo(schema.asStruct());
assertThat(revertSchemaTable.lastColumnId()).isEqualTo(3);
assertThat(revertSchemaTable.lastColumnId()).isEqualTo(2);

// create new schema will use the largest schema id + 1
Schema schema3 =
new Schema(
Types.NestedField.required(2, "y", Types.LongType.get(), "comment"),
Types.NestedField.required(4, "x", Types.StringType.get()),
Types.NestedField.required(6, "z", Types.IntegerType.get()));
TableMetadata threeSchemaTable = revertSchemaTable.updateSchema(schema3, 6);
TableMetadata threeSchemaTable = revertSchemaTable.updateSchema(schema3);
assertThat(threeSchemaTable.currentSchemaId()).isEqualTo(2);
assertSameSchemaList(
ImmutableList.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,9 @@ public void addSchema() {
UpdateRequirements.forUpdateTable(
metadata,
ImmutableList.of(
new MetadataUpdate.AddSchema(new Schema(), lastColumnId),
new MetadataUpdate.AddSchema(new Schema(), lastColumnId + 1),
new MetadataUpdate.AddSchema(new Schema(), lastColumnId + 2)));
new MetadataUpdate.AddSchema(new Schema()),
new MetadataUpdate.AddSchema(new Schema()),
new MetadataUpdate.AddSchema(new Schema())));
requirements.forEach(req -> req.validate(metadata));

assertThat(requirements)
Expand Down Expand Up @@ -253,9 +253,9 @@ public void addSchemaFailure() {
UpdateRequirements.forUpdateTable(
metadata,
ImmutableList.of(
new MetadataUpdate.AddSchema(new Schema(), 1),
new MetadataUpdate.AddSchema(new Schema(), 2),
new MetadataUpdate.AddSchema(new Schema(), 3)));
new MetadataUpdate.AddSchema(new Schema()),
new MetadataUpdate.AddSchema(new Schema()),
new MetadataUpdate.AddSchema(new Schema())));

assertThatThrownBy(() -> requirements.forEach(req -> req.validate(updated)))
.isInstanceOf(CommitFailedException.class)
Expand All @@ -269,9 +269,9 @@ public void addSchemaForView() {
UpdateRequirements.forReplaceView(
viewMetadata,
ImmutableList.of(
new MetadataUpdate.AddSchema(new Schema(), lastColumnId),
new MetadataUpdate.AddSchema(new Schema(), lastColumnId + 1),
new MetadataUpdate.AddSchema(new Schema(), lastColumnId + 2)));
new MetadataUpdate.AddSchema(new Schema()),
new MetadataUpdate.AddSchema(new Schema()),
new MetadataUpdate.AddSchema(new Schema())));
requirements.forEach(req -> req.validate(viewMetadata));

assertThat(requirements)
Expand Down
2 changes: 1 addition & 1 deletion open-api/rest-catalog-open-api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1152,7 +1152,7 @@ class AddSchemaUpdate(BaseUpdate):
last_column_id: Optional[int] = Field(
None,
alias='last-column-id',
description='The highest assigned column ID for the table. This is used to ensure columns are always assigned an unused ID when evolving schemas. When omitted, it will be computed on the server side.',
description="This optional field is **DEPRECATED for REMOVAL** since it more safe to handle this internally, and shouldn't be exposed to the clients.\nThe highest assigned column ID for the table. This is used to ensure columns are always assigned an unused ID when evolving schemas. When omitted, it will be computed on the server side.",
)


Expand Down
8 changes: 7 additions & 1 deletion open-api/rest-catalog-open-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2692,7 +2692,13 @@ components:
$ref: '#/components/schemas/Schema'
last-column-id:
type: integer
description: The highest assigned column ID for the table. This is used to ensure columns are always assigned an unused ID when evolving schemas. When omitted, it will be computed on the server side.
deprecated: true
description:
This optional field is **DEPRECATED for REMOVAL** since it more safe to handle this internally,
and shouldn't be exposed to the clients.

The highest assigned column ID for the table. This is used to ensure columns are always
assigned an unused ID when evolving schemas. When omitted, it will be computed on the server side.

SetCurrentSchemaUpdate:
allOf:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,7 @@ public void testPartitionMetadataColumnWithManyColumns() {

TableOperations ops = ((HasTableOperations) table).operations();
TableMetadata base = ops.current();
ops.commit(
base,
base.updateSchema(manyColumnsSchema, manyColumnsSchema.highestFieldId())
.updatePartitionSpec(spec));
ops.commit(base, base.updateSchema(manyColumnsSchema).updatePartitionSpec(spec));

Dataset<Row> df =
spark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,7 @@ protected Record writeAndRead(String desc, Schema writeSchema, Schema readSchema
Schema expectedSchema = reassignIds(readSchema, idMapping);

// Set the schema to the expected schema directly to simulate the table schema evolving
TestTables.replaceMetadata(
desc, TestTables.readMetadata(desc).updateSchema(expectedSchema, 100));
TestTables.replaceMetadata(desc, TestTables.readMetadata(desc).updateSchema(expectedSchema));

Dataset<Row> df =
spark
Expand Down

0 comments on commit 4b52dbd

Please sign in to comment.