Skip to content

Commit

Permalink
periodic: Sequence emits int64 index and Impulse emits []byte
Browse files Browse the repository at this point in the history
  • Loading branch information
hnnsgstfssn committed Mar 20, 2023
1 parent de762ce commit e7167f6
Showing 1 changed file with 15 additions and 8 deletions.
23 changes: 15 additions & 8 deletions sdks/go/pkg/beam/transforms/periodic/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ import (

func init() {
register.DoFn5x2[context.Context, *sdf.ManualWatermarkEstimator, *sdf.LockRTracker, SequenceDefinition,
func(beam.EventTime, []byte),
func(beam.EventTime, int64),
sdf.ProcessContinuation, error](&sequenceGenDoFn{})
register.Emitter2[beam.EventTime, []byte]()
register.Emitter2[beam.EventTime, int64]()
register.Function2x0(sequenceToImpulse)
register.Emitter1[[]byte]()
beam.RegisterType(reflect.TypeOf(SequenceDefinition{}))
}

Expand Down Expand Up @@ -76,14 +78,14 @@ func (fn *sequenceGenDoFn) CreateWatermarkEstimator() *sdf.ManualWatermarkEstima
return &sdf.ManualWatermarkEstimator{}
}

func (fn *sequenceGenDoFn) ProcessElement(ctx context.Context, we *sdf.ManualWatermarkEstimator, rt *sdf.LockRTracker, sd SequenceDefinition, emit func(beam.EventTime, []byte)) (sdf.ProcessContinuation, error) {
func (fn *sequenceGenDoFn) ProcessElement(ctx context.Context, we *sdf.ManualWatermarkEstimator, rt *sdf.LockRTracker, sd SequenceDefinition, emit func(beam.EventTime, int64)) (sdf.ProcessContinuation, error) {
currentOutputIndex := rt.GetRestriction().(offsetrange.Restriction).Start
currentOutputTimestamp := mtime.Time(sd.Start).ToTime().Add(sd.Interval * time.Duration(currentOutputIndex))
currentTime := time.Now()
we.UpdateWatermark(currentOutputTimestamp)
for currentOutputTimestamp.Before(currentTime) {
if rt.TryClaim(currentOutputIndex) {
emit(mtime.FromTime(currentOutputTimestamp), []byte{})
emit(mtime.FromTime(currentOutputTimestamp), currentOutputIndex)
currentOutputIndex += 1
currentOutputTimestamp = mtime.Time(sd.Start).ToTime().Add(sd.Interval * time.Duration(currentOutputIndex))
currentTime = time.Now()
Expand Down Expand Up @@ -132,12 +134,17 @@ func Impulse(s beam.Scope, start, end time.Time, interval time.Duration, applyWi
func genImpulse(s beam.Scope, start, end time.Time, interval time.Duration, applyWindow bool, fn *sequenceGenDoFn) beam.PCollection {
sd := SequenceDefinition{Interval: interval, Start: start.UnixMilli(), End: end.UnixMilli()}
imp := beam.Create(s.Scope("ImpulseElement"), sd)
col := genSequence(s, imp, fn)
seq := genSequence(s, imp, fn)
imps := beam.ParDo(s, sequenceToImpulse, seq)
if applyWindow {
return beam.WindowInto(s.Scope("ApplyWindowing"),
window.NewFixedWindows(interval), col)
window.NewFixedWindows(interval), imps)
}
return col
return imps
}

func sequenceToImpulse(_ int64, emit func([]byte)) {
emit([]byte{})
}

// Sequence is a PTransform which generates a sequence of timestamped
Expand All @@ -157,7 +164,7 @@ func genImpulse(s beam.Scope, start, end time.Time, interval time.Duration, appl
// - if element timestamp is greater than current runtime, wait until next
// element timestamp.
//
// The PCollection<[]byte> generated by Sequence is unbounded.
// The PCollection<int64> generated by Sequence is unbounded.
func Sequence(s beam.Scope, col beam.PCollection) beam.PCollection {
return genSequence(s.Scope("periodic.Sequence"), col, &sequenceGenDoFn{})
}
Expand Down

0 comments on commit e7167f6

Please sign in to comment.