diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index a03c067d2c4e..1efc8e9e4405 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,3 +1,4 @@
 {
-  "comment": "Modify this file in a trivial way to cause this test suite to run"
+  "comment": "Modify this file in a trivial way to cause this test suite to run",
+  "modification": 1
 }
diff --git a/CHANGES.md b/CHANGES.md
index 0a620038f11e..85f1be48cfb7 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -67,12 +67,14 @@
 ## New Features / Improvements
 
 * Multiple RunInference instances can now share the same model instance by setting the model_identifier parameter (Python) ([#31665](https://github.com/apache/beam/issues/31665)).
+* [IcebergIO] All specified catalog properties are passed through to the connector ([#31726](https://github.com/apache/beam/pull/31726)).
 * Removed a 3rd party LGPL dependency from the Go SDK ([#31765](https://github.com/apache/beam/issues/31765)).
 * Support for MapState and SetState when using Dataflow Runner v1 with Streaming Engine (Java) ([#18200](https://github.com/apache/beam/issues/18200)).
 
 ## Breaking Changes
 
 * X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
+* [IcebergIO] IcebergCatalogConfig was changed to support specifying catalog properties in a key-value fashion ([#31726](https://github.com/apache/beam/pull/31726)).
 
 ## Deprecations
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
index fefef4aa4917..2956d75a266e 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
@@ -19,214 +19,35 @@
 
 import com.google.auto.value.AutoValue;
 import java.io.Serializable;
-import javax.annotation.Nullable;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import java.util.Properties;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
 import org.checkerframework.dataflow.qual.Pure;
 
 @AutoValue
 public abstract class IcebergCatalogConfig implements Serializable {
-
-  @Pure
-  public abstract String getName();
-
-  /* Core Properties */
-  @Pure
-  public abstract @Nullable String getIcebergCatalogType();
-
-  @Pure
-  public abstract @Nullable String getCatalogImplementation();
-
-  @Pure
-  public abstract @Nullable String getFileIOImplementation();
-
-  @Pure
-  public abstract @Nullable String getWarehouseLocation();
-
-  @Pure
-  public abstract @Nullable String getMetricsReporterImplementation();
-
-  /* Caching */
-  @Pure
-  public abstract boolean getCacheEnabled();
-
-  @Pure
-  public abstract boolean getCacheCaseSensitive();
-
-  @Pure
-  public abstract long getCacheExpirationIntervalMillis();
-
-  @Pure
-  public abstract boolean getIOManifestCacheEnabled();
-
-  @Pure
-  public abstract long getIOManifestCacheExpirationIntervalMillis();
-
-  @Pure
-  public abstract long getIOManifestCacheMaxTotalBytes();
-
-  @Pure
-  public abstract long getIOManifestCacheMaxContentLength();
-
-  @Pure
-  public abstract @Nullable String getUri();
-
-  @Pure
-  public abstract int getClientPoolSize();
-
-  @Pure
-  public abstract long getClientPoolEvictionIntervalMs();
-
-  @Pure
-  public abstract @Nullable String getClientPoolCacheKeys();
-
-  @Pure
-  public abstract @Nullable String getLockImplementation();
-
-  @Pure
-  public abstract long getLockHeartbeatIntervalMillis();
-
-  @Pure
-  public abstract long getLockHeartbeatTimeoutMillis();
-
-  @Pure
-  public abstract int getLockHeartbeatThreads();
-
-  @Pure
-  public abstract long getLockAcquireIntervalMillis();
-
-  @Pure
-  public abstract long getLockAcquireTimeoutMillis();
-
-  @Pure
-  public abstract @Nullable String getAppIdentifier();
-
-  @Pure
-  public abstract @Nullable String getUser();
-
   @Pure
-  public abstract long getAuthSessionTimeoutMillis();
+  public abstract String getCatalogName();
 
   @Pure
-  public abstract @Nullable Configuration getConfiguration();
+  public abstract Properties getProperties();
 
   @Pure
   public static Builder builder() {
-    return new AutoValue_IcebergCatalogConfig.Builder()
-        .setIcebergCatalogType(null)
-        .setCatalogImplementation(null)
-        .setFileIOImplementation(null)
-        .setWarehouseLocation(null)
-        .setMetricsReporterImplementation(null) // TODO: Set this to our implementation
-        .setCacheEnabled(CatalogProperties.CACHE_ENABLED_DEFAULT)
-        .setCacheCaseSensitive(CatalogProperties.CACHE_CASE_SENSITIVE_DEFAULT)
-        .setCacheExpirationIntervalMillis(CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_DEFAULT)
-        .setIOManifestCacheEnabled(CatalogProperties.IO_MANIFEST_CACHE_ENABLED_DEFAULT)
-        .setIOManifestCacheExpirationIntervalMillis(
-            CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_DEFAULT)
-        .setIOManifestCacheMaxTotalBytes(
-            CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT)
-        .setIOManifestCacheMaxContentLength(
-            CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT)
-        .setUri(null)
-        .setClientPoolSize(CatalogProperties.CLIENT_POOL_SIZE_DEFAULT)
-        .setClientPoolEvictionIntervalMs(
-            CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_DEFAULT)
-        .setClientPoolCacheKeys(null)
-        .setLockImplementation(null)
-        .setLockHeartbeatIntervalMillis(CatalogProperties.LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT)
-        .setLockHeartbeatTimeoutMillis(CatalogProperties.LOCK_HEARTBEAT_TIMEOUT_MS_DEFAULT)
-        .setLockHeartbeatThreads(CatalogProperties.LOCK_HEARTBEAT_THREADS_DEFAULT)
-        .setLockAcquireIntervalMillis(CatalogProperties.LOCK_ACQUIRE_INTERVAL_MS_DEFAULT)
-        .setLockAcquireTimeoutMillis(CatalogProperties.LOCK_HEARTBEAT_TIMEOUT_MS_DEFAULT)
-        .setAppIdentifier(null)
-        .setUser(null)
-        .setAuthSessionTimeoutMillis(CatalogProperties.AUTH_SESSION_TIMEOUT_MS_DEFAULT)
-        .setConfiguration(null);
-  }
-
-  @Pure
-  public ImmutableMap<String, String> properties() {
-    return new PropertyBuilder()
-        .put(CatalogUtil.ICEBERG_CATALOG_TYPE, getIcebergCatalogType())
-        .put(CatalogProperties.CATALOG_IMPL, getCatalogImplementation())
-        .put(CatalogProperties.FILE_IO_IMPL, getFileIOImplementation())
-        .put(CatalogProperties.WAREHOUSE_LOCATION, getWarehouseLocation())
-        .put(CatalogProperties.METRICS_REPORTER_IMPL, getMetricsReporterImplementation())
-        .put(CatalogProperties.CACHE_ENABLED, getCacheEnabled())
-        .put(CatalogProperties.CACHE_CASE_SENSITIVE, getCacheCaseSensitive())
-        .put(CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS, getCacheExpirationIntervalMillis())
-        .build();
+    return new AutoValue_IcebergCatalogConfig.Builder();
   }
 
   public org.apache.iceberg.catalog.Catalog catalog() {
-    Configuration conf = getConfiguration();
-    if (conf == null) {
-      conf = new Configuration();
-    }
-    return CatalogUtil.buildIcebergCatalog(getName(), properties(), conf);
+    return CatalogUtil.buildIcebergCatalog(
+        getCatalogName(), Maps.fromProperties(getProperties()), new Configuration());
   }
 
   @AutoValue.Builder
   public abstract static class Builder {
+    public abstract Builder setCatalogName(String catalogName);
 
-    /* Core Properties */
-    public abstract Builder setName(String name);
-
-    public abstract Builder setIcebergCatalogType(@Nullable String icebergType);
-
-    public abstract Builder setCatalogImplementation(@Nullable String catalogImpl);
-
-    public abstract Builder setFileIOImplementation(@Nullable String fileIOImpl);
-
-    public abstract Builder setWarehouseLocation(@Nullable String warehouse);
-
-    public abstract Builder setMetricsReporterImplementation(@Nullable String metricsImpl);
-
-    /* Caching */
-    public abstract Builder setCacheEnabled(boolean cacheEnabled);
-
-    public abstract Builder setCacheCaseSensitive(boolean cacheCaseSensitive);
-
-    public abstract Builder setCacheExpirationIntervalMillis(long expiration);
-
-    public abstract Builder setIOManifestCacheEnabled(boolean enabled);
-
-    public abstract Builder setIOManifestCacheExpirationIntervalMillis(long expiration);
-
-    public abstract Builder setIOManifestCacheMaxTotalBytes(long bytes);
-
-    public abstract Builder setIOManifestCacheMaxContentLength(long length);
-
-    public abstract Builder setUri(@Nullable String uri);
-
-    public abstract Builder setClientPoolSize(int size);
-
-    public abstract Builder setClientPoolEvictionIntervalMs(long interval);
-
-    public abstract Builder setClientPoolCacheKeys(@Nullable String keys);
-
-    public abstract Builder setLockImplementation(@Nullable String lockImplementation);
-
-    public abstract Builder setLockHeartbeatIntervalMillis(long interval);
-
-    public abstract Builder setLockHeartbeatTimeoutMillis(long timeout);
-
-    public abstract Builder setLockHeartbeatThreads(int threads);
-
-    public abstract Builder setLockAcquireIntervalMillis(long interval);
-
-    public abstract Builder setLockAcquireTimeoutMillis(long timeout);
-
-    public abstract Builder setAppIdentifier(@Nullable String id);
-
-    public abstract Builder setUser(@Nullable String user);
-
-    public abstract Builder setAuthSessionTimeoutMillis(long timeout);
-
-    public abstract Builder setConfiguration(@Nullable Configuration conf);
+    public abstract Builder setProperties(Properties props);
 
     public abstract IcebergCatalogConfig build();
   }
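After this change, the config reduces to a catalog name plus a `java.util.Properties` bag that is handed straight to Iceberg's `CatalogUtil`. A minimal usage sketch of the new API (the catalog name, `hadoop` type, and warehouse path below are illustrative values, not part of this diff):

```java
import java.util.Properties;
import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;

public class CatalogConfigExample {
  public static void main(String[] args) {
    // Any Iceberg catalog property can now be passed through untouched.
    Properties props = new Properties();
    props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
    props.setProperty("warehouse", "file:///tmp/warehouse"); // illustrative path

    IcebergCatalogConfig config =
        IcebergCatalogConfig.builder()
            .setCatalogName("my_catalog") // illustrative name
            .setProperties(props)
            .build();

    // Materializes the catalog via CatalogUtil.buildIcebergCatalog,
    // exactly as the new catalog() method above does.
    Catalog catalog = config.catalog();
  }
}
```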
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
index fb32e18d9374..ef535353efd0 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java
@@ -21,19 +21,21 @@
 import com.google.auto.value.AutoValue;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import org.apache.beam.sdk.io.iceberg.IcebergReadSchemaTransformProvider.Config;
 import org.apache.beam.sdk.managed.ManagedTransformConstants;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.NoSuchSchemaException;
 import org.apache.beam.sdk.schemas.SchemaRegistry;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.Row;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 import org.apache.iceberg.catalog.TableIdentifier;
 
 /**
@@ -47,7 +49,6 @@ public class IcebergReadSchemaTransformProvider extends TypedSchemaTransformProv
   @Override
   protected SchemaTransform from(Config configuration) {
-    configuration.validate();
     return new IcebergReadSchemaTransform(configuration);
   }
 
@@ -68,21 +69,24 @@ public static Builder builder() {
       return new AutoValue_IcebergReadSchemaTransformProvider_Config.Builder();
     }
 
+    @SchemaFieldDescription("Identifier of the Iceberg table to read from.")
     public abstract String getTable();
 
-    public abstract IcebergSchemaTransformCatalogConfig getCatalogConfig();
+    @SchemaFieldDescription("Name of the catalog containing the table.")
+    public abstract String getCatalogName();
+
+    @SchemaFieldDescription("Configuration properties used to set up the Iceberg catalog.")
+    public abstract Map<String, String> getCatalogProperties();
 
     @AutoValue.Builder
     public abstract static class Builder {
-      public abstract Builder setTable(String tables);
+      public abstract Builder setTable(String table);
 
-      public abstract Builder setCatalogConfig(IcebergSchemaTransformCatalogConfig catalogConfig);
+      public abstract Builder setCatalogName(String catalogName);
 
-      public abstract Config build();
-    }
+      public abstract Builder setCatalogProperties(Map<String, String> catalogProperties);
 
-    public void validate() {
-      getCatalogConfig().validate();
+      public abstract Config build();
     }
   }
 
@@ -109,17 +113,13 @@ Row getConfigurationRow() {
 
     @Override
     public PCollectionRowTuple expand(PCollectionRowTuple input) {
-      IcebergSchemaTransformCatalogConfig catalogConfig = configuration.getCatalogConfig();
+      Properties properties = new Properties();
+      properties.putAll(configuration.getCatalogProperties());
 
       IcebergCatalogConfig.Builder catalogBuilder =
-          IcebergCatalogConfig.builder().setName(catalogConfig.getCatalogName());
-
-      if (!Strings.isNullOrEmpty(catalogConfig.getCatalogType())) {
-        catalogBuilder = catalogBuilder.setIcebergCatalogType(catalogConfig.getCatalogType());
-      }
-      if (!Strings.isNullOrEmpty(catalogConfig.getWarehouseLocation())) {
-        catalogBuilder = catalogBuilder.setWarehouseLocation(catalogConfig.getWarehouseLocation());
-      }
+          IcebergCatalogConfig.builder()
+              .setCatalogName(configuration.getCatalogName())
+              .setProperties(properties);
 
       PCollection<Row> output =
           input
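For a sense of how the flattened read configuration looks at a call site, a sketch building the new `Config` directly (table identifier and property values are illustrative):

```java
import java.util.Map;
import org.apache.beam.sdk.io.iceberg.IcebergReadSchemaTransformProvider;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.CatalogUtil;

public class ReadConfigExample {
  public static void main(String[] args) {
    // Arbitrary catalog properties go into a plain string map now,
    // instead of a fixed-field IcebergSchemaTransformCatalogConfig.
    Map<String, String> catalogProps =
        ImmutableMap.of(
            "type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP,
            "warehouse", "file:///tmp/warehouse"); // illustrative location

    IcebergReadSchemaTransformProvider.Config config =
        IcebergReadSchemaTransformProvider.Config.builder()
            .setTable("db.my_table") // illustrative identifier
            .setCatalogName("my_catalog")
            .setCatalogProperties(catalogProps)
            .build();
  }
}
```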
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformCatalogConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformCatalogConfig.java
deleted file mode 100644
index 87b3d2041985..000000000000
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformCatalogConfig.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.iceberg;
-
-import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
-
-import com.google.auto.value.AutoValue;
-import java.util.Set;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
-import org.apache.beam.sdk.schemas.NoSuchSchemaException;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.SchemaRegistry;
-import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
-import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
-import org.apache.beam.sdk.util.Preconditions;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
-import org.apache.iceberg.CatalogUtil;
-import org.checkerframework.checker.nullness.qual.Nullable;
-
-@DefaultSchema(AutoValueSchema.class)
-@AutoValue
-public abstract class IcebergSchemaTransformCatalogConfig {
-  public static Builder builder() {
-    return new AutoValue_IcebergSchemaTransformCatalogConfig.Builder();
-  }
-
-  public abstract String getCatalogName();
-
-  @SchemaFieldDescription("Valid types are: {hadoop, hive, rest}")
-  public abstract @Nullable String getCatalogType();
-
-  public abstract @Nullable String getCatalogImplementation();
-
-  public abstract @Nullable String getWarehouseLocation();
-
-  @AutoValue.Builder
-  public abstract static class Builder {
-
-    public abstract Builder setCatalogName(String catalogName);
-
-    public abstract Builder setCatalogType(String catalogType);
-
-    public abstract Builder setCatalogImplementation(String catalogImplementation);
-
-    public abstract Builder setWarehouseLocation(String warehouseLocation);
-
-    public abstract IcebergSchemaTransformCatalogConfig build();
-  }
-
-  public static final Schema SCHEMA;
-
-  static {
-    try {
-      // To stay consistent with our SchemaTransform configuration naming conventions,
-      // we sort lexicographically and convert field names to snake_case
-      SCHEMA =
-          SchemaRegistry.createDefault()
-              .getSchema(IcebergSchemaTransformCatalogConfig.class)
-              .sorted()
-              .toSnakeCase();
-    } catch (NoSuchSchemaException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @SuppressWarnings("argument")
-  public Row toRow() {
-    return Row.withSchema(SCHEMA)
-        .withFieldValue("catalog_name", getCatalogName())
-        .withFieldValue("catalog_type", getCatalogType())
-        .withFieldValue("catalog_implementation", getCatalogImplementation())
-        .withFieldValue("warehouse_location", getWarehouseLocation())
-        .build();
-  }
-
-  public static final Set<String> VALID_CATALOG_TYPES =
-      Sets.newHashSet(
-          CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP,
-          CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE,
-          CatalogUtil.ICEBERG_CATALOG_TYPE_REST);
-
-  public void validate() {
-    if (!Strings.isNullOrEmpty(getCatalogType())) {
-      checkArgument(
-          VALID_CATALOG_TYPES.contains(Preconditions.checkArgumentNotNull(getCatalogType())),
-          "Invalid catalog type. Please pick one of %s",
-          VALID_CATALOG_TYPES);
-    }
-  }
-}
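With this class deleted, the eager catalog-type check goes away too; invalid properties now surface when Iceberg itself builds the catalog. If a caller wanted to keep an equivalent up-front check, a sketch mirroring the removed validate() could look like the following (a hypothetical helper, not part of this PR; it assumes the same three type strings the old class accepted):

```java
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import java.util.Properties;
import java.util.Set;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.apache.iceberg.CatalogUtil;

public class CatalogTypeCheck {
  private static final Set<String> VALID_CATALOG_TYPES =
      Sets.newHashSet(
          CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP,
          CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE,
          CatalogUtil.ICEBERG_CATALOG_TYPE_REST);

  /** Hypothetical helper: eagerly rejects unknown "type" values, like the deleted validate(). */
  static void validateType(Properties props) {
    String type = props.getProperty("type");
    if (type != null && !type.isEmpty()) {
      checkArgument(
          VALID_CATALOG_TYPES.contains(type),
          "Invalid catalog type. Please pick one of %s",
          VALID_CATALOG_TYPES);
    }
  }
}
```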
Please pick one of %s", - VALID_CATALOG_TYPES); - } - } -} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java index b490693a9adb..b3de7a88c541 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java @@ -21,6 +21,8 @@ import com.google.auto.value.AutoValue; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Properties; import org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.Config; import org.apache.beam.sdk.managed.ManagedTransformConstants; import org.apache.beam.sdk.schemas.AutoValueSchema; @@ -39,7 +41,6 @@ import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.apache.iceberg.catalog.TableIdentifier; /** @@ -64,7 +65,6 @@ public String description() { @Override protected SchemaTransform from(Config configuration) { - configuration.validate(); return new IcebergWriteSchemaTransform(configuration); } @@ -93,20 +93,21 @@ public static Builder builder() { @SchemaFieldDescription("Identifier of the Iceberg table to write to.") public abstract String getTable(); - @SchemaFieldDescription("Configuration parameters used to set up the Iceberg catalog.") - public abstract IcebergSchemaTransformCatalogConfig getCatalogConfig(); + @SchemaFieldDescription("Name of the catalog containing the table.") + public abstract String getCatalogName(); + + @SchemaFieldDescription("Configuration properties used to set up the Iceberg catalog.") + public abstract Map getCatalogProperties(); @AutoValue.Builder public abstract static class Builder { - public abstract Builder setTable(String tables); + public abstract Builder setTable(String table); - public abstract Builder setCatalogConfig(IcebergSchemaTransformCatalogConfig catalogConfig); + public abstract Builder setCatalogName(String catalogName); - public abstract Config build(); - } + public abstract Builder setCatalogProperties(Map catalogProperties); - public void validate() { - getCatalogConfig().validate(); + public abstract Config build(); } } @@ -133,26 +134,21 @@ Row getConfigurationRow() { @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { - PCollection rows = input.get(INPUT_TAG); - IcebergSchemaTransformCatalogConfig catalogConfig = configuration.getCatalogConfig(); - - IcebergCatalogConfig.Builder catalogBuilder = - IcebergCatalogConfig.builder().setName(catalogConfig.getCatalogName()); + Properties properties = new Properties(); + properties.putAll(configuration.getCatalogProperties()); - if (!Strings.isNullOrEmpty(catalogConfig.getCatalogType())) { - catalogBuilder = catalogBuilder.setIcebergCatalogType(catalogConfig.getCatalogType()); - } - if (!Strings.isNullOrEmpty(catalogConfig.getWarehouseLocation())) { - catalogBuilder = catalogBuilder.setWarehouseLocation(catalogConfig.getWarehouseLocation()); - } + IcebergCatalogConfig catalog = + IcebergCatalogConfig.builder() + .setCatalogName(configuration.getCatalogName()) + .setProperties(properties) + .build(); // TODO: support dynamic 
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
index 467a2cbaf242..0420e2f57797 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
@@ -206,12 +206,12 @@ public void testRead() throws Exception {
     Map<String, Object> config =
         ImmutableMap.<String, Object>builder()
             .put("table", tableId.toString())
+            .put("catalog_name", "test-name")
             .put(
-                "catalog_config",
+                "catalog_properties",
                 ImmutableMap.<String, String>builder()
-                    .put("catalog_name", "hadoop")
-                    .put("catalog_type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
-                    .put("warehouse_location", warehouseLocation)
+                    .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
+                    .put("warehouse", warehouseLocation)
                     .build())
             .build();
 
@@ -246,12 +246,12 @@ public void testWrite() {
     Map<String, Object> config =
         ImmutableMap.<String, Object>builder()
             .put("table", tableId.toString())
+            .put("catalog_name", "test-name")
             .put(
-                "catalog_config",
+                "catalog_properties",
                 ImmutableMap.<String, String>builder()
-                    .put("catalog_name", "hadoop")
-                    .put("catalog_type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
-                    .put("warehouse_location", warehouseLocation)
+                    .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
+                    .put("warehouse", warehouseLocation)
                     .build())
             .build();
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
index 12d86811e604..d6db3f689117 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
@@ -21,6 +21,7 @@
 import static org.hamcrest.Matchers.containsInAnyOrder;
 
 import java.util.List;
+import java.util.Properties;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -93,12 +94,12 @@ public void testSimpleScan() throws Exception {
             .map(record -> SchemaAndRowConversions.recordToRow(schema, record))
             .collect(Collectors.toList());
 
+    Properties props = new Properties();
+    props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+    props.setProperty("warehouse", warehouse.location);
+
     IcebergCatalogConfig catalogConfig =
-        IcebergCatalogConfig.builder()
-            .setName("hadoop")
-            .setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
-            .setWarehouseLocation(warehouse.location)
-            .build();
+        IcebergCatalogConfig.builder().setCatalogName("name").setProperties(props).build();
 
     PCollection<Row> output =
         testPipeline
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java
index e04eaf48cb3d..e0a584ec9da9 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java
@@ -23,6 +23,7 @@
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.UUID;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -75,12 +76,12 @@ public void testSimpleAppend() throws Exception {
     // Create a table and add records to it.
     Table table = warehouse.createTable(tableId, TestFixtures.SCHEMA);
 
+    Properties props = new Properties();
+    props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+    props.setProperty("warehouse", warehouse.location);
+
     IcebergCatalogConfig catalog =
-        IcebergCatalogConfig.builder()
-            .setName("hadoop")
-            .setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
-            .setWarehouseLocation(warehouse.location)
-            .build();
+        IcebergCatalogConfig.builder().setCatalogName("name").setProperties(props).build();
 
     testPipeline
         .apply("Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1)))
@@ -109,12 +110,12 @@ public void testDynamicDestinationsWithoutSpillover() throws Exception {
     Table table2 = warehouse.createTable(table2Id, TestFixtures.SCHEMA);
     Table table3 = warehouse.createTable(table3Id, TestFixtures.SCHEMA);
 
+    Properties props = new Properties();
+    props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+    props.setProperty("warehouse", warehouse.location);
+
     IcebergCatalogConfig catalog =
-        IcebergCatalogConfig.builder()
-            .setName("hadoop")
-            .setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
-            .setWarehouseLocation(warehouse.location)
-            .build();
+        IcebergCatalogConfig.builder().setCatalogName("name").setProperties(props).build();
 
     DynamicDestinations dynamicDestinations =
         new DynamicDestinations() {
@@ -199,12 +200,12 @@ public void testDynamicDestinationsWithSpillover() throws Exception {
       elementsPerTable.computeIfAbsent(tableId, ignored -> Lists.newArrayList()).add(element);
     }
 
+    Properties props = new Properties();
+    props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+    props.setProperty("warehouse", warehouse.location);
+
     IcebergCatalogConfig catalog =
-        IcebergCatalogConfig.builder()
-            .setName("hadoop")
-            .setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
-            .setWarehouseLocation(warehouse.location)
-            .build();
+        IcebergCatalogConfig.builder().setCatalogName("name").setProperties(props).build();
 
     DynamicDestinations dynamicDestinations =
         new DynamicDestinations() {
"test_location"); + Row transformConfigRow = Row.withSchema(new IcebergReadSchemaTransformProvider().configurationSchema()) .withFieldValue("table", "test_table_identifier") - .withFieldValue("catalog_config", catalogConfigRow) + .withFieldValue("catalog_name", "test-name") + .withFieldValue("catalog_properties", properties) .build(); new IcebergReadSchemaTransformProvider().from(transformConfigRow); @@ -97,17 +97,15 @@ public void testSimpleScan() throws Exception { .map(record -> SchemaAndRowConversions.recordToRow(schema, record)) .collect(Collectors.toList()); - IcebergSchemaTransformCatalogConfig catalogConfig = - IcebergSchemaTransformCatalogConfig.builder() - .setCatalogName("hadoop") - .setCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) - .setWarehouseLocation(warehouse.location) - .build(); + Map properties = new HashMap<>(); + properties.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP); + properties.put("warehouse", warehouse.location); IcebergReadSchemaTransformProvider.Config readConfig = IcebergReadSchemaTransformProvider.Config.builder() .setTable(identifier) - .setCatalogConfig(catalogConfig) + .setCatalogName("name") + .setCatalogProperties(properties) .build(); PCollection output = @@ -158,10 +156,10 @@ public void testReadUsingManagedTransform() throws Exception { String yamlConfig = String.format( "table: %s\n" - + "catalog_config: \n" - + " catalog_name: hadoop\n" - + " catalog_type: %s\n" - + " warehouse_location: %s", + + "catalog_name: test-name\n" + + "catalog_properties: \n" + + " type: %s\n" + + " warehouse: %s", identifier, CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP, warehouse.location); Map configMap = new Yaml().load(yamlConfig); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslationTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslationTest.java index fb4c98cb0bdf..7863f7812a13 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslationTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslationTest.java @@ -25,7 +25,9 @@ import java.io.IOException; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.stream.Collectors; import org.apache.beam.model.pipeline.v1.ExternalTransforms.SchemaTransformPayload; @@ -42,6 +44,7 @@ import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.InvalidProtocolBufferException; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.catalog.TableIdentifier; import org.junit.ClassRule; @@ -63,18 +66,19 @@ public class IcebergSchemaTransformTranslationTest { static final IcebergReadSchemaTransformProvider READ_PROVIDER = new IcebergReadSchemaTransformProvider(); + private static final Map CATALOG_PROPERTIES = + ImmutableMap.builder() + .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) + .put("warehouse", "test_location") + .build(); + @Test public void testReCreateWriteTransformFromRow() { - Row catalogConfigRow = - Row.withSchema(IcebergSchemaTransformCatalogConfig.SCHEMA) - .withFieldValue("catalog_name", "test_name") - .withFieldValue("catalog_type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) - 
.withFieldValue("warehouse_location", "test_location") - .build(); Row transformConfigRow = Row.withSchema(WRITE_PROVIDER.configurationSchema()) .withFieldValue("table", "test_table_identifier") - .withFieldValue("catalog_config", catalogConfigRow) + .withFieldValue("catalog_name", "test-name") + .withFieldValue("catalog_properties", CATALOG_PROPERTIES) .build(); IcebergWriteSchemaTransform writeTransform = (IcebergWriteSchemaTransform) WRITE_PROVIDER.from(transformConfigRow); @@ -101,17 +105,11 @@ public void testWriteTransformProtoTranslation() Collections.singletonList(Row.withSchema(inputSchema).addValue("a").build()))) .setRowSchema(inputSchema); - Row catalogConfigRow = - Row.withSchema(IcebergSchemaTransformCatalogConfig.SCHEMA) - .withFieldValue("catalog_name", "test_catalog") - .withFieldValue("catalog_type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) - .withFieldValue("catalog_implementation", "test_implementation") - .withFieldValue("warehouse_location", warehouse.location) - .build(); Row transformConfigRow = Row.withSchema(WRITE_PROVIDER.configurationSchema()) .withFieldValue("table", "test_identifier") - .withFieldValue("catalog_config", catalogConfigRow) + .withFieldValue("catalog_name", "test-name") + .withFieldValue("catalog_properties", CATALOG_PROPERTIES) .build(); IcebergWriteSchemaTransform writeTransform = @@ -158,16 +156,11 @@ public void testWriteTransformProtoTranslation() @Test public void testReCreateReadTransformFromRow() { // setting a subset of fields here. - Row catalogConfigRow = - Row.withSchema(IcebergSchemaTransformCatalogConfig.SCHEMA) - .withFieldValue("catalog_name", "test_name") - .withFieldValue("catalog_type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) - .withFieldValue("warehouse_location", "test_location") - .build(); Row transformConfigRow = Row.withSchema(READ_PROVIDER.configurationSchema()) .withFieldValue("table", "test_table_identifier") - .withFieldValue("catalog_config", catalogConfigRow) + .withFieldValue("catalog_name", "test-name") + .withFieldValue("catalog_properties", CATALOG_PROPERTIES) .build(); IcebergReadSchemaTransform readTransform = @@ -188,19 +181,17 @@ public void testReadTransformProtoTranslation() throws InvalidProtocolBufferException, IOException { // First build a pipeline Pipeline p = Pipeline.create(); - Row catalogConfigRow = - Row.withSchema(IcebergSchemaTransformCatalogConfig.SCHEMA) - .withFieldValue("catalog_name", "test_catalog") - .withFieldValue("catalog_type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) - .withFieldValue("warehouse_location", warehouse.location) - .build(); String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16); warehouse.createTable(TableIdentifier.parse(identifier), TestFixtures.SCHEMA); + Map properties = new HashMap<>(CATALOG_PROPERTIES); + properties.put("warehouse", warehouse.location); + Row transformConfigRow = Row.withSchema(READ_PROVIDER.configurationSchema()) .withFieldValue("table", identifier) - .withFieldValue("catalog_config", catalogConfigRow) + .withFieldValue("catalog_name", "test-name") + .withFieldValue("catalog_properties", properties) .build(); IcebergReadSchemaTransform readTransform = diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java index 9ef3e9945ec9..75884f4bcf70 100644 --- 
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
index 9ef3e9945ec9..75884f4bcf70 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
@@ -23,6 +23,7 @@
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -61,16 +62,15 @@ public class IcebergWriteSchemaTransformProviderTest {
 
   @Test
   public void testBuildTransformWithRow() {
-    Row catalogConfigRow =
-        Row.withSchema(IcebergSchemaTransformCatalogConfig.SCHEMA)
-            .withFieldValue("catalog_name", "test_name")
-            .withFieldValue("catalog_type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
-            .withFieldValue("warehouse_location", "test_location")
-            .build();
+    Map<String, String> properties = new HashMap<>();
+    properties.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+    properties.put("warehouse", "test_location");
+
     Row transformConfigRow =
         Row.withSchema(new IcebergWriteSchemaTransformProvider().configurationSchema())
             .withFieldValue("table", "test_table_identifier")
-            .withFieldValue("catalog_config", catalogConfigRow)
+            .withFieldValue("catalog_name", "test-name")
+            .withFieldValue("catalog_properties", properties)
             .build();
 
     new IcebergWriteSchemaTransformProvider().from(transformConfigRow);
@@ -85,15 +85,15 @@ public void testSimpleAppend() {
     // Create a table and add records to it.
     Table table = warehouse.createTable(tableId, TestFixtures.SCHEMA);
 
+    Map<String, String> properties = new HashMap<>();
+    properties.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+    properties.put("warehouse", warehouse.location);
+
     Config config =
         Config.builder()
             .setTable(identifier)
-            .setCatalogConfig(
-                IcebergSchemaTransformCatalogConfig.builder()
-                    .setCatalogName("hadoop")
-                    .setCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
-                    .setWarehouseLocation(warehouse.location)
-                    .build())
+            .setCatalogName("name")
+            .setCatalogProperties(properties)
             .build();
 
     PCollectionRowTuple input =
@@ -127,10 +127,10 @@ public void testWriteUsingManagedTransform() {
     String yamlConfig =
         String.format(
             "table: %s\n"
-                + "catalog_config: \n"
-                + "  catalog_name: hadoop\n"
-                + "  catalog_type: %s\n"
-                + "  warehouse_location: %s",
+                + "catalog_name: test-name\n"
+                + "catalog_properties: \n"
+                + "  type: %s\n"
+                + "  warehouse: %s",
             identifier, CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP, warehouse.location);
     Map<String, Object> configMap = new Yaml().load(yamlConfig);
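The write side takes the same flat config through the Managed API. A sketch — hedged: `Managed.write(Managed.ICEBERG)` and `withConfig` follow the Beam Managed API, and the `"input"` tag is an assumption matching the write transform's `INPUT_TAG` as set up in these tests; table and warehouse values are illustrative:

```java
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.yaml.snakeyaml.Yaml;

public class ManagedWriteSketch {
  public static void main(String[] args) {
    Schema schema = Schema.builder().addStringField("name").build();
    Row row = Row.withSchema(schema).addValue("a").build();

    String yamlConfig =
        "table: db.my_table\n" // illustrative identifier
            + "catalog_name: test-name\n"
            + "catalog_properties:\n"
            + "  type: hadoop\n"
            + "  warehouse: file:///tmp/warehouse"; // illustrative location
    Map<String, Object> configMap = new Yaml().load(yamlConfig);

    Pipeline p = Pipeline.create();
    PCollection<Row> rows = p.apply(Create.of(row)).setRowSchema(schema);

    // "input" matches the write transform's input tag; an assumption based on the test setup.
    PCollectionRowTuple.of("input", rows)
        .apply(Managed.write(Managed.ICEBERG).withConfig(configMap));
  }
}
```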
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java
index c7d5353428c2..143687e3c999 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java
@@ -20,6 +20,7 @@
 import static org.hamcrest.MatcherAssert.assertThat;
 
 import java.util.List;
+import java.util.Properties;
 import java.util.UUID;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -64,14 +65,17 @@ public void testUnstartedReaderReadsSamesItsSource() throws Exception {
 
     PipelineOptions options = PipelineOptionsFactory.create();
 
+    Properties props = new Properties();
+    props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+    props.setProperty("warehouse", warehouse.location);
+
     BoundedSource<Row> source =
         new ScanSource(
             IcebergScanConfig.builder()
                 .setCatalogConfig(
                     IcebergCatalogConfig.builder()
-                        .setName("hadoop")
-                        .setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
-                        .setWarehouseLocation(warehouse.location)
+                        .setCatalogName("name")
+                        .setProperties(props)
                         .build())
                 .setScanType(IcebergScanConfig.ScanType.TABLE)
                 .setTableIdentifier(simpleTable.name().replace("hadoop.", "").split("\\."))
@@ -103,14 +107,17 @@ public void testInitialSplitting() throws Exception {
 
     PipelineOptions options = PipelineOptionsFactory.create();
 
+    Properties props = new Properties();
+    props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+    props.setProperty("warehouse", warehouse.location);
+
     BoundedSource<Row> source =
         new ScanSource(
             IcebergScanConfig.builder()
                 .setCatalogConfig(
                     IcebergCatalogConfig.builder()
-                        .setName("hadoop")
-                        .setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
-                        .setWarehouseLocation(warehouse.location)
+                        .setCatalogName("name")
+                        .setProperties(props)
                         .build())
                 .setScanType(IcebergScanConfig.ScanType.TABLE)
                 .setTableIdentifier(simpleTable.name().replace("hadoop.", "").split("\\."))
@@ -146,14 +153,17 @@ public void testDoubleInitialSplitting() throws Exception {
 
     PipelineOptions options = PipelineOptionsFactory.create();
 
+    Properties props = new Properties();
+    props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+    props.setProperty("warehouse", warehouse.location);
+
     BoundedSource<Row> source =
         new ScanSource(
             IcebergScanConfig.builder()
                 .setCatalogConfig(
                     IcebergCatalogConfig.builder()
-                        .setName("hadoop")
-                        .setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
-                        .setWarehouseLocation(warehouse.location)
+                        .setCatalogName("name")
+                        .setProperties(props)
                         .build())
                 .setScanType(IcebergScanConfig.ScanType.TABLE)
                 .setTableIdentifier(simpleTable.name().replace("hadoop.", "").split("\\."))