Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wire error handling into PubSubIO and add initial tests #30372

Merged
merged 6 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@
* Full Support for Storage Read and Write APIs
* Partial Support for File Loads (Failures writing to files supported, failures loading files to BQ unsupported)
* No Support for Extract or Streaming Inserts
* Added support for handling bad records to PubSubIO ([#30372](https://github.com/apache/beam/pull/30372)).
* Support is not available for handling schema mismatches, and enabling error handling for writing to pubsub topics with schemas is not recommended
* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## New Features / Improvements
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
import java.nio.charset.StandardCharsets;
import java.util.Map;
import javax.naming.SizeLimitExceededException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
Expand All @@ -41,6 +44,12 @@ public class PreparePubsubWriteDoFn<InputT> extends DoFn<InputT, PubsubMessage>
private SerializableFunction<ValueInSingleWindow<InputT>, PubsubMessage> formatFunction;
@Nullable SerializableFunction<ValueInSingleWindow<InputT>, PubsubIO.PubsubTopic> topicFunction;

private final BadRecordRouter badRecordRouter;

private final Coder<InputT> inputCoder;

private final TupleTag<PubsubMessage> outputTag;

static int validatePubsubMessageSize(PubsubMessage message, int maxPublishBatchSize)
throws SizeLimitExceededException {
int payloadSize = message.getPayload().length;
Expand Down Expand Up @@ -113,10 +122,16 @@ static int validatePubsubMessageSize(PubsubMessage message, int maxPublishBatchS
SerializableFunction<ValueInSingleWindow<InputT>, PubsubMessage> formatFunction,
@Nullable
SerializableFunction<ValueInSingleWindow<InputT>, PubsubIO.PubsubTopic> topicFunction,
int maxPublishBatchSize) {
int maxPublishBatchSize,
BadRecordRouter badRecordRouter,
Coder<InputT> inputCoder,
TupleTag<PubsubMessage> outputTag) {
this.formatFunction = formatFunction;
this.topicFunction = topicFunction;
this.maxPublishBatchSize = maxPublishBatchSize;
this.badRecordRouter = badRecordRouter;
this.inputCoder = inputCoder;
this.outputTag = outputTag;
}

@ProcessElement
Expand All @@ -125,18 +140,42 @@ public void process(
@Timestamp Instant ts,
BoundedWindow window,
PaneInfo paneInfo,
OutputReceiver<PubsubMessage> o) {
MultiOutputReceiver o)
throws Exception {
ValueInSingleWindow<InputT> valueInSingleWindow =
ValueInSingleWindow.of(element, ts, window, paneInfo);
PubsubMessage message = formatFunction.apply(valueInSingleWindow);
PubsubMessage message;
try {
message = formatFunction.apply(valueInSingleWindow);
} catch (Exception e) {
badRecordRouter.route(
o,
element,
inputCoder,
e,
"Failed to serialize PubSub message with provided format function");
return;
}
if (topicFunction != null) {
message = message.withTopic(topicFunction.apply(valueInSingleWindow).asPath());
try {
message = message.withTopic(topicFunction.apply(valueInSingleWindow).asPath());
} catch (Exception e) {
badRecordRouter.route(
o, element, inputCoder, e, "Failed to determine PubSub topic using topic function");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we just add valueInSingleWindow into the error message here so its easier to see what went wrong?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, I'm adding the input element, which is what should be meaningful to the user

return;
}
}
try {
validatePubsubMessageSize(message, maxPublishBatchSize);
} catch (SizeLimitExceededException e) {
throw new IllegalArgumentException(e);
badRecordRouter.route(
o,
element,
inputCoder,
new IllegalArgumentException(e),
"PubSub message size limit exceeded");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: to avoid confusion, can we make this message more generic? Looking at the validation code, it can fail for reasons other than message size (e.g. attribute key/value size too large)

return;
}
o.output(message);
o.get(outputTag).output(message);
}
}
Loading
Loading