From 9054acb7681dd639773bbca7b36853f2e13eedaf Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Sat, 29 Jun 2024 19:46:34 -0400 Subject: [PATCH 1/8] Pass-through iceberg catalog properties --- .../sdk/io/iceberg/IcebergCatalogConfig.java | 189 +----------------- .../IcebergReadSchemaTransformProvider.java | 27 +-- .../IcebergSchemaTransformCatalogConfig.java | 107 ---------- .../IcebergWriteSchemaTransformProvider.java | 32 +-- .../beam/sdk/io/iceberg/IcebergIOIT.java | 14 +- .../sdk/io/iceberg/IcebergIOReadTest.java | 11 +- .../sdk/io/iceberg/IcebergIOWriteTest.java | 34 ++-- ...cebergReadSchemaTransformProviderTest.java | 30 +-- ...IcebergSchemaTransformTranslationTest.java | 45 ++--- ...ebergWriteSchemaTransformProviderTest.java | 34 ++-- .../beam/sdk/io/iceberg/ScanSourceTest.java | 34 ++-- 11 files changed, 111 insertions(+), 446 deletions(-) delete mode 100644 sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformCatalogConfig.java diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java index fefef4aa4917..1562bb7e9138 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java @@ -19,212 +19,39 @@ import com.google.auto.value.AutoValue; import java.io.Serializable; +import java.util.Properties; import javax.annotation.Nullable; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.sdk.util.ReleaseInfo; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.CatalogUtil; import org.checkerframework.dataflow.qual.Pure; @AutoValue public abstract class IcebergCatalogConfig implements Serializable { - - @Pure - public abstract String getName(); - - /* Core Properties */ - @Pure - public abstract @Nullable String getIcebergCatalogType(); - - @Pure - public abstract @Nullable String getCatalogImplementation(); - - @Pure - public abstract @Nullable String getFileIOImplementation(); - - @Pure - public abstract @Nullable String getWarehouseLocation(); - - @Pure - public abstract @Nullable String getMetricsReporterImplementation(); - - /* Caching */ - @Pure - public abstract boolean getCacheEnabled(); - - @Pure - public abstract boolean getCacheCaseSensitive(); - - @Pure - public abstract long getCacheExpirationIntervalMillis(); - - @Pure - public abstract boolean getIOManifestCacheEnabled(); - - @Pure - public abstract long getIOManifestCacheExpirationIntervalMillis(); - - @Pure - public abstract long getIOManifestCacheMaxTotalBytes(); - - @Pure - public abstract long getIOManifestCacheMaxContentLength(); - - @Pure - public abstract @Nullable String getUri(); - - @Pure - public abstract int getClientPoolSize(); - - @Pure - public abstract long getClientPoolEvictionIntervalMs(); - - @Pure - public abstract @Nullable String getClientPoolCacheKeys(); - - @Pure - public abstract @Nullable String getLockImplementation(); - - @Pure - public abstract long getLockHeartbeatIntervalMillis(); - - @Pure - public abstract long getLockHeartbeatTimeoutMillis(); - - @Pure - public abstract int getLockHeartbeatThreads(); - - @Pure - public abstract long getLockAcquireIntervalMillis(); - - @Pure 
- public abstract long getLockAcquireTimeoutMillis(); - - @Pure - public abstract @Nullable String getAppIdentifier(); - - @Pure - public abstract @Nullable String getUser(); - @Pure - public abstract long getAuthSessionTimeoutMillis(); + public abstract Properties getProperties(); @Pure public abstract @Nullable Configuration getConfiguration(); @Pure public static Builder builder() { - return new AutoValue_IcebergCatalogConfig.Builder() - .setIcebergCatalogType(null) - .setCatalogImplementation(null) - .setFileIOImplementation(null) - .setWarehouseLocation(null) - .setMetricsReporterImplementation(null) // TODO: Set this to our implementation - .setCacheEnabled(CatalogProperties.CACHE_ENABLED_DEFAULT) - .setCacheCaseSensitive(CatalogProperties.CACHE_CASE_SENSITIVE_DEFAULT) - .setCacheExpirationIntervalMillis(CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_DEFAULT) - .setIOManifestCacheEnabled(CatalogProperties.IO_MANIFEST_CACHE_ENABLED_DEFAULT) - .setIOManifestCacheExpirationIntervalMillis( - CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_DEFAULT) - .setIOManifestCacheMaxTotalBytes( - CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT) - .setIOManifestCacheMaxContentLength( - CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT) - .setUri(null) - .setClientPoolSize(CatalogProperties.CLIENT_POOL_SIZE_DEFAULT) - .setClientPoolEvictionIntervalMs( - CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_DEFAULT) - .setClientPoolCacheKeys(null) - .setLockImplementation(null) - .setLockHeartbeatIntervalMillis(CatalogProperties.LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT) - .setLockHeartbeatTimeoutMillis(CatalogProperties.LOCK_HEARTBEAT_TIMEOUT_MS_DEFAULT) - .setLockHeartbeatThreads(CatalogProperties.LOCK_HEARTBEAT_THREADS_DEFAULT) - .setLockAcquireIntervalMillis(CatalogProperties.LOCK_ACQUIRE_INTERVAL_MS_DEFAULT) - .setLockAcquireTimeoutMillis(CatalogProperties.LOCK_HEARTBEAT_TIMEOUT_MS_DEFAULT) - .setAppIdentifier(null) - .setUser(null) - .setAuthSessionTimeoutMillis(CatalogProperties.AUTH_SESSION_TIMEOUT_MS_DEFAULT) - .setConfiguration(null); - } - - @Pure - public ImmutableMap properties() { - return new PropertyBuilder() - .put(CatalogUtil.ICEBERG_CATALOG_TYPE, getIcebergCatalogType()) - .put(CatalogProperties.CATALOG_IMPL, getCatalogImplementation()) - .put(CatalogProperties.FILE_IO_IMPL, getFileIOImplementation()) - .put(CatalogProperties.WAREHOUSE_LOCATION, getWarehouseLocation()) - .put(CatalogProperties.METRICS_REPORTER_IMPL, getMetricsReporterImplementation()) - .put(CatalogProperties.CACHE_ENABLED, getCacheEnabled()) - .put(CatalogProperties.CACHE_CASE_SENSITIVE, getCacheCaseSensitive()) - .put(CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS, getCacheExpirationIntervalMillis()) - .build(); + return new AutoValue_IcebergCatalogConfig.Builder(); } public org.apache.iceberg.catalog.Catalog catalog() { Configuration conf = getConfiguration(); + String catalogName = "apache-beam-" + ReleaseInfo.getReleaseInfo().getVersion(); if (conf == null) { conf = new Configuration(); } - return CatalogUtil.buildIcebergCatalog(getName(), properties(), conf); + return CatalogUtil.buildIcebergCatalog(catalogName, Maps.fromProperties(getProperties()), conf); } @AutoValue.Builder public abstract static class Builder { - - /* Core Properties */ - public abstract Builder setName(String name); - - public abstract Builder setIcebergCatalogType(@Nullable String icebergType); - - public abstract Builder setCatalogImplementation(@Nullable String catalogImpl); - - public abstract Builder 
setFileIOImplementation(@Nullable String fileIOImpl); - - public abstract Builder setWarehouseLocation(@Nullable String warehouse); - - public abstract Builder setMetricsReporterImplementation(@Nullable String metricsImpl); - - /* Caching */ - public abstract Builder setCacheEnabled(boolean cacheEnabled); - - public abstract Builder setCacheCaseSensitive(boolean cacheCaseSensitive); - - public abstract Builder setCacheExpirationIntervalMillis(long expiration); - - public abstract Builder setIOManifestCacheEnabled(boolean enabled); - - public abstract Builder setIOManifestCacheExpirationIntervalMillis(long expiration); - - public abstract Builder setIOManifestCacheMaxTotalBytes(long bytes); - - public abstract Builder setIOManifestCacheMaxContentLength(long length); - - public abstract Builder setUri(@Nullable String uri); - - public abstract Builder setClientPoolSize(int size); - - public abstract Builder setClientPoolEvictionIntervalMs(long interval); - - public abstract Builder setClientPoolCacheKeys(@Nullable String keys); - - public abstract Builder setLockImplementation(@Nullable String lockImplementation); - - public abstract Builder setLockHeartbeatIntervalMillis(long interval); - - public abstract Builder setLockHeartbeatTimeoutMillis(long timeout); - - public abstract Builder setLockHeartbeatThreads(int threads); - - public abstract Builder setLockAcquireIntervalMillis(long interval); - - public abstract Builder setLockAcquireTimeoutMillis(long timeout); - - public abstract Builder setAppIdentifier(@Nullable String id); - - public abstract Builder setUser(@Nullable String user); - - public abstract Builder setAuthSessionTimeoutMillis(long timeout); + public abstract Builder setProperties(Properties props); public abstract Builder setConfiguration(@Nullable Configuration conf); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java index fb32e18d9374..7d0beda23439 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java @@ -21,19 +21,21 @@ import com.google.auto.value.AutoValue; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Properties; import org.apache.beam.sdk.io.iceberg.IcebergReadSchemaTransformProvider.Config; import org.apache.beam.sdk.managed.ManagedTransformConstants; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.NoSuchSchemaException; import org.apache.beam.sdk.schemas.SchemaRegistry; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.apache.iceberg.catalog.TableIdentifier; /** @@ -47,7 +49,6 @@ public class IcebergReadSchemaTransformProvider extends TypedSchemaTransformProv @Override protected SchemaTransform from(Config 
configuration) { - configuration.validate(); return new IcebergReadSchemaTransform(configuration); } @@ -68,22 +69,20 @@ public static Builder builder() { return new AutoValue_IcebergReadSchemaTransformProvider_Config.Builder(); } + @SchemaFieldDescription("Identifier of the Iceberg table to read from.") public abstract String getTable(); - public abstract IcebergSchemaTransformCatalogConfig getCatalogConfig(); + @SchemaFieldDescription("Configuration properties used to set up the Iceberg catalog.") + public abstract Map getCatalogProperties(); @AutoValue.Builder public abstract static class Builder { public abstract Builder setTable(String tables); - public abstract Builder setCatalogConfig(IcebergSchemaTransformCatalogConfig catalogConfig); + public abstract Builder setCatalogProperties(Map catalogProperties); public abstract Config build(); } - - public void validate() { - getCatalogConfig().validate(); - } } static class IcebergReadSchemaTransform extends SchemaTransform { @@ -109,17 +108,11 @@ Row getConfigurationRow() { @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { - IcebergSchemaTransformCatalogConfig catalogConfig = configuration.getCatalogConfig(); + Properties properties = new Properties(); + properties.putAll(configuration.getCatalogProperties()); IcebergCatalogConfig.Builder catalogBuilder = - IcebergCatalogConfig.builder().setName(catalogConfig.getCatalogName()); - - if (!Strings.isNullOrEmpty(catalogConfig.getCatalogType())) { - catalogBuilder = catalogBuilder.setIcebergCatalogType(catalogConfig.getCatalogType()); - } - if (!Strings.isNullOrEmpty(catalogConfig.getWarehouseLocation())) { - catalogBuilder = catalogBuilder.setWarehouseLocation(catalogConfig.getWarehouseLocation()); - } + IcebergCatalogConfig.builder().setProperties(properties); PCollection output = input diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformCatalogConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformCatalogConfig.java deleted file mode 100644 index 87b3d2041985..000000000000 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformCatalogConfig.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package org.apache.beam.sdk.io.iceberg; - -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; - -import com.google.auto.value.AutoValue; -import java.util.Set; -import org.apache.beam.sdk.schemas.AutoValueSchema; -import org.apache.beam.sdk.schemas.NoSuchSchemaException; -import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.SchemaRegistry; -import org.apache.beam.sdk.schemas.annotations.DefaultSchema; -import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; -import org.apache.beam.sdk.util.Preconditions; -import org.apache.beam.sdk.values.Row; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; -import org.apache.iceberg.CatalogUtil; -import org.checkerframework.checker.nullness.qual.Nullable; - -@DefaultSchema(AutoValueSchema.class) -@AutoValue -public abstract class IcebergSchemaTransformCatalogConfig { - public static Builder builder() { - return new AutoValue_IcebergSchemaTransformCatalogConfig.Builder(); - } - - public abstract String getCatalogName(); - - @SchemaFieldDescription("Valid types are: {hadoop, hive, rest}") - public abstract @Nullable String getCatalogType(); - - public abstract @Nullable String getCatalogImplementation(); - - public abstract @Nullable String getWarehouseLocation(); - - @AutoValue.Builder - public abstract static class Builder { - - public abstract Builder setCatalogName(String catalogName); - - public abstract Builder setCatalogType(String catalogType); - - public abstract Builder setCatalogImplementation(String catalogImplementation); - - public abstract Builder setWarehouseLocation(String warehouseLocation); - - public abstract IcebergSchemaTransformCatalogConfig build(); - } - - public static final Schema SCHEMA; - - static { - try { - // To stay consistent with our SchemaTransform configuration naming conventions, - // we sort lexicographically and convert field names to snake_case - SCHEMA = - SchemaRegistry.createDefault() - .getSchema(IcebergSchemaTransformCatalogConfig.class) - .sorted() - .toSnakeCase(); - } catch (NoSuchSchemaException e) { - throw new RuntimeException(e); - } - } - - @SuppressWarnings("argument") - public Row toRow() { - return Row.withSchema(SCHEMA) - .withFieldValue("catalog_name", getCatalogName()) - .withFieldValue("catalog_type", getCatalogType()) - .withFieldValue("catalog_implementation", getCatalogImplementation()) - .withFieldValue("warehouse_location", getWarehouseLocation()) - .build(); - } - - public static final Set VALID_CATALOG_TYPES = - Sets.newHashSet( - CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP, - CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE, - CatalogUtil.ICEBERG_CATALOG_TYPE_REST); - - public void validate() { - if (!Strings.isNullOrEmpty(getCatalogType())) { - checkArgument( - VALID_CATALOG_TYPES.contains(Preconditions.checkArgumentNotNull(getCatalogType())), - "Invalid catalog type. 
Please pick one of %s", - VALID_CATALOG_TYPES); - } - } -} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java index b490693a9adb..3576d1630093 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java @@ -21,6 +21,8 @@ import com.google.auto.value.AutoValue; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Properties; import org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.Config; import org.apache.beam.sdk.managed.ManagedTransformConstants; import org.apache.beam.sdk.schemas.AutoValueSchema; @@ -39,7 +41,6 @@ import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.apache.iceberg.catalog.TableIdentifier; /** @@ -64,7 +65,6 @@ public String description() { @Override protected SchemaTransform from(Config configuration) { - configuration.validate(); return new IcebergWriteSchemaTransform(configuration); } @@ -93,21 +93,17 @@ public static Builder builder() { @SchemaFieldDescription("Identifier of the Iceberg table to write to.") public abstract String getTable(); - @SchemaFieldDescription("Configuration parameters used to set up the Iceberg catalog.") - public abstract IcebergSchemaTransformCatalogConfig getCatalogConfig(); + @SchemaFieldDescription("Configuration properties used to set up the Iceberg catalog.") + public abstract Map getCatalogProperties(); @AutoValue.Builder public abstract static class Builder { public abstract Builder setTable(String tables); - public abstract Builder setCatalogConfig(IcebergSchemaTransformCatalogConfig catalogConfig); + public abstract Builder setCatalogProperties(Map catalogProperties); public abstract Config build(); } - - public void validate() { - getCatalogConfig().validate(); - } } static class IcebergWriteSchemaTransform extends SchemaTransform { @@ -133,26 +129,18 @@ Row getConfigurationRow() { @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { - PCollection rows = input.get(INPUT_TAG); - IcebergSchemaTransformCatalogConfig catalogConfig = configuration.getCatalogConfig(); + Properties properties = new Properties(); + properties.putAll(configuration.getCatalogProperties()); - IcebergCatalogConfig.Builder catalogBuilder = - IcebergCatalogConfig.builder().setName(catalogConfig.getCatalogName()); - - if (!Strings.isNullOrEmpty(catalogConfig.getCatalogType())) { - catalogBuilder = catalogBuilder.setIcebergCatalogType(catalogConfig.getCatalogType()); - } - if (!Strings.isNullOrEmpty(catalogConfig.getWarehouseLocation())) { - catalogBuilder = catalogBuilder.setWarehouseLocation(catalogConfig.getWarehouseLocation()); - } + IcebergCatalogConfig catalog = + IcebergCatalogConfig.builder().setProperties(properties).build(); // TODO: support dynamic destinations IcebergWriteResult result = rows.apply( - IcebergIO.writeRows(catalogBuilder.build()) - .to(TableIdentifier.parse(configuration.getTable()))); + IcebergIO.writeRows(catalog).to(TableIdentifier.parse(configuration.getTable()))); PCollection snapshots 
= result diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java index 467a2cbaf242..e4146e1ac659 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java @@ -207,11 +207,10 @@ public void testRead() throws Exception { ImmutableMap.builder() .put("table", tableId.toString()) .put( - "catalog_config", + "catalog_properties", ImmutableMap.builder() - .put("catalog_name", "hadoop") - .put("catalog_type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) - .put("warehouse_location", warehouseLocation) + .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) + .put("warehouse", warehouseLocation) .build()) .build(); @@ -247,11 +246,10 @@ public void testWrite() { ImmutableMap.builder() .put("table", tableId.toString()) .put( - "catalog_config", + "catalog_properties", ImmutableMap.builder() - .put("catalog_name", "hadoop") - .put("catalog_type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) - .put("warehouse_location", warehouseLocation) + .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) + .put("warehouse", warehouseLocation) .build()) .build(); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java index 12d86811e604..0b4bd0a04461 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java @@ -21,6 +21,7 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import java.util.List; +import java.util.Properties; import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -93,12 +94,12 @@ public void testSimpleScan() throws Exception { .map(record -> SchemaAndRowConversions.recordToRow(schema, record)) .collect(Collectors.toList()); + Properties props = new Properties(); + props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP); + props.setProperty("warehouse", warehouse.location); + IcebergCatalogConfig catalogConfig = - IcebergCatalogConfig.builder() - .setName("hadoop") - .setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) - .setWarehouseLocation(warehouse.location) - .build(); + IcebergCatalogConfig.builder().setProperties(props).build(); PCollection output = testPipeline diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java index e04eaf48cb3d..45ee7f857d87 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java @@ -23,6 +23,7 @@ import java.io.Serializable; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.UUID; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.testing.TestPipeline; @@ -75,12 +76,11 @@ public void testSimpleAppend() throws Exception { // Create a table and add records to it. 
Table table = warehouse.createTable(tableId, TestFixtures.SCHEMA); - IcebergCatalogConfig catalog = - IcebergCatalogConfig.builder() - .setName("hadoop") - .setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) - .setWarehouseLocation(warehouse.location) - .build(); + Properties props = new Properties(); + props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP); + props.setProperty("warehouse", warehouse.location); + + IcebergCatalogConfig catalog = IcebergCatalogConfig.builder().setProperties(props).build(); testPipeline .apply("Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1))) @@ -109,12 +109,11 @@ public void testDynamicDestinationsWithoutSpillover() throws Exception { Table table2 = warehouse.createTable(table2Id, TestFixtures.SCHEMA); Table table3 = warehouse.createTable(table3Id, TestFixtures.SCHEMA); - IcebergCatalogConfig catalog = - IcebergCatalogConfig.builder() - .setName("hadoop") - .setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) - .setWarehouseLocation(warehouse.location) - .build(); + Properties props = new Properties(); + props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP); + props.setProperty("warehouse", warehouse.location); + + IcebergCatalogConfig catalog = IcebergCatalogConfig.builder().setProperties(props).build(); DynamicDestinations dynamicDestinations = new DynamicDestinations() { @@ -199,12 +198,11 @@ public void testDynamicDestinationsWithSpillover() throws Exception { elementsPerTable.computeIfAbsent(tableId, ignored -> Lists.newArrayList()).add(element); } - IcebergCatalogConfig catalog = - IcebergCatalogConfig.builder() - .setName("hadoop") - .setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) - .setWarehouseLocation(warehouse.location) - .build(); + Properties props = new Properties(); + props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP); + props.setProperty("warehouse", warehouse.location); + + IcebergCatalogConfig catalog = IcebergCatalogConfig.builder().setProperties(props).build(); DynamicDestinations dynamicDestinations = new DynamicDestinations() { diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java index 46168a487dda..de4f347f3d7e 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java @@ -21,6 +21,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -52,16 +53,14 @@ public class IcebergReadSchemaTransformProviderTest { @Test public void testBuildTransformWithRow() { - Row catalogConfigRow = - Row.withSchema(IcebergSchemaTransformCatalogConfig.SCHEMA) - .withFieldValue("catalog_name", "test_name") - .withFieldValue("catalog_type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) - .withFieldValue("warehouse_location", "test_location") - .build(); + Map properties = new HashMap<>(); + properties.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP); + properties.put("warehouse", "test_location"); + Row transformConfigRow = Row.withSchema(new IcebergReadSchemaTransformProvider().configurationSchema()) .withFieldValue("table", 
"test_table_identifier") - .withFieldValue("catalog_config", catalogConfigRow) + .withFieldValue("catalog_properties", properties) .build(); new IcebergReadSchemaTransformProvider().from(transformConfigRow); @@ -97,17 +96,14 @@ public void testSimpleScan() throws Exception { .map(record -> SchemaAndRowConversions.recordToRow(schema, record)) .collect(Collectors.toList()); - IcebergSchemaTransformCatalogConfig catalogConfig = - IcebergSchemaTransformCatalogConfig.builder() - .setCatalogName("hadoop") - .setCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) - .setWarehouseLocation(warehouse.location) - .build(); + Map properties = new HashMap<>(); + properties.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP); + properties.put("warehouse", warehouse.location); IcebergReadSchemaTransformProvider.Config readConfig = IcebergReadSchemaTransformProvider.Config.builder() .setTable(identifier) - .setCatalogConfig(catalogConfig) + .setCatalogProperties(properties) .build(); PCollection output = @@ -157,11 +153,7 @@ public void testReadUsingManagedTransform() throws Exception { String yamlConfig = String.format( - "table: %s\n" - + "catalog_config: \n" - + " catalog_name: hadoop\n" - + " catalog_type: %s\n" - + " warehouse_location: %s", + "table: %s\n" + "catalog_properties: \n" + " type: %s\n" + " warehouse: %s", identifier, CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP, warehouse.location); Map configMap = new Yaml().load(yamlConfig); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslationTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslationTest.java index fb4c98cb0bdf..905d34fbbb5b 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslationTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslationTest.java @@ -25,7 +25,9 @@ import java.io.IOException; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.stream.Collectors; import org.apache.beam.model.pipeline.v1.ExternalTransforms.SchemaTransformPayload; @@ -42,6 +44,7 @@ import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.InvalidProtocolBufferException; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.catalog.TableIdentifier; import org.junit.ClassRule; @@ -63,18 +66,18 @@ public class IcebergSchemaTransformTranslationTest { static final IcebergReadSchemaTransformProvider READ_PROVIDER = new IcebergReadSchemaTransformProvider(); + private static final Map CATALOG_PROPERTIES = + ImmutableMap.builder() + .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) + .put("warehouse", "test_location") + .build(); + @Test public void testReCreateWriteTransformFromRow() { - Row catalogConfigRow = - Row.withSchema(IcebergSchemaTransformCatalogConfig.SCHEMA) - .withFieldValue("catalog_name", "test_name") - .withFieldValue("catalog_type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) - .withFieldValue("warehouse_location", "test_location") - .build(); Row transformConfigRow = Row.withSchema(WRITE_PROVIDER.configurationSchema()) .withFieldValue("table", "test_table_identifier") - .withFieldValue("catalog_config", catalogConfigRow) + 
.withFieldValue("catalog_properties", CATALOG_PROPERTIES) .build(); IcebergWriteSchemaTransform writeTransform = (IcebergWriteSchemaTransform) WRITE_PROVIDER.from(transformConfigRow); @@ -101,17 +104,10 @@ public void testWriteTransformProtoTranslation() Collections.singletonList(Row.withSchema(inputSchema).addValue("a").build()))) .setRowSchema(inputSchema); - Row catalogConfigRow = - Row.withSchema(IcebergSchemaTransformCatalogConfig.SCHEMA) - .withFieldValue("catalog_name", "test_catalog") - .withFieldValue("catalog_type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) - .withFieldValue("catalog_implementation", "test_implementation") - .withFieldValue("warehouse_location", warehouse.location) - .build(); Row transformConfigRow = Row.withSchema(WRITE_PROVIDER.configurationSchema()) .withFieldValue("table", "test_identifier") - .withFieldValue("catalog_config", catalogConfigRow) + .withFieldValue("catalog_properties", CATALOG_PROPERTIES) .build(); IcebergWriteSchemaTransform writeTransform = @@ -158,16 +154,10 @@ public void testWriteTransformProtoTranslation() @Test public void testReCreateReadTransformFromRow() { // setting a subset of fields here. - Row catalogConfigRow = - Row.withSchema(IcebergSchemaTransformCatalogConfig.SCHEMA) - .withFieldValue("catalog_name", "test_name") - .withFieldValue("catalog_type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) - .withFieldValue("warehouse_location", "test_location") - .build(); Row transformConfigRow = Row.withSchema(READ_PROVIDER.configurationSchema()) .withFieldValue("table", "test_table_identifier") - .withFieldValue("catalog_config", catalogConfigRow) + .withFieldValue("catalog_properties", CATALOG_PROPERTIES) .build(); IcebergReadSchemaTransform readTransform = @@ -188,19 +178,16 @@ public void testReadTransformProtoTranslation() throws InvalidProtocolBufferException, IOException { // First build a pipeline Pipeline p = Pipeline.create(); - Row catalogConfigRow = - Row.withSchema(IcebergSchemaTransformCatalogConfig.SCHEMA) - .withFieldValue("catalog_name", "test_catalog") - .withFieldValue("catalog_type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) - .withFieldValue("warehouse_location", warehouse.location) - .build(); String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16); warehouse.createTable(TableIdentifier.parse(identifier), TestFixtures.SCHEMA); + Map properties = new HashMap<>(CATALOG_PROPERTIES); + properties.put("warehouse", warehouse.location); + Row transformConfigRow = Row.withSchema(READ_PROVIDER.configurationSchema()) .withFieldValue("table", identifier) - .withFieldValue("catalog_config", catalogConfigRow) + .withFieldValue("catalog_properties", properties) .build(); IcebergReadSchemaTransform readTransform = diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java index 9ef3e9945ec9..0b4c71f80184 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java @@ -23,6 +23,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; @@ -61,16 +62,14 @@ public class IcebergWriteSchemaTransformProviderTest { @Test 
public void testBuildTransformWithRow() { - Row catalogConfigRow = - Row.withSchema(IcebergSchemaTransformCatalogConfig.SCHEMA) - .withFieldValue("catalog_name", "test_name") - .withFieldValue("catalog_type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) - .withFieldValue("warehouse_location", "test_location") - .build(); + Map properties = new HashMap<>(); + properties.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP); + properties.put("warehouse", "test_location"); + Row transformConfigRow = Row.withSchema(new IcebergWriteSchemaTransformProvider().configurationSchema()) .withFieldValue("table", "test_table_identifier") - .withFieldValue("catalog_config", catalogConfigRow) + .withFieldValue("catalog_properties", properties) .build(); new IcebergWriteSchemaTransformProvider().from(transformConfigRow); @@ -85,16 +84,11 @@ public void testSimpleAppend() { // Create a table and add records to it. Table table = warehouse.createTable(tableId, TestFixtures.SCHEMA); - Config config = - Config.builder() - .setTable(identifier) - .setCatalogConfig( - IcebergSchemaTransformCatalogConfig.builder() - .setCatalogName("hadoop") - .setCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) - .setWarehouseLocation(warehouse.location) - .build()) - .build(); + Map properties = new HashMap<>(); + properties.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP); + properties.put("warehouse", warehouse.location); + + Config config = Config.builder().setTable(identifier).setCatalogProperties(properties).build(); PCollectionRowTuple input = PCollectionRowTuple.of( @@ -126,11 +120,7 @@ public void testWriteUsingManagedTransform() { String yamlConfig = String.format( - "table: %s\n" - + "catalog_config: \n" - + " catalog_name: hadoop\n" - + " catalog_type: %s\n" - + " warehouse_location: %s", + "table: %s\n" + "catalog_properties: \n" + " type: %s\n" + " warehouse: %s", identifier, CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP, warehouse.location); Map configMap = new Yaml().load(yamlConfig); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java index c7d5353428c2..b4aae3668e1a 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java @@ -20,6 +20,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import java.util.List; +import java.util.Properties; import java.util.UUID; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.options.PipelineOptions; @@ -64,15 +65,14 @@ public void testUnstartedReaderReadsSamesItsSource() throws Exception { PipelineOptions options = PipelineOptionsFactory.create(); + Properties props = new Properties(); + props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP); + props.setProperty("warehouse", warehouse.location); + BoundedSource source = new ScanSource( IcebergScanConfig.builder() - .setCatalogConfig( - IcebergCatalogConfig.builder() - .setName("hadoop") - .setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) - .setWarehouseLocation(warehouse.location) - .build()) + .setCatalogConfig(IcebergCatalogConfig.builder().setProperties(props).build()) .setScanType(IcebergScanConfig.ScanType.TABLE) .setTableIdentifier(simpleTable.name().replace("hadoop.", "").split("\\.")) .setSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) @@ -103,15 +103,14 @@ public void 
testInitialSplitting() throws Exception { PipelineOptions options = PipelineOptionsFactory.create(); + Properties props = new Properties(); + props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP); + props.setProperty("warehouse", warehouse.location); + BoundedSource source = new ScanSource( IcebergScanConfig.builder() - .setCatalogConfig( - IcebergCatalogConfig.builder() - .setName("hadoop") - .setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) - .setWarehouseLocation(warehouse.location) - .build()) + .setCatalogConfig(IcebergCatalogConfig.builder().setProperties(props).build()) .setScanType(IcebergScanConfig.ScanType.TABLE) .setTableIdentifier(simpleTable.name().replace("hadoop.", "").split("\\.")) .setSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) @@ -146,15 +145,14 @@ public void testDoubleInitialSplitting() throws Exception { PipelineOptions options = PipelineOptionsFactory.create(); + Properties props = new Properties(); + props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP); + props.setProperty("warehouse", warehouse.location); + BoundedSource source = new ScanSource( IcebergScanConfig.builder() - .setCatalogConfig( - IcebergCatalogConfig.builder() - .setName("hadoop") - .setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) - .setWarehouseLocation(warehouse.location) - .build()) + .setCatalogConfig(IcebergCatalogConfig.builder().setProperties(props).build()) .setScanType(IcebergScanConfig.ScanType.TABLE) .setTableIdentifier(simpleTable.name().replace("hadoop.", "").split("\\.")) .setSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) From 922633a5b0f466a5d06312ff62c7a98ebd43c385 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Sat, 29 Jun 2024 20:04:12 -0400 Subject: [PATCH 2/8] add to CHANGES.md --- CHANGES.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index 0ee9f656a180..bb538d3d209e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -67,10 +67,12 @@ ## New Features / Improvements * Multiple RunInference instances can now share the same model instance by setting the model_identifier parameter (Python) ([#31665](https://github.com/apache/beam/issues/31665)). +* [IcebergIO] All specified catalog properties are passed through to the connector ([#31726](https://github.com/apache/beam/pull/31726)) ## Breaking Changes * X behavior was changed ([#X](https://github.com/apache/beam/issues/X)). 
+* [IcebergIO] IcebergCatalogConfig was changed to support specifying catalog properties in a key-value fashion ([#31726](https://github.com/apache/beam/pull/31726)) ## Deprecations From 018ae1e9b8bedb8f3131032763bd6ef0f67c804e Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Sat, 29 Jun 2024 20:05:37 -0400 Subject: [PATCH 3/8] trigger integration test --- .github/trigger_files/IO_Iceberg_Integration_Tests.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index a03c067d2c4e..1efc8e9e4405 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,3 +1,4 @@ { - "comment": "Modify this file in a trivial way to cause this test suite to run" + "comment": "Modify this file in a trivial way to cause this test suite to run", + "modification": 1 } From e2e0b2e88328c73a7fa9bbbbbd83452ed6f386c2 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 2 Jul 2024 16:10:24 -0400 Subject: [PATCH 4/8] remove custom configuration; pass catalog name --- .../sdk/io/iceberg/IcebergCatalogConfig.java | 18 ++++++------------ .../IcebergReadSchemaTransformProvider.java | 11 +++++++++-- .../IcebergWriteSchemaTransformProvider.java | 12 ++++++++++-- .../beam/sdk/io/iceberg/IcebergIOIT.java | 2 ++ .../beam/sdk/io/iceberg/IcebergIOReadTest.java | 2 +- .../sdk/io/iceberg/IcebergIOWriteTest.java | 9 ++++++--- ...IcebergReadSchemaTransformProviderTest.java | 8 +++++++- .../IcebergSchemaTransformTranslationTest.java | 4 ++++ ...cebergWriteSchemaTransformProviderTest.java | 14 ++++++++++++-- .../beam/sdk/io/iceberg/ScanSourceTest.java | 18 +++++++++++++++--- 10 files changed, 72 insertions(+), 26 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java index 1562bb7e9138..2956d75a266e 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java @@ -20,8 +20,6 @@ import com.google.auto.value.AutoValue; import java.io.Serializable; import java.util.Properties; -import javax.annotation.Nullable; -import org.apache.beam.sdk.util.ReleaseInfo; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.CatalogUtil; @@ -30,10 +28,10 @@ @AutoValue public abstract class IcebergCatalogConfig implements Serializable { @Pure - public abstract Properties getProperties(); + public abstract String getCatalogName(); @Pure - public abstract @Nullable Configuration getConfiguration(); + public abstract Properties getProperties(); @Pure public static Builder builder() { @@ -41,19 +39,15 @@ public static Builder builder() { } public org.apache.iceberg.catalog.Catalog catalog() { - Configuration conf = getConfiguration(); - String catalogName = "apache-beam-" + ReleaseInfo.getReleaseInfo().getVersion(); - if (conf == null) { - conf = new Configuration(); - } - return CatalogUtil.buildIcebergCatalog(catalogName, Maps.fromProperties(getProperties()), conf); + return CatalogUtil.buildIcebergCatalog( + getCatalogName(), Maps.fromProperties(getProperties()), new Configuration()); } @AutoValue.Builder public abstract static class Builder { -
public abstract Builder setProperties(Properties props); + public abstract Builder setCatalogName(String catalogName); - public abstract Builder setConfiguration(@Nullable Configuration conf); + public abstract Builder setProperties(Properties props); public abstract IcebergCatalogConfig build(); } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java index 7d0beda23439..ef535353efd0 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProvider.java @@ -72,12 +72,17 @@ public static Builder builder() { @SchemaFieldDescription("Identifier of the Iceberg table to read from.") public abstract String getTable(); + @SchemaFieldDescription("Name of the catalog containing the table.") + public abstract String getCatalogName(); + @SchemaFieldDescription("Configuration properties used to set up the Iceberg catalog.") public abstract Map getCatalogProperties(); @AutoValue.Builder public abstract static class Builder { - public abstract Builder setTable(String tables); + public abstract Builder setTable(String table); + + public abstract Builder setCatalogName(String catalogName); public abstract Builder setCatalogProperties(Map catalogProperties); @@ -112,7 +117,9 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { properties.putAll(configuration.getCatalogProperties()); IcebergCatalogConfig.Builder catalogBuilder = - IcebergCatalogConfig.builder().setProperties(properties); + IcebergCatalogConfig.builder() + .setCatalogName(configuration.getCatalogName()) + .setProperties(properties); PCollection output = input diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java index 3576d1630093..b3de7a88c541 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProvider.java @@ -93,12 +93,17 @@ public static Builder builder() { @SchemaFieldDescription("Identifier of the Iceberg table to write to.") public abstract String getTable(); + @SchemaFieldDescription("Name of the catalog containing the table.") + public abstract String getCatalogName(); + @SchemaFieldDescription("Configuration properties used to set up the Iceberg catalog.") public abstract Map getCatalogProperties(); @AutoValue.Builder public abstract static class Builder { - public abstract Builder setTable(String tables); + public abstract Builder setTable(String table); + + public abstract Builder setCatalogName(String catalogName); public abstract Builder setCatalogProperties(Map catalogProperties); @@ -135,7 +140,10 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { properties.putAll(configuration.getCatalogProperties()); IcebergCatalogConfig catalog = - IcebergCatalogConfig.builder().setProperties(properties).build(); + IcebergCatalogConfig.builder() + .setCatalogName(configuration.getCatalogName()) + .setProperties(properties) + .build(); // TODO: support dynamic destinations IcebergWriteResult result = diff --git
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java index e4146e1ac659..0420e2f57797 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java @@ -206,6 +206,7 @@ public void testRead() throws Exception { Map config = ImmutableMap.builder() .put("table", tableId.toString()) + .put("catalog_name", "test-name") .put( "catalog_properties", ImmutableMap.builder() @@ -245,6 +246,7 @@ public void testWrite() { Map config = ImmutableMap.builder() .put("table", tableId.toString()) + .put("catalog_name", "test-name") .put( "catalog_properties", ImmutableMap.builder() diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java index 0b4bd0a04461..d6db3f689117 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java @@ -99,7 +99,7 @@ public void testSimpleScan() throws Exception { props.setProperty("warehouse", warehouse.location); IcebergCatalogConfig catalogConfig = - IcebergCatalogConfig.builder().setProperties(props).build(); + IcebergCatalogConfig.builder().setCatalogName("name").setProperties(props).build(); PCollection output = testPipeline diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java index 45ee7f857d87..e0a584ec9da9 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java @@ -80,7 +80,8 @@ public void testSimpleAppend() throws Exception { props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP); props.setProperty("warehouse", warehouse.location); - IcebergCatalogConfig catalog = IcebergCatalogConfig.builder().setProperties(props).build(); + IcebergCatalogConfig catalog = + IcebergCatalogConfig.builder().setCatalogName("name").setProperties(props).build(); testPipeline .apply("Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1))) @@ -113,7 +114,8 @@ public void testDynamicDestinationsWithoutSpillover() throws Exception { props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP); props.setProperty("warehouse", warehouse.location); - IcebergCatalogConfig catalog = IcebergCatalogConfig.builder().setProperties(props).build(); + IcebergCatalogConfig catalog = + IcebergCatalogConfig.builder().setCatalogName("name").setProperties(props).build(); DynamicDestinations dynamicDestinations = new DynamicDestinations() { @@ -202,7 +204,8 @@ public void testDynamicDestinationsWithSpillover() throws Exception { props.setProperty("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP); props.setProperty("warehouse", warehouse.location); - IcebergCatalogConfig catalog = IcebergCatalogConfig.builder().setProperties(props).build(); + IcebergCatalogConfig catalog = + IcebergCatalogConfig.builder().setCatalogName("name").setProperties(props).build(); DynamicDestinations dynamicDestinations = new DynamicDestinations() { diff --git 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java index de4f347f3d7e..bc15021fa2b0 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java @@ -60,6 +60,7 @@ public void testBuildTransformWithRow() { Row transformConfigRow = Row.withSchema(new IcebergReadSchemaTransformProvider().configurationSchema()) .withFieldValue("table", "test_table_identifier") + .withFieldValue("catalog_name", "test-name") .withFieldValue("catalog_properties", properties) .build(); @@ -103,6 +104,7 @@ public void testSimpleScan() throws Exception { IcebergReadSchemaTransformProvider.Config readConfig = IcebergReadSchemaTransformProvider.Config.builder() .setTable(identifier) + .setCatalogName("name") .setCatalogProperties(properties) .build(); @@ -153,7 +155,11 @@ public void testReadUsingManagedTransform() throws Exception { String yamlConfig = String.format( - "table: %s\n" + "catalog_properties: \n" + " type: %s\n" + " warehouse: %s", + "table: %s\n" + + "catalog_name: test-name\n" + + "catalog_properties: \n" + + " type: %s\n" + + " warehouse: %s", identifier, CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP, warehouse.location); Map configMap = new Yaml().load(yamlConfig); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslationTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslationTest.java index 905d34fbbb5b..7863f7812a13 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslationTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergSchemaTransformTranslationTest.java @@ -77,6 +77,7 @@ public void testReCreateWriteTransformFromRow() { Row transformConfigRow = Row.withSchema(WRITE_PROVIDER.configurationSchema()) .withFieldValue("table", "test_table_identifier") + .withFieldValue("catalog_name", "test-name") .withFieldValue("catalog_properties", CATALOG_PROPERTIES) .build(); IcebergWriteSchemaTransform writeTransform = @@ -107,6 +108,7 @@ public void testWriteTransformProtoTranslation() Row transformConfigRow = Row.withSchema(WRITE_PROVIDER.configurationSchema()) .withFieldValue("table", "test_identifier") + .withFieldValue("catalog_name", "test-name") .withFieldValue("catalog_properties", CATALOG_PROPERTIES) .build(); @@ -157,6 +159,7 @@ public void testReCreateReadTransformFromRow() { Row transformConfigRow = Row.withSchema(READ_PROVIDER.configurationSchema()) .withFieldValue("table", "test_table_identifier") + .withFieldValue("catalog_name", "test-name") .withFieldValue("catalog_properties", CATALOG_PROPERTIES) .build(); @@ -187,6 +190,7 @@ public void testReadTransformProtoTranslation() Row transformConfigRow = Row.withSchema(READ_PROVIDER.configurationSchema()) .withFieldValue("table", identifier) + .withFieldValue("catalog_name", "test-name") .withFieldValue("catalog_properties", properties) .build(); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java index 0b4c71f80184..75884f4bcf70 
100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java @@ -69,6 +69,7 @@ public void testBuildTransformWithRow() { Row transformConfigRow = Row.withSchema(new IcebergWriteSchemaTransformProvider().configurationSchema()) .withFieldValue("table", "test_table_identifier") + .withFieldValue("catalog_name", "test-name") .withFieldValue("catalog_properties", properties) .build(); @@ -88,7 +89,12 @@ public void testSimpleAppend() { properties.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP); properties.put("warehouse", warehouse.location); - Config config = Config.builder().setTable(identifier).setCatalogProperties(properties).build(); + Config config = + Config.builder() + .setTable(identifier) + .setCatalogName("name") + .setCatalogProperties(properties) + .build(); PCollectionRowTuple input = PCollectionRowTuple.of( @@ -120,7 +126,11 @@ public void testWriteUsingManagedTransform() { String yamlConfig = String.format( - "table: %s\n" + "catalog_properties: \n" + " type: %s\n" + " warehouse: %s", + "table: %s\n" + + "catalog_name: test-name\n" + + "catalog_properties: \n" + + " type: %s\n" + + " warehouse: %s", identifier, CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP, warehouse.location); Map configMap = new Yaml().load(yamlConfig); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java index b4aae3668e1a..143687e3c999 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java @@ -72,7 +72,11 @@ public void testUnstartedReaderReadsSamesItsSource() throws Exception { BoundedSource source = new ScanSource( IcebergScanConfig.builder() - .setCatalogConfig(IcebergCatalogConfig.builder().setProperties(props).build()) + .setCatalogConfig( + IcebergCatalogConfig.builder() + .setCatalogName("name") + .setProperties(props) + .build()) .setScanType(IcebergScanConfig.ScanType.TABLE) .setTableIdentifier(simpleTable.name().replace("hadoop.", "").split("\\.")) .setSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) @@ -110,7 +114,11 @@ public void testInitialSplitting() throws Exception { BoundedSource source = new ScanSource( IcebergScanConfig.builder() - .setCatalogConfig(IcebergCatalogConfig.builder().setProperties(props).build()) + .setCatalogConfig( + IcebergCatalogConfig.builder() + .setCatalogName("name") + .setProperties(props) + .build()) .setScanType(IcebergScanConfig.ScanType.TABLE) .setTableIdentifier(simpleTable.name().replace("hadoop.", "").split("\\.")) .setSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) @@ -152,7 +160,11 @@ public void testDoubleInitialSplitting() throws Exception { BoundedSource source = new ScanSource( IcebergScanConfig.builder() - .setCatalogConfig(IcebergCatalogConfig.builder().setProperties(props).build()) + .setCatalogConfig( + IcebergCatalogConfig.builder() + .setCatalogName("name") + .setProperties(props) + .build()) .setScanType(IcebergScanConfig.ScanType.TABLE) .setTableIdentifier(simpleTable.name().replace("hadoop.", "").split("\\.")) .setSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) From 6963f4e5d1868b66daa0298f29b545d9434de41c Mon Sep 
From 6963f4e5d1868b66daa0298f29b545d9434de41c Mon Sep 17 00:00:00 2001
From: Ahmed Abualsaud
Date: Wed, 3 Jul 2024 02:11:29 -0400
Subject: [PATCH 5/8] shade dependencies

---
 sdks/java/core/build.gradle       |  2 +-
 sdks/java/io/iceberg/build.gradle | 10 ++++++----
 2 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle
index e150c22de62d..1251bef335a3 100644
--- a/sdks/java/core/build.gradle
+++ b/sdks/java/core/build.gradle
@@ -98,7 +98,7 @@ dependencies {
   permitUnusedDeclared enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
   provided library.java.json_org
   implementation library.java.everit_json_schema
-  implementation library.java.snake_yaml
+  shadow library.java.snake_yaml
   shadowTest library.java.everit_json_schema
   provided library.java.junit
   testImplementation "com.github.stefanbirkner:system-rules:1.19.0"
diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle
index 7965cde86e7d..81b58e264365 100644
--- a/sdks/java/io/iceberg/build.gradle
+++ b/sdks/java/io/iceberg/build.gradle
@@ -23,6 +23,7 @@ import java.util.stream.Collectors
 plugins { id 'org.apache.beam.module' }
 applyJavaNature(
   automaticModuleName: 'org.apache.beam.sdk.io.iceberg',
+  shadowClosure: {}
 )
 
 description = "Apache Beam :: SDKs :: Java :: IO :: Iceberg"
@@ -54,11 +55,12 @@ dependencies {
   implementation "org.apache.iceberg:iceberg-parquet:$iceberg_version"
   implementation "org.apache.iceberg:iceberg-orc:$iceberg_version"
   implementation library.java.hadoop_common
+  // Hadoop GCS filesystem dependencies
+  runtimeOnly library.java.hadoop_client
+  runtimeOnly library.java.bigdataoss_gcsio
+  runtimeOnly library.java.bigdataoss_gcs_connector
+  runtimeOnly library.java.bigdataoss_util_hadoop
-  testImplementation library.java.hadoop_client
-  testImplementation library.java.bigdataoss_gcsio
-  testImplementation library.java.bigdataoss_gcs_connector
-  testImplementation library.java.bigdataoss_util_hadoop
   testImplementation "org.apache.iceberg:iceberg-gcp:$iceberg_version"
   testImplementation "org.apache.iceberg:iceberg-data:$iceberg_version"
   testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
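The snake_yaml change above matters to this module because the transform configuration is a YAML document; patch 4's testWriteUsingManagedTransform parses one with SnakeYAML. A self-contained sketch of that config shape follows; the table and warehouse values are placeholders, and "hadoop" is the string behind CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP.

    import java.util.Map;
    import org.yaml.snakeyaml.Yaml;

    class ManagedConfigSketch {
      public static void main(String[] args) {
        // Shape taken from testWriteUsingManagedTransform in the previous
        // patch; values here are placeholders.
        String yamlConfig =
            "table: db.test_table\n"
                + "catalog_name: test-name\n"
                + "catalog_properties:\n"
                + "  type: hadoop\n"
                + "  warehouse: /tmp/warehouse\n";
        Map<String, Object> configMap = new Yaml().load(yamlConfig);
        System.out.println(configMap); // nested map mirrors the YAML nesting
      }
    }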
From e9fe5cd93b5a0435f8416f580bd67e148236cb5d Mon Sep 17 00:00:00 2001
From: Ahmed Abualsaud
Date: Wed, 3 Jul 2024 13:10:42 -0400
Subject: [PATCH 6/8] dont validate shadow jar

---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy | 11 +++++++++++
 sdks/java/io/iceberg/build.gradle                  |  3 ++-
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index af3568a359ef..91841ced4d09 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -621,15 +621,18 @@ class BeamModulePlugin implements Plugin<Project> {
     def jaxb_api_version = "2.3.3"
     def jsr305_version = "3.0.2"
     def everit_json_version = "1.14.2"
+    def iceberg_version = "1.4.2"
     def kafka_version = "2.4.1"
     def log4j2_version = "2.20.0"
     def nemo_version = "0.1"
     // [bomupgrader] determined by: io.grpc:grpc-netty, consistent with: google_cloud_platform_libraries_bom
     def netty_version = "4.1.100.Final"
+    def parquet_version = "1.12.0"
     def postgres_version = "42.2.16"
     def powermock_version = "2.0.9"
     // [bomupgrader] determined by: com.google.protobuf:protobuf-java, consistent with: google_cloud_platform_libraries_bom
     def protobuf_version = "3.25.3"
+    def orc_version = "1.9.2"
     def qpid_jms_client_version = "0.61.0"
     def quickcheck_version = "1.0"
     def sbe_tool_version = "1.25.1"
@@ -804,6 +807,12 @@ class BeamModulePlugin implements Plugin<Project> {
         hamcrest                                : "org.hamcrest:hamcrest:$hamcrest_version",
         http_client                             : "org.apache.httpcomponents:httpclient:$httpclient_version",
         http_core                               : "org.apache.httpcomponents:httpcore:$httpcore_version",
+        iceberg_core                            : "org.apache.iceberg:iceberg-core:$iceberg_version",
+        iceberg_api                             : "org.apache.iceberg:iceberg-api:$iceberg_version",
+        iceberg_data                            : "org.apache.iceberg:iceberg-data:$iceberg_version",
+        iceberg_gcp                             : "org.apache.iceberg:iceberg-gcp:$iceberg_version",
+        iceberg_parquet                         : "org.apache.iceberg:iceberg-parquet:$iceberg_version",
+        iceberg_orc                             : "org.apache.iceberg:iceberg-orc:$iceberg_version",
         influxdb_library                        : "org.influxdb:influxdb-java:$influxdb_version",
         jackson_annotations                     : "com.fasterxml.jackson.core:jackson-annotations:$jackson_version",
         jackson_jaxb_annotations                : "com.fasterxml.jackson.module:jackson-module-jaxb-annotations:$jackson_version",
@@ -848,6 +857,8 @@ class BeamModulePlugin implements Plugin<Project> {
         netty_tcnative_boringssl_static         : "io.netty:netty-tcnative-boringssl-static:2.0.52.Final",
         netty_transport                         : "io.netty:netty-transport:$netty_version",
         netty_transport_native_epoll            : "io.netty:netty-transport-native-epoll:$netty_version",
+        orc_core                                : "org.apache.orc:orc-core:$orc_version",
+        parquet_column                          : "org.apache.parquet:parquet-column:$parquet_version",
         postgres                                : "org.postgresql:postgresql:$postgres_version",
         powermock                               : "org.powermock:powermock-module-junit4:$powermock_version",
         powermock_mockito                       : "org.powermock:powermock-api-mockito2:$powermock_version",
diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle
index 81b58e264365..657277513f19 100644
--- a/sdks/java/io/iceberg/build.gradle
+++ b/sdks/java/io/iceberg/build.gradle
@@ -23,7 +23,8 @@ import java.util.stream.Collectors
 plugins { id 'org.apache.beam.module' }
 applyJavaNature(
   automaticModuleName: 'org.apache.beam.sdk.io.iceberg',
-  shadowClosure: {}
+  shadowClosure: {},
+  validateShadowJar: false,
 )
 
 description = "Apache Beam :: SDKs :: Java :: IO :: Iceberg"
"1.0" def sbe_tool_version = "1.25.1" @@ -807,12 +804,6 @@ class BeamModulePlugin implements Plugin { hamcrest : "org.hamcrest:hamcrest:$hamcrest_version", http_client : "org.apache.httpcomponents:httpclient:$httpclient_version", http_core : "org.apache.httpcomponents:httpcore:$httpcore_version", - iceberg_core : "org.apache.iceberg:iceberg-core:$iceberg_version", - iceberg_api : "org.apache.iceberg:iceberg-api:$iceberg_version", - iceberg_data : "org.apache.iceberg:iceberg-data:$iceberg_version", - iceberg_gcp : "org.apache.iceberg:iceberg-gcp:$iceberg_version", - iceberg_parquet : "org.apache.iceberg:iceberg-parquet:$iceberg_version", - iceberg_orc : "org.apache.iceberg:iceberg-orc:$iceberg_version", influxdb_library : "org.influxdb:influxdb-java:$influxdb_version", jackson_annotations : "com.fasterxml.jackson.core:jackson-annotations:$jackson_version", jackson_jaxb_annotations : "com.fasterxml.jackson.module:jackson-module-jaxb-annotations:$jackson_version", @@ -857,8 +848,6 @@ class BeamModulePlugin implements Plugin { netty_tcnative_boringssl_static : "io.netty:netty-tcnative-boringssl-static:2.0.52.Final", netty_transport : "io.netty:netty-transport:$netty_version", netty_transport_native_epoll : "io.netty:netty-transport-native-epoll:$netty_version", - orc_core : "org.apache.orc:orc-core:$orc_version", - parquet_column : "org.apache.parquet:parquet-column:$parquet_version", postgres : "org.postgresql:postgresql:$postgres_version", powermock : "org.powermock:powermock-module-junit4:$powermock_version", powermock_mockito : "org.powermock:powermock-api-mockito2:$powermock_version", From c42ac295334e79765b3aee7e782f73455bc962b5 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Sat, 6 Jul 2024 15:47:13 -0400 Subject: [PATCH 8/8] revert dependency changes --- sdks/java/core/build.gradle | 2 +- sdks/java/io/iceberg/build.gradle | 11 ++++------- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle index 1251bef335a3..e150c22de62d 100644 --- a/sdks/java/core/build.gradle +++ b/sdks/java/core/build.gradle @@ -98,7 +98,7 @@ dependencies { permitUnusedDeclared enforcedPlatform(library.java.google_cloud_platform_libraries_bom) provided library.java.json_org implementation library.java.everit_json_schema - shadow library.java.snake_yaml + implementation library.java.snake_yaml shadowTest library.java.everit_json_schema provided library.java.junit testImplementation "com.github.stefanbirkner:system-rules:1.19.0" diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle index 657277513f19..7965cde86e7d 100644 --- a/sdks/java/io/iceberg/build.gradle +++ b/sdks/java/io/iceberg/build.gradle @@ -23,8 +23,6 @@ import java.util.stream.Collectors plugins { id 'org.apache.beam.module' } applyJavaNature( automaticModuleName: 'org.apache.beam.sdk.io.iceberg', - shadowClosure: {}, - validateShadowJar: false, ) description = "Apache Beam :: SDKs :: Java :: IO :: Iceberg" @@ -56,12 +54,11 @@ dependencies { implementation "org.apache.iceberg:iceberg-parquet:$iceberg_version" implementation "org.apache.iceberg:iceberg-orc:$iceberg_version" implementation library.java.hadoop_common - // Hadoop GCS filesystem dependencies - runtimeOnly library.java.hadoop_client - runtimeOnly library.java.bigdataoss_gcsio - runtimeOnly library.java.bigdataoss_gcs_connector - runtimeOnly library.java.bigdataoss_util_hadoop + testImplementation library.java.hadoop_client + testImplementation library.java.bigdataoss_gcsio + 
+  testImplementation library.java.bigdataoss_gcs_connector
+  testImplementation library.java.bigdataoss_util_hadoop
   testImplementation "org.apache.iceberg:iceberg-gcp:$iceberg_version"
   testImplementation "org.apache.iceberg:iceberg-data:$iceberg_version"
   testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
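With the build experiments of patches 5 through 7 reverted, the durable change in this series is the configuration surface itself: a catalog name plus a pass-through properties map. As a closing sketch, here is the write-side Row config roughly as testBuildTransformWithRow constructs it; the map type and property values are assumptions, and the sketch sits in the tests' package.

    package org.apache.beam.sdk.io.iceberg; // same package as the tests above

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.beam.sdk.values.Row;

    class WriteConfigSketch {
      // Mirrors testBuildTransformWithRow: the new catalog_name field rides
      // alongside the free-form catalog_properties map.
      static Row configRow() {
        Map<String, String> properties = new HashMap<>(); // map type is an assumption
        properties.put("type", "hadoop");
        properties.put("warehouse", "/tmp/warehouse"); // illustrative path
        return Row.withSchema(new IcebergWriteSchemaTransformProvider().configurationSchema())
            .withFieldValue("table", "test_table_identifier")
            .withFieldValue("catalog_name", "test-name")
            .withFieldValue("catalog_properties", properties)
            .build();
      }
    }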