Add support for Iceberg table identifiers with special characters #33293

Open · wants to merge 3 commits into master
File: sdks/java/io/iceberg/build.gradle (2 additions, 0 deletions)
@@ -54,6 +54,8 @@ dependencies {
implementation "org.apache.iceberg:iceberg-parquet:$iceberg_version"
implementation "org.apache.iceberg:iceberg-orc:$iceberg_version"
implementation library.java.hadoop_common
+implementation library.java.jackson_core
+implementation library.java.jackson_databind

testImplementation project(":sdks:java:managed")
testImplementation library.java.hadoop_client
File: AppendFilesToTables.java
@@ -47,7 +47,6 @@
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
@@ -134,7 +133,7 @@ public void processElement(
return;
}

-Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey()));
+Table table = getCatalog().loadTable(IcebergUtils.parseTableIdentifier(element.getKey()));

// vast majority of the time, we will simply append data files.
// in the rare case we get a batch that contains multiple partition specs, we will group
File: FileWriteResult.java
@@ -25,6 +25,7 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.catalog.TableIdentifierParser;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;

@AutoValue
Expand All @@ -41,7 +42,7 @@ abstract class FileWriteResult {
@SchemaIgnore
public TableIdentifier getTableIdentifier() {
if (cachedTableIdentifier == null) {
-cachedTableIdentifier = TableIdentifier.parse(getTableIdentifierString());
+cachedTableIdentifier = IcebergUtils.parseTableIdentifier(getTableIdentifierString());
}
return cachedTableIdentifier;
}
@@ -67,7 +68,7 @@ abstract static class Builder {

@SchemaIgnore
public Builder setTableIdentifier(TableIdentifier tableId) {
-return setTableIdentifierString(tableId.toString());
+return setTableIdentifierString(TableIdentifierParser.toJson(tableId));
}

public abstract FileWriteResult build();
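Why the Builder now serializes with TableIdentifierParser.toJson rather than toString(): the dotted string form cannot round-trip a name that itself contains a dot, which is exactly the special-character case this PR targets. A minimal sketch of the failure mode, using a hypothetical table name rather than code from this PR:

```java
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.catalog.TableIdentifierParser;

public class IdentifierRoundTrip {
  public static void main(String[] args) {
    // A table whose name itself contains a dot (hypothetical example).
    TableIdentifier original = TableIdentifier.of("db", "my.table");

    // toString() flattens the identifier to "db.my.table"; parse() then splits
    // on every dot, yielding namespace ["db", "my"] and name "table".
    TableIdentifier lossy = TableIdentifier.parse(original.toString());
    System.out.println(original.equals(lossy)); // false: a different table

    // TableIdentifierParser keeps the structure explicit as JSON:
    // {"namespace":["db"],"name":"my.table"}
    String json = TableIdentifierParser.toJson(original);
    System.out.println(TableIdentifierParser.fromJson(json).equals(original)); // true
  }
}
```

The same lossy round-trip motivates the matching Builder change in IcebergScanConfig below.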
File: IcebergReadSchemaTransformProvider.java
@@ -31,7 +31,6 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
-import org.apache.iceberg.catalog.TableIdentifier;

/**
* SchemaTransform implementation for {@link IcebergIO#readRows}. Reads records from Iceberg and
@@ -86,7 +85,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
.getPipeline()
.apply(
IcebergIO.readRows(configuration.getIcebergCatalog())
-.from(TableIdentifier.parse(configuration.getTable())));
+.from(IcebergUtils.parseTableIdentifier(configuration.getTable())));

return PCollectionRowTuple.of(OUTPUT_TAG, output);
}
File: IcebergScanConfig.java
@@ -23,6 +23,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.catalog.TableIdentifierParser;
import org.apache.iceberg.expressions.Expression;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -51,7 +52,9 @@ public enum ScanType {
public Table getTable() {
if (cachedTable == null) {
cachedTable =
-getCatalogConfig().catalog().loadTable(TableIdentifier.parse(getTableIdentifier()));
+getCatalogConfig()
+.catalog()
+.loadTable(IcebergUtils.parseTableIdentifier(getTableIdentifier()));
}
return cachedTable;
}
@@ -126,7 +129,7 @@ public abstract static class Builder {
public abstract Builder setTableIdentifier(String tableIdentifier);

public Builder setTableIdentifier(TableIdentifier tableIdentifier) {
-return this.setTableIdentifier(tableIdentifier.toString());
+return this.setTableIdentifier(TableIdentifierParser.toJson(tableIdentifier));
}

public Builder setTableIdentifier(String... names) {
File: IcebergUtils.java
@@ -19,6 +19,9 @@

import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;

+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.LocalDateTime;
@@ -36,6 +39,8 @@
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.catalog.TableIdentifierParser;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Type;
@@ -47,6 +52,9 @@

/** Utilities for converting between Beam and Iceberg types, made public for user's convenience. */
public class IcebergUtils {

+private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private IcebergUtils() {}

private static final Map<Schema.TypeName, Type> BEAM_TYPES_TO_ICEBERG_TYPES =
@@ -506,4 +514,13 @@ private static Object getLogicalTypeValue(Object icebergValue, Schema.FieldType
// LocalDateTime, LocalDate, LocalTime
return icebergValue;
}

+public static TableIdentifier parseTableIdentifier(String table) {
+try {
+JsonNode jsonNode = OBJECT_MAPPER.readTree(table);
+return TableIdentifierParser.fromJson(jsonNode);
+} catch (JsonProcessingException e) {
+return TableIdentifier.parse(table);
+}
+}
}
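Taken together, the new helper accepts both identifier spellings: a JSON object (tried first) and Iceberg's traditional dotted string (the fallback whenever the input is not valid JSON). A short usage sketch under those assumptions; the class and variable names here are illustrative only:

```java
import org.apache.beam.sdk.io.iceberg.IcebergUtils;
import org.apache.iceberg.catalog.TableIdentifier;

public class ParseExamples {
  public static void main(String[] args) {
    // JSON form: the namespace/name split is explicit, so a dot inside the
    // table name survives parsing.
    TableIdentifier withDot =
        IcebergUtils.parseTableIdentifier("{\"namespace\": [\"db\"], \"name\": \"my.table\"}");
    System.out.println(withDot); // namespace ["db"], name "my.table"

    // Plain form: not valid JSON, so the helper falls back to
    // TableIdentifier.parse, which splits the whole string on dots.
    TableIdentifier plain = IcebergUtils.parseTableIdentifier("db.regular_table");
    System.out.println(plain); // namespace ["db"], name "regular_table"
  }
}
```

Note the design choice: falling back on JsonProcessingException keeps every existing dotted-string identifier working, while callers whose names contain special characters opt in by passing JSON.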
File: OneTableDynamicDestinations.java
@@ -41,7 +41,7 @@ class OneTableDynamicDestinations implements DynamicDestinations, Externalizable
@VisibleForTesting
TableIdentifier getTableIdentifier() {
if (tableId == null) {
-tableId = TableIdentifier.parse(checkStateNotNull(tableIdString));
+tableId = IcebergUtils.parseTableIdentifier(checkStateNotNull(tableIdString));
}
return tableId;
}
@@ -86,6 +86,6 @@ public void writeExternal(ObjectOutput out) throws IOException {
@Override
public void readExternal(ObjectInput in) throws IOException {
tableIdString = in.readUTF();
-tableId = TableIdentifier.parse(tableIdString);
+tableId = IcebergUtils.parseTableIdentifier(tableIdString);
}
}
File: PortableIcebergDestinations.java
@@ -24,7 +24,6 @@
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.catalog.TableIdentifier;
import org.checkerframework.checker.nullness.qual.Nullable;

class PortableIcebergDestinations implements DynamicDestinations {
@@ -73,7 +72,7 @@ public String getTableStringIdentifier(ValueInSingleWindow<Row> element) {
@Override
public IcebergDestination instantiateDestination(String dest) {
return IcebergDestination.builder()
-.setTableIdentifier(TableIdentifier.parse(dest))
+.setTableIdentifier(IcebergUtils.parseTableIdentifier(dest))
.setTableCreateConfig(null)
.setFileFormat(FileFormat.fromString(fileFormat))
.build();
File: IcebergIOReadTest.java
@@ -20,6 +20,8 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;

+import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -42,11 +44,11 @@
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-@RunWith(JUnit4.class)
+@RunWith(Parameterized.class)
public class IcebergIOReadTest {

private static final Logger LOG = LoggerFactory.getLogger(IcebergIOReadTest.class);
@@ -57,6 +59,21 @@ public class IcebergIOReadTest {

@Rule public TestPipeline testPipeline = TestPipeline.create();

+@Parameterized.Parameters
+public static Collection<Object[]> data() {
+return Arrays.asList(
+new Object[][] {
+{String.format("{\"namespace\": [\"default\"], \"name\": \"%s\"}", tableId())},
+{String.format("default.%s", tableId())},
+});
+}

+public static String tableId() {
+return "table" + Long.toString(UUID.randomUUID().hashCode(), 16);
+}

+@Parameterized.Parameter public String tableStringIdentifier;

static class PrintRow extends DoFn<Row, Row> {

@ProcessElement
@@ -68,8 +85,7 @@ public void process(@Element Row row, OutputReceiver<Row> output) throws Exception

@Test
public void testSimpleScan() throws Exception {
-TableIdentifier tableId =
-TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16));
+TableIdentifier tableId = IcebergUtils.parseTableIdentifier(tableStringIdentifier);
Table simpleTable = warehouse.createTable(tableId, TestFixtures.SCHEMA);
final Schema schema = IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA);

File: IcebergUtilsTest.java
@@ -19,11 +19,13 @@

import static org.apache.beam.sdk.io.iceberg.IcebergUtils.TypeAndMaxId;
import static org.apache.beam.sdk.io.iceberg.IcebergUtils.beamFieldTypeToIcebergFieldType;
+import static org.apache.beam.sdk.io.iceberg.IcebergUtils.parseTableIdentifier;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import java.math.BigDecimal;
@@ -32,6 +34,7 @@
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.schemas.Schema;
@@ -49,6 +52,7 @@
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;

/** Test class for {@link IcebergUtils}. */
@RunWith(Enclosed.class)
@@ -802,4 +806,40 @@ public void testStructIcebergSchemaToBeamSchema() {
assertEquals(BEAM_SCHEMA_STRUCT, convertedBeamSchema);
}
}

+@RunWith(Parameterized.class)
+public static class TableIdentifierParseTests {

+@Parameterized.Parameters
+public static Collection<Object[]> data() {
+return Arrays.asList(
+new Object[][] {
+{
+"{\"namespace\": [\"dogs\", \"owners.and.handlers\"], \"name\": \"food\"}",
+"dogs.owners.and.handlers.food",
+true
+},
+{"dogs.owners.and.handlers.food", "dogs.owners.and.handlers.food", true},
+{"{\"name\": \"food\"}", "food", true},
+{"{\"table_name\": \"food\"}", "{\"table_name\": \"food\"}", false},
+});
+}

+@Parameterized.Parameter public String input;

+@Parameterized.Parameter(1)
+public String expected;

+@Parameterized.Parameter(2)
+public boolean shouldSucceed;

+@Test
+public void test() {
+if (shouldSucceed) {
+assertEquals(expected, parseTableIdentifier(input).toString());
+} else {
+assertThrows(IllegalArgumentException.class, () -> parseTableIdentifier(input));
+}
+}
+}
}