From 09a518e2bc16b0e9658c2fe8ef4677c214689dab Mon Sep 17 00:00:00 2001 From: Hannes Gustafsson Date: Fri, 10 Mar 2023 21:59:45 +0000 Subject: [PATCH 01/15] Add periodic.Sequence and periodic.Impulse transforms The new transforms extends support for the slowly updating side input pattern [1] as tracked in [2]. An attempt to mirror the logic of the Python implementation [3] has been made with minor idiomatic changes. Java [4][5] and Python [6] have influenced the documentation and naming. [1] https://beam.apache.org/documentation/patterns/side-inputs/ [2] https://github.com/apache/beam/issues/23106 [3] https://github.com/apache/beam/blob/v2.46.0/sdks/python/apache_beam/transforms/periodicsequence.py#L59 [4] https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/transforms/PeriodicSequence.html [5] https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/transforms/PeriodicImpulse.html [6] https://beam.apache.org/releases/pydoc/2.46.0/apache_beam.transforms.periodicsequence.html?highlight=periodicimpulse#module-apache_beam.transforms.periodicsequence --- CHANGES.md | 1 + .../slowly_updating_side_input.go | 131 +++++++++++ .../pkg/beam/transforms/periodic/periodic.go | 211 ++++++++++++++++++ 3 files changed, 343 insertions(+) create mode 100644 sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go create mode 100644 sdks/go/pkg/beam/transforms/periodic/periodic.go diff --git a/CHANGES.md b/CHANGES.md index 9437d489916f..56ceb8d19ad4 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -64,6 +64,7 @@ ## New Features / Improvements * The Flink runner now supports Flink 1.16.x ([#25046](https://github.com/apache/beam/issues/25046)). +* The Go SDK adds new transforms periodic.Impulse and periodic.Sequence that extends support for slowly updating side input patterns. ([#23106](https://github.com/apache/beam/issues/23106)) ## Breaking Changes diff --git a/sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go b/sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go new file mode 100644 index 000000000000..1f4206d74e2c --- /dev/null +++ b/sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go @@ -0,0 +1,131 @@ +package main + +import ( + "context" + "flag" + "strings" + "time" + + "cloud.google.com/go/pubsub" + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window/trigger" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/pubsubio" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" + "github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" + _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow" + "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/periodic" + "github.com/apache/beam/sdks/v2/go/pkg/beam/util/pubsubx" +) + +func init() { + register.Function4x0(update) + register.Function4x0(process) + register.Emitter2[int, string]() + register.Iter1[string]() +} + +// update simulates an external call to get data for the side input. +func update(ctx context.Context, t beam.EventTime, i int64, emit func(int, string)) { + log.Infof(ctx, "Making external call %d at %s", i, t.ToTime().Format(time.RFC3339)) + + // zero is the key used in beam.AddFixedKey which will be applied on the main input. + id, externalData := 0, "some fake data that changed at "+time.Now().Format(time.RFC3339) + + emit(id, externalData) +} + +// process simulates processing of main input. It reads side input by key +func process(ctx context.Context, k int, v []byte, side func(int) func(*string) bool) { + log.Infof(ctx, "Processing (key:%d,value:%q)", k, v) + + iter := side(k) + + var externalData []string + var externalDatum string + for iter(&externalDatum) { + externalData = append(externalData, externalDatum) + } + + log.Infof(ctx, "Processing (key:%d,value:%q) with external data %q", k, v, strings.Join(externalData, ",")) +} + +func fatalf(err error, format string, args ...interface{}) { + if err != nil { + log.Fatalf(context.TODO(), format, args...) + } +} + +func main() { + var inputTopic, periodicSequenceStart, periodicSequenceEnd string + var periodicSequenceInterval time.Duration + + now := time.Now() + + flag.StringVar(&periodicSequenceStart, "periodic_sequence_start", now.Add(-1*time.Hour).Format(time.RFC3339), + "The time at which to start the periodic sequence.") + + flag.StringVar(&periodicSequenceEnd, "periodic_sequence_end", now.Add(100*time.Hour).Format(time.RFC3339), + "The time at which to end the periodic sequence.") + + flag.DurationVar(&periodicSequenceInterval, "periodic_sequence_interval", 1*time.Minute, + "The interval between periodic sequence output.") + + flag.StringVar(&inputTopic, "input_topic", "input", + "The PubSub topic from which to read the main input data.") + + flag.Parse() + beam.Init() + ctx := context.Background() + p, s := beam.NewPipelineWithRoot() + + project := gcpopts.GetProject(ctx) + client, err := pubsub.NewClient(ctx, project) + fatalf(err, "Failed to create client: %v", err) + _, err = pubsubx.EnsureTopic(ctx, client, inputTopic) + fatalf(err, "Failed to ensure topic: %v", err) + + mainInput := beam.WindowInto( + s, + window.NewFixedWindows(periodicSequenceInterval), + beam.AddFixedKey( // simulate keyed data by adding a fixed key + s, + pubsubio.Read( + s, + project, + inputTopic, + nil, + ), + ), + beam.Trigger(trigger.Repeat(trigger.Always())), + beam.PanesDiscard(), + ) + + startTime, _ := time.Parse(time.RFC3339, periodicSequenceStart) + endTime, _ := time.Parse(time.RFC3339, periodicSequenceEnd) + sideInput := beam.WindowInto(s, window.NewFixedWindows(periodicSequenceInterval), + beam.ParDo( + s, + update, + periodic.Impulse( + s, + startTime, + endTime, + periodicSequenceInterval, + ), + ), + beam.Trigger(trigger.Repeat(trigger.Always())), + beam.PanesDiscard(), + ) + + beam.ParDo0(s, process, mainInput, + beam.SideInput{ + Input: sideInput, + }, + ) + + if _, err := beam.Run(context.Background(), "dataflow", p); err != nil { + log.Exitf(ctx, "Failed to run job: %v", err) + } +} diff --git a/sdks/go/pkg/beam/transforms/periodic/periodic.go b/sdks/go/pkg/beam/transforms/periodic/periodic.go new file mode 100644 index 000000000000..15634c4e1905 --- /dev/null +++ b/sdks/go/pkg/beam/transforms/periodic/periodic.go @@ -0,0 +1,211 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package periodic contains transformations for generating periodic sequences. +package periodic + +import ( + "context" + "fmt" + "math" + "reflect" + "time" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" +) + +func init() { + register.DoFn5x2[context.Context, *sdf.ManualWatermarkEstimator, *sdf.LockRTracker, SequenceDefinition, + func(beam.EventTime, int64), + sdf.ProcessContinuation, error](&sequenceGenDoFn{}) + register.Emitter2[beam.EventTime, int64]() + beam.RegisterType(reflect.TypeOf(SequenceDefinition{})) +} + +// SequenceDefinition holds the configuration for generating a sequence of +// timestamped elements at an interval. +type SequenceDefinition struct { + Interval time.Duration + Start time.Time + End time.Time +} + +type sequenceGenDoFn struct { + now func() time.Time +} + +func (fn *sequenceGenDoFn) Setup() { + if fn.now == nil { + fn.now = time.Now + } +} + +func (fn *sequenceGenDoFn) CreateInitialRestriction(sd SequenceDefinition) offsetrange.Restriction { + totalOutputs := math.Ceil(float64(sd.End.Sub(sd.Start) / sd.Interval)) + return offsetrange.Restriction{ + Start: int64(0), + End: int64(totalOutputs), + } +} + +func (fn *sequenceGenDoFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker { + return sdf.NewLockRTracker(offsetrange.NewTracker(rest)) +} + +func (fn *sequenceGenDoFn) RestrictionSize(_ SequenceDefinition, rest offsetrange.Restriction) float64 { + return rest.Size() +} + +func (fn *sequenceGenDoFn) SplitRestriction(_ SequenceDefinition, rest offsetrange.Restriction) []offsetrange.Restriction { + return []offsetrange.Restriction{rest} +} + +// TruncateRestriction immediately truncates the entire restrication. +func (fn *sequenceGenDoFn) TruncateRestriction(_ *sdf.LockRTracker, _ SequenceDefinition) offsetrange.Restriction { + return offsetrange.Restriction{} +} + +func (fn *sequenceGenDoFn) CreateWatermarkEstimator() *sdf.ManualWatermarkEstimator { + return &sdf.ManualWatermarkEstimator{} +} + +func (fn *sequenceGenDoFn) ProcessElement(ctx context.Context, we *sdf.ManualWatermarkEstimator, rt *sdf.LockRTracker, sd SequenceDefinition, emit func(beam.EventTime, int64)) (sdf.ProcessContinuation, error) { + currentOutputIndex := rt.GetRestriction().(offsetrange.Restriction).Start + currentOutputTimestamp := sd.Start.Add(sd.Interval * time.Duration(currentOutputIndex)) + currentTime := fn.now() + we.UpdateWatermark(currentOutputTimestamp) + for currentOutputTimestamp.Before(currentTime) { + if rt.TryClaim(currentOutputIndex) { + emit(mtime.FromTime(currentOutputTimestamp), currentOutputTimestamp.UnixMilli()) + currentOutputIndex += 1 + currentOutputTimestamp = sd.Start.Add(sd.Interval * time.Duration(currentOutputIndex)) + currentTime = fn.now() + we.UpdateWatermark(currentOutputTimestamp) + } else if err := rt.GetError(); err != nil || rt.IsDone() { + // Stop processing on error or completion + return sdf.StopProcessing(), rt.GetError() + } else { + return sdf.ResumeProcessingIn(sd.Interval), nil + } + } + + return sdf.ResumeProcessingIn(time.Until(currentOutputTimestamp)), nil +} + +type impulseConfig struct { + ApplyWindow bool + + now func() time.Time +} + +type impulseOption func(*impulseConfig) error + +// ImpulseOption is a function that configures an [Impulse] transform. +type ImpulseOption = impulseOption + +// WithApplyWindow configures the [Impulse] transform to apply a fixed window +// transform to the output PCollection. +func WithApplyWindow() ImpulseOption { + return func(o *impulseConfig) error { + o.ApplyWindow = true + return nil + } +} + +func withNowFunc(now func() time.Time) ImpulseOption { + return func(o *impulseConfig) error { + o.now = now + return nil + } +} + +// Impulse is a PTransform which generates a sequence of timestamped +// elements at fixed runtime intervals. If [WithApplyWindow] is specified, each +// element will be assigned to its own fixed window of interval size. +// +// The transform behaves the same as [Sequence] transform, but can be +// used as the first transform in a pipeline. +// +// The following applies to the arguments. +// - if interval <= 0, interval is set to [math.MaxInt64] +// - if start is a zero value [time.Time], start is set to the current time +// - if start is after end, start is set to end +// +// The PCollection generated by Impulse is unbounded and the output elements +// are the [time.UnixMilli] int64 values of the output timestamp. +func Impulse(s beam.Scope, start, end time.Time, interval time.Duration, opts ...ImpulseOption) beam.PCollection { + if interval <= 0 { + interval = math.MaxInt64 + } + if start.IsZero() { + start = time.Now() + } + if start.After(end) { + start = end + } + + conf := impulseConfig{} + + for _, opt := range opts { + if err := opt(&conf); err != nil { + panic(fmt.Errorf("periodic.Impulse: invalid option: %v", err)) + } + } + + return genImpulse(s.Scope("periodic.Impulse"), start, end, interval, conf, &sequenceGenDoFn{now: conf.now}) +} + +func genImpulse(s beam.Scope, start, end time.Time, interval time.Duration, conf impulseConfig, fn *sequenceGenDoFn) beam.PCollection { + sd := SequenceDefinition{Interval: interval, Start: start, End: end} + imp := beam.Create(s.Scope("ImpulseElement"), sd) + col := genSequence(s, imp, fn) + if conf.ApplyWindow { + return beam.WindowInto(s.Scope("ApplyWindowing"), + window.NewFixedWindows(interval), col) + } + return col +} + +// Sequence is a PTransform which generates a sequence of timestamped +// elements at fixed runtime intervals. +// +// The transform assigns each element a timestamp and will only output an +// element once the worker clock reach the output timestamp. Sequence is not +// able to guarantee that elements are output at the their exact timestamp, but +// it guarantees that elements will not be output prior to runtime timestamp. +// +// The transform will not output elements prior to the start time. +// +// Sequence receives [SequenceDefinition] elements and for each input element +// received, it will start generating output elements in the following pattern: +// +// - if element timestamp is less than current runtime then output element. +// - if element timestamp is greater than current runtime, wait until next +// element timestamp. +// +// The PCollection generated by Sequence is unbounded and the output elements +// are the [time.UnixMilli] int64 values of the output timestamp. +func Sequence(s beam.Scope, col beam.PCollection) beam.PCollection { + return genSequence(s.Scope("periodic.Sequence"), col, &sequenceGenDoFn{}) +} + +func genSequence(s beam.Scope, col beam.PCollection, fn *sequenceGenDoFn) beam.PCollection { + return beam.ParDo(s.Scope("GenSequence"), fn, col) +} From 3a147b841b6c6627b70f673f5065d56106fca432 Mon Sep 17 00:00:00 2001 From: Hannes Gustafsson Date: Sat, 11 Mar 2023 11:58:21 +0000 Subject: [PATCH 02/15] Add licence to example --- .../slowly_updating_side_input.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go b/sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go index 1f4206d74e2c..e32bc70c3c6b 100644 --- a/sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go +++ b/sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go @@ -1,3 +1,17 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. package main import ( From 2a27b7a8e24eb54cf1fbce64befecfcd7c1bc15e Mon Sep 17 00:00:00 2001 From: Hannes Gustafsson Date: Mon, 20 Mar 2023 13:12:51 +0000 Subject: [PATCH 03/15] periodic: address feedback and add unit tests --- .../pkg/beam/transforms/periodic/periodic.go | 70 ++++--------------- .../beam/transforms/periodic/periodic_test.go | 42 +++++++++++ 2 files changed, 56 insertions(+), 56 deletions(-) create mode 100644 sdks/go/pkg/beam/transforms/periodic/periodic_test.go diff --git a/sdks/go/pkg/beam/transforms/periodic/periodic.go b/sdks/go/pkg/beam/transforms/periodic/periodic.go index 15634c4e1905..6a8aa2a2f838 100644 --- a/sdks/go/pkg/beam/transforms/periodic/periodic.go +++ b/sdks/go/pkg/beam/transforms/periodic/periodic.go @@ -18,8 +18,6 @@ package periodic import ( "context" - "fmt" - "math" "reflect" "time" @@ -43,22 +41,17 @@ func init() { // timestamped elements at an interval. type SequenceDefinition struct { Interval time.Duration - Start time.Time - End time.Time + Start int64 + End int64 } -type sequenceGenDoFn struct { - now func() time.Time -} +type sequenceGenDoFn struct{} func (fn *sequenceGenDoFn) Setup() { - if fn.now == nil { - fn.now = time.Now - } } func (fn *sequenceGenDoFn) CreateInitialRestriction(sd SequenceDefinition) offsetrange.Restriction { - totalOutputs := math.Ceil(float64(sd.End.Sub(sd.Start) / sd.Interval)) + totalOutputs := mtime.Time(sd.End).ToTime().Sub(mtime.Time(sd.Start).ToTime()) / sd.Interval return offsetrange.Restriction{ Start: int64(0), End: int64(totalOutputs), @@ -88,15 +81,15 @@ func (fn *sequenceGenDoFn) CreateWatermarkEstimator() *sdf.ManualWatermarkEstima func (fn *sequenceGenDoFn) ProcessElement(ctx context.Context, we *sdf.ManualWatermarkEstimator, rt *sdf.LockRTracker, sd SequenceDefinition, emit func(beam.EventTime, int64)) (sdf.ProcessContinuation, error) { currentOutputIndex := rt.GetRestriction().(offsetrange.Restriction).Start - currentOutputTimestamp := sd.Start.Add(sd.Interval * time.Duration(currentOutputIndex)) - currentTime := fn.now() + currentOutputTimestamp := mtime.Time(sd.Start).ToTime().Add(sd.Interval * time.Duration(currentOutputIndex)) + currentTime := time.Now() we.UpdateWatermark(currentOutputTimestamp) for currentOutputTimestamp.Before(currentTime) { if rt.TryClaim(currentOutputIndex) { emit(mtime.FromTime(currentOutputTimestamp), currentOutputTimestamp.UnixMilli()) currentOutputIndex += 1 - currentOutputTimestamp = sd.Start.Add(sd.Interval * time.Duration(currentOutputIndex)) - currentTime = fn.now() + currentOutputTimestamp = mtime.Time(sd.Start).ToTime().Add(sd.Interval * time.Duration(currentOutputIndex)) + currentTime = time.Now() we.UpdateWatermark(currentOutputTimestamp) } else if err := rt.GetError(); err != nil || rt.IsDone() { // Stop processing on error or completion @@ -109,35 +102,8 @@ func (fn *sequenceGenDoFn) ProcessElement(ctx context.Context, we *sdf.ManualWat return sdf.ResumeProcessingIn(time.Until(currentOutputTimestamp)), nil } -type impulseConfig struct { - ApplyWindow bool - - now func() time.Time -} - -type impulseOption func(*impulseConfig) error - -// ImpulseOption is a function that configures an [Impulse] transform. -type ImpulseOption = impulseOption - -// WithApplyWindow configures the [Impulse] transform to apply a fixed window -// transform to the output PCollection. -func WithApplyWindow() ImpulseOption { - return func(o *impulseConfig) error { - o.ApplyWindow = true - return nil - } -} - -func withNowFunc(now func() time.Time) ImpulseOption { - return func(o *impulseConfig) error { - o.now = now - return nil - } -} - // Impulse is a PTransform which generates a sequence of timestamped -// elements at fixed runtime intervals. If [WithApplyWindow] is specified, each +// elements at fixed runtime intervals. If applyWindow is specified, each // element will be assigned to its own fixed window of interval size. // // The transform behaves the same as [Sequence] transform, but can be @@ -150,7 +116,7 @@ func withNowFunc(now func() time.Time) ImpulseOption { // // The PCollection generated by Impulse is unbounded and the output elements // are the [time.UnixMilli] int64 values of the output timestamp. -func Impulse(s beam.Scope, start, end time.Time, interval time.Duration, opts ...ImpulseOption) beam.PCollection { +func Impulse(s beam.Scope, start, end time.Time, interval time.Duration, applyWindow bool) beam.PCollection { if interval <= 0 { interval = math.MaxInt64 } @@ -161,22 +127,14 @@ func Impulse(s beam.Scope, start, end time.Time, interval time.Duration, opts .. start = end } - conf := impulseConfig{} - - for _, opt := range opts { - if err := opt(&conf); err != nil { - panic(fmt.Errorf("periodic.Impulse: invalid option: %v", err)) - } - } - - return genImpulse(s.Scope("periodic.Impulse"), start, end, interval, conf, &sequenceGenDoFn{now: conf.now}) + return genImpulse(s.Scope("periodic.Impulse"), start, end, interval, applyWindow, &sequenceGenDoFn{}) } -func genImpulse(s beam.Scope, start, end time.Time, interval time.Duration, conf impulseConfig, fn *sequenceGenDoFn) beam.PCollection { - sd := SequenceDefinition{Interval: interval, Start: start, End: end} +func genImpulse(s beam.Scope, start, end time.Time, interval time.Duration, applyWindow bool, fn *sequenceGenDoFn) beam.PCollection { + sd := SequenceDefinition{Interval: interval, Start: start.UnixMilli(), End: end.UnixMilli()} imp := beam.Create(s.Scope("ImpulseElement"), sd) col := genSequence(s, imp, fn) - if conf.ApplyWindow { + if applyWindow { return beam.WindowInto(s.Scope("ApplyWindowing"), window.NewFixedWindows(interval), col) } diff --git a/sdks/go/pkg/beam/transforms/periodic/periodic_test.go b/sdks/go/pkg/beam/transforms/periodic/periodic_test.go new file mode 100644 index 000000000000..a14a1bc95a0f --- /dev/null +++ b/sdks/go/pkg/beam/transforms/periodic/periodic_test.go @@ -0,0 +1,42 @@ +package periodic + +import ( + "context" + "testing" + "time" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism" + "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" +) + +func TestSequence(t *testing.T) { + p, s := beam.NewPipelineWithRoot() + sd := SequenceDefinition{ + Interval: time.Second, + Start: 0, + End: time.Minute.Milliseconds(), + } + in := beam.Create(s, sd) + out := Sequence(s, in) + passert.Count(s, out, "SecondsInMinute", 60) + beam.Init() + _, err := prism.Execute(context.Background(), p) + if err != nil { + t.Fatalf("Failed to execute job: %v", err) + } +} + +func TestImpulse(t *testing.T) { + p, s := beam.NewPipelineWithRoot() + interval := time.Second + start := time.Unix(0, 0) + end := start.Add(time.Minute) + out := Impulse(s, start, end, interval, false) + passert.Count(s, out, "SecondsInMinute", 60) + beam.Init() + _, err := prism.Execute(context.Background(), p) + if err != nil { + t.Fatalf("Failed to execute job: %v", err) + } +} From db5ba795ba49a26a565813a89d5a06af7138da6b Mon Sep 17 00:00:00 2001 From: Hannes Gustafsson Date: Mon, 20 Mar 2023 13:21:04 +0000 Subject: [PATCH 04/15] periodic: emit bytes instead of int64 --- sdks/go/pkg/beam/transforms/periodic/periodic.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/sdks/go/pkg/beam/transforms/periodic/periodic.go b/sdks/go/pkg/beam/transforms/periodic/periodic.go index 6a8aa2a2f838..d634e721ef62 100644 --- a/sdks/go/pkg/beam/transforms/periodic/periodic.go +++ b/sdks/go/pkg/beam/transforms/periodic/periodic.go @@ -31,9 +31,9 @@ import ( func init() { register.DoFn5x2[context.Context, *sdf.ManualWatermarkEstimator, *sdf.LockRTracker, SequenceDefinition, - func(beam.EventTime, int64), + func(beam.EventTime, []byte), sdf.ProcessContinuation, error](&sequenceGenDoFn{}) - register.Emitter2[beam.EventTime, int64]() + register.Emitter2[beam.EventTime, []byte]() beam.RegisterType(reflect.TypeOf(SequenceDefinition{})) } @@ -79,14 +79,14 @@ func (fn *sequenceGenDoFn) CreateWatermarkEstimator() *sdf.ManualWatermarkEstima return &sdf.ManualWatermarkEstimator{} } -func (fn *sequenceGenDoFn) ProcessElement(ctx context.Context, we *sdf.ManualWatermarkEstimator, rt *sdf.LockRTracker, sd SequenceDefinition, emit func(beam.EventTime, int64)) (sdf.ProcessContinuation, error) { +func (fn *sequenceGenDoFn) ProcessElement(ctx context.Context, we *sdf.ManualWatermarkEstimator, rt *sdf.LockRTracker, sd SequenceDefinition, emit func(beam.EventTime, []byte)) (sdf.ProcessContinuation, error) { currentOutputIndex := rt.GetRestriction().(offsetrange.Restriction).Start currentOutputTimestamp := mtime.Time(sd.Start).ToTime().Add(sd.Interval * time.Duration(currentOutputIndex)) currentTime := time.Now() we.UpdateWatermark(currentOutputTimestamp) for currentOutputTimestamp.Before(currentTime) { if rt.TryClaim(currentOutputIndex) { - emit(mtime.FromTime(currentOutputTimestamp), currentOutputTimestamp.UnixMilli()) + emit(mtime.FromTime(currentOutputTimestamp), []byte{}) currentOutputIndex += 1 currentOutputTimestamp = mtime.Time(sd.Start).ToTime().Add(sd.Interval * time.Duration(currentOutputIndex)) currentTime = time.Now() @@ -114,8 +114,7 @@ func (fn *sequenceGenDoFn) ProcessElement(ctx context.Context, we *sdf.ManualWat // - if start is a zero value [time.Time], start is set to the current time // - if start is after end, start is set to end // -// The PCollection generated by Impulse is unbounded and the output elements -// are the [time.UnixMilli] int64 values of the output timestamp. +// The PCollection<[]byte> generated by Impulse is unbounded. func Impulse(s beam.Scope, start, end time.Time, interval time.Duration, applyWindow bool) beam.PCollection { if interval <= 0 { interval = math.MaxInt64 @@ -158,8 +157,7 @@ func genImpulse(s beam.Scope, start, end time.Time, interval time.Duration, appl // - if element timestamp is greater than current runtime, wait until next // element timestamp. // -// The PCollection generated by Sequence is unbounded and the output elements -// are the [time.UnixMilli] int64 values of the output timestamp. +// The PCollection<[]byte> generated by Sequence is unbounded. func Sequence(s beam.Scope, col beam.PCollection) beam.PCollection { return genSequence(s.Scope("periodic.Sequence"), col, &sequenceGenDoFn{}) } From 23ed0c11d449b1018a3e8bab454a5f2d5b8759e1 Mon Sep 17 00:00:00 2001 From: Hannes Gustafsson Date: Mon, 20 Mar 2023 14:09:32 +0000 Subject: [PATCH 05/15] periodic: adjust impulse argument validation --- sdks/go/pkg/beam/transforms/periodic/periodic.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/sdks/go/pkg/beam/transforms/periodic/periodic.go b/sdks/go/pkg/beam/transforms/periodic/periodic.go index d634e721ef62..57b91ee8c38f 100644 --- a/sdks/go/pkg/beam/transforms/periodic/periodic.go +++ b/sdks/go/pkg/beam/transforms/periodic/periodic.go @@ -110,21 +110,24 @@ func (fn *sequenceGenDoFn) ProcessElement(ctx context.Context, we *sdf.ManualWat // used as the first transform in a pipeline. // // The following applies to the arguments. -// - if interval <= 0, interval is set to [math.MaxInt64] // - if start is a zero value [time.Time], start is set to the current time // - if start is after end, start is set to end +// - start and end are normalized with [mtime.Normalize] +// - if interval <= 0 or interval > end.Sub(start), interval is set to end.Sub(start) // // The PCollection<[]byte> generated by Impulse is unbounded. func Impulse(s beam.Scope, start, end time.Time, interval time.Duration, applyWindow bool) beam.PCollection { - if interval <= 0 { - interval = math.MaxInt64 - } if start.IsZero() { start = time.Now() } if start.After(end) { start = end } + start = mtime.Normalize(mtime.FromTime(start)).ToTime() + end = mtime.Normalize(mtime.FromTime(end)).ToTime() + if interval <= 0 || interval > end.Sub(start) { + interval = end.Sub(start) + } return genImpulse(s.Scope("periodic.Impulse"), start, end, interval, applyWindow, &sequenceGenDoFn{}) } From 6ef98ca7f629f33aa150e8775deef84840bd125c Mon Sep 17 00:00:00 2001 From: Hannes Gustafsson Date: Mon, 20 Mar 2023 15:11:17 +0000 Subject: [PATCH 06/15] examples/slowly_updating_side_input: fix periodic.Impulse call --- .../slowly_updating_side_input/slowly_updating_side_input.go | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go b/sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go index e32bc70c3c6b..b645547ea570 100644 --- a/sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go +++ b/sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go @@ -127,6 +127,7 @@ func main() { startTime, endTime, periodicSequenceInterval, + false, ), ), beam.Trigger(trigger.Repeat(trigger.Always())), From e6cdac3338d43ed05ba130ead48be6d5ff593e32 Mon Sep 17 00:00:00 2001 From: Hannes Gustafsson Date: Mon, 20 Mar 2023 15:12:49 +0000 Subject: [PATCH 07/15] periodic: add licence to test file --- .../pkg/beam/transforms/periodic/periodic_test.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/sdks/go/pkg/beam/transforms/periodic/periodic_test.go b/sdks/go/pkg/beam/transforms/periodic/periodic_test.go index a14a1bc95a0f..d977a5fc22cc 100644 --- a/sdks/go/pkg/beam/transforms/periodic/periodic_test.go +++ b/sdks/go/pkg/beam/transforms/periodic/periodic_test.go @@ -1,3 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package periodic import ( From 5ef874be45834cc3b1bf9bdb5d54d098781ddc45 Mon Sep 17 00:00:00 2001 From: hnnsgstfssn Date: Mon, 20 Mar 2023 15:46:31 +0000 Subject: [PATCH 08/15] Apply suggestions from code review Co-authored-by: Robert Burke --- .../slowly_updating_side_input.go | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go b/sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go index b645547ea570..eb3a9d0eeda7 100644 --- a/sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go +++ b/sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go @@ -12,6 +12,9 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + +// slowly_updating_side_input is an example pipeline demonstrating the pattern described +// at https://beam.apache.org/documentation/patterns/side-inputs/. package main import ( @@ -42,7 +45,7 @@ func init() { // update simulates an external call to get data for the side input. func update(ctx context.Context, t beam.EventTime, i int64, emit func(int, string)) { - log.Infof(ctx, "Making external call %d at %s", i, t.ToTime().Format(time.RFC3339)) + log.Infof(ctx, "Making external call %d at event time %s", i, t.ToTime().Format(time.RFC3339)) // zero is the key used in beam.AddFixedKey which will be applied on the main input. id, externalData := 0, "some fake data that changed at "+time.Now().Format(time.RFC3339) @@ -100,18 +103,12 @@ func main() { _, err = pubsubx.EnsureTopic(ctx, client, inputTopic) fatalf(err, "Failed to ensure topic: %v", err) + source := pubsubio.Read(s, project, inputTopic, nil) + keyedSource := beam.AddFixedKey(s, source) // simulate keyed data by adding a fixed key mainInput := beam.WindowInto( s, + keyedSource, window.NewFixedWindows(periodicSequenceInterval), - beam.AddFixedKey( // simulate keyed data by adding a fixed key - s, - pubsubio.Read( - s, - project, - inputTopic, - nil, - ), - ), beam.Trigger(trigger.Repeat(trigger.Always())), beam.PanesDiscard(), ) From 8bc367a5011b17ec10b61da9f2175aff9a141e16 Mon Sep 17 00:00:00 2001 From: Hannes Gustafsson Date: Mon, 20 Mar 2023 15:51:58 +0000 Subject: [PATCH 09/15] examples/slowly_updating_side_input: avoid nesting pipeline --- .../slowly_updating_side_input.go | 24 +++++++++---------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go b/sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go index eb3a9d0eeda7..39f15b85977d 100644 --- a/sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go +++ b/sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go @@ -103,8 +103,8 @@ func main() { _, err = pubsubx.EnsureTopic(ctx, client, inputTopic) fatalf(err, "Failed to ensure topic: %v", err) - source := pubsubio.Read(s, project, inputTopic, nil) - keyedSource := beam.AddFixedKey(s, source) // simulate keyed data by adding a fixed key + source := pubsubio.Read(s, project, inputTopic, nil) + keyedSource := beam.AddFixedKey(s, source) // simulate keyed data by adding a fixed key mainInput := beam.WindowInto( s, keyedSource, @@ -115,18 +115,16 @@ func main() { startTime, _ := time.Parse(time.RFC3339, periodicSequenceStart) endTime, _ := time.Parse(time.RFC3339, periodicSequenceEnd) + + // Generate an impulse every period. + periodicImp := periodic.Impulse(s, startTime, endTime, periodicSequenceInterval, false) + + // Use the impulse to trigger some other ordinary transform. + updatedImp := beam.ParDo(s, update, periodicImp) + + // Window for use as a side input, to allow the input to change with windows. sideInput := beam.WindowInto(s, window.NewFixedWindows(periodicSequenceInterval), - beam.ParDo( - s, - update, - periodic.Impulse( - s, - startTime, - endTime, - periodicSequenceInterval, - false, - ), - ), + updatedImp, beam.Trigger(trigger.Repeat(trigger.Always())), beam.PanesDiscard(), ) From fbdf5a879351df54b55bc0847efa9d8e7ec50bc0 Mon Sep 17 00:00:00 2001 From: Hannes Gustafsson Date: Mon, 20 Mar 2023 16:01:17 +0000 Subject: [PATCH 10/15] examples/slowly_updating_side_input: fix WindowInto argument order --- .../slowly_updating_side_input/slowly_updating_side_input.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go b/sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go index 39f15b85977d..0cdfff25e075 100644 --- a/sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go +++ b/sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go @@ -107,8 +107,8 @@ func main() { keyedSource := beam.AddFixedKey(s, source) // simulate keyed data by adding a fixed key mainInput := beam.WindowInto( s, - keyedSource, window.NewFixedWindows(periodicSequenceInterval), + keyedSource, beam.Trigger(trigger.Repeat(trigger.Always())), beam.PanesDiscard(), ) From 3f24a07cb4f1d2448f00b4197740b6457a2a3ab6 Mon Sep 17 00:00:00 2001 From: Hannes Gustafsson Date: Mon, 20 Mar 2023 16:08:13 +0000 Subject: [PATCH 11/15] examples/slowly_updating_side_input: change impulse element type --- .../slowly_updating_side_input/slowly_updating_side_input.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go b/sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go index 0cdfff25e075..e7852a05a443 100644 --- a/sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go +++ b/sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go @@ -44,8 +44,8 @@ func init() { } // update simulates an external call to get data for the side input. -func update(ctx context.Context, t beam.EventTime, i int64, emit func(int, string)) { - log.Infof(ctx, "Making external call %d at event time %s", i, t.ToTime().Format(time.RFC3339)) +func update(ctx context.Context, t beam.EventTime, _ []byte, emit func(int, string)) { + log.Infof(ctx, "Making external call at event time %s", t.ToTime().Format(time.RFC3339)) // zero is the key used in beam.AddFixedKey which will be applied on the main input. id, externalData := 0, "some fake data that changed at "+time.Now().Format(time.RFC3339) From 6af6dceb5d27763cdcaea0b8cd863e088ce79d8e Mon Sep 17 00:00:00 2001 From: Hannes Gustafsson Date: Mon, 20 Mar 2023 23:26:30 +0000 Subject: [PATCH 12/15] periodic: use testing.M to set the prism runner --- .../beam/transforms/periodic/periodic_test.go | 21 ++++++++----------- 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/sdks/go/pkg/beam/transforms/periodic/periodic_test.go b/sdks/go/pkg/beam/transforms/periodic/periodic_test.go index d977a5fc22cc..7553e1f818af 100644 --- a/sdks/go/pkg/beam/transforms/periodic/periodic_test.go +++ b/sdks/go/pkg/beam/transforms/periodic/periodic_test.go @@ -16,15 +16,20 @@ package periodic import ( - "context" + "os" "testing" "time" "github.com/apache/beam/sdks/v2/go/pkg/beam" - "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism" + _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" + "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" ) +func TestMain(m *testing.M) { + os.Exit(ptest.MainRetWithDefault(m, "prism")) +} + func TestSequence(t *testing.T) { p, s := beam.NewPipelineWithRoot() sd := SequenceDefinition{ @@ -35,11 +40,7 @@ func TestSequence(t *testing.T) { in := beam.Create(s, sd) out := Sequence(s, in) passert.Count(s, out, "SecondsInMinute", 60) - beam.Init() - _, err := prism.Execute(context.Background(), p) - if err != nil { - t.Fatalf("Failed to execute job: %v", err) - } + ptest.RunAndValidate(t, p) } func TestImpulse(t *testing.T) { @@ -49,9 +50,5 @@ func TestImpulse(t *testing.T) { end := start.Add(time.Minute) out := Impulse(s, start, end, interval, false) passert.Count(s, out, "SecondsInMinute", 60) - beam.Init() - _, err := prism.Execute(context.Background(), p) - if err != nil { - t.Fatalf("Failed to execute job: %v", err) - } + ptest.RunAndValidate(t, p) } From de762ce4d74ed43ba0f905cfdb053704b6976ee9 Mon Sep 17 00:00:00 2001 From: Hannes Gustafsson Date: Mon, 20 Mar 2023 23:48:32 +0000 Subject: [PATCH 13/15] periodic: remove defunct Setup --- sdks/go/pkg/beam/transforms/periodic/periodic.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/sdks/go/pkg/beam/transforms/periodic/periodic.go b/sdks/go/pkg/beam/transforms/periodic/periodic.go index 57b91ee8c38f..4f0f6e802dad 100644 --- a/sdks/go/pkg/beam/transforms/periodic/periodic.go +++ b/sdks/go/pkg/beam/transforms/periodic/periodic.go @@ -47,9 +47,6 @@ type SequenceDefinition struct { type sequenceGenDoFn struct{} -func (fn *sequenceGenDoFn) Setup() { -} - func (fn *sequenceGenDoFn) CreateInitialRestriction(sd SequenceDefinition) offsetrange.Restriction { totalOutputs := mtime.Time(sd.End).ToTime().Sub(mtime.Time(sd.Start).ToTime()) / sd.Interval return offsetrange.Restriction{ From e7167f60454c752301556c6241e7bbf948c3363b Mon Sep 17 00:00:00 2001 From: Hannes Gustafsson Date: Mon, 20 Mar 2023 23:49:06 +0000 Subject: [PATCH 14/15] periodic: Sequence emits int64 index and Impulse emits []byte --- .../pkg/beam/transforms/periodic/periodic.go | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/sdks/go/pkg/beam/transforms/periodic/periodic.go b/sdks/go/pkg/beam/transforms/periodic/periodic.go index 4f0f6e802dad..3dbe6a640757 100644 --- a/sdks/go/pkg/beam/transforms/periodic/periodic.go +++ b/sdks/go/pkg/beam/transforms/periodic/periodic.go @@ -31,9 +31,11 @@ import ( func init() { register.DoFn5x2[context.Context, *sdf.ManualWatermarkEstimator, *sdf.LockRTracker, SequenceDefinition, - func(beam.EventTime, []byte), + func(beam.EventTime, int64), sdf.ProcessContinuation, error](&sequenceGenDoFn{}) - register.Emitter2[beam.EventTime, []byte]() + register.Emitter2[beam.EventTime, int64]() + register.Function2x0(sequenceToImpulse) + register.Emitter1[[]byte]() beam.RegisterType(reflect.TypeOf(SequenceDefinition{})) } @@ -76,14 +78,14 @@ func (fn *sequenceGenDoFn) CreateWatermarkEstimator() *sdf.ManualWatermarkEstima return &sdf.ManualWatermarkEstimator{} } -func (fn *sequenceGenDoFn) ProcessElement(ctx context.Context, we *sdf.ManualWatermarkEstimator, rt *sdf.LockRTracker, sd SequenceDefinition, emit func(beam.EventTime, []byte)) (sdf.ProcessContinuation, error) { +func (fn *sequenceGenDoFn) ProcessElement(ctx context.Context, we *sdf.ManualWatermarkEstimator, rt *sdf.LockRTracker, sd SequenceDefinition, emit func(beam.EventTime, int64)) (sdf.ProcessContinuation, error) { currentOutputIndex := rt.GetRestriction().(offsetrange.Restriction).Start currentOutputTimestamp := mtime.Time(sd.Start).ToTime().Add(sd.Interval * time.Duration(currentOutputIndex)) currentTime := time.Now() we.UpdateWatermark(currentOutputTimestamp) for currentOutputTimestamp.Before(currentTime) { if rt.TryClaim(currentOutputIndex) { - emit(mtime.FromTime(currentOutputTimestamp), []byte{}) + emit(mtime.FromTime(currentOutputTimestamp), currentOutputIndex) currentOutputIndex += 1 currentOutputTimestamp = mtime.Time(sd.Start).ToTime().Add(sd.Interval * time.Duration(currentOutputIndex)) currentTime = time.Now() @@ -132,12 +134,17 @@ func Impulse(s beam.Scope, start, end time.Time, interval time.Duration, applyWi func genImpulse(s beam.Scope, start, end time.Time, interval time.Duration, applyWindow bool, fn *sequenceGenDoFn) beam.PCollection { sd := SequenceDefinition{Interval: interval, Start: start.UnixMilli(), End: end.UnixMilli()} imp := beam.Create(s.Scope("ImpulseElement"), sd) - col := genSequence(s, imp, fn) + seq := genSequence(s, imp, fn) + imps := beam.ParDo(s, sequenceToImpulse, seq) if applyWindow { return beam.WindowInto(s.Scope("ApplyWindowing"), - window.NewFixedWindows(interval), col) + window.NewFixedWindows(interval), imps) } - return col + return imps +} + +func sequenceToImpulse(_ int64, emit func([]byte)) { + emit([]byte{}) } // Sequence is a PTransform which generates a sequence of timestamped @@ -157,7 +164,7 @@ func genImpulse(s beam.Scope, start, end time.Time, interval time.Duration, appl // - if element timestamp is greater than current runtime, wait until next // element timestamp. // -// The PCollection<[]byte> generated by Sequence is unbounded. +// The PCollection generated by Sequence is unbounded. func Sequence(s beam.Scope, col beam.PCollection) beam.PCollection { return genSequence(s.Scope("periodic.Sequence"), col, &sequenceGenDoFn{}) } From 1b05c690a21ac5ed8daf683dbce6a1075bb4f147 Mon Sep 17 00:00:00 2001 From: Hannes Gustafsson Date: Wed, 22 Mar 2023 01:19:35 +0000 Subject: [PATCH 15/15] periodic: document start and end and add constructor --- .../pkg/beam/transforms/periodic/periodic.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/sdks/go/pkg/beam/transforms/periodic/periodic.go b/sdks/go/pkg/beam/transforms/periodic/periodic.go index 3dbe6a640757..f8752dc2c93b 100644 --- a/sdks/go/pkg/beam/transforms/periodic/periodic.go +++ b/sdks/go/pkg/beam/transforms/periodic/periodic.go @@ -43,8 +43,22 @@ func init() { // timestamped elements at an interval. type SequenceDefinition struct { Interval time.Duration - Start int64 - End int64 + + // Start is the number of milliseconds since the Unix epoch. + Start int64 + + // End is the number of milliseconds since the Unix epoch. + End int64 +} + +// NewSequenceDefinition creates a new [SequenceDefinition] from a start and +// end [time.Time] along with its interval [time.Duration]. +func NewSequenceDefinition(start, end time.Time, interval time.Duration) SequenceDefinition { + return SequenceDefinition{ + Start: start.UnixMilli(), + End: end.UnixMilli(), + Interval: interval, + } } type sequenceGenDoFn struct{}