Skip to content

Commit

Permalink
Stop emitting upon truncate in Java PeriodicSequence (apache#25716)
Browse files Browse the repository at this point in the history
  • Loading branch information
Abacn authored and ruslan-ikhsan committed Mar 10, 2023
1 parent cfd91ea commit b6db721
Showing 1 changed file with 7 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,12 @@ public WatermarkEstimator<Instant> newWatermarkEstimator(
return new WatermarkEstimators.Manual(state);
}

@TruncateRestriction
public RestrictionTracker.TruncateResult<OffsetRange> truncate() {
// stop emitting immediately upon drain
return null;
}

@ProcessElement
public ProcessContinuation processElement(
@Element SequenceDefinition srcElement,
Expand All @@ -207,7 +213,7 @@ public ProcessContinuation processElement(

boolean claimSuccess = true;

estimator.setWatermark(Instant.ofEpochMilli(restriction.getFrom()));
estimator.setWatermark(Instant.ofEpochMilli(nextOutput));

while (claimSuccess && Instant.ofEpochMilli(nextOutput).isBeforeNow()) {
claimSuccess = restrictionTracker.tryClaim(nextOutput);
Expand Down

0 comments on commit b6db721

Please sign in to comment.