From e50a136219e3b5f45467d173ff29e4a22ec499de Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com> Date: Tue, 17 Dec 2024 13:10:01 -0500 Subject: [PATCH] Support creating BigLake managed tables (#33125) * create managed biglake tables * add to translation * add to changes.md * adjust changes description --- CHANGES.md | 1 + .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 50 +++++++++++- .../gcp/bigquery/BigQueryIOTranslation.java | 8 ++ .../io/gcp/bigquery/CreateTableHelpers.java | 32 +++++++- .../sdk/io/gcp/bigquery/CreateTables.java | 3 +- .../sdk/io/gcp/bigquery/StorageApiLoads.java | 13 ++- .../StorageApiWriteRecordsInconsistent.java | 9 ++- .../StorageApiWriteUnshardedRecords.java | 16 +++- .../StorageApiWritesShardedRecords.java | 8 +- .../bigquery/BigQueryIOTranslationTest.java | 1 + .../io/gcp/bigquery/BigQueryIOWriteTest.java | 34 ++++++++ .../StorageApiSinkCreateIfNeededIT.java | 80 ++++++++++++++++--- 12 files changed, 226 insertions(+), 29 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 0cfac6c73c1e..7a8ed493c216 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -66,6 +66,7 @@ * gcs-connector config options can be set via GcsOptions (Java) ([#32769](https://github.com/apache/beam/pull/32769)). * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). * Upgraded the default version of Hadoop dependencies to 3.4.1. Hadoop 2.10.2 is still supported (Java) ([#33011](https://github.com/apache/beam/issues/33011)). +* [BigQueryIO] Create managed BigLake tables dynamically ([#33125](https://github.com/apache/beam/pull/33125)) ## New Features / Improvements diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 9a7f3a05556c..30626da31c7c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -54,6 +54,7 @@ import java.io.IOException; import java.io.Serializable; import java.lang.reflect.InvocationTargetException; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -629,6 +630,9 @@ public class BigQueryIO { private static final SerializableFunction DEFAULT_AVRO_SCHEMA_FACTORY = BigQueryAvroUtils::toGenericAvroSchema; + static final String CONNECTION_ID = "connectionId"; + static final String STORAGE_URI = "storageUri"; + /** * @deprecated Use {@link #read(SerializableFunction)} or {@link #readTableRows} instead. {@link * #readTableRows()} does exactly the same as {@link #read}, however {@link @@ -2372,6 +2376,8 @@ public enum Method { /** Table description. Default is empty. */ abstract @Nullable String getTableDescription(); + abstract @Nullable Map getBigLakeConfiguration(); + /** An option to indicate if table validation is desired. Default is true. 
*/ abstract boolean getValidate(); @@ -2484,6 +2490,8 @@ abstract Builder setAvroSchemaFactory( abstract Builder setTableDescription(String tableDescription); + abstract Builder setBigLakeConfiguration(Map bigLakeConfiguration); + abstract Builder setValidate(boolean validate); abstract Builder setBigQueryServices(BigQueryServices bigQueryServices); @@ -2909,6 +2917,30 @@ public Write withTableDescription(String tableDescription) { return toBuilder().setTableDescription(tableDescription).build(); } + /** + * Specifies a configuration to create BigLake tables. The following options are available: + * + *
<ul>
+     *   <li>connectionId (REQUIRED): the name of your cloud resource connection.
+     *   <li>storageUri (REQUIRED): the path to the GCS folder where data will be written. This
+     *       sink will create sub-folders for each project, dataset, and table destination.
+     *       Example: if you specify a storageUri of {@code "gs://foo/bar"} and write to table
+     *       {@code "my_project.my_dataset.my_table"}, your data will be written under {@code
+     *       "gs://foo/bar/my_project/my_dataset/my_table/"}
+     *   <li>fileFormat (OPTIONAL): defaults to {@code "parquet"}
+     *   <li>tableFormat (OPTIONAL): defaults to {@code "iceberg"}
+     * </ul>
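+     *
+     * <p>For example (a minimal sketch; the connection name, bucket, and destination table are
+     * placeholders):
+     *
+     * <pre>{@code
+     * PCollection<TableRow> rows = ...;
+     * rows.apply(
+     *     BigQueryIO.writeTableRows()
+     *         .to("my_project:my_dataset.my_table")
+     *         .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
+     *         .withBigLakeConfiguration(
+     *             ImmutableMap.of(
+     *                 "connectionId", "projects/my_project/locations/us/connections/my_connection",
+     *                 "storageUri", "gs://my_bucket/iceberg")));
+     * }</pre>
+     *
+     * <p>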

NOTE: This is only supported with the Storage Write API methods. + * + * @see BigQuery Tables for + * Apache Iceberg documentation + */ + public Write withBigLakeConfiguration(Map bigLakeConfiguration) { + checkArgument(bigLakeConfiguration != null, "bigLakeConfiguration can not be null"); + return toBuilder().setBigLakeConfiguration(bigLakeConfiguration).build(); + } + /** * Specifies a policy for handling failed inserts. * @@ -3454,8 +3486,21 @@ && getStorageApiTriggeringFrequency(bqOptions) != null) { checkArgument( !getAutoSchemaUpdate(), "withAutoSchemaUpdate only supported when using STORAGE_WRITE_API or STORAGE_API_AT_LEAST_ONCE."); - } else if (getWriteDisposition() == WriteDisposition.WRITE_TRUNCATE) { - LOG.error("The Storage API sink does not support the WRITE_TRUNCATE write disposition."); + checkArgument( + getBigLakeConfiguration() == null, + "bigLakeConfiguration is only supported when using STORAGE_WRITE_API or STORAGE_API_AT_LEAST_ONCE."); + } else { + if (getWriteDisposition() == WriteDisposition.WRITE_TRUNCATE) { + LOG.error("The Storage API sink does not support the WRITE_TRUNCATE write disposition."); + } + if (getBigLakeConfiguration() != null) { + checkArgument( + Arrays.stream(new String[] {CONNECTION_ID, STORAGE_URI}) + .allMatch(getBigLakeConfiguration()::containsKey), + String.format( + "bigLakeConfiguration must contain keys '%s' and '%s'", + CONNECTION_ID, STORAGE_URI)); + } } if (getRowMutationInformationFn() != null) { checkArgument( @@ -3905,6 +3950,7 @@ private WriteResult continueExpandTyped( getPropagateSuccessfulStorageApiWritesPredicate(), getRowMutationInformationFn() != null, getDefaultMissingValueInterpretation(), + getBigLakeConfiguration(), getBadRecordRouter(), getBadRecordErrorHandler()); return input.apply("StorageApiLoads", storageApiLoads); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java index 561f5ccfc457..1da47156dda7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java @@ -393,6 +393,7 @@ static class BigQueryIOWriteTranslator implements TransformPayloadTranslator transform) { if (transform.getTableDescription() != null) { fieldValues.put("table_description", transform.getTableDescription()); } + if (transform.getBigLakeConfiguration() != null) { + fieldValues.put("biglake_configuration", transform.getBigLakeConfiguration()); + } fieldValues.put("validate", transform.getValidate()); if (transform.getBigQueryServices() != null) { fieldValues.put("bigquery_services", toByteArray(transform.getBigQueryServices())); @@ -719,6 +723,10 @@ public Write fromConfigRow(Row configRow, PipelineOptions options) { if (tableDescription != null) { builder = builder.setTableDescription(tableDescription); } + Map biglakeConfiguration = configRow.getMap("biglake_configuration"); + if (biglakeConfiguration != null) { + builder = builder.setBigLakeConfiguration(biglakeConfiguration); + } Boolean validate = configRow.getBoolean("validate"); if (validate != null) { builder = builder.setValidate(validate); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java index 7a94657107ec..7c428917503f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java @@ -17,11 +17,14 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; +import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.CONNECTION_ID; +import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.STORAGE_URI; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import com.google.api.client.util.BackOff; import com.google.api.client.util.BackOffUtils; import com.google.api.gax.rpc.ApiException; +import com.google.api.services.bigquery.model.BigLakeConfiguration; import com.google.api.services.bigquery.model.Clustering; import com.google.api.services.bigquery.model.EncryptionConfiguration; import com.google.api.services.bigquery.model.Table; @@ -31,6 +34,7 @@ import com.google.api.services.bigquery.model.TimePartitioning; import io.grpc.StatusRuntimeException; import java.util.Collections; +import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; @@ -41,6 +45,7 @@ import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier; import org.checkerframework.checker.nullness.qual.Nullable; @@ -91,7 +96,8 @@ static TableDestination possiblyCreateTable( CreateDisposition createDisposition, @Nullable Coder tableDestinationCoder, @Nullable String kmsKey, - BigQueryServices bqServices) { + BigQueryServices bqServices, + @Nullable Map bigLakeConfiguration) { checkArgument( tableDestination.getTableSpec() != null, "DynamicDestinations.getTable() must return a TableDestination " @@ -132,7 +138,8 @@ static TableDestination possiblyCreateTable( createDisposition, tableSpec, kmsKey, - bqServices); + bqServices, + bigLakeConfiguration); } } } @@ -147,7 +154,8 @@ private static void tryCreateTable( CreateDisposition createDisposition, String tableSpec, @Nullable String kmsKey, - BigQueryServices bqServices) { + BigQueryServices bqServices, + @Nullable Map bigLakeConfiguration) { TableReference tableReference = tableDestination.getTableReference().clone(); tableReference.setTableId(BigQueryHelpers.stripPartitionDecorator(tableReference.getTableId())); try (DatasetService datasetService = bqServices.getDatasetService(options)) { @@ -189,6 +197,24 @@ private static void tryCreateTable( if (kmsKey != null) { table.setEncryptionConfiguration(new EncryptionConfiguration().setKmsKeyName(kmsKey)); } + if (bigLakeConfiguration != null) { + TableReference ref = table.getTableReference(); + table.setBiglakeConfiguration( + new BigLakeConfiguration() + .setTableFormat( + MoreObjects.firstNonNull(bigLakeConfiguration.get("tableFormat"), "iceberg")) + .setFileFormat( + MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet")) + .setConnectionId( + Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID))) + 
.setStorageUri( + String.format( + "%s/%s/%s/%s", + Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(STORAGE_URI)), + ref.getProjectId(), + ref.getDatasetId(), + ref.getTableId()))); + } datasetService.createTable(table); } } catch (Exception e) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java index 1bbd4e756084..7008c049a4a5 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java @@ -132,7 +132,8 @@ public void processElement(ProcessContext context) { createDisposition, dynamicDestinations.getDestinationCoder(), kmsKey, - bqServices); + bqServices, + null); }); context.output(KV.of(tableDestination, context.element().getValue())); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java index 4ca9d5035c81..0bc60e98b253 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java @@ -23,6 +23,7 @@ import com.google.cloud.bigquery.storage.v1.AppendRowsRequest; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Map; import java.util.concurrent.ThreadLocalRandom; import java.util.function.Predicate; import javax.annotation.Nullable; @@ -76,6 +77,7 @@ public class StorageApiLoads private final boolean usesCdc; private final AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation; + private final Map bigLakeConfiguration; private final BadRecordRouter badRecordRouter; @@ -98,6 +100,7 @@ public StorageApiLoads( Predicate propagateSuccessfulStorageApiWritesPredicate, boolean usesCdc, AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation, + Map bigLakeConfiguration, BadRecordRouter badRecordRouter, ErrorHandler badRecordErrorHandler) { this.destinationCoder = destinationCoder; @@ -118,6 +121,7 @@ public StorageApiLoads( this.successfulRowsPredicate = propagateSuccessfulStorageApiWritesPredicate; this.usesCdc = usesCdc; this.defaultMissingValueInterpretation = defaultMissingValueInterpretation; + this.bigLakeConfiguration = bigLakeConfiguration; this.badRecordRouter = badRecordRouter; this.badRecordErrorHandler = badRecordErrorHandler; } @@ -186,7 +190,8 @@ public WriteResult expandInconsistent( createDisposition, kmsKey, usesCdc, - defaultMissingValueInterpretation)); + defaultMissingValueInterpretation, + bigLakeConfiguration)); PCollection insertErrors = PCollectionList.of(convertMessagesResult.get(failedRowsTag)) @@ -279,7 +284,8 @@ public WriteResult expandTriggered( successfulRowsPredicate, autoUpdateSchema, ignoreUnknownValues, - defaultMissingValueInterpretation)); + defaultMissingValueInterpretation, + bigLakeConfiguration)); PCollection insertErrors = PCollectionList.of(convertMessagesResult.get(failedRowsTag)) @@ -372,7 +378,8 @@ public WriteResult expandUntriggered( createDisposition, kmsKey, usesCdc, - defaultMissingValueInterpretation)); + defaultMissingValueInterpretation, + bigLakeConfiguration)); PCollection insertErrors = 
PCollectionList.of(convertMessagesResult.get(failedRowsTag)) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java index 0860b4eda8a2..58bbed8ba5a9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteRecordsInconsistent.java @@ -19,6 +19,7 @@ import com.google.api.services.bigquery.model.TableRow; import com.google.cloud.bigquery.storage.v1.AppendRowsRequest; +import java.util.Map; import java.util.function.Predicate; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; @@ -55,6 +56,7 @@ public class StorageApiWriteRecordsInconsistent private final @Nullable String kmsKey; private final boolean usesCdc; private final AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation; + private final @Nullable Map bigLakeConfiguration; public StorageApiWriteRecordsInconsistent( StorageApiDynamicDestinations dynamicDestinations, @@ -69,7 +71,8 @@ public StorageApiWriteRecordsInconsistent( BigQueryIO.Write.CreateDisposition createDisposition, @Nullable String kmsKey, boolean usesCdc, - AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation) { + AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation, + @Nullable Map bigLakeConfiguration) { this.dynamicDestinations = dynamicDestinations; this.bqServices = bqServices; this.failedRowsTag = failedRowsTag; @@ -83,6 +86,7 @@ public StorageApiWriteRecordsInconsistent( this.kmsKey = kmsKey; this.usesCdc = usesCdc; this.defaultMissingValueInterpretation = defaultMissingValueInterpretation; + this.bigLakeConfiguration = bigLakeConfiguration; } @Override @@ -116,7 +120,8 @@ public PCollectionTuple expand(PCollection private final @Nullable String kmsKey; private final boolean usesCdc; private final AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation; + private final @Nullable Map bigLakeConfiguration; /** * The Guava cache object is thread-safe. 
However our protocol requires that client pin the @@ -179,7 +180,8 @@ public StorageApiWriteUnshardedRecords( BigQueryIO.Write.CreateDisposition createDisposition, @Nullable String kmsKey, boolean usesCdc, - AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation) { + AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation, + @Nullable Map bigLakeConfiguration) { this.dynamicDestinations = dynamicDestinations; this.bqServices = bqServices; this.failedRowsTag = failedRowsTag; @@ -193,6 +195,7 @@ public StorageApiWriteUnshardedRecords( this.kmsKey = kmsKey; this.usesCdc = usesCdc; this.defaultMissingValueInterpretation = defaultMissingValueInterpretation; + this.bigLakeConfiguration = bigLakeConfiguration; } @Override @@ -228,7 +231,8 @@ public PCollectionTuple expand(PCollection bigLakeConfiguration; WriteRecordsDoFn( String operationName, @@ -973,7 +978,8 @@ void postFlush() { @Nullable String kmsKey, boolean usesCdc, AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation, - int maxRetries) { + int maxRetries, + @Nullable Map bigLakeConfiguration) { this.messageConverters = new TwoLevelMessageConverterCache<>(operationName); this.dynamicDestinations = dynamicDestinations; this.bqServices = bqServices; @@ -992,6 +998,7 @@ void postFlush() { this.usesCdc = usesCdc; this.defaultMissingValueInterpretation = defaultMissingValueInterpretation; this.maxRetries = maxRetries; + this.bigLakeConfiguration = bigLakeConfiguration; } boolean shouldFlush() { @@ -1098,7 +1105,8 @@ DestinationState createDestinationState( createDisposition, destinationCoder, kmsKey, - bqServices); + bqServices, + bigLakeConfiguration); return true; }; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java index e2674fe34f2e..738a52b69cb7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java @@ -131,6 +131,7 @@ public class StorageApiWritesShardedRecords bigLakeConfiguration; private final Duration streamIdleTime = DEFAULT_STREAM_IDLE_TIME; private final TupleTag failedRowsTag; @@ -232,7 +233,8 @@ public StorageApiWritesShardedRecords( Predicate successfulRowsPredicate, boolean autoUpdateSchema, boolean ignoreUnknownValues, - AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation) { + AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation, + @Nullable Map bigLakeConfiguration) { this.dynamicDestinations = dynamicDestinations; this.createDisposition = createDisposition; this.kmsKey = kmsKey; @@ -246,6 +248,7 @@ public StorageApiWritesShardedRecords( this.autoUpdateSchema = autoUpdateSchema; this.ignoreUnknownValues = ignoreUnknownValues; this.defaultMissingValueInterpretation = defaultMissingValueInterpretation; + this.bigLakeConfiguration = bigLakeConfiguration; } @Override @@ -499,7 +502,8 @@ public void process( createDisposition, destinationCoder, kmsKey, - bqServices); + bqServices, + bigLakeConfiguration); return true; }; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java index e15258e6ab40..5b7b5d473190 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java @@ -96,6 +96,7 @@ public class BigQueryIOTranslationTest { WRITE_TRANSFORM_SCHEMA_MAPPING.put("getWriteDisposition", "write_disposition"); WRITE_TRANSFORM_SCHEMA_MAPPING.put("getSchemaUpdateOptions", "schema_update_options"); WRITE_TRANSFORM_SCHEMA_MAPPING.put("getTableDescription", "table_description"); + WRITE_TRANSFORM_SCHEMA_MAPPING.put("getBigLakeConfiguration", "biglake_configuration"); WRITE_TRANSFORM_SCHEMA_MAPPING.put("getValidate", "validate"); WRITE_TRANSFORM_SCHEMA_MAPPING.put("getBigQueryServices", "bigquery_services"); WRITE_TRANSFORM_SCHEMA_MAPPING.put("getMaxFilesPerBundle", "max_files_per_bundle"); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index d96e22f84907..69994c019509 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -2257,6 +2257,40 @@ public void testUpdateTableSchemaNoUnknownValues() throws Exception { p.run(); } + @Test + public void testBigLakeConfigurationFailsForNonStorageApiWrites() { + assumeTrue(!useStorageApi); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "bigLakeConfiguration is only supported when using STORAGE_WRITE_API or STORAGE_API_AT_LEAST_ONCE"); + + p.apply(Create.empty(TableRowJsonCoder.of())) + .apply( + BigQueryIO.writeTableRows() + .to("project-id:dataset-id.table") + .withBigLakeConfiguration( + ImmutableMap.of( + "connectionId", "some-connection", + "storageUri", "gs://bucket")) + .withTestServices(fakeBqServices)); + p.run(); + } + + @Test + public void testBigLakeConfigurationFailsForMissingProperties() { + assumeTrue(useStorageApi); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("bigLakeConfiguration must contain keys 'connectionId' and 'storageUri'"); + + p.apply(Create.empty(TableRowJsonCoder.of())) + .apply( + BigQueryIO.writeTableRows() + .to("project-id:dataset-id.table") + .withBigLakeConfiguration(ImmutableMap.of("connectionId", "some-connection")) + .withTestServices(fakeBqServices)); + p.run(); + } + @SuppressWarnings({"unused"}) static class UpdateTableSchemaDoFn extends DoFn, TableRow> { @TimerId("updateTimer") diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkCreateIfNeededIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkCreateIfNeededIT.java index 18c832f0c54b..858921e19ced 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkCreateIfNeededIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiSinkCreateIfNeededIT.java @@ -17,23 +17,32 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; +import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.CONNECTION_ID; +import 
static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.STORAGE_URI; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; +import com.google.api.services.storage.model.Objects; import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import java.util.stream.LongStream; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; +import org.apache.beam.sdk.extensions.gcp.util.GcsUtil; import org.apache.beam.sdk.io.gcp.testing.BigqueryClient; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.checkerframework.checker.nullness.qual.Nullable; import org.hamcrest.Matchers; import org.joda.time.Duration; import org.junit.AfterClass; @@ -57,11 +66,16 @@ public static Iterable data() { private static final Logger LOG = LoggerFactory.getLogger(StorageApiSinkCreateIfNeededIT.class); - private static final BigqueryClient BQ_CLIENT = new BigqueryClient("StorageApiSinkFailedRowsIT"); + private static final BigqueryClient BQ_CLIENT = + new BigqueryClient("StorageApiSinkCreateIfNeededIT"); private static final String PROJECT = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject(); private static final String BIG_QUERY_DATASET_ID = - "storage_api_sink_failed_rows" + System.nanoTime(); + "storage_api_sink_create_tables_" + System.nanoTime(); + private static final String TEST_CONNECTION_ID = + "projects/apache-beam-testing/locations/us/connections/apache-beam-testing-storageapi-biglake-nodelete"; + private static final String TEST_STORAGE_URI = + "gs://apache-beam-testing-bq-biglake/" + StorageApiSinkCreateIfNeededIT.class.getSimpleName(); private static final List FIELDS = ImmutableList.builder() .add(new TableFieldSchema().setType("STRING").setName("str")) @@ -96,19 +110,55 @@ public void testCreateManyTables() throws IOException, InterruptedException { String table = "table" + System.nanoTime(); String tableSpecBase = PROJECT + "." + BIG_QUERY_DATASET_ID + "." + table; - runPipeline(getMethod(), tableSpecBase, inputs); - assertTablesCreated(tableSpecBase, 100); + runPipeline(getMethod(), tableSpecBase, inputs, null); + assertTablesCreated(tableSpecBase, 100, true); } - private void assertTablesCreated(String tableSpecPrefix, int expectedRows) + @Test + public void testCreateBigLakeTables() throws IOException, InterruptedException { + int numTables = 5; + List inputs = + LongStream.range(0, numTables) + .mapToObj(l -> new TableRow().set("str", "foo").set("tablenum", l)) + .collect(Collectors.toList()); + + String table = "iceberg_table_" + System.nanoTime() + "_"; + String tableSpecBase = PROJECT + "." + BIG_QUERY_DATASET_ID + "." 
+ table; + Map bigLakeConfiguration = + ImmutableMap.of( + CONNECTION_ID, TEST_CONNECTION_ID, + STORAGE_URI, TEST_STORAGE_URI); + runPipeline(getMethod(), tableSpecBase, inputs, bigLakeConfiguration); + assertTablesCreated(tableSpecBase, numTables, false); + assertIcebergTablesCreated(table, numTables); + } + + private void assertIcebergTablesCreated(String tablePrefix, int expectedRows) throws IOException, InterruptedException { + GcsUtil gcsUtil = TestPipeline.testingPipelineOptions().as(GcsOptions.class).getGcsUtil(); + + Objects objects = + gcsUtil.listObjects( + "apache-beam-testing-bq-biglake", + String.format( + "%s/%s/%s/%s", + getClass().getSimpleName(), PROJECT, BIG_QUERY_DATASET_ID, tablePrefix), + null); + + assertEquals(expectedRows, objects.getItems().size()); + } + + private void assertTablesCreated(String tableSpecPrefix, int expectedRows, boolean useWildCard) + throws IOException, InterruptedException { + String query = String.format("SELECT COUNT(*) FROM `%s`", tableSpecPrefix + "*"); + if (!useWildCard) { + query = String.format("SELECT (SELECT COUNT(*) FROM `%s`)", tableSpecPrefix + 0); + for (int i = 1; i < expectedRows; i++) { + query += String.format(" + (SELECT COUNT(*) FROM `%s`)", tableSpecPrefix + i); + } + } TableRow queryResponse = - Iterables.getOnlyElement( - BQ_CLIENT.queryUnflattened( - String.format("SELECT COUNT(*) FROM `%s`", tableSpecPrefix + "*"), - PROJECT, - true, - true)); + Iterables.getOnlyElement(BQ_CLIENT.queryUnflattened(query, PROJECT, true, true)); int numRowsWritten = Integer.parseInt((String) queryResponse.get("f0_")); if (useAtLeastOnce) { assertThat(numRowsWritten, Matchers.greaterThanOrEqualTo(expectedRows)); @@ -118,7 +168,10 @@ private void assertTablesCreated(String tableSpecPrefix, int expectedRows) } private static void runPipeline( - BigQueryIO.Write.Method method, String tableSpecBase, Iterable tableRows) { + BigQueryIO.Write.Method method, + String tableSpecBase, + Iterable tableRows, + @Nullable Map bigLakeConfiguration) { Pipeline p = Pipeline.create(); BigQueryIO.Write write = @@ -131,6 +184,9 @@ private static void runPipeline( write = write.withNumStorageWriteApiStreams(1); write = write.withTriggeringFrequency(Duration.standardSeconds(1)); } + if (bigLakeConfiguration != null) { + write = write.withBigLakeConfiguration(bigLakeConfiguration); + } PCollection input = p.apply("Create test cases", Create.of(tableRows)); input = input.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED); input.apply("Write using Storage Write API", write);
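As a rough illustration of the table metadata that the CreateTableHelpers change above produces, the sketch below builds the equivalent BigLakeConfiguration by hand. The connection, bucket, and destination names are placeholders; the "iceberg"/"parquet" defaults and the <storageUri>/<project>/<dataset>/<table> storage layout follow the diff.

import com.google.api.services.bigquery.model.BigLakeConfiguration;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;

class BigLakeTableSketch {
  static Table biglakeTable() {
    TableReference ref =
        new TableReference()
            .setProjectId("my_project")
            .setDatasetId("my_dataset")
            .setTableId("my_table");
    return new Table()
        .setTableReference(ref)
        .setBiglakeConfiguration(
            new BigLakeConfiguration()
                .setTableFormat("iceberg") // default when "tableFormat" is not supplied
                .setFileFormat("parquet") // default when "fileFormat" is not supplied
                .setConnectionId("projects/my_project/locations/us/connections/my_connection")
                // data files for this table land under <storageUri>/<project>/<dataset>/<table>/
                .setStorageUri("gs://my_bucket/biglake/my_project/my_dataset/my_table"));
  }
}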