From 78a744484233f7b52b142855bf6de6330b6836b5 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Mon, 22 Jan 2024 11:17:57 -0500 Subject: [PATCH 01/14] initial work on bqio dlq --- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 183 +++++++++++++++--- .../gcp/bigquery/BigQueryIOTranslation.java | 13 ++ .../bigquery/BigQueryIOTranslationTest.java | 2 + 3 files changed, 167 insertions(+), 31 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 4424b53f83ce..56f6226f0d32 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -19,6 +19,7 @@ import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.createTempTableReference; +import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; @@ -74,13 +75,13 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.extensions.avro.coders.AvroCoder; import org.apache.beam.sdk.extensions.avro.io.AvroSource; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.extensions.gcp.util.Transport; import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.BoundedSource.BoundedReader; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.fs.MoveOptions; import org.apache.beam.sdk.io.fs.ResolveOptions; @@ -111,6 +112,8 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver; +import org.apache.beam.sdk.transforms.DoFn.ProcessContext; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -120,6 +123,11 @@ import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.errorhandling.BadRecord; +import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter; +import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.ThrowingBadRecordRouter; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.DefaultErrorHandler; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -742,6 +750,8 @@ public static TypedRead read(SerializableFunction par .setUseAvroLogicalTypes(false) .setFormat(DataFormat.AVRO) .setProjectionPushdownApplied(false) + .setBadRecordErrorHandler(new DefaultErrorHandler<>()) + .setBadRecordRouter(BadRecordRouter.THROWING_ROUTER) .build(); } @@ 
-770,6 +780,8 @@ public static TypedRead readWithDatumReader( .setUseAvroLogicalTypes(false) .setFormat(DataFormat.AVRO) .setProjectionPushdownApplied(false) + .setBadRecordErrorHandler(new DefaultErrorHandler<>()) + .setBadRecordRouter(BadRecordRouter.THROWING_ROUTER) .build(); } @@ -985,6 +997,11 @@ abstract Builder setDatumReaderFactory( abstract Builder setUseAvroLogicalTypes(Boolean useAvroLogicalTypes); + abstract Builder setBadRecordErrorHandler( + ErrorHandler badRecordErrorHandler); + + abstract Builder setBadRecordRouter(BadRecordRouter badRecordRouter); + abstract Builder setProjectionPushdownApplied(boolean projectionPushdownApplied); } @@ -1033,6 +1050,10 @@ abstract Builder setDatumReaderFactory( abstract Boolean getUseAvroLogicalTypes(); + abstract ErrorHandler getBadRecordErrorHandler(); + + abstract BadRecordRouter getBadRecordRouter(); + abstract boolean getProjectionPushdownApplied(); /** @@ -1138,6 +1159,9 @@ public void validate(PipelineOptions options) { e); } } + checkArgument( + getBadRecordRouter().equals(BadRecordRouter.THROWING_ROUTER), + "BigQueryIO Read with Error Handling is only available when DIRECT_READ is used"); } ValueProvider table = getTableProvider(); @@ -1428,7 +1452,7 @@ private PCollection expandForDirectRead( PBegin input, Coder outputCoder, Schema beamSchema, BigQueryOptions bqOptions) { ValueProvider tableProvider = getTableProvider(); Pipeline p = input.getPipeline(); - if (tableProvider != null) { + if (tableProvider != null && getBadRecordRouter() instanceof ThrowingBadRecordRouter) { // No job ID is required. Read directly from BigQuery storage. PCollection rows = p.apply( @@ -1475,7 +1499,8 @@ private PCollection expandForDirectRead( PCollectionTuple tuple; PCollection rows; - if (!getWithTemplateCompatibility()) { + if (!getWithTemplateCompatibility() + && getBadRecordRouter() instanceof ThrowingBadRecordRouter) { // Create a singleton job ID token at pipeline construction time. 
String staticJobUuid = BigQueryHelpers.randomUUIDString(); jobIdTokenView = @@ -1724,13 +1749,44 @@ public void processElement(ProcessContext c) throws Exception { return tuple; } + private class ErrorHandlingParseFn implements SerializableFunction { + private final SerializableFunction parseFn; + + private SchemaAndRecord schemaAndRecord = null; + + private ErrorHandlingParseFn(SerializableFunction parseFn) { + this.parseFn = parseFn; + } + + @Override + public T apply(SchemaAndRecord input) { + schemaAndRecord = input; + try { + return parseFn.apply(input); + } catch (Exception e) { + throw new ParseException(e); + } + } + + public SchemaAndRecord getSchemaAndRecord() { + return schemaAndRecord; + } + } + + private static class ParseException extends RuntimeException { + public ParseException(Exception e) { + super(e); + } + } + private PCollection createPCollectionForDirectRead( PCollectionTuple tuple, Coder outputCoder, TupleTag readStreamsTag, PCollectionView readSessionView, PCollectionView tableSchemaView) { - PCollection rows = + TupleTag rowTag = new TupleTag<>(); + PCollectionTuple resultTuple = tuple .get(readStreamsTag) .apply(Reshuffle.viaRandomKey()) @@ -1738,36 +1794,43 @@ private PCollection createPCollectionForDirectRead( ParDo.of( new DoFn() { @ProcessElement - public void processElement(ProcessContext c) throws Exception { + public void processElement( + ProcessContext c, MultiOutputReceiver outputReceiver) + throws Exception { ReadSession readSession = c.sideInput(readSessionView); TableSchema tableSchema = BigQueryHelpers.fromJsonString( c.sideInput(tableSchemaView), TableSchema.class); ReadStream readStream = c.element(); + ErrorHandlingParseFn errorHandlingParseFn = + new ErrorHandlingParseFn(getParseFn()); + BigQueryStorageStreamSource streamSource = BigQueryStorageStreamSource.create( readSession, readStream, tableSchema, - getParseFn(), + errorHandlingParseFn, outputCoder, getBigQueryServices()); - // Read all of the data from the stream. In the event that this work - // item fails and is rescheduled, the same rows will be returned in - // the same order. 
- BoundedSource.BoundedReader reader = - streamSource.createReader(c.getPipelineOptions()); - for (boolean more = reader.start(); more; more = reader.advance()) { - c.output(reader.getCurrent()); - } + readStreamSource( + c.getPipelineOptions(), + rowTag, + outputReceiver, + streamSource, + errorHandlingParseFn); } }) - .withSideInputs(readSessionView, tableSchemaView)) - .setCoder(outputCoder); + .withSideInputs(readSessionView, tableSchemaView) + .withOutputTags(rowTag, TupleTagList.of(BAD_RECORD_TAG))); - return rows; + getBadRecordErrorHandler() + .addErrorCollection( + resultTuple.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(tuple.getPipeline()))); + + return resultTuple.get(rowTag).setCoder(outputCoder); } private PCollection createPCollectionForDirectReadWithStreamBundle( @@ -1776,7 +1839,8 @@ private PCollection createPCollectionForDirectReadWithStreamBundle( TupleTag> listReadStreamsTag, PCollectionView readSessionView, PCollectionView tableSchemaView) { - PCollection rows = + TupleTag rowTag = new TupleTag<>(); + PCollectionTuple resultTuple = tuple .get(listReadStreamsTag) .apply(Reshuffle.viaRandomKey()) @@ -1784,37 +1848,87 @@ private PCollection createPCollectionForDirectReadWithStreamBundle( ParDo.of( new DoFn, T>() { @ProcessElement - public void processElement(ProcessContext c) throws Exception { + public void processElement( + ProcessContext c, MultiOutputReceiver outputReceiver) + throws Exception { ReadSession readSession = c.sideInput(readSessionView); TableSchema tableSchema = BigQueryHelpers.fromJsonString( c.sideInput(tableSchemaView), TableSchema.class); List streamBundle = c.element(); + ErrorHandlingParseFn errorHandlingParseFn = + new ErrorHandlingParseFn(getParseFn()); + BigQueryStorageStreamBundleSource streamSource = BigQueryStorageStreamBundleSource.create( readSession, streamBundle, tableSchema, - getParseFn(), + errorHandlingParseFn, outputCoder, getBigQueryServices(), 1L); - // Read all of the data from the stream. In the event that this work - // item fails and is rescheduled, the same rows will be returned in - // the same order. - BoundedReader reader = - streamSource.createReader(c.getPipelineOptions()); - for (boolean more = reader.start(); more; more = reader.advance()) { - c.output(reader.getCurrent()); - } + readStreamSource( + c.getPipelineOptions(), + rowTag, + outputReceiver, + streamSource, + errorHandlingParseFn); } }) - .withSideInputs(readSessionView, tableSchemaView)) - .setCoder(outputCoder); + .withSideInputs(readSessionView, tableSchemaView) + .withOutputTags(rowTag, TupleTagList.of(BAD_RECORD_TAG))); - return rows; + getBadRecordErrorHandler() + .addErrorCollection( + resultTuple.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(tuple.getPipeline()))); + + return resultTuple.get(rowTag).setCoder(outputCoder); + } + + public void readStreamSource( + PipelineOptions options, + TupleTag rowTag, + MultiOutputReceiver outputReceiver, + BoundedSource streamSource, + ErrorHandlingParseFn errorHandlingParseFn) + throws Exception { + // Read all the data from the stream. In the event that this work + // item fails and is rescheduled, the same rows will be returned in + // the same order. 
+ BoundedSource.BoundedReader reader = streamSource.createReader(options); + + boolean more = false; + try { + more = reader.start(); + } catch (ParseException e) { + GenericRecord record = errorHandlingParseFn.getSchemaAndRecord().getRecord(); + getBadRecordRouter() + .route( + outputReceiver, + record, + AvroCoder.of(record.getSchema()), + (Exception) e.getCause(), + "Unable to parse record reading from BigQuery"); + } + + while (more) { + outputReceiver.get(rowTag).output(reader.getCurrent()); + try { + more = reader.advance(); + } catch (ParseException e) { + GenericRecord record = errorHandlingParseFn.getSchemaAndRecord().getRecord(); + getBadRecordRouter() + .route( + outputReceiver, + record, + AvroCoder.of(record.getSchema()), + (Exception) e.getCause(), + "Unable to parse record reading from BigQuery"); + } + } } @Override @@ -2014,6 +2128,13 @@ public TypedRead withRowRestriction(ValueProvider rowRestriction) { return toBuilder().setRowRestriction(rowRestriction).build(); } + public TypedRead withErrorHandler(ErrorHandler badRecordErrorHandler) { + return toBuilder() + .setBadRecordErrorHandler(badRecordErrorHandler) + .setBadRecordRouter(BadRecordRouter.RECORDING_ROUTER) + .build(); + } + public TypedRead withTemplateCompatibility() { return toBuilder().setWithTemplateCompatibility(true).build(); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java index a3a270a315b2..05772ee6fdd3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java @@ -57,6 +57,9 @@ import org.apache.beam.sdk.schemas.logicaltypes.NanosDuration; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.errorhandling.BadRecord; +import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.ValueInSingleWindow; @@ -99,6 +102,8 @@ static class BigQueryIOReadTranslator implements TransformPayloadTranslator transform) { fieldValues.put("use_avro_logical_types", transform.getUseAvroLogicalTypes()); } fieldValues.put("projection_pushdown_applied", transform.getProjectionPushdownApplied()); + fieldValues.put("bad_record_router", toByteArray(transform.getBadRecordRouter())); + fieldValues.put( + "bad_record_error_handler", toByteArray(transform.getBadRecordErrorHandler())); return Row.withSchema(schema).withFieldValues(fieldValues).build(); } @@ -302,6 +310,11 @@ public TypedRead fromConfigRow(Row configRow) { if (projectionPushdownApplied != null) { builder = builder.setProjectionPushdownApplied(projectionPushdownApplied); } + byte[] badRecordRouter = configRow.getBytes("bad_record_router"); + builder.setBadRecordRouter((BadRecordRouter) fromByteArray(badRecordRouter)); + byte[] badRecordErrorHandler = configRow.getBytes("bad_record_error_handler"); + builder.setBadRecordErrorHandler( + (ErrorHandler) fromByteArray(badRecordErrorHandler)); return builder.build(); } catch (InvalidClassException e) { diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java index c46d382bb298..845e93bb4ee2 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java @@ -68,6 +68,8 @@ public class BigQueryIOTranslationTest { READ_TRANSFORM_SCHEMA_MAPPING.put("getUseAvroLogicalTypes", "use_avro_logical_types"); READ_TRANSFORM_SCHEMA_MAPPING.put( "getProjectionPushdownApplied", "projection_pushdown_applied"); + READ_TRANSFORM_SCHEMA_MAPPING.put("getBadRecordRouter", "bad_record_router"); + READ_TRANSFORM_SCHEMA_MAPPING.put("getBadRecordErrorHandler", "bad_record_error_handler"); } static final Map WRITE_TRANSFORM_SCHEMA_MAPPING = new HashMap<>(); From 7597a602b2d50486943cae88355839d78496401e Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Tue, 23 Jan 2024 10:59:00 -0500 Subject: [PATCH 02/14] add test for exception handling on bq --- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 16 ++-- .../BigQueryStorageStreamBundleSource.java | 9 ++- .../bigquery/BigQueryStorageStreamSource.java | 8 +- .../bigquery/BigQueryIOStorageQueryTest.java | 80 ++++++++++++++----- 4 files changed, 79 insertions(+), 34 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 56f6226f0d32..78ec881473f7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -1900,9 +1900,12 @@ public void readStreamSource( // the same order. 
BoundedSource.BoundedReader reader = streamSource.createReader(options); - boolean more = false; try { - more = reader.start(); + if (reader.start()) { + outputReceiver.get(rowTag).output(reader.getCurrent()); + } else { + return; + } } catch (ParseException e) { GenericRecord record = errorHandlingParseFn.getSchemaAndRecord().getRecord(); getBadRecordRouter() @@ -1914,10 +1917,13 @@ public void readStreamSource( "Unable to parse record reading from BigQuery"); } - while (more) { - outputReceiver.get(rowTag).output(reader.getCurrent()); + while (true) { try { - more = reader.advance(); + if (reader.advance()) { + outputReceiver.get(rowTag).output(reader.getCurrent()); + } else { + return; + } } catch (ParseException e) { GenericRecord record = errorHandlingParseFn.getSchemaAndRecord().getRecord(); getBadRecordRouter() diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamBundleSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamBundleSource.java index a2df86af1ee6..044a87529c02 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamBundleSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamBundleSource.java @@ -334,10 +334,6 @@ private boolean readNextRecord() throws IOException { reader.processReadRowsResponse(response); } - SchemaAndRecord schemaAndRecord = new SchemaAndRecord(reader.readSingleRecord(), tableSchema); - - current = parseFn.apply(schemaAndRecord); - // Calculates the fraction of the current stream that has been consumed. This value is // calculated by interpolating between the fraction consumed value from the previous server // response (or zero if we're consuming the first response) and the fractional value in the @@ -355,6 +351,11 @@ private boolean readNextRecord() throws IOException { // progress made in the current Stream gives us the overall StreamBundle progress. fractionOfStreamBundleConsumed = (currentStreamBundleIndex + fractionOfCurrentStreamConsumed) / source.streamBundle.size(); + + SchemaAndRecord schemaAndRecord = new SchemaAndRecord(reader.readSingleRecord(), tableSchema); + + current = parseFn.apply(schemaAndRecord); + return true; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java index a4336cd48f94..3ec3fed5eb55 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java @@ -274,10 +274,6 @@ private synchronized boolean readNextRecord() throws IOException { reader.processReadRowsResponse(response); } - SchemaAndRecord schemaAndRecord = new SchemaAndRecord(reader.readSingleRecord(), tableSchema); - - current = parseFn.apply(schemaAndRecord); - // Updates the fraction consumed value. 
This value is calculated by interpolating between // the fraction consumed value from the previous server response (or zero if we're consuming // the first response) and the fractional value in the current response based on how many of @@ -291,6 +287,10 @@ private synchronized boolean readNextRecord() throws IOException { * 1.0 / totalRowsInCurrentResponse; + SchemaAndRecord schemaAndRecord = new SchemaAndRecord(reader.readSingleRecord(), tableSchema); + + current = parseFn.apply(schemaAndRecord); + return true; } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java index af6dd505b916..0c5325286dd7 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java @@ -78,6 +78,9 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.errorhandling.BadRecord; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandlingTestUtils.ErrorSinkTransform; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; @@ -769,18 +772,8 @@ public void testQuerySourceCreateReader() throws Exception { querySource.createReader(options); } - @Test - public void testReadFromBigQueryIO() throws Exception { - doReadFromBigQueryIO(false); - } - - @Test - public void testReadFromBigQueryIOWithTemplateCompatibility() throws Exception { - doReadFromBigQueryIO(true); - } - - private void doReadFromBigQueryIO(boolean templateCompatibility) throws Exception { - + public TypedRead> configureTypedRead( + SerializableFunction> parseFn) throws Exception { TableReference sourceTableRef = BigQueryHelpers.parseTableSpec("project:dataset.table"); fakeDatasetService.createDataset( @@ -840,15 +833,29 @@ private void doReadFromBigQueryIO(boolean templateCompatibility) throws Exceptio when(fakeStorageClient.readRows(expectedReadRowsRequest, "")) .thenReturn(new FakeBigQueryServerStream<>(readRowsResponses)); - BigQueryIO.TypedRead> typedRead = - BigQueryIO.read(new ParseKeyValue()) - .fromQuery(encodedQuery) - .withMethod(Method.DIRECT_READ) - .withTestServices( - new FakeBigQueryServices() - .withDatasetService(fakeDatasetService) - .withJobService(fakeJobService) - .withStorageClient(fakeStorageClient)); + return BigQueryIO.read(parseFn) + .fromQuery(encodedQuery) + .withMethod(Method.DIRECT_READ) + .withTestServices( + new FakeBigQueryServices() + .withDatasetService(fakeDatasetService) + .withJobService(fakeJobService) + .withStorageClient(fakeStorageClient)); + } + + @Test + public void testReadFromBigQueryIO() throws Exception { + doReadFromBigQueryIO(false); + } + + @Test + public void testReadFromBigQueryIOWithTemplateCompatibility() throws Exception { + doReadFromBigQueryIO(true); + } + + private void doReadFromBigQueryIO(boolean templateCompatibility) throws Exception { + + BigQueryIO.TypedRead> typedRead = configureTypedRead(new ParseKeyValue()); if (templateCompatibility) { typedRead = 
typedRead.withTemplateCompatibility(); @@ -862,4 +869,35 @@ private void doReadFromBigQueryIO(boolean templateCompatibility) throws Exceptio p.run(); } + + private static final class FailingParseKeyValue + implements SerializableFunction> { + @Override + public KV apply(SchemaAndRecord input) { + if (input.getRecord().get("name").toString().equals("B")) { + throw new RuntimeException("ExpectedException"); + } + return KV.of( + input.getRecord().get("name").toString(), (Long) input.getRecord().get("number")); + } + } + + @Test + public void testReadFromBigQueryWithExceptionHandling() throws Exception { + + TypedRead> typedRead = configureTypedRead(new FailingParseKeyValue()); + + ErrorHandler> errorHandler = + p.registerBadRecordErrorHandler(new ErrorSinkTransform()); + typedRead = typedRead.withErrorHandler(errorHandler); + PCollection> output = p.apply(typedRead); + errorHandler.close(); + + PAssert.that(output) + .containsInAnyOrder(ImmutableList.of(KV.of("A", 1L), KV.of("C", 3L), KV.of("D", 4L))); + + PAssert.thatSingleton(errorHandler.getOutput()).isEqualTo(1L); + + p.run(); + } } From c09cacc216ffe97bf9e408c7529261ce17237a71 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Tue, 23 Jan 2024 11:18:53 -0500 Subject: [PATCH 03/14] add tests --- .../gcp/bigquery/BigQueryIOStorageReadIT.java | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java index b4f6ddb76f72..5adea9ff227d 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java @@ -20,6 +20,7 @@ import static org.apache.beam.sdk.io.gcp.bigquery.TestBigQueryOptions.BIGQUERY_EARLY_ROLLOUT_REGION; import static org.junit.Assert.assertEquals; +import com.google.api.services.bigquery.model.TableRow; import com.google.cloud.bigquery.storage.v1.DataFormat; import java.util.Map; import org.apache.beam.sdk.Pipeline; @@ -43,6 +44,10 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.errorhandling.BadRecord; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandlingTestUtils; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandlingTestUtils.ErrorSinkTransform; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; @@ -121,6 +126,38 @@ private void runBigQueryIOStorageReadPipeline() { p.run().waitUntilFinish(); } + static class FailingTableRowParser implements SerializableFunction { + + public static final FailingTableRowParser INSTANCE = new FailingTableRowParser(); + + private int parseCount = 0; + @Override + public TableRow apply(SchemaAndRecord schemaAndRecord) { + parseCount++; + if (parseCount % 50 == 0) { + throw new RuntimeException("ExpectedException"); + } + return TableRowParser.INSTANCE.apply(schemaAndRecord); + } + } + + private void runBigQueryIOStorageReadPipelineErrorHandling() throws Exception { + Pipeline p = Pipeline.create(options); + ErrorHandler> errorHandler 
= p.registerBadRecordErrorHandler(new ErrorSinkTransform()); + PCollection count = p.apply( + "Read", + BigQueryIO.read(FailingTableRowParser.INSTANCE) + .from(options.getInputTable()) + .withMethod(Method.DIRECT_READ) + .withFormat(options.getDataFormat()) + .withErrorHandler(errorHandler)) + .apply("Count", Count.globally()); + + PAssert.thatSingleton(count).isEqualTo(options.getNumRecords()* 49/50); + PAssert.thatSingleton(errorHandler.getOutput()).isEqualTo(options.getNumRecords()/50); + p.run().waitUntilFinish(); + } + @Test public void testBigQueryStorageRead1GAvro() throws Exception { setUpTestEnvironment("1G", DataFormat.AVRO); @@ -133,6 +170,18 @@ public void testBigQueryStorageRead1GArrow() throws Exception { runBigQueryIOStorageReadPipeline(); } + @Test + public void testBigQueryStorageRead1MErrorHandlingAvro() throws Exception { + setUpTestEnvironment("1M", DataFormat.AVRO); + runBigQueryIOStorageReadPipelineErrorHandling(); + } + + @Test + public void testBigQueryStorageRead1MErrorHandlingArrow() throws Exception { + setUpTestEnvironment("1M", DataFormat.ARROW); + runBigQueryIOStorageReadPipelineErrorHandling(); + } + @Test public void testBigQueryStorageReadWithAvro() throws Exception { storageReadWithSchema(DataFormat.AVRO); From c7ad4195d755d58de771e029ba787f37220bb028 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Tue, 23 Jan 2024 13:48:46 -0500 Subject: [PATCH 04/14] add trigger file to force the postcommit --- .github/trigger_files/beam_PostCommit_Java.json | 0 .../apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java | 1 - 2 files changed, 1 deletion(-) create mode 100644 .github/trigger_files/beam_PostCommit_Java.json diff --git a/.github/trigger_files/beam_PostCommit_Java.json b/.github/trigger_files/beam_PostCommit_Java.json new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java index 5adea9ff227d..54c3a90e6d31 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java @@ -46,7 +46,6 @@ import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.errorhandling.BadRecord; import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler; -import org.apache.beam.sdk.transforms.errorhandling.ErrorHandlingTestUtils; import org.apache.beam.sdk.transforms.errorhandling.ErrorHandlingTestUtils.ErrorSinkTransform; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; From 983b1356017098df044472e504e0cf181b863a16 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Thu, 1 Feb 2024 12:12:30 -0500 Subject: [PATCH 05/14] wire error handling into storage write, update check to not log a warning if errorhandler is used but error output isn't consumed. 
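
Intended wiring, sketched for illustration only: `p` is the pipeline, `rows` is an assumed PCollection<TableRow>, and the sink is the test-only ErrorSinkTransform used elsewhere in this series; a real pipeline would register its own PTransform over the BadRecord collection.

    ErrorHandler<BadRecord, PCollection<Long>> errorHandler =
        p.registerBadRecordErrorHandler(new ErrorSinkTransform());

    rows.apply(
        "WriteToBQ",
        BigQueryIO.writeTableRows()
            .to("project:dataset.table")
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
            .withErrorHandler(errorHandler));

    // Close before p.run() so the error output is wired into the pipeline.
    errorHandler.close();

Rows that fail conversion to Storage API messages, or fail to be written by the
Storage API, are routed to the registered error handler instead of failing the
bundle.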
--- .../beam/runners/dataflow/DataflowRunner.java | 6 +- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 31 +++++++- .../sdk/io/gcp/bigquery/StorageApiLoads.java | 74 ++++++++++++++++++- .../gcp/bigquery/BigQueryIOStorageReadIT.java | 27 ++++--- 4 files changed, 122 insertions(+), 16 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 5f8098768261..0906575bf9c4 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1668,7 +1668,11 @@ public CompositeBehavior enterCompositeTransform(Node node) { String rootBigQueryTransform = ""; if (transform.getClass().equals(StorageApiLoads.class)) { StorageApiLoads storageLoads = (StorageApiLoads) transform; - failedTag = storageLoads.getFailedRowsTag(); + //If the storage load is directing exceptions to an error handler, we don't need to + //warn for unconsumed rows + if (!storageLoads.usesErrorHandler()) { + failedTag = storageLoads.getFailedRowsTag(); + } // For storage API the transform that outputs failed rows is nested one layer below // BigQueryIO. rootBigQueryTransform = node.getEnclosingNode().getFullName(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 78ec881473f7..69ecab419385 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -20,6 +20,7 @@ import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.createTempTableReference; import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG; +import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.RECORDING_ROUTER; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; @@ -113,7 +114,6 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver; -import org.apache.beam.sdk.transforms.DoFn.ProcessContext; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -2278,6 +2278,8 @@ public static Write write() { .setDirectWriteProtos(true) .setDefaultMissingValueInterpretation( AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE) + .setBadRecordErrorHandler(new DefaultErrorHandler<>()) + .setBadRecordRouter(BadRecordRouter.THROWING_ROUTER) .build(); } @@ -2486,6 +2488,10 @@ public enum Method { abstract @Nullable SerializableFunction getRowMutationInformationFn(); + abstract ErrorHandler getBadRecordErrorHandler(); + + abstract BadRecordRouter getBadRecordRouter(); + abstract Builder toBuilder(); @AutoValue.Builder @@ -2594,6 +2600,11 @@ abstract Builder setDeterministicRecordIdFn( abstract Builder 
setRowMutationInformationFn( SerializableFunction rowMutationFn); + abstract Builder setBadRecordErrorHandler( + ErrorHandler badRecordErrorHandler); + + abstract Builder setBadRecordRouter(BadRecordRouter badRecordRouter); + abstract Write build(); } @@ -3261,6 +3272,13 @@ public Write withWriteTempDataset(String writeTempDataset) { return toBuilder().setWriteTempDataset(writeTempDataset).build(); } + public Write withErrorHandler(ErrorHandler errorHandler) { + return toBuilder() + .setBadRecordErrorHandler(errorHandler) + .setBadRecordRouter(RECORDING_ROUTER) + .build(); + } + @Override public void validate(PipelineOptions pipelineOptions) { BigQueryOptions options = pipelineOptions.as(BigQueryOptions.class); @@ -3667,6 +3685,9 @@ private WriteResult continueExpandTyped( checkArgument( !getPropagateSuccessfulStorageApiWrites(), "withPropagateSuccessfulStorageApiWrites only supported when using storage api writes."); + checkArgument( + !(getBadRecordRouter() instanceof ThrowingBadRecordRouter), + "Error Handling is not supported with STREAMING_INSERTS"); RowWriterFactory.TableRowWriterFactory tableRowWriterFactory = (RowWriterFactory.TableRowWriterFactory) rowWriterFactory; @@ -3701,6 +3722,10 @@ private WriteResult continueExpandTyped( checkArgument( !getPropagateSuccessfulStorageApiWrites(), "withPropagateSuccessfulStorageApiWrites only supported when using storage api writes."); + if (!(getBadRecordRouter() instanceof ThrowingBadRecordRouter)) { + LOG.warn( + "Error Handling is partially supported when using FILE_LOADS. Consider using STORAGE_WRITE_API or STORAGE_API_AT_LEAST_ONCE"); + } // Batch load jobs currently support JSON data insertion only with CSV files if (getJsonSchema() != null && getJsonSchema().isAccessible()) { @@ -3845,7 +3870,9 @@ private WriteResult continueExpandTyped( getIgnoreUnknownValues(), getPropagateSuccessfulStorageApiWrites(), getRowMutationInformationFn() != null, - getDefaultMissingValueInterpretation()); + getDefaultMissingValueInterpretation(), + getBadRecordRouter(), + getBadRecordErrorHandler()); return input.apply("StorageApiLoads", storageApiLoads); } else { throw new RuntimeException("Unexpected write method " + method); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java index 0227b8020129..c782f81195a3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java @@ -19,6 +19,7 @@ import com.google.api.services.bigquery.model.TableRow; import com.google.cloud.bigquery.storage.v1.AppendRowsRequest; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.ThreadLocalRandom; import javax.annotation.Nullable; @@ -27,11 +28,17 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.schemas.NoSuchSchemaException; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; +import org.apache.beam.sdk.transforms.DoFn.ProcessElement; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupIntoBatches; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; +import 
org.apache.beam.sdk.transforms.errorhandling.BadRecord; +import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter; +import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.ThrowingBadRecordRouter; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.ShardedKey; @@ -68,6 +75,10 @@ public class StorageApiLoads private final AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation; + private final BadRecordRouter badRecordRouter; + + private final ErrorHandler badRecordErrorHandler; + public StorageApiLoads( Coder destinationCoder, StorageApiDynamicDestinations dynamicDestinations, @@ -83,7 +94,9 @@ public StorageApiLoads( boolean ignoreUnknownValues, boolean propagateSuccessfulStorageApiWrites, boolean usesCdc, - AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation) { + AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation, + BadRecordRouter badRecordRouter, + ErrorHandler badRecordErrorHandler) { this.destinationCoder = destinationCoder; this.dynamicDestinations = dynamicDestinations; this.rowUpdateFn = rowUpdateFn; @@ -101,12 +114,18 @@ public StorageApiLoads( } this.usesCdc = usesCdc; this.defaultMissingValueInterpretation = defaultMissingValueInterpretation; + this.badRecordRouter = badRecordRouter; + this.badRecordErrorHandler = badRecordErrorHandler; } public TupleTag getFailedRowsTag() { return failedRowsTag; } + public boolean usesErrorHandler() { + return (badRecordRouter instanceof ThrowingBadRecordRouter); + } + @Override public WriteResult expand(PCollection> input) { Coder payloadCoder; @@ -171,6 +190,9 @@ public WriteResult expandInconsistent( if (successfulWrittenRowsTag != null) { successfulWrittenRows = writeRecordsResult.get(successfulWrittenRowsTag); } + + addErrorCollections(convertMessagesResult, writeRecordsResult); + return WriteResult.in( input.getPipeline(), null, @@ -261,6 +283,8 @@ public WriteResult expandTriggered( successfulWrittenRows = writeRecordsResult.get(successfulWrittenRowsTag); } + addErrorCollections(convertMessagesResult, writeRecordsResult); + return WriteResult.in( input.getPipeline(), null, @@ -350,6 +374,8 @@ public WriteResult expandUntriggered( successfulWrittenRows = writeRecordsResult.get(successfulWrittenRowsTag); } + addErrorCollections(convertMessagesResult, writeRecordsResult); + return WriteResult.in( input.getPipeline(), null, @@ -362,4 +388,50 @@ public WriteResult expandUntriggered( successfulWrittenRowsTag, successfulWrittenRows); } + + private void addErrorCollections( + PCollectionTuple convertMessagesResult, PCollectionTuple writeRecordsResult) { + if (usesErrorHandler()) { + PCollection badRecords = + PCollectionList.of( + convertMessagesResult + .get(failedRowsTag) + .apply( + ParDo.of( + new ConvertInsertErrorToBadRecord( + "Failed to Convert to Storage API Message")))) + .and( + writeRecordsResult + .get(failedRowsTag) + .apply( + ParDo.of( + new ConvertInsertErrorToBadRecord( + "Failed to Write Message to Storage API")))) + .apply("flattenErrors", Flatten.pCollections()); + badRecordErrorHandler.addErrorCollection(badRecords); + } + } + + private static class ConvertInsertErrorToBadRecord + extends DoFn { + + private final String errorMessage; + + public ConvertInsertErrorToBadRecord(String errorMessage) { + this.errorMessage = errorMessage; + } + + @ProcessElement + 
public void processElement( + @Element BigQueryStorageApiInsertError bigQueryStorageApiInsertError, + OutputReceiver outputReceiver) + throws IOException { + outputReceiver.output( + BadRecord.fromExceptionInformation( + bigQueryStorageApiInsertError, + BigQueryStorageApiInsertErrorCoder.of(), + null, + errorMessage)); + } + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java index 54c3a90e6d31..b19903e14b6a 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java @@ -130,6 +130,7 @@ static class FailingTableRowParser implements SerializableFunction> errorHandler = p.registerBadRecordErrorHandler(new ErrorSinkTransform()); - PCollection count = p.apply( - "Read", - BigQueryIO.read(FailingTableRowParser.INSTANCE) - .from(options.getInputTable()) - .withMethod(Method.DIRECT_READ) - .withFormat(options.getDataFormat()) - .withErrorHandler(errorHandler)) - .apply("Count", Count.globally()); - - PAssert.thatSingleton(count).isEqualTo(options.getNumRecords()* 49/50); - PAssert.thatSingleton(errorHandler.getOutput()).isEqualTo(options.getNumRecords()/50); + ErrorHandler> errorHandler = + p.registerBadRecordErrorHandler(new ErrorSinkTransform()); + PCollection count = + p.apply( + "Read", + BigQueryIO.read(FailingTableRowParser.INSTANCE) + .from(options.getInputTable()) + .withMethod(Method.DIRECT_READ) + .withFormat(options.getDataFormat()) + .withErrorHandler(errorHandler)) + .apply("Count", Count.globally()); + + PAssert.thatSingleton(count).isEqualTo(options.getNumRecords() * 49 / 50); + PAssert.thatSingleton(errorHandler.getOutput()).isEqualTo(options.getNumRecords() / 50); p.run().waitUntilFinish(); } From 06f909613516ddafe4e82bc25b2373d0885db321 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Thu, 1 Feb 2024 13:07:28 -0500 Subject: [PATCH 06/14] wire batch loads with error handler --- .../apache/beam/sdk/io/gcp/bigquery/BatchLoads.java | 11 ++++++++++- .../apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java | 4 +++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index 32ee29738bf8..6f755ede25da 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -57,6 +57,9 @@ import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.errorhandling.BadRecord; +import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler; import org.apache.beam.sdk.transforms.windowing.AfterFirst; import org.apache.beam.sdk.transforms.windowing.AfterPane; import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; @@ -161,6 +164,8 @@ class BatchLoads private final RowWriterFactory rowWriterFactory; private final @Nullable String kmsKey; private final String tempDataset; 
+ private final BadRecordRouter badRecordRouter; + private final ErrorHandler badRecordErrorHandler; private Coder tableDestinationCoder; // The maximum number of times to retry failed load or copy jobs. @@ -180,7 +185,9 @@ class BatchLoads @Nullable String kmsKey, boolean clusteringEnabled, boolean useAvroLogicalTypes, - String tempDataset) { + String tempDataset, + BadRecordRouter badRecordRouter, + ErrorHandler badRecordErrorHandler) { bigQueryServices = new BigQueryServicesImpl(); this.writeDisposition = writeDisposition; this.createDisposition = createDisposition; @@ -207,6 +214,8 @@ class BatchLoads this.tempDataset = tempDataset; this.tableDestinationCoder = clusteringEnabled ? TableDestinationCoderV3.of() : TableDestinationCoderV2.of(); + this.badRecordRouter = badRecordRouter; + this.badRecordErrorHandler = badRecordErrorHandler; } void setSchemaUpdateOptions(Set schemaUpdateOptions) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 69ecab419385..c784ded7041e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -3750,7 +3750,9 @@ private WriteResult continueExpandTyped( getKmsKey(), getClustering() != null, getUseAvroLogicalTypes(), - getWriteTempDataset()); + getWriteTempDataset(), + getBadRecordRouter(), + getBadRecordErrorHandler()); batchLoads.setTestServices(getBigQueryServices()); if (getSchemaUpdateOptions() != null) { batchLoads.setSchemaUpdateOptions(getSchemaUpdateOptions()); From df375dfdd6c189a8e0525ba88146f14c5edbecfd Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Wed, 7 Feb 2024 14:01:49 -0500 Subject: [PATCH 07/14] wire in batch loads changes, update tests --- .../sdk/io/gcp/bigquery/AvroRowWriter.java | 10 +- .../beam/sdk/io/gcp/bigquery/BatchLoads.java | 142 +++++++++++------- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 2 +- .../gcp/bigquery/BigQueryIOTranslation.java | 10 ++ .../io/gcp/bigquery/BigQueryRowWriter.java | 9 +- .../sdk/io/gcp/bigquery/StorageApiLoads.java | 8 +- .../sdk/io/gcp/bigquery/TableRowWriter.java | 10 +- .../io/gcp/bigquery/WriteBundlesToFiles.java | 42 ++++-- .../bigquery/WriteGroupedRecordsToFiles.java | 41 +++-- .../bigquery/BigQueryIOTranslationTest.java | 2 + 10 files changed, 194 insertions(+), 82 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroRowWriter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroRowWriter.java index 0b64a65c4503..1f45371b19ff 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroRowWriter.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroRowWriter.java @@ -47,9 +47,15 @@ class AvroRowWriter extends BigQueryRowWriter { } @Override - public void write(T element) throws IOException { + public void write(T element) throws IOException, BigQueryRowSerializationException { AvroWriteRequest writeRequest = new AvroWriteRequest<>(element, schema); - writer.append(toAvroRecord.apply(writeRequest)); + AvroT serializedRequest; + try { + serializedRequest = toAvroRecord.apply(writeRequest); + } catch (Exception e) { + throw new BigQueryRowSerializationException(e); + } + 
writer.append(serializedRequest); } public Schema getSchema() { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index 6f755ede25da..eaaca2888534 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.gcp.bigquery; import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation; +import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; @@ -80,6 +81,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; @@ -612,9 +614,13 @@ PCollection> writeDynamicallyShardedFil unwrittedRecordsTag, maxNumWritersPerBundle, maxFileSize, - rowWriterFactory)) + rowWriterFactory, + input.getCoder(), + badRecordRouter)) .withSideInputs(tempFilePrefix) - .withOutputTags(writtenFilesTag, TupleTagList.of(unwrittedRecordsTag))); + .withOutputTags( + writtenFilesTag, + TupleTagList.of(ImmutableList.of(unwrittedRecordsTag, BAD_RECORD_TAG)))); PCollection> writtenFiles = writeBundlesTuple .get(writtenFilesTag) @@ -623,6 +629,8 @@ PCollection> writeDynamicallyShardedFil writeBundlesTuple .get(unwrittedRecordsTag) .setCoder(KvCoder.of(ShardedKeyCoder.of(destinationCoder), elementCoder)); + badRecordErrorHandler.addErrorCollection( + writeBundlesTuple.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline()))); // If the bundles contain too many output tables to be written inline to files (due to memory // limits), any unwritten records will be spilled to the unwrittenRecordsTag PCollection. @@ -691,62 +699,92 @@ PCollection> writeDynamicallyShardedFil // parallelize properly. We also ensure that the files are written if a threshold number of // records are ready. Dynamic sharding is achieved via the withShardedKey() option provided by // GroupIntoBatches. 
- return input - .apply( - GroupIntoBatches.ofSize(FILE_TRIGGERING_RECORD_COUNT) - .withByteSize(byteSize) - .withMaxBufferingDuration(maxBufferingDuration) - .withShardedKey()) - .setCoder( - KvCoder.of( - org.apache.beam.sdk.util.ShardedKey.Coder.of(destinationCoder), - IterableCoder.of(elementCoder))) - .apply( - "StripShardId", - MapElements.via( - new SimpleFunction< - KV, Iterable>, - KV>>() { - @Override - public KV> apply( - KV, Iterable> - input) { - return KV.of(input.getKey().getKey(), input.getValue()); - } - })) - .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(elementCoder))) - .apply( - "WriteGroupedRecords", - ParDo.of( - new WriteGroupedRecordsToFiles( - tempFilePrefix, maxFileSize, rowWriterFactory)) - .withSideInputs(tempFilePrefix)) + TupleTag> successfulResultsTag = new TupleTag<>(); + PCollectionTuple writeResults = + input + .apply( + GroupIntoBatches.ofSize(FILE_TRIGGERING_RECORD_COUNT) + .withByteSize(byteSize) + .withMaxBufferingDuration(maxBufferingDuration) + .withShardedKey()) + .setCoder( + KvCoder.of( + org.apache.beam.sdk.util.ShardedKey.Coder.of(destinationCoder), + IterableCoder.of(elementCoder))) + .apply( + "StripShardId", + MapElements.via( + new SimpleFunction< + KV, Iterable>, + KV>>() { + @Override + public KV> apply( + KV, Iterable> + input) { + return KV.of(input.getKey().getKey(), input.getValue()); + } + })) + .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(elementCoder))) + .apply( + "WriteGroupedRecords", + ParDo.of( + new WriteGroupedRecordsToFiles( + tempFilePrefix, + maxFileSize, + rowWriterFactory, + badRecordRouter, + successfulResultsTag, + elementCoder)) + .withSideInputs(tempFilePrefix) + .withOutputTags(successfulResultsTag, TupleTagList.of(BAD_RECORD_TAG))); + badRecordErrorHandler.addErrorCollection( + writeResults.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline()))); + + return writeResults + .get(successfulResultsTag) .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder)); } private PCollection> writeShardedRecords( PCollection, ElementT>> shardedRecords, PCollectionView tempFilePrefix) { - return shardedRecords - .apply("GroupByDestination", GroupByKey.create()) - .apply( - "StripShardId", - MapElements.via( - new SimpleFunction< - KV, Iterable>, - KV>>() { - @Override - public KV> apply( - KV, Iterable> input) { - return KV.of(input.getKey().getKey(), input.getValue()); - } - })) - .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(elementCoder))) - .apply( - "WriteGroupedRecords", - ParDo.of( - new WriteGroupedRecordsToFiles<>(tempFilePrefix, maxFileSize, rowWriterFactory)) - .withSideInputs(tempFilePrefix)) + TupleTag> successfulResultsTag = new TupleTag<>(); + PCollectionTuple writeResults = + shardedRecords + .apply("GroupByDestination", GroupByKey.create()) + .apply( + "StripShardId", + MapElements.via( + new SimpleFunction< + KV, Iterable>, + KV>>() { + @Override + public KV> apply( + KV, Iterable> input) { + return KV.of(input.getKey().getKey(), input.getValue()); + } + })) + .setCoder(KvCoder.of(destinationCoder, IterableCoder.of(elementCoder))) + .apply( + "WriteGroupedRecords", + ParDo.of( + new WriteGroupedRecordsToFiles<>( + tempFilePrefix, + maxFileSize, + rowWriterFactory, + badRecordRouter, + successfulResultsTag, + elementCoder)) + .withSideInputs(tempFilePrefix) + .withOutputTags(successfulResultsTag, TupleTagList.of(BAD_RECORD_TAG))); + + badRecordErrorHandler.addErrorCollection( + writeResults + .get(BAD_RECORD_TAG) + 
.setCoder(BadRecord.getCoder(shardedRecords.getPipeline()))); + + return writeResults + .get(successfulResultsTag) .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder)); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index c784ded7041e..7545d4e6fc37 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -3686,7 +3686,7 @@ private WriteResult continueExpandTyped( !getPropagateSuccessfulStorageApiWrites(), "withPropagateSuccessfulStorageApiWrites only supported when using storage api writes."); checkArgument( - !(getBadRecordRouter() instanceof ThrowingBadRecordRouter), + getBadRecordRouter() instanceof ThrowingBadRecordRouter, "Error Handling is not supported with STREAMING_INSERTS"); RowWriterFactory.TableRowWriterFactory tableRowWriterFactory = diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java index 05772ee6fdd3..c98fc6ecd2e2 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java @@ -389,6 +389,8 @@ static class BigQueryIOWriteTranslator implements TransformPayloadTranslator transform) { fieldValues.put( "row_mutation_information_fn", toByteArray(transform.getRowMutationInformationFn())); } + fieldValues.put("bad_record_router", toByteArray(transform.getBadRecordRouter())); + fieldValues.put( + "bad_record_error_handler", toByteArray(transform.getBadRecordErrorHandler())); return Row.withSchema(schema).withFieldValues(fieldValues).build(); } @@ -815,6 +820,11 @@ public Write fromConfigRow(Row configRow) { builder.setRowMutationInformationFn( (SerializableFunction) fromByteArray(rowMutationInformationFnBytes)); } + byte[] badRecordRouter = configRow.getBytes("bad_record_router"); + builder.setBadRecordRouter((BadRecordRouter) fromByteArray(badRecordRouter)); + byte[] badRecordErrorHandler = configRow.getBytes("bad_record_error_handler"); + builder.setBadRecordErrorHandler( + (ErrorHandler) fromByteArray(badRecordErrorHandler)); return builder.build(); } catch (InvalidClassException e) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryRowWriter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryRowWriter.java index a442144e1610..b846a06af580 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryRowWriter.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryRowWriter.java @@ -63,7 +63,7 @@ protected OutputStream getOutputStream() { return out; } - abstract void write(T value) throws Exception; + abstract void write(T value) throws IOException, BigQueryRowSerializationException; long getByteSize() { return out.getCount(); @@ -80,4 +80,11 @@ Result getResult() { checkState(isClosed, "Not yet closed"); return new Result(resourceId, out.getCount()); } + + public static class 
BigQueryRowSerializationException extends Exception { + + public BigQueryRowSerializationException(Exception e) { + super(e); + } + } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java index c782f81195a3..67e9b5a3c4e4 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java @@ -28,8 +28,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.schemas.NoSuchSchemaException; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFn.Element; -import org.apache.beam.sdk.transforms.DoFn.ProcessElement; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupIntoBatches; import org.apache.beam.sdk.transforms.PTransform; @@ -396,18 +394,18 @@ private void addErrorCollections( PCollectionList.of( convertMessagesResult .get(failedRowsTag) - .apply( + .apply("ConvertMessageFailuresToBadRecord", ParDo.of( new ConvertInsertErrorToBadRecord( "Failed to Convert to Storage API Message")))) .and( writeRecordsResult .get(failedRowsTag) - .apply( + .apply("WriteRecordFailuresToBadRecord", ParDo.of( new ConvertInsertErrorToBadRecord( "Failed to Write Message to Storage API")))) - .apply("flattenErrors", Flatten.pCollections()); + .apply("flattenBadRecords", Flatten.pCollections()); badRecordErrorHandler.addErrorCollection(badRecords); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java index 6cbeb61f624f..4d5fb1b3d746 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.gcp.bigquery; import com.google.api.services.bigquery.model.TableRow; +import java.io.IOException; import java.nio.charset.StandardCharsets; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.Context; @@ -37,8 +38,13 @@ class TableRowWriter extends BigQueryRowWriter { } @Override - void write(T value) throws Exception { - TableRow tableRow = toRow.apply(value); + void write(T value) throws IOException, BigQueryRowSerializationException { + TableRow tableRow; + try { + tableRow = toRow.apply(value); + } catch (Exception e) { + throw new BigQueryRowSerializationException(e); + } CODER.encode(tableRow, getOutputStream(), Context.OUTER); getOutputStream().write(NEWLINE); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java index 894983ab664f..8b74fa26ed91 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java @@ -32,8 +32,10 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import 
org.apache.beam.sdk.coders.StructuredCoder; import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryRowWriter.BigQueryRowSerializationException; import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.sdk.values.KV; @@ -69,6 +71,8 @@ class WriteBundlesToFiles private final int maxNumWritersPerBundle; private final long maxFileSize; private final RowWriterFactory rowWriterFactory; + private final Coder> coder; + private final BadRecordRouter badRecordRouter; private int spilledShardNumber; /** @@ -165,12 +169,16 @@ public void verifyDeterministic() {} TupleTag, ElementT>> unwrittenRecordsTag, int maxNumWritersPerBundle, long maxFileSize, - RowWriterFactory rowWriterFactory) { + RowWriterFactory rowWriterFactory, + Coder> coder, + BadRecordRouter badRecordRouter) { this.tempFilePrefixView = tempFilePrefixView; this.unwrittenRecordsTag = unwrittenRecordsTag; this.maxNumWritersPerBundle = maxNumWritersPerBundle; this.maxFileSize = maxFileSize; this.rowWriterFactory = rowWriterFactory; + this.coder = coder; + this.badRecordRouter = badRecordRouter; } @StartBundle @@ -197,7 +205,10 @@ BigQueryRowWriter createAndInsertWriter( @ProcessElement public void processElement( - ProcessContext c, @Element KV element, BoundedWindow window) + ProcessContext c, + @Element KV element, + BoundedWindow window, + MultiOutputReceiver outputReceiver) throws Exception { Map> writers = Preconditions.checkStateNotNull(this.writers); @@ -234,17 +245,28 @@ public void processElement( try { writer.write(element.getValue()); - } catch (Exception e) { - // Discard write result and close the write. + } catch (BigQueryRowSerializationException e) { try { - writer.close(); - // The writer does not need to be reset, as this DoFn cannot be reused. - } catch (Exception closeException) { - // Do not mask the exception that caused the write to fail. - e.addSuppressed(closeException); + badRecordRouter.route( + outputReceiver, element, coder, e, "Unable to Write BQ Record to File"); + } catch (Exception e2) { + cleanupWriter(writer, e2); } - throw e; + } catch (IOException e) { + cleanupWriter(writer, e); + } + } + + private void cleanupWriter(BigQueryRowWriter writer, Exception e) throws Exception { + // Discard write result and close the write. + try { + writer.close(); + // The writer does not need to be reset, as this DoFn cannot be reused. + } catch (Exception closeException) { + // Do not mask the exception that caused the write to fail. 
+ e.addSuppressed(closeException); } + throw e; } @FinishBundle diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java index 236b07d74756..3a4f377ce2b8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteGroupedRecordsToFiles.java @@ -17,9 +17,14 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryRowWriter.BigQueryRowSerializationException; +import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; /** * Receives elements grouped by their destination, and writes them out to a file. Since all the @@ -31,21 +36,30 @@ class WriteGroupedRecordsToFiles private final PCollectionView tempFilePrefix; private final long maxFileSize; private final RowWriterFactory rowWriterFactory; + private final BadRecordRouter badRecordRouter; + private final TupleTag> successfulResultsTag; + private final Coder elementCoder; WriteGroupedRecordsToFiles( PCollectionView tempFilePrefix, long maxFileSize, - RowWriterFactory rowWriterFactory) { + RowWriterFactory rowWriterFactory, + BadRecordRouter badRecordRouter, + TupleTag> successfulResultsTag, + Coder elementCoder) { this.tempFilePrefix = tempFilePrefix; this.maxFileSize = maxFileSize; this.rowWriterFactory = rowWriterFactory; + this.badRecordRouter = badRecordRouter; + this.successfulResultsTag = successfulResultsTag; + this.elementCoder = elementCoder; } @ProcessElement public void processElement( ProcessContext c, @Element KV> element, - OutputReceiver> o) + MultiOutputReceiver outputReceiver) throws Exception { String tempFilePrefix = c.sideInput(this.tempFilePrefix); @@ -58,20 +72,29 @@ public void processElement( if (writer.getByteSize() > maxFileSize) { writer.close(); BigQueryRowWriter.Result result = writer.getResult(); - o.output( - new WriteBundlesToFiles.Result<>( - result.resourceId.toString(), result.byteSize, c.element().getKey())); + outputReceiver + .get(successfulResultsTag) + .output( + new WriteBundlesToFiles.Result<>( + result.resourceId.toString(), result.byteSize, c.element().getKey())); writer = rowWriterFactory.createRowWriter(tempFilePrefix, element.getKey()); } - writer.write(tableRow); + try { + writer.write(tableRow); + } catch (BigQueryRowSerializationException e) { + badRecordRouter.route( + outputReceiver, tableRow, elementCoder, e, "Unable to Write BQ Record to File"); + } } } finally { writer.close(); } BigQueryRowWriter.Result result = writer.getResult(); - o.output( - new WriteBundlesToFiles.Result<>( - result.resourceId.toString(), result.byteSize, c.element().getKey())); + outputReceiver + .get(successfulResultsTag) + .output( + new WriteBundlesToFiles.Result<>( + result.resourceId.toString(), result.byteSize, c.element().getKey())); } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java index 845e93bb4ee2..6158d7cf4879 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java @@ -127,6 +127,8 @@ public class BigQueryIOTranslationTest { WRITE_TRANSFORM_SCHEMA_MAPPING.put("getWriteTempDataset", "write_temp_dataset"); WRITE_TRANSFORM_SCHEMA_MAPPING.put( "getRowMutationInformationFn", "row_mutation_information_fn"); + WRITE_TRANSFORM_SCHEMA_MAPPING.put("getBadRecordRouter", "bad_record_router"); + WRITE_TRANSFORM_SCHEMA_MAPPING.put("getBadRecordErrorHandler", "bad_record_error_handler"); } @Test From f001a041db456e583e954ad7fab894a9941a29ab Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Thu, 8 Feb 2024 13:08:48 -0500 Subject: [PATCH 08/14] add BQ write with error handling test cases --- .../errorhandling/ErrorHandlingTestUtils.java | 8 + .../bigquery/StorageApiConvertMessages.java | 44 +++- .../sdk/io/gcp/bigquery/StorageApiLoads.java | 20 +- .../gcp/bigquery/BigQueryIOStorageReadIT.java | 2 + .../io/gcp/bigquery/BigQueryIOWriteTest.java | 240 ++++++++++++++++++ 5 files changed, 301 insertions(+), 13 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandlingTestUtils.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandlingTestUtils.java index 41367765b920..40310c4b2a92 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandlingTestUtils.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandlingTestUtils.java @@ -45,4 +45,12 @@ public static class ErrorSinkTransform } } } + + public static class EchoErrorTransform extends PTransform, PCollection>{ + + @Override + public PCollection expand(PCollection input){ + return input; + } + } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java index 23fe0250b7d9..67c1813803c2 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; +import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG; + import com.google.api.services.bigquery.model.TableRow; import java.io.IOException; import org.apache.beam.sdk.coders.Coder; @@ -27,12 +29,15 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.errorhandling.BadRecord; +import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter; import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; 
import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; @@ -50,6 +55,7 @@ public class StorageApiConvertMessages private final Coder> successCoder; private final @Nullable SerializableFunction rowMutationFn; + private final BadRecordRouter badRecordRouter; public StorageApiConvertMessages( StorageApiDynamicDestinations dynamicDestinations, @@ -58,7 +64,8 @@ public StorageApiConvertMessages( TupleTag> successfulWritesTag, Coder errorCoder, Coder> successCoder, - @Nullable SerializableFunction rowMutationFn) { + @Nullable SerializableFunction rowMutationFn, + BadRecordRouter badRecordRouter) { this.dynamicDestinations = dynamicDestinations; this.bqServices = bqServices; this.failedWritesTag = failedWritesTag; @@ -66,6 +73,7 @@ public StorageApiConvertMessages( this.errorCoder = errorCoder; this.successCoder = successCoder; this.rowMutationFn = rowMutationFn; + this.badRecordRouter = badRecordRouter; } @Override @@ -82,11 +90,16 @@ public PCollectionTuple expand(PCollection> input) { operationName, failedWritesTag, successfulWritesTag, - rowMutationFn)) - .withOutputTags(successfulWritesTag, TupleTagList.of(failedWritesTag)) + rowMutationFn, + badRecordRouter, + input.getCoder())) + .withOutputTags( + successfulWritesTag, + TupleTagList.of(ImmutableList.of(failedWritesTag, BAD_RECORD_TAG))) .withSideInputs(dynamicDestinations.getSideInputs())); result.get(successfulWritesTag).setCoder(successCoder); result.get(failedWritesTag).setCoder(errorCoder); + result.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline())); return result; } @@ -98,6 +111,8 @@ public static class ConvertMessagesDoFn failedWritesTag; private final TupleTag> successfulWritesTag; private final @Nullable SerializableFunction rowMutationFn; + private final BadRecordRouter badRecordRouter; + Coder> elementCoder; private transient @Nullable DatasetService datasetServiceInternal = null; ConvertMessagesDoFn( @@ -106,13 +121,17 @@ public static class ConvertMessagesDoFn failedWritesTag, TupleTag> successfulWritesTag, - @Nullable SerializableFunction rowMutationFn) { + @Nullable SerializableFunction rowMutationFn, + BadRecordRouter badRecordRouter, + Coder> elementCoder) { this.dynamicDestinations = dynamicDestinations; this.messageConverters = new TwoLevelMessageConverterCache<>(operationName); this.bqServices = bqServices; this.failedWritesTag = failedWritesTag; this.successfulWritesTag = successfulWritesTag; this.rowMutationFn = rowMutationFn; + this.badRecordRouter = badRecordRouter; + this.elementCoder = elementCoder; } private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException { @@ -159,9 +178,20 @@ public void processElement( .toMessage(element.getValue(), rowMutationInformation) .withTimestamp(timestamp); o.get(successfulWritesTag).output(KV.of(element.getKey(), payload)); - } catch (TableRowToStorageApiProto.SchemaConversionException e) { - TableRow tableRow = messageConverter.toTableRow(element.getValue()); - o.get(failedWritesTag).output(new BigQueryStorageApiInsertError(tableRow, e.toString())); + } catch (TableRowToStorageApiProto.SchemaConversionException conversionException) { + TableRow tableRow; + try { + tableRow = messageConverter.toTableRow(element.getValue()); + } catch (Exception e) { + badRecordRouter.route( + o, element, elementCoder, e, "Unable to convert value to StorageWriteApiPayload"); + return; + } + o.get(failedWritesTag) + .output(new 
BigQueryStorageApiInsertError(tableRow, conversionException.toString())); + } catch (Exception e) { + badRecordRouter.route( + o, element, elementCoder, e, "Unable to convert value to StorageWriteApiPayload"); } } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java index 67e9b5a3c4e4..62174b5c917a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiLoads.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.io.gcp.bigquery; +import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG; + import com.google.api.services.bigquery.model.TableRow; import com.google.cloud.bigquery.storage.v1.AppendRowsRequest; import java.io.IOException; @@ -121,7 +123,7 @@ public TupleTag getFailedRowsTag() { } public boolean usesErrorHandler() { - return (badRecordRouter instanceof ThrowingBadRecordRouter); + return !(badRecordRouter instanceof ThrowingBadRecordRouter); } @Override @@ -160,7 +162,8 @@ public WriteResult expandInconsistent( successfulConvertedRowsTag, BigQueryStorageApiInsertErrorCoder.of(), successCoder, - rowUpdateFn)); + rowUpdateFn, + badRecordRouter)); PCollectionTuple writeRecordsResult = convertMessagesResult .get(successfulConvertedRowsTag) @@ -221,7 +224,8 @@ public WriteResult expandTriggered( successfulConvertedRowsTag, BigQueryStorageApiInsertErrorCoder.of(), successCoder, - rowUpdateFn)); + rowUpdateFn, + badRecordRouter)); PCollection, Iterable>> groupedRecords; @@ -341,7 +345,8 @@ public WriteResult expandUntriggered( successfulConvertedRowsTag, BigQueryStorageApiInsertErrorCoder.of(), successCoder, - rowUpdateFn)); + rowUpdateFn, + badRecordRouter)); PCollectionTuple writeRecordsResult = convertMessagesResult @@ -394,14 +399,17 @@ private void addErrorCollections( PCollectionList.of( convertMessagesResult .get(failedRowsTag) - .apply("ConvertMessageFailuresToBadRecord", + .apply( + "ConvertMessageFailuresToBadRecord", ParDo.of( new ConvertInsertErrorToBadRecord( "Failed to Convert to Storage API Message")))) + .and(convertMessagesResult.get(BAD_RECORD_TAG)) .and( writeRecordsResult .get(failedRowsTag) - .apply("WriteRecordFailuresToBadRecord", + .apply( + "WriteRecordFailuresToBadRecord", ParDo.of( new ConvertInsertErrorToBadRecord( "Failed to Write Message to Storage API")))) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java index b19903e14b6a..983192e8cd78 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java @@ -155,6 +155,8 @@ private void runBigQueryIOStorageReadPipelineErrorHandling() throws Exception { .withErrorHandler(errorHandler)) .apply("Count", Count.globally()); + errorHandler.close(); + PAssert.thatSingleton(count).isEqualTo(options.getNumRecords() * 49 / 50); PAssert.thatSingleton(errorHandler.getOutput()).isEqualTo(options.getNumRecords() / 50); p.run().waitUntilFinish(); diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index 55269342155f..9f77aa46dee6 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -70,6 +70,7 @@ import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -82,6 +83,7 @@ import java.util.stream.Collectors; import java.util.stream.LongStream; import java.util.stream.StreamSupport; +import org.apache.avro.Schema.Field; import org.apache.avro.SchemaBuilder; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumWriter; @@ -91,6 +93,7 @@ import org.apache.avro.io.Encoder; import org.apache.beam.runners.direct.DirectOptions; import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.SerializableCoder; @@ -139,6 +142,10 @@ import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.errorhandling.BadRecord; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandlingTestUtils.EchoErrorTransform; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandlingTestUtils.ErrorSinkTransform; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; @@ -172,6 +179,7 @@ import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -895,6 +903,124 @@ public void testBatchFileLoadsWithTempTablesCreateNever() throws Exception { containsInAnyOrder(Iterables.toArray(elements, TableRow.class))); } + private static final SerializableFunction failingIntegerToTableRow = + new SerializableFunction() { + @Override + public TableRow apply(Integer input) { + if (input == 15) { + throw new RuntimeException("Expected Exception"); + } + return new TableRow().set("number", input); + } + }; + + @Test + public void testBatchLoadsWithTableRowErrorHandling() throws Exception { + assumeTrue(!useStreaming); + assumeTrue(!useStorageApi); + List elements = Lists.newArrayList(); + for (int i = 0; i < 30; ++i) { + elements.add(i); + } + + ErrorHandler> errorHandler = + p.registerBadRecordErrorHandler(new ErrorSinkTransform()); + + WriteResult result = + p.apply(Create.of(elements).withCoder(BigEndianIntegerCoder.of())) + .apply( + BigQueryIO.write() + .to("dataset-id.table-id") + .withFormatFunction(failingIntegerToTableRow) + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) + .withSchema( + new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("name").setType("STRING"), + new TableFieldSchema().setName("number").setType("INTEGER")))) + .withTestServices(fakeBqServices) + 
.withErrorHandler(errorHandler) + .withoutValidation()); + + errorHandler.close(); + + PAssert.that(result.getSuccessfulTableLoads()) + .containsInAnyOrder(new TableDestination("project-id:dataset-id.table-id", null)); + PAssert.thatSingleton(errorHandler.getOutput()).isEqualTo(1L); + p.run(); + + elements.remove(15); + assertThat( + fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id").stream() + .map(tr -> ((Integer) tr.get("number"))) + .collect(Collectors.toList()), + containsInAnyOrder(Iterables.toArray(elements, Integer.class))); + } + + private static final org.apache.avro.Schema avroSchema = + org.apache.avro.Schema.createRecord( + ImmutableList.of( + new Field( + "number", + org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG), + "nodoc", + 0))); + private static final SerializableFunction, GenericRecord> + failingLongToAvro = + new SerializableFunction, GenericRecord>() { + @Override + public GenericRecord apply(AvroWriteRequest input) { + if (input.getElement() == 15) { + throw new RuntimeException("Expected Exception"); + } + return new GenericRecordBuilder(avroSchema).set("number", input.getElement()).build(); + } + }; + + @Test + public void testBatchLoadsWithAvroErrorHandling() throws Exception { + assumeTrue(!useStreaming); + assumeTrue(!useStorageApi); + List elements = Lists.newArrayList(); + for (long i = 0L; i < 30L; ++i) { + elements.add(i); + } + + ErrorHandler> errorHandler = + p.registerBadRecordErrorHandler(new ErrorSinkTransform()); + + WriteResult result = + p.apply(Create.of(elements).withCoder(VarLongCoder.of())) + .apply( + BigQueryIO.write() + .to("dataset-id.table-id") + .withAvroFormatFunction(failingLongToAvro) + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) + .withSchema( + new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("number").setType("INTEGER")))) + .withTestServices(fakeBqServices) + .withErrorHandler(errorHandler) + .withoutValidation()); + + errorHandler.close(); + + PAssert.that(result.getSuccessfulTableLoads()) + .containsInAnyOrder(new TableDestination("project-id:dataset-id.table-id", null)); + PAssert.thatSingleton(errorHandler.getOutput()).isEqualTo(1L); + p.run(); + + elements.remove(15); + assertThat( + fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id").stream() + .map(tr -> Long.valueOf((String) tr.get("number"))) + .collect(Collectors.toList()), + containsInAnyOrder(Iterables.toArray(elements, Long.class))); + } + @Test public void testStreamingInsertsFailuresNoRetryPolicy() throws Exception { assumeTrue(!useStorageApi); @@ -1328,6 +1454,120 @@ public void testStreamingStorageApiWriteWithAutoSharding() throws Exception { storageWrite(true); } + // There are two failure scenarios in storage write. 
+ // The first is in conversion, which is triggered by using a bad format function + // The second is in actually sending to BQ, which is triggered by telling the dataset service + // to fail a row + private void storageWriteWithErrorHandling(boolean autoSharding) throws Exception { + assumeTrue(useStorageApi); + if (autoSharding) { + assumeTrue(!useStorageApiApproximate); + assumeTrue(useStreaming); + } + List elements = Lists.newArrayList(); + for (int i = 0; i < 30; ++i) { + elements.add(i); + } + + Function shouldFailRow = + (Function & Serializable) + tr -> + tr.containsKey("number") + && (tr.get("number").equals("27") || tr.get("number").equals("3")); + fakeDatasetService.setShouldFailRow(shouldFailRow); + + TestStream testStream = + TestStream.create(BigEndianIntegerCoder.of()) + .addElements(elements.get(0), Iterables.toArray(elements.subList(1, 10), Integer.class)) + .advanceProcessingTime(Duration.standardMinutes(1)) + .addElements( + elements.get(10), Iterables.toArray(elements.subList(11, 20), Integer.class)) + .advanceProcessingTime(Duration.standardMinutes(1)) + .addElements( + elements.get(20), Iterables.toArray(elements.subList(21, 30), Integer.class)) + .advanceWatermarkToInfinity(); + + ErrorHandler> errorHandler = + p.registerBadRecordErrorHandler(new EchoErrorTransform()); + + BigQueryIO.Write write = + BigQueryIO.write() + .to("project-id:dataset-id.table-id") + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) + .withFormatFunction(failingIntegerToTableRow) + .withSchema( + new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("number").setType("INTEGER")))) + .withTestServices(fakeBqServices) + .withErrorHandler(errorHandler) + .withoutValidation(); + + if (useStreaming) { + if (!useStorageApiApproximate) { + write = + write + .withTriggeringFrequency(Duration.standardSeconds(30)) + .withNumStorageWriteApiStreams(2); + } + if (autoSharding) { + write = write.withAutoSharding(); + } + } + + PTransform> source = + useStreaming ?
testStream : Create.of(elements).withCoder(BigEndianIntegerCoder.of()); + + p.apply(source).apply("WriteToBQ", write); + + errorHandler.close(); + + PAssert.that(errorHandler.getOutput()) + .satisfies( + badRecords -> { + int count = 0; + Iterator iterator = badRecords.iterator(); + while (iterator.hasNext()) { + count++; + iterator.next(); + } + Assert.assertEquals("Wrong number of bad records", 3, count); + return null; + }); + + p.run().waitUntilFinish(); + + // remove the "bad" elements from the expected elements written + elements.remove(27); + elements.remove(15); + elements.remove(3); + assertThat( + fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id").stream() + .map(tr -> Integer.valueOf((String) tr.get("number"))) + .collect(Collectors.toList()), + containsInAnyOrder(Iterables.toArray(elements, Integer.class))); + } + + @Test + public void testBatchStorageApiWriteWithErrorHandling() throws Exception { + assumeTrue(!useStreaming); + storageWriteWithErrorHandling(false); + } + + @Test + public void testStreamingStorageApiWriteWithErrorHandling() throws Exception { + assumeTrue(useStreaming); + storageWriteWithErrorHandling(false); + } + + @Test + public void testStreamingStorageApiWriteWithAutoShardingWithErrorHandling() throws Exception { + assumeTrue(useStreaming); + assumeTrue(!useStorageApiApproximate); + storageWriteWithErrorHandling(true); + } + @DefaultSchema(JavaFieldSchema.class) static class SchemaPojo { final String name; From 921793b5205997fbfc9bbeda9349f97bc7200888 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Thu, 8 Feb 2024 14:15:36 -0500 Subject: [PATCH 09/14] spotless --- .../org/apache/beam/runners/dataflow/DataflowRunner.java | 4 ++-- .../sdk/transforms/errorhandling/ErrorHandlingTestUtils.java | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 0906575bf9c4..7ffe725349ab 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1668,8 +1668,8 @@ public CompositeBehavior enterCompositeTransform(Node node) { String rootBigQueryTransform = ""; if (transform.getClass().equals(StorageApiLoads.class)) { StorageApiLoads storageLoads = (StorageApiLoads) transform; - //If the storage load is directing exceptions to an error handler, we don't need to - //warn for unconsumed rows + // If the storage load is directing exceptions to an error handler, we don't need to + // warn for unconsumed rows if (!storageLoads.usesErrorHandler()) { failedTag = storageLoads.getFailedRowsTag(); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandlingTestUtils.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandlingTestUtils.java index 40310c4b2a92..b4db4867cfc7 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandlingTestUtils.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandlingTestUtils.java @@ -46,10 +46,11 @@ public static class ErrorSinkTransform } } - public static class EchoErrorTransform extends PTransform, PCollection>{ + public static class EchoErrorTransform + extends PTransform, PCollection> { 
@Override - public PCollection expand(PCollection input){ + public PCollection expand(PCollection input) { return input; } } From e5c53fe0ffb656507ddb31a3bf04ec04515619fe Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Wed, 14 Feb 2024 13:44:51 -0500 Subject: [PATCH 10/14] add trigger files --- .github/trigger_files/beam_PostCommit_Java_DataflowV1.json | 0 .../trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json | 0 2 files changed, 0 insertions(+), 0 deletions(-) create mode 100644 .github/trigger_files/beam_PostCommit_Java_DataflowV1.json create mode 100644 .github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json b/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Dataflow.json new file mode 100644 index 000000000000..e69de29bb2d1 From c6fb18b8528f60078e2f5796114f2802f92fb4cb Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Thu, 15 Feb 2024 12:32:59 -0500 Subject: [PATCH 11/14] address comments, support error handling when direct read from table --- .../io/google-cloud-platform/build.gradle | 4 +- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 181 +++++++++++++----- .../BigQueryStorageStreamBundleSource.java | 12 ++ .../bigquery/BigQueryStorageStreamSource.java | 6 + .../bigquery/StorageApiConvertMessages.java | 3 +- .../io/gcp/bigquery/WriteBundlesToFiles.java | 8 +- .../bigquery/BigQueryIOStorageQueryIT.java | 44 +++++ .../gcp/bigquery/BigQueryIOStorageReadIT.java | 4 +- 8 files changed, 207 insertions(+), 55 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle index f4f9e7a2dc30..1a689a85b7fd 100644 --- a/sdks/java/io/google-cloud-platform/build.gradle +++ b/sdks/java/io/google-cloud-platform/build.gradle @@ -210,8 +210,8 @@ task integrationTest(type: Test, dependsOn: processTestResources) { include '**/*IT.class' exclude '**/BigQueryIOReadIT.class' - exclude '**/BigQueryIOStorageQueryIT.class' - exclude '**/BigQueryIOStorageReadIT.class' +// exclude '**/BigQueryIOStorageQueryIT.class' +// exclude '**/BigQueryIOStorageReadIT.class' exclude '**/BigQueryIOStorageWriteIT.class' exclude '**/BigQueryToTableIT.class' diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index e9e3ae79987b..632a63127473 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -113,7 +113,9 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Element; import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver; +import org.apache.beam.sdk.transforms.DoFn.ProcessElement; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -1452,28 +1454,65 @@ private PCollection expandForDirectRead( PBegin input, Coder outputCoder, Schema beamSchema, BigQueryOptions bqOptions) { ValueProvider tableProvider = 
getTableProvider(); Pipeline p = input.getPipeline(); - if (tableProvider != null && getBadRecordRouter() instanceof ThrowingBadRecordRouter) { - // No job ID is required. Read directly from BigQuery storage. - PCollection rows = - p.apply( - org.apache.beam.sdk.io.Read.from( - BigQueryStorageTableSource.create( - tableProvider, - getFormat(), - getSelectedFields(), - getRowRestriction(), - getParseFn(), - outputCoder, - getBigQueryServices(), - getProjectionPushdownApplied()))); - if (beamSchema != null) { - rows.setSchema( - beamSchema, - getTypeDescriptor(), - getToBeamRowFn().apply(beamSchema), - getFromBeamRowFn().apply(beamSchema)); + if (tableProvider != null) { + // ThrowingBadRecordRouter is the default value, and is what is used if the user hasn't + // specified any particular error handling. + if (getBadRecordRouter() instanceof ThrowingBadRecordRouter) { + // No job ID is required. Read directly from BigQuery storage. + PCollection rows = + p.apply( + org.apache.beam.sdk.io.Read.from( + BigQueryStorageTableSource.create( + tableProvider, + getFormat(), + getSelectedFields(), + getRowRestriction(), + getParseFn(), + outputCoder, + getBigQueryServices(), + getProjectionPushdownApplied()))); + if (beamSchema != null) { + rows.setSchema( + beamSchema, + getTypeDescriptor(), + getToBeamRowFn().apply(beamSchema), + getFromBeamRowFn().apply(beamSchema)); + } + return rows; + } else { + // We need to manually execute the table source, so as to be able to capture exceptions + // to pipe to error handling + BigQueryStorageTableSource source = + BigQueryStorageTableSource.create( + tableProvider, + getFormat(), + getSelectedFields(), + getRowRestriction(), + getParseFn(), + outputCoder, + getBigQueryServices(), + getProjectionPushdownApplied()); + List> sources; + try { + sources = source.split(0, input.getPipeline().getOptions()); + } catch (Exception e) { + throw new RuntimeException("Unable to split TableSource", e); + } + TupleTag rowTag = new TupleTag<>(); + PCollectionTuple resultTuple = + p.apply(Create.of(sources)) + .apply( + "Read Storage Table Source", + ParDo.of(new ReadTableSource(rowTag, getParseFn(), getBadRecordRouter())) + .withOutputTags(rowTag, TupleTagList.of(BAD_RECORD_TAG))); + getBadRecordErrorHandler() + .addErrorCollection( + resultTuple + .get(BAD_RECORD_TAG) + .setCoder(BadRecord.getCoder(input.getPipeline()))); + + return resultTuple.get(rowTag).setCoder(outputCoder); } - return rows; } checkArgument( @@ -1613,6 +1652,52 @@ void cleanup(ContextContainer c) throws Exception { return rows.apply(new PassThroughThenCleanup<>(cleanupOperation, jobIdTokenView)); } + private static class ReadTableSource extends DoFn, T> { + + private final TupleTag rowTag; + + private final SerializableFunction parseFn; + + private final BadRecordRouter badRecordRouter; + + public ReadTableSource( + TupleTag rowTag, + SerializableFunction parseFn, + BadRecordRouter badRecordRouter) { + this.rowTag = rowTag; + this.parseFn = parseFn; + this.badRecordRouter = badRecordRouter; + } + + @ProcessElement + public void processElement( + @Element BoundedSource boundedSource, + MultiOutputReceiver outputReceiver, + PipelineOptions options) + throws Exception { + ErrorHandlingParseFn errorHandlingParseFn = new ErrorHandlingParseFn(parseFn); + BoundedSource sourceWithErrorHandlingParseFn; + if (boundedSource instanceof BigQueryStorageStreamSource) { + sourceWithErrorHandlingParseFn = + ((BigQueryStorageStreamSource) boundedSource).fromExisting(errorHandlingParseFn); + } else if (boundedSource 
instanceof BigQueryStorageStreamBundleSource) { + sourceWithErrorHandlingParseFn = + ((BigQueryStorageStreamBundleSource) boundedSource) + .fromExisting(errorHandlingParseFn); + } else { + throw new RuntimeException( + "Bounded Source is not BigQueryStorageStreamSource or BigQueryStorageStreamBundleSource, unable to read"); + } + readSource( + options, + rowTag, + outputReceiver, + sourceWithErrorHandlingParseFn, + errorHandlingParseFn, + badRecordRouter); + } + } + private PCollectionTuple createTupleForDirectRead( PCollection jobIdTokenCollection, Coder outputCoder, @@ -1749,10 +1834,11 @@ public void processElement(ProcessContext c) throws Exception { return tuple; } - private class ErrorHandlingParseFn implements SerializableFunction { + private static class ErrorHandlingParseFn + implements SerializableFunction { private final SerializableFunction parseFn; - private SchemaAndRecord schemaAndRecord = null; + private transient SchemaAndRecord schemaAndRecord = null; private ErrorHandlingParseFn(SerializableFunction parseFn) { this.parseFn = parseFn; @@ -1803,8 +1889,8 @@ public void processElement( c.sideInput(tableSchemaView), TableSchema.class); ReadStream readStream = c.element(); - ErrorHandlingParseFn errorHandlingParseFn = - new ErrorHandlingParseFn(getParseFn()); + ErrorHandlingParseFn errorHandlingParseFn = + new ErrorHandlingParseFn(getParseFn()); BigQueryStorageStreamSource streamSource = BigQueryStorageStreamSource.create( @@ -1815,12 +1901,13 @@ public void processElement( outputCoder, getBigQueryServices()); - readStreamSource( + readSource( c.getPipelineOptions(), rowTag, outputReceiver, streamSource, - errorHandlingParseFn); + errorHandlingParseFn, + getBadRecordRouter()); } }) .withSideInputs(readSessionView, tableSchemaView) @@ -1857,8 +1944,8 @@ public void processElement( c.sideInput(tableSchemaView), TableSchema.class); List streamBundle = c.element(); - ErrorHandlingParseFn errorHandlingParseFn = - new ErrorHandlingParseFn(getParseFn()); + ErrorHandlingParseFn errorHandlingParseFn = + new ErrorHandlingParseFn(getParseFn()); BigQueryStorageStreamBundleSource streamSource = BigQueryStorageStreamBundleSource.create( @@ -1870,12 +1957,13 @@ public void processElement( getBigQueryServices(), 1L); - readStreamSource( + readSource( c.getPipelineOptions(), rowTag, outputReceiver, streamSource, - errorHandlingParseFn); + errorHandlingParseFn, + getBadRecordRouter()); } }) .withSideInputs(readSessionView, tableSchemaView) @@ -1888,12 +1976,13 @@ public void processElement( return resultTuple.get(rowTag).setCoder(outputCoder); } - public void readStreamSource( + public static void readSource( PipelineOptions options, TupleTag rowTag, MultiOutputReceiver outputReceiver, BoundedSource streamSource, - ErrorHandlingParseFn errorHandlingParseFn) + ErrorHandlingParseFn errorHandlingParseFn, + BadRecordRouter badRecordRouter) throws Exception { // Read all the data from the stream. 
In the event that this work // item fails and is rescheduled, the same rows will be returned in @@ -1908,13 +1997,12 @@ public void readStreamSource( } } catch (ParseException e) { GenericRecord record = errorHandlingParseFn.getSchemaAndRecord().getRecord(); - getBadRecordRouter() - .route( - outputReceiver, - record, - AvroCoder.of(record.getSchema()), - (Exception) e.getCause(), - "Unable to parse record reading from BigQuery"); + badRecordRouter.route( + outputReceiver, + record, + AvroCoder.of(record.getSchema()), + (Exception) e.getCause(), + "Unable to parse record reading from BigQuery"); } while (true) { @@ -1926,13 +2014,12 @@ public void readStreamSource( } } catch (ParseException e) { GenericRecord record = errorHandlingParseFn.getSchemaAndRecord().getRecord(); - getBadRecordRouter() - .route( - outputReceiver, - record, - AvroCoder.of(record.getSchema()), - (Exception) e.getCause(), - "Unable to parse record reading from BigQuery"); + badRecordRouter.route( + outputReceiver, + record, + AvroCoder.of(record.getSchema()), + (Exception) e.getCause(), + "Unable to parse record reading from BigQuery"); } } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamBundleSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamBundleSource.java index 044a87529c02..eeb747f21ea5 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamBundleSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamBundleSource.java @@ -110,6 +110,18 @@ public BigQueryStorageStreamBundleSource fromExisting(List newStr getMinBundleSize()); } + public BigQueryStorageStreamBundleSource fromExisting( + SerializableFunction parseFn) { + return new BigQueryStorageStreamBundleSource<>( + readSession, + streamBundle, + jsonTableSchema, + parseFn, + outputCoder, + bqServices, + getMinBundleSize()); + } + private final ReadSession readSession; private final List streamBundle; private final String jsonTableSchema; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java index 3ec3fed5eb55..8f7f50febaf4 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java @@ -82,6 +82,12 @@ public BigQueryStorageStreamSource fromExisting(ReadStream newReadStream) { readSession, newReadStream, jsonTableSchema, parseFn, outputCoder, bqServices); } + public BigQueryStorageStreamSource fromExisting( + SerializableFunction parseFn) { + return new BigQueryStorageStreamSource<>( + readSession, readStream, jsonTableSchema, parseFn, outputCoder, bqServices); + } + private final ReadSession readSession; private final ReadStream readStream; private final String jsonTableSchema; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java index 67c1813803c2..aefdb79c535c 100644 --- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiConvertMessages.java @@ -183,8 +183,7 @@ public void processElement( try { tableRow = messageConverter.toTableRow(element.getValue()); } catch (Exception e) { - badRecordRouter.route( - o, element, elementCoder, e, "Unable to convert value to StorageWriteApiPayload"); + badRecordRouter.route(o, element, elementCoder, e, "Unable to convert value to TableRow"); return; } o.get(failedWritesTag) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java index 8b74fa26ed91..9d84abbbbf1a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java @@ -248,11 +248,15 @@ public void processElement( } catch (BigQueryRowSerializationException e) { try { badRecordRouter.route( - outputReceiver, element, coder, e, "Unable to Write BQ Record to File"); + outputReceiver, + element, + coder, + e, + "Unable to Write BQ Record to File because serialization to TableRow failed"); } catch (Exception e2) { cleanupWriter(writer, e2); } - } catch (IOException e) { + } catch (Exception e) { cleanupWriter(writer, e); } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java index d355d6bb9336..93c055799030 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java @@ -19,6 +19,7 @@ import static org.apache.beam.sdk.io.gcp.bigquery.TestBigQueryOptions.BIGQUERY_EARLY_ROLLOUT_REGION; +import com.google.api.services.bigquery.model.TableRow; import java.util.Map; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; @@ -33,6 +34,9 @@ import org.apache.beam.sdk.testing.TestPipelineOptions; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.errorhandling.BadRecord; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandlingTestUtils.ErrorSinkTransform; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.junit.Test; @@ -107,4 +111,44 @@ public void testBigQueryStorageQuery1G() throws Exception { setUpTestEnvironment("1G"); runBigQueryIOStorageQueryPipeline(); } + + static class FailingTableRowParser implements SerializableFunction { + + public static final BigQueryIOStorageReadIT.FailingTableRowParser INSTANCE = + new BigQueryIOStorageReadIT.FailingTableRowParser(); + + private int parseCount = 0; + + @Override + public TableRow apply(SchemaAndRecord schemaAndRecord) { + parseCount++; + if (parseCount % 50 == 0) { + throw new RuntimeException("ExpectedException"); + } + return 
TableRowParser.INSTANCE.apply(schemaAndRecord); + } + } + + @Test + public void testBigQueryStorageQueryWithErrorHandling1M() throws Exception { + setUpTestEnvironment("1M"); + Pipeline p = Pipeline.create(options); + ErrorHandler> errorHandler = + p.registerBadRecordErrorHandler(new ErrorSinkTransform()); + PCollection count = + p.apply( + "Read", + BigQueryIO.read(FailingTableRowParser.INSTANCE) + .fromQuery("SELECT * FROM `" + options.getInputTable() + "`") + .usingStandardSql() + .withMethod(Method.DIRECT_READ) + .withErrorHandler(errorHandler)) + .apply("Count", Count.globally()); + + errorHandler.close(); + + PAssert.thatSingleton(count).isEqualTo(10381L); + PAssert.thatSingleton(errorHandler.getOutput()).isEqualTo(10592L - 10381L); + p.run().waitUntilFinish(); + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java index c42ec665ca0f..9a9912f6d42d 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java @@ -157,8 +157,8 @@ private void runBigQueryIOStorageReadPipelineErrorHandling() throws Exception { errorHandler.close(); - PAssert.thatSingleton(count).isEqualTo(options.getNumRecords() * 49 / 50); - PAssert.thatSingleton(errorHandler.getOutput()).isEqualTo(options.getNumRecords() / 50); + PAssert.thatSingleton(count).isEqualTo(10381L); + PAssert.thatSingleton(errorHandler.getOutput()).isEqualTo(10592L - 10381L); p.run().waitUntilFinish(); } From c886de11635c6e0a7b18f49a80b5cabacc61f0fd Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Thu, 15 Feb 2024 15:22:52 -0500 Subject: [PATCH 12/14] revert change to build.gradle --- sdks/java/io/google-cloud-platform/build.gradle | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle index 1a689a85b7fd..f4f9e7a2dc30 100644 --- a/sdks/java/io/google-cloud-platform/build.gradle +++ b/sdks/java/io/google-cloud-platform/build.gradle @@ -210,8 +210,8 @@ task integrationTest(type: Test, dependsOn: processTestResources) { include '**/*IT.class' exclude '**/BigQueryIOReadIT.class' -// exclude '**/BigQueryIOStorageQueryIT.class' -// exclude '**/BigQueryIOStorageReadIT.class' + exclude '**/BigQueryIOStorageQueryIT.class' + exclude '**/BigQueryIOStorageReadIT.class' exclude '**/BigQueryIOStorageWriteIT.class' exclude '**/BigQueryToTableIT.class' From f78a73fba941b81c0f2227aa7d46c304256d21d4 Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Tue, 20 Feb 2024 11:06:38 -0500 Subject: [PATCH 13/14] address comments, improve splitting logic --- .../apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java | 13 ++++++++++++- .../io/gcp/bigquery/BigQueryIOStorageQueryIT.java | 2 ++ .../io/gcp/bigquery/BigQueryIOStorageReadIT.java | 2 ++ 3 files changed, 16 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 632a63127473..0b59f65257e6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -1494,7 +1494,18 @@ private PCollection expandForDirectRead( getProjectionPushdownApplied()); List> sources; try { - sources = source.split(0, input.getPipeline().getOptions()); + //This splitting logic taken from the SDF implementation of Read + long estimatedSize = source.getEstimatedSizeBytes(bqOptions); + // Split into pieces as close to the default desired bundle size but if that would cause too + // few splits then prefer to split up to the default desired number of splits. + long desiredChunkSize; + if (estimatedSize <= 0) { + desiredChunkSize = 64 << 20; // 64mb + } else { + // 1mb --> 1 shard; 1gb --> 32 shards; 1tb --> 1000 shards, 1pb --> 32k shards + desiredChunkSize = Math.max(1 << 20, (long) (1000 * Math.sqrt(estimatedSize))); + } + sources = source.split(desiredChunkSize, bqOptions); } catch (Exception e) { throw new RuntimeException("Unable to split TableSource", e); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java index 93c055799030..7107dc693af3 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java @@ -147,7 +147,9 @@ public void testBigQueryStorageQueryWithErrorHandling1M() throws Exception { errorHandler.close(); + //When 1/50 elements fail sequentially, this is the expected success count PAssert.thatSingleton(count).isEqualTo(10381L); + //this is the total elements, less the successful elements PAssert.thatSingleton(errorHandler.getOutput()).isEqualTo(10592L - 10381L); p.run().waitUntilFinish(); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java index 9a9912f6d42d..4acce1e0bcd5 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java @@ -157,7 +157,9 @@ private void runBigQueryIOStorageReadPipelineErrorHandling() throws Exception { errorHandler.close(); + //When 1/50 elements fail sequentially, this is the expected success count PAssert.thatSingleton(count).isEqualTo(10381L); + //this is the total elements, less the successful elements PAssert.thatSingleton(errorHandler.getOutput()).isEqualTo(10592L - 10381L); p.run().waitUntilFinish(); } From 47fce173e30493cc7455c364b154396e3b22a92f Mon Sep 17 00:00:00 2001 From: johnjcasey Date: Tue, 20 Feb 2024 11:36:44 -0500 Subject: [PATCH 14/14] spotless --- .../org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java | 6 +++--- .../beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java | 4 ++-- .../beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 0b59f65257e6..43c5af163190 100644 --- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -1494,10 +1494,10 @@ private PCollection expandForDirectRead( getProjectionPushdownApplied()); List> sources; try { - //This splitting logic taken from the SDF implementation of Read + // This splitting logic taken from the SDF implementation of Read long estimatedSize = source.getEstimatedSizeBytes(bqOptions); - // Split into pieces as close to the default desired bundle size but if that would cause too - // few splits then prefer to split up to the default desired number of splits. + // Split into pieces as close to the default desired bundle size but if that would cause + // too few splits then prefer to split up to the default desired number of splits. long desiredChunkSize; if (estimatedSize <= 0) { desiredChunkSize = 64 << 20; // 64mb diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java index 7107dc693af3..2b1c111269df 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryIT.java @@ -147,9 +147,9 @@ public void testBigQueryStorageQueryWithErrorHandling1M() throws Exception { errorHandler.close(); - //When 1/50 elements fail sequentially, this is the expected success count + // When 1/50 elements fail sequentially, this is the expected success count PAssert.thatSingleton(count).isEqualTo(10381L); - //this is the total elements, less the successful elements + // this is the total elements, less the successful elements PAssert.thatSingleton(errorHandler.getOutput()).isEqualTo(10592L - 10381L); p.run().waitUntilFinish(); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java index 4acce1e0bcd5..4e20d3634800 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java @@ -157,9 +157,9 @@ private void runBigQueryIOStorageReadPipelineErrorHandling() throws Exception { errorHandler.close(); - //When 1/50 elements fail sequentially, this is the expected success count + // When 1/50 elements fail sequentially, this is the expected success count PAssert.thatSingleton(count).isEqualTo(10381L); - //this is the total elements, less the successful elements + // this is the total elements, less the successful elements PAssert.thatSingleton(errorHandler.getOutput()).isEqualTo(10592L - 10381L); p.run().waitUntilFinish(); }
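For context, a minimal sketch of the user-facing pattern these patches enable, modeled on the tests above. The table name, the input elements, and formatFn are illustrative placeholders, and EchoErrorTransform is the test-only sink from ErrorHandlingTestUtils; a real pipeline would substitute its own dead-letter sink transform.

    // Register a sink for bad records; records that fail conversion or writing are
    // routed to this handler instead of failing the pipeline.
    ErrorHandler<BadRecord, PCollection<BadRecord>> errorHandler =
        p.registerBadRecordErrorHandler(new EchoErrorTransform());

    p.apply(Create.of(elements))
        .apply(
            BigQueryIO.<Integer>write()
                .to("project-id:dataset-id.table-id")
                .withFormatFunction(formatFn) // failures thrown here become BadRecords
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withErrorHandler(errorHandler));

    // The handler must be closed before the pipeline runs so its error collections are wired in.
    errorHandler.close();
    p.run().waitUntilFinish();

The same withErrorHandler hook applies to BigQueryIO.read with Method.DIRECT_READ, where records that fail the parse function are routed as BadRecords rather than failing the read, as exercised in BigQueryIOStorageReadIT and BigQueryIOStorageQueryIT above.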