From ee074a8e34d0bf2718378602df3de233292567d3 Mon Sep 17 00:00:00 2001 From: regadas Date: Thu, 5 Dec 2024 07:57:38 -0500 Subject: [PATCH 1/3] Add support for Iceberg table identifiers with special characters --- .../sdk/io/iceberg/AppendFilesToTables.java | 3 +- .../beam/sdk/io/iceberg/FileWriteResult.java | 5 ++- .../IcebergReadSchemaTransformProvider.java | 3 +- .../sdk/io/iceberg/IcebergScanConfig.java | 7 +++- .../beam/sdk/io/iceberg/IcebergUtils.java | 17 ++++++++ .../iceberg/OneTableDynamicDestinations.java | 4 +- .../iceberg/PortableIcebergDestinations.java | 3 +- .../sdk/io/iceberg/IcebergIOReadTest.java | 26 ++++++++---- .../beam/sdk/io/iceberg/IcebergUtilsTest.java | 40 +++++++++++++++++++ 9 files changed, 89 insertions(+), 19 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java index d9768114e7c6..deec779c6cc9 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java @@ -47,7 +47,6 @@ import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; -import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFile; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; @@ -134,7 +133,7 @@ public void processElement( return; } - Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey())); + Table table = getCatalog().loadTable(IcebergUtils.parseTableIdentifier(element.getKey())); // vast majority of the time, we will simply append data files. // in the rare case we get a batch that contains multiple partition specs, we will group diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java index bf00bf8519fc..d58ac8696d37 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java @@ -25,6 +25,7 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.catalog.TableIdentifierParser; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; @AutoValue @@ -41,7 +42,7 @@ abstract class FileWriteResult { @SchemaIgnore public TableIdentifier getTableIdentifier() { if (cachedTableIdentifier == null) { - cachedTableIdentifier = TableIdentifier.parse(getTableIdentifierString()); + cachedTableIdentifier = IcebergUtils.parseTableIdentifier(getTableIdentifierString()); } return cachedTableIdentifier; } @@ -67,7 +68,7 @@ abstract static class Builder { @SchemaIgnore public Builder setTableIdentifier(TableIdentifier tableId) { - return setTableIdentifierString(tableId.toString()); + return setTableIdentifierString(TableIdentifierParser.toJson(tableId)); } public abstract FileWriteResult build(); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java index d44149fda08e..951442e2c95f 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java @@ -31,7 +31,6 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; -import org.apache.iceberg.catalog.TableIdentifier; /** * SchemaTransform implementation for {@link IcebergIO#readRows}. Reads records from Iceberg and @@ -86,7 +85,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { .getPipeline() .apply( IcebergIO.readRows(configuration.getIcebergCatalog()) - .from(TableIdentifier.parse(configuration.getTable()))); + .from(IcebergUtils.parseTableIdentifier(configuration.getTable()))); return PCollectionRowTuple.of(OUTPUT_TAG, output); } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java index 60372b172af7..640283d83c2e 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java @@ -23,6 +23,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.catalog.TableIdentifierParser; import org.apache.iceberg.expressions.Expression; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.Nullable; @@ -51,7 +52,9 @@ public enum ScanType { public Table getTable() { if (cachedTable == null) { cachedTable = - getCatalogConfig().catalog().loadTable(TableIdentifier.parse(getTableIdentifier())); + getCatalogConfig() + .catalog() + .loadTable(IcebergUtils.parseTableIdentifier(getTableIdentifier())); } return cachedTable; } @@ -126,7 +129,7 @@ public abstract static class Builder { public abstract Builder setTableIdentifier(String tableIdentifier); public Builder setTableIdentifier(TableIdentifier tableIdentifier) { - return this.setTableIdentifier(tableIdentifier.toString()); + return this.setTableIdentifier(TableIdentifierParser.toJson(tableIdentifier)); } public Builder setTableIdentifier(String... names) { diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java index ef19a5881366..bd2f743172dc 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java @@ -19,6 +19,9 @@ import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import java.nio.ByteBuffer; import java.time.LocalDate; import java.time.LocalDateTime; @@ -36,6 +39,8 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.catalog.TableIdentifierParser; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; import org.apache.iceberg.types.Type; @@ -47,6 +52,9 @@ /** Utilities for converting between Beam and Iceberg types, made public for user's convenience. */ public class IcebergUtils { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private IcebergUtils() {} private static final Map BEAM_TYPES_TO_ICEBERG_TYPES = @@ -506,4 +514,13 @@ private static Object getLogicalTypeValue(Object icebergValue, Schema.FieldType // LocalDateTime, LocalDate, LocalTime return icebergValue; } + + public static TableIdentifier parseTableIdentifier(String table) { + try { + JsonNode jsonNode = OBJECT_MAPPER.readTree(table); + return TableIdentifierParser.fromJson(jsonNode); + } catch (JsonProcessingException e) { + return TableIdentifier.parse(table); + } + } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/OneTableDynamicDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/OneTableDynamicDestinations.java index 861a8ad198a8..be810aa20a13 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/OneTableDynamicDestinations.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/OneTableDynamicDestinations.java @@ -41,7 +41,7 @@ class OneTableDynamicDestinations implements DynamicDestinations, Externalizable @VisibleForTesting TableIdentifier getTableIdentifier() { if (tableId == null) { - tableId = TableIdentifier.parse(checkStateNotNull(tableIdString)); + tableId = IcebergUtils.parseTableIdentifier(checkStateNotNull(tableIdString)); } return tableId; } @@ -86,6 +86,6 @@ public void writeExternal(ObjectOutput out) throws IOException { @Override public void readExternal(ObjectInput in) throws IOException { tableIdString = in.readUTF(); - tableId = TableIdentifier.parse(tableIdString); + tableId = IcebergUtils.parseTableIdentifier(tableIdString); } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java index 47f661bba3f8..58f70463bc76 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PortableIcebergDestinations.java @@ -24,7 +24,6 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.ValueInSingleWindow; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.catalog.TableIdentifier; import org.checkerframework.checker.nullness.qual.Nullable; class PortableIcebergDestinations implements DynamicDestinations { @@ -73,7 +72,7 @@ public String getTableStringIdentifier(ValueInSingleWindow element) { @Override public IcebergDestination instantiateDestination(String dest) { return IcebergDestination.builder() - .setTableIdentifier(TableIdentifier.parse(dest)) + .setTableIdentifier(IcebergUtils.parseTableIdentifier(dest)) .setTableCreateConfig(null) .setFileFormat(FileFormat.fromString(fileFormat)) .build(); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java index fe4a07dedfdf..cac0c1436dde 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java @@ -20,9 +20,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import java.util.*; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.beam.sdk.coders.RowCoder; @@ -42,11 +40,11 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@RunWith(JUnit4.class) +@RunWith(Parameterized.class) public class IcebergIOReadTest { private static final Logger LOG = LoggerFactory.getLogger(IcebergIOReadTest.class); @@ -57,6 +55,21 @@ public class IcebergIOReadTest { @Rule public TestPipeline testPipeline = TestPipeline.create(); + @Parameterized.Parameters + public static Collection data() { + return Arrays.asList( + new Object[][] { + {String.format("{\"namespace\": [\"default\"], \"name\": \"%s\"}", tableId())}, + {String.format("default.%s", tableId())}, + }); + } + + public static String tableId() { + return "table" + Long.toString(UUID.randomUUID().hashCode(), 16); + } + + @Parameterized.Parameter public String tableStringIdentifier; + static class PrintRow extends DoFn { @ProcessElement @@ -68,8 +81,7 @@ public void process(@Element Row row, OutputReceiver output) throws Excepti @Test public void testSimpleScan() throws Exception { - TableIdentifier tableId = - TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16)); + TableIdentifier tableId = IcebergUtils.parseTableIdentifier(tableStringIdentifier); Table simpleTable = warehouse.createTable(tableId, TestFixtures.SCHEMA); final Schema schema = IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java index 134f05c34bfb..918c6b1146ee 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java @@ -19,11 +19,13 @@ import static org.apache.beam.sdk.io.iceberg.IcebergUtils.TypeAndMaxId; import static org.apache.beam.sdk.io.iceberg.IcebergUtils.beamFieldTypeToIcebergFieldType; +import static org.apache.beam.sdk.io.iceberg.IcebergUtils.parseTableIdentifier; import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import java.math.BigDecimal; @@ -32,6 +34,7 @@ import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Map; import org.apache.beam.sdk.schemas.Schema; @@ -49,6 +52,7 @@ import org.junit.experimental.runners.Enclosed; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; /** Test class for {@link IcebergUtils}. */ @RunWith(Enclosed.class) @@ -802,4 +806,40 @@ public void testStructIcebergSchemaToBeamSchema() { assertEquals(BEAM_SCHEMA_STRUCT, convertedBeamSchema); } } + + @RunWith(Parameterized.class) + public static class TableIdentifierParseTests { + + @Parameterized.Parameters + public static Collection data() { + return Arrays.asList( + new Object[][] { + { + "{\"namespace\": [\"dogs\", \"owners.and.handlers\"], \"name\": \"food\"}", + "dogs.owners.and.handlers.food", + true + }, + {"dogs.owners.and.handlers.food", "dogs.owners.and.handlers.food", true}, + {"{\"name\": \"food\"}", "food", true}, + {"{\"table_name\": \"food\"}", "{\"table_name\": \"food\"}", false}, + }); + } + + @Parameterized.Parameter public String input; + + @Parameterized.Parameter(1) + public String expected; + + @Parameterized.Parameter(2) + public boolean shouldSucceed; + + @Test + public void test() { + if (shouldSucceed) { + assertEquals(expected, parseTableIdentifier(input).toString()); + } else { + assertThrows(IllegalArgumentException.class, () -> parseTableIdentifier(input)); + } + } + } } From ebcec2657d080b855ecda306b6fc1006c81ba5dd Mon Sep 17 00:00:00 2001 From: regadas Date: Thu, 5 Dec 2024 08:34:45 -0500 Subject: [PATCH 2/3] Add used undeclared dependencies --- sdks/java/io/iceberg/build.gradle | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle index 6754b0aecf50..bea42ad8508a 100644 --- a/sdks/java/io/iceberg/build.gradle +++ b/sdks/java/io/iceberg/build.gradle @@ -54,6 +54,8 @@ dependencies { implementation "org.apache.iceberg:iceberg-parquet:$iceberg_version" implementation "org.apache.iceberg:iceberg-orc:$iceberg_version" implementation library.java.hadoop_common + implementation library.java.jackson_core + implementation library.java.jackson_databind testImplementation project(":sdks:java:managed") testImplementation library.java.hadoop_client From 33add1081e9da2bfa2e13cbdae728a3e8fb7a29b Mon Sep 17 00:00:00 2001 From: regadas Date: Thu, 5 Dec 2024 08:46:38 -0500 Subject: [PATCH 3/3] Fix style --- .../org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java index cac0c1436dde..68ef2aac6832 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java @@ -20,7 +20,11 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; -import java.util.*; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.beam.sdk.coders.RowCoder;