From b6db721781d0b05a5e367aaf51df63cc738dc8e0 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Fri, 3 Mar 2023 19:04:17 -0500 Subject: [PATCH] Stop emitting upon truncate in Java PeriodicSequence (#25716) --- .../org/apache/beam/sdk/transforms/PeriodicSequence.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java index 6f0aeac859c1..491b2efa0787 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java @@ -194,6 +194,12 @@ public WatermarkEstimator newWatermarkEstimator( return new WatermarkEstimators.Manual(state); } + @TruncateRestriction + public RestrictionTracker.TruncateResult truncate() { + // stop emitting immediately upon drain + return null; + } + @ProcessElement public ProcessContinuation processElement( @Element SequenceDefinition srcElement, @@ -207,7 +213,7 @@ public ProcessContinuation processElement( boolean claimSuccess = true; - estimator.setWatermark(Instant.ofEpochMilli(restriction.getFrom())); + estimator.setWatermark(Instant.ofEpochMilli(nextOutput)); while (claimSuccess && Instant.ofEpochMilli(nextOutput).isBeforeNow()) { claimSuccess = restrictionTracker.tryClaim(nextOutput);