diff --git a/CHANGES.md b/CHANGES.md index 28ca6543006b..fbbc12ef0a93 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -66,6 +66,8 @@ * Full Support for Storage Read and Write APIs * Partial Support for File Loads (Failures writing to files supported, failures loading files to BQ unsupported) * No Support for Extract or Streaming Inserts +* Added support for handling bad records to PubSubIO ([#30372](https://github.com/apache/beam/pull/30372)). + * Support is not available for handling schema mismatches, and enabling error handling for writing to pubsub topics with schemas is not recommended * `--enableBundling` pipeline option for BigQueryIO DIRECT_READ is replaced by `--enableStorageReadApiV2`. Both were considered experimental and may subject to change (Java) ([#26354](https://github.com/apache/beam/issues/26354)). * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java index 26f4fb5d076d..47033451ab89 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java @@ -20,10 +20,13 @@ import java.nio.charset.StandardCharsets; import java.util.Map; import javax.naming.SizeLimitExceededException; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.ValueInSingleWindow; 
import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; @@ -41,6 +44,12 @@ public class PreparePubsubWriteDoFn extends DoFn private SerializableFunction, PubsubMessage> formatFunction; @Nullable SerializableFunction, PubsubIO.PubsubTopic> topicFunction; + private final BadRecordRouter badRecordRouter; + + private final Coder inputCoder; + + private final TupleTag outputTag; + static int validatePubsubMessageSize(PubsubMessage message, int maxPublishBatchSize) throws SizeLimitExceededException { int payloadSize = message.getPayload().length; @@ -113,10 +122,16 @@ static int validatePubsubMessageSize(PubsubMessage message, int maxPublishBatchS SerializableFunction, PubsubMessage> formatFunction, @Nullable SerializableFunction, PubsubIO.PubsubTopic> topicFunction, - int maxPublishBatchSize) { + int maxPublishBatchSize, + BadRecordRouter badRecordRouter, + Coder inputCoder, + TupleTag outputTag) { this.formatFunction = formatFunction; this.topicFunction = topicFunction; this.maxPublishBatchSize = maxPublishBatchSize; + this.badRecordRouter = badRecordRouter; + this.inputCoder = inputCoder; + this.outputTag = outputTag; } @ProcessElement @@ -125,18 +140,42 @@ public void process( @Timestamp Instant ts, BoundedWindow window, PaneInfo paneInfo, - OutputReceiver o) { + MultiOutputReceiver o) + throws Exception { ValueInSingleWindow valueInSingleWindow = ValueInSingleWindow.of(element, ts, window, paneInfo); - PubsubMessage message = formatFunction.apply(valueInSingleWindow); + PubsubMessage message; + try { + message = formatFunction.apply(valueInSingleWindow); + } catch (Exception e) { + badRecordRouter.route( + o, + element, + inputCoder, + e, + "Failed to serialize PubSub message with provided format function"); + return; + } if (topicFunction != null) { - message = message.withTopic(topicFunction.apply(valueInSingleWindow).asPath()); + try { + message = message.withTopic(topicFunction.apply(valueInSingleWindow).asPath()); + } catch 
(Exception e) { + badRecordRouter.route( + o, element, inputCoder, e, "Failed to determine PubSub topic using topic function"); + return; + } } try { validatePubsubMessageSize(message, maxPublishBatchSize); } catch (SizeLimitExceededException e) { - throw new IllegalArgumentException(e); + badRecordRouter.route( + o, + element, + inputCoder, + new IllegalArgumentException(e), + "PubSub message limit exceeded, see exception for details"); + return; } - o.output(message); + o.get(outputTag).output(message); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index 1080999271d3..1d687812560b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.gcp.pubsub; +import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; import com.google.api.client.util.Clock; @@ -62,6 +63,11 @@ import org.apache.beam.sdk.transforms.WithFailures; import org.apache.beam.sdk.transforms.WithFailures.Result; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.errorhandling.BadRecord; +import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter; +import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.ThrowingBadRecordRouter; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.DefaultErrorHandler; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.Preconditions; @@ 
-69,8 +75,11 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.ValueInSingleWindow; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; @@ -830,6 +839,10 @@ public abstract static class Read extends PTransform> abstract boolean getNeedsOrderingKey(); + abstract BadRecordRouter getBadRecordRouter(); + + abstract ErrorHandler getBadRecordErrorHandler(); + abstract Builder toBuilder(); static Builder newBuilder(SerializableFunction parseFn) { @@ -839,6 +852,8 @@ static Builder newBuilder(SerializableFunction parseFn) builder.setNeedsAttributes(false); builder.setNeedsMessageId(false); builder.setNeedsOrderingKey(false); + builder.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER); + builder.setBadRecordErrorHandler(new DefaultErrorHandler<>()); return builder; } @@ -881,6 +896,11 @@ abstract static class Builder { abstract Builder setClock(Clock clock); + abstract Builder setBadRecordRouter(BadRecordRouter badRecordRouter); + + abstract Builder setBadRecordErrorHandler( + ErrorHandler badRecordErrorHandler); + abstract Read build(); } @@ -959,6 +979,8 @@ public Read fromTopic(ValueProvider topic) { * *

<p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the * {@code deadLetterTopic} string. + * + *

This functionality is mutually exclusive with {@link Read#withErrorHandler(ErrorHandler)} */ public Read withDeadLetterTopic(String deadLetterTopic) { return withDeadLetterTopic(StaticValueProvider.of(deadLetterTopic)); @@ -1045,6 +1067,19 @@ public Read withCoderAndParseFn(Coder coder, SimpleFunction withErrorHandler(ErrorHandler badRecordErrorHandler) { + return toBuilder() + .setBadRecordErrorHandler(badRecordErrorHandler) + .setBadRecordRouter(BadRecordRouter.RECORDING_ROUTER) + .build(); + } + @VisibleForTesting /** * Set's the internal Clock. @@ -1066,6 +1101,12 @@ public PCollection expand(PBegin input) { "Can't set both the topic and the subscription for " + "a PubsubIO.Read transform"); } + if (getDeadLetterTopicProvider() != null + && !(getBadRecordRouter() instanceof ThrowingBadRecordRouter)) { + throw new IllegalArgumentException( + "PubSubIO cannot be configured with both a dead letter topic and a bad record router"); + } + @Nullable ValueProvider topicPath = getTopicProvider() == null @@ -1089,57 +1130,73 @@ public PCollection expand(PBegin input) { getNeedsMessageId(), getNeedsOrderingKey()); - PCollection read; PCollection preParse = input.apply(source); TypeDescriptor typeDescriptor = new TypeDescriptor() {}; - if (getDeadLetterTopicProvider() == null) { + PCollection read; + if (getDeadLetterTopicProvider() == null + && (getBadRecordRouter() instanceof ThrowingBadRecordRouter)) { read = preParse.apply(MapElements.into(typeDescriptor).via(getParseFn())); } else { + // parse PubSub messages, separating out exceptions Result, KV> result = preParse.apply( "PubsubIO.Read/Map/Parse-Incoming-Messages", MapElements.into(typeDescriptor) .via(getParseFn()) .exceptionsVia(new WithFailures.ThrowableHandler() {})); + + // Emit parsed records read = result.output(); - // Write out failures to the provided dead-letter topic. - result - .failures() - // Since the stack trace could easily exceed Pub/Sub limits, we need to remove it from - // the attributes. 
- .apply( - "PubsubIO.Read/Map/Remove-Stack-Trace-Attribute", - MapElements.into(new TypeDescriptor>>() {}) - .via( - kv -> { - PubsubMessage message = kv.getKey(); - String messageId = - message.getMessageId() == null ? "" : message.getMessageId(); - Throwable throwable = kv.getValue().throwable(); - - // In order to stay within Pub/Sub limits, we aren't adding the stack - // trace to the attributes. Therefore, we need to log the throwable. - LOG.error( - "Error parsing Pub/Sub message with id '{}'", messageId, throwable); - - ImmutableMap attributes = - ImmutableMap.builder() - .put("exceptionClassName", throwable.getClass().getName()) - .put("exceptionMessage", throwable.getMessage()) - .put("pubsubMessageId", messageId) - .build(); - - return KV.of(kv.getKey(), attributes); - })) - .apply( - "PubsubIO.Read/Map/Create-Dead-Letter-Payload", - MapElements.into(TypeDescriptor.of(PubsubMessage.class)) - .via(kv -> new PubsubMessage(kv.getKey().getPayload(), kv.getValue()))) - .apply( - writeMessages() - .to(getDeadLetterTopicProvider().get().asPath()) - .withClientFactory(getPubsubClientFactory())); + // Send exceptions to either the bad record router or the dead letter topic + if (!(getBadRecordRouter() instanceof ThrowingBadRecordRouter)) { + PCollection badRecords = + result + .failures() + .apply( + "Map Failures To BadRecords", + ParDo.of(new ParseReadFailuresToBadRecords(preParse.getCoder()))); + getBadRecordErrorHandler() + .addErrorCollection(badRecords.setCoder(BadRecord.getCoder(input.getPipeline()))); + } else { + // Write out failures to the provided dead-letter topic. + result + .failures() + // Since the stack trace could easily exceed Pub/Sub limits, we need to remove it from + // the attributes. + .apply( + "PubsubIO.Read/Map/Remove-Stack-Trace-Attribute", + MapElements.into(new TypeDescriptor>>() {}) + .via( + kv -> { + PubsubMessage message = kv.getKey(); + String messageId = + message.getMessageId() == null ? 
"" : message.getMessageId(); + Throwable throwable = kv.getValue().throwable(); + + // In order to stay within Pub/Sub limits, we aren't adding the stack + // trace to the attributes. Therefore, we need to log the throwable. + LOG.error( + "Error parsing Pub/Sub message with id '{}'", messageId, throwable); + + ImmutableMap attributes = + ImmutableMap.builder() + .put("exceptionClassName", throwable.getClass().getName()) + .put("exceptionMessage", throwable.getMessage()) + .put("pubsubMessageId", messageId) + .build(); + + return KV.of(kv.getKey(), attributes); + })) + .apply( + "PubsubIO.Read/Map/Create-Dead-Letter-Payload", + MapElements.into(TypeDescriptor.of(PubsubMessage.class)) + .via(kv -> new PubsubMessage(kv.getKey().getPayload(), kv.getValue()))) + .apply( + writeMessages() + .to(getDeadLetterTopicProvider().get().asPath()) + .withClientFactory(getPubsubClientFactory())); + } } return read.setCoder(getCoder()); @@ -1156,6 +1213,28 @@ public void populateDisplayData(DisplayData.Builder builder) { } } + private static class ParseReadFailuresToBadRecords + extends DoFn, BadRecord> { + private final Coder coder; + + public ParseReadFailuresToBadRecords(Coder coder) { + this.coder = coder; + } + + @ProcessElement + public void processElement( + OutputReceiver outputReceiver, + @Element KV element) + throws Exception { + outputReceiver.output( + BadRecord.fromExceptionInformation( + element.getKey(), + coder, + (Exception) element.getValue().throwable(), + "Failed to parse message read from PubSub")); + } + } + ///////////////////////////////////////////////////////////////////////////// /** Disallow construction of utility class. 
*/ @@ -1198,6 +1277,10 @@ public abstract static class Write extends PTransform, PDone> abstract @Nullable String getPubsubRootUrl(); + abstract BadRecordRouter getBadRecordRouter(); + + abstract ErrorHandler getBadRecordErrorHandler(); + abstract Builder toBuilder(); static Builder newBuilder( @@ -1205,6 +1288,8 @@ static Builder newBuilder( Builder builder = new AutoValue_PubsubIO_Write.Builder(); builder.setPubsubClientFactory(FACTORY); builder.setFormatFn(formatFn); + builder.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER); + builder.setBadRecordErrorHandler(new DefaultErrorHandler<>()); return builder; } @@ -1236,6 +1321,11 @@ abstract Builder setFormatFn( abstract Builder setPubsubRootUrl(String pubsubRootUrl); + abstract Builder setBadRecordRouter(BadRecordRouter badRecordRouter); + + abstract Builder setBadRecordErrorHandler( + ErrorHandler badRecordErrorHandler); + abstract Write build(); } @@ -1334,6 +1424,19 @@ public Write withPubsubRootUrl(String pubsubRootUrl) { return toBuilder().setPubsubRootUrl(pubsubRootUrl).build(); } + /** + * Writes any serialization failures out to the Error Handler. See {@link ErrorHandler} for + * details on how to configure an Error Handler. Error Handlers are not well supported when + * writing to topics with schemas, and it is not recommended to configure an error handler if + * the target topic has a schema. 
+ */ + public Write withErrorHandler(ErrorHandler badRecordErrorHandler) { + return toBuilder() + .setBadRecordErrorHandler(badRecordErrorHandler) + .setBadRecordRouter(BadRecordRouter.RECORDING_ROUTER) + .build(); + } + @Override public PDone expand(PCollection input) { if (getTopicProvider() == null && !getDynamicDestinations()) { @@ -1355,12 +1458,26 @@ public PDone expand(PCollection input) { MoreObjects.firstNonNull( getMaxBatchBytesSize(), MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT)); } + TupleTag pubsubMessageTupleTag = new TupleTag<>(); + PCollectionTuple pubsubMessageTuple = + input.apply( + ParDo.of( + new PreparePubsubWriteDoFn<>( + getFormatFn(), + topicFunction, + maxMessageSize, + getBadRecordRouter(), + input.getCoder(), + pubsubMessageTupleTag)) + .withOutputTags(pubsubMessageTupleTag, TupleTagList.of(BAD_RECORD_TAG))); + + getBadRecordErrorHandler() + .addErrorCollection( + pubsubMessageTuple + .get(BAD_RECORD_TAG) + .setCoder(BadRecord.getCoder(input.getPipeline()))); PCollection pubsubMessages = - input - .apply( - ParDo.of( - new PreparePubsubWriteDoFn<>(getFormatFn(), topicFunction, maxMessageSize))) - .setCoder(new PubsubMessageWithTopicCoder()); + pubsubMessageTuple.get(pubsubMessageTupleTag).setCoder(new PubsubMessageWithTopicCoder()); switch (input.isBounded()) { case BOUNDED: pubsubMessages.apply( diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java index f7f9f5f91b74..abc35d0bb1b2 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java @@ -71,11 +71,15 @@ import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import 
org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; +import org.apache.beam.sdk.transforms.errorhandling.BadRecord; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler; +import org.apache.beam.sdk.transforms.errorhandling.ErrorHandlingTestUtils.ErrorSinkTransform; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.sdk.values.ValueInSingleWindow; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; @@ -482,6 +486,46 @@ public void testFailedParseWithDeadLetterConfigured() { pipeline.run(); } + @Test + public void testFailedParseWithErrorHandlerConfigured() throws Exception { + ByteString data = ByteString.copyFrom("Hello, World!".getBytes(StandardCharsets.UTF_8)); + RuntimeException exception = new RuntimeException("Some error message"); + ImmutableList expectedReads = + ImmutableList.of( + IncomingMessage.of( + com.google.pubsub.v1.PubsubMessage.newBuilder().setData(data).build(), + 1234L, + 0, + UUID.randomUUID().toString(), + UUID.randomUUID().toString())); + ImmutableList expectedWrites = ImmutableList.of(); + clientFactory = + PubsubTestClient.createFactoryForPullAndPublish( + SUBSCRIPTION, TOPIC, CLOCK, 60, expectedReads, expectedWrites, ImmutableList.of()); + + ErrorHandler> errorHandler = + pipeline.registerBadRecordErrorHandler(new ErrorSinkTransform()); + PCollection read = + pipeline.apply( + PubsubIO.readStrings() + .fromSubscription(SUBSCRIPTION.getPath()) + .withErrorHandler(errorHandler) + .withClock(CLOCK) + .withClientFactory(clientFactory) + .withCoderAndParseFn( + StringUtf8Coder.of(), + 
SimpleFunction.fromSerializableFunctionWithOutputType( + message -> { + throw exception; + }, + TypeDescriptors.strings()))); + errorHandler.close(); + + PAssert.thatSingleton(errorHandler.getOutput()).isEqualTo(1L); + PAssert.that(read).empty(); + pipeline.run(); + } + @Test public void testProto() { ProtoCoder coder = ProtoCoder.of(Primitive.class); @@ -654,6 +698,66 @@ public String apply(PubsubMessage input) { } } + @Test + public void testWriteMalformedMessagesWithErrorHandler() throws Exception { + OutgoingMessage msg = + OutgoingMessage.of( + com.google.pubsub.v1.PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8("foo")) + .build(), + 0, + null, + "projects/project/topics/topic1"); + + try (PubsubTestClientFactory factory = + PubsubTestClient.createFactoryForPublish(null, ImmutableList.of(msg), ImmutableList.of())) { + TimestampedValue pubsubMsg = + TimestampedValue.of( + new PubsubMessage( + msg.getMessage().getData().toByteArray(), + Collections.emptyMap(), + msg.recordId()) + .withTopic(msg.topic()), + Instant.ofEpochMilli(msg.getTimestampMsSinceEpoch())); + + TimestampedValue failingPubsubMsg = + TimestampedValue.of( + new PubsubMessage( + "foo".getBytes(StandardCharsets.UTF_8), + Collections.emptyMap(), + msg.recordId()) + .withTopic("badTopic"), + Instant.ofEpochMilli(msg.getTimestampMsSinceEpoch())); + + PCollection messages = + pipeline.apply( + Create.timestamped(ImmutableList.of(pubsubMsg, failingPubsubMsg)) + .withCoder(new PubsubMessageWithTopicCoder())); + messages.setIsBoundedInternal(PCollection.IsBounded.BOUNDED); + ErrorHandler> badRecordErrorHandler = + pipeline.registerBadRecordErrorHandler(new ErrorSinkTransform()); + // The most straightforward method to simulate a bad message is to have a format function that + // deterministically fails based on some value + messages.apply( + PubsubIO.writeMessages() + .toBuilder() + .setFormatFn( + (ValueInSingleWindow messageAndWindow) -> { + if 
(messageAndWindow.getValue().getTopic().equals("badTopic")) { + throw new RuntimeException("expected exception"); + } + return messageAndWindow.getValue(); + }) + .build() + .to("projects/project/topics/topic1") + .withClientFactory(factory) + .withErrorHandler(badRecordErrorHandler)); + badRecordErrorHandler.close(); + PAssert.thatSingleton(badRecordErrorHandler.getOutput()).isEqualTo(1L); + pipeline.run(); + } + } + @Test public void testReadMessagesWithCoderAndParseFn() { Coder coder = PubsubMessagePayloadOnlyCoder.of();