diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java index 6f0aeac859c1..491b2efa0787 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java @@ -194,6 +194,12 @@ public WatermarkEstimator newWatermarkEstimator( return new WatermarkEstimators.Manual(state); } + @TruncateRestriction + public RestrictionTracker.TruncateResult truncate() { + // stop emitting immediately upon drain + return null; + } + @ProcessElement public ProcessContinuation processElement( @Element SequenceDefinition srcElement, @@ -207,7 +213,7 @@ public ProcessContinuation processElement( boolean claimSuccess = true; - estimator.setWatermark(Instant.ofEpochMilli(restriction.getFrom())); + estimator.setWatermark(Instant.ofEpochMilli(nextOutput)); while (claimSuccess && Instant.ofEpochMilli(nextOutput).isBeforeNow()) { claimSuccess = restrictionTracker.tryClaim(nextOutput);