diff --git a/sdks/go/pkg/beam/transforms/periodic/periodic.go b/sdks/go/pkg/beam/transforms/periodic/periodic.go index 4f0f6e802dad..3dbe6a640757 100644 --- a/sdks/go/pkg/beam/transforms/periodic/periodic.go +++ b/sdks/go/pkg/beam/transforms/periodic/periodic.go @@ -31,9 +31,11 @@ import ( func init() { register.DoFn5x2[context.Context, *sdf.ManualWatermarkEstimator, *sdf.LockRTracker, SequenceDefinition, - func(beam.EventTime, []byte), + func(beam.EventTime, int64), sdf.ProcessContinuation, error](&sequenceGenDoFn{}) - register.Emitter2[beam.EventTime, []byte]() + register.Emitter2[beam.EventTime, int64]() + register.Function2x0(sequenceToImpulse) + register.Emitter1[[]byte]() beam.RegisterType(reflect.TypeOf(SequenceDefinition{})) } @@ -76,14 +78,14 @@ func (fn *sequenceGenDoFn) CreateWatermarkEstimator() *sdf.ManualWatermarkEstima return &sdf.ManualWatermarkEstimator{} } -func (fn *sequenceGenDoFn) ProcessElement(ctx context.Context, we *sdf.ManualWatermarkEstimator, rt *sdf.LockRTracker, sd SequenceDefinition, emit func(beam.EventTime, []byte)) (sdf.ProcessContinuation, error) { +func (fn *sequenceGenDoFn) ProcessElement(ctx context.Context, we *sdf.ManualWatermarkEstimator, rt *sdf.LockRTracker, sd SequenceDefinition, emit func(beam.EventTime, int64)) (sdf.ProcessContinuation, error) { currentOutputIndex := rt.GetRestriction().(offsetrange.Restriction).Start currentOutputTimestamp := mtime.Time(sd.Start).ToTime().Add(sd.Interval * time.Duration(currentOutputIndex)) currentTime := time.Now() we.UpdateWatermark(currentOutputTimestamp) for currentOutputTimestamp.Before(currentTime) { if rt.TryClaim(currentOutputIndex) { - emit(mtime.FromTime(currentOutputTimestamp), []byte{}) + emit(mtime.FromTime(currentOutputTimestamp), currentOutputIndex) currentOutputIndex += 1 currentOutputTimestamp = mtime.Time(sd.Start).ToTime().Add(sd.Interval * time.Duration(currentOutputIndex)) currentTime = time.Now() @@ -132,12 +134,17 @@ func Impulse(s beam.Scope, start, end time.Time, interval time.Duration, applyWi func genImpulse(s beam.Scope, start, end time.Time, interval time.Duration, applyWindow bool, fn *sequenceGenDoFn) beam.PCollection { sd := SequenceDefinition{Interval: interval, Start: start.UnixMilli(), End: end.UnixMilli()} imp := beam.Create(s.Scope("ImpulseElement"), sd) - col := genSequence(s, imp, fn) + seq := genSequence(s, imp, fn) + imps := beam.ParDo(s, sequenceToImpulse, seq) if applyWindow { return beam.WindowInto(s.Scope("ApplyWindowing"), - window.NewFixedWindows(interval), col) + window.NewFixedWindows(interval), imps) } - return col + return imps +} + +func sequenceToImpulse(_ int64, emit func([]byte)) { + emit([]byte{}) } // Sequence is a PTransform which generates a sequence of timestamped @@ -157,7 +164,7 @@ func genImpulse(s beam.Scope, start, end time.Time, interval time.Duration, appl // - if element timestamp is greater than current runtime, wait until next // element timestamp. // -// The PCollection<[]byte> generated by Sequence is unbounded. +// The PCollection generated by Sequence is unbounded. func Sequence(s beam.Scope, col beam.PCollection) beam.PCollection { return genSequence(s.Scope("periodic.Sequence"), col, &sequenceGenDoFn{}) }