Skip to content

Commit

Permalink
Introduce PeriodicImpulse.stopAfter()
Browse files Browse the repository at this point in the history
* Use it in streaming BigQueryIO integration test
  • Loading branch information
Abacn committed Sep 18, 2023
1 parent 011efac commit 9f0677d
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1755,7 +1755,7 @@ void maybeRecordPCollectionWithAutoSharding(PCollection<?> pcol) {
options.isEnableStreamingEngine(),
"Runner determined sharding not available in Dataflow for GroupIntoBatches for"
+ " non-Streaming-Engine jobs. In order to use runner determined sharding, please use"
+ " --streaming --enable_streaming_engine");
+ " --streaming --experiments=enable_streaming_engine");
pCollectionsPreservedKeys.add(pcol);
pcollectionsRequiringAutoSharding.add(pcol);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@
*/
package org.apache.beam.sdk.transforms;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;

Expand All @@ -34,28 +38,58 @@
*/
public class PeriodicImpulse extends PTransform<PBegin, PCollection<Instant>> {

Instant startTimestamp = Instant.now();
Instant stopTimestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
Duration fireInterval = Duration.standardMinutes(1);
Instant startTimestamp;
Instant stopTimestamp;
@Nullable Duration stopDuration;
Duration fireInterval;
boolean applyWindowing = false;
boolean catchUpToNow = true;

private PeriodicImpulse() {}
private PeriodicImpulse() {
this.startTimestamp = Instant.now();
this.stopTimestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
this.fireInterval = Duration.standardMinutes(1);
}

public static PeriodicImpulse create() {
return new PeriodicImpulse();
}

/**
* Assign a timestamp when the pipeliene starts to produce data.
*
* <p>Cannot be used along with {@link #stopAfter}.
*/
public PeriodicImpulse startAt(Instant startTime) {
checkArgument(stopDuration == null, "startAt and stopAfter cannot be set at the same time");
this.startTimestamp = startTime;
return this;
}

/**
* Assign a timestamp when the pipeliene stops producing data.
*
* <p>Cannot be used along with {@link #stopAfter}.
*/
public PeriodicImpulse stopAt(Instant stopTime) {
checkArgument(stopDuration == null, "stopAt and stopAfter cannot be set at the same time");
this.stopTimestamp = stopTime;
return this;
}

/**
* <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
*
* <p>Assign a time interval at which the pipeliene produces data. This is different from setting
* {@link #startAt} and {@link #stopAt}, as the first timestamp is determined at run time
* (pipeline starts processing).
*/
@Internal
public PeriodicImpulse stopAfter(Duration duration) {
this.stopDuration = duration;
return this;
}

public PeriodicImpulse withInterval(Duration interval) {
this.fireInterval = interval;
return this;
Expand All @@ -67,31 +101,65 @@ public PeriodicImpulse applyWindowing() {
}

/**
* The default behavior is that PeriodicImpulse emits all instants until Instant.now(), then
* <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
*
* <p>The default behavior is that PeriodicImpulse emits all instants until Instant.now(), then
* starts firing at the specified interval. If this is set to false, the PeriodicImpulse will
* perform the interval wait before firing each instant.
*/
@Internal
public PeriodicImpulse catchUpToNow(boolean catchUpToNow) {
this.catchUpToNow = catchUpToNow;
return this;
}

@Override
public PCollection<Instant> expand(PBegin input) {
PCollection<Instant> result =
input
.apply(
Create.<PeriodicSequence.SequenceDefinition>of(
new PeriodicSequence.SequenceDefinition(
startTimestamp, stopTimestamp, fireInterval, catchUpToNow)))
.apply(PeriodicSequence.create());
PCollection<PeriodicSequence.SequenceDefinition> seqDef;
if (stopDuration != null) {
// nonnull guaranteed
Duration d = stopDuration;
seqDef =
input
.apply(Impulse.create())
.apply(ParDo.of(new RuntimeSequenceFn(d, fireInterval, catchUpToNow)));
} else {
seqDef =
input.apply(
Create.of(
new PeriodicSequence.SequenceDefinition(
startTimestamp, stopTimestamp, fireInterval, catchUpToNow)));
}
PCollection<Instant> result = seqDef.apply(PeriodicSequence.create());

if (this.applyWindowing) {
result =
result.apply(
Window.<Instant>into(FixedWindows.of(Duration.millis(fireInterval.getMillis()))));
result.apply(Window.into(FixedWindows.of(Duration.millis(fireInterval.getMillis()))));
}

return result;
}

/**
* A DoFn generated a SequenceDefinition at run time. This enables set first element timestamp at
* pipeline start processing data.
*/
private static class RuntimeSequenceFn extends DoFn<byte[], PeriodicSequence.SequenceDefinition> {
Duration stopDuration;
Duration fireInterval;
boolean catchUpToNow;

RuntimeSequenceFn(Duration stopDuration, Duration fireInterval, boolean catchUpToNow) {
this.stopDuration = stopDuration;
this.fireInterval = fireInterval;
this.catchUpToNow = catchUpToNow;
}

@ProcessElement
public void process(ProcessContext c) {
Instant now = Instant.now();
c.output(
new PeriodicSequence.SequenceDefinition(
now, now.plus(stopDuration), fireInterval, catchUpToNow));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import java.util.Objects;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
Expand Down Expand Up @@ -67,6 +68,8 @@ public SequenceDefinition(Instant first, Instant last, Duration duration) {
this.catchUpToNow = true;
}

/** <b><i>catchUpToNow is experimental; no backwards-compatibility guarantees.</i></b> */
@Internal
public SequenceDefinition(
Instant first, Instant last, Duration duration, boolean catchUpToNow) {
this.first = first;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,16 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.PeriodicImpulse;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand Down Expand Up @@ -81,11 +88,32 @@ public void processElement(ProcessContext c) {
}
}

private GenerateSequence stream(int rowCount) {
int timestampIntervalInMilliseconds = 10;
return GenerateSequence.from(0)
.to(rowCount)
.withRate(1, Duration.millis(timestampIntervalInMilliseconds));
static class UnboundedStream extends PTransform<PBegin, PCollection<Long>> {

private final int rowCount;

public UnboundedStream(int rowCount) {
this.rowCount = rowCount;
}

@Override
public PCollection<Long> expand(PBegin input) {
int timestampIntervalInMillis = 10;
PeriodicImpulse impulse =
PeriodicImpulse.create()
.stopAfter(Duration.millis((long) timestampIntervalInMillis * rowCount - 1))
.withInterval(Duration.millis(timestampIntervalInMillis));
return input
.apply(impulse)
.apply(
MapElements.via(
new SimpleFunction<Instant, Long>() {
@Override
public Long apply(Instant input) {
return input.getMillis();
}
}));
}
}

private void runBigQueryIOStorageWritePipeline(
Expand All @@ -102,7 +130,9 @@ private void runBigQueryIOStorageWritePipeline(
new TableFieldSchema().setName("str").setType("STRING")));

Pipeline p = Pipeline.create(bqOptions);
p.apply("Input", isStreaming ? stream(rowCount) : GenerateSequence.from(0).to(rowCount))
p.apply(
"Input",
isStreaming ? new UnboundedStream(rowCount) : GenerateSequence.from(0).to(rowCount))
.apply("GenerateMessage", ParDo.of(new FillRowFn()))
.apply(
"WriteToBQ",
Expand Down

0 comments on commit 9f0677d

Please sign in to comment.