diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java index 48d647779917..b30bea862998 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.gcp.pubsub; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; import com.google.api.client.util.DateTime; @@ -30,9 +31,13 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.utils.AvroUtils; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Objects; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.checkerframework.checker.nullness.qual.Nullable; /** An (abstract) helper class for talking to Pubsub via an underlying transport. */ @@ -40,6 +45,11 @@ "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) public abstract class PubsubClient implements Closeable { + private static final Map> + schemaTypeToConversionFnMap = + ImmutableMap.of( + com.google.pubsub.v1.Schema.Type.AVRO.name(), new PubsubAvroDefinitionToSchemaFn()); + /** Factory for creating clients. 
*/ public interface PubsubClientFactory extends Serializable { /** @@ -170,6 +180,53 @@ public static ProjectPath projectPathFromId(String projectId) { return new ProjectPath(String.format("projects/%s", projectId)); } + /** Path representing a Pubsub schema. */ + public static class SchemaPath implements Serializable { + static final String DELETED_SCHEMA_PATH = "_deleted-schema_"; + static final SchemaPath DELETED_SCHEMA = new SchemaPath("", DELETED_SCHEMA_PATH); + private final String projectId; + private final String schemaId; + + SchemaPath(String projectId, String schemaId) { + this.projectId = projectId; + this.schemaId = schemaId; + } + + SchemaPath(String path) { + List splits = Splitter.on('/').splitToList(path); + checkState( + splits.size() == 4 && "projects".equals(splits.get(0)) && "schemas".equals(splits.get(2)), + "Malformed schema path %s: " + + "must be of the form \"projects/\" + + \"schemas\"", + path); + this.projectId = splits.get(1); + this.schemaId = splits.get(3); + } + + public String getPath() { + if (schemaId.equals(DELETED_SCHEMA_PATH)) { + return DELETED_SCHEMA_PATH; + } + return String.format("projects/%s/schemas/%s", projectId, schemaId); + } + + public String getId() { + return schemaId; + } + + public String getProjectId() { + return projectId; + } + } + + public static SchemaPath schemaPathFromPath(String path) { + return new SchemaPath(path); + } + + public static SchemaPath schemaPathFromId(String projectId, String schemaId) { + return new SchemaPath(projectId, schemaId); + } + /** Path representing a Pubsub subscription. */ public static class SubscriptionPath implements Serializable { private final String projectId; @@ -403,6 +460,9 @@ public abstract void modifyAckDeadline( /** Create {@code topic}. */ public abstract void createTopic(TopicPath topic) throws IOException; + /** Create {@link TopicPath} with {@link SchemaPath}. 
*/ + public abstract void createTopic(TopicPath topic, SchemaPath schema) throws IOException; + /* * Delete {@code topic}. */ @@ -445,4 +505,51 @@ public abstract List listSubscriptions(ProjectPath project, To * messages have been pulled and the test may complete. */ public abstract boolean isEOF(); + + /** Create {@link com.google.api.services.pubsub.model.Schema} from resource path. */ + public abstract void createSchema( + SchemaPath schemaPath, String resourcePath, com.google.pubsub.v1.Schema.Type type) + throws IOException; + + /** Delete {@link SchemaPath}. */ + public abstract void deleteSchema(SchemaPath schemaPath) throws IOException; + + /** Return {@link SchemaPath} from {@link TopicPath} if exists. */ + public abstract SchemaPath getSchemaPath(TopicPath topicPath) throws IOException; + + /** Return a Beam {@link Schema} from the Pub/Sub schema resource, if exists. */ + public abstract Schema getSchema(SchemaPath schemaPath) throws IOException; + + /** Convert a {@link com.google.api.services.pubsub.model.Schema} to a Beam {@link Schema}. */ + static Schema fromPubsubSchema(com.google.api.services.pubsub.model.Schema pubsubSchema) { + if (!schemaTypeToConversionFnMap.containsKey(pubsubSchema.getType())) { + throw new IllegalArgumentException( + String.format( + "Pub/Sub schema type %s is not supported at this time", pubsubSchema.getType())); + } + SerializableFunction definitionToSchemaFn = + schemaTypeToConversionFnMap.get(pubsubSchema.getType()); + return definitionToSchemaFn.apply(pubsubSchema.getDefinition()); + } + + /** Convert a {@link com.google.pubsub.v1.Schema} to a Beam {@link Schema}. 
*/ + static Schema fromPubsubSchema(com.google.pubsub.v1.Schema pubsubSchema) { + String typeName = pubsubSchema.getType().name(); + if (!schemaTypeToConversionFnMap.containsKey(typeName)) { + throw new IllegalArgumentException( + String.format("Pub/Sub schema type %s is not supported at this time", typeName)); + } + SerializableFunction definitionToSchemaFn = + schemaTypeToConversionFnMap.get(typeName); + return definitionToSchemaFn.apply(pubsubSchema.getDefinition()); + } + + static class PubsubAvroDefinitionToSchemaFn implements SerializableFunction { + @Override + public Schema apply(String definition) { + checkNotNull(definition, "Pub/Sub schema definition is null"); + org.apache.avro.Schema avroSchema = new org.apache.avro.Schema.Parser().parse(definition); + return AvroUtils.toBeamSchema(avroSchema); + } + } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java index 2c3cc6678aae..3f3ecbcfdbdc 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java @@ -23,9 +23,14 @@ import com.google.auth.Credentials; import com.google.protobuf.Timestamp; import com.google.pubsub.v1.AcknowledgeRequest; +import com.google.pubsub.v1.CreateSchemaRequest; +import com.google.pubsub.v1.DeleteSchemaRequest; import com.google.pubsub.v1.DeleteSubscriptionRequest; import com.google.pubsub.v1.DeleteTopicRequest; +import com.google.pubsub.v1.Encoding; +import com.google.pubsub.v1.GetSchemaRequest; import com.google.pubsub.v1.GetSubscriptionRequest; +import com.google.pubsub.v1.GetTopicRequest; import com.google.pubsub.v1.ListSubscriptionsRequest; import com.google.pubsub.v1.ListSubscriptionsResponse; import com.google.pubsub.v1.ListTopicsRequest; 
@@ -39,6 +44,9 @@ import com.google.pubsub.v1.PullRequest; import com.google.pubsub.v1.PullResponse; import com.google.pubsub.v1.ReceivedMessage; +import com.google.pubsub.v1.SchemaServiceGrpc; +import com.google.pubsub.v1.SchemaServiceGrpc.SchemaServiceBlockingStub; +import com.google.pubsub.v1.SchemaSettings; import com.google.pubsub.v1.SubscriberGrpc; import com.google.pubsub.v1.SubscriberGrpc.SubscriberBlockingStub; import com.google.pubsub.v1.Subscription; @@ -51,11 +59,17 @@ import io.grpc.netty.NegotiationType; import io.grpc.netty.NettyChannelBuilder; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; @@ -137,6 +151,8 @@ public String getKind() { private SubscriberGrpc.SubscriberBlockingStub cachedSubscriberStub; + private SchemaServiceGrpc.SchemaServiceBlockingStub cachedSchemaServiceStub; + @VisibleForTesting PubsubGrpcClient( @Nullable String timestampAttribute, @@ -161,6 +177,7 @@ public void close() { // Can gc the underlying stubs. cachedPublisherStub = null; cachedSubscriberStub = null; + cachedSchemaServiceStub = null; // Mark the client as having been closed before going further // in case we have an exception from the channel. 
ManagedChannel publisherChannel = this.publisherChannel; @@ -205,6 +222,14 @@ private SubscriberBlockingStub subscriberStub() throws IOException { return cachedSubscriberStub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS); } + /** Return a stub for making a schema service request with a timeout. */ + private SchemaServiceBlockingStub schemaServiceStub() throws IOException { + if (cachedSchemaServiceStub == null) { + cachedSchemaServiceStub = SchemaServiceGrpc.newBlockingStub(newChannel()); + } + return cachedSchemaServiceStub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS); + } + @Override public int publish(TopicPath topic, List outgoingMessages) throws IOException { PublishRequest.Builder request = PublishRequest.newBuilder().setTopic(topic.getPath()); @@ -310,6 +335,20 @@ public void createTopic(TopicPath topic) throws IOException { publisherStub().createTopic(request); // ignore Topic result. } + @Override + public void createTopic(TopicPath topic, SchemaPath schema) throws IOException { + Topic request = + Topic.newBuilder() + .setName(topic.getPath()) + .setSchemaSettings( + SchemaSettings.newBuilder() + .setSchema(schema.getPath()) + .setEncoding(Encoding.BINARY) + .build()) + .build(); + publisherStub().createTopic(request); // ignore Topic result. + } + @Override public void deleteTopic(TopicPath topic) throws IOException { DeleteTopicRequest request = DeleteTopicRequest.newBuilder().setTopic(topic.getPath()).build(); @@ -396,4 +435,62 @@ public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException public boolean isEOF() { return false; } + + /** Create {@link com.google.pubsub.v1.Schema} from resource path. 
*/ + @Override + public void createSchema( + SchemaPath schemaPath, String resourcePath, com.google.pubsub.v1.Schema.Type type) + throws IOException { + + Path path = + FileSystems.getDefault() + .getPath( + Objects.requireNonNull(PubsubGrpcClient.class.getResource(resourcePath)).getPath()); + byte[] b = Files.readAllBytes(path); + String definition = new String(b, StandardCharsets.UTF_8); + + CreateSchemaRequest request = + CreateSchemaRequest.newBuilder() + .setSchemaId(schemaPath.getId()) + .setParent("projects/" + schemaPath.getProjectId()) + .setSchema( + com.google.pubsub.v1.Schema.newBuilder() + .setType(type) + .setDefinition(definition) + .build()) + .build(); + + schemaServiceStub().createSchema(request); // Result is ignored + } + + /** Delete {@link SchemaPath}. */ + @Override + public void deleteSchema(SchemaPath schemaPath) throws IOException { + DeleteSchemaRequest request = + DeleteSchemaRequest.newBuilder().setName(schemaPath.getPath()).build(); + schemaServiceStub().deleteSchema(request); + } + + /** Return {@link SchemaPath} from {@link TopicPath} if exists. */ + @Override + public SchemaPath getSchemaPath(TopicPath topicPath) throws IOException { + GetTopicRequest request = GetTopicRequest.newBuilder().setTopic(topicPath.getPath()).build(); + Topic topic = publisherStub().getTopic(request); + SchemaSettings schemaSettings = topic.getSchemaSettings(); + if (schemaSettings.getSchema().isEmpty()) { + return null; + } + String schemaPath = schemaSettings.getSchema(); + if (schemaPath.equals(SchemaPath.DELETED_SCHEMA_PATH)) { + return null; + } + return PubsubClient.schemaPathFromPath(schemaPath); + } + + /** Return a Beam {@link Schema} from the Pub/Sub schema resource, if exists. 
*/ + @Override + public Schema getSchema(SchemaPath schemaPath) throws IOException { + GetSchemaRequest request = GetSchemaRequest.newBuilder().setName(schemaPath.getPath()).build(); + return fromPubsubSchema(schemaServiceStub().getSchema(request)); + } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java index 9a008041fc68..613ca4581a5e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java @@ -47,6 +47,7 @@ import java.util.TreeMap; import org.apache.beam.sdk.extensions.gcp.util.RetryHttpRequestInitializer; import org.apache.beam.sdk.extensions.gcp.util.Transport; +import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; @@ -277,6 +278,11 @@ public void createTopic(TopicPath topic) throws IOException { .execute(); // ignore Topic result. } + @Override + public void createTopic(TopicPath topic, SchemaPath schema) throws IOException { + throw new UnsupportedOperationException(); + } + @Override public void deleteTopic(TopicPath topic) throws IOException { pubsub.projects().topics().delete(topic.getPath()).execute(); // ignore Empty result. @@ -358,4 +364,40 @@ public int ackDeadlineSeconds(SubscriptionPath subscription) throws IOException public boolean isEOF() { return false; } + + /** Create {@link com.google.api.services.pubsub.model.Schema} from resource path. 
*/ + @Override + public void createSchema( + SchemaPath schemaPath, String resourcePath, com.google.pubsub.v1.Schema.Type type) + throws IOException { + throw new UnsupportedOperationException(); + } + + /** Delete {@link SchemaPath}. */ + @Override + public void deleteSchema(SchemaPath schemaPath) throws IOException { + throw new UnsupportedOperationException(); + } + + /** Return {@link SchemaPath} from {@link TopicPath} if exists. */ + @Override + public SchemaPath getSchemaPath(TopicPath topicPath) throws IOException { + Topic topic = pubsub.projects().topics().get(topicPath.getPath()).execute(); + if (topic.getSchemaSettings() == null) { + return null; + } + String schemaPath = topic.getSchemaSettings().getSchema(); + if (schemaPath.equals(SchemaPath.DELETED_SCHEMA_PATH)) { + return null; + } + return PubsubClient.schemaPathFromPath(schemaPath); + } + + /** Return a Beam {@link Schema} from the Pub/Sub schema resource, if exists. */ + @Override + public Schema getSchema(SchemaPath schemaPath) throws IOException { + com.google.api.services.pubsub.model.Schema pubsubSchema = + pubsub.projects().schemas().get(schemaPath.getPath()).execute(); + return fromPubsubSchema(pubsubSchema); + } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubRowToMessage.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubRowToMessage.java index 9d7bbcf67c43..767388e6fa55 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubRowToMessage.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubRowToMessage.java @@ -47,6 +47,7 @@ import org.joda.time.DateTime; import org.joda.time.Instant; import org.joda.time.ReadableDateTime; +import org.joda.time.format.ISODateTimeFormat; /** Write side {@link Row} to {@link PubsubMessage} converter. 
*/ @Internal @@ -77,6 +78,10 @@ static Builder builder() { static final String PAYLOAD_KEY_NAME = "pubsub_payload"; static final TypeName PAYLOAD_BYTES_TYPE_NAME = TypeName.BYTES; static final TypeName PAYLOAD_ROW_TYPE_NAME = TypeName.ROW; + static final String DEFAULT_ATTRIBUTES_KEY_NAME = DEFAULT_KEY_PREFIX + ATTRIBUTES_KEY_NAME; + static final String DEFAULT_EVENT_TIMESTAMP_KEY_NAME = + DEFAULT_KEY_PREFIX + EVENT_TIMESTAMP_KEY_NAME; + static final String DEFAULT_PAYLOAD_KEY_NAME = DEFAULT_KEY_PREFIX + PAYLOAD_KEY_NAME; /** The prefix for all non-user fields. Defaults to {@link #DEFAULT_KEY_PREFIX}. */ abstract String getKeyPrefix(); @@ -314,6 +319,16 @@ boolean matchesAll(FieldMatcher... fieldMatchers) { } return true; } + + /** Returns true if any {@code fieldMatchers} {@link FieldMatcher#match(Schema)}. */ + boolean matchesAny(FieldMatcher... fieldMatchers) { + for (FieldMatcher fieldMatcher : fieldMatchers) { + if (fieldMatcher.match(schema)) { + return true; + } + } + return false; + } } /** {@link FieldMatcher} matches fields in a {@link Schema}. */ @@ -460,7 +475,7 @@ Map attributesWithoutTimestamp(Row row) { * 2015-10-29T23:41:41.123Z}. */ String timestampAsString(Row row) { - return timestamp(row).toString(); + return ISODateTimeFormat.dateTime().print(timestamp(row)); } /** diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformWriteConfiguration.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformWriteConfiguration.java deleted file mode 100644 index 62c019ecc308..000000000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformWriteConfiguration.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.pubsub; - -import com.google.auto.value.AutoValue; -import javax.annotation.Nullable; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.schemas.AutoValueSchema; -import org.apache.beam.sdk.schemas.annotations.DefaultSchema; - -/** - * Configuration for writing to Pub/Sub. - * - *

Internal only: This class is actively being worked on, and it will likely change. We - * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam - * repository. - */ -@Experimental -@DefaultSchema(AutoValueSchema.class) -@AutoValue -public abstract class PubsubSchemaTransformWriteConfiguration { - - /** - * The topic to which to write Pub/Sub messages. - * - *

See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the - * topic string. - */ - public abstract String getTopic(); - - /** - * The expected format of the Pub/Sub message. - * - *

Used to retrieve the {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer} from - * {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers}. - */ - @Nullable - public abstract String getFormat(); - - /** - * When writing to Cloud Pub/Sub where record timestamps are configured as Pub/Sub message - * attributes, specifies the name of the attribute that contains the timestamp. - */ - @Nullable - public abstract String getTimestampAttribute(); - - /** - * When writing to Cloud Pub/Sub where unique record identifiers are provided as Pub/Sub message - * attributes, specifies the name of the attribute containing the unique identifier. - */ - @Nullable - public abstract String getIdAttribute(); - - /** Builder for {@link PubsubSchemaTransformWriteConfiguration}. */ - @AutoValue.Builder - public abstract static class Builder { - - /** - * The topic to which to write Pub/Sub messages. - * - *

See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the - * topic string. - */ - public abstract Builder setTopic(String value); - - /** - * The expected format of the Pub/Sub message. - * - *

Used to retrieve the {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer} - * from {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers}. - */ - public abstract Builder setFormat(String value); - - /** - * When writing to Cloud Pub/Sub where record timestamps are configured as Pub/Sub message - * attributes, specifies the name of the attribute that contains the timestamp. - */ - public abstract Builder setTimestampAttribute(String value); - - /** - * When reading from Cloud Pub/Sub where unique record identifiers are provided as Pub/Sub - * message attributes, specifies the name of the attribute containing the unique identifier. - */ - public abstract Builder setIdAttribute(String value); - - /** Builds a {@link PubsubSchemaTransformWriteConfiguration} instance. */ - public abstract PubsubSchemaTransformWriteConfiguration build(); - } -} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java index 6c2d3af1877f..4ad8f9fab397 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.Set; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets; import org.checkerframework.checker.nullness.qual.Nullable; @@ -88,6 +89,12 @@ private static class State { /** Pull mode only: When above messages are due to have their ACK deadlines expire. */ @Nullable Map ackDeadline; + + /** The Pub/Sub schema resource path. 
*/ + @Nullable SchemaPath expectedSchemaPath; + + /** Expected Pub/sub mapped Beam Schema. */ + @Nullable Schema expectedSchema; } private static final State STATE = new State(); @@ -227,6 +234,43 @@ public String getKind() { }; } + public static PubsubTestClientFactory createFactoryForGetSchema( + TopicPath expectedTopic, + @Nullable SchemaPath expectedSchemaPath, + @Nullable Schema expectedSchema) { + return new PubsubTestClientFactory() { + @Override + public void close() { + deactivate(() -> {}); + } + + @Override + public PubsubClient newClient( + @Nullable String timestampAttribute, + @Nullable String idAttribute, + PubsubOptions options, + @Nullable String rootUrlOverride) { + activate( + () -> { + setSchemaState(expectedTopic, expectedSchemaPath, expectedSchema); + }); + return new PubsubTestClient(); + } + + @Override + public PubsubClient newClient( + @Nullable String timestampAttribute, @Nullable String idAttribute, PubsubOptions options) + throws IOException { + return newClient(timestampAttribute, idAttribute, options, null); + } + + @Override + public String getKind() { + return "GetSchemaTest"; + } + }; + } + /** * Activates {@link PubsubTestClientFactory} state for the test. This can only be called once per * test. @@ -289,6 +333,15 @@ private static void setPullState( STATE.ackDeadline = new HashMap<>(); } + private static void setSchemaState( + TopicPath expectedTopic, + @Nullable SchemaPath expectedSchemaPath, + @Nullable Schema expectedSchema) { + STATE.expectedTopic = expectedTopic; + STATE.expectedSchemaPath = expectedSchemaPath; + STATE.expectedSchema = expectedSchema; + } + /** Handles verifying {@code STATE} at end of publish test. 
*/ private static void performFinalPublishStateChecks() { checkState(STATE.isActive, "No test still in flight"); @@ -524,6 +577,11 @@ public void createTopic(TopicPath topic) throws IOException { throw new UnsupportedOperationException(); } + @Override + public void createTopic(TopicPath topic, SchemaPath schema) throws IOException { + throw new UnsupportedOperationException(); + } + @Override public void deleteTopic(TopicPath topic) throws IOException { throw new UnsupportedOperationException(); @@ -565,4 +623,27 @@ public boolean isEOF() { return STATE.remainingPendingIncomingMessages.isEmpty(); } } + + @Override + public void createSchema( + SchemaPath schemaPath, String resourcePath, com.google.pubsub.v1.Schema.Type type) + throws IOException { + throw new UnsupportedOperationException(); + } + + /** Delete {@link SchemaPath}. */ + @Override + public void deleteSchema(SchemaPath schemaPath) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public SchemaPath getSchemaPath(TopicPath topicPath) throws IOException { + return STATE.expectedSchemaPath; + } + + @Override + public Schema getSchema(SchemaPath schemaPath) throws IOException { + return STATE.expectedSchema; + } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformConfiguration.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformConfiguration.java new file mode 100644 index 000000000000..eced2405eb21 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformConfiguration.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import com.google.auto.value.AutoValue; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; + +/** + * Configuration for writing to Pub/Sub. + * + *

Internal only: This class is actively being worked on, and it will likely change. We + * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam + * repository. + */ +@Experimental +@DefaultSchema(AutoValueSchema.class) +@AutoValue +public abstract class PubsubWriteSchemaTransformConfiguration { + + public static final String DEFAULT_TIMESTAMP_ATTRIBUTE = "event_timestamp"; + + public static Builder builder() { + return new AutoValue_PubsubWriteSchemaTransformConfiguration.Builder(); + } + + public static TargetConfiguration.Builder targetConfigurationBuilder() { + return new AutoValue_PubsubWriteSchemaTransformConfiguration_TargetConfiguration.Builder() + .setTimestampAttributeKey(DEFAULT_TIMESTAMP_ATTRIBUTE); + } + + public static SourceConfiguration.Builder sourceConfigurationBuilder() { + return new AutoValue_PubsubWriteSchemaTransformConfiguration_SourceConfiguration.Builder(); + } + + /** + * Configuration details of the source {@link org.apache.beam.sdk.values.Row} {@link + * org.apache.beam.sdk.schemas.Schema}. + */ + @Nullable + public abstract SourceConfiguration getSource(); + + /** Configuration details of the target {@link PubsubMessage}. */ + public abstract TargetConfiguration getTarget(); + + /** + * The topic to which to write Pub/Sub messages. + * + *

See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the + * topic string. + */ + public abstract String getTopic(); + + /** + * The expected format of the Pub/Sub message. + * + *

Used to retrieve the {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer} from + * {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers}. See the list of supported + * values by invoking {@link org.apache.beam.sdk.schemas.io.Providers#loadProviders(Class)}. + * + *

{@code Providers.loadProviders(PayloadSerializer.class).keySet()}
+ */ + @Nullable + public abstract String getFormat(); + + /** + * When writing to Cloud Pub/Sub where unique record identifiers are provided as Pub/Sub message + * attributes, specifies the name of the attribute containing the unique identifier. + */ + @Nullable + public abstract String getIdAttribute(); + + /** Builder for {@link PubsubWriteSchemaTransformConfiguration}. */ + @AutoValue.Builder + public abstract static class Builder { + + /** + * Configuration details of the source {@link org.apache.beam.sdk.values.Row} {@link + * org.apache.beam.sdk.schemas.Schema}. + */ + public abstract Builder setSource(SourceConfiguration value); + + /** Configuration details of the target {@link PubsubMessage}. */ + public abstract Builder setTarget(TargetConfiguration value); + + /** + * The topic to which to write Pub/Sub messages. + * + *

See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the + * topic string. + */ + public abstract Builder setTopic(String value); + + /** + * The expected format of the Pub/Sub message. + * + *

Used to retrieve the {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer} + * from {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers}. See the list of + * supported values by invoking {@link + * org.apache.beam.sdk.schemas.io.Providers#loadProviders(Class)}. + * + *

{@code Providers.loadProviders(PayloadSerializer.class).keySet()}
 + */ + public abstract Builder setFormat(String value); + + /** + * When writing to Cloud Pub/Sub where unique record identifiers are provided as Pub/Sub + * message attributes, specifies the name of the attribute containing the unique identifier. + */ + public abstract Builder setIdAttribute(String value); + + abstract PubsubWriteSchemaTransformConfiguration build(); + } + + @DefaultSchema(AutoValueSchema.class) + @AutoValue + public abstract static class SourceConfiguration { + /** + * The attributes field name of the source {@link org.apache.beam.sdk.values.Row}. {@link + * org.apache.beam.sdk.schemas.Schema.FieldType} must be a Map<String, String> + * + */ + @Nullable + public abstract String getAttributesFieldName(); + + /** + * The timestamp field name of the source {@link org.apache.beam.sdk.values.Row}. {@link + * org.apache.beam.sdk.schemas.Schema.FieldType} must be a {@link + * org.apache.beam.sdk.schemas.Schema.FieldType#DATETIME}. + */ + @Nullable + public abstract String getTimestampFieldName(); + + /** + * The payload field name of the source {@link org.apache.beam.sdk.values.Row}. {@link + * org.apache.beam.sdk.schemas.Schema.FieldType} must be either {@link + * org.apache.beam.sdk.schemas.Schema.FieldType#BYTES} or a {@link + * org.apache.beam.sdk.values.Row}. If null, payload serialized from user fields other than + * attributes. Not compatible with other payload intended fields. + */ + @Nullable + public abstract String getPayloadFieldName(); + + @AutoValue.Builder + public abstract static class Builder { + /** + * The attributes field name of the source {@link org.apache.beam.sdk.values.Row}. {@link + * org.apache.beam.sdk.schemas.Schema.FieldType} must be a Map<String, String> + * + */ + public abstract Builder setAttributesFieldName(String value); + + /** + * The timestamp field name of the source {@link org.apache.beam.sdk.values.Row}. 
{@link + * org.apache.beam.sdk.schemas.Schema.FieldType} must be a {@link + * org.apache.beam.sdk.schemas.Schema.FieldType#DATETIME}. + */ + public abstract Builder setTimestampFieldName(String value); + + /** + * The payload field name of the source {@link org.apache.beam.sdk.values.Row}. {@link + * org.apache.beam.sdk.schemas.Schema.FieldType} must be either {@link + * org.apache.beam.sdk.schemas.Schema.FieldType#BYTES} or a {@link + * org.apache.beam.sdk.values.Row}. If null, payload serialized from user fields other than + * attributes. Not compatible with other payload intended fields. + */ + public abstract Builder setPayloadFieldName(String value); + + public abstract SourceConfiguration build(); + } + } + + @DefaultSchema(AutoValueSchema.class) + @AutoValue + public abstract static class TargetConfiguration { + + /** + * The attribute key to assign the {@link PubsubMessage} stringified timestamp value. {@link + * #builder()} method defaults value to {@link #DEFAULT_TIMESTAMP_ATTRIBUTE}. + */ + public abstract String getTimestampAttributeKey(); + + @AutoValue.Builder + public abstract static class Builder { + + /** + * The attribute key to assign the {@link PubsubMessage} stringified timestamp value. Defaults + * to {@link #DEFAULT_TIMESTAMP_ATTRIBUTE}. + */ + public abstract Builder setTimestampAttributeKey(String value); + + public abstract TargetConfiguration build(); + } + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java new file mode 100644 index 000000000000..cc68cc7772da --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java @@ -0,0 +1,444 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.ATTRIBUTES_FIELD_TYPE; +import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.DEFAULT_ATTRIBUTES_KEY_NAME; +import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.DEFAULT_EVENT_TIMESTAMP_KEY_NAME; +import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.DEFAULT_PAYLOAD_KEY_NAME; +import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.ERROR; +import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.EVENT_TIMESTAMP_FIELD_TYPE; +import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.OUTPUT; +import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.PAYLOAD_BYTES_TYPE_NAME; +import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.PAYLOAD_ROW_TYPE_NAME; +import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.removeFields; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; + +import com.google.api.client.util.Clock; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import 
java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Stream; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SchemaPath; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.FieldMatcher; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.SchemaReflection; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubWriteSchemaTransformConfiguration.SourceConfiguration; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.Field; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.Schema.TypeName; +import org.apache.beam.sdk.schemas.io.Providers; +import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer; +import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializerProvider; +import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.joda.time.Instant; + +/** + * An implementation of {@link TypedSchemaTransformProvider} for Pub/Sub writes configured using + * {@link 
PubsubWriteSchemaTransformConfiguration}. + * + *

Internal only: This class is actively being worked on, and it will likely change. We + * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam + * repository. + */ +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +@Internal +@Experimental(Kind.SCHEMAS) +public class PubsubWriteSchemaTransformProvider + extends TypedSchemaTransformProvider { + private static final String IDENTIFIER = "beam:schematransform:org.apache.beam:pubsub_write:v1"; + static final String INPUT_TAG = "input"; + static final String ERROR_TAG = "error"; + + /** Returns the expected class of the configuration. */ + @Override + protected Class configurationClass() { + return PubsubWriteSchemaTransformConfiguration.class; + } + + /** Returns the expected {@link SchemaTransform} of the configuration. */ + @Override + protected SchemaTransform from(PubsubWriteSchemaTransformConfiguration configuration) { + return new PubsubWriteSchemaTransform(configuration); + } + + /** Implementation of the {@link SchemaTransformProvider} identifier method. */ + @Override + public String identifier() { + return IDENTIFIER; + } + + /** + * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since a + * single input is expected, this returns a list with a single name. + */ + @Override + public List inputCollectionNames() { + return Collections.singletonList(INPUT_TAG); + } + + /** + * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. The + * only expected output is the {@link #ERROR_TAG}. + */ + @Override + public List outputCollectionNames() { + return Collections.singletonList(ERROR_TAG); + } + + /** + * An implementation of {@link SchemaTransform} for Pub/Sub writes configured using {@link + * PubsubWriteSchemaTransformConfiguration}. 
+ */ + static class PubsubWriteSchemaTransform + extends PTransform implements SchemaTransform { + + private final PubsubWriteSchemaTransformConfiguration configuration; + + private PubsubClient.PubsubClientFactory pubsubClientFactory; + + PubsubWriteSchemaTransform(PubsubWriteSchemaTransformConfiguration configuration) { + this.configuration = configuration; + } + + PubsubWriteSchemaTransform withPubsubClientFactory(PubsubClient.PubsubClientFactory factory) { + this.pubsubClientFactory = factory; + return this; + } + + /** Implements {@link SchemaTransform} buildTransform method. */ + @Override + public PTransform buildTransform() { + return this; + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + if (input.getAll().size() != 1 || !input.has(INPUT_TAG)) { + throw new IllegalArgumentException( + String.format( + "%s %s input is expected to contain a single %s tagged PCollection", + input.getClass().getSimpleName(), getClass().getSimpleName(), INPUT_TAG)); + } + + PCollection rows = input.get(INPUT_TAG); + if (rows.getSchema().getFieldCount() == 0) { + throw new IllegalArgumentException(String.format("empty Schema for %s", INPUT_TAG)); + } + + Schema targetSchema = buildTargetSchema(rows.getSchema()); + + rows = + rows.apply( + ConvertForRowToMessage.class.getSimpleName(), + convertForRowToMessage(targetSchema)) + .setRowSchema(targetSchema); + + Schema schema = rows.getSchema(); + + Schema serializableSchema = + removeFields(schema, DEFAULT_ATTRIBUTES_KEY_NAME, DEFAULT_EVENT_TIMESTAMP_KEY_NAME); + FieldMatcher payloadRowMatcher = FieldMatcher.of(DEFAULT_PAYLOAD_KEY_NAME, TypeName.ROW); + if (payloadRowMatcher.match(serializableSchema)) { + serializableSchema = + serializableSchema.getField(DEFAULT_PAYLOAD_KEY_NAME).getType().getRowSchema(); + } + + validateTargetSchemaAgainstPubsubSchema(serializableSchema, input.getPipeline().getOptions()); + + PCollectionTuple pct = + rows.apply( + PubsubRowToMessage.class.getSimpleName(), + 
buildPubsubRowToMessage(serializableSchema)); + + PCollection messages = pct.get(OUTPUT); + messages.apply(PubsubIO.Write.class.getSimpleName(), buildPubsubWrite()); + return PCollectionRowTuple.of(ERROR_TAG, pct.get(ERROR)); + } + + PayloadSerializer getPayloadSerializer(Schema schema) { + if (configuration.getFormat() == null) { + return null; + } + String format = configuration.getFormat(); + Set availableFormats = + Providers.loadProviders(PayloadSerializerProvider.class).keySet(); + if (!availableFormats.contains(format)) { + String availableFormatsString = String.join(",", availableFormats); + throw new IllegalArgumentException( + String.format( + "%s is not among the valid formats: [%s]", format, availableFormatsString)); + } + return PayloadSerializers.getSerializer(configuration.getFormat(), schema, ImmutableMap.of()); + } + + PubsubRowToMessage buildPubsubRowToMessage(Schema schema) { + PubsubRowToMessage.Builder builder = + PubsubRowToMessage.builder().setPayloadSerializer(getPayloadSerializer(schema)); + + if (configuration.getTarget() != null) { + builder = + builder.setTargetTimestampAttributeName( + configuration.getTarget().getTimestampAttributeKey()); + } + + return builder.build(); + } + + PubsubIO.Write buildPubsubWrite() { + PubsubIO.Write write = PubsubIO.writeMessages().to(configuration.getTopic()); + + if (configuration.getIdAttribute() != null) { + write = write.withIdAttribute(configuration.getIdAttribute()); + } + + if (pubsubClientFactory != null) { + write = write.withClientFactory(pubsubClientFactory); + } + + return write; + } + + void validateSourceSchemaAgainstConfiguration(Schema sourceSchema) { + if (sourceSchema.getFieldCount() == 0) { + throw new IllegalArgumentException(String.format("empty Schema for %s", INPUT_TAG)); + } + + if (configuration.getSource() == null) { + return; + } + + SourceConfiguration source = configuration.getSource(); + + if (source.getAttributesFieldName() != null) { + String fieldName = 
source.getAttributesFieldName(); + FieldType fieldType = ATTRIBUTES_FIELD_TYPE; + FieldMatcher fieldMatcher = FieldMatcher.of(fieldName, fieldType); + checkArgument( + fieldMatcher.match(sourceSchema), + String.format("schema missing field: %s for type %s: ", fieldName, fieldType)); + } + + if (source.getTimestampFieldName() != null) { + String fieldName = source.getTimestampFieldName(); + FieldType fieldType = EVENT_TIMESTAMP_FIELD_TYPE; + FieldMatcher fieldMatcher = FieldMatcher.of(fieldName, fieldType); + checkArgument( + fieldMatcher.match(sourceSchema), + String.format("schema missing field: %s for type: %s", fieldName, fieldType)); + } + + if (source.getPayloadFieldName() == null) { + return; + } + + String fieldName = source.getPayloadFieldName(); + FieldMatcher bytesFieldMatcher = FieldMatcher.of(fieldName, PAYLOAD_BYTES_TYPE_NAME); + FieldMatcher rowFieldMatcher = FieldMatcher.of(fieldName, PAYLOAD_ROW_TYPE_NAME); + SchemaReflection schemaReflection = SchemaReflection.of(sourceSchema); + checkArgument( + schemaReflection.matchesAny(bytesFieldMatcher, rowFieldMatcher), + String.format( + "schema missing field: %s for types %s or %s", + fieldName, PAYLOAD_BYTES_TYPE_NAME, PAYLOAD_ROW_TYPE_NAME)); + + String[] fieldsToExclude = + Stream.of( + source.getAttributesFieldName(), + source.getTimestampFieldName(), + source.getPayloadFieldName()) + .filter(Objects::nonNull) + .toArray(String[]::new); + + Schema userFieldsSchema = removeFields(sourceSchema, fieldsToExclude); + + if (userFieldsSchema.getFieldCount() > 0) { + throw new IllegalArgumentException( + String.format("user fields incompatible with %s field", source.getPayloadFieldName())); + } + } + + /** + * Checks {@code targetSchema} against the schema that the Pub/Sub client reports for the + * configured topic. Returns without checking when the topic has no schema path or the path is + * the deleted-schema marker. + * + * @param targetSchema the schema of the rows about to be serialized + * @param options pipeline options used to create the Pub/Sub client; must not be null + * @throws IllegalStateException if the schemas differ, or wrapping the {@link IOException} + * raised while talking to Pub/Sub + */ + void validateTargetSchemaAgainstPubsubSchema(Schema targetSchema, PipelineOptions options) { + checkArgument(options != null); + + try (PubsubClient pubsubClient = getPubsubClient(options.as(PubsubOptions.class))) { + PubsubClient.TopicPath topicPath = PubsubClient.topicPathFromPath(configuration.getTopic()); + 
PubsubClient.SchemaPath schemaPath = pubsubClient.getSchemaPath(topicPath); + // SchemaPath does not define equals(), so the deleted marker is detected by comparing paths. + if (schemaPath == null + || SchemaPath.DELETED_SCHEMA.getPath().equals(schemaPath.getPath())) { + return; + } + Schema expectedSchema = pubsubClient.getSchema(schemaPath); + checkState( + targetSchema.equals(expectedSchema), + String.format( + "input schema mismatch with expected schema at path: %s\ninput schema: %s\nPub/Sub schema: %s", + schemaPath.getPath(), targetSchema, expectedSchema)); + } catch (IOException e) { + // Keep the original exception as the cause so the stack trace is not lost. + throw new IllegalStateException(e.getMessage(), e); + } + } + + Schema buildTargetSchema(Schema sourceSchema) { + validateSourceSchemaAgainstConfiguration(sourceSchema); + FieldType payloadFieldType = null; + + List fieldsToRemove = new ArrayList<>(); + + if (configuration.getSource() != null) { + SourceConfiguration source = configuration.getSource(); + + if (source.getAttributesFieldName() != null) { + fieldsToRemove.add(source.getAttributesFieldName()); + } + + if (source.getTimestampFieldName() != null) { + fieldsToRemove.add(source.getTimestampFieldName()); + } + + if (source.getPayloadFieldName() != null) { + String fieldName = source.getPayloadFieldName(); + Field field = sourceSchema.getField(fieldName); + payloadFieldType = field.getType(); + fieldsToRemove.add(fieldName); + } + } + + Schema targetSchema = + PubsubRowToMessage.builder() + .build() + .inputSchemaFactory(payloadFieldType) + .buildSchema(sourceSchema.getFields().toArray(new Field[0])); + + return removeFields(targetSchema, fieldsToRemove.toArray(new String[0])); + } + + private PubsubClient.PubsubClientFactory getPubsubClientFactory() { + if (pubsubClientFactory != null) { + return pubsubClientFactory; + } + return PubsubGrpcClient.FACTORY; + } + + /** + * Creates a {@link PubsubClient} from the selected factory. The timestamp attribute key is + * taken from the target configuration when one is present and is passed as null otherwise, + * mirroring the null check on the target made when building the row-to-message transform. + * + * @throws IOException if the factory fails to create the client + */ + private PubsubClient getPubsubClient(PubsubOptions options) throws IOException { + String timestampAttributeKey = null; + if (configuration.getTarget() != null) { + timestampAttributeKey = configuration.getTarget().getTimestampAttributeKey(); + } + return getPubsubClientFactory() + .newClient(timestampAttributeKey, configuration.getIdAttribute(), options); + } + + ParDo.SingleOutput convertForRowToMessage(Schema 
targetSchema) { + return convertForRowToMessage(targetSchema, null); + } + + ParDo.SingleOutput convertForRowToMessage( + Schema targetSchema, @Nullable Clock clock) { + String attributesName = null; + String timestampName = null; + String payloadName = null; + SourceConfiguration source = configuration.getSource(); + if (source != null) { + attributesName = source.getAttributesFieldName(); + timestampName = source.getTimestampFieldName(); + payloadName = source.getPayloadFieldName(); + } + return ParDo.of( + new ConvertForRowToMessage( + targetSchema, clock, attributesName, timestampName, payloadName)); + } + } + + private static class ConvertForRowToMessage extends DoFn { + private final Schema targetSchema; + @Nullable private final Clock clock; + @Nullable private final String attributesFieldName; + @Nullable private final String timestampFieldName; + @Nullable private final String payloadFieldName; + + ConvertForRowToMessage( + Schema targetSchema, + @Nullable Clock clock, + @Nullable String attributesFieldName, + @Nullable String timestampFieldName, + @Nullable String payloadFieldName) { + this.targetSchema = targetSchema; + this.clock = clock; + this.attributesFieldName = attributesFieldName; + this.timestampFieldName = timestampFieldName; + this.payloadFieldName = payloadFieldName; + } + + @ProcessElement + public void process(@Element Row row, OutputReceiver receiver) { + Instant now = Instant.now(); + if (clock != null) { + now = Instant.ofEpochMilli(clock.currentTimeMillis()); + } + Map values = new HashMap<>(); + + // Default attributes value + checkState(targetSchema.hasField(DEFAULT_ATTRIBUTES_KEY_NAME)); + values.put(DEFAULT_ATTRIBUTES_KEY_NAME, ImmutableMap.of()); + + // Default timestamp value + checkState(targetSchema.hasField(DEFAULT_EVENT_TIMESTAMP_KEY_NAME)); + values.put(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, now); + + for (String fieldName : row.getSchema().getFieldNames()) { + if (targetSchema.hasField(fieldName)) { + values.put(fieldName, 
row.getValue(fieldName)); + } + + if (attributesFieldName != null) { + values.put(DEFAULT_ATTRIBUTES_KEY_NAME, row.getValue(attributesFieldName)); + } + if (timestampFieldName != null) { + values.put(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, row.getValue(timestampFieldName)); + } + if (payloadFieldName != null) { + values.put(DEFAULT_PAYLOAD_KEY_NAME, row.getValue(payloadFieldName)); + } + } + receiver.output(Row.withSchema(targetSchema).withFieldValues(values).build()); + } + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java index ab62c1e907a5..8defeec289f3 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClientTest.java @@ -18,9 +18,13 @@ package org.apache.beam.sdk.io.gcp.pubsub; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import com.google.pubsub.v1.Schema; import java.util.Map; +import org.apache.avro.SchemaParseException; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SchemaPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; @@ -175,4 +179,112 @@ public void topicPathFromNameWellFormed() { assertEquals("projects/test/topics/something", path.getPath()); assertEquals("/topics/test/something", path.getFullPath()); } + + @Test + public void schemaPathFromIdPathWellFormed() { + SchemaPath path = PubsubClient.schemaPathFromId("projectId", "schemaId"); + assertEquals("projects/projectId/schemas/schemaId", path.getPath()); + 
assertEquals("schemaId", path.getId()); + } + + @Test + public void schemaPathFromPathWellFormed() { + SchemaPath path = PubsubClient.schemaPathFromPath("projects/projectId/schemas/schemaId"); + assertEquals("projects/projectId/schemas/schemaId", path.getPath()); + assertEquals("schemaId", path.getId()); + } + + @Test + public void fromPubsubSchema() { + assertThrows( + "null definition should throw an exception", + NullPointerException.class, + () -> + PubsubClient.fromPubsubSchema( + new com.google.api.services.pubsub.model.Schema().setType("AVRO"))); + + assertThrows( + "null definition should throw an exception", + NullPointerException.class, + () -> + PubsubClient.fromPubsubSchema( + com.google.pubsub.v1.Schema.newBuilder().setType(Schema.Type.AVRO).build())); + + String badSchema = + "{\"type\": \"record\", \"name\": \"Avro\",\"fields\": [{\"name\": \"bad\", \"type\": \"notatype\"}]}"; + String goodSchema = + "{" + + " \"type\" : \"record\"," + + " \"name\" : \"Avro\"," + + " \"fields\" : [" + + " {" + + " \"name\" : \"StringField\"," + + " \"type\" : \"string\"" + + " }," + + " {" + + " \"name\" : \"FloatField\"," + + " \"type\" : \"float\"" + + " }," + + " {" + + " \"name\" : \"IntField\"," + + " \"type\" : \"int\"" + + " }," + + " {" + + " \"name\" : \"LongField\"," + + " \"type\" : \"long\"" + + " }," + + " {" + + " \"name\" : \"DoubleField\"," + + " \"type\" : \"double\"" + + " }," + + " {" + + " \"name\" : \"BytesField\"," + + " \"type\" : \"bytes\"" + + " }," + + " {" + + " \"name\" : \"BooleanField\"," + + " \"type\" : \"boolean\"" + + " }" + + " ]" + + "}"; + + assertThrows( + "unsupported Schema type should throw an exception", + IllegalArgumentException.class, + () -> + PubsubClient.fromPubsubSchema( + new com.google.api.services.pubsub.model.Schema() + .setType("PROTOCOL_BUFFER") + .setDefinition(goodSchema))); + + assertThrows( + "'notatype' Avro type should throw an exception", + SchemaParseException.class, + () -> + 
PubsubClient.fromPubsubSchema( + new com.google.api.services.pubsub.model.Schema() + .setType("AVRO") + .setDefinition(badSchema))); + + assertEquals( + org.apache.beam.sdk.schemas.Schema.of( + org.apache.beam.sdk.schemas.Schema.Field.of( + "StringField", org.apache.beam.sdk.schemas.Schema.FieldType.STRING), + org.apache.beam.sdk.schemas.Schema.Field.of( + "FloatField", org.apache.beam.sdk.schemas.Schema.FieldType.FLOAT), + org.apache.beam.sdk.schemas.Schema.Field.of( + "IntField", org.apache.beam.sdk.schemas.Schema.FieldType.INT32), + org.apache.beam.sdk.schemas.Schema.Field.of( + "LongField", org.apache.beam.sdk.schemas.Schema.FieldType.INT64), + org.apache.beam.sdk.schemas.Schema.Field.of( + "DoubleField", org.apache.beam.sdk.schemas.Schema.FieldType.DOUBLE), + org.apache.beam.sdk.schemas.Schema.Field.of( + "BytesField", org.apache.beam.sdk.schemas.Schema.FieldType.BYTES), + org.apache.beam.sdk.schemas.Schema.Field.of( + "BooleanField", org.apache.beam.sdk.schemas.Schema.FieldType.BOOLEAN)), + PubsubClient.fromPubsubSchema( + new com.google.api.services.pubsub.model.Schema() + .setType("AVRO") + .setDefinition(goodSchema))); + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java index 8eeacc47e671..ce70f4f40793 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java @@ -18,10 +18,14 @@ package org.apache.beam.sdk.io.gcp.pubsub; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; import com.google.auth.Credentials; import com.google.protobuf.ByteString; import com.google.protobuf.Timestamp; +import 
com.google.pubsub.v1.GetSchemaRequest; +import com.google.pubsub.v1.GetTopicRequest; import com.google.pubsub.v1.PublishRequest; import com.google.pubsub.v1.PublishResponse; import com.google.pubsub.v1.PublisherGrpc.PublisherImplBase; @@ -29,9 +33,14 @@ import com.google.pubsub.v1.PullRequest; import com.google.pubsub.v1.PullResponse; import com.google.pubsub.v1.ReceivedMessage; +import com.google.pubsub.v1.Schema; +import com.google.pubsub.v1.SchemaServiceGrpc.SchemaServiceImplBase; +import com.google.pubsub.v1.SchemaSettings; import com.google.pubsub.v1.SubscriberGrpc.SubscriberImplBase; +import com.google.pubsub.v1.Topic; import io.grpc.ManagedChannel; import io.grpc.Server; +import io.grpc.StatusRuntimeException; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.stub.StreamObserver; @@ -43,8 +52,11 @@ import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SchemaPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; +import org.apache.beam.sdk.schemas.Schema.Field; +import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; @@ -66,6 +78,8 @@ public class PubsubGrpcClientTest { private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic"); private static final SubscriptionPath SUBSCRIPTION = PubsubClient.subscriptionPathFromName("testProject", "testSubscription"); + private static final SchemaPath SCHEMA = + 
PubsubClient.schemaPathFromId("testProject", "testSchemaId"); private static final long REQ_TIME_MS = 1234L; private static final long PUB_TIME_MS = 3456L; private static final long MESSAGE_TIME_MS = 6789L; @@ -263,4 +277,158 @@ public void publish( server.shutdownNow(); } } + + @Test + public void getSchemaPath() throws IOException { + initializeClient(null, null); + TopicPath topicDoesNotExist = + PubsubClient.topicPathFromPath("projects/testProject/topics/idontexist"); + TopicPath topicExistsDeletedSchema = + PubsubClient.topicPathFromPath("projects/testProject/topics/deletedSchema"); + TopicPath topicExistsNoSchema = + PubsubClient.topicPathFromPath("projects/testProject/topics/noSchema"); + TopicPath topicExistsSchema = + PubsubClient.topicPathFromPath("projects/testProject/topics/topicWithSchema"); + PublisherImplBase publisherImplBase = + new PublisherImplBase() { + @Override + public void getTopic(GetTopicRequest request, StreamObserver responseObserver) { + String topicPath = request.getTopic(); + if (topicPath.equals(topicDoesNotExist.getPath())) { + responseObserver.onError( + new IOException(String.format("%s does not exist", topicPath))); + } + if (topicPath.equals(topicExistsDeletedSchema.getPath())) { + responseObserver.onNext( + Topic.newBuilder() + .setName(topicPath) + .setSchemaSettings( + SchemaSettings.newBuilder() + .setSchema(SchemaPath.DELETED_SCHEMA_PATH) + .build()) + .build()); + responseObserver.onCompleted(); + } + if (topicPath.equals(topicExistsNoSchema.getPath())) { + responseObserver.onNext(Topic.newBuilder().setName(topicPath).build()); + responseObserver.onCompleted(); + } + if (topicPath.equals(topicExistsSchema.getPath())) { + responseObserver.onNext( + Topic.newBuilder() + .setName(topicPath) + .setSchemaSettings( + SchemaSettings.newBuilder().setSchema(SCHEMA.getPath()).build()) + .build()); + responseObserver.onCompleted(); + } + } + }; + Server server = + 
InProcessServerBuilder.forName(channelName).addService(publisherImplBase).build().start(); + try { + assertThrows( + "topic does not exist", + StatusRuntimeException.class, + () -> client.getSchemaPath(topicDoesNotExist)); + + assertNull( + "topic with deleted Schema should return null SchemaPath", + client.getSchemaPath(topicExistsDeletedSchema)); + + assertNull( + "topic without Schema should return null SchemaPath", + client.getSchemaPath(topicExistsNoSchema)); + + assertEquals(SCHEMA.getPath(), client.getSchemaPath(topicExistsSchema).getPath()); + + } finally { + server.shutdownNow(); + } + } + + @Test + public void getAvroSchema() throws IOException { + String schemaDefinition = + "{" + + " \"type\" : \"record\"," + + " \"name\" : \"Avro\"," + + " \"fields\" : [" + + " {" + + " \"name\" : \"StringField\"," + + " \"type\" : \"string\"" + + " }," + + " {" + + " \"name\" : \"FloatField\"," + + " \"type\" : \"float\"" + + " }," + + " {" + + " \"name\" : \"BooleanField\"," + + " \"type\" : \"boolean\"" + + " }" + + " ]" + + "}"; + initializeClient(null, null); + final Schema schema = + com.google.pubsub.v1.Schema.newBuilder() + .setName(SCHEMA.getPath()) + .setType(Schema.Type.AVRO) + .setDefinition(schemaDefinition) + .build(); + SchemaServiceImplBase schemaImplBase = + new SchemaServiceImplBase() { + @Override + public void getSchema(GetSchemaRequest request, StreamObserver responseObserver) { + if (request.getName().equals(SCHEMA.getPath())) { + responseObserver.onNext(schema); + responseObserver.onCompleted(); + } + } + }; + Server server = + InProcessServerBuilder.forName(channelName).addService(schemaImplBase).build().start(); + try { + assertEquals( + org.apache.beam.sdk.schemas.Schema.of( + Field.of("StringField", FieldType.STRING), + Field.of("FloatField", FieldType.FLOAT), + Field.of("BooleanField", FieldType.BOOLEAN)), + client.getSchema(SCHEMA)); + } finally { + server.shutdownNow(); + } + } + + @Test + public void getProtoSchema() throws IOException { 
+ String schemaDefinition = + "syntax = \"proto3\"; message ProtocolBuffer { string string_field = 1; int32 int_field = 2; }"; + initializeClient(null, null); + final Schema schema = + com.google.pubsub.v1.Schema.newBuilder() + .setName(SCHEMA.getPath()) + .setType(Schema.Type.PROTOCOL_BUFFER) + .setDefinition(schemaDefinition) + .build(); + SchemaServiceImplBase schemaImplBase = + new SchemaServiceImplBase() { + @Override + public void getSchema(GetSchemaRequest request, StreamObserver responseObserver) { + if (request.getName().equals(SCHEMA.getPath())) { + responseObserver.onNext(schema); + responseObserver.onCompleted(); + } + } + }; + Server server = + InProcessServerBuilder.forName(channelName).addService(schemaImplBase).build().start(); + try { + assertThrows( + "Pub/Sub Schema type PROTOCOL_BUFFER is not supported at this time", + IllegalArgumentException.class, + () -> client.getSchema(SCHEMA)); + } finally { + server.shutdownNow(); + } + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java index da64d7bd3a84..e815df258961 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java @@ -19,6 +19,8 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; import static org.mockito.Mockito.when; import com.google.api.services.pubsub.Pubsub; @@ -32,6 +34,8 @@ import com.google.api.services.pubsub.model.PullRequest; import com.google.api.services.pubsub.model.PullResponse; import com.google.api.services.pubsub.model.ReceivedMessage; +import 
com.google.api.services.pubsub.model.Schema; +import com.google.api.services.pubsub.model.SchemaSettings; import com.google.api.services.pubsub.model.Subscription; import com.google.api.services.pubsub.model.Topic; import com.google.protobuf.ByteString; @@ -65,6 +69,9 @@ public class PubsubJsonClientTest { private static final TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic"); private static final SubscriptionPath SUBSCRIPTION = PubsubClient.subscriptionPathFromName("testProject", "testSubscription"); + + private static final PubsubClient.SchemaPath SCHEMA = + PubsubClient.schemaPathFromId("testProject", "testSchemaId"); private static final long REQ_TIME = 1234L; private static final long PUB_TIME = 3456L; private static final long MESSAGE_TIME = 6789L; @@ -322,4 +329,97 @@ private static Subscription buildSubscription(int i) { subscription.setTopic(PubsubClient.topicPathFromName(PROJECT.getId(), "Topic" + i).getPath()); return subscription; } + + @Test + public void testGetSchemaPath() throws IOException { + TopicPath topicDoesNotExist = + PubsubClient.topicPathFromPath("projects/testProject/topics/idontexist"); + TopicPath topicExistsDeletedSchema = + PubsubClient.topicPathFromPath("projects/testProject/topics/deletedSchema"); + TopicPath topicExistsNoSchema = + PubsubClient.topicPathFromPath("projects/testProject/topics/noSchema"); + TopicPath topicExistsSchema = + PubsubClient.topicPathFromPath("projects/testProject/topics/topicWithSchema"); + when(mockPubsub.projects().topics().get(topicDoesNotExist.getPath()).execute()) + .thenThrow( + new IOException( + String.format("topic does not exist: %s", topicDoesNotExist.getPath()))); + when(mockPubsub.projects().topics().get(topicExistsDeletedSchema.getPath()).execute()) + .thenReturn( + new Topic() + .setName(topicExistsDeletedSchema.getName()) + .setSchemaSettings( + new SchemaSettings().setSchema(PubsubClient.SchemaPath.DELETED_SCHEMA_PATH))); + 
when(mockPubsub.projects().topics().get(topicExistsNoSchema.getPath()).execute()) + .thenReturn(new Topic().setName(topicExistsNoSchema.getName())); + when(mockPubsub.projects().topics().get(topicExistsSchema.getPath()).execute()) + .thenReturn( + new Topic() + .setName(topicExistsSchema.getName()) + .setSchemaSettings(new SchemaSettings().setSchema(SCHEMA.getPath()))); + + client = new PubsubJsonClient(null, null, mockPubsub); + + assertThrows( + "topic does not exist", IOException.class, () -> client.getSchemaPath(topicDoesNotExist)); + + assertNull("schema for topic is deleted", client.getSchemaPath(topicExistsDeletedSchema)); + + assertNull("topic has no schema", client.getSchemaPath(topicExistsNoSchema)); + + assertEquals(SCHEMA.getPath(), client.getSchemaPath(topicExistsSchema).getPath()); + } + + @Test + public void testAvroSchema() throws IOException { + String schemaDefinition = + "{" + + " \"type\" : \"record\"," + + " \"name\" : \"Avro\"," + + " \"fields\" : [" + + " {" + + " \"name\" : \"StringField\"," + + " \"type\" : \"string\"" + + " }," + + " {" + + " \"name\" : \"FloatField\"," + + " \"type\" : \"float\"" + + " }," + + " {" + + " \"name\" : \"BooleanField\"," + + " \"type\" : \"boolean\"" + + " }" + + " ]" + + "}"; + Schema schema = + new Schema().setName(SCHEMA.getPath()).setType("AVRO").setDefinition(schemaDefinition); + when(mockPubsub.projects().schemas().get(SCHEMA.getPath()).execute()).thenReturn(schema); + client = new PubsubJsonClient(null, null, mockPubsub); + assertEquals( + org.apache.beam.sdk.schemas.Schema.of( + org.apache.beam.sdk.schemas.Schema.Field.of( + "StringField", org.apache.beam.sdk.schemas.Schema.FieldType.STRING), + org.apache.beam.sdk.schemas.Schema.Field.of( + "FloatField", org.apache.beam.sdk.schemas.Schema.FieldType.FLOAT), + org.apache.beam.sdk.schemas.Schema.Field.of( + "BooleanField", org.apache.beam.sdk.schemas.Schema.FieldType.BOOLEAN)), + client.getSchema(SCHEMA)); + } + + @Test + public void getProtoSchema() 
throws IOException { + String schemaDefinition = + "syntax = \"proto3\"; message ProtocolBuffer { string string_field = 1; int32 int_field = 2; }"; + Schema schema = + new Schema() + .setName(SCHEMA.getPath()) + .setType("PROTOCOL_BUFFER") + .setDefinition(schemaDefinition); + when(mockPubsub.projects().schemas().get(SCHEMA.getPath()).execute()).thenReturn(schema); + client = new PubsubJsonClient(null, null, mockPubsub); + assertThrows( + "Pub/Sub Schema type PROTOCOL_BUFFER is not supported at this time", + IllegalArgumentException.class, + () -> client.getSchema(SCHEMA)); + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubRowToMessageTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubRowToMessageTest.java index 5ef8bc473e95..029c8ef08a4a 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubRowToMessageTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubRowToMessageTest.java @@ -19,7 +19,9 @@ import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.ATTRIBUTES_FIELD_TYPE; import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.ATTRIBUTES_KEY_NAME; -import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.DEFAULT_KEY_PREFIX; +import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.DEFAULT_ATTRIBUTES_KEY_NAME; +import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.DEFAULT_EVENT_TIMESTAMP_KEY_NAME; +import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.DEFAULT_PAYLOAD_KEY_NAME; import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.ERROR; import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.EVENT_TIMESTAMP_FIELD_TYPE; import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.EVENT_TIMESTAMP_KEY_NAME; @@ -72,22 +74,17 @@ public class PubsubRowToMessageTest { 
private static final PipelineOptions PIPELINE_OPTIONS = PipelineOptionsFactory.create(); - private static final String DEFAULT_ATTRIBUTES_KEY_NAME = - DEFAULT_KEY_PREFIX + ATTRIBUTES_KEY_NAME; - private static final String DEFAULT_EVENT_TIMESTAMP_KEY_NAME = - DEFAULT_KEY_PREFIX + EVENT_TIMESTAMP_KEY_NAME; - private static final String DEFAULT_PAYLOAD_KEY_NAME = DEFAULT_KEY_PREFIX + PAYLOAD_KEY_NAME; - private static final Field BOOLEAN_FIELD = Field.of("boolean", FieldType.BOOLEAN); - private static final Field BYTE_FIELD = Field.of("byte", FieldType.BYTE); - private static final Field DATETIME_FIELD = Field.of("datetime", FieldType.DATETIME); - private static final Field DECIMAL_FIELD = Field.of("decimal", FieldType.DECIMAL); - private static final Field DOUBLE_FIELD = Field.of("double", FieldType.DOUBLE); - private static final Field FLOAT_FIELD = Field.of("float", FieldType.FLOAT); - private static final Field INT16_FIELD = Field.of("int16", FieldType.INT16); - private static final Field INT32_FIELD = Field.of("int32", FieldType.INT32); - private static final Field INT64_FIELD = Field.of("int64", FieldType.INT64); - private static final Field STRING_FIELD = Field.of("string", FieldType.STRING); - private static final Schema ALL_DATA_TYPES_SCHEMA = + static final Field BOOLEAN_FIELD = Field.of("boolean", FieldType.BOOLEAN); + static final Field BYTE_FIELD = Field.of("byte", FieldType.BYTE); + static final Field DATETIME_FIELD = Field.of("datetime", FieldType.DATETIME); + static final Field DECIMAL_FIELD = Field.of("decimal", FieldType.DECIMAL); + static final Field DOUBLE_FIELD = Field.of("double", FieldType.DOUBLE); + static final Field FLOAT_FIELD = Field.of("float", FieldType.FLOAT); + static final Field INT16_FIELD = Field.of("int16", FieldType.INT16); + static final Field INT32_FIELD = Field.of("int32", FieldType.INT32); + static final Field INT64_FIELD = Field.of("int64", FieldType.INT64); + static final Field STRING_FIELD = Field.of("string", 
FieldType.STRING); + static final Schema ALL_DATA_TYPES_SCHEMA = Schema.of( BOOLEAN_FIELD, BYTE_FIELD, @@ -100,19 +97,19 @@ public class PubsubRowToMessageTest { INT64_FIELD, STRING_FIELD); - private static final Schema NON_USER_WITH_BYTES_PAYLOAD = + static final Schema NON_USER_WITH_BYTES_PAYLOAD = Schema.of( Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, ATTRIBUTES_FIELD_TYPE), Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, EVENT_TIMESTAMP_FIELD_TYPE), Field.of(DEFAULT_PAYLOAD_KEY_NAME, FieldType.BYTES)); - private static final Schema NON_USER_WITH_ROW_PAYLOAD = + static final Schema NON_USER_WITH_ROW_PAYLOAD = Schema.of( Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, ATTRIBUTES_FIELD_TYPE), Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, EVENT_TIMESTAMP_FIELD_TYPE), Field.of(DEFAULT_PAYLOAD_KEY_NAME, FieldType.row(ALL_DATA_TYPES_SCHEMA))); - private static final Schema NON_USER_WITHOUT_PAYLOAD = + static final Schema NON_USER_WITHOUT_PAYLOAD = Schema.of( Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, ATTRIBUTES_FIELD_TYPE), Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, EVENT_TIMESTAMP_FIELD_TYPE)); @@ -820,6 +817,71 @@ public void testSchemaReflection_matchesAll() { FieldMatcher.of(STRING_FIELD.getName(), TypeName.INT16))); } + @Test + public void testSchemaReflection_matchesAny() { + SchemaReflection schemaReflection = SchemaReflection.of(ALL_DATA_TYPES_SCHEMA); + assertTrue( + schemaReflection.matchesAny( + FieldMatcher.of(BOOLEAN_FIELD.getName()), + FieldMatcher.of(BYTE_FIELD.getName()), + FieldMatcher.of(DATETIME_FIELD.getName()), + FieldMatcher.of(DECIMAL_FIELD.getName()), + FieldMatcher.of(DOUBLE_FIELD.getName()), + FieldMatcher.of(FLOAT_FIELD.getName()), + FieldMatcher.of(INT16_FIELD.getName()), + FieldMatcher.of(INT32_FIELD.getName()), + FieldMatcher.of(INT64_FIELD.getName()), + FieldMatcher.of(STRING_FIELD.getName()))); + + assertTrue( + schemaReflection.matchesAny( + FieldMatcher.of(BOOLEAN_FIELD.getName(), FieldType.BOOLEAN), + FieldMatcher.of(BYTE_FIELD.getName(), FieldType.BYTE), + 
FieldMatcher.of(DATETIME_FIELD.getName(), FieldType.DATETIME), + FieldMatcher.of(DECIMAL_FIELD.getName(), FieldType.DECIMAL), + FieldMatcher.of(DOUBLE_FIELD.getName(), FieldType.DOUBLE), + FieldMatcher.of(FLOAT_FIELD.getName(), FieldType.FLOAT), + FieldMatcher.of(INT16_FIELD.getName(), FieldType.INT16), + FieldMatcher.of(INT32_FIELD.getName(), FieldType.INT32), + FieldMatcher.of(INT64_FIELD.getName(), FieldType.INT64), + FieldMatcher.of(STRING_FIELD.getName(), FieldType.STRING))); + + assertTrue( + schemaReflection.matchesAny( + FieldMatcher.of(BOOLEAN_FIELD.getName(), TypeName.BOOLEAN), + FieldMatcher.of(BYTE_FIELD.getName(), TypeName.BYTE), + FieldMatcher.of(DATETIME_FIELD.getName(), TypeName.DATETIME), + FieldMatcher.of(DECIMAL_FIELD.getName(), TypeName.DECIMAL), + FieldMatcher.of(DOUBLE_FIELD.getName(), TypeName.DOUBLE), + FieldMatcher.of(FLOAT_FIELD.getName(), TypeName.FLOAT), + FieldMatcher.of(INT16_FIELD.getName(), TypeName.INT16), + FieldMatcher.of(INT32_FIELD.getName(), TypeName.INT32), + FieldMatcher.of(INT64_FIELD.getName(), TypeName.INT64), + FieldMatcher.of(STRING_FIELD.getName(), TypeName.STRING))); + + assertTrue( + schemaReflection.matchesAny( + FieldMatcher.of("idontexist"), FieldMatcher.of(STRING_FIELD.getName()))); + + assertTrue( + schemaReflection.matchesAny( + FieldMatcher.of(INT64_FIELD.getName(), FieldType.INT64), + // should not match type: + FieldMatcher.of(STRING_FIELD.getName(), FieldType.BYTE))); + + assertTrue( + schemaReflection.matchesAny( + FieldMatcher.of(INT64_FIELD.getName(), TypeName.INT64), + // should not match TypeName: + FieldMatcher.of(STRING_FIELD.getName(), TypeName.INT16))); + + assertFalse( + schemaReflection.matchesAny( + FieldMatcher.of("idontexist"), + FieldMatcher.of(STRING_FIELD.getName(), FieldType.BYTE), + FieldMatcher.of(STRING_FIELD.getName(), TypeName.INT16))); + } + @Test public void testFieldMatcher_match_NameOnly() { FieldMatcher fieldMatcher = FieldMatcher.of(DEFAULT_PAYLOAD_KEY_NAME); @@ -1045,7 +1107,7 
@@ private static PubsubRowToMessageDoFn doFn(Schema schema, PayloadSerializer payl payloadSerializer); } - private static Row rowWithAllDataTypes( + static Row rowWithAllDataTypes( boolean boolean0, byte byte0, ReadableDateTime datetime, diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaIT.java new file mode 100644 index 000000000000..7adc0ca907d0 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaIT.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; + +import java.io.IOException; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SchemaPath; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.Field; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.testing.TestPipeline; +import org.joda.time.Instant; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Integration tests for {@link Schema} related {@link PubsubClient} operations. */ +@RunWith(JUnit4.class) +public class PubsubSchemaIT { + private static final String HAS_NO_SCHEMA = "has-no-schema"; + private static final String HAS_AVRO_SCHEMA = "has-avro-schema"; + private static final String HAS_PROTO_SCHEMA = "has-proto-schema"; + private static final String AVRO_PRIMITIVE_TYPES_FLAT = "avro-primitive-types-flat"; + + private static final String AVRO_SCHEMA_FILE = "avro_all_data_types_flat_schema.json"; + private static final String PROTO_PRIMITIVE_TYPES_FLAT = "proto-primitive-types-flat"; + + private static final String PROTO_SCHEMA_FILE = "all_data_types_flat_schema.proto"; + + private static PubsubClient pubsubClient; + + private static TopicPath hasNoSchemaTopic; + + private static TopicPath hasAvroSchemaTopic; + + private static TopicPath hasProtoSchemaTopic; + + private static SchemaPath hasAvroSchemaPath; + + private static SchemaPath hasProtoSchemaPath; + + static final Schema ALL_DATA_TYPES_AVRO_SCHEMA = + Schema.of( + Field.of("BooleanField", FieldType.BOOLEAN), + Field.of("IntField", FieldType.INT32), + Field.of("LongField", FieldType.INT64), + Field.of("FloatField", FieldType.FLOAT), + Field.of("DoubleField", 
FieldType.DOUBLE), + Field.of("StringField", FieldType.STRING)); + + @BeforeClass + public static void setup() throws IOException { + PubsubOptions options = TestPipeline.testingPipelineOptions().as(PubsubOptions.class); + String project = options.getProject(); + String postFix = "-" + Instant.now().getMillis(); + pubsubClient = PubsubGrpcClient.FACTORY.newClient(null, null, options); + hasNoSchemaTopic = PubsubClient.topicPathFromName(project, HAS_NO_SCHEMA + postFix); + hasAvroSchemaTopic = PubsubClient.topicPathFromName(project, HAS_AVRO_SCHEMA + postFix); + hasProtoSchemaTopic = PubsubClient.topicPathFromName(project, HAS_PROTO_SCHEMA + postFix); + hasAvroSchemaPath = PubsubClient.schemaPathFromId(project, AVRO_PRIMITIVE_TYPES_FLAT + postFix); + hasProtoSchemaPath = + PubsubClient.schemaPathFromId(project, PROTO_PRIMITIVE_TYPES_FLAT + postFix); + + pubsubClient.createSchema( + hasAvroSchemaPath, AVRO_SCHEMA_FILE, com.google.pubsub.v1.Schema.Type.AVRO); + pubsubClient.createSchema( + hasProtoSchemaPath, PROTO_SCHEMA_FILE, com.google.pubsub.v1.Schema.Type.PROTOCOL_BUFFER); + pubsubClient.createTopic(hasNoSchemaTopic); + pubsubClient.createTopic(hasAvroSchemaTopic, hasAvroSchemaPath); + pubsubClient.createTopic(hasProtoSchemaTopic, hasProtoSchemaPath); + } + + @AfterClass + public static void tearDown() throws IOException { + pubsubClient.deleteTopic(hasNoSchemaTopic); + pubsubClient.deleteTopic(hasAvroSchemaTopic); + pubsubClient.deleteTopic(hasProtoSchemaTopic); + pubsubClient.deleteSchema(hasAvroSchemaPath); + pubsubClient.deleteSchema(hasProtoSchemaPath); + pubsubClient.close(); + } + + @Test + public void testGetSchemaPath() throws IOException { + assertNull(pubsubClient.getSchemaPath(hasNoSchemaTopic)); + + assertEquals( + hasAvroSchemaPath.getPath(), pubsubClient.getSchemaPath(hasAvroSchemaTopic).getPath()); + + assertEquals( + hasProtoSchemaPath.getPath(), pubsubClient.getSchemaPath(hasProtoSchemaTopic).getPath()); + } + + @Test + public void 
testGetSchema() throws IOException { + assertEquals(ALL_DATA_TYPES_AVRO_SCHEMA, pubsubClient.getSchema(hasAvroSchemaPath)); + + assertThrows( + "Pub/Sub schema type PROTOCOL_BUFFER is not supported at this time", + IllegalArgumentException.class, + () -> pubsubClient.getSchema(hasProtoSchemaPath)); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProviderIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProviderIT.java new file mode 100644 index 000000000000..7ada9686853a --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProviderIT.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import static org.apache.beam.sdk.io.gcp.pubsub.PubsubWriteSchemaTransformConfiguration.DEFAULT_TIMESTAMP_ATTRIBUTE; +import static org.apache.beam.sdk.io.gcp.pubsub.PubsubWriteSchemaTransformProvider.INPUT_TAG; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.Field; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.apache.commons.lang3.tuple.Pair; +import org.joda.time.Instant; +import org.joda.time.format.ISODateTimeFormat; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; + +/** Integration tests for {@link PubsubWriteSchemaTransformProvider}. 
*/ +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class PubsubWriteSchemaTransformProviderIT { + + @Rule public transient TestPipeline pipeline = TestPipeline.create(); + + private static final TestPubsubOptions TEST_PUBSUB_OPTIONS = + TestPipeline.testingPipelineOptions().as(TestPubsubOptions.class); + + static { + TEST_PUBSUB_OPTIONS.setBlockOnRun(false); + } + + private static final String HAS_NO_SCHEMA = "has-no-schema"; + + private static PubsubClient pubsubClient; + + private static PubsubClient.TopicPath hasNoSchemaTopic; + + private static PubsubClient.SubscriptionPath hasNoSchemaSubscription; + + private static final Instant TIMESTAMP = Instant.now(); + + private static final String RESOURCE_NAME_POSTFIX = "-" + TIMESTAMP.getMillis(); + + private static final int ACK_DEADLINE_SECONDS = 60; + + private static final int AWAIT_TERMINATED_SECONDS = 30; + + private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema(); + + private static final TypeDescriptor + CONFIGURATION_TYPE_DESCRIPTOR = + TypeDescriptor.of(PubsubWriteSchemaTransformConfiguration.class); + + private static final SerializableFunction + TO_ROW_FN = AUTO_VALUE_SCHEMA.toRowFunction(CONFIGURATION_TYPE_DESCRIPTOR); + + private final Field timestampField = Field.of("timestamp", FieldType.DATETIME); + + private final Field payloadBytesField = Field.of("payload", FieldType.BYTES); + + @BeforeClass + public static void setUp() throws IOException { + String project = TEST_PUBSUB_OPTIONS.as(PubsubOptions.class).getProject(); + pubsubClient = PubsubGrpcClient.FACTORY.newClient(null, null, TEST_PUBSUB_OPTIONS); + hasNoSchemaTopic = + PubsubClient.topicPathFromName(project, HAS_NO_SCHEMA + RESOURCE_NAME_POSTFIX); + hasNoSchemaSubscription = + PubsubClient.subscriptionPathFromName(project, HAS_NO_SCHEMA + RESOURCE_NAME_POSTFIX); + + pubsubClient.createTopic(hasNoSchemaTopic); + pubsubClient.createSubscription( + hasNoSchemaTopic, 
hasNoSchemaSubscription, ACK_DEADLINE_SECONDS); + } + + @AfterClass + public static void tearDown() throws IOException { + pubsubClient.deleteSubscription(hasNoSchemaSubscription); + pubsubClient.deleteTopic(hasNoSchemaTopic); + + pubsubClient.close(); + } + + @Test + public void testWritePayloadBytes() throws IOException { + Instant timestamp = Instant.ofEpochMilli(100000L); + Schema schema = Schema.of(payloadBytesField, timestampField); + List input = + Collections.singletonList( + Row.withSchema(schema).attachValues("aaa".getBytes(StandardCharsets.UTF_8), timestamp)); + Row configuration = + TO_ROW_FN.apply( + PubsubWriteSchemaTransformConfiguration.builder() + .setSource( + PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder() + .setPayloadFieldName(payloadBytesField.getName()) + .setTimestampFieldName(timestampField.getName()) + .build()) + .setTopic(hasNoSchemaTopic.getPath()) + .setTarget( + PubsubWriteSchemaTransformConfiguration.targetConfigurationBuilder().build()) + .build()); + + PCollectionRowTuple.of(INPUT_TAG, pipeline.apply(Create.of(input).withRowSchema(schema))) + .apply(new PubsubWriteSchemaTransformProvider().from(configuration).buildTransform()); + + PipelineResult job = pipeline.run(TEST_PUBSUB_OPTIONS); + Instant now = Instant.now(); + Instant stop = Instant.ofEpochMilli(now.getMillis() + AWAIT_TERMINATED_SECONDS * 1000); + List>> actualList = new ArrayList<>(); + while (now.isBefore(stop)) { + List received = pubsubClient.pull(0, hasNoSchemaSubscription, 1, true); + for (IncomingMessage incoming : received) { + actualList.add( + Pair.of( + incoming.message().getData().toStringUtf8(), + ImmutableMap.of( + DEFAULT_TIMESTAMP_ATTRIBUTE, + incoming + .message() + .getAttributesMap() + .getOrDefault(DEFAULT_TIMESTAMP_ATTRIBUTE, "")))); + } + if (actualList.size() == input.size()) { + break; + } + now = Instant.now(); + } + job.cancel(); + assertFalse( + String.format( + "messages pulled from %s should not be empty", 
hasNoSchemaSubscription.getPath()), + actualList.isEmpty()); + Pair> actual = actualList.get(0); + Row expected = input.get(0); + String payload = + new String( + Objects.requireNonNull(expected.getBytes(payloadBytesField.getName())), + StandardCharsets.UTF_8); + assertEquals(payload, actual.getLeft()); + assertEquals( + ISODateTimeFormat.dateTime().print(timestamp), + actual.getRight().get(DEFAULT_TIMESTAMP_ATTRIBUTE)); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProviderTest.java new file mode 100644 index 000000000000..b9c912ffea68 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProviderTest.java @@ -0,0 +1,786 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.ATTRIBUTES_FIELD_TYPE; +import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.DEFAULT_ATTRIBUTES_KEY_NAME; +import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.DEFAULT_EVENT_TIMESTAMP_KEY_NAME; +import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.DEFAULT_PAYLOAD_KEY_NAME; +import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.EVENT_TIMESTAMP_FIELD_TYPE; +import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessageTest.ALL_DATA_TYPES_SCHEMA; +import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessageTest.NON_USER_WITH_BYTES_PAYLOAD; +import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessageTest.rowWithAllDataTypes; +import static org.apache.beam.sdk.io.gcp.pubsub.PubsubWriteSchemaTransformProvider.INPUT_TAG; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; + +import com.google.api.client.util.Clock; +import java.io.IOException; +import java.io.Serializable; +import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import org.apache.avro.SchemaParseException; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SchemaPath; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubWriteSchemaTransformProvider.PubsubWriteSchemaTransform; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.Field; +import 
org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.io.payloads.AvroPayloadSerializerProvider; +import org.apache.beam.sdk.schemas.io.payloads.JsonPayloadSerializerProvider; +import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.util.RowJson.UnsupportedRowJsonException; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link PubsubWriteSchemaTransformProvider}. 
*/ +@RunWith(JUnit4.class) +public class PubsubWriteSchemaTransformProviderTest { + + private static final String ID_ATTRIBUTE = "id_attribute"; + private static final String TOPIC = "projects/project/topics/topic"; + private static final MockClock CLOCK = new MockClock(Instant.now()); + private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema(); + private static final TypeDescriptor TYPE_DESCRIPTOR = + TypeDescriptor.of(PubsubWriteSchemaTransformConfiguration.class); + private static final SerializableFunction TO_ROW = + AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR); + + private static final PipelineOptions OPTIONS = PipelineOptionsFactory.create(); + + static { + OPTIONS.setStableUniqueNames(PipelineOptions.CheckEnabled.OFF); + } + + @Rule public transient TestPipeline pipeline = TestPipeline.create(); + + @Test + public void testBuildPubsubWrite() { + assertEquals( + "default configuration should yield a topic Pub/Sub write", + pubsubWrite(), + transform(configurationBuilder()).buildPubsubWrite()); + + assertEquals( + "idAttribute in configuration should yield a idAttribute set Pub/Sub write", + pubsubWrite().withIdAttribute(ID_ATTRIBUTE), + transform(configurationBuilder().setIdAttribute(ID_ATTRIBUTE)).buildPubsubWrite()); + } + + @Test + public void testBuildPubsubRowToMessage() { + assertEquals( + "override timestamp attribute on configuration should yield a PubsubRowToMessage with target timestamp", + rowToMessageBuilder().setTargetTimestampAttributeName("custom_timestamp_attribute").build(), + transform( + configurationBuilder() + .setTarget( + PubsubWriteSchemaTransformConfiguration.targetConfigurationBuilder() + .setTimestampAttributeKey("custom_timestamp_attribute") + .build())) + .buildPubsubRowToMessage(NON_USER_WITH_BYTES_PAYLOAD)); + + assertNull( + "failing to set format should yield a null payload serializer", + transform(configurationBuilder()) + .buildPubsubRowToMessage(ALL_DATA_TYPES_SCHEMA) + .getPayloadSerializer()); + + 
assertThrows( + "setting 'json' format for a unsupported field containing Schema should throw an Exception", + UnsupportedRowJsonException.class, + () -> + transform(configurationBuilder().setFormat("json")) + .buildPubsubRowToMessage( + Schema.of(Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, ATTRIBUTES_FIELD_TYPE)))); + + assertThrows( + "setting 'avro' format for a unsupported field containing Schema should throw an Exception", + SchemaParseException.class, + () -> + transform(configurationBuilder().setFormat("avro")) + .buildPubsubRowToMessage( + Schema.of(Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, ATTRIBUTES_FIELD_TYPE)))); + + assertNotNull( + "setting 'json' format for valid schema should yield PayloadSerializer", + transform(configurationBuilder().setFormat("json")) + .buildPubsubRowToMessage(ALL_DATA_TYPES_SCHEMA) + .getPayloadSerializer()); + + assertNotNull( + "setting 'avro' format for valid schema should yield PayloadSerializer", + transform(configurationBuilder().setFormat("avro")) + .buildPubsubRowToMessage(ALL_DATA_TYPES_SCHEMA) + .getPayloadSerializer()); + } + + @Test + public void testInvalidTaggedInput() { + Row withAllDataTypes = + rowWithAllDataTypes( + true, + (byte) 0, + Instant.now().toDateTime(), + BigDecimal.valueOf(1L), + 3.12345, + 4.1f, + (short) 5, + 2, + 7L, + "asdfjkl;"); + + PCollection rows = + pipeline.apply(Create.of(withAllDataTypes)).setRowSchema(ALL_DATA_TYPES_SCHEMA); + + assertThrows( + "empty input should not be allowed", + IllegalArgumentException.class, + () -> transform(configurationBuilder()).expand(PCollectionRowTuple.empty(pipeline))); + + assertThrows( + "input with >1 tagged rows should not be allowed", + IllegalArgumentException.class, + () -> + transform(configurationBuilder()) + .expand(PCollectionRowTuple.of(INPUT_TAG, rows).and("somethingelse", rows))); + + assertThrows( + "input missing INPUT tag should not be allowed", + IllegalArgumentException.class, + () -> + transform(configurationBuilder()) + 
.expand(PCollectionRowTuple.of("somethingelse", rows))); + + pipeline.run(OPTIONS); + } + + /** + * Verifies validateSourceSchemaAgainstConfiguration: schemas that agree with the configured + * source field names return normally, while an empty schema, a configured field missing from + * the schema, or a configured field of the wrong type must raise IllegalArgumentException. + * Each mismatching-type case configures only the field under test so that the failure comes + * from that field's own type check. + */ + @Test + public void testValidateSourceSchemaAgainstConfiguration() { + // Only containing user fields and no configuration details should be valid + transform(configurationBuilder()) + .validateSourceSchemaAgainstConfiguration(ALL_DATA_TYPES_SCHEMA); + + // Matching attributes, timestamp, and payload (bytes) fields configured with expected types + // should be valid + transform( + configurationBuilder() + .setSource( + PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder() + .setAttributesFieldName("attributes") + .setTimestampFieldName("timestamp") + .setPayloadFieldName("payload") + .build())) + .validateSourceSchemaAgainstConfiguration( + Schema.of( + Field.of("attributes", ATTRIBUTES_FIELD_TYPE), + Field.of("timestamp", EVENT_TIMESTAMP_FIELD_TYPE), + Field.of("payload", Schema.FieldType.BYTES))); + + // Matching attributes, timestamp, and payload (ROW) fields configured with expected types + // should be valid + transform( + configurationBuilder() + .setSource( + PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder() + .setAttributesFieldName("attributes") + .setTimestampFieldName("timestamp") + .setPayloadFieldName("payload") + .build())) + .validateSourceSchemaAgainstConfiguration( + Schema.of( + Field.of("attributes", ATTRIBUTES_FIELD_TYPE), + Field.of("timestamp", EVENT_TIMESTAMP_FIELD_TYPE), + Field.of("payload", Schema.FieldType.row(ALL_DATA_TYPES_SCHEMA)))); + + assertThrows( + "empty Schema should be invalid", + IllegalArgumentException.class, + () -> + transform(configurationBuilder()) + .validateSourceSchemaAgainstConfiguration(Schema.of())); + + assertThrows( + "attributes field in configuration but not in schema should be invalid", + IllegalArgumentException.class, + () -> + transform( + configurationBuilder() + .setSource( + PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder() +
.setAttributesFieldName("attributes") + .build())) + .validateSourceSchemaAgainstConfiguration(ALL_DATA_TYPES_SCHEMA)); + + assertThrows( + "timestamp field in configuration but not in schema should be invalid", + IllegalArgumentException.class, + () -> + transform( + configurationBuilder() + .setSource( + PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder() + .setTimestampFieldName("timestamp") + .build())) + .validateSourceSchemaAgainstConfiguration(ALL_DATA_TYPES_SCHEMA)); + + assertThrows( + "payload field in configuration but not in schema should be invalid", + IllegalArgumentException.class, + () -> + transform( + configurationBuilder() + .setSource( + PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder() + .setPayloadFieldName("payload") + .build())) + .validateSourceSchemaAgainstConfiguration(ALL_DATA_TYPES_SCHEMA)); + + assertThrows( + "attributes field in configuration but mismatching attributes type should be invalid", + IllegalArgumentException.class, + () -> + transform( + configurationBuilder() + .setSource( + PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder() + .setAttributesFieldName("attributes") + .build())) + .validateSourceSchemaAgainstConfiguration( + // should be FieldType.map(FieldType.STRING, FieldType.STRING) + Schema.of( + Field.of("attributes", FieldType.map(FieldType.BYTES, FieldType.STRING))))); + + assertThrows( + "timestamp field in configuration but mismatching timestamp type should be invalid", + IllegalArgumentException.class, + () -> + transform( + configurationBuilder() + .setSource( + PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder() + .setTimestampFieldName("timestamp") + .build())) + .validateSourceSchemaAgainstConfiguration( + // should be FieldType.DATETIME + Schema.of(Field.of("timestamp", FieldType.STRING)))); + + assertThrows( + "payload field in configuration but mismatching payload type should be invalid", + IllegalArgumentException.class, + () -> +
transform( + configurationBuilder() + .setSource( + PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder() + .setPayloadFieldName("payload") + .build())) + .validateSourceSchemaAgainstConfiguration( + // should be FieldType.BYTES or FieldType.row(...) + Schema.of(Field.of("payload", FieldType.STRING)))); + } + + /** + * Verifies validateTargetSchemaAgainstPubsubSchema using test client factories: a topic + * without a schema, with a deleted schema, or with a matching schema passes, while a + * mismatching schema must raise IllegalStateException. + */ + @Test + public void testValidateTargetSchemaAgainstPubsubSchema() throws IOException { + TopicPath topicPath = PubsubClient.topicPathFromPath(TOPIC); + PubsubTestClientFactory noSchemaFactory = + PubsubTestClient.createFactoryForGetSchema(topicPath, null, null); + + PubsubTestClientFactory schemaDeletedFactory = + PubsubTestClient.createFactoryForGetSchema(topicPath, SchemaPath.DELETED_SCHEMA, null); + + PubsubTestClientFactory mismatchingSchemaFactory = + PubsubTestClient.createFactoryForGetSchema( + topicPath, + PubsubClient.schemaPathFromId("testProject", "misMatch"), + Schema.of(Field.of("StringField", FieldType.STRING))); + + PubsubTestClientFactory matchingSchemaFactory = + PubsubTestClient.createFactoryForGetSchema( + topicPath, + PubsubClient.schemaPathFromId("testProject", "match"), + ALL_DATA_TYPES_SCHEMA); + + // Should pass validation if Pub/Sub topic lacks schema + transform(configurationBuilder()) + .withPubsubClientFactory(noSchemaFactory) + .validateTargetSchemaAgainstPubsubSchema(ALL_DATA_TYPES_SCHEMA, OPTIONS); + noSchemaFactory.close(); + + // Should pass validation if Pub/Sub topic schema deleted + transform(configurationBuilder()) + .withPubsubClientFactory(schemaDeletedFactory) + .validateTargetSchemaAgainstPubsubSchema(ALL_DATA_TYPES_SCHEMA, OPTIONS); + schemaDeletedFactory.close(); + + assertThrows( + "mismatched schema should be detected from Pub/Sub topic", + IllegalStateException.class, + () -> + transform(configurationBuilder()) + .withPubsubClientFactory(mismatchingSchemaFactory) + .validateTargetSchemaAgainstPubsubSchema(ALL_DATA_TYPES_SCHEMA, OPTIONS)); + mismatchingSchemaFactory.close(); + + //
Should pass validation if Pub/Sub topic schema matches + transform(configurationBuilder()) + .withPubsubClientFactory(matchingSchemaFactory) + .validateTargetSchemaAgainstPubsubSchema(ALL_DATA_TYPES_SCHEMA, OPTIONS); + matchingSchemaFactory.close(); + } + + /** + * Verifies that buildTargetSchema yields the default-named attributes and timestamp fields, + * followed by either the default-named payload field or the user fields, for each combination + * of configured source field names. + */ + @Test + public void testBuildTargetSchema() { + + Field sourceAttributesField = Field.of("attributes", ATTRIBUTES_FIELD_TYPE); + Field sourceTimestampField = Field.of("timestamp", EVENT_TIMESTAMP_FIELD_TYPE); + Field sourcePayloadBytesField = Field.of("payload", FieldType.BYTES); + Field sourcePayloadRowField = Field.of("payload", FieldType.row(ALL_DATA_TYPES_SCHEMA)); + + Field targetAttributesField = Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, ATTRIBUTES_FIELD_TYPE); + Field targetTimestampField = + Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, EVENT_TIMESTAMP_FIELD_TYPE); + Field targetPayloadBytesField = Field.of(DEFAULT_PAYLOAD_KEY_NAME, FieldType.BYTES); + Field targetPayloadRowField = + Field.of(DEFAULT_PAYLOAD_KEY_NAME, FieldType.row(ALL_DATA_TYPES_SCHEMA)); + + assertEquals( + "attributes and timestamp field should append to user fields", + Schema.builder() + .addField(targetAttributesField) + .addField(targetTimestampField) + .addFields(ALL_DATA_TYPES_SCHEMA.getFields()) + .build(), + transform( + configurationBuilder() + .setSource( + PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder() + .build())) + .buildTargetSchema(ALL_DATA_TYPES_SCHEMA)); + + assertEquals( + "timestamp field should append to user fields; attributes field name changed", + Schema.builder() + .addField(targetAttributesField) + .addField(targetTimestampField) + .addFields(ALL_DATA_TYPES_SCHEMA.getFields()) + .build(), + transform( + configurationBuilder() + .setSource( + PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder() + .setAttributesFieldName("attributes") + .build())) + .buildTargetSchema( + Schema.builder() + .addField(sourceAttributesField) + .addFields(ALL_DATA_TYPES_SCHEMA.getFields()) + .build()));
+ + assertEquals( + "attributes field should append to user fields; timestamp field name changed", + Schema.builder() + .addField(targetAttributesField) + .addField(targetTimestampField) + .addFields(ALL_DATA_TYPES_SCHEMA.getFields()) + .build(), + transform( + configurationBuilder() + .setSource( + PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder() + .setTimestampFieldName("timestamp") + .build())) + .buildTargetSchema( + Schema.builder() + .addField(sourceTimestampField) + .addFields(ALL_DATA_TYPES_SCHEMA.getFields()) + .build())); + + assertEquals( + "attributes and timestamp field appended to user payload bytes field; payload field name changed", + Schema.builder() + .addField(targetAttributesField) + .addField(targetTimestampField) + .addField(targetPayloadBytesField) + .build(), + transform( + configurationBuilder() + .setSource( + PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder() + .setPayloadFieldName("payload") + .build())) + .buildTargetSchema(Schema.builder().addField(sourcePayloadBytesField).build())); + + assertEquals( + "attributes and timestamp field appended to user payload row field; payload field name changed", + Schema.builder() + .addField(targetAttributesField) + .addField(targetTimestampField) + .addField(targetPayloadRowField) + .build(), + transform( + configurationBuilder() + .setSource( + PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder() + .setPayloadFieldName("payload") + .build())) + .buildTargetSchema(Schema.builder().addField(sourcePayloadRowField).build())); + + assertEquals( + "attributes and timestamp fields name changed", + Schema.builder() + .addField(targetAttributesField) + .addField(targetTimestampField) + .addFields(ALL_DATA_TYPES_SCHEMA.getFields()) + .build(), + transform( + configurationBuilder() + .setSource( + PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder() + .setAttributesFieldName("attributes") + .setTimestampFieldName("timestamp") + .build())) 
+ .buildTargetSchema( + Schema.builder() + .addField(sourceAttributesField) + .addField(sourceTimestampField) + .addFields(ALL_DATA_TYPES_SCHEMA.getFields()) + .build())); + + assertEquals( + "attributes, timestamp, payload bytes fields name changed", + Schema.builder() + .addField(targetAttributesField) + .addField(targetTimestampField) + .addField(targetPayloadBytesField) + .build(), + transform( + configurationBuilder() + .setSource( + PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder() + .setAttributesFieldName("attributes") + .setTimestampFieldName("timestamp") + .setPayloadFieldName("payload") + .build())) + .buildTargetSchema( + Schema.builder() + .addField(sourceAttributesField) + .addField(sourceTimestampField) + .addField(sourcePayloadBytesField) + .build())); + + assertEquals( + "attributes, timestamp, payload row fields name changed", + Schema.builder() + .addField(targetAttributesField) + .addField(targetTimestampField) + .addField(targetPayloadRowField) + .build(), + transform( + configurationBuilder() + .setSource( + PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder() + .setAttributesFieldName("attributes") + .setTimestampFieldName("timestamp") + .setPayloadFieldName("payload") + .build())) + .buildTargetSchema( + Schema.builder() + .addField(sourceAttributesField) + .addField(sourceTimestampField) + .addField(sourcePayloadRowField) + .build())); + } + + /** + * Verifies with PAssert that convertForRowToMessage maps source rows onto the target schema, + * expecting an empty attributes map and the CLOCK-derived timestamp wherever the source row + * does not supply them. + */ + @Test + public void testConvertForRowToMessageTransform() { + Row userRow = + rowWithAllDataTypes( + false, + (byte) 0, + Instant.ofEpochMilli(CLOCK.currentTimeMillis()).toDateTime(), + BigDecimal.valueOf(1L), + 1.12345, + 1.1f, + (short) 1, + 1, + 1L, + "吃葡萄不吐葡萄皮,不吃葡萄倒吐葡萄皮"); + + Field sourceAttributes = Field.of("attributes", ATTRIBUTES_FIELD_TYPE); + Field targetAttributes = Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, ATTRIBUTES_FIELD_TYPE); + + Field sourceTimestamp = Field.of("timestamp", EVENT_TIMESTAMP_FIELD_TYPE); + Field targetTimestamp =
Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, EVENT_TIMESTAMP_FIELD_TYPE); + + Field sourcePayloadBytes = Field.of("payload", FieldType.BYTES); + Field targetPayloadBytes = Field.of(DEFAULT_PAYLOAD_KEY_NAME, FieldType.BYTES); + + Field sourcePayloadRow = Field.of("payload", FieldType.row(ALL_DATA_TYPES_SCHEMA)); + Field targetPayloadRow = + Field.of(DEFAULT_PAYLOAD_KEY_NAME, FieldType.row(ALL_DATA_TYPES_SCHEMA)); + + Map attributes = ImmutableMap.of("a", "1"); + Instant generatedTimestamp = Instant.ofEpochMilli(CLOCK.currentTimeMillis()); + Instant timestampFromSource = Instant.ofEpochMilli(CLOCK.currentTimeMillis() + 10000L); + byte[] payloadBytes = "吃葡萄不吐葡萄皮,不吃葡萄倒吐葡萄皮".getBytes(StandardCharsets.UTF_8); + + PAssert.that( + "attributes only source yields attributes + timestamp target", + pipeline + .apply( + Create.of(Row.withSchema(Schema.of(sourceAttributes)).attachValues(attributes))) + .setRowSchema(Schema.of(sourceAttributes)) + .apply( + transform( + configurationBuilder() + .setSource( + PubsubWriteSchemaTransformConfiguration + .sourceConfigurationBuilder() + .setAttributesFieldName(sourceAttributes.getName()) + .build())) + .convertForRowToMessage( + Schema.of(targetAttributes, targetTimestamp), CLOCK)) + .setRowSchema(Schema.of(targetAttributes, targetTimestamp))) + .containsInAnyOrder( + Row.withSchema(Schema.of(targetAttributes, targetTimestamp)) + .attachValues(attributes, generatedTimestamp)); + + PAssert.that( + "timestamp only source yields attributes + timestamp target", + pipeline + .apply( + Create.of( + Row.withSchema(Schema.of(sourceTimestamp)) + .attachValues(timestampFromSource))) + .setRowSchema(Schema.of(sourceTimestamp)) + .apply( + transform( + configurationBuilder() + .setSource( + PubsubWriteSchemaTransformConfiguration + .sourceConfigurationBuilder() + .setTimestampFieldName(sourceTimestamp.getName()) + .build())) + .convertForRowToMessage( + Schema.of(targetAttributes, targetTimestamp), CLOCK)) + .setRowSchema(Schema.of(targetAttributes, 
targetTimestamp))) + .containsInAnyOrder( + Row.withSchema(Schema.of(targetAttributes, targetTimestamp)) + .attachValues(ImmutableMap.of(), timestampFromSource)); + + PAssert.that( + "timestamp and attributes source yields renamed fields in target", + pipeline + .apply( + Create.of( + Row.withSchema(Schema.of(sourceAttributes, sourceTimestamp)) + .attachValues(attributes, timestampFromSource))) + .setRowSchema(Schema.of(sourceAttributes, sourceTimestamp)) + .apply( + transform( + configurationBuilder() + .setSource( + PubsubWriteSchemaTransformConfiguration + .sourceConfigurationBuilder() + .setAttributesFieldName(sourceAttributes.getName()) + .setTimestampFieldName(sourceTimestamp.getName()) + .build())) + .convertForRowToMessage( + Schema.of(targetAttributes, targetTimestamp), CLOCK)) + .setRowSchema(Schema.of(targetAttributes, targetTimestamp))) + .containsInAnyOrder( + Row.withSchema(Schema.of(targetAttributes, targetTimestamp)) + .attachValues(attributes, timestampFromSource)); + + PAssert.that( + "bytes payload only source yields attributes + timestamp + renamed bytes payload target", + pipeline + .apply( + Create.of( + Row.withSchema(Schema.of(sourcePayloadBytes)) + .withFieldValue(sourcePayloadBytes.getName(), payloadBytes) + .build())) + .setRowSchema(Schema.of(sourcePayloadBytes)) + .apply( + transform( + configurationBuilder() + .setSource( + PubsubWriteSchemaTransformConfiguration + .sourceConfigurationBuilder() + .setPayloadFieldName(sourcePayloadBytes.getName()) + .build())) + .convertForRowToMessage( + Schema.of(targetAttributes, targetTimestamp, targetPayloadBytes), + CLOCK)) + .setRowSchema(Schema.of(targetAttributes, targetTimestamp, targetPayloadBytes))) + .containsInAnyOrder( + Row.withSchema(Schema.of(targetAttributes, targetTimestamp, targetPayloadBytes)) + .attachValues(ImmutableMap.of(), generatedTimestamp, payloadBytes)); + + PAssert.that( + "row payload only source yields attributes + timestamp + renamed row payload target", + pipeline + 
.apply(Create.of(Row.withSchema(Schema.of(sourcePayloadRow)).attachValues(userRow))) + .setRowSchema(Schema.of(sourcePayloadRow)) + .apply( + transform( + configurationBuilder() + .setSource( + PubsubWriteSchemaTransformConfiguration + .sourceConfigurationBuilder() + .setPayloadFieldName(sourcePayloadRow.getName()) + .build())) + .convertForRowToMessage( + Schema.of(targetAttributes, targetTimestamp, targetPayloadRow), CLOCK)) + .setRowSchema(Schema.of(targetAttributes, targetTimestamp, targetPayloadRow))) + .containsInAnyOrder( + Row.withSchema(Schema.of(targetAttributes, targetTimestamp, targetPayloadRow)) + .attachValues(ImmutableMap.of(), generatedTimestamp, userRow)); + + PAssert.that( + "user only fields source yields attributes + timestamp + user fields target", + pipeline + .apply(Create.of(userRow)) + .setRowSchema(ALL_DATA_TYPES_SCHEMA) + .apply( + transform(configurationBuilder()) + .convertForRowToMessage( + Schema.builder() + .addField(targetAttributes) + .addField(targetTimestamp) + .addFields(ALL_DATA_TYPES_SCHEMA.getFields()) + .build(), + CLOCK)) + .setRowSchema( + Schema.builder() + .addField(targetAttributes) + .addField(targetTimestamp) + .addFields(ALL_DATA_TYPES_SCHEMA.getFields()) + .build())) + .containsInAnyOrder( + Row.withSchema( + Schema.builder() + .addField(targetAttributes) + .addField(targetTimestamp) + .addFields(ALL_DATA_TYPES_SCHEMA.getFields()) + .build()) + .addValue(ImmutableMap.of()) + .addValue(generatedTimestamp) + .addValues(userRow.getValues()) + .build()); + + pipeline.run(OPTIONS); + } + + /** + * Verifies that getPayloadSerializer for the "json" and "avro" formats serializes a row to the + * same bytes as the serializers obtained from JsonPayloadSerializerProvider and + * AvroPayloadSerializerProvider respectively. + */ + @Test + public void testGetPayloadSerializer() { + Row withAllDataTypes = + rowWithAllDataTypes( + false, + (byte) 0, + Instant.now().toDateTime(), + BigDecimal.valueOf(-1L), + -3.12345, + -4.1f, + (short) -5, + -2, + -7L, + "吃葡萄不吐葡萄皮,不吃葡萄倒吐葡萄皮"); + + PayloadSerializer jsonPayloadSerializer = + new JsonPayloadSerializerProvider().getSerializer(ALL_DATA_TYPES_SCHEMA, ImmutableMap.of()); + byte[] expectedJson =
jsonPayloadSerializer.serialize(withAllDataTypes); + byte[] actualJson = + transform(configurationBuilder().setFormat("json")) + .getPayloadSerializer(ALL_DATA_TYPES_SCHEMA) + .serialize(withAllDataTypes); + + PayloadSerializer avroPayloadSerializer = + new AvroPayloadSerializerProvider().getSerializer(ALL_DATA_TYPES_SCHEMA, ImmutableMap.of()); + byte[] expectedAvro = avroPayloadSerializer.serialize(withAllDataTypes); + byte[] actualAvro = + transform(configurationBuilder().setFormat("avro")) + .getPayloadSerializer(ALL_DATA_TYPES_SCHEMA) + .serialize(withAllDataTypes); + + assertArrayEquals( + "configuration with json format should yield JSON PayloadSerializer", + expectedJson, + actualJson); + + assertArrayEquals( + "configuration with avro format should yield Avro PayloadSerializer", + expectedAvro, + actualAvro); + } + + /** Returns a configuration builder preset with TOPIC and a default-built target configuration. */ + private static PubsubWriteSchemaTransformConfiguration.Builder configurationBuilder() { + return PubsubWriteSchemaTransformConfiguration.builder() + .setTopic(TOPIC) + .setTarget(PubsubWriteSchemaTransformConfiguration.targetConfigurationBuilder().build()); + } + + /** Returns a new PubsubRowToMessage builder. */ + private static PubsubRowToMessage.Builder rowToMessageBuilder() { + return PubsubRowToMessage.builder(); + } + + /** Returns a PubsubIO message write that targets TOPIC. */ + private static PubsubIO.Write pubsubWrite() { + return PubsubIO.writeMessages().to(TOPIC); + } + + /** + * Builds the given configuration, converts it to a Row via TO_ROW, and returns the transform + * that a new PubsubWriteSchemaTransformProvider creates from that Row. + */ + private static PubsubWriteSchemaTransformProvider.PubsubWriteSchemaTransform transform( + PubsubWriteSchemaTransformConfiguration.Builder configurationBuilder) { + Row configurationRow = TO_ROW.apply(configurationBuilder.build()); + PubsubWriteSchemaTransformProvider provider = new PubsubWriteSchemaTransformProvider(); + return (PubsubWriteSchemaTransform) provider.from(configurationRow); + } + + /** Clock that always reports the millisecond value of the Instant it was constructed with. */ + private static class MockClock implements Clock, Serializable { + // Primitive rather than boxed: the value is assigned from and returned as a long. + private final long millis; + + private MockClock(Instant timestamp) { + this.millis = timestamp.getMillis(); + } + + @Override + public long currentTimeMillis() { + return millis; + } + } +} diff --git
a/sdks/java/io/google-cloud-platform/src/test/resources/org/apache/beam/sdk/io/gcp/pubsub/all_data_types_flat_schema.proto b/sdks/java/io/google-cloud-platform/src/test/resources/org/apache/beam/sdk/io/gcp/pubsub/all_data_types_flat_schema.proto new file mode 100644 index 000000000000..0b2bbaccded5 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/resources/org/apache/beam/sdk/io/gcp/pubsub/all_data_types_flat_schema.proto @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +syntax = "proto3"; + +message Record { + double doubleField = 1; + float floatField = 2; + int32 int32Field = 3; + int64 int64Field = 4; + bool boolField = 5; + string stringField = 6; +} diff --git a/sdks/java/io/google-cloud-platform/src/test/resources/org/apache/beam/sdk/io/gcp/pubsub/avro_all_data_types_flat_schema.json b/sdks/java/io/google-cloud-platform/src/test/resources/org/apache/beam/sdk/io/gcp/pubsub/avro_all_data_types_flat_schema.json new file mode 100644 index 000000000000..41f3fab41672 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/resources/org/apache/beam/sdk/io/gcp/pubsub/avro_all_data_types_flat_schema.json @@ -0,0 +1,30 @@ +{ + "type": "record", + "name": "Avro", + "fields": [ + { + "name": "BooleanField", + "type": "boolean" + }, + { + "name": "IntField", + "type": "int" + }, + { + "name": "LongField", + "type": "long" + }, + { + "name": "FloatField", + "type": "float" + }, + { + "name": "DoubleField", + "type": "double" + }, + { + "name": "StringField", + "type": "string" + } + ] +}