From f549fd33abdc672143ccbe3f0f66104995d30fe6 Mon Sep 17 00:00:00 2001 From: reuvenlax Date: Fri, 21 Apr 2023 21:01:11 -0700 Subject: [PATCH] Merge pull request #26063: #21431 Pubsub dynamic topic destinations --- .../construction/PTransformTranslation.java | 2 + .../beam/runners/dataflow/DataflowRunner.java | 26 +- .../options/DataflowPipelineOptions.java | 9 +- .../runners/dataflow/util/PropertyNames.java | 4 + .../runners/dataflow/DataflowRunnerTest.java | 74 ++++ .../dataflow/worker/PubsubDynamicSink.java | 163 ++++++++ .../worker/PubsubDynamicSinkTest.java | 163 ++++++++ .../beam/sdk/io/gcp/pubsub/ExternalWrite.java | 1 + .../io/gcp/pubsub/PreparePubsubWriteDoFn.java | 142 +++++++ .../gcp/pubsub/PubSubPayloadTranslation.java | 39 +- .../beam/sdk/io/gcp/pubsub/PubsubClient.java | 19 +- .../sdk/io/gcp/pubsub/PubsubGrpcClient.java | 4 +- .../beam/sdk/io/gcp/pubsub/PubsubIO.java | 357 +++++++++++------- .../sdk/io/gcp/pubsub/PubsubJsonClient.java | 13 +- .../beam/sdk/io/gcp/pubsub/PubsubMessage.java | 27 +- .../pubsub/PubsubMessageWithTopicCoder.java | 72 ++++ .../sdk/io/gcp/pubsub/PubsubTestClient.java | 27 +- .../io/gcp/pubsub/PubsubUnboundedSink.java | 194 ++++++++-- .../pubsub/PreparePubsubWriteDoFnTest.java | 135 +++++++ .../PubSubWritePayloadTranslationTest.java | 41 ++ .../io/gcp/pubsub/PubsubGrpcClientTest.java | 3 +- .../io/gcp/pubsub/PubsubIOExternalTest.java | 4 +- .../beam/sdk/io/gcp/pubsub/PubsubIOTest.java | 183 +++++---- .../io/gcp/pubsub/PubsubJsonClientTest.java | 9 +- .../io/gcp/pubsub/PubsubTestClientTest.java | 9 +- .../gcp/pubsub/PubsubUnboundedSinkTest.java | 82 +++- 26 files changed, 1472 insertions(+), 330 deletions(-) create mode 100644 runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubDynamicSink.java create mode 100644 runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/PubsubDynamicSinkTest.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithTopicCoder.java create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFnTest.java diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java index e701ae60bb57..485350715c9e 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java @@ -107,6 +107,8 @@ public class PTransformTranslation { public static final String PUBSUB_READ = "beam:transform:pubsub_read:v1"; public static final String PUBSUB_WRITE = "beam:transform:pubsub_write:v1"; + public static final String PUBSUB_WRITE_DYNAMIC = "beam:transform:pubsub_write:v2"; + // CombineComponents public static final String COMBINE_PER_KEY_PRECOMBINE_TRANSFORM_URN = "beam:transform:combine_per_key_precombine:v1"; diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 
dfa3d37a400d..2c24df1852ba 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1797,8 +1797,7 @@ public PubsubMessage apply(PubsubMessage input) { * Suppress application of {@link PubsubUnboundedSink#expand} in streaming mode so that we can * instead defer to Windmill's implementation. */ - private static class StreamingPubsubIOWrite - extends PTransform, PDone> { + static class StreamingPubsubIOWrite extends PTransform, PDone> { private final PubsubUnboundedSink transform; @@ -1850,13 +1849,24 @@ private static void translate( StepTranslationContext stepContext, PCollection input) { stepContext.addInput(PropertyNames.FORMAT, "pubsub"); - if (overriddenTransform.getTopicProvider().isAccessible()) { - stepContext.addInput( - PropertyNames.PUBSUB_TOPIC, overriddenTransform.getTopic().getFullPath()); + if (overriddenTransform.getTopicProvider() != null) { + if (overriddenTransform.getTopicProvider().isAccessible()) { + stepContext.addInput( + PropertyNames.PUBSUB_TOPIC, overriddenTransform.getTopic().getFullPath()); + } else { + stepContext.addInput( + PropertyNames.PUBSUB_TOPIC_OVERRIDE, + ((NestedValueProvider) overriddenTransform.getTopicProvider()).propertyName()); + } } else { - stepContext.addInput( - PropertyNames.PUBSUB_TOPIC_OVERRIDE, - ((NestedValueProvider) overriddenTransform.getTopicProvider()).propertyName()); + DataflowPipelineOptions options = + input.getPipeline().getOptions().as(DataflowPipelineOptions.class); + if (options.getEnableDynamicPubsubDestinations()) { + stepContext.addInput(PropertyNames.PUBSUB_DYNAMIC_DESTINATIONS, true); + } else { + throw new RuntimeException( + "Dynamic Pubsub destinations not yet supported. Topic must be set."); + } } if (overriddenTransform.getTimestampAttribute() != null) { stepContext.addInput( diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java index c716cfe82ec2..3f4c4c975f5a 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java @@ -153,7 +153,14 @@ public interface DataflowPipelineOptions @Description("The customized dataflow worker jar") String getDataflowWorkerJar(); - void setDataflowWorkerJar(String dataflowWorkerJar); + void setDataflowWorkerJar(String dataflowWorkerJafr); + + // Disable this support for now until the Dataflow backend fully supports this option. + @Description("Whether to allow dynamic pubsub destinations. Temporary option: will be removed.") + @Default.Boolean(false) + Boolean getEnableDynamicPubsubDestinations(); + + void setEnableDynamicPubsubDestinations(Boolean enable); /** Set of available Flexible Resource Scheduling goals. 
*/ enum FlexResourceSchedulingGoal { diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java index f1a8993a3be3..80441e8c9218 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PropertyNames.java @@ -46,11 +46,15 @@ public class PropertyNames { public static final String PARALLEL_INPUT = "parallel_input"; public static final String PUBSUB_ID_ATTRIBUTE = "pubsub_id_label"; public static final String PUBSUB_SERIALIZED_ATTRIBUTES_FN = "pubsub_serialized_attributes_fn"; + public static final String PUBSUB_SUBSCRIPTION = "pubsub_subscription"; public static final String PUBSUB_SUBSCRIPTION_OVERRIDE = "pubsub_subscription_runtime_override"; public static final String PUBSUB_TIMESTAMP_ATTRIBUTE = "pubsub_timestamp_label"; public static final String PUBSUB_TOPIC = "pubsub_topic"; public static final String PUBSUB_TOPIC_OVERRIDE = "pubsub_topic_runtime_override"; + + public static final String PUBSUB_DYNAMIC_DESTINATIONS = "pubsub_with_dynamic_destinations"; + public static final String SCALAR_FIELD_NAME = "value"; public static final String SERIALIZED_FN = "serialized_fn"; public static final String SERIALIZED_TEST_STREAM = "serialized_test_stream"; diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 7f9c4a0c5b9c..c534af38a8d4 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -76,6 +76,7 @@ import java.io.IOException; import java.io.Serializable; import java.nio.channels.FileChannel; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.StandardOpenOption; import java.util.ArrayList; @@ -120,6 +121,8 @@ import org.apache.beam.sdk.io.WriteFiles; import org.apache.beam.sdk.io.WriteFilesResult; import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled; @@ -163,7 +166,10 @@ import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.InvalidProtocolBufferException; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; import org.hamcrest.Description; import org.hamcrest.Matchers; import org.hamcrest.TypeSafeMatcher; @@ -2341,6 +2347,74 @@ public void testStreamingGroupIntoBatchesWithShardedKeyOverrideBytes() throws IO verifyGroupIntoBatchesOverrideBytes(p, true, true); } + @Test + public void testPubsubSinkOverride() throws IOException { + 
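  // Descriptive note: this test verifies the streaming override path. With the Streaming Engine,
  // Windmill service and use_runner_v2 experiments enabled, PubsubIO.writeMessages() applied to an
  // unbounded PCollection should be replaced by the runner-native StreamingPubsubIOWrite primitive,
  // which the pipeline traversal at the end of the test asserts.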
PipelineOptions options = buildPipelineOptions(); + List experiments = + new ArrayList<>( + ImmutableList.of( + GcpOptions.STREAMING_ENGINE_EXPERIMENT, + GcpOptions.WINDMILL_SERVICE_EXPERIMENT, + "use_runner_v2")); + DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); + dataflowOptions.setExperiments(experiments); + dataflowOptions.setStreaming(true); + Pipeline p = Pipeline.create(options); + + List testValues = + Arrays.asList( + new PubsubMessage("foo".getBytes(StandardCharsets.UTF_8), Collections.emptyMap())); + PCollection input = + p.apply("CreateValuesBytes", Create.of(testValues)) + .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED); + input.apply(PubsubIO.writeMessages().to("projects/project/topics/topic")); + p.run(); + + AtomicBoolean sawPubsubOverride = new AtomicBoolean(false); + p.traverseTopologically( + new PipelineVisitor.Defaults() { + @Override + public void visitPrimitiveTransform(@UnknownKeyFor @NonNull @Initialized Node node) { + if (node.getTransform() instanceof DataflowRunner.StreamingPubsubIOWrite) { + sawPubsubOverride.set(true); + } + } + }); + assertTrue(sawPubsubOverride.get()); + } + + @Test + public void testPubinkDynamicOverride() throws IOException { + PipelineOptions options = buildPipelineOptions(); + DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); + dataflowOptions.setStreaming(true); + dataflowOptions.setEnableDynamicPubsubDestinations(true); + Pipeline p = Pipeline.create(options); + + List testValues = + Arrays.asList( + new PubsubMessage("foo".getBytes(StandardCharsets.UTF_8), Collections.emptyMap()) + .withTopic("")); + PCollection input = + p.apply("CreateValuesBytes", Create.of(testValues)) + .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED); + input.apply(PubsubIO.writeMessagesDynamic()); + p.run(); + + AtomicBoolean sawPubsubOverride = new AtomicBoolean(false); + p.traverseTopologically( + new PipelineVisitor.Defaults() { + + @Override + public void visitPrimitiveTransform(@UnknownKeyFor @NonNull @Initialized Node node) { + if (node.getTransform() instanceof DataflowRunner.StreamingPubsubIOWrite) { + sawPubsubOverride.set(true); + } + } + }); + assertTrue(sawPubsubOverride.get()); + } + static class TestExpansionServiceClientFactory implements ExpansionServiceClientFactory { ExpansionApi.ExpansionResponse response; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubDynamicSink.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubDynamicSink.java new file mode 100644 index 000000000000..70a74c3f1563 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/PubsubDynamicSink.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker; + +import static org.apache.beam.runners.dataflow.util.Structs.getString; +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; + +import com.google.auto.service.AutoService; +import java.io.IOException; +import java.util.Map; +import org.apache.beam.runners.dataflow.util.CloudObject; +import org.apache.beam.runners.dataflow.util.PropertyNames; +import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink; +import org.apache.beam.runners.dataflow.worker.windmill.Pubsub; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.ByteStringOutputStream; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; +import org.checkerframework.checker.nullness.qual.Nullable; + +@SuppressWarnings({ + "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) +}) +public class PubsubDynamicSink extends Sink> { + private final String timestampLabel; + private final String idLabel; + private final StreamingModeExecutionContext context; + + PubsubDynamicSink(String timestampLabel, String idLabel, StreamingModeExecutionContext context) { + this.timestampLabel = timestampLabel; + this.idLabel = idLabel; + this.context = context; + } + + /** A {@link SinkFactory.Registrar} for pubsub sinks. */ + @AutoService(SinkFactory.Registrar.class) + public static class Registrar implements SinkFactory.Registrar { + + @Override + public Map factories() { + PubsubDynamicSink.Factory factory = new Factory(); + return ImmutableMap.of( + "PubsubDynamicSink", + factory, + "org.apache.beam.runners.dataflow.worker.PubsubDynamicSink", + factory); + } + } + + static class Factory implements SinkFactory { + @Override + public PubsubDynamicSink create( + CloudObject spec, + Coder coder, + @Nullable PipelineOptions options, + @Nullable DataflowExecutionContext executionContext, + DataflowOperationContext operationContext) + throws Exception { + String timestampLabel = getString(spec, PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE, ""); + String idLabel = getString(spec, PropertyNames.PUBSUB_ID_ATTRIBUTE, ""); + + return new PubsubDynamicSink( + timestampLabel, + idLabel, + (StreamingModeExecutionContext) checkArgumentNotNull(executionContext)); + } + } + + @Override + public Sink.SinkWriter> writer() { + return new PubsubDynamicSink.PubsubWriter(); + } + + class PubsubWriter implements Sink.SinkWriter> { + private final Map outputBuilders; + private final ByteStringOutputStream stream; // Kept across adds for buffer reuse. 
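  // Descriptive note on the batching strategy used below: add() serializes each PubsubMessage into
  // the reusable ByteStringOutputStream and groups it by destination topic, lazily creating one
  // Windmill PubSubMessageBundle builder per topic (stamped with the configured timestamp and id
  // attribute labels) directly on the StreamingModeExecutionContext's commit builder. close() only
  // clears the local per-topic map, since the bundles already live on the commit request.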
+ + PubsubWriter() { + outputBuilders = Maps.newHashMap(); + stream = new ByteStringOutputStream(); + } + + public ByteString getDataFromMessage(PubsubMessage formatted, ByteStringOutputStream stream) + throws IOException { + Pubsub.PubsubMessage.Builder pubsubMessageBuilder = + Pubsub.PubsubMessage.newBuilder().setData(ByteString.copyFrom(formatted.getPayload())); + Map attributeMap = formatted.getAttributeMap(); + if (attributeMap != null) { + pubsubMessageBuilder.putAllAttributes(attributeMap); + } + pubsubMessageBuilder.build().writeTo(stream); + return stream.toByteStringAndReset(); + } + + public void close(Windmill.PubSubMessageBundle.Builder outputBuilder) throws IOException { + context.getOutputBuilder().addPubsubMessages(outputBuilder); + outputBuilder.clear(); + } + + @Override + public long add(WindowedValue data) throws IOException { + String dataTopic = + checkArgumentNotNull( + data.getValue().getTopic(), "No topic set for message when using dynamic topics."); + Preconditions.checkArgument( + !dataTopic.isEmpty(), "No topic set for message when using dynamic topics."); + ByteString byteString = getDataFromMessage(data.getValue(), stream); + Windmill.PubSubMessageBundle.Builder builder = + outputBuilders.computeIfAbsent( + dataTopic, + topic -> + context + .getOutputBuilder() + .addPubsubMessagesBuilder() + .setTopic(topic) + .setTimestampLabel(timestampLabel) + .setIdLabel(idLabel) + .setWithAttributes(true)); + builder.addMessages( + Windmill.Message.newBuilder() + .setData(byteString) + .setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(data.getTimestamp())) + .build()); + return byteString.size(); + } + + @Override + public void close() throws IOException { + outputBuilders.clear(); + } + + @Override + public void abort() throws IOException { + close(); + } + } + + @Override + public boolean supportsRestart() { + return true; + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/PubsubDynamicSinkTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/PubsubDynamicSinkTest.java new file mode 100644 index 000000000000..71d6e649427a --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/PubsubDynamicSinkTest.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.worker; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.when; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.runners.dataflow.util.CloudObject; +import org.apache.beam.runners.dataflow.util.PropertyNames; +import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink; +import org.apache.beam.runners.dataflow.worker.windmill.Pubsub; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** Unit tests for {@link PubsubSink}. */ +@RunWith(JUnit4.class) +public class PubsubDynamicSinkTest { + @Mock StreamingModeExecutionContext mockContext; + + @Before + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + } + + @Test + public void testWriteDynamicDestinations() throws Exception { + Windmill.WorkItemCommitRequest.Builder outputBuilder = + Windmill.WorkItemCommitRequest.newBuilder() + .setKey(ByteString.copyFromUtf8("key")) + .setWorkToken(0); + + when(mockContext.getOutputBuilder()).thenReturn(outputBuilder); + + Map spec = new HashMap<>(); + spec.put(PropertyNames.OBJECT_TYPE_NAME, "PubsubDynamicSink"); + spec.put(PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE, "ts"); + spec.put(PropertyNames.PUBSUB_ID_ATTRIBUTE, "id"); + + CloudObject cloudSinkSpec = CloudObject.fromSpec(spec); + PubsubDynamicSink sink = + (PubsubDynamicSink) + SinkRegistry.defaultRegistry() + .create( + cloudSinkSpec, + WindowedValue.getFullCoder(VoidCoder.of(), IntervalWindow.getCoder()), + null, + mockContext, + null) + .getUnderlyingSink(); + + Sink.SinkWriter> writer = sink.writer(); + + List expectedMessages1 = Lists.newArrayList(); + List expectedMessages2 = Lists.newArrayList(); + List expectedMessages3 = Lists.newArrayList(); + + for (int i = 0; i < 10; ++i) { + int baseTimestamp = i * 10; + byte[] payload1 = String.format("value_%d_%d", i, 1).getBytes(StandardCharsets.UTF_8); + byte[] payload2 = String.format("value_%d_%d", i, 2).getBytes(StandardCharsets.UTF_8); + byte[] payload3 = String.format("value_%d_%d", i, 3).getBytes(StandardCharsets.UTF_8); + + expectedMessages1.add( + Windmill.Message.newBuilder() + .setTimestamp(baseTimestamp * 1000) + .setData( + Pubsub.PubsubMessage.newBuilder() + .setData(ByteString.copyFrom(payload1)) + .build() + .toByteString()) + .build()); + expectedMessages2.add( + Windmill.Message.newBuilder() + .setTimestamp((baseTimestamp + 1) * 1000) + .setData( + Pubsub.PubsubMessage.newBuilder() + .setData(ByteString.copyFrom(payload2)) + .build() + .toByteString()) + .build()); + expectedMessages3.add( + Windmill.Message.newBuilder() + .setTimestamp((baseTimestamp + 2) * 1000) + .setData( + Pubsub.PubsubMessage.newBuilder() + .setData(ByteString.copyFrom(payload3)) + .build() + .toByteString()) + .build()); + writer.add( + WindowedValue.timestampedValueInGlobalWindow( + new 
PubsubMessage(payload1, null).withTopic("topic1"), new Instant(baseTimestamp))); + writer.add( + WindowedValue.timestampedValueInGlobalWindow( + new PubsubMessage(payload2, null).withTopic("topic2"), + new Instant(baseTimestamp + 1))); + writer.add( + WindowedValue.timestampedValueInGlobalWindow( + new PubsubMessage(payload3, null).withTopic("topic3"), + new Instant(baseTimestamp + 2))); + } + writer.close(); + + Windmill.WorkItemCommitRequest expectedCommit = + Windmill.WorkItemCommitRequest.newBuilder() + .setKey(ByteString.copyFromUtf8("key")) + .setWorkToken(0) + .addPubsubMessages( + Windmill.PubSubMessageBundle.newBuilder() + .setTopic("topic1") + .setTimestampLabel("ts") + .setIdLabel("id") + .setWithAttributes(true) + .addAllMessages(expectedMessages1)) + .addPubsubMessages( + Windmill.PubSubMessageBundle.newBuilder() + .setTopic("topic2") + .setTimestampLabel("ts") + .setIdLabel("id") + .setWithAttributes(true) + .addAllMessages(expectedMessages2)) + .addPubsubMessages( + Windmill.PubSubMessageBundle.newBuilder() + .setTopic("topic3") + .setTimestampLabel("ts") + .setIdLabel("id") + .setWithAttributes(true) + .addAllMessages(expectedMessages3)) + .build(); + assertEquals(expectedCommit, outputBuilder.build()); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java index 196403dacd05..a1e883c8c02e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java @@ -85,6 +85,7 @@ public PTransform, PDone> buildExternal(Configuration config if (config.timestampAttribute != null) { writeBuilder.setTimestampAttribute(config.timestampAttribute); } + writeBuilder.setDynamicDestinations(false); return writeBuilder.build(); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java new file mode 100644 index 000000000000..c082b2007aa8 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import java.nio.charset.StandardCharsets; +import java.util.Map; +import javax.naming.SizeLimitExceededException; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.ValueInSingleWindow; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Instant; + +public class PreparePubsubWriteDoFn extends DoFn { + // See https://cloud.google.com/pubsub/quotas#resource_limits. + private static final int PUBSUB_MESSAGE_DATA_MAX_BYTES = 10 << 20; + private static final int PUBSUB_MESSAGE_MAX_ATTRIBUTES = 100; + private static final int PUBSUB_MESSAGE_ATTRIBUTE_MAX_KEY_BYTES = 256; + private static final int PUBSUB_MESSAGE_ATTRIBUTE_MAX_VALUE_BYTES = 1024; + // The amount of bytes that each attribute entry adds up to the request + private static final int PUBSUB_MESSAGE_ATTRIBUTE_ENCODE_ADDITIONAL_BYTES = 6; + private int maxPublishBatchSize; + + private SerializableFunction formatFunction; + @Nullable SerializableFunction, PubsubIO.PubsubTopic> topicFunction; + + static int validatePubsubMessageSize(PubsubMessage message, int maxPublishBatchSize) + throws SizeLimitExceededException { + int payloadSize = message.getPayload().length; + if (payloadSize > PUBSUB_MESSAGE_DATA_MAX_BYTES) { + throw new SizeLimitExceededException( + "Pubsub message data field of length " + + payloadSize + + " exceeds maximum of " + + PUBSUB_MESSAGE_DATA_MAX_BYTES + + " bytes. See https://cloud.google.com/pubsub/quotas#resource_limits"); + } + int totalSize = payloadSize; + + @Nullable Map attributes = message.getAttributeMap(); + if (attributes != null) { + if (attributes.size() > PUBSUB_MESSAGE_MAX_ATTRIBUTES) { + throw new SizeLimitExceededException( + "Pubsub message contains " + + attributes.size() + + " attributes which exceeds the maximum of " + + PUBSUB_MESSAGE_MAX_ATTRIBUTES + + ". See https://cloud.google.com/pubsub/quotas#resource_limits"); + } + + // Consider attribute encoding overhead, so it doesn't go over the request limits + totalSize += attributes.size() * PUBSUB_MESSAGE_ATTRIBUTE_ENCODE_ADDITIONAL_BYTES; + + for (Map.Entry attribute : attributes.entrySet()) { + String key = attribute.getKey(); + int keySize = key.getBytes(StandardCharsets.UTF_8).length; + if (keySize > PUBSUB_MESSAGE_ATTRIBUTE_MAX_KEY_BYTES) { + throw new SizeLimitExceededException( + "Pubsub message attribute key '" + + key + + "' exceeds the maximum of " + + PUBSUB_MESSAGE_ATTRIBUTE_MAX_KEY_BYTES + + " bytes. See https://cloud.google.com/pubsub/quotas#resource_limits"); + } + totalSize += keySize; + + String value = attribute.getValue(); + int valueSize = value.getBytes(StandardCharsets.UTF_8).length; + if (valueSize > PUBSUB_MESSAGE_ATTRIBUTE_MAX_VALUE_BYTES) { + throw new SizeLimitExceededException( + "Pubsub message attribute value for key '" + + key + + "' starting with '" + + value.substring(0, Math.min(256, value.length())) + + "' exceeds the maximum of " + + PUBSUB_MESSAGE_ATTRIBUTE_MAX_VALUE_BYTES + + " bytes. See https://cloud.google.com/pubsub/quotas#resource_limits"); + } + totalSize += valueSize; + } + } + + if (totalSize > maxPublishBatchSize) { + throw new SizeLimitExceededException( + "Pubsub message of length " + + totalSize + + " exceeds maximum of " + + maxPublishBatchSize + + " bytes, when considering the payload and attributes. 
" + + "See https://cloud.google.com/pubsub/quotas#resource_limits"); + } + return totalSize; + } + + PreparePubsubWriteDoFn( + SerializableFunction formatFunction, + @Nullable + SerializableFunction, PubsubIO.PubsubTopic> topicFunction, + int maxPublishBatchSize) { + this.formatFunction = formatFunction; + this.topicFunction = topicFunction; + this.maxPublishBatchSize = maxPublishBatchSize; + } + + @ProcessElement + public void process( + @Element InputT element, + @Timestamp Instant ts, + BoundedWindow window, + PaneInfo paneInfo, + OutputReceiver o) { + PubsubMessage message = formatFunction.apply(element); + if (topicFunction != null) { + message = + message.withTopic( + topicFunction.apply(ValueInSingleWindow.of(element, ts, window, paneInfo)).asPath()); + } + try { + validatePubsubMessageSize(message, maxPublishBatchSize); + } catch (SizeLimitExceededException e) { + throw new IllegalArgumentException(e); + } + o.output(message); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubPayloadTranslation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubPayloadTranslation.java index 7e27ac2ce77d..3115da55cefc 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubPayloadTranslation.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubPayloadTranslation.java @@ -37,6 +37,8 @@ import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.Preconditions; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; @SuppressWarnings({ "nullness" // TODO(https://github.com/apache/beam/issues/20497) @@ -103,7 +105,6 @@ public RunnerApi.FunctionSpec translate( static class PubSubWritePayloadTranslator implements TransformPayloadTranslator { - @Override public String getUrn(PubsubUnboundedSink.PubsubSink transform) { return PTransformTranslation.PUBSUB_WRITE; @@ -114,7 +115,8 @@ public RunnerApi.FunctionSpec translate( AppliedPTransform transform, SdkComponents components) { PubSubWritePayload.Builder payloadBuilder = PubSubWritePayload.newBuilder(); - ValueProvider topicProvider = transform.getTransform().outer.getTopicProvider(); + ValueProvider topicProvider = + Preconditions.checkStateNotNull(transform.getTransform().outer.getTopicProvider()); if (topicProvider.isAccessible()) { payloadBuilder.setTopic(topicProvider.get().getFullPath()); } else { @@ -135,6 +137,32 @@ public RunnerApi.FunctionSpec translate( } } + static class PubSubDynamicWritePayloadTranslator + implements TransformPayloadTranslator { + @Override + public String getUrn(PubsubUnboundedSink.PubsubDynamicSink transform) { + return PTransformTranslation.PUBSUB_WRITE_DYNAMIC; + } + + @Override + public RunnerApi.FunctionSpec translate( + AppliedPTransform transform, + SdkComponents components) { + PubSubWritePayload.Builder payloadBuilder = PubSubWritePayload.newBuilder(); + if (transform.getTransform().outer.getTimestampAttribute() != null) { + payloadBuilder.setTimestampAttribute( + transform.getTransform().outer.getTimestampAttribute()); + } + if (transform.getTransform().outer.getIdAttribute() != null) { + payloadBuilder.setIdAttribute(transform.getTransform().outer.getIdAttribute()); + } + return FunctionSpec.newBuilder() + 
.setUrn(getUrn(transform.getTransform())) + .setPayload(payloadBuilder.build().toByteString()) + .build(); + } + } + @AutoService(TransformPayloadTranslatorRegistrar.class) public static class WriteRegistrar implements TransformPayloadTranslatorRegistrar { @@ -142,8 +170,11 @@ public static class WriteRegistrar implements TransformPayloadTranslatorRegistra @SuppressWarnings("rawtypes") public Map, ? extends TransformPayloadTranslator> getTransformPayloadTranslators() { - return Collections.singletonMap( - PubsubUnboundedSink.PubsubSink.class, new PubSubWritePayloadTranslator()); + return ImmutableMap.of( + PubsubUnboundedSink.PubsubSink.class, + new PubSubWritePayloadTranslator(), + PubsubUnboundedSink.PubsubDynamicSink.class, + new PubSubDynamicWritePayloadTranslator()); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java index f075daf2c22d..06d7c344a088 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java @@ -357,10 +357,10 @@ public static TopicPath topicPathFromName(String projectId, String topicName) { public abstract static class OutgoingMessage implements Serializable { /** Underlying Message. May not have publish timestamp set. */ - public abstract PubsubMessage message(); + public abstract PubsubMessage getMessage(); /** Timestamp for element (ms since epoch). */ - public abstract long timestampMsSinceEpoch(); + public abstract long getTimestampMsSinceEpoch(); /** * If using an id attribute, the record id to associate with this record's metadata so the @@ -368,15 +368,22 @@ public abstract static class OutgoingMessage implements Serializable { */ public abstract @Nullable String recordId(); + public abstract @Nullable String topic(); + public static OutgoingMessage of( - PubsubMessage message, long timestampMsSinceEpoch, @Nullable String recordId) { - return new AutoValue_PubsubClient_OutgoingMessage(message, timestampMsSinceEpoch, recordId); + PubsubMessage message, + long timestampMsSinceEpoch, + @Nullable String recordId, + @Nullable String topic) { + return new AutoValue_PubsubClient_OutgoingMessage( + message, timestampMsSinceEpoch, recordId, topic); } public static OutgoingMessage of( org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage message, long timestampMsSinceEpoch, - @Nullable String recordId) { + @Nullable String recordId, + @Nullable String topic) { PubsubMessage.Builder builder = PubsubMessage.newBuilder().setData(ByteString.copyFrom(message.getPayload())); if (message.getAttributeMap() != null) { @@ -385,7 +392,7 @@ public static OutgoingMessage of( if (message.getOrderingKey() != null) { builder.setOrderingKey(message.getOrderingKey()); } - return of(builder.build(), timestampMsSinceEpoch, recordId); + return of(builder.build(), timestampMsSinceEpoch, recordId, topic); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java index 60c096f72f81..7d8ca3f7517f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java +++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java @@ -230,11 +230,11 @@ public int publish(TopicPath topic, List outgoingMessages) thro PublishRequest.Builder request = PublishRequest.newBuilder().setTopic(topic.getPath()); for (OutgoingMessage outgoingMessage : outgoingMessages) { PubsubMessage.Builder message = - outgoingMessage.message().toBuilder().clearMessageId().clearPublishTime(); + outgoingMessage.getMessage().toBuilder().clearMessageId().clearPublishTime(); if (timestampAttribute != null) { message.putAttributes( - timestampAttribute, String.valueOf(outgoingMessage.timestampMsSinceEpoch())); + timestampAttribute, String.valueOf(outgoingMessage.getTimestampMsSinceEpoch())); } if (idAttribute != null && !Strings.isNullOrEmpty(outgoingMessage.recordId())) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index 7bac875e2375..7dde9c986491 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -29,9 +29,9 @@ import java.io.IOException; import java.io.Serializable; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.naming.SizeLimitExceededException; @@ -67,6 +67,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.sdk.values.EncodableThrowable; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; @@ -74,9 +75,12 @@ import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.ValueInSingleWindow; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; import org.slf4j.Logger; @@ -103,6 +107,65 @@ * reviewers mentioned * here. + * + *

+ * <h3>Example PubsubIO read usage</h3>
+ *
+ * <pre>{@code
+ * // Read from a specific topic; a subscription will be created at pipeline start time.
+ * PCollection<PubsubMessage> messages = PubsubIO.readMessages().fromTopic(topic);
+ *
+ * // Read from a subscription.
+ * PCollection<PubsubMessage> messages = PubsubIO.readMessages().fromSubscription(subscription);
+ *
+ * // Read messages including attributes. All PubSub attributes will be included in the PubsubMessage.
+ * PCollection<PubsubMessage> messages = PubsubIO.readMessagesWithAttributes().fromTopic(topic);
+ *
+ * // Examples of reading different types from PubSub.
+ * PCollection<String> strings = PubsubIO.readStrings().fromTopic(topic);
+ * PCollection<MyProto> protos = PubsubIO.readProtos(MyProto.class).fromTopic(topic);
+ * PCollection<MyType> avros = PubsubIO.readAvros(MyType.class).fromTopic(topic);
+ *
+ * }</pre>
+ *
+ * <h3>Example PubsubIO write usage</h3>
+ *
+ * <p>Data can be written to a single topic or to a dynamic set of topics. In order to write to a
+ * single topic, the {@link PubsubIO.Write#to(String)} method can be used. For example:
+ *
+ * <pre>{@code
+ * avros.apply(PubsubIO.writeAvros(MyType.class).to(topic));
+ * protos.apply(PubsubIO.writeProtos(MyProto.class).to(topic));
+ * strings.apply(PubsubIO.writeStrings().to(topic));
+ * }</pre>
+ *
+ * <p>Dynamic topic destinations can be accomplished by specifying a function to extract the topic
+ * from the record using the {@link PubsubIO.Write#to(SerializableFunction)} method. For example:
+ *
+ * <pre>{@code
+ * avros.apply(PubsubIO.writeAvros(MyType.class)
+ *     .to((ValueInSingleWindow<MyType> quote) -> {
+ *       String country = quote.getValue().getCountry();
+ *       return "projects/myproject/topics/events_" + country;
+ *     }));
+ * }</pre>
+ *
+ * <p>Dynamic topics can also be specified by writing {@link PubsubMessage} objects containing the
+ * topic and writing using the {@link PubsubIO#writeMessagesDynamic()} method. For example:
+ *
+ * <pre>{@code
+ * events.apply(MapElements.into(new TypeDescriptor<PubsubMessage>() {})
+ *                         .via(e -> new PubsubMessage(
+ *                             e.toByteString(), Collections.emptyMap()).withTopic(e.getCountry())))
+ *       .apply(PubsubIO.writeMessagesDynamic());
+ * }</pre>
+ *
+ * <h3>Custom timestamps</h3>
+ * + * All messages read from PubSub have a stable publish timestamp that is independent of when the + * message is read from the PubSub topic. By default, the publish time is used as the timestamp for + * all messages read and the watermark is based on that. If there is a different logical timestamp + * to be used, that timestamp must be published in a PubSub attribute and specified using {@link + * PubsubIO.Read#withTimestampAttribute}. See the Javadoc for that method for the timestamp format. */ @SuppressWarnings({ "nullness" // TODO(https://github.com/apache/beam/issues/20497) @@ -134,19 +197,11 @@ public class PubsubIO { private static final Pattern PUBSUB_NAME_REGEXP = Pattern.compile("[a-zA-Z][-._~%+a-zA-Z0-9]+"); + static final int PUBSUB_MESSAGE_MAX_TOTAL_SIZE = 10 << 20; + private static final int PUBSUB_NAME_MIN_LENGTH = 3; private static final int PUBSUB_NAME_MAX_LENGTH = 255; - // See https://cloud.google.com/pubsub/quotas#resource_limits. - private static final int PUBSUB_MESSAGE_MAX_TOTAL_SIZE = 10 << 20; - private static final int PUBSUB_MESSAGE_DATA_MAX_BYTES = 10 << 20; - private static final int PUBSUB_MESSAGE_MAX_ATTRIBUTES = 100; - private static final int PUBSUB_MESSAGE_ATTRIBUTE_MAX_KEY_BYTES = 256; - private static final int PUBSUB_MESSAGE_ATTRIBUTE_MAX_VALUE_BYTES = 1024; - - // The amount of bytes that each attribute entry adds up to the request - private static final int PUBSUB_MESSAGE_ATTRIBUTE_ENCODE_ADDITIONAL_BYTES = 6; - private static final String SUBSCRIPTION_RANDOM_TEST_PREFIX = "_random/"; private static final String SUBSCRIPTION_STARTING_SIGNAL = "_starting_signal/"; private static final String TOPIC_DEV_NULL_TEST_NAME = "/topics/dev/null"; @@ -182,76 +237,6 @@ private static void validatePubsubName(String name) { } } - @VisibleForTesting - static int validateAndGetPubsubMessageSize(PubsubMessage message) - throws SizeLimitExceededException { - int payloadSize = message.getPayload().length; - if (payloadSize > PUBSUB_MESSAGE_DATA_MAX_BYTES) { - throw new SizeLimitExceededException( - "Pubsub message data field of length " - + payloadSize - + " exceeds maximum of " - + PUBSUB_MESSAGE_DATA_MAX_BYTES - + " bytes. See https://cloud.google.com/pubsub/quotas#resource_limits"); - } - int totalSize = payloadSize; - - @Nullable Map attributes = message.getAttributeMap(); - if (attributes != null) { - if (attributes.size() > PUBSUB_MESSAGE_MAX_ATTRIBUTES) { - throw new SizeLimitExceededException( - "Pubsub message contains " - + attributes.size() - + " attributes which exceeds the maximum of " - + PUBSUB_MESSAGE_MAX_ATTRIBUTES - + ". See https://cloud.google.com/pubsub/quotas#resource_limits"); - } - - // Consider attribute encoding overhead, so it doesn't go over the request limits - totalSize += attributes.size() * PUBSUB_MESSAGE_ATTRIBUTE_ENCODE_ADDITIONAL_BYTES; - - for (Map.Entry attribute : attributes.entrySet()) { - String key = attribute.getKey(); - int keySize = key.getBytes(StandardCharsets.UTF_8).length; - if (keySize > PUBSUB_MESSAGE_ATTRIBUTE_MAX_KEY_BYTES) { - throw new SizeLimitExceededException( - "Pubsub message attribute key '" - + key - + "' exceeds the maximum of " - + PUBSUB_MESSAGE_ATTRIBUTE_MAX_KEY_BYTES - + " bytes. 
See https://cloud.google.com/pubsub/quotas#resource_limits"); - } - totalSize += keySize; - - String value = attribute.getValue(); - int valueSize = value.getBytes(StandardCharsets.UTF_8).length; - if (valueSize > PUBSUB_MESSAGE_ATTRIBUTE_MAX_VALUE_BYTES) { - throw new SizeLimitExceededException( - "Pubsub message attribute value for key '" - + key - + "' starting with '" - + value.substring(0, Math.min(256, value.length())) - + "' exceeds the maximum of " - + PUBSUB_MESSAGE_ATTRIBUTE_MAX_VALUE_BYTES - + " bytes. See https://cloud.google.com/pubsub/quotas#resource_limits"); - } - totalSize += valueSize; - } - } - - if (totalSize > PUBSUB_MESSAGE_MAX_TOTAL_SIZE) { - throw new SizeLimitExceededException( - "Pubsub message of length " - + totalSize - + " exceeds maximum of " - + PUBSUB_MESSAGE_MAX_TOTAL_SIZE - + " bytes, when considering the payload and attributes. " - + "See https://cloud.google.com/pubsub/quotas#resource_limits"); - } - - return totalSize; - } - /** Populate common {@link DisplayData} between Pubsub source and sink. */ private static void populateCommonDisplayData( DisplayData.Builder builder, @@ -402,6 +387,22 @@ public TopicPath apply(PubsubTopic from) { /** Class representing a Cloud Pub/Sub Topic. */ public static class PubsubTopic implements Serializable { + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof PubsubTopic)) { + return false; + } + PubsubTopic that = (PubsubTopic) o; + return type == that.type && project.equals(that.project) && topic.equals(that.topic); + } + + @Override + public int hashCode() { + return Objects.hash(type, project, topic); + } private enum Type { NORMAL, @@ -709,7 +710,25 @@ public static Read readAvrosWithBeamSchema(Class clazz) { /** Returns A {@link PTransform} that writes to a Google Cloud Pub/Sub stream. */ public static Write writeMessages() { - return Write.newBuilder().build(); + return Write.newBuilder() + .setTopicProvider(null) + .setTopicFunction(null) + .setDynamicDestinations(false) + .build(); + } + + /** + * Enables dynamic destination topics. The {@link PubsubMessage} elements are each expected to + * contain a destination topic, which can be set using {@link PubsubMessage#withTopic}. If {@link + * Write#to} is called, that will be used instead to generate the topic and the value returned by + * {@link PubsubMessage#getTopic} will be ignored. + */ + public static Write writeMessagesDynamic() { + return Write.newBuilder() + .setTopicProvider(null) + .setTopicFunction(null) + .setDynamicDestinations(true) + .build(); } /** @@ -720,6 +739,7 @@ public static Write writeStrings() { return Write.newBuilder( (String string) -> new PubsubMessage(string.getBytes(StandardCharsets.UTF_8), ImmutableMap.of())) + .setDynamicDestinations(false) .build(); } @@ -729,7 +749,9 @@ public static Write writeStrings() { */ public static Write writeProtos(Class messageClass) { // TODO: Like in readProtos(), stop using ProtoCoder and instead format the payload directly. - return Write.newBuilder(formatPayloadUsingCoder(ProtoCoder.of(messageClass))).build(); + return Write.newBuilder(formatPayloadUsingCoder(ProtoCoder.of(messageClass))) + .setDynamicDestinations(false) + .build(); } /** @@ -738,7 +760,9 @@ public static Write writeProtos(Class messageClass) { */ public static Write writeAvros(Class clazz) { // TODO: Like in readAvros(), stop using AvroCoder and instead format the payload directly. 
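  // Note that every static write factory now sets dynamicDestinations explicitly: false for
  // writeMessages(), writeStrings(), writeProtos() and writeAvros(), and true only for
  // writeMessagesDynamic(), so Write.expand() can tell whether a per-message topic or a topic
  // function is expected.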
- return Write.newBuilder(formatPayloadUsingCoder(AvroCoder.of(clazz))).build(); + return Write.newBuilder(formatPayloadUsingCoder(AvroCoder.of(clazz))) + .setDynamicDestinations(false) + .build(); } /** Implementation of read methods. */ @@ -1128,6 +1152,10 @@ public abstract static class Write extends PTransform, PDone> abstract @Nullable ValueProvider getTopicProvider(); + abstract @Nullable SerializableFunction, PubsubTopic> getTopicFunction(); + + abstract boolean getDynamicDestinations(); + abstract PubsubClient.PubsubClientFactory getPubsubClientFactory(); /** the batch size for bulk submissions to pubsub. */ @@ -1164,6 +1192,11 @@ static Builder newBuilder() { abstract static class Builder { abstract Builder setTopicProvider(ValueProvider topicProvider); + abstract Builder setTopicFunction( + SerializableFunction, PubsubTopic> topicFunction); + + abstract Builder setDynamicDestinations(boolean dynamicDestinations); + abstract Builder setPubsubClientFactory(PubsubClient.PubsubClientFactory factory); abstract Builder setMaxBatchSize(Integer batchSize); @@ -1195,6 +1228,21 @@ public Write to(String topic) { public Write to(ValueProvider topic) { return toBuilder() .setTopicProvider(NestedValueProvider.of(topic, PubsubTopic::fromPath)) + .setTopicFunction(null) + .setDynamicDestinations(false) + .build(); + } + + /** + * Provides a function to dynamically specify the target topic per message. Not compatible with + * any of the other to methods. If {@link #to} is called again specifying a topic, then this + * topicFunction will be ignored. + */ + public Write to(SerializableFunction, String> topicFunction) { + return toBuilder() + .setTopicProvider(null) + .setTopicFunction(v -> PubsubTopic.fromPath(topicFunction.apply(v))) + .setDynamicDestinations(true) .build(); } @@ -1263,13 +1311,34 @@ public Write withPubsubRootUrl(String pubsubRootUrl) { @Override public PDone expand(PCollection input) { - if (getTopicProvider() == null) { - throw new IllegalStateException("need to set the topic of a PubsubIO.Write transform"); + if (getTopicProvider() == null && !getDynamicDestinations()) { + throw new IllegalStateException( + "need to set the topic of a PubsubIO.Write transform if not using " + + "dynamic topic destinations."); } + SerializableFunction, PubsubIO.PubsubTopic> topicFunction = + getTopicFunction(); + if (topicFunction == null && getTopicProvider() != null) { + topicFunction = v -> getTopicProvider().get(); + } + int maxMessageSize = PUBSUB_MESSAGE_MAX_TOTAL_SIZE; + if (input.isBounded() == PCollection.IsBounded.BOUNDED) { + maxMessageSize = + Math.min( + maxMessageSize, + MoreObjects.firstNonNull( + getMaxBatchBytesSize(), MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT)); + } + PCollection pubsubMessages = + input + .apply( + ParDo.of( + new PreparePubsubWriteDoFn<>(getFormatFn(), topicFunction, maxMessageSize))) + .setCoder(new PubsubMessageWithTopicCoder()); switch (input.isBounded()) { case BOUNDED: - input.apply( + pubsubMessages.apply( ParDo.of( new PubsubBoundedWriter( MoreObjects.firstNonNull(getMaxBatchSize(), MAX_PUBLISH_BATCH_SIZE), @@ -1277,31 +1346,20 @@ public PDone expand(PCollection input) { getMaxBatchBytesSize(), MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT)))); return PDone.in(input.getPipeline()); case UNBOUNDED: - return input - .apply( - MapElements.into(new TypeDescriptor() {}) - .via( - elem -> { - PubsubMessage message = getFormatFn().apply(elem); - try { - validateAndGetPubsubMessageSize(message); - } catch (SizeLimitExceededException e) { - throw new 
IllegalArgumentException(e); - } - return message; - })) - .apply( - new PubsubUnboundedSink( - getPubsubClientFactory(), - NestedValueProvider.of(getTopicProvider(), new TopicPathTranslator()), - getTimestampAttribute(), - getIdAttribute(), - 100 /* numShards */, - MoreObjects.firstNonNull( - getMaxBatchSize(), PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_SIZE), - MoreObjects.firstNonNull( - getMaxBatchBytesSize(), PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_BYTES), - getPubsubRootUrl())); + return pubsubMessages.apply( + new PubsubUnboundedSink( + getPubsubClientFactory(), + getTopicProvider() != null + ? NestedValueProvider.of(getTopicProvider(), new TopicPathTranslator()) + : null, + getTimestampAttribute(), + getIdAttribute(), + 100 /* numShards */, + MoreObjects.firstNonNull( + getMaxBatchSize(), PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_SIZE), + MoreObjects.firstNonNull( + getMaxBatchBytesSize(), PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_BYTES), + getPubsubRootUrl())); } throw new RuntimeException(); // cases are exhaustive. } @@ -1318,10 +1376,20 @@ public void populateDisplayData(DisplayData.Builder builder) { * *

Public so can be suppressed by runners. */ - public class PubsubBoundedWriter extends DoFn { - private transient List output; + public class PubsubBoundedWriter extends DoFn { + private class OutgoingData { + List messages; + long bytes; + + OutgoingData() { + this.messages = Lists.newArrayList(); + this.bytes = 0; + } + } + + private transient Map output; + private transient PubsubClient pubsubClient; - private transient int currentOutputBytes; private int maxPublishBatchByteSize; private int maxPublishBatchSize; @@ -1337,8 +1405,7 @@ public class PubsubBoundedWriter extends DoFn { @StartBundle public void startBundle(StartBundleContext c) throws IOException { - this.output = new ArrayList<>(); - this.currentOutputBytes = 0; + this.output = Maps.newHashMap(); // NOTE: idAttribute is ignored. this.pubsubClient = @@ -1348,25 +1415,31 @@ public void startBundle(StartBundleContext c) throws IOException { } @ProcessElement - public void processElement(ProcessContext c) throws IOException, SizeLimitExceededException { - PubsubMessage message = getFormatFn().apply(c.element()); - int messageSize = validateAndGetPubsubMessageSize(message); - if (messageSize > maxPublishBatchByteSize) { - String msg = - String.format( - "Pub/Sub message size (%d) exceeded maximum batch size (%d)", - messageSize, maxPublishBatchByteSize); - throw new SizeLimitExceededException(msg); + public void processElement(@Element PubsubMessage message, @Timestamp Instant timestamp) + throws IOException, SizeLimitExceededException { + // Validate again here just as a sanity check. + PreparePubsubWriteDoFn.validatePubsubMessageSize(message, maxPublishBatchSize); + byte[] payload = message.getPayload(); + int messageSize = payload.length; + + PubsubTopic pubsubTopic; + if (getTopicProvider() != null) { + pubsubTopic = getTopicProvider().get(); + } else { + pubsubTopic = + PubsubTopic.fromPath(Preconditions.checkArgumentNotNull(message.getTopic())); } - // Checking before adding the message stops us from violating max batch size or bytes - if (output.size() >= maxPublishBatchSize - || (!output.isEmpty() - && (currentOutputBytes + messageSize) >= maxPublishBatchByteSize)) { - publish(); + OutgoingData currentTopicOutput = + output.computeIfAbsent(pubsubTopic, t -> new OutgoingData()); + if (currentTopicOutput.messages.size() >= maxPublishBatchSize + || (!currentTopicOutput.messages.isEmpty() + && (currentTopicOutput.bytes + messageSize) >= maxPublishBatchByteSize)) { + publish(pubsubTopic, currentTopicOutput.messages); + currentTopicOutput.messages.clear(); + currentTopicOutput.bytes = 0; } - byte[] payload = message.getPayload(); Map attributes = message.getAttributeMap(); String orderingKey = message.getOrderingKey(); @@ -1380,29 +1453,27 @@ public void processElement(ProcessContext c) throws IOException, SizeLimitExceed } // NOTE: The record id is always null. 
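  // With dynamic destinations, the OutgoingMessage also records the topic attached to the message
  // (null when a static topic provider is configured); finishBundle() later publishes each
  // per-topic batch separately.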
- output.add(OutgoingMessage.of(msgBuilder.build(), c.timestamp().getMillis(), null)); - currentOutputBytes += messageSize; + currentTopicOutput.messages.add( + OutgoingMessage.of( + msgBuilder.build(), timestamp.getMillis(), null, message.getTopic())); + currentTopicOutput.bytes += messageSize; } @FinishBundle public void finishBundle() throws IOException { - if (!output.isEmpty()) { - publish(); + for (Map.Entry entry : output.entrySet()) { + publish(entry.getKey(), entry.getValue().messages); } output = null; - currentOutputBytes = 0; pubsubClient.close(); pubsubClient = null; } - private void publish() throws IOException { - PubsubTopic topic = getTopicProvider().get(); + private void publish(PubsubTopic topic, List messages) throws IOException { int n = pubsubClient.publish( - PubsubClient.topicPathFromName(topic.project, topic.topic), output); - checkState(n == output.size()); - output.clear(); - currentOutputBytes = 0; + PubsubClient.topicPathFromName(topic.project, topic.topic), messages); + checkState(n == messages.size()); } @Override diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java index ab6b6533343e..140931a9f054 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java @@ -141,10 +141,10 @@ public int publish(TopicPath topic, List outgoingMessages) thro List pubsubMessages = new ArrayList<>(outgoingMessages.size()); for (OutgoingMessage outgoingMessage : outgoingMessages) { PubsubMessage pubsubMessage = - new PubsubMessage().encodeData(outgoingMessage.message().getData().toByteArray()); + new PubsubMessage().encodeData(outgoingMessage.getMessage().getData().toByteArray()); pubsubMessage.setAttributes(getMessageAttributes(outgoingMessage)); - if (!outgoingMessage.message().getOrderingKey().isEmpty()) { - pubsubMessage.setOrderingKey(outgoingMessage.message().getOrderingKey()); + if (!outgoingMessage.getMessage().getOrderingKey().isEmpty()) { + pubsubMessage.setOrderingKey(outgoingMessage.getMessage().getOrderingKey()); } // N.B. 
publishTime and messageId are intentionally not set on the message that is published @@ -158,13 +158,14 @@ public int publish(TopicPath topic, List outgoingMessages) thro private Map getMessageAttributes(OutgoingMessage outgoingMessage) { Map attributes = null; - if (outgoingMessage.message().getAttributesMap() == null) { + if (outgoingMessage.getMessage().getAttributesMap() == null) { attributes = new TreeMap<>(); } else { - attributes = new TreeMap<>(outgoingMessage.message().getAttributesMap()); + attributes = new TreeMap<>(outgoingMessage.getMessage().getAttributesMap()); } if (timestampAttribute != null) { - attributes.put(timestampAttribute, String.valueOf(outgoingMessage.timestampMsSinceEpoch())); + attributes.put( + timestampAttribute, String.valueOf(outgoingMessage.getTimestampMsSinceEpoch())); } if (idAttribute != null && !Strings.isNullOrEmpty(outgoingMessage.recordId())) { attributes.put(idAttribute, outgoingMessage.recordId()); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java index 549daf92657f..3649c6c6d460 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessage.java @@ -33,6 +33,8 @@ public class PubsubMessage { @AutoValue abstract static class Impl { + abstract @Nullable String getTopic(); + @SuppressWarnings("mutable") abstract byte[] getPayload(); @@ -43,11 +45,12 @@ abstract static class Impl { abstract @Nullable String getOrderingKey(); static Impl create( + @Nullable String topic, byte[] payload, @Nullable Map attributes, @Nullable String messageId, @Nullable String orderingKey) { - return new AutoValue_PubsubMessage_Impl(payload, attributes, messageId, orderingKey); + return new AutoValue_PubsubMessage_Impl(topic, payload, attributes, messageId, orderingKey); } } @@ -59,7 +62,7 @@ public PubsubMessage(byte[] payload, @Nullable Map attributes) { public PubsubMessage( byte[] payload, @Nullable Map attributes, @Nullable String messageId) { - impl = Impl.create(payload, attributes, messageId, null); + impl = Impl.create(null, payload, attributes, messageId, null); } public PubsubMessage( @@ -67,7 +70,25 @@ public PubsubMessage( @Nullable Map attributes, @Nullable String messageId, @Nullable String orderingKey) { - impl = Impl.create(payload, attributes, messageId, orderingKey); + impl = Impl.create(null, payload, attributes, messageId, orderingKey); + } + + private PubsubMessage(Impl impl) { + this.impl = impl; + } + + public PubsubMessage withTopic(String topic) { + return new PubsubMessage( + Impl.create( + topic, + impl.getPayload(), + impl.getAttributeMap(), + impl.getMessageId(), + impl.getOrderingKey())); + } + + public @Nullable String getTopic() { + return impl.getTopic(); } /** Returns the main PubSub message. 
*/ diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithTopicCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithTopicCoder.java new file mode 100644 index 000000000000..d10b9a2f1066 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithTopicCoder.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Map; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.MapCoder; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** A coder for PubsubMessage including the topic from the PubSub server. 
*/ +public class PubsubMessageWithTopicCoder extends CustomCoder { + // A message's payload cannot be null + private static final Coder PAYLOAD_CODER = ByteArrayCoder.of(); + private static final Coder<@Nullable Map> ATTRIBUTES_CODER = + NullableCoder.of(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); + private static final Coder<@Nullable String> MESSAGE_ID_CODER = + NullableCoder.of(StringUtf8Coder.of()); + + private static final Coder<@Nullable String> TOPIC_CODER = NullableCoder.of(StringUtf8Coder.of()); + + public static Coder of(TypeDescriptor ignored) { + return of(); + } + + public static PubsubMessageWithAttributesAndMessageIdCoder of() { + return new PubsubMessageWithAttributesAndMessageIdCoder(); + } + + @Override + public void encode(PubsubMessage value, OutputStream outStream) throws IOException { + PAYLOAD_CODER.encode(value.getPayload(), outStream); + ATTRIBUTES_CODER.encode(value.getAttributeMap(), outStream); + MESSAGE_ID_CODER.encode(value.getMessageId(), outStream); + TOPIC_CODER.encode(value.getTopic(), outStream); + } + + @Override + public PubsubMessage decode(InputStream inStream) throws IOException { + byte[] payload = PAYLOAD_CODER.decode(inStream); + @Nullable Map attributes = ATTRIBUTES_CODER.decode(inStream); + @Nullable String messageId = MESSAGE_ID_CODER.decode(inStream); + @Nullable String topic = TOPIC_CODER.decode(inStream); + PubsubMessage pubsubMessage = new PubsubMessage(payload, attributes, messageId); + if (topic != null) { + pubsubMessage = pubsubMessage.withTopic(topic); + } + return pubsubMessage; + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java index 575957c60728..9c41ff8b2f20 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java @@ -57,6 +57,8 @@ private static class State { /** True if has been primed for a test but not yet validated. */ boolean isActive; + boolean isPublish; + /** Publish mode only: Only publish calls for this topic are allowed. */ @Nullable TopicPath expectedTopic; @@ -111,7 +113,7 @@ default PubsubIO.Read setClock(PubsubIO.Read readTransform, Clock cloc * factory must be closed when the test is complete, at which point final validation will occur. */ public static PubsubTestClientFactory createFactoryForPublish( - final TopicPath expectedTopic, + final @Nullable TopicPath expectedTopic, final Iterable expectedOutgoingMessages, final Iterable failingOutgoingMessages) { activate( @@ -315,9 +317,10 @@ private static void deactivate(Runnable runFinalChecks) { /** Handles setting {@code STATE} values for a publishing client. */ private static void setPublishState( - final TopicPath expectedTopic, + final @Nullable TopicPath expectedTopic, final Iterable expectedOutgoingMessages, final Iterable failingOutgoingMessages) { + STATE.isPublish = true; STATE.expectedTopic = expectedTopic; STATE.remainingExpectedOutgoingMessages = Sets.newHashSet(expectedOutgoingMessages); STATE.remainingFailingOutgoingMessages = Sets.newHashSet(failingOutgoingMessages); @@ -422,7 +425,7 @@ private boolean inPullMode() { /** Return true if in publish mode. 
*/ private boolean inPublishMode() { checkState(STATE.isActive, "No test is active"); - return STATE.expectedTopic != null; + return STATE.isPublish; } /** @@ -452,12 +455,20 @@ public void close() {} public int publish(TopicPath topic, List outgoingMessages) throws IOException { synchronized (STATE) { checkState(inPublishMode(), "Can only publish in publish mode"); - checkState( - topic.equals(STATE.expectedTopic), - "Topic %s does not match expected %s", - topic, - STATE.expectedTopic); + boolean isDynamic = STATE.expectedTopic == null; + if (!isDynamic) { + checkState( + topic.equals(STATE.expectedTopic), + "Topic %s does not match expected %s", + topic, + STATE.expectedTopic); + } for (OutgoingMessage outgoingMessage : outgoingMessages) { + if (isDynamic) { + checkState(outgoingMessage.topic().equals(topic.getPath())); + } else { + checkState(outgoingMessage.topic() == null); + } if (STATE.remainingFailingOutgoingMessages.remove(outgoingMessage)) { throw new RuntimeException("Simulating failure for " + outgoingMessage); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java index cc3009c73131..b53aef87bc3f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java @@ -19,6 +19,7 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; +import com.google.protobuf.InvalidProtocolBufferException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -46,8 +47,14 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.MapValues; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableBiFunction; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.SerializableFunctions; +import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.AfterFirst; import org.apache.beam.sdk.transforms.windowing.AfterPane; @@ -59,10 +66,13 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; +import org.joda.time.Instant; /** * A PTransform which streams messages to Pubsub. @@ -94,17 +104,20 @@ public class PubsubUnboundedSink extends PTransform, /** Default longest delay between receiving a message and pushing it to Pubsub. */ private static final Duration DEFAULT_MAX_LATENCY = Duration.standardSeconds(2); + /** Coder for conveying outgoing messages between internal stages. 
*/ /** Coder for conveying outgoing messages between internal stages. */ private static class OutgoingMessageCoder extends AtomicCoder { private static final NullableCoder RECORD_ID_CODER = NullableCoder.of(StringUtf8Coder.of()); + private static final NullableCoder TOPIC_CODER = NullableCoder.of(StringUtf8Coder.of()); @Override public void encode(OutgoingMessage value, OutputStream outStream) throws CoderException, IOException { - ProtoCoder.of(com.google.pubsub.v1.PubsubMessage.class).encode(value.message(), outStream); - BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch(), outStream); + ProtoCoder.of(com.google.pubsub.v1.PubsubMessage.class).encode(value.getMessage(), outStream); + BigEndianLongCoder.of().encode(value.getTimestampMsSinceEpoch(), outStream); RECORD_ID_CODER.encode(value.recordId(), outStream); + TOPIC_CODER.encode(value.topic(), outStream); } @Override @@ -113,7 +126,8 @@ public OutgoingMessage decode(InputStream inStream) throws CoderException, IOExc ProtoCoder.of(com.google.pubsub.v1.PubsubMessage.class).decode(inStream); long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream); @Nullable String recordId = RECORD_ID_CODER.decode(inStream); - return OutgoingMessage.of(message, timestampMsSinceEpoch, recordId); + @Nullable String topic = TOPIC_CODER.decode(inStream); + return OutgoingMessage.of(message, timestampMsSinceEpoch, recordId, topic); } } @@ -139,24 +153,38 @@ enum RecordIdMethod { // ================================================================================ /** Convert elements to messages and shard them. */ - private static class ShardFn extends DoFn> { + private static class ShardFn extends DoFn> { private final Counter elementCounter = Metrics.counter(ShardFn.class, "elements"); private final int numShards; private final RecordIdMethod recordIdMethod; - ShardFn(int numShards, RecordIdMethod recordIdMethod) { + private final SerializableFunction toProto; + private final SerializableFunction dynamicTopicFn; + + private final SerializableBiFunction keyFunction; + + ShardFn( + int numShards, + RecordIdMethod recordIdMethod, + SerializableFunction toProto, + SerializableFunction dynamicTopicFn, + SerializableBiFunction keyFunction) { this.numShards = numShards; this.recordIdMethod = recordIdMethod; + this.toProto = toProto; + this.dynamicTopicFn = dynamicTopicFn; + this.keyFunction = keyFunction; } @ProcessElement - public void processElement(ProcessContext c) throws Exception { + public void processElement( + @Element T element, @Timestamp Instant timestamp, OutputReceiver> o) + throws Exception { + com.google.pubsub.v1.PubsubMessage message = toProto.apply(element); elementCounter.inc(); - com.google.pubsub.v1.PubsubMessage message = - com.google.pubsub.v1.PubsubMessage.parseFrom(c.element()); byte[] elementBytes = message.getData().toByteArray(); - long timestampMsSinceEpoch = c.timestamp().getMillis(); + long timestampMsSinceEpoch = timestamp.getMillis(); @Nullable String recordId = null; switch (recordIdMethod) { case NONE: @@ -172,10 +200,10 @@ public void processElement(ProcessContext c) throws Exception { recordId = UUID.randomUUID().toString(); break; } - c.output( - KV.of( - ThreadLocalRandom.current().nextInt(numShards), - OutgoingMessage.of(message, timestampMsSinceEpoch, recordId))); + + @Nullable String topic = dynamicTopicFn.apply(element); + K key = keyFunction.apply(ThreadLocalRandom.current().nextInt(numShards), topic); + o.output(KV.of(key, OutgoingMessage.of(message, timestampMsSinceEpoch, recordId, topic))); } 
@Override @@ -190,9 +218,9 @@ public void populateDisplayData(DisplayData.Builder builder) { // ================================================================================ /** Publish messages to Pubsub in batches. */ - private static class WriterFn extends DoFn>, Void> { + private static class WriterFn extends DoFn, Void> { private final PubsubClientFactory pubsubFactory; - private final ValueProvider topic; + private final @Nullable ValueProvider topic; private final String timestampAttribute; private final String idAttribute; private final int publishBatchSize; @@ -209,7 +237,7 @@ private static class WriterFn extends DoFn WriterFn( PubsubClientFactory pubsubFactory, - ValueProvider topic, + @Nullable ValueProvider topic, String timestampAttribute, String idAttribute, int publishBatchSize, @@ -225,7 +253,7 @@ private static class WriterFn extends DoFn WriterFn( PubsubClientFactory pubsubFactory, - ValueProvider topic, + @Nullable ValueProvider topic, String timestampAttribute, String idAttribute, int publishBatchSize, @@ -242,7 +270,18 @@ private static class WriterFn extends DoFn /** BLOCKING Send {@code messages} as a batch to Pubsub. */ private void publishBatch(List messages, int bytes) throws IOException { - int n = pubsubClient.publish(topic.get(), messages); + Preconditions.checkState(!messages.isEmpty()); + TopicPath topicPath; + if (topic != null) { + topicPath = topic.get(); + } else { + // This is the dynamic topic destinations case. Since we first group by topic, we can assume + // that all messages in the batch have the same topic. + topicPath = + PubsubClient.topicPathFromPath( + org.apache.beam.sdk.util.Preconditions.checkStateNotNull(messages.get(0).topic())); + } + int n = pubsubClient.publish(topicPath, messages); checkState( n == messages.size(), "Attempted to publish %s messages but %s were successful", @@ -256,6 +295,7 @@ private void publishBatch(List messages, int bytes) throws IOEx @StartBundle public void startBundle(StartBundleContext c) throws Exception { checkState(pubsubClient == null, "startBundle invoked without prior finishBundle"); + // TODO: Do we really need to recreate the client on every bundle? pubsubClient = pubsubFactory.newClient( timestampAttribute, @@ -268,9 +308,9 @@ public void startBundle(StartBundleContext c) throws Exception { public void processElement(ProcessContext c) throws Exception { List pubsubMessages = new ArrayList<>(publishBatchSize); int bytes = 0; - for (OutgoingMessage message : c.element().getValue()) { + for (OutgoingMessage message : c.element()) { if (!pubsubMessages.isEmpty() - && bytes + message.message().getData().size() > publishBatchBytes) { + && bytes + message.getMessage().getData().size() > publishBatchBytes) { // Break large (in bytes) batches into smaller. // (We've already broken by batch size using the trigger below, though that may // run slightly over the actual PUBLISH_BATCH_SIZE. We'll consider that ok since @@ -281,7 +321,7 @@ public void processElement(ProcessContext c) throws Exception { bytes = 0; } pubsubMessages.add(message); - bytes += message.message().getData().size(); + bytes += message.getMessage().getData().size(); } if (!pubsubMessages.isEmpty()) { // BLOCKS until published. 
@@ -298,7 +338,7 @@ public void finishBundle() throws Exception { @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - builder.add(DisplayData.item("topic", topic)); + builder.addIfNotNull(DisplayData.item("topic", topic)); builder.add(DisplayData.item("transport", pubsubFactory.getKind())); builder.addIfNotNull(DisplayData.item("timestampAttribute", timestampAttribute)); builder.addIfNotNull(DisplayData.item("idAttribute", idAttribute)); @@ -312,8 +352,11 @@ public void populateDisplayData(DisplayData.Builder builder) { /** Which factory to use for creating Pubsub transport. */ private final PubsubClientFactory pubsubFactory; - /** Pubsub topic to publish to. */ - private final ValueProvider topic; + /** + * Pubsub topic to publish to. If null, that indicates that the PubsubMessage instead contains the + * topic. + */ + private final @Nullable ValueProvider topic; /** * Pubsub metadata field holding timestamp of each element, or {@literal null} if should use @@ -355,7 +398,7 @@ public void populateDisplayData(DisplayData.Builder builder) { @VisibleForTesting PubsubUnboundedSink( PubsubClientFactory pubsubFactory, - ValueProvider topic, + @Nullable ValueProvider topic, String timestampAttribute, String idAttribute, int numShards, @@ -458,12 +501,12 @@ public PubsubUnboundedSink( pubsubRootUrl); } /** Get the topic being written to. */ - public TopicPath getTopic() { - return topic.get(); + public @Nullable TopicPath getTopic() { + return topic != null ? topic.get() : null; } /** Get the {@link ValueProvider} for the topic being written to. */ - public ValueProvider getTopicProvider() { + public @Nullable ValueProvider getTopicProvider() { return topic; } @@ -479,13 +522,79 @@ public ValueProvider getTopicProvider() { @Override public PDone expand(PCollection input) { - return input - .apply( - "Output Serialized PubsubMessage Proto", - MapElements.into(new TypeDescriptor() {}) - .via(new PubsubMessages.ParsePayloadAsPubsubMessageProto())) - .setCoder(ByteArrayCoder.of()) - .apply(new PubsubSink(this)); + if (topic != null) { + return input + .apply( + "Output Serialized PubsubMessage Proto", + MapElements.into(new TypeDescriptor() {}) + .via(new PubsubMessages.ParsePayloadAsPubsubMessageProto())) + .setCoder(ByteArrayCoder.of()) + .apply(new PubsubSink(this)); + } else { + // dynamic destinations. 
+ return input + .apply( + "WithDynamicTopics", + WithKeys.of(PubsubMessage::getTopic).withKeyType(TypeDescriptors.strings())) + .apply( + MapValues.into(new TypeDescriptor() {}) + .via(new PubsubMessages.ParsePayloadAsPubsubMessageProto())) + .setCoder(KvCoder.of(StringUtf8Coder.of(), ByteArrayCoder.of())) + .apply(new PubsubDynamicSink(this)); + } + } + + static class PubsubDynamicSink extends PTransform>, PDone> { + public final PubsubUnboundedSink outer; + + PubsubDynamicSink(PubsubUnboundedSink outer) { + this.outer = outer; + } + + @Override + public PDone expand(PCollection> input) { + input + .apply( + "PubsubUnboundedSink.Window", + Window.>into(new GlobalWindows()) + .triggering( + Repeatedly.forever( + AfterFirst.of( + AfterPane.elementCountAtLeast(outer.publishBatchSize), + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(outer.maxLatency)))) + .discardingFiredPanes()) + .apply( + "PubsubUnboundedSink.ShardDynamicDestinations", + ParDo.of( + new ShardFn, KV>( + outer.numShards, + outer.recordIdMethod, + kv -> { + try { + return com.google.pubsub.v1.PubsubMessage.parseFrom(kv.getValue()); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + }, + KV::getKey, + KV::of))) + .setCoder(KvCoder.of(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()), CODER)) + .apply(GroupByKey.create()) + .apply(Values.create()) + .apply( + "PubsubUnboundedSink.Writer", + ParDo.of( + new WriterFn( + outer.pubsubFactory, + outer.topic, + outer.timestampAttribute, + outer.idAttribute, + outer.publishBatchSize, + outer.publishBatchBytes, + outer.pubsubRootUrl))); + return PDone.in(input.getPipeline()); + } } static class PubsubSink extends PTransform, PDone> { @@ -510,9 +619,22 @@ public PDone expand(PCollection input) { .discardingFiredPanes()) .apply( "PubsubUnboundedSink.Shard", - ParDo.of(new ShardFn(outer.numShards, outer.recordIdMethod))) + ParDo.of( + new ShardFn<>( + outer.numShards, + outer.recordIdMethod, + m -> { + try { + return com.google.pubsub.v1.PubsubMessage.parseFrom(m); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + }, + SerializableFunctions.constant(null), + (s, t) -> s))) .setCoder(KvCoder.of(VarIntCoder.of(), CODER)) .apply(GroupByKey.create()) + .apply(Values.create()) .apply( "PubsubUnboundedSink.Writer", ParDo.of( diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFnTest.java new file mode 100644 index 000000000000..ae0cbb82727c --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFnTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import static org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.PUBSUB_MESSAGE_MAX_TOTAL_SIZE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; + +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import javax.naming.SizeLimitExceededException; +import org.apache.beam.repackaged.core.org.apache.commons.lang3.RandomStringUtils; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class PreparePubsubWriteDoFnTest implements Serializable { + @Test + public void testValidatePubsubMessageSizeOnlyPayload() throws SizeLimitExceededException { + byte[] data = new byte[1024]; + PubsubMessage message = new PubsubMessage(data, null); + + int messageSize = + PreparePubsubWriteDoFn.validatePubsubMessageSize(message, PUBSUB_MESSAGE_MAX_TOTAL_SIZE); + + assertEquals(data.length, messageSize); + } + + @Test + public void testValidatePubsubMessageSizePayloadAndAttributes() + throws SizeLimitExceededException { + byte[] data = new byte[1024]; + String attributeKey = "key"; + String attributeValue = "value"; + Map attributes = ImmutableMap.of(attributeKey, attributeValue); + PubsubMessage message = new PubsubMessage(data, attributes); + + int messageSize = + PreparePubsubWriteDoFn.validatePubsubMessageSize(message, PUBSUB_MESSAGE_MAX_TOTAL_SIZE); + + assertEquals( + data.length + + 6 // PUBSUB_MESSAGE_ATTRIBUTE_ENCODE_ADDITIONAL_BYTES + + attributeKey.getBytes(StandardCharsets.UTF_8).length + + attributeValue.getBytes(StandardCharsets.UTF_8).length, + messageSize); + } + + @Test + public void testValidatePubsubMessageSizePayloadTooLarge() { + byte[] data = new byte[(10 << 20) + 1]; + PubsubMessage message = new PubsubMessage(data, null); + + assertThrows( + SizeLimitExceededException.class, + () -> + PreparePubsubWriteDoFn.validatePubsubMessageSize( + message, PUBSUB_MESSAGE_MAX_TOTAL_SIZE)); + } + + @Test + public void testValidatePubsubMessageSizePayloadPlusAttributesTooLarge() { + byte[] data = new byte[(10 << 20)]; + String attributeKey = "key"; + String attributeValue = "value"; + Map attributes = ImmutableMap.of(attributeKey, attributeValue); + PubsubMessage message = new PubsubMessage(data, attributes); + + assertThrows( + SizeLimitExceededException.class, + () -> + PreparePubsubWriteDoFn.validatePubsubMessageSize( + message, PUBSUB_MESSAGE_MAX_TOTAL_SIZE)); + } + + @Test + public void testValidatePubsubMessageSizeAttributeKeyTooLarge() { + byte[] data = new byte[1024]; + String attributeKey = RandomStringUtils.randomAscii(257); + String attributeValue = "value"; + Map attributes = ImmutableMap.of(attributeKey, attributeValue); + PubsubMessage message = new PubsubMessage(data, attributes); + + assertThrows( + SizeLimitExceededException.class, + () -> + PreparePubsubWriteDoFn.validatePubsubMessageSize( + message, PUBSUB_MESSAGE_MAX_TOTAL_SIZE)); + } + + @Test + public void testValidatePubsubMessageSizeAttributeValueTooLarge() { + byte[] data = new byte[1024]; + String attributeKey = "key"; + String attributeValue = RandomStringUtils.randomAscii(1025); + Map attributes = ImmutableMap.of(attributeKey, attributeValue); + PubsubMessage message = new PubsubMessage(data, attributes); + + assertThrows( + 
SizeLimitExceededException.class, + () -> + PreparePubsubWriteDoFn.validatePubsubMessageSize( + message, PUBSUB_MESSAGE_MAX_TOTAL_SIZE)); + } + + @Test + public void testValidatePubsubMessagePayloadTooLarge() { + byte[] data = new byte[(10 << 20) + 1]; + PubsubMessage message = new PubsubMessage(data, null); + + assertThrows( + SizeLimitExceededException.class, + () -> + PreparePubsubWriteDoFn.validatePubsubMessageSize( + message, PUBSUB_MESSAGE_MAX_TOTAL_SIZE)); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java index 75f484d9d29a..02e424bc1c83 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java @@ -35,6 +35,7 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.resourcehints.ResourceHints; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.PValues; @@ -52,6 +53,8 @@ public class PubSubWritePayloadTranslationTest { private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic"); private final PubSubPayloadTranslation.PubSubWritePayloadTranslator sinkTranslator = new PubSubWritePayloadTranslator(); + private final PubSubPayloadTranslation.PubSubDynamicWritePayloadTranslator dynamicSinkTranslator = + new PubSubPayloadTranslation.PubSubDynamicWritePayloadTranslator(); @Rule public TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false); @@ -92,6 +95,44 @@ public void testTranslateSinkWithTopic() throws Exception { assertEquals(ID_ATTRIBUTE, payload.getIdAttribute()); } + @Test + public void testTranslateDynamicSink() throws Exception { + PubsubUnboundedSink pubsubUnboundedSink = + new PubsubUnboundedSink( + null, + StaticValueProvider.of(TOPIC), + TIMESTAMP_ATTRIBUTE, + ID_ATTRIBUTE, + 0, + 0, + 0, + Duration.ZERO, + null, + null); + PubsubUnboundedSink.PubsubDynamicSink pubsubSink = + new PubsubUnboundedSink.PubsubDynamicSink(pubsubUnboundedSink); + PCollection> input = pipeline.apply(Create.of(KV.of("foo", new byte[0]))); + PDone output = input.apply(pubsubSink); + AppliedPTransform appliedPTransform = + AppliedPTransform.of( + "sink", + PValues.expandInput(input), + PValues.expandOutput(output), + pubsubSink, + ResourceHints.create(), + pipeline); + SdkComponents components = SdkComponents.create(); + components.registerEnvironment(Environments.createDockerEnvironment("java")); + RunnerApi.FunctionSpec spec = dynamicSinkTranslator.translate(appliedPTransform, components); + + assertEquals(PTransformTranslation.PUBSUB_WRITE_DYNAMIC, spec.getUrn()); + PubSubWritePayload payload = PubSubWritePayload.parseFrom(spec.getPayload()); + assertEquals("", payload.getTopic()); + assertTrue(payload.getTopicRuntimeOverridden().isEmpty()); + assertEquals(TIMESTAMP_ATTRIBUTE, payload.getTimestampAttribute()); + assertEquals(ID_ATTRIBUTE, payload.getIdAttribute()); + } + @Test public void testTranslateSinkWithTopicOverridden() throws Exception { ValueProvider runtimeProvider = pipeline.newProvider(TOPIC); diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java index ce70f4f40793..022608c87d80 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java @@ -269,7 +269,8 @@ public void publish( .putAllAttributes(ATTRIBUTES) .build(), MESSAGE_TIME_MS, - RECORD_ID); + RECORD_ID, + null); int n = client.publish(TOPIC, ImmutableList.of(actualMessage)); assertEquals(1, n); assertEquals(expectedRequest, Iterables.getOnlyElement(requestsReceived)); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java index 89ac47244ea6..620f5b228067 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java @@ -150,7 +150,7 @@ public void testConstructPubsubWrite() throws Exception { RunnerApi.PTransform transform = result.getTransform(); assertThat( transform.getSubtransformsList(), - Matchers.hasItem(MatchesPattern.matchesPattern(".*MapElements.*"))); + Matchers.hasItem(MatchesPattern.matchesPattern(".*PreparePubsubWrite.*"))); assertThat( transform.getSubtransformsList(), Matchers.hasItem(MatchesPattern.matchesPattern(".*PubsubUnboundedSink.*"))); @@ -167,7 +167,7 @@ public void testConstructPubsubWrite() throws Exception { // test_namespacetest/PubsubUnboundedSink/PubsubSink/PubsubUnboundedSink.Writer RunnerApi.PTransform writeComposite3 = - result.getComponents().getTransformsOrThrow(writeComposite2.getSubtransforms(3)); + result.getComponents().getTransformsOrThrow(writeComposite2.getSubtransforms(4)); // test_namespacetest/PubsubUnboundedSink/PubsubSink/PubsubUnboundedSink.Writer/ParMultiDo(Writer) RunnerApi.PTransform writeParDo = diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java index c7b3ce764a7f..9c9b1abfc3e0 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java @@ -27,7 +27,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThrows; import com.google.api.client.util.Clock; import com.google.protobuf.ByteString; @@ -36,14 +35,13 @@ import java.io.IOException; import java.io.Serializable; import java.nio.charset.StandardCharsets; +import java.util.Collections; import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; -import javax.naming.SizeLimitExceededException; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.reflect.AvroSchema; @@ -67,22 +65,23 @@ import 
org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; -import org.apache.commons.lang3.RandomStringUtils; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; +import org.joda.time.Instant; import org.junit.After; import org.junit.Rule; import org.junit.Test; @@ -361,7 +360,7 @@ public boolean equals(@Nullable Object other) { private static final TopicPath TOPIC = PubsubClient.topicPathFromName("test-project", "testTopic"); private static final Clock CLOCK = (Clock & Serializable) () -> 673L; - transient TestPipeline readPipeline; + transient TestPipeline pipeline; private static final String SCHEMA_STRING = "{\"namespace\": \"example.avro\",\n" @@ -391,8 +390,8 @@ public Statement apply(final Statement base, final Description description) { public void evaluate() throws Throwable { options = TestPipeline.testingPipelineOptions(); options.as(PubsubOptions.class).setProject("test-project"); - readPipeline = TestPipeline.fromOptions(options); - readPipeline.apply(base, description).evaluate(); + pipeline = TestPipeline.fromOptions(options); + pipeline.apply(base, description).evaluate(); } }; return withPipeline; @@ -457,13 +456,14 @@ public void testFailedParseWithDeadLetterConfigured() { .putAttributes("pubsubMessageId", "") .build(), 1234L, + null, null)); clientFactory = PubsubTestClient.createFactoryForPullAndPublish( SUBSCRIPTION, TOPIC, CLOCK, 60, expectedReads, expectedWrites, ImmutableList.of()); PCollection read = - readPipeline.apply( + pipeline.apply( PubsubIO.readStrings() .fromSubscription(SUBSCRIPTION.getPath()) .withDeadLetterTopic(TOPIC.getPath()) @@ -478,7 +478,7 @@ public void testFailedParseWithDeadLetterConfigured() { TypeDescriptors.strings()))); PAssert.that(read).empty(); - readPipeline.run(); + pipeline.run(); } @Test @@ -491,13 +491,13 @@ public void testProto() { Primitive.newBuilder().setPrimitiveString("Hello, World!").build()); setupTestClient(inputs, coder); PCollection read = - readPipeline.apply( + pipeline.apply( PubsubIO.readProtos(Primitive.class) .fromSubscription(SUBSCRIPTION.getPath()) .withClock(CLOCK) .withClientFactory(clientFactory)); PAssert.that(read).containsInAnyOrder(inputs); - readPipeline.run(); + pipeline.run(); } @Test @@ -513,7 +513,7 @@ public void testProtoDynamicMessages() { ProtoDomain domain = ProtoDomain.buildFrom(Primitive.getDescriptor()); String name = Primitive.getDescriptor().getFullName(); PCollection read = - readPipeline + pipeline .apply( PubsubIO.readProtoDynamicMessages(domain, name) 
.fromSubscription(SUBSCRIPTION.getPath()) @@ -535,7 +535,7 @@ public void testProtoDynamicMessages() { })); PAssert.that(read).containsInAnyOrder(inputs); - readPipeline.run(); + pipeline.run(); } @Test @@ -549,7 +549,7 @@ public void testProtoDynamicMessagesFromDescriptor() { setupTestClient(inputs, coder); PCollection read = - readPipeline + pipeline .apply( PubsubIO.readProtoDynamicMessages(Primitive.getDescriptor()) .fromSubscription(SUBSCRIPTION.getPath()) @@ -568,7 +568,7 @@ public void testProtoDynamicMessagesFromDescriptor() { })); PAssert.that(read).containsInAnyOrder(inputs); - readPipeline.run(); + pipeline.run(); } @Test @@ -581,13 +581,13 @@ public void testAvroGenericRecords() { new AvroGeneratedUser("Ted", null, "white")); setupTestClient(inputs, coder); PCollection read = - readPipeline.apply( + pipeline.apply( PubsubIO.readAvroGenericRecords(SCHEMA) .fromSubscription(SUBSCRIPTION.getPath()) .withClock(CLOCK) .withClientFactory(clientFactory)); PAssert.that(read).containsInAnyOrder(inputs); - readPipeline.run(); + pipeline.run(); } @Test @@ -601,13 +601,13 @@ public void testAvroPojo() { 2, "bar", new DateTime().withDate(1986, 10, 1).withZone(DateTimeZone.UTC))); setupTestClient(inputs, coder); PCollection read = - readPipeline.apply( + pipeline.apply( PubsubIO.readAvrosWithBeamSchema(GenericClass.class) .fromSubscription(SUBSCRIPTION.getPath()) .withClock(CLOCK) .withClientFactory(clientFactory)); PAssert.that(read).containsInAnyOrder(inputs); - readPipeline.run(); + pipeline.run(); } @Test @@ -620,13 +620,13 @@ public void testAvroSpecificRecord() { new AvroGeneratedUser("Ted", null, "white")); setupTestClient(inputs, coder); PCollection read = - readPipeline.apply( + pipeline.apply( PubsubIO.readAvrosWithBeamSchema(AvroGeneratedUser.class) .fromSubscription(SUBSCRIPTION.getPath()) .withClock(CLOCK) .withClientFactory(clientFactory)); PAssert.that(read).containsInAnyOrder(inputs); - readPipeline.run(); + pipeline.run(); } @Test @@ -663,7 +663,7 @@ public void testReadMessagesWithCoderAndParseFn() { setupTestClient(inputs, coder); PCollection read = - readPipeline.apply( + pipeline.apply( PubsubIO.readMessagesWithCoderAndParseFn( StringUtf8Coder.of(), new StringPayloadParseFn()) .fromSubscription(SUBSCRIPTION.getPath()) @@ -672,89 +672,74 @@ public void testReadMessagesWithCoderAndParseFn() { List outputs = ImmutableList.of("foo", "bar"); PAssert.that(read).containsInAnyOrder(outputs); - readPipeline.run(); + pipeline.run(); } @Test - public void testValidatePubsubMessageSizeOnlyPayload() throws SizeLimitExceededException { - byte[] data = new byte[1024]; - PubsubMessage message = new PubsubMessage(data, null); - - int messageSize = PubsubIO.validateAndGetPubsubMessageSize(message); - - assertEquals(data.length, messageSize); - } - - @Test - public void testValidatePubsubMessageSizePayloadAndAttributes() - throws SizeLimitExceededException { - byte[] data = new byte[1024]; - String attributeKey = "key"; - String attributeValue = "value"; - Map attributes = ImmutableMap.of(attributeKey, attributeValue); - PubsubMessage message = new PubsubMessage(data, attributes); - - int messageSize = PubsubIO.validateAndGetPubsubMessageSize(message); - - assertEquals( - data.length - + 6 // PUBSUB_MESSAGE_ATTRIBUTE_ENCODE_ADDITIONAL_BYTES - + attributeKey.getBytes(StandardCharsets.UTF_8).length - + attributeValue.getBytes(StandardCharsets.UTF_8).length, - messageSize); - } - - @Test - public void testValidatePubsubMessageSizePayloadTooLarge() { - byte[] data = new byte[(10 << 20) + 1]; - 
PubsubMessage message = new PubsubMessage(data, null); - - assertThrows( - SizeLimitExceededException.class, () -> PubsubIO.validateAndGetPubsubMessageSize(message)); - } - - @Test - public void testValidatePubsubMessageSizePayloadPlusAttributesTooLarge() { - byte[] data = new byte[(10 << 20)]; - String attributeKey = "key"; - String attributeValue = "value"; - Map attributes = ImmutableMap.of(attributeKey, attributeValue); - PubsubMessage message = new PubsubMessage(data, attributes); - - assertThrows( - SizeLimitExceededException.class, () -> PubsubIO.validateAndGetPubsubMessageSize(message)); + public void testDynamicTopicsBounded() throws IOException { + testDynamicTopics(true); } @Test - public void testValidatePubsubMessageSizeAttributeKeyTooLarge() { - byte[] data = new byte[1024]; - String attributeKey = RandomStringUtils.randomAscii(257); - String attributeValue = "value"; - Map attributes = ImmutableMap.of(attributeKey, attributeValue); - PubsubMessage message = new PubsubMessage(data, attributes); - - assertThrows( - SizeLimitExceededException.class, () -> PubsubIO.validateAndGetPubsubMessageSize(message)); + public void testDynamicTopicsUnbounded() throws IOException { + testDynamicTopics(false); } - @Test - public void testValidatePubsubMessageSizeAttributeValueTooLarge() { - byte[] data = new byte[1024]; - String attributeKey = "key"; - String attributeValue = RandomStringUtils.randomAscii(1025); - Map attributes = ImmutableMap.of(attributeKey, attributeValue); - PubsubMessage message = new PubsubMessage(data, attributes); - - assertThrows( - SizeLimitExceededException.class, () -> PubsubIO.validateAndGetPubsubMessageSize(message)); - } - - @Test - public void testValidatePubsubMessagePayloadTooLarge() { - byte[] data = new byte[(10 << 20) + 1]; - PubsubMessage message = new PubsubMessage(data, null); - - assertThrows( - SizeLimitExceededException.class, () -> PubsubIO.validateAndGetPubsubMessageSize(message)); + public void testDynamicTopics(boolean isBounded) throws IOException { + List expectedOutgoing = + ImmutableList.of( + OutgoingMessage.of( + com.google.pubsub.v1.PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8("0")) + .build(), + 0, + null, + "projects/project/topics/topic1"), + OutgoingMessage.of( + com.google.pubsub.v1.PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8("1")) + .build(), + 1, + null, + "projects/project/topics/topic1"), + OutgoingMessage.of( + com.google.pubsub.v1.PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8("2")) + .build(), + 2, + null, + "projects/project/topics/topic2"), + OutgoingMessage.of( + com.google.pubsub.v1.PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8("3")) + .build(), + 3, + null, + "projects/project/topics/topic2")); + + try (PubsubTestClientFactory factory = + PubsubTestClient.createFactoryForPublish(null, expectedOutgoing, ImmutableList.of())) { + List> pubsubMessages = + expectedOutgoing.stream() + .map( + o -> + TimestampedValue.of( + new PubsubMessage( + o.getMessage().getData().toByteArray(), + Collections.emptyMap(), + o.recordId()) + .withTopic(o.topic()), + Instant.ofEpochMilli(o.getTimestampMsSinceEpoch()))) + .collect(Collectors.toList()); + + PCollection messages = + pipeline.apply( + Create.timestamped(pubsubMessages).withCoder(new PubsubMessageWithTopicCoder())); + if (!isBounded) { + messages = messages.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED); + } + messages.apply(PubsubIO.writeMessagesDynamic().withClientFactory(factory)); + pipeline.run(); + } } } 
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java index e815df258961..5de8f68aee82 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java @@ -219,7 +219,8 @@ public void publishOneMessage() throws IOException { .setOrderingKey(ORDERING_KEY) .build(), MESSAGE_TIME, - RECORD_ID); + RECORD_ID, + null); int n = client.publish(TOPIC, ImmutableList.of(actualMessage)); assertEquals(1, n); } @@ -247,7 +248,8 @@ public void publishOneMessageWithOnlyTimestampAndIdAttributes() throws IOExcepti .setData(ByteString.copyFromUtf8(DATA)) .build(), MESSAGE_TIME, - RECORD_ID); + RECORD_ID, + null); int n = client.publish(TOPIC, ImmutableList.of(actualMessage)); assertEquals(1, n); } @@ -278,7 +280,8 @@ public void publishOneMessageWithNoTimestampOrIdAttribute() throws IOException { .putAllAttributes(attrs) .build(), MESSAGE_TIME, - RECORD_ID); + RECORD_ID, + null); int n = client.publish(TOPIC, ImmutableList.of(actualMessage)); assertEquals(1, n); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java index b0746392d998..aeeca762c56e 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClientTest.java @@ -116,7 +116,8 @@ public void publishOneMessage() throws IOException { OutgoingMessage.of( PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(DATA)).build(), MESSAGE_TIME, - MESSAGE_ID); + MESSAGE_ID, + null); try (PubsubTestClientFactory factory = PubsubTestClient.createFactoryForPublish( TOPIC, Sets.newHashSet(expectedOutgoingMessage), ImmutableList.of())) { @@ -134,7 +135,8 @@ public void testPullThenPublish() throws IOException { PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(DATA)).build(); IncomingMessage expectedIncomingMessage = IncomingMessage.of(message, MESSAGE_TIME, REQ_TIME, ACK_ID, MESSAGE_ID); - OutgoingMessage expectedOutgoingMessage = OutgoingMessage.of(message, MESSAGE_TIME, MESSAGE_ID); + OutgoingMessage expectedOutgoingMessage = + OutgoingMessage.of(message, MESSAGE_TIME, MESSAGE_ID, null); try (PubsubTestClientFactory factory = PubsubTestClient.createFactoryForPullAndPublish( @@ -164,7 +166,8 @@ public void testPullThenPublish() throws IOException { OutgoingMessage.of( incomingMessage.message(), incomingMessage.timestampMsSinceEpoch(), - incomingMessage.recordId()); + incomingMessage.recordId(), + null); client.publish(TOPIC, ImmutableList.of(actualOutgoingMessage)); } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java index 418f65551e1a..f79732e74278 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java +++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory; @@ -34,6 +35,7 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing; @@ -89,7 +91,8 @@ public void saneCoder() throws Exception { .setData(ByteString.copyFromUtf8(DATA)) .build(), TIMESTAMP, - getRecordId(DATA)); + getRecordId(DATA), + null); CoderProperties.coderDecodeEncodeEqual(PubsubUnboundedSink.CODER, message); CoderProperties.coderSerializable(PubsubUnboundedSink.CODER); } @@ -104,7 +107,8 @@ public void sendOneMessage() throws IOException { .putAllAttributes(ATTRIBUTES) .build(), TIMESTAMP, - getRecordId(DATA))); + getRecordId(DATA), + null)); int batchSize = 1; int batchBytes = 1; try (PubsubTestClientFactory factory = @@ -137,7 +141,8 @@ public void sendOneMessageWithoutAttributes() throws IOException { .setData(ByteString.copyFromUtf8(DATA)) .build(), TIMESTAMP, - getRecordId(DATA))); + getRecordId(DATA), + null)); try (PubsubTestClientFactory factory = PubsubTestClient.createFactoryForPublish(TOPIC, outgoing, ImmutableList.of())) { PubsubUnboundedSink sink = @@ -161,6 +166,71 @@ public void sendOneMessageWithoutAttributes() throws IOException { // message does not match the expected publish message. 
} + @Test + public void testDynamicTopics() throws IOException { + List outgoing = + ImmutableList.of( + OutgoingMessage.of( + com.google.pubsub.v1.PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8(DATA + "0")) + .build(), + TIMESTAMP, + getRecordId(DATA + "0"), + "topic1"), + OutgoingMessage.of( + com.google.pubsub.v1.PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8(DATA + "1")) + .build(), + TIMESTAMP + 1, + getRecordId(DATA + "1"), + "topic1"), + OutgoingMessage.of( + com.google.pubsub.v1.PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8(DATA + "2")) + .build(), + TIMESTAMP + 2, + getRecordId(DATA + "2"), + "topic2"), + OutgoingMessage.of( + com.google.pubsub.v1.PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8(DATA + "3")) + .build(), + TIMESTAMP + 3, + getRecordId(DATA + "3"), + "topic2")); + try (PubsubTestClientFactory factory = + PubsubTestClient.createFactoryForPublish(null, outgoing, ImmutableList.of())) { + PubsubUnboundedSink sink = + new PubsubUnboundedSink( + factory, + null, + TIMESTAMP_ATTRIBUTE, + ID_ATTRIBUTE, + NUM_SHARDS, + 1 /* batchSize */, + 1 /* batchBytes */, + Duration.standardSeconds(2), + RecordIdMethod.DETERMINISTIC, + null); + + List> pubsubMessages = + outgoing.stream() + .map( + o -> + TimestampedValue.of( + new PubsubMessage(o.getMessage().getData().toByteArray(), null) + .withTopic(o.topic()), + Instant.ofEpochMilli(o.getTimestampMsSinceEpoch()))) + .collect(Collectors.toList()); + + p.apply(Create.timestamped(pubsubMessages).withCoder(new PubsubMessageWithTopicCoder())) + .apply(sink); + p.run(); + } + // The PubsubTestClientFactory will assert fail on close if the actual published + // message does not match the expected publish message. + } + @Test public void sendMoreThanOneBatchByNumMessages() throws IOException { List outgoing = new ArrayList<>(); @@ -175,7 +245,8 @@ public void sendMoreThanOneBatchByNumMessages() throws IOException { .setData(ByteString.copyFromUtf8(str)) .build(), TIMESTAMP, - getRecordId(str))); + getRecordId(str), + null)); data.add(str); } try (PubsubTestClientFactory factory = @@ -218,7 +289,8 @@ public void sendMoreThanOneBatchByByteSize() throws IOException { .setData(ByteString.copyFromUtf8(str)) .build(), TIMESTAMP, - getRecordId(str))); + getRecordId(str), + null)); data.add(str); n += str.length(); }