From 50ed69ab560491339409f0b50b5fcd596ff53623 Mon Sep 17 00:00:00 2001
From: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com>
Date: Wed, 13 Nov 2024 12:23:57 -0500
Subject: [PATCH] Portable Managed BigQuery destinations (#33017)

* managed bigqueryio
* spotless
* move managed dependency to test only
* cleanup after merging snake_case PR
* choose write method based on boundedness and pipeline options
* rename bigquery write config class
* spotless
* change read output tag to 'output'
* spotless
* revert logic that depends on DataflowServiceOptions. switching BQ methods can instead be done in Dataflow service side
* spotless
* fix typo
* separate BQ write config to a new class
* fix doc
* resolve after syncing to HEAD
* spotless
* fork on batch/streaming
* cleanup
* spotless
* portable bigquery destinations
* move forking logic to BQ schematransform side
* add file loads translation and tests; add test checks that the correct transform is chosen
* set top-level wrapper to be the underlying managed BQ transform urn; change tests to verify underlying transform name
* move unit tests to respective schematransform test classes
* expose to Python SDK as well
* cleanup
* address comment
* set enable_streaming_engine option; add to CHANGES
---
 CHANGES.md | 1 +
 .../io/google-cloud-platform/build.gradle | 1 +
 ...QueryFileLoadsSchemaTransformProvider.java | 12 +-
 ...torageWriteApiSchemaTransformProvider.java | 142 ++++++------------
 .../providers/BigQueryWriteConfiguration.java | 30 +++-
 .../PortableBigQueryDestinations.java | 105 +++++++++++++
 ...yFileLoadsSchemaTransformProviderTest.java | 39 +++++
 .../bigquery/providers/BigQueryManagedIT.java | 91 ++++++++---
 ...geWriteApiSchemaTransformProviderTest.java | 51 +++++--
 9 files changed, 327 insertions(+), 145 deletions(-)
 create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java

diff --git a/CHANGES.md b/CHANGES.md
index 6962b0fb8ded..bc7ec096fe33 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -70,6 +70,7 @@
 * [Managed Iceberg] Add support for TIMESTAMP, TIME, and DATE types ([#32688](https://github.com/apache/beam/pull/32688))
 * BigQuery CDC writes are now available in Python SDK, only supported when using StorageWrite API at least once mode ([#32527](https://github.com/apache/beam/issues/32527))
 * [Managed Iceberg] Allow updating table partition specs during pipeline runtime ([#32879](https://github.com/apache/beam/pull/32879))
+* Added BigQueryIO as a Managed IO ([#31486](https://github.com/apache/beam/pull/31486))
 * Support for writing to [Solace messages queues](https://solace.com/) (`SolaceIO.Write`) added (Java) ([#31905](https://github.com/apache/beam/issues/31905)).
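For reference, the user-facing shape of the Managed BigQuery sink recorded in the changelog entry above is roughly the sketch below. It mirrors the batch path exercised by BigQueryManagedIT later in this patch; the class name and the project/dataset/table spec are placeholders, and per the commit notes the underlying write method (file loads vs. Storage Write API) is chosen from the input's boundedness.

import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;

public class ManagedBigQueryWriteSketch {
  public static void main(String[] args) {
    Schema schema =
        Schema.of(
            Schema.Field.of("str", Schema.FieldType.STRING),
            Schema.Field.of("number", Schema.FieldType.INT64));

    Pipeline p = Pipeline.create();
    PCollection<Row> rows =
        p.apply(
                Create.of(
                    Row.withSchema(schema).addValues("a", 1L).build(),
                    Row.withSchema(schema).addValues("b", 2L).build()))
            .setRowSchema(schema);

    // Placeholder table spec; a bounded input like this one resolves to the file loads path.
    Map<String, Object> config = ImmutableMap.of("table", "my-project:my_dataset.my_table");

    // Managed transforms are SchemaTransforms, so the input is passed under the "input" tag.
    PCollectionRowTuple.of("input", rows)
        .apply(Managed.write(Managed.BIGQUERY).withConfig(config));

    p.run().waitUntilFinish();
  }
}

With an unbounded input the same configuration routes to the Storage Write API transform instead, which is what the streaming tests later in this patch cover.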
## New Features / Improvements diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle index 2acce3e94cc2..b8e71e289827 100644 --- a/sdks/java/io/google-cloud-platform/build.gradle +++ b/sdks/java/io/google-cloud-platform/build.gradle @@ -198,6 +198,7 @@ task integrationTest(type: Test, dependsOn: processTestResources) { "--runner=DirectRunner", "--project=${gcpProject}", "--tempRoot=${gcpTempRoot}", + "--tempLocation=${gcpTempRoot}", "--firestoreDb=${firestoreDb}", "--firestoreHost=${firestoreHost}", "--bigtableChangeStreamInstanceId=${bigtableChangeStreamInstanceId}", diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProvider.java index 092cf42a29a4..7872c91d1f72 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProvider.java @@ -28,6 +28,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; @@ -89,16 +90,19 @@ public static class BigQueryFileLoadsSchemaTransform extends SchemaTransform { @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { PCollection rowPCollection = input.getSinglePCollection(); - BigQueryIO.Write write = toWrite(input.getPipeline().getOptions()); + BigQueryIO.Write write = + toWrite(rowPCollection.getSchema(), input.getPipeline().getOptions()); rowPCollection.apply(write); return PCollectionRowTuple.empty(input.getPipeline()); } - BigQueryIO.Write toWrite(PipelineOptions options) { + BigQueryIO.Write toWrite(Schema schema, PipelineOptions options) { + PortableBigQueryDestinations dynamicDestinations = + new PortableBigQueryDestinations(schema, configuration); BigQueryIO.Write write = BigQueryIO.write() - .to(configuration.getTable()) + .to(dynamicDestinations) .withMethod(BigQueryIO.Write.Method.FILE_LOADS) .withFormatFunction(BigQueryUtils.toTableRow()) // TODO(https://github.com/apache/beam/issues/33074) BatchLoad's @@ -106,7 +110,7 @@ BigQueryIO.Write toWrite(PipelineOptions options) { .withCustomGcsTempLocation( ValueProvider.StaticValueProvider.of(options.getTempLocation())) .withWriteDisposition(WriteDisposition.WRITE_APPEND) - .useBeamSchema(); + .withFormatFunction(dynamicDestinations.getFilterFormatFunction(false)); if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) { CreateDisposition createDisposition = diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java index c45433aaf0e7..1e53ad3553e0 100644 --- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java @@ -18,15 +18,14 @@ package org.apache.beam.sdk.io.gcp.bigquery.providers; import static org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryWriteConfiguration.DYNAMIC_DESTINATIONS; +import static org.apache.beam.sdk.io.gcp.bigquery.providers.PortableBigQueryDestinations.DESTINATION; +import static org.apache.beam.sdk.io.gcp.bigquery.providers.PortableBigQueryDestinations.RECORD; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; -import com.google.api.services.bigquery.model.TableConstraints; -import com.google.api.services.bigquery.model.TableSchema; import com.google.auto.service.AutoService; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Optional; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method; @@ -34,9 +33,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageApiInsertError; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils; -import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations; import org.apache.beam.sdk.io.gcp.bigquery.RowMutationInformation; -import org.apache.beam.sdk.io.gcp.bigquery.TableDestination; import org.apache.beam.sdk.io.gcp.bigquery.WriteResult; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; @@ -54,7 +51,6 @@ import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TypeDescriptors; -import org.apache.beam.sdk.values.ValueInSingleWindow; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.joda.time.Duration; @@ -80,6 +76,7 @@ public class BigQueryStorageWriteApiSchemaTransformProvider private static final String FAILED_ROWS_TAG = "FailedRows"; private static final String FAILED_ROWS_WITH_ERRORS_TAG = "FailedRowsWithErrors"; // magic string that tells us to write to dynamic destinations + protected static final String DYNAMIC_DESTINATIONS = "DYNAMIC_DESTINATIONS"; protected static final String ROW_PROPERTY_MUTATION_INFO = "row_mutation_info"; protected static final String ROW_PROPERTY_MUTATION_TYPE = "mutation_type"; protected static final String ROW_PROPERTY_MUTATION_SQN = "change_sequence_number"; @@ -176,52 +173,6 @@ private static class NoOutputDoFn extends DoFn { public void process(ProcessContext c) {} } - private static class RowDynamicDestinations extends DynamicDestinations { - final Schema schema; - final String fixedDestination; - final List primaryKey; - - RowDynamicDestinations(Schema schema) { - this.schema = schema; - this.fixedDestination = null; - this.primaryKey = null; - } - - public RowDynamicDestinations( - Schema schema, String fixedDestination, List primaryKey) { - this.schema = schema; - this.fixedDestination = fixedDestination; - this.primaryKey = primaryKey; - } - - @Override - public String getDestination(ValueInSingleWindow element) { - return 
Optional.ofNullable(fixedDestination) - .orElseGet(() -> element.getValue().getString("destination")); - } - - @Override - public TableDestination getTable(String destination) { - return new TableDestination(destination, null); - } - - @Override - public TableSchema getSchema(String destination) { - return BigQueryUtils.toTableSchema(schema); - } - - @Override - public TableConstraints getTableConstraints(String destination) { - return Optional.ofNullable(this.primaryKey) - .filter(pk -> !pk.isEmpty()) - .map( - pk -> - new TableConstraints() - .setPrimaryKey(new TableConstraints.PrimaryKey().setColumns(pk))) - .orElse(null); - } - } - @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { // Check that the input exists @@ -309,13 +260,6 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { } } - void validateDynamicDestinationsExpectedSchema(Schema schema) { - checkArgument( - schema.getFieldNames().containsAll(Arrays.asList("destination", "record")), - "When writing to dynamic destinations, we expect Row Schema with a " - + "\"destination\" string field and a \"record\" Row field."); - } - BigQueryIO.Write createStorageWriteApiTransform(Schema schema) { Method writeMethod = configuration.getUseAtLeastOnceSemantics() != null @@ -326,21 +270,37 @@ BigQueryIO.Write createStorageWriteApiTransform(Schema schema) { BigQueryIO.Write write = BigQueryIO.write() .withMethod(writeMethod) - .withFormatFunction(BigQueryUtils.toTableRow()) .withWriteDisposition(WriteDisposition.WRITE_APPEND); - // in case CDC writes are configured we validate and include them in the configuration - if (Optional.ofNullable(configuration.getUseCdcWrites()).orElse(false)) { - write = validateAndIncludeCDCInformation(write, schema); - } else if (configuration.getTable().equals(DYNAMIC_DESTINATIONS)) { - validateDynamicDestinationsExpectedSchema(schema); + Schema rowSchema = schema; + boolean fetchNestedRecord = false; + if (configuration.getTable().equals(DYNAMIC_DESTINATIONS)) { + validateDynamicDestinationsSchema(schema); + rowSchema = schema.getField(RECORD).getType().getRowSchema(); + fetchNestedRecord = true; + } + if (Boolean.TRUE.equals(configuration.getUseCdcWrites())) { + validateCdcSchema(schema); + rowSchema = schema.getField(RECORD).getType().getRowSchema(); + fetchNestedRecord = true; write = write - .to(new RowDynamicDestinations(schema.getField("record").getType().getRowSchema())) - .withFormatFunction(row -> BigQueryUtils.toTableRow(row.getRow("record"))); - } else { - write = write.to(configuration.getTable()).useBeamSchema(); + .withPrimaryKey(configuration.getPrimaryKey()) + .withRowMutationInformationFn( + row -> + RowMutationInformation.of( + RowMutationInformation.MutationType.valueOf( + row.getRow(ROW_PROPERTY_MUTATION_INFO) + .getString(ROW_PROPERTY_MUTATION_TYPE)), + row.getRow(ROW_PROPERTY_MUTATION_INFO) + .getString(ROW_PROPERTY_MUTATION_SQN))); } + PortableBigQueryDestinations dynamicDestinations = + new PortableBigQueryDestinations(rowSchema, configuration); + write = + write + .to(dynamicDestinations) + .withFormatFunction(dynamicDestinations.getFilterFormatFunction(fetchNestedRecord)); if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) { CreateDisposition createDisposition = @@ -363,19 +323,27 @@ BigQueryIO.Write createStorageWriteApiTransform(Schema schema) { return write; } - BigQueryIO.Write validateAndIncludeCDCInformation( - BigQueryIO.Write write, Schema schema) { + void validateDynamicDestinationsSchema(Schema schema) { + checkArgument( + 
schema.getFieldNames().containsAll(Arrays.asList(DESTINATION, RECORD)), + String.format( + "When writing to dynamic destinations, we expect Row Schema with a " + + "\"%s\" string field and a \"%s\" Row field.", + DESTINATION, RECORD)); + } + + private void validateCdcSchema(Schema schema) { checkArgument( - schema.getFieldNames().containsAll(Arrays.asList(ROW_PROPERTY_MUTATION_INFO, "record")), + schema.getFieldNames().containsAll(Arrays.asList(ROW_PROPERTY_MUTATION_INFO, RECORD)), "When writing using CDC functionality, we expect Row Schema with a " + "\"" + ROW_PROPERTY_MUTATION_INFO + "\" Row field and a \"record\" Row field."); - Schema rowSchema = schema.getField(ROW_PROPERTY_MUTATION_INFO).getType().getRowSchema(); + Schema mutationSchema = schema.getField(ROW_PROPERTY_MUTATION_INFO).getType().getRowSchema(); checkArgument( - rowSchema.equals(ROW_SCHEMA_MUTATION_INFO), + mutationSchema != null && mutationSchema.equals(ROW_SCHEMA_MUTATION_INFO), "When writing using CDC functionality, we expect a \"" + ROW_PROPERTY_MUTATION_INFO + "\" field of Row type with schema:\n" @@ -384,31 +352,7 @@ BigQueryIO.Write validateAndIncludeCDCInformation( + "Received \"" + ROW_PROPERTY_MUTATION_INFO + "\" field with schema:\n" - + rowSchema.toString()); - - String tableDestination = null; - - if (configuration.getTable().equals(DYNAMIC_DESTINATIONS)) { - validateDynamicDestinationsExpectedSchema(schema); - } else { - tableDestination = configuration.getTable(); - } - - return write - .to( - new RowDynamicDestinations( - schema.getField("record").getType().getRowSchema(), - tableDestination, - configuration.getPrimaryKey())) - .withFormatFunction(row -> BigQueryUtils.toTableRow(row.getRow("record"))) - .withPrimaryKey(configuration.getPrimaryKey()) - .withRowMutationInformationFn( - row -> - RowMutationInformation.of( - RowMutationInformation.MutationType.valueOf( - row.getRow(ROW_PROPERTY_MUTATION_INFO) - .getString(ROW_PROPERTY_MUTATION_TYPE)), - row.getRow(ROW_PROPERTY_MUTATION_INFO).getString(ROW_PROPERTY_MUTATION_SQN))); + + mutationSchema); } } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java index 4296da7e0cd5..505ce7125cee 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteConfiguration.java @@ -18,20 +18,18 @@ package org.apache.beam.sdk.io.gcp.bigquery.providers; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import com.google.auto.value.AutoValue; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; -import javax.annotation.Nullable; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +import org.checkerframework.checker.nullness.qual.Nullable; /** * Configuration for writing to BigQuery with SchemaTransforms. Used by {@link @@ -68,11 +66,6 @@ public void validate() { !Strings.isNullOrEmpty(this.getTable()), invalidConfigMessage + "Table spec for a BigQuery Write must be specified."); - // if we have an input table spec, validate it - if (!this.getTable().equals(DYNAMIC_DESTINATIONS)) { - checkNotNull(BigQueryHelpers.parseTableSpec(this.getTable())); - } - // validate create and write dispositions String createDisposition = getCreateDisposition(); if (createDisposition != null && !createDisposition.isEmpty()) { @@ -186,6 +179,21 @@ public static Builder builder() { @Nullable public abstract List getPrimaryKey(); + @SchemaFieldDescription( + "A list of field names to keep in the input record. All other fields are dropped before writing. " + + "Is mutually exclusive with 'drop' and 'only'.") + public abstract @Nullable List getKeep(); + + @SchemaFieldDescription( + "A list of field names to drop from the input record before writing. " + + "Is mutually exclusive with 'keep' and 'only'.") + public abstract @Nullable List getDrop(); + + @SchemaFieldDescription( + "The name of a single record field that should be written. " + + "Is mutually exclusive with 'keep' and 'drop'.") + public abstract @Nullable String getOnly(); + /** Builder for {@link BigQueryWriteConfiguration}. */ @AutoValue.Builder public abstract static class Builder { @@ -212,6 +220,12 @@ public abstract static class Builder { public abstract Builder setPrimaryKey(List pkColumns); + public abstract Builder setKeep(List keep); + + public abstract Builder setDrop(List drop); + + public abstract Builder setOnly(String only); + /** Builds a {@link BigQueryWriteConfiguration} instance. */ public abstract BigQueryWriteConfiguration build(); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java new file mode 100644 index 000000000000..54d125012eac --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.bigquery.providers; + +import static org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryWriteConfiguration.DYNAMIC_DESTINATIONS; +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + +import com.google.api.services.bigquery.model.TableConstraints; +import com.google.api.services.bigquery.model.TableRow; +import com.google.api.services.bigquery.model.TableSchema; +import java.util.List; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils; +import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations; +import org.apache.beam.sdk.io.gcp.bigquery.TableDestination; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.util.RowFilter; +import org.apache.beam.sdk.util.RowStringInterpolator; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.ValueInSingleWindow; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; +import org.checkerframework.checker.nullness.qual.Nullable; + +@Internal +public class PortableBigQueryDestinations extends DynamicDestinations { + public static final String DESTINATION = "destination"; + public static final String RECORD = "record"; + private @MonotonicNonNull RowStringInterpolator interpolator = null; + private final @Nullable List primaryKey; + private final RowFilter rowFilter; + + public PortableBigQueryDestinations(Schema rowSchema, BigQueryWriteConfiguration configuration) { + // DYNAMIC_DESTINATIONS magic string is the old way of doing it for cross-language. + // In that case, we do no interpolation + if (!configuration.getTable().equals(DYNAMIC_DESTINATIONS)) { + this.interpolator = new RowStringInterpolator(configuration.getTable(), rowSchema); + } + this.primaryKey = configuration.getPrimaryKey(); + RowFilter rf = new RowFilter(rowSchema); + if (configuration.getDrop() != null) { + rf = rf.drop(checkStateNotNull(configuration.getDrop())); + } + if (configuration.getKeep() != null) { + rf = rf.keep(checkStateNotNull(configuration.getKeep())); + } + if (configuration.getOnly() != null) { + rf = rf.only(checkStateNotNull(configuration.getOnly())); + } + this.rowFilter = rf; + } + + @Override + public String getDestination(@Nullable ValueInSingleWindow element) { + if (interpolator != null) { + return interpolator.interpolate(checkArgumentNotNull(element)); + } + return checkStateNotNull(checkStateNotNull(element).getValue().getString(DESTINATION)); + } + + @Override + public TableDestination getTable(String destination) { + return new TableDestination(destination, null); + } + + @Override + public @Nullable TableSchema getSchema(String destination) { + return BigQueryUtils.toTableSchema(rowFilter.outputSchema()); + } + + @Override + public @Nullable TableConstraints getTableConstraints(String destination) { + if (primaryKey != null) { + return new TableConstraints() + .setPrimaryKey(new TableConstraints.PrimaryKey().setColumns(primaryKey)); + } + return null; + } + + public SerializableFunction getFilterFormatFunction(boolean fetchNestedRecord) { + return row -> { + if (fetchNestedRecord) { + row = checkStateNotNull(row.getRow(RECORD)); + } + Row filtered = rowFilter.filter(row); + return BigQueryUtils.toTableRow(filtered); + }; + } +} diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProviderTest.java index 897d95da3b13..168febea9d88 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProviderTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProviderTest.java @@ -25,6 +25,7 @@ import com.google.api.services.bigquery.model.TableReference; import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; import org.apache.beam.model.pipeline.v1.RunnerApi; @@ -32,6 +33,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils; import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryFileLoadsSchemaTransformProvider.BigQueryFileLoadsSchemaTransform; import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices; import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService; @@ -42,6 +44,7 @@ import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.util.RowFilter; import org.apache.beam.sdk.util.construction.PipelineTranslation; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; @@ -125,6 +128,42 @@ public void testLoad() throws IOException, InterruptedException { assertEquals(ROWS.size(), fakeDatasetService.getAllRows(PROJECT, DATASET, TABLE_ID).size()); } + @Test + public void testWriteToPortableDynamicDestinations() throws Exception { + String destinationTemplate = + String.format("%s:%s.dynamic_write_{name}_{number}", PROJECT, DATASET); + BigQueryWriteConfiguration config = + BigQueryWriteConfiguration.builder() + .setTable(destinationTemplate) + .setDrop(Collections.singletonList("number")) + .build(); + BigQueryFileLoadsSchemaTransform write = + (BigQueryFileLoadsSchemaTransform) + new BigQueryFileLoadsSchemaTransformProvider().from(config); + write.setTestBigQueryServices(fakeBigQueryServices); + + PCollection inputRows = p.apply(Create.of(ROWS)).setRowSchema(SCHEMA); + PCollectionRowTuple.of("input", inputRows).apply(write); + p.run().waitUntilFinish(); + + RowFilter rowFilter = new RowFilter(SCHEMA).drop(Collections.singletonList("number")); + assertEquals( + rowFilter.filter(ROWS.get(0)), + BigQueryUtils.toBeamRow( + rowFilter.outputSchema(), + fakeDatasetService.getAllRows(PROJECT, DATASET, "dynamic_write_a_1").get(0))); + assertEquals( + rowFilter.filter(ROWS.get(1)), + BigQueryUtils.toBeamRow( + rowFilter.outputSchema(), + fakeDatasetService.getAllRows(PROJECT, DATASET, "dynamic_write_b_2").get(0))); + assertEquals( + rowFilter.filter(ROWS.get(2)), + BigQueryUtils.toBeamRow( + rowFilter.outputSchema(), + fakeDatasetService.getAllRows(PROJECT, DATASET, "dynamic_write_c_3").get(0))); + } + @Test public void testManagedChoosesFileLoadsForBoundedWrites() { PCollection batchInput = p.apply(Create.of(ROWS)).setRowSchema(SCHEMA); diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java index 63727107a651..3aba2c2c6fef 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java @@ -17,22 +17,30 @@ */ package org.apache.beam.sdk.io.gcp.bigquery.providers; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; + import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.stream.Collectors; import java.util.stream.LongStream; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils; import org.apache.beam.sdk.io.gcp.testing.BigqueryClient; import org.apache.beam.sdk.managed.Managed; +import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.testing.TestPipelineOptions; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PeriodicImpulse; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.util.RowFilter; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; import org.apache.beam.sdk.values.Row; @@ -58,17 +66,14 @@ public class BigQueryManagedIT { private static final Schema SCHEMA = Schema.of( Schema.Field.of("str", Schema.FieldType.STRING), - Schema.Field.of("number", Schema.FieldType.INT64)); + Schema.Field.of("number", Schema.FieldType.INT64), + Schema.Field.of("dest", Schema.FieldType.INT64)); + + private static final SerializableFunction ROW_FUNC = + l -> Row.withSchema(SCHEMA).addValue(Long.toString(l)).addValue(l).addValue(l % 3).build(); private static final List ROWS = - LongStream.range(0, 20) - .mapToObj( - i -> - Row.withSchema(SCHEMA) - .withFieldValue("str", Long.toString(i)) - .withFieldValue("number", i) - .build()) - .collect(Collectors.toList()); + LongStream.range(0, 20).mapToObj(ROW_FUNC::apply).collect(Collectors.toList()); private static final BigqueryClient BQ_CLIENT = new BigqueryClient("BigQueryManagedIT"); @@ -93,10 +98,6 @@ public void testBatchFileLoadsWriteRead() { String.format("%s:%s.%s", PROJECT, BIG_QUERY_DATASET_ID, testName.getMethodName()); Map config = ImmutableMap.of("table", table); - // file loads requires a GCS temp location - String tempLocation = writePipeline.getOptions().as(TestPipelineOptions.class).getTempRoot(); - writePipeline.getOptions().setTempLocation(tempLocation); - // batch write PCollectionRowTuple.of("input", getInput(writePipeline, false)) .apply(Managed.write(Managed.BIGQUERY).withConfig(config)); @@ -131,6 +132,59 @@ public void testStreamingStorageWriteRead() { readPipeline.run().waitUntilFinish(); } + public void testDynamicDestinations(boolean streaming) throws IOException, InterruptedException { + String baseTableName = + String.format("%s:%s.dynamic_" + System.nanoTime(), PROJECT, BIG_QUERY_DATASET_ID); + String 
destinationTemplate = baseTableName + "_{dest}"; + Map config = + ImmutableMap.of("table", destinationTemplate, "drop", Collections.singletonList("dest")); + + // write + PCollectionRowTuple.of("input", getInput(writePipeline, streaming)) + .apply(Managed.write(Managed.BIGQUERY).withConfig(config)); + writePipeline.run().waitUntilFinish(); + + List destinations = + Arrays.asList(baseTableName + "_0", baseTableName + "_1", baseTableName + "_2"); + + // read and validate each table destination + RowFilter rowFilter = new RowFilter(SCHEMA).drop(Collections.singletonList("dest")); + for (int i = 0; i < destinations.size(); i++) { + long mod = i; + String dest = destinations.get(i); + List writtenRows = + BQ_CLIENT + .queryUnflattened(String.format("SELECT * FROM [%s]", dest), PROJECT, true, false) + .stream() + .map(tableRow -> BigQueryUtils.toBeamRow(rowFilter.outputSchema(), tableRow)) + .collect(Collectors.toList()); + + List expectedRecords = + ROWS.stream() + .filter(row -> row.getInt64("dest") == mod) + .map(rowFilter::filter) + .collect(Collectors.toList()); + + assertThat(writtenRows, containsInAnyOrder(expectedRecords.toArray())); + } + } + + @Test + public void testStreamingDynamicDestinations() throws IOException, InterruptedException { + if (writePipeline.getOptions().getRunner().getName().contains("DataflowRunner")) { + // Need to manually enable streaming engine for legacy dataflow runner + ExperimentalOptions.addExperiment( + writePipeline.getOptions().as(ExperimentalOptions.class), + GcpOptions.STREAMING_ENGINE_EXPERIMENT); + } + testDynamicDestinations(true); + } + + @Test + public void testBatchDynamicDestinations() throws IOException, InterruptedException { + testDynamicDestinations(false); + } + public PCollection getInput(Pipeline p, boolean isStreaming) { if (isStreaming) { return p.apply( @@ -138,14 +192,7 @@ public PCollection getInput(Pipeline p, boolean isStreaming) { .startAt(new Instant(0)) .stopAt(new Instant(19)) .withInterval(Duration.millis(1))) - .apply( - MapElements.into(TypeDescriptors.rows()) - .via( - i -> - Row.withSchema(SCHEMA) - .withFieldValue("str", Long.toString(i.getMillis())) - .withFieldValue("number", i.getMillis()) - .build())) + .apply(MapElements.into(TypeDescriptors.rows()).via(i -> ROW_FUNC.apply(i.getMillis()))) .setRowSchema(SCHEMA); } return p.apply(Create.of(ROWS)).setRowSchema(SCHEMA); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java index 7b59552bbbe4..584309778286 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io.gcp.bigquery.providers; +import static org.apache.beam.sdk.io.gcp.bigquery.providers.PortableBigQueryDestinations.DESTINATION; +import static org.apache.beam.sdk.io.gcp.bigquery.providers.PortableBigQueryDestinations.RECORD; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertEquals; @@ -37,6 +39,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi; import 
org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils; import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransform; import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices; import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService; @@ -56,6 +59,7 @@ import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.util.RowFilter; import org.apache.beam.sdk.util.construction.PipelineTranslation; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; @@ -116,7 +120,6 @@ public void setUp() throws Exception { public void testInvalidConfig() { List invalidConfigs = Arrays.asList( - BigQueryWriteConfiguration.builder().setTable("not_a_valid_table_spec"), BigQueryWriteConfiguration.builder() .setTable("project:dataset.table") .setCreateDisposition("INVALID_DISPOSITION")); @@ -170,10 +173,7 @@ public Boolean rowsEquals(List expectedRows, List actualRows) { } public boolean rowEquals(Row expectedRow, TableRow actualRow) { - return expectedRow.getValue("name").equals(actualRow.get("name")) - && expectedRow - .getValue("number") - .equals(Long.parseLong(actualRow.get("number").toString())); + return expectedRow.equals(BigQueryUtils.toBeamRow(expectedRow.getSchema(), actualRow)); } @Test @@ -199,14 +199,14 @@ public void testWriteToDynamicDestinations() throws Exception { String baseTableSpec = "project:dataset.dynamic_write_"; Schema schemaWithDestinations = - Schema.builder().addStringField("destination").addRowField("record", SCHEMA).build(); + Schema.builder().addStringField(DESTINATION).addRowField(RECORD, SCHEMA).build(); List rowsWithDestinations = ROWS.stream() .map( row -> Row.withSchema(schemaWithDestinations) - .withFieldValue("destination", baseTableSpec + row.getInt64("number")) - .withFieldValue("record", row) + .withFieldValue(DESTINATION, baseTableSpec + row.getInt64("number")) + .withFieldValue(RECORD, row) .build()) .collect(Collectors.toList()); @@ -227,17 +227,44 @@ public void testWriteToDynamicDestinations() throws Exception { fakeDatasetService.getAllRows("project", "dataset", "dynamic_write_3").get(0))); } + @Test + public void testWriteToPortableDynamicDestinations() throws Exception { + String destinationTemplate = "project:dataset.dynamic_write_{name}_{number}"; + BigQueryWriteConfiguration config = + BigQueryWriteConfiguration.builder() + .setTable(destinationTemplate) + .setKeep(Arrays.asList("number", "dt")) + .build(); + + runWithConfig(config); + p.run().waitUntilFinish(); + + RowFilter rowFilter = new RowFilter(SCHEMA).keep(Arrays.asList("number", "dt")); + assertTrue( + rowEquals( + rowFilter.filter(ROWS.get(0)), + fakeDatasetService.getAllRows("project", "dataset", "dynamic_write_a_1").get(0))); + assertTrue( + rowEquals( + rowFilter.filter(ROWS.get(1)), + fakeDatasetService.getAllRows("project", "dataset", "dynamic_write_b_2").get(0))); + assertTrue( + rowEquals( + rowFilter.filter(ROWS.get(2)), + fakeDatasetService.getAllRows("project", "dataset", "dynamic_write_c_3").get(0))); + } + List createCDCUpsertRows(List rows, boolean dynamicDestination, String tablePrefix) { Schema.Builder schemaBuilder = Schema.builder() - .addRowField("record", SCHEMA) + .addRowField(RECORD, SCHEMA) .addRowField( 
BigQueryStorageWriteApiSchemaTransformProvider.ROW_PROPERTY_MUTATION_INFO, BigQueryStorageWriteApiSchemaTransformProvider.ROW_SCHEMA_MUTATION_INFO); if (dynamicDestination) { - schemaBuilder = schemaBuilder.addStringField("destination"); + schemaBuilder = schemaBuilder.addStringField(DESTINATION); } Schema schemaWithCDC = schemaBuilder.build(); @@ -261,10 +288,10 @@ List createCDCUpsertRows(List rows, boolean dynamicDestination, String .ROW_PROPERTY_MUTATION_SQN, "AAA" + idx) .build()) - .withFieldValue("record", row); + .withFieldValue(RECORD, row); if (dynamicDestination) { rowBuilder = - rowBuilder.withFieldValue("destination", tablePrefix + row.getInt64("number")); + rowBuilder.withFieldValue(DESTINATION, tablePrefix + row.getInt64("number")); } return rowBuilder.build(); })
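Taken together, the pieces added in this patch let a single configuration drive per-record table routing: "{field}" templates in the table spec are interpolated by PortableBigQueryDestinations (via RowStringInterpolator), and the new keep/drop/only fields prune the written record with RowFilter. A rough usage sketch following BigQueryManagedIT.testDynamicDestinations above, reusing the scaffolding from the earlier sketch plus java.util.Arrays; the table spec and the routing field "dest" are placeholders:

Schema schema =
    Schema.of(
        Schema.Field.of("str", Schema.FieldType.STRING),
        Schema.Field.of("number", Schema.FieldType.INT64),
        Schema.Field.of("dest", Schema.FieldType.INT64));

PCollection<Row> rows =
    p.apply(
            Create.of(
                Row.withSchema(schema).addValues("a", 1L, 0L).build(),
                Row.withSchema(schema).addValues("b", 2L, 1L).build()))
        .setRowSchema(schema);

// "{dest}" is interpolated from each record, so rows fan out to orders_0, orders_1, ...
// "drop" removes the routing field before the record is written to its table.
Map<String, Object> config =
    ImmutableMap.of(
        "table", "my-project:my_dataset.orders_{dest}",
        "drop", Arrays.asList("dest"));

PCollectionRowTuple.of("input", rows)
    .apply(Managed.write(Managed.BIGQUERY).withConfig(config));

As the new BigQueryWriteConfiguration field descriptions state, keep, drop, and only are mutually exclusive, so at most one of them should be set alongside the table template.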