diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java index db3ac3de6e9a..f296f5966cc3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfig.java @@ -37,16 +37,19 @@ @SuppressWarnings({ "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) -abstract class BigtableConfig implements Serializable { +public abstract class BigtableConfig implements Serializable { /** Returns the project id being written to. */ - abstract @Nullable ValueProvider getProjectId(); + public abstract @Nullable ValueProvider getProjectId(); /** Returns the instance id being written to. */ - abstract @Nullable ValueProvider getInstanceId(); + public abstract @Nullable ValueProvider getInstanceId(); /** Returns the table being read from. */ - abstract @Nullable ValueProvider getTableId(); + public abstract @Nullable ValueProvider getTableId(); + + /** Returns the app profile being read from. */ + public abstract @Nullable ValueProvider getAppProfileId(); /** * Returns the Google Cloud Bigtable instance being written to, and other parameters. @@ -84,6 +87,8 @@ abstract static class Builder { abstract Builder setTableId(ValueProvider tableId); + abstract Builder setAppProfileId(ValueProvider appProfileId); + /** @deprecated will be replaced by bigtable options configurator. 
*/ @Deprecated abstract Builder setBigtableOptions(BigtableOptions options); @@ -100,46 +105,51 @@ abstract Builder setBigtableOptionsConfigurator( abstract BigtableConfig build(); } - BigtableConfig withProjectId(ValueProvider projectId) { + public BigtableConfig withProjectId(ValueProvider projectId) { checkArgument(projectId != null, "Project Id of BigTable can not be null"); return toBuilder().setProjectId(projectId).build(); } - BigtableConfig withInstanceId(ValueProvider instanceId) { + public BigtableConfig withInstanceId(ValueProvider instanceId) { checkArgument(instanceId != null, "Instance Id of BigTable can not be null"); return toBuilder().setInstanceId(instanceId).build(); } - BigtableConfig withTableId(ValueProvider tableId) { + public BigtableConfig withTableId(ValueProvider tableId) { checkArgument(tableId != null, "tableId can not be null"); return toBuilder().setTableId(tableId).build(); } + public BigtableConfig withAppProfileId(ValueProvider appProfileId) { + checkArgument(appProfileId != null, "appProfileId can not be null"); + return toBuilder().setAppProfileId(appProfileId).build(); + } + /** @deprecated will be replaced by bigtable options configurator. 
*/ @Deprecated - BigtableConfig withBigtableOptions(BigtableOptions options) { + public BigtableConfig withBigtableOptions(BigtableOptions options) { checkArgument(options != null, "Bigtable options can not be null"); return toBuilder().setBigtableOptions(options).build(); } - BigtableConfig withBigtableOptionsConfigurator( + public BigtableConfig withBigtableOptionsConfigurator( SerializableFunction configurator) { checkArgument(configurator != null, "configurator can not be null"); return toBuilder().setBigtableOptionsConfigurator(configurator).build(); } - BigtableConfig withValidate(boolean isEnabled) { + public BigtableConfig withValidate(boolean isEnabled) { return toBuilder().setValidate(isEnabled).build(); } @VisibleForTesting - BigtableConfig withBigtableService(BigtableService bigtableService) { + public BigtableConfig withBigtableService(BigtableService bigtableService) { checkArgument(bigtableService != null, "bigtableService can not be null"); return toBuilder().setBigtableService(bigtableService).build(); } @VisibleForTesting - BigtableConfig withEmulator(String emulatorHost) { + public BigtableConfig withEmulator(String emulatorHost) { checkArgument(emulatorHost != null, "emulatorHost can not be null"); return toBuilder().setEmulatorHost(emulatorHost).build(); } @@ -173,6 +183,9 @@ void populateDisplayData(DisplayData.Builder builder) { .addIfNotNull( DisplayData.item("instanceId", getInstanceId()).withLabel("Bigtable Instance Id")) .addIfNotNull(DisplayData.item("tableId", getTableId()).withLabel("Bigtable Table Id")) + .addIfNotNull( + DisplayData.item("appProfileId", getAppProfileId()) + .withLabel("Bigtable App Profile Id")) .add(DisplayData.item("withValidation", getValidate()).withLabel("Check is table exists")); if (getBigtableOptions() != null) { @@ -250,6 +263,7 @@ public final String toString() { .add("projectId", getProjectId()) .add("instanceId", getInstanceId()) .add("tableId", getTableId()) + .add("appProfileId", getAppProfileId()) 
.add( "bigtableOptionsConfigurator", getBigtableOptionsConfigurator() == null diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 1ea0240ff565..3f277b503870 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -27,11 +27,14 @@ import com.google.bigtable.v2.Row; import com.google.bigtable.v2.RowFilter; import com.google.bigtable.v2.SampleRowKeysResponse; +import com.google.cloud.Timestamp; import com.google.cloud.bigtable.config.BigtableOptions; import com.google.protobuf.ByteString; import java.io.IOException; +import java.time.Instant; import java.util.ArrayList; import java.util.Collections; +import java.util.Date; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -44,12 +47,22 @@ import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.BoundedSource.BoundedReader; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMutation; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.UniqueIdGenerator; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ActionFactory; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.DaoFactory; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.DetectNewPartitionsDoFn; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.InitializeDoFn; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.ReadChangeStreamPartitionDoFn; import 
org.apache.beam.sdk.io.range.ByteKey; import org.apache.beam.sdk.io.range.ByteKeyRange; import org.apache.beam.sdk.io.range.ByteKeyRangeTracker; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Impulse; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -66,6 +79,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -208,6 +222,40 @@ public static Write write() { return Write.create(); } + /** + * Creates an uninitialized {@link BigtableIO.ReadChangeStream}. Before use, the {@code + * ReadChangeStream} must be initialized with + * + *
    + *
  • {@link BigtableIO.ReadChangeStream#withProjectId} + *
  • {@link BigtableIO.ReadChangeStream#withInstanceId} + *
  • {@link BigtableIO.ReadChangeStream#withTableId} + *
  • {@link BigtableIO.ReadChangeStream#withAppProfileId} + *
+ * + *

And optionally with + * + *

    + *
  • {@link BigtableIO.ReadChangeStream#withStartTime} which defaults to now. + *
  • {@link BigtableIO.ReadChangeStream#withEndTime} which defaults to empty. + *
  • {@link BigtableIO.ReadChangeStream#withHeartbeatDuration} which defaults to 1 second. + *
  • {@link BigtableIO.ReadChangeStream#withMetadataTableProjectId} which defaults to value + * from {@link BigtableIO.ReadChangeStream#withProjectId} + *
  • {@link BigtableIO.ReadChangeStream#withMetadataTableInstanceId} which defaults to value + * from {@link BigtableIO.ReadChangeStream#withInstanceId} + *
  • {@link BigtableIO.ReadChangeStream#withMetadataTableTableId} which defaults to {@link + * MetadataTableAdminDao#DEFAULT_METADATA_TABLE_NAME} + *
  • {@link BigtableIO.ReadChangeStream#withMetadataTableAppProfileId} which defaults to value + * from {@link BigtableIO.ReadChangeStream#withAppProfileId} + *
  • {@link BigtableIO.ReadChangeStream#withChangeStreamName} which defaults to randomly + * generated string. + *
+ */ + @Experimental + public static ReadChangeStream readChangeStream() { + return ReadChangeStream.create(); + } + /** * A {@link PTransform} that reads from Google Cloud Bigtable. See the class-level Javadoc on * {@link BigtableIO} for more information. @@ -1431,4 +1479,311 @@ static void validateTableExists(BigtableConfig config, PipelineOptions options) } } } + + @AutoValue + public abstract static class ReadChangeStream + extends PTransform>> { + + static ReadChangeStream create() { + BigtableConfig config = + BigtableConfig.builder().setTableId(StaticValueProvider.of("")).setValidate(true).build(); + BigtableConfig metadataTableconfig = + BigtableConfig.builder().setTableId(StaticValueProvider.of("")).setValidate(true).build(); + + return new AutoValue_BigtableIO_ReadChangeStream.Builder() + .setBigtableConfig(config) + .setMetadataTableBigtableConfig(metadataTableconfig) + .build(); + } + + abstract BigtableConfig getBigtableConfig(); + + abstract @Nullable Timestamp getStartTime(); + + abstract @Nullable Timestamp getEndTime(); + + abstract @Nullable Duration getHeartbeatDuration(); + + abstract @Nullable String getChangeStreamName(); + + abstract BigtableConfig getMetadataTableBigtableConfig(); + + abstract ReadChangeStream.Builder toBuilder(); + /** + * Returns a new {@link BigtableIO.ReadChangeStream} that will stream from the Cloud Bigtable + * project indicated by given parameter, requires {@link #withInstanceId} to be called to + * determine the instance. + * + *

Does not modify this object. + */ + public ReadChangeStream withProjectId(ValueProvider projectId) { + BigtableConfig config = getBigtableConfig(); + return toBuilder().setBigtableConfig(config.withProjectId(projectId)).build(); + } + + /** + * Returns a new {@link BigtableIO.ReadChangeStream} that will stream from the Cloud Bigtable + * project indicated by given parameter, requires {@link #withInstanceId} to be called to + * determine the instance. + * + *

Does not modify this object. + */ + public ReadChangeStream withProjectId(String projectId) { + return withProjectId(StaticValueProvider.of(projectId)); + } + + /** + * Returns a new {@link BigtableIO.ReadChangeStream} that will stream from the Cloud Bigtable + * instance indicated by given parameter, requires {@link #withProjectId} to be called to + * determine the project. + * + *

Does not modify this object. + */ + public ReadChangeStream withInstanceId(ValueProvider instanceId) { + BigtableConfig config = getBigtableConfig(); + return toBuilder().setBigtableConfig(config.withInstanceId(instanceId)).build(); + } + + /** + * Returns a new {@link BigtableIO.ReadChangeStream} that will stream from the Cloud Bigtable + * instance indicated by given parameter, requires {@link #withProjectId} to be called to + * determine the project. + * + *

Does not modify this object. + */ + public ReadChangeStream withInstanceId(String instanceId) { + return withInstanceId(StaticValueProvider.of(instanceId)); + } + + /** + * Returns a new {@link BigtableIO.ReadChangeStream} that will stream from the specified table. + * + *

Does not modify this object. + */ + public ReadChangeStream withTableId(ValueProvider tableId) { + BigtableConfig config = getBigtableConfig(); + return toBuilder().setBigtableConfig(config.withTableId(tableId)).build(); + } + + /** + * Returns a new {@link BigtableIO.ReadChangeStream} that will stream from the specified table. + * + *

Does not modify this object. + */ + public ReadChangeStream withTableId(String tableId) { + return withTableId(StaticValueProvider.of(tableId)); + } + + /** + * Returns a new {@link BigtableIO.ReadChangeStream} that will stream from the cluster specified + * by app profile id. + * + *

This must use single-cluster routing policy. If no separate app profile is set for the + * metadata table with {@link BigtableIO.ReadChangeStream#withMetadataTableAppProfileId}, this + * app profile must also have single-row transactions enabled. + * + *

Does not modify this object. + */ + public ReadChangeStream withAppProfileId(ValueProvider appProfileId) { + BigtableConfig config = getBigtableConfig(); + return toBuilder().setBigtableConfig(config.withAppProfileId(appProfileId)).build(); + } + + /** + * Returns a new {@link BigtableIO.ReadChangeStream} that will stream from the cluster specified + * by app profile id. + * + *

This must use single-cluster routing policy. If no separate app profile is set for the + * metadata table with {@link BigtableIO.ReadChangeStream#withMetadataTableAppProfileId}, this + * app profile must also have single-row transactions enabled. + * + *

Does not modify this object. + */ + public ReadChangeStream withAppProfileId(String appProfileId) { + return withAppProfileId(StaticValueProvider.of(appProfileId)); + } + + /** + * Returns a new {@link BigtableIO.ReadChangeStream} that will start streaming at the specified + * start time. + * + *

Does not modify this object. + */ + public ReadChangeStream withStartTime(Timestamp startTime) { + return toBuilder().setStartTime(startTime).build(); + } + + /** + * Returns a new {@link BigtableIO.ReadChangeStream} that will stop streaming at the specified + * end time. + * + *

Does not modify this object. + */ + public ReadChangeStream withEndTime(Timestamp endTime) { + return toBuilder().setEndTime(endTime).build(); + } + + /** + * Returns a new {@link BigtableIO.ReadChangeStream} that will send heartbeat messages at + * specified interval. + * + *

Does not modify this object. + */ + public ReadChangeStream withHeartbeatDuration(Duration interval) { + return toBuilder().setHeartbeatDuration(interval).build(); + } + + /** + * Returns a new {@link BigtableIO.ReadChangeStream} that uses changeStreamName as prefix for + * the metadata table. + * + *

Does not modify this object. + */ + public ReadChangeStream withChangeStreamName(String changeStreamName) { + return toBuilder().setChangeStreamName(changeStreamName).build(); + } + + /** + * Returns a new {@link BigtableIO.ReadChangeStream} that will use the Cloud Bigtable project + * indicated by given parameter to manage the metadata of the stream. + * + *

Optional: defaults to value from withProjectId + * + *

Does not modify this object. + */ + public ReadChangeStream withMetadataTableProjectId(String projectId) { + BigtableConfig config = getMetadataTableBigtableConfig(); + return toBuilder() + .setMetadataTableBigtableConfig(config.withProjectId(StaticValueProvider.of(projectId))) + .build(); + } + + /** + * Returns a new {@link BigtableIO.ReadChangeStream} that will use the Cloud Bigtable instance + * indicated by given parameter to manage the metadata of the stream. + * + *

Optional: defaults to value from withInstanceId + * + *

Does not modify this object. + */ + public ReadChangeStream withMetadataTableInstanceId(String instanceId) { + BigtableConfig config = getMetadataTableBigtableConfig(); + return toBuilder() + .setMetadataTableBigtableConfig(config.withInstanceId(StaticValueProvider.of(instanceId))) + .build(); + } + + /** + * Returns a new {@link BigtableIO.ReadChangeStream} that will use specified table to store the + * metadata of the stream. + * + *

Optional: defaults to {@link MetadataTableAdminDao#DEFAULT_METADATA_TABLE_NAME} + * + *

Does not modify this object. + */ + public ReadChangeStream withMetadataTableTableId(String tableId) { + BigtableConfig config = getMetadataTableBigtableConfig(); + return toBuilder() + .setMetadataTableBigtableConfig(config.withTableId(StaticValueProvider.of(tableId))) + .build(); + } + + /** + * Returns a new {@link BigtableIO.ReadChangeStream} that will use the cluster specified by app + * profile id to store the metadata of the stream. + * + *

Optional: defaults to value from withAppProfileId + * + *

This must use single-cluster routing policy with single-row transactions enabled. + * + *

Does not modify this object. + */ + public ReadChangeStream withMetadataTableAppProfileId(String appProfileId) { + BigtableConfig config = getMetadataTableBigtableConfig(); + return toBuilder() + .setMetadataTableBigtableConfig( + config.withAppProfileId(StaticValueProvider.of(appProfileId))) + .build(); + } + + @Override + public PCollection> expand(PBegin input) { + checkArgument(getBigtableConfig() != null); + checkArgument(getBigtableConfig().getProjectId() != null); + checkArgument(getBigtableConfig().getInstanceId() != null); + checkArgument(getBigtableConfig().getTableId() != null); + checkArgument(getBigtableConfig().getAppProfileId() != null); + + BigtableConfig metadataTableConfig = getMetadataTableBigtableConfig(); + if (metadataTableConfig.getProjectId() == null + || metadataTableConfig.getProjectId().get().isEmpty()) { + metadataTableConfig = metadataTableConfig.withProjectId(getBigtableConfig().getProjectId()); + } + if (metadataTableConfig.getInstanceId() == null + || metadataTableConfig.getInstanceId().get().isEmpty()) { + metadataTableConfig = + metadataTableConfig.withInstanceId(getBigtableConfig().getInstanceId()); + } + if (metadataTableConfig.getTableId() == null + || metadataTableConfig.getTableId().get().isEmpty()) { + metadataTableConfig = + metadataTableConfig.withTableId( + StaticValueProvider.of(MetadataTableAdminDao.DEFAULT_METADATA_TABLE_NAME)); + } + if (metadataTableConfig.getAppProfileId() == null + || metadataTableConfig.getAppProfileId().get().isEmpty()) { + metadataTableConfig = + metadataTableConfig.withAppProfileId(getBigtableConfig().getAppProfileId()); + } + + Timestamp startTime = getStartTime(); + if (startTime == null) { + startTime = Timestamp.of(Date.from(Instant.now())); + } + Duration heartbeatDuration = getHeartbeatDuration(); + if (heartbeatDuration == null) { + heartbeatDuration = Duration.standardSeconds(1); + } + String changeStreamName = getChangeStreamName(); + if (changeStreamName == null || 
changeStreamName.isEmpty()) { + changeStreamName = UniqueIdGenerator.generateRowKeyPrefix(); + } + + ActionFactory actionFactory = new ActionFactory(); + DaoFactory daoFactory = + new DaoFactory(getBigtableConfig(), metadataTableConfig, changeStreamName); + ChangeStreamMetrics metrics = new ChangeStreamMetrics(); + InitializeDoFn initializeDoFn = + new InitializeDoFn(daoFactory, metadataTableConfig.getAppProfileId().get(), startTime); + DetectNewPartitionsDoFn detectNewPartitionsDoFn = + new DetectNewPartitionsDoFn(getEndTime(), actionFactory, daoFactory, metrics); + ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn = + new ReadChangeStreamPartitionDoFn(heartbeatDuration, daoFactory, actionFactory, metrics); + + return input + .apply(Impulse.create()) + .apply("Initialize", ParDo.of(initializeDoFn)) + .apply("DetectNewPartition", ParDo.of(detectNewPartitionsDoFn)) + .apply("ReadChangeStreamPartition", ParDo.of(readChangeStreamPartitionDoFn)); + } + + @AutoValue.Builder + abstract static class Builder { + + abstract ReadChangeStream.Builder setBigtableConfig(BigtableConfig bigtableConfig); + + abstract ReadChangeStream.Builder setMetadataTableBigtableConfig( + BigtableConfig bigtableConfig); + + abstract ReadChangeStream.Builder setStartTime(Timestamp startTime); + + abstract ReadChangeStream.Builder setEndTime(Timestamp endTime); + + abstract ReadChangeStream.Builder setHeartbeatDuration(Duration interval); + + abstract ReadChangeStream.Builder setChangeStreamName(String changeStreamName); + + abstract ReadChangeStream build(); + } + } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ByteStringRangeHelper.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ByteStringRangeHelper.java new file mode 100644 index 000000000000..8f307f526f0a --- /dev/null +++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ByteStringRangeHelper.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigtable.changestreams; + +import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; +import com.google.protobuf.TextFormat; + +/** Helper functions to evaluate the completeness of collection of ByteStringRanges. */ +public class ByteStringRangeHelper { + + /** + * Returns formatted string of a partition for debugging. + * + * @param partition partition to format. + * @return String representation of partition. 
+ */ + public static String formatByteStringRange(ByteStringRange partition) { + return "['" + + TextFormat.escapeBytes(partition.getStart()) + + "','" + + TextFormat.escapeBytes(partition.getEnd()) + + "')"; + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMetrics.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMetrics.java new file mode 100644 index 000000000000..2aaa6631aced --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMetrics.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigtable.changestreams; + +import java.io.Serializable; + +/** Class to aggregate metrics related functionality. 
*/ +public class ChangeStreamMetrics implements Serializable { + private static final long serialVersionUID = 7298901109362981596L; +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMutation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMutation.java new file mode 100644 index 000000000000..cc4cf467d292 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/ChangeStreamMutation.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigtable.changestreams; + +/** + * This is a placeholder class that will be replaced by updated Cloud Bigtable java client. The java + * client is work in progress and will be checked in and updated soon. 
+ */ +public class ChangeStreamMutation {} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/TimestampConverter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/TimestampConverter.java new file mode 100644 index 000000000000..39a90ab05d6a --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/TimestampConverter.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigtable.changestreams; + +import org.joda.time.Instant; + +/** Convert between different Timestamp and Instant classes. 
*/ +public class TimestampConverter { + public static Instant toInstant(com.google.cloud.Timestamp time) { + return Instant.ofEpochMilli(time.toDate().toInstant().toEpochMilli()); + } + + public static Instant toInstant(com.google.protobuf.Timestamp time) { + long epochMilli = + java.time.Instant.ofEpochSecond(time.getSeconds(), time.getNanos()).toEpochMilli(); + return Instant.ofEpochMilli(epochMilli); + } + + public static com.google.cloud.Timestamp toCloudTimestamp(Instant instant) { + return com.google.cloud.Timestamp.of(instant.toDate()); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/UniqueIdGenerator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/UniqueIdGenerator.java new file mode 100644 index 000000000000..dc5dac92cb6f --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/UniqueIdGenerator.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.bigtable.changestreams; + +import java.security.SecureRandom; +import java.util.Base64; + +/** Generate unique IDs that can be used to differentiate different jobs and partitions. */ +public class UniqueIdGenerator { + private static final SecureRandom secureRandom = new SecureRandom(); + + public static String getNextId() { + byte[] bytes = new byte[256]; + secureRandom.nextBytes(bytes); + return Base64.getEncoder().encodeToString(bytes); + } + + public static String generateRowKeyPrefix() { + byte[] bytes = new byte[50]; + secureRandom.nextBytes(bytes); + return Base64.getEncoder().encodeToString(bytes); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ActionFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ActionFactory.java new file mode 100644 index 000000000000..18fbc5fe404f --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ActionFactory.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action; + +import com.google.cloud.Timestamp; +import java.io.Serializable; +import javax.annotation.Nullable; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.ChangeStreamDao; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao; +import org.joda.time.Duration; + +/** + * Factory class for creating instances that will handle different functions of DoFns. The instances + * created are all singletons. + */ +// Allows for transient fields to be initialized later +@SuppressWarnings("initialization.field.uninitialized") +public class ActionFactory implements Serializable { + private static final long serialVersionUID = -6780082495458582986L; + + private transient ChangeStreamAction changeStreamAction; + private transient DetectNewPartitionsAction detectNewPartitionsAction; + private transient GenerateInitialPartitionsAction generateInitialPartitionsAction; + private transient ReadChangeStreamPartitionAction readChangeStreamPartitionAction; + + /** + * Creates and returns a singleton instance of an action class for processing individual + * ChangeStreamMutation in {@link + * org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.ReadChangeStreamPartitionDoFn}. + * + *

This method is thread safe. + * + * @return singleton instance of the {@link ChangeStreamAction} + */ + public synchronized ChangeStreamAction changeStreamAction(ChangeStreamMetrics metrics) { + if (changeStreamAction == null) { + changeStreamAction = new ChangeStreamAction(metrics); + } + return changeStreamAction; + } + + /** + * Creates and returns a singleton instance of an action class for processing {@link + * org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.DetectNewPartitionsDoFn}. + * + *

This method is thread safe. + * + * @return singleton instance of the {@link DetectNewPartitionsAction} + */ + public synchronized DetectNewPartitionsAction detectNewPartitionsAction( + ChangeStreamMetrics metrics, + MetadataTableDao metadataTableDao, + @Nullable Timestamp endTime, + GenerateInitialPartitionsAction generateInitialPartitionsAction) { + if (detectNewPartitionsAction == null) { + detectNewPartitionsAction = + new DetectNewPartitionsAction( + metrics, metadataTableDao, endTime, generateInitialPartitionsAction); + } + return detectNewPartitionsAction; + } + + /** + * Creates and returns a singleton instance of an action class for processing {@link + * org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.DetectNewPartitionsDoFn} + * + *

This method is thread safe. + * + * @return singleton instance of the {@link GenerateInitialPartitionsAction} + */ + public synchronized GenerateInitialPartitionsAction generateInitialPartitionsAction( + ChangeStreamMetrics metrics, ChangeStreamDao changeStreamDao, @Nullable Timestamp endTime) { + if (generateInitialPartitionsAction == null) { + generateInitialPartitionsAction = + new GenerateInitialPartitionsAction(metrics, changeStreamDao, endTime); + } + return generateInitialPartitionsAction; + } + + /** + * Creates and returns a singleton instance of an action class for processing {@link + * org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.ReadChangeStreamPartitionDoFn}. + * + *

This method is thread safe. + * + * @return singleton instance of the {@link ReadChangeStreamPartitionAction} + */ + public synchronized ReadChangeStreamPartitionAction readChangeStreamPartitionAction( + MetadataTableDao metadataTableDao, + ChangeStreamDao changeStreamDao, + ChangeStreamMetrics metrics, + ChangeStreamAction changeStreamAction, + Duration heartbeatDurationSeconds) { + if (readChangeStreamPartitionAction == null) { + readChangeStreamPartitionAction = + new ReadChangeStreamPartitionAction( + metadataTableDao, + changeStreamDao, + metrics, + changeStreamAction, + heartbeatDurationSeconds); + } + return readChangeStreamPartitionAction; + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java new file mode 100644 index 000000000000..0f1d8ee30cfe --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ChangeStreamAction.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action; + +import com.google.protobuf.ByteString; +import java.util.Optional; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMutation; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.StreamProgress; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.values.KV; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** This class is responsible for processing individual ChangeStreamRecord. */ +@SuppressWarnings({"UnusedVariable", "UnusedMethod"}) +public class ChangeStreamAction { + private static final Logger LOG = LoggerFactory.getLogger(ChangeStreamAction.class); + + private final ChangeStreamMetrics metrics; + + /** + * Constructs ChangeStreamAction to process individual ChangeStreamRecord. + * + * @param metrics record beam metrics. + */ + public ChangeStreamAction(ChangeStreamMetrics metrics) { + this.metrics = metrics; + } + + /** + * This class processes ReadChangeStreamResponse from bigtable server. There are 3 possible + * response types, Heartbeat, ChangeStreamMutation, CloseStream. + * + *

    + *
  • Heartbeat happens periodically based on the initial configuration set at the start of the + * beam pipeline. Heartbeat can advance the watermark forward and includes a continuation + * token that provides a point to resume from after a checkpoint. + *
  • ChangeStreamMutation includes the actual mutation that took place in the Bigtable. + * ChangeStreamMutation also includes watermark and continuation token. All + * ChangeStreamMutation are emitted to the outputReceiver with the timestamp of 0 (instead + * of the commit timestamp). Setting the timestamp to 0 discourages the use of windowing on + * this connector. All ChangeStreamMutations will be late data when windowing. This design + * decision prefers availability over consistency in the event that partitions are streamed + * slowly (due to an outages or other unavailabilities) the commit timestamp which drives + * the watermark may lag behind preventing windows from closing. + *
  • CloseStream indicates that the stream has come to an end. The CloseStream is not + * processed but stored in the RestrictionTracker to be processed later. This ensures that + * we successfully commit all pending ChangeStreamMutations. + *
+ * + * CloseStream is the only response that type will initiate a resume. Other response type will + * simply process the response and return empty. Returning empty signals to caller that we have + * processed the response, and it does not require any additional action. + * + *

There are 2 cases that cause this function to return a non-empty ProcessContinuation. + * + *

    + *
  1. We fail to claim a RestrictionTracker. This can happen for a runner-initiated checkpoint. + * When the runner initiates a checkpoint, we will stop and checkpoint pending + * ChangeStreamMutations and resume from the previous RestrictionTracker. + *
  2. The response is a CloseStream. RestrictionTracker claims the CloseStream. We don't do any + * additional processing of the response. We return resume to signal to the caller that to + * checkpoint all pending ChangeStreamMutations. We expect the caller to check the + * RestrictionTracker includes a CloseStream and process it to close the stream. + *
+ * + * @param partitionRecord the stream partition that generated the response + * @param record the change stream record to be processed + * @param tracker restrictionTracker that we use to claim next block and also to store CloseStream + * @param receiver to output DataChange + * @param watermarkEstimator manually progress watermark when processing responses with watermark + * @return Optional.of(ProcessContinuation) if the run should be stopped or resumed, otherwise + * Optional.empty() to do nothing. + */ + public Optional run( + PartitionRecord partitionRecord, + Object record, // TODO: Update once bigtable client includes + // https://github.com/googleapis/java-bigtable/pull/1569 + RestrictionTracker tracker, + DoFn.OutputReceiver> receiver, + ManualWatermarkEstimator watermarkEstimator, + boolean shouldDebug) { + return Optional.empty(); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java new file mode 100644 index 000000000000..990d9b2fd01f --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/DetectNewPartitionsAction.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action; + +import com.google.cloud.Timestamp; +import com.google.protobuf.InvalidProtocolBufferException; +import javax.annotation.Nullable; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord; +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; +import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class processes {@link + * org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.DetectNewPartitionsDoFn}. 
+ */ +// checkstyle bug is causing an issue with '@throws InvalidProtocolBufferException' +// Allows for transient fields to be initialized later +@SuppressWarnings({ + "checkstyle:JavadocMethod", + "initialization.fields.uninitialized", + "UnusedVariable", + "UnusedMethod" +}) +public class DetectNewPartitionsAction { + private static final Logger LOG = LoggerFactory.getLogger(DetectNewPartitionsAction.class); + + private static final Duration DEBUG_WATERMARK_DELAY = Duration.standardMinutes(5); + + private final ChangeStreamMetrics metrics; + private final MetadataTableDao metadataTableDao; + @Nullable private final com.google.cloud.Timestamp endTime; + private final GenerateInitialPartitionsAction generateInitialPartitionsAction; + + public DetectNewPartitionsAction( + ChangeStreamMetrics metrics, + MetadataTableDao metadataTableDao, + @Nullable Timestamp endTime, + GenerateInitialPartitionsAction generateInitialPartitionsAction) { + this.metrics = metrics; + this.metadataTableDao = metadataTableDao; + this.endTime = endTime; + this.generateInitialPartitionsAction = generateInitialPartitionsAction; + } + + /** + * Perform the necessary steps to manage initial set of partitions and new partitions. Currently, + * we set to process new partitions every second. + * + *
    + *
  1. Look up the initial list of partitions to stream if it's the very first run. + *
  2. On rest of the runs, try advancing watermark if needed. + *
  3. Update the metadata table with info about this DoFn. + *
  4. Check if this pipeline has reached the end time. Terminate if it has. + *
  5. Process new partitions and output them. + *
  6. Register callback to clean up processed partitions after bundle has been finalized. + *
+ * + * @param tracker offset tracker that simply increment by 1 every single run + * @param receiver output new partitions + * @param watermarkEstimator update watermark that is a representation of the low watermark of the + * entire beam pipeline + * @param bundleFinalizer perform after bundle output actions to clean up metadata table + * @return {@link ProcessContinuation#resume()} with 1-second delay if the stream continues, + * otherwise {@link ProcessContinuation#stop()} + * @throws InvalidProtocolBufferException if failing to process new partitions + */ + @VisibleForTesting + public ProcessContinuation run( + RestrictionTracker tracker, + DoFn.OutputReceiver receiver, + ManualWatermarkEstimator watermarkEstimator, + DoFn.BundleFinalizer bundleFinalizer, + Timestamp startTime) + throws Exception { + + // Terminate if endTime <= watermark that means all partitions have read up to or beyond + // watermark. We no longer need to manage splits and merges, we can terminate. + if (endTime != null + && endTime.compareTo( + TimestampConverter.toCloudTimestamp(watermarkEstimator.currentWatermark())) + <= 0) { + tracker.tryClaim(tracker.currentRestriction().getTo()); + return ProcessContinuation.stop(); + } + + if (!tracker.tryClaim(tracker.currentRestriction().getFrom())) { + return ProcessContinuation.stop(); + } + + return ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(1)); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsAction.java new file mode 100644 index 000000000000..be61c9a9e9b6 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsAction.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action; + +import com.google.cloud.Timestamp; +import javax.annotation.Nullable; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.ChangeStreamDao; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord; +import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; +import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; +import org.joda.time.Instant; + +/** + * Class to generate first set of outputs for {@link + * org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.DetectNewPartitionsDoFn}. 
+ */ +@SuppressWarnings({"UnusedVariable", "UnusedMethod"}) +public class GenerateInitialPartitionsAction { + private final ChangeStreamMetrics metrics; + private final ChangeStreamDao changeStreamDao; + @Nullable private final Timestamp endTime; + + public GenerateInitialPartitionsAction( + ChangeStreamMetrics metrics, ChangeStreamDao changeStreamDao, @Nullable Timestamp endTime) { + this.metrics = metrics; + this.changeStreamDao = changeStreamDao; + this.endTime = endTime; + } + + /** + * The very first step of the pipeline when there are no partitions being streamed yet. We want to + * get an initial list of partitions to stream and output them. + * + * @return true if this pipeline should continue, otherwise false. + */ + public boolean run( + OutputReceiver receiver, + ManualWatermarkEstimator watermarkEstimator, + Timestamp startTime) { + return true; + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java new file mode 100644 index 000000000000..ed7ffaa157b8 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionAction.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action; + +import com.google.protobuf.ByteString; +import java.io.IOException; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMutation; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.ChangeStreamDao; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.DetectNewPartitionsDoFn; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.StreamProgress; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; +import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.values.KV; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is part of {@link + * org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.ReadChangeStreamPartitionDoFn} SDF. 
+ */ +@SuppressWarnings({"UnusedVariable", "UnusedMethod"}) +public class ReadChangeStreamPartitionAction { + private static final Logger LOG = LoggerFactory.getLogger(ReadChangeStreamPartitionAction.class); + + private final MetadataTableDao metadataTableDao; + private final ChangeStreamDao changeStreamDao; + private final ChangeStreamMetrics metrics; + private final ChangeStreamAction changeStreamAction; + private final Duration heartbeatDurationSeconds; + + public ReadChangeStreamPartitionAction( + MetadataTableDao metadataTableDao, + ChangeStreamDao changeStreamDao, + ChangeStreamMetrics metrics, + ChangeStreamAction changeStreamAction, + Duration heartbeatDurationSeconds) { + this.metadataTableDao = metadataTableDao; + this.changeStreamDao = changeStreamDao; + this.metrics = metrics; + this.changeStreamAction = changeStreamAction; + this.heartbeatDurationSeconds = heartbeatDurationSeconds; + } + + /** + * Streams changes from a specific partition. This function is responsible to maintaining the + * lifecycle of streaming the partition. We delegate to {@link ChangeStreamAction} to process + * individual response from the change stream. + * + *

Before we send a request to Cloud Bigtable to stream the partition, we need to perform a few + * things. + * + *

    + *
  1. Lock the partition. Due to the design of the change streams connector, it is possible + * that multiple DoFn are started trying to stream the same partition. However, only 1 DoFn + * should be streaming a partition. So we solve this by using the metadata table as a + * distributed lock. We attempt to lock the partition for this DoFn's UUID. If it is + * successful, it means this DoFn is the only one that can stream the partition and + * continue. Otherwise, terminate this DoFn because another DoFn is streaming this partition + * already. + *
  2. Process CloseStream if it exists. In order to solve a possible inconsistent state + * problem, we do not process CloseStream after receiving it. We claim the CloseStream in + * the RestrictionTracker so it persists after a checkpoint. We checkpoint to flush all the + * DataChanges. Then on resume, we process the CloseStream. There are only 2 expected Status + * for CloseStream: OK and Out of Range. + *
      + *
    1. OK status is returned when the predetermined endTime has been reached. In this + * case, we update the watermark and update the metadata table. {@link + * DetectNewPartitionsDoFn} aggregates the watermark from all the streams to ensure + * all the streams have reached beyond endTime so it can also terminate and end the + * beam job. + *
    2. Out of Range is returned when the partition has either been split into more + * partitions or merged into a larger partition. In this case, we write to the + * metadata table the new partitions' information so that {@link + * DetectNewPartitionsDoFn} can read and output those new partitions to be streamed. + * We also need to ensure we clean up this partition's metadata to release the lock. + *
    + *
  3. Update the metadata table with the watermark and additional debugging info. + *
  4. Stream the partition. + *
+ * + * @param partitionRecord partition information used to identify this stream + * @param tracker restriction tracker of {@link + * org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.ReadChangeStreamPartitionDoFn} + * @param receiver output receiver for {@link + * org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.ReadChangeStreamPartitionDoFn} + * @param watermarkEstimator watermark estimator {@link + * org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.ReadChangeStreamPartitionDoFn} + * @return {@link ProcessContinuation#stop} if a checkpoint is required or the stream has + * completed. Or {@link ProcessContinuation#resume} if a checkpoint is required. + * @throws IOException when stream fails. + */ + public ProcessContinuation run( + PartitionRecord partitionRecord, + RestrictionTracker tracker, + DoFn.OutputReceiver> receiver, + ManualWatermarkEstimator watermarkEstimator) + throws IOException { + + return ProcessContinuation.stop(); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/package-info.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/package-info.java new file mode 100644 index 000000000000..3856debeeb10 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** Business logic to process change stream for Google Cloud Bigtable. */ +@Experimental +package org.apache.beam.sdk.io.gcp.bigtable.changestreams.action; + +import org.apache.beam.sdk.annotations.Experimental; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java new file mode 100644 index 000000000000..98c6ae2899ed --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/ChangeStreamDao.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Data access object to list and read stream partitions of a table. */ +@SuppressWarnings({"UnusedVariable", "UnusedMethod"}) +public class ChangeStreamDao { + private static final Logger LOG = LoggerFactory.getLogger(ChangeStreamDao.class); + + private final String tableId; + + public ChangeStreamDao(String tableId) { + this.tableId = tableId; + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java new file mode 100644 index 000000000000..6df9fa7a0a7a --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/DaoFactory.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao; + +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; + +import java.io.IOException; +import java.io.Serializable; +import org.apache.beam.sdk.io.gcp.bigtable.BigtableConfig; + +// Allows transient fields to be intialized later +@SuppressWarnings("initialization.fields.uninitialized") +public class DaoFactory implements Serializable { + private static final long serialVersionUID = 3732208768248394205L; + + private transient ChangeStreamDao changeStreamDao; + private transient MetadataTableAdminDao metadataTableAdminDao; + private transient MetadataTableDao metadataTableDao; + + private final BigtableConfig changeStreamConfig; + private final BigtableConfig metadataTableConfig; + private final String changeStreamName; + + public DaoFactory( + BigtableConfig changeStreamConfig, + BigtableConfig metadataTableConfig, + String changeStreamName) { + this.changeStreamConfig = changeStreamConfig; + this.metadataTableConfig = metadataTableConfig; + this.changeStreamName = changeStreamName; + } + + public String getChangeStreamName() { + return changeStreamName; + } + + public String getStreamTableDebugString() { + return String.format( + "Stream Table:\n" + + "Project ID: %s\n" + + "Instance ID: %s\n" + + "Table Id: %s\n" + + "App Profile Id: %s", + this.changeStreamConfig.getProjectId(), + this.changeStreamConfig.getInstanceId(), + this.changeStreamConfig.getTableId(), + this.changeStreamConfig.getAppProfileId()); + } + + public String getMetadataTableDebugString() { + return String.format( + "Metadata Table:\n" + + "Project ID: %s\n" + + "Instance ID: %s\n" + + "Table Id: %s\n" + + "App Profile Id: %s", + this.metadataTableConfig.getProjectId(), + this.metadataTableConfig.getInstanceId(), + this.metadataTableConfig.getTableId(), + this.metadataTableConfig.getAppProfileId()); + } + + public synchronized ChangeStreamDao getChangeStreamDao() throws IOException { + if (changeStreamDao 
== null) { + checkArgumentNotNull(changeStreamConfig.getProjectId()); + checkArgumentNotNull(changeStreamConfig.getInstanceId()); + String tableId = checkArgumentNotNull(changeStreamConfig.getTableId()).get(); + checkArgumentNotNull(changeStreamConfig.getAppProfileId()); + changeStreamDao = new ChangeStreamDao(tableId); + } + return changeStreamDao; + } + + public synchronized MetadataTableDao getMetadataTableDao() throws IOException { + if (metadataTableDao == null) { + checkArgumentNotNull(metadataTableConfig.getProjectId()); + checkArgumentNotNull(metadataTableConfig.getInstanceId()); + checkArgumentNotNull(metadataTableConfig.getTableId()); + checkArgumentNotNull(metadataTableConfig.getAppProfileId()); + metadataTableDao = + new MetadataTableDao( + getMetadataTableAdminDao().getTableId(), + getMetadataTableAdminDao().getChangeStreamNamePrefix()); + } + return metadataTableDao; + } + + public synchronized MetadataTableAdminDao getMetadataTableAdminDao() throws IOException { + if (metadataTableAdminDao == null) { + checkArgumentNotNull(metadataTableConfig.getProjectId()); + checkArgumentNotNull(metadataTableConfig.getInstanceId()); + String tableId = checkArgumentNotNull(metadataTableConfig.getTableId()).get(); + checkArgumentNotNull(metadataTableConfig.getAppProfileId()); + metadataTableAdminDao = new MetadataTableAdminDao(changeStreamName, tableId); + } + return metadataTableAdminDao; + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableAdminDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableAdminDao.java new file mode 100644 index 000000000000..ba3bceb31dd4 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableAdminDao.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao;
+
+import com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+
+/**
+ * Data access object for creating and dropping the metadata table.
+ *
+ * <p>The metadata table will be used to keep the state of the entire Dataflow job as well as
+ * splitting and merging partitions.
+ *
+ * <p>Each Dataflow job will create its own metadata table.
+ */
+@SuppressWarnings({"UnusedVariable", "UnusedMethod"})
+public class MetadataTableAdminDao {
+  public static final String DEFAULT_METADATA_TABLE_NAME = "__change_stream_md_table";
+
+  // Column family names; all of them are collected in COLUMN_FAMILIES below.
+  public static final String CF_INITIAL_TOKEN = "initial_continuation_token";
+  public static final String CF_PARENT_PARTITIONS = "parent_partitions";
+  public static final String CF_PARENT_LOW_WATERMARKS = "parent_low_watermarks";
+  public static final String CF_WATERMARK = "watermark";
+  public static final String CF_CONTINUATION_TOKEN = "continuation_token";
+  public static final String CF_LOCK = "lock";
+  public static final String CF_MISSING_PARTITIONS = "missing_partitions";
+  public static final String QUALIFIER_DEFAULT = "latest";
+
+  /** Every {@code CF_*} column family name declared above. */
+  public static final ImmutableList<String> COLUMN_FAMILIES =
+      ImmutableList.of(
+          CF_INITIAL_TOKEN,
+          CF_PARENT_PARTITIONS,
+          CF_PARENT_LOW_WATERMARKS,
+          CF_WATERMARK,
+          CF_CONTINUATION_TOKEN,
+          CF_LOCK,
+          CF_MISSING_PARTITIONS);
+
+  // Row key fragments; MetadataTableDao appends them to the change stream name prefix.
+  public static final ByteString NEW_PARTITION_PREFIX = ByteString.copyFromUtf8("NewPartition#");
+  public static final ByteString STREAM_PARTITION_PREFIX =
+      ByteString.copyFromUtf8("StreamPartition#");
+  public static final ByteString DETECT_NEW_PARTITION_SUFFIX =
+      ByteString.copyFromUtf8("DetectNewPartition");
+
+  private final String tableId;
+  private final ByteString changeStreamNamePrefix;
+
+  /**
+   * Creates the DAO.
+   *
+   * @param changeStreamName change stream name; {@code changeStreamName + "#"}, encoded as UTF-8,
+   *     becomes the value returned by {@link #getChangeStreamNamePrefix()}
+   * @param tableId the metadata table name
+   */
+  public MetadataTableAdminDao(String changeStreamName, String tableId) {
+    this.tableId = tableId;
+    this.changeStreamNamePrefix = ByteString.copyFromUtf8(changeStreamName + "#");
+  }
+
+  /**
+   * Return the prefix used to identify the rows belonging to this job.
+   *
+   * @return the prefix used to identify the rows belonging to this job
+   */
+  public ByteString getChangeStreamNamePrefix() {
+    return changeStreamNamePrefix;
+  }
+
+  /**
+   * Return the metadata table name.
+   *
+   * @return the metadata table name
+   */
+  public String getTableId() {
+    return tableId;
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDao.java
new file mode 100644
index 000000000000..b9f65f7a4a20
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/MetadataTableDao.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao;
+
+import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao.DETECT_NEW_PARTITION_SUFFIX;
+import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao.NEW_PARTITION_PREFIX;
+import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableAdminDao.STREAM_PARTITION_PREFIX;
+
+import com.google.protobuf.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Data access object for managing the state of the metadata Bigtable table.
+ *
+ * <p>Metadata table is shared across many beam jobs. Each beam job uses a specific prefix to
+ * identify itself which is used as the row prefix.
+ */
+@SuppressWarnings({"UnusedVariable", "UnusedMethod"})
+public class MetadataTableDao {
+  private static final Logger LOG = LoggerFactory.getLogger(MetadataTableDao.class);
+
+  private final String tableId;
+  private final ByteString changeStreamNamePrefix;
+
+  /**
+   * Creates the DAO.
+   *
+   * @param tableId id of the metadata table
+   * @param changeStreamNamePrefix prefix returned by {@link #getChangeStreamNamePrefix()} and
+   *     placed in front of every row key built by this class
+   */
+  public MetadataTableDao(String tableId, ByteString changeStreamNamePrefix) {
+    this.tableId = tableId;
+    this.changeStreamNamePrefix = changeStreamNamePrefix;
+  }
+
+  /** @return the prefix that is prepended to every row belonging to this beam job. */
+  public ByteString getChangeStreamNamePrefix() {
+    return this.changeStreamNamePrefix;
+  }
+
+  /** Returns {@code rowKeyPart} appended to the change stream name prefix. */
+  private ByteString withChangeStreamNamePrefix(ByteString rowKeyPart) {
+    return this.changeStreamNamePrefix.concat(rowKeyPart);
+  }
+
+  /** @return new partition row key prefix concatenated with change stream name. */
+  private ByteString getFullNewPartitionPrefix() {
+    return withChangeStreamNamePrefix(NEW_PARTITION_PREFIX);
+  }
+
+  /** @return stream partition row key prefix concatenated with change stream name. */
+  private ByteString getFullStreamPartitionPrefix() {
+    return withChangeStreamNamePrefix(STREAM_PARTITION_PREFIX);
+  }
+
+  /** @return detect new partition row key concatenated with change stream name. */
+  private ByteString getFullDetectNewPartition() {
+    return withChangeStreamNamePrefix(DETECT_NEW_PARTITION_SUFFIX);
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/package-info.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/package-info.java
new file mode 100644
index 000000000000..bb130f56901a
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dao/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/** Data access object for change stream for Google Cloud Bigtable.
*/ +@Experimental +package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao; + +import org.apache.beam.sdk.annotations.Experimental; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/DetectNewPartitionsDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/DetectNewPartitionsDoFn.java new file mode 100644 index 000000000000..6e8f21cd00b8 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/DetectNewPartitionsDoFn.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn;
+
+import java.io.IOException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ActionFactory;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.DetectNewPartitionsAction;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.GenerateInitialPartitionsAction;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.ChangeStreamDao;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.DaoFactory;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao;
+import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.Manual;
+import org.joda.time.Instant;
+
+/**
+ * Splittable DoFn that receives a start time and hands processing to a {@link
+ * DetectNewPartitionsAction} built in {@link #setup()}.
+ *
+ * <p>The type arguments are spelled out on the class and on every tracker, receiver and watermark
+ * estimator parameter instead of using raw types.
+ */
+// Allows for detectNewPartitionsAction setup
+@SuppressWarnings("initialization.fields.uninitialized")
+@UnboundedPerElement
+public class DetectNewPartitionsDoFn extends DoFn<com.google.cloud.Timestamp, PartitionRecord> {
+  private static final long serialVersionUID = 8052524268978107367L;
+  @Nullable private final com.google.cloud.Timestamp endTime;
+
+  private final DaoFactory daoFactory;
+  private final ChangeStreamMetrics metrics;
+  private final ActionFactory actionFactory;
+  // Assigned in setup(), not in the constructor.
+  private DetectNewPartitionsAction detectNewPartitionsAction;
+
+  /**
+   * Stores the collaborators; the action itself is built in {@link #setup()}.
+   *
+   * @param endTime optional end time forwarded to the actions, may be null
+   * @param actionFactory factory for the actions used by this DoFn
+   * @param daoFactory factory for the metadata table and change stream DAOs
+   * @param metrics metrics object forwarded to the actions
+   */
+  public DetectNewPartitionsDoFn(
+      @Nullable com.google.cloud.Timestamp endTime,
+      ActionFactory actionFactory,
+      DaoFactory daoFactory,
+      ChangeStreamMetrics metrics) {
+    this.actionFactory = actionFactory;
+    this.daoFactory = daoFactory;
+    this.endTime = endTime;
+    this.metrics = metrics;
+  }
+
+  /** Uses the element (the start time), converted to an {@link Instant}, as initial state. */
+  @GetInitialWatermarkEstimatorState
+  public Instant getInitialWatermarkEstimatorState(@Element com.google.cloud.Timestamp startTime) {
+    return TimestampConverter.toInstant(startTime);
+  }
+
+  /** Creates a manual watermark estimator starting at the given state. */
+  @NewWatermarkEstimator
+  public ManualWatermarkEstimator<Instant> newWatermarkEstimator(
+      @WatermarkEstimatorState Instant watermarkEstimatorState) {
+    return new Manual(watermarkEstimatorState);
+  }
+
+  /** The initial restriction is the offset range {@code [0, Long.MAX_VALUE)}. */
+  @GetInitialRestriction
+  public OffsetRange initialRestriction() {
+    return new OffsetRange(0, Long.MAX_VALUE);
+  }
+
+  /** Creates an {@link OffsetRangeTracker} over the given restriction. */
+  @NewTracker
+  public OffsetRangeTracker restrictionTracker(@Restriction OffsetRange restriction) {
+    return new OffsetRangeTracker(restriction);
+  }
+
+  // We never want to scale based on this DoFn, so we return a constant backlog estimate of zero.
+  @GetSize
+  public double getSize() {
+    return 0d;
+  }
+
+  /** Obtains the DAOs from the factory and builds the action used by {@code processElement}. */
+  @Setup
+  public void setup() throws IOException {
+    final MetadataTableDao metadataTableDao = daoFactory.getMetadataTableDao();
+    final ChangeStreamDao changeStreamDao = daoFactory.getChangeStreamDao();
+    GenerateInitialPartitionsAction generateInitialPartitionsAction =
+        actionFactory.generateInitialPartitionsAction(metrics, changeStreamDao, endTime);
+    detectNewPartitionsAction =
+        actionFactory.detectNewPartitionsAction(
+            metrics, metadataTableDao, endTime, generateInitialPartitionsAction);
+  }
+
+  /** Delegates to {@link DetectNewPartitionsAction} and returns its continuation decision. */
+  @ProcessElement
+  public ProcessContinuation processElement(
+      @Element com.google.cloud.Timestamp startTime,
+      RestrictionTracker<OffsetRange, Long> tracker,
+      OutputReceiver<PartitionRecord> receiver,
+      ManualWatermarkEstimator<Instant> watermarkEstimator,
+      BundleFinalizer bundleFinalizer)
+      throws Exception {
+    return detectNewPartitionsAction.run(
+        tracker, receiver, watermarkEstimator, bundleFinalizer, startTime);
+  }
+}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/InitializeDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/InitializeDoFn.java new file mode 100644 index 000000000000..c80fb4a4e10a --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/InitializeDoFn.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn; + +import java.io.IOException; +import java.io.Serializable; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.DaoFactory; +import org.apache.beam.sdk.transforms.DoFn; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A DoFn responsible to initialize the metadata table and prepare it for managing the state of the + * pipeline. 
+ */
+@SuppressWarnings("UnusedVariable")
+// NOTE(review): the input element type byte[] is an assumption (nothing in this class reads the
+// input element); the output type follows from receiver.output(startTime). Confirm against the
+// transform that applies this DoFn.
+public class InitializeDoFn extends DoFn<byte[], com.google.cloud.Timestamp>
+    implements Serializable {
+  private static final long serialVersionUID = 1868189906451252363L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(InitializeDoFn.class);
+  private final DaoFactory daoFactory;
+  private final String metadataTableAppProfileId;
+  // Never reassigned after construction, hence final.
+  private final com.google.cloud.Timestamp startTime;
+
+  /**
+   * Stores the collaborators.
+   *
+   * @param daoFactory source of the debug strings and change stream name that are logged
+   * @param metadataTableAppProfileId stored but not read by this class
+   * @param startTime the value emitted for every processed element
+   */
+  public InitializeDoFn(
+      DaoFactory daoFactory,
+      String metadataTableAppProfileId,
+      com.google.cloud.Timestamp startTime) {
+    this.daoFactory = daoFactory;
+    this.metadataTableAppProfileId = metadataTableAppProfileId;
+    this.startTime = startTime;
+  }
+
+  /** Logs the stream table, metadata table and change stream name, then outputs the start time. */
+  @ProcessElement
+  public void processElement(OutputReceiver<com.google.cloud.Timestamp> receiver)
+      throws IOException {
+    LOG.info(daoFactory.getStreamTableDebugString());
+    LOG.info(daoFactory.getMetadataTableDebugString());
+    LOG.info("ChangeStreamName: " + daoFactory.getChangeStreamName());
+
+    receiver.output(startTime);
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
new file mode 100644
index 000000000000..62ffd39cc1e8
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn; + +import com.google.protobuf.ByteString; +import java.io.IOException; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMetrics; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.ChangeStreamMutation; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.TimestampConverter; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ActionFactory; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ChangeStreamAction; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.action.ReadChangeStreamPartitionAction; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.ChangeStreamDao; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.DaoFactory; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.dao.MetadataTableDao; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.ReadChangeStreamPartitionProgressTracker; +import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.StreamProgress; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement; +import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.Manual; +import org.apache.beam.sdk.values.KV; +import org.joda.time.Duration; +import 
org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Splittable DoFn that receives a {@link PartitionRecord} and hands processing to a {@link
+ * ReadChangeStreamPartitionAction} built in {@link #setup()}.
+ *
+ * <p>The type arguments are spelled out on the class and on every tracker, receiver and watermark
+ * estimator parameter instead of using raw types.
+ */
+// Allows for readChangeStreamPartitionAction setup
+@SuppressWarnings({"initialization.fields.uninitialized", "UnusedVariable"})
+@UnboundedPerElement
+public class ReadChangeStreamPartitionDoFn
+    extends DoFn<PartitionRecord, KV<ByteString, ChangeStreamMutation>> {
+  private static final long serialVersionUID = 4418739381635104479L;
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReadChangeStreamPartitionDoFn.class);
+
+  private final Duration heartbeatDurationSeconds;
+  private final DaoFactory daoFactory;
+  private final ChangeStreamMetrics metrics;
+  private final ActionFactory actionFactory;
+
+  // Assigned in setup(), not in the constructor.
+  private ReadChangeStreamPartitionAction readChangeStreamPartitionAction;
+
+  /**
+   * Stores the collaborators; the action itself is built in {@link #setup()}.
+   *
+   * @param heartbeatDurationSeconds heartbeat duration forwarded to the action
+   * @param daoFactory factory for the metadata table and change stream DAOs
+   * @param actionFactory factory for the actions used by this DoFn
+   * @param metrics metrics object forwarded to the actions
+   */
+  public ReadChangeStreamPartitionDoFn(
+      Duration heartbeatDurationSeconds,
+      DaoFactory daoFactory,
+      ActionFactory actionFactory,
+      ChangeStreamMetrics metrics) {
+    this.heartbeatDurationSeconds = heartbeatDurationSeconds;
+    this.daoFactory = daoFactory;
+    this.metrics = metrics;
+    this.actionFactory = actionFactory;
+  }
+
+  /** Uses the partition's parent low watermark, converted to an {@link Instant}, as state. */
+  @GetInitialWatermarkEstimatorState
+  public Instant getInitialWatermarkEstimatorState(@Element PartitionRecord partitionRecord) {
+    return TimestampConverter.toInstant(partitionRecord.getParentLowWatermark());
+  }
+
+  /** Creates a manual watermark estimator starting at the given state. */
+  @NewWatermarkEstimator
+  public ManualWatermarkEstimator<Instant> newWatermarkEstimator(
+      @WatermarkEstimatorState Instant watermarkEstimatorState) {
+    return new Manual(watermarkEstimatorState);
+  }
+
+  /** The initial restriction is a new, empty {@link StreamProgress}. */
+  @GetInitialRestriction
+  public StreamProgress initialRestriction() {
+    return new StreamProgress();
+  }
+
+  /** Creates the progress tracker over the given restriction. */
+  @NewTracker
+  public ReadChangeStreamPartitionProgressTracker restrictionTracker(
+      @Restriction StreamProgress restriction) {
+    return new ReadChangeStreamPartitionProgressTracker(restriction);
+  }
+
+  /** Obtains the DAOs from the factory and builds the action used by {@code processElement}. */
+  @Setup
+  public void setup() throws IOException {
+    MetadataTableDao metadataTableDao = daoFactory.getMetadataTableDao();
+    ChangeStreamDao changeStreamDao = daoFactory.getChangeStreamDao();
+    ChangeStreamAction changeStreamAction = actionFactory.changeStreamAction(this.metrics);
+    readChangeStreamPartitionAction =
+        actionFactory.readChangeStreamPartitionAction(
+            metadataTableDao,
+            changeStreamDao,
+            metrics,
+            changeStreamAction,
+            heartbeatDurationSeconds);
+  }
+
+  /** Delegates to {@link ReadChangeStreamPartitionAction} and returns its decision. */
+  @ProcessElement
+  public ProcessContinuation processElement(
+      @Element PartitionRecord partitionRecord,
+      RestrictionTracker<StreamProgress, StreamProgress> tracker,
+      OutputReceiver<KV<ByteString, ChangeStreamMutation>> receiver,
+      ManualWatermarkEstimator<Instant> watermarkEstimator)
+      throws InterruptedException, IOException {
+    return readChangeStreamPartitionAction.run(
+        partitionRecord, tracker, receiver, watermarkEstimator);
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/package-info.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/package-info.java
new file mode 100644
index 000000000000..273d5b8562c8
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/dofn/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/** DoFn and SDF definitions to process Google Cloud Bigtable Change Streams.
*/ +@Experimental +package org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn; + +import org.apache.beam.sdk.annotations.Experimental; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/encoder/package-info.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/encoder/package-info.java new file mode 100644 index 000000000000..d6388c1a56df --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/encoder/package-info.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Encoders for writing and reading from Metadata Table for Google Cloud Bigtable Change Streams. 
+ */ +@Experimental +package org.apache.beam.sdk.io.gcp.bigtable.changestreams.encoder; + +import org.apache.beam.sdk.annotations.Experimental; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/model/PartitionRecord.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/model/PartitionRecord.java new file mode 100644 index 000000000000..7956a919bf22 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/model/PartitionRecord.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.beam.sdk.io.gcp.bigtable.changestreams.model;
+
+import static org.apache.beam.sdk.io.gcp.bigtable.changestreams.ByteStringRangeHelper.formatByteStringRange;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
+import java.io.Serializable;
+import java.util.Objects;
+import javax.annotation.Nullable;
+
+/**
+ * Output result of {@link
+ * org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.DetectNewPartitionsDoFn} containing
+ * information required to stream a partition.
+ *
+ * <p>Mutable value holder: every field has a getter and a setter, and all five fields take part
+ * in {@link #equals(Object)}, {@link #hashCode()} and {@link #toString()}.
+ */
+public class PartitionRecord implements Serializable {
+  private static final long serialVersionUID = -7613861834142734474L;
+
+  private ByteStringRange partition;
+  @Nullable private Timestamp startTime;
+  @Nullable private Timestamp endTime;
+  private String uuid;
+  private Timestamp parentLowWatermark;
+
+  /**
+   * Creates a record from its five fields.
+   *
+   * @param partition row range of the partition
+   * @param startTime start time of the partition
+   * @param uuid identifier of the record
+   * @param parentLowWatermark low watermark of the parent
+   * @param endTime optional end time, may be null
+   */
+  public PartitionRecord(
+      ByteStringRange partition,
+      Timestamp startTime,
+      String uuid,
+      Timestamp parentLowWatermark,
+      @Nullable Timestamp endTime) {
+    this.partition = partition;
+    this.startTime = startTime;
+    this.uuid = uuid;
+    this.parentLowWatermark = parentLowWatermark;
+    this.endTime = endTime;
+  }
+
+  public ByteStringRange getPartition() {
+    return this.partition;
+  }
+
+  public void setPartition(ByteStringRange partition) {
+    this.partition = partition;
+  }
+
+  @Nullable
+  public Timestamp getStartTime() {
+    return this.startTime;
+  }
+
+  public void setStartTime(@Nullable Timestamp startTime) {
+    this.startTime = startTime;
+  }
+
+  @Nullable
+  public Timestamp getEndTime() {
+    return this.endTime;
+  }
+
+  public void setEndTime(@Nullable Timestamp endTime) {
+    this.endTime = endTime;
+  }
+
+  public String getUuid() {
+    return this.uuid;
+  }
+
+  public void setUuid(String uuid) {
+    this.uuid = uuid;
+  }
+
+  public Timestamp getParentLowWatermark() {
+    return this.parentLowWatermark;
+  }
+
+  public void setParentLowWatermark(Timestamp parentLowWatermark) {
+    this.parentLowWatermark = parentLowWatermark;
+  }
+
+  /** Two records are equal when all five fields are equal. */
+  @Override
+  public boolean equals(@Nullable Object o) {
+    if (o == this) {
+      return true;
+    }
+    if (!(o instanceof PartitionRecord)) {
+      return false;
+    }
+    final PartitionRecord other = (PartitionRecord) o;
+    // Checked in the same order as a single short-circuiting conjunction would check them.
+    if (!getPartition().equals(other.getPartition())) {
+      return false;
+    }
+    if (!Objects.equals(getStartTime(), other.getStartTime())) {
+      return false;
+    }
+    if (!Objects.equals(getEndTime(), other.getEndTime())) {
+      return false;
+    }
+    if (!getUuid().equals(other.getUuid())) {
+      return false;
+    }
+    return Objects.equals(getParentLowWatermark(), other.getParentLowWatermark());
+  }
+
+  /** Hash over the same five fields that {@link #equals(Object)} compares. */
+  @Override
+  public int hashCode() {
+    return Objects.hash(
+        getPartition(), getStartTime(), getEndTime(), getUuid(), getParentLowWatermark());
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder text = new StringBuilder("PartitionRecord{");
+    text.append("partition=").append(formatByteStringRange(partition));
+    text.append(", startTime=").append(startTime);
+    text.append(", endTime=").append(endTime);
+    text.append(", uuid='").append(uuid).append('\'');
+    text.append(", parentLowWatermark=").append(parentLowWatermark);
+    text.append('}');
+    return text.toString();
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/model/package-info.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/model/package-info.java
new file mode 100644
index 000000000000..a0a0b40b72b6
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/model/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** User models for the Google Cloud Bigtable change stream API. */ +@Experimental +package org.apache.beam.sdk.io.gcp.bigtable.changestreams.model; + +import org.apache.beam.sdk.annotations.Experimental; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/package-info.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/package-info.java new file mode 100644 index 000000000000..afd05cc6c6f9 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** Change stream for Google Cloud Bigtable. 
*/ +@Experimental +package org.apache.beam.sdk.io.gcp.bigtable.changestreams; + +import org.apache.beam.sdk.annotations.Experimental; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/restriction/ReadChangeStreamPartitionProgressTracker.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/restriction/ReadChangeStreamPartitionProgressTracker.java new file mode 100644 index 000000000000..f6158898754a --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/restriction/ReadChangeStreamPartitionProgressTracker.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction;
+
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * RestrictionTracker used by {@link
+ * org.apache.beam.sdk.io.gcp.bigtable.changestreams.dofn.ReadChangeStreamPartitionDoFn} to keep
+ * track of the progress of the stream and to split the restriction for runner initiated
+ * checkpoints.
+ *
+ * <p>StreamProgress usually is a continuation token which represents a position in time of the
+ * stream of a specific partition. The token is used to resume streaming a partition.
+ *
+ * <p>On ChangeStreamMutation or Heartbeat response, the tracker will try to claim the continuation
+ * token from the response. The tracker stores that continuation token (wrapped in a StreamProgress)
+ * so that if the DoFn checkpoints or restarts, the token can be used to resume the stream.
+ *
+ * <p>The tracker will fail to claim a token if the runner has initiated a checkpoint (by calling
+ * trySplit(0)). This signals to the DoFn to stop.
+ *
+ * <p>When the runner initiates a checkpoint, the tracker returns null for the primary split and
+ * the residual split includes the entire token. The next time the DoFn tries to claim a new
+ * StreamProgress, it will fail, and stop. The residual will be scheduled on a new DoFn to resume
+ * the work from the previous StreamProgress.
+ */
+public class ReadChangeStreamPartitionProgressTracker
+    extends RestrictionTracker<StreamProgress, StreamProgress> {
+  StreamProgress streamProgress;
+  boolean shouldStop = false;
+
+  /**
+   * Constructs a restriction tracker with the streamProgress.
+   *
+   * @param streamProgress represents a position in time of the stream.
+   */
+  public ReadChangeStreamPartitionProgressTracker(StreamProgress streamProgress) {
+    this.streamProgress = streamProgress;
+  }
+
+  /**
+   * This restriction tracker is for unbounded streams.
+   *
+   * @return {@link
+   *     org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.IsBounded#UNBOUNDED}
+   */
+  @Override
+  public RestrictionTracker.IsBounded isBounded() {
+    return IsBounded.UNBOUNDED;
+  }
+
+  /**
+   * This is to signal to the runner that this restriction has completed. Throws an exception if
+   * there is more work to be done. Currently the tracker is done only after a runner initiated
+   * checkpoint (trySplit(0) sets shouldStop); stopping because streamProgress holds a closeStream
+   * response instead of a token is not implemented here yet.
+   *
+   * @throws java.lang.IllegalStateException when the restriction is not done and there is more work
+   *     to be done.
+   */
+  @Override
+  public void checkDone() throws java.lang.IllegalStateException {
+    boolean done = shouldStop;
+    Preconditions.checkState(done, "There's more work to be done");
+  }
+
+  /**
+   * Claims a new StreamProgress to be processed. StreamProgress can either be a ContinuationToken
+   * or a CloseStream.
+   *
+   * <p>The claim fails if the runner has previously initiated a checkpoint. The restriction tracker
+   * respects the runner initiated checkpoint and fails to claim this streamProgress. The new split
+   * will start from the previously successfully claimed streamProgress.
+   *
+   * @param streamProgress position in time of the stream that is being claimed.
+   * @return true if claim was successful, otherwise false.
+   */
+  @Override
+  public boolean tryClaim(StreamProgress streamProgress) {
+    if (shouldStop) {
+      return false;
+    }
+    // Remember the most recently claimed position: currentRestriction() reports it and trySplit
+    // hands it out as the residual when the runner checkpoints.
+    this.streamProgress = streamProgress;
+    return true;
+  }
+
+  /**
+   * Returns the streamProgress that was successfully claimed.
+   *
+   * @return the streamProgress that was successfully claimed.
+   */
+  @Override
+  public StreamProgress currentRestriction() {
+    return streamProgress;
+  }
+
+  /**
+   * Splits the work that's left. Since the position in the stream isn't a contiguous value, we
+   * cannot estimate how much work is left or break down the work into smaller chunks. Therefore,
+   * there's no way to split the work. To conform to the API, we return null for the primary split
+   * and then continue the work on the residual split.
+   *
+   * <p>Also note that we only accept checkpoints (fractionOfRemainder = 0). For any other value,
+   * we reject (by returning null) the request to split since StreamProgress cannot be
+   * broken down into fractions.
+   *
+   * @param fractionOfRemainder the fraction of work remaining, where 0 is a request to checkpoint
+   *     current work.
+   * @return split result when fractionOfRemainder = 0, otherwise null.
+   */
+  @Override
+  public @Nullable SplitResult<StreamProgress> trySplit(double fractionOfRemainder) {
+    // When asked for fractionOfRemainder = 0, which means the runner is asking for a
+    // split/checkpoint. We can terminate the current worker and continue the rest of the work in a
+    // new worker.
+    if (fractionOfRemainder == 0) {
+      // shouldStop is only true if we have trySplit before. We don't want to trySplit again if we
+      // have already returned to the runner a split result. Future split should return null. To
+      // think of it another way, after trySplit has been called, the primary restriction tracker is
+      // null. Trying to split it is impossible, so we are returning null.
+      if (shouldStop) {
+        return null;
+      }
+      shouldStop = true;
+      return SplitResult.of(null, streamProgress);
+    }
+    return null;
+  }
+
+  @Override
+  public String toString() {
+    return "CustomRestrictionTracker{"
+        + "streamProgress="
+        + streamProgress
+        + ", shouldStop="
+        + shouldStop
+        + '}';
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/restriction/StreamProgress.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/restriction/StreamProgress.java
new file mode 100644
index 000000000000..bcf032b0aa44
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/restriction/StreamProgress.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction;
+
+import java.io.Serializable;
+
+/**
+ * Position for {@link ReadChangeStreamPartitionProgressTracker}. This is meant to hold the
+ * information that allows a stream, along with the {@link
+ * org.apache.beam.sdk.io.gcp.bigtable.changestreams.model.PartitionRecord}, to resume from a
+ * checkpoint.
+ *
+ * <p>It should contain either a continuation token which represents a position in the stream, or it
+ * can contain a close stream message which represents an end to the stream and the DoFn needs to
+ * stop. NOTE(review): the class declares no such fields yet; it is an empty placeholder here.
+ */
+public class StreamProgress implements Serializable {
+  private static final long serialVersionUID = -5384329262726188695L;
+
+  public StreamProgress() {}
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/restriction/package-info.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/restriction/package-info.java
new file mode 100644
index 000000000000..ccb9b6150db0
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/restriction/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/** Custom RestrictionTracker for Google Cloud Bigtable Change Streams.
*/ +@Experimental +package org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction; + +import org.apache.beam.sdk.annotations.Experimental; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/TimestampConverterTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/TimestampConverterTest.java new file mode 100644 index 000000000000..70bb0c1e327b --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/TimestampConverterTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.beam.sdk.io.gcp.bigtable.changestreams;
+
+import static org.junit.Assert.assertEquals;
+
+import org.joda.time.Instant;
+import org.junit.Test;
+
+public class TimestampConverterTest {
+
+  @Test
+  public void testCloudTimestampTotoInstant() {
+    // 1000 s plus 123 ms (given in nanoseconds) must convert to 1000123 ms since the epoch.
+    com.google.cloud.Timestamp cloudTimestamp =
+        com.google.cloud.Timestamp.ofTimeSecondsAndNanos(1000, 123_000_000);
+    Instant converted = TimestampConverter.toInstant(cloudTimestamp);
+    assertEquals(1000123, converted.getMillis());
+  }
+
+  @Test
+  public void testProtoTimestampTotoInstant() {
+    // Same instant as above, built through the protobuf Timestamp builder instead.
+    com.google.protobuf.Timestamp protoTimestamp =
+        com.google.protobuf.Timestamp.newBuilder().setSeconds(1000).setNanos(123_000_000).build();
+    Instant converted = TimestampConverter.toInstant(protoTimestamp);
+    assertEquals(1000123, converted.getMillis());
+  }
+}