diff --git a/CHANGES.md b/CHANGES.md index 049ef1a1e227..efce157e1cdf 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -67,6 +67,8 @@ * Schema'd PTransforms can now be directly applied to Beam dataframes just like PCollections. (Note that when doing multiple operations, it may be more efficient to explicitly chain the operations like `df | (Transform1 | Transform2 | ...)` to avoid excessive conversions.) +* The Go SDK adds new transforms periodic.Impulse and periodic.Sequence that extends support + for slowly updating side input patterns. ([#23106](https://github.com/apache/beam/issues/23106)) ## Breaking Changes diff --git a/sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go b/sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go new file mode 100644 index 000000000000..e7852a05a443 --- /dev/null +++ b/sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go @@ -0,0 +1,141 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// slowly_updating_side_input is an example pipeline demonstrating the pattern described +// at https://beam.apache.org/documentation/patterns/side-inputs/. +package main + +import ( + "context" + "flag" + "strings" + "time" + + "cloud.google.com/go/pubsub" + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window/trigger" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/pubsubio" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" + "github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" + _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow" + "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/periodic" + "github.com/apache/beam/sdks/v2/go/pkg/beam/util/pubsubx" +) + +func init() { + register.Function4x0(update) + register.Function4x0(process) + register.Emitter2[int, string]() + register.Iter1[string]() +} + +// update simulates an external call to get data for the side input. +func update(ctx context.Context, t beam.EventTime, _ []byte, emit func(int, string)) { + log.Infof(ctx, "Making external call at event time %s", t.ToTime().Format(time.RFC3339)) + + // zero is the key used in beam.AddFixedKey which will be applied on the main input. + id, externalData := 0, "some fake data that changed at "+time.Now().Format(time.RFC3339) + + emit(id, externalData) +} + +// process simulates processing of main input. It reads side input by key +func process(ctx context.Context, k int, v []byte, side func(int) func(*string) bool) { + log.Infof(ctx, "Processing (key:%d,value:%q)", k, v) + + iter := side(k) + + var externalData []string + var externalDatum string + for iter(&externalDatum) { + externalData = append(externalData, externalDatum) + } + + log.Infof(ctx, "Processing (key:%d,value:%q) with external data %q", k, v, strings.Join(externalData, ",")) +} + +func fatalf(err error, format string, args ...interface{}) { + if err != nil { + log.Fatalf(context.TODO(), format, args...) + } +} + +func main() { + var inputTopic, periodicSequenceStart, periodicSequenceEnd string + var periodicSequenceInterval time.Duration + + now := time.Now() + + flag.StringVar(&periodicSequenceStart, "periodic_sequence_start", now.Add(-1*time.Hour).Format(time.RFC3339), + "The time at which to start the periodic sequence.") + + flag.StringVar(&periodicSequenceEnd, "periodic_sequence_end", now.Add(100*time.Hour).Format(time.RFC3339), + "The time at which to end the periodic sequence.") + + flag.DurationVar(&periodicSequenceInterval, "periodic_sequence_interval", 1*time.Minute, + "The interval between periodic sequence output.") + + flag.StringVar(&inputTopic, "input_topic", "input", + "The PubSub topic from which to read the main input data.") + + flag.Parse() + beam.Init() + ctx := context.Background() + p, s := beam.NewPipelineWithRoot() + + project := gcpopts.GetProject(ctx) + client, err := pubsub.NewClient(ctx, project) + fatalf(err, "Failed to create client: %v", err) + _, err = pubsubx.EnsureTopic(ctx, client, inputTopic) + fatalf(err, "Failed to ensure topic: %v", err) + + source := pubsubio.Read(s, project, inputTopic, nil) + keyedSource := beam.AddFixedKey(s, source) // simulate keyed data by adding a fixed key + mainInput := beam.WindowInto( + s, + window.NewFixedWindows(periodicSequenceInterval), + keyedSource, + beam.Trigger(trigger.Repeat(trigger.Always())), + beam.PanesDiscard(), + ) + + startTime, _ := time.Parse(time.RFC3339, periodicSequenceStart) + endTime, _ := time.Parse(time.RFC3339, periodicSequenceEnd) + + // Generate an impulse every period. + periodicImp := periodic.Impulse(s, startTime, endTime, periodicSequenceInterval, false) + + // Use the impulse to trigger some other ordinary transform. + updatedImp := beam.ParDo(s, update, periodicImp) + + // Window for use as a side input, to allow the input to change with windows. + sideInput := beam.WindowInto(s, window.NewFixedWindows(periodicSequenceInterval), + updatedImp, + beam.Trigger(trigger.Repeat(trigger.Always())), + beam.PanesDiscard(), + ) + + beam.ParDo0(s, process, mainInput, + beam.SideInput{ + Input: sideInput, + }, + ) + + if _, err := beam.Run(context.Background(), "dataflow", p); err != nil { + log.Exitf(ctx, "Failed to run job: %v", err) + } +} diff --git a/sdks/go/pkg/beam/transforms/periodic/periodic.go b/sdks/go/pkg/beam/transforms/periodic/periodic.go new file mode 100644 index 000000000000..f8752dc2c93b --- /dev/null +++ b/sdks/go/pkg/beam/transforms/periodic/periodic.go @@ -0,0 +1,188 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package periodic contains transformations for generating periodic sequences. +package periodic + +import ( + "context" + "reflect" + "time" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" +) + +func init() { + register.DoFn5x2[context.Context, *sdf.ManualWatermarkEstimator, *sdf.LockRTracker, SequenceDefinition, + func(beam.EventTime, int64), + sdf.ProcessContinuation, error](&sequenceGenDoFn{}) + register.Emitter2[beam.EventTime, int64]() + register.Function2x0(sequenceToImpulse) + register.Emitter1[[]byte]() + beam.RegisterType(reflect.TypeOf(SequenceDefinition{})) +} + +// SequenceDefinition holds the configuration for generating a sequence of +// timestamped elements at an interval. +type SequenceDefinition struct { + Interval time.Duration + + // Start is the number of milliseconds since the Unix epoch. + Start int64 + + // End is the number of milliseconds since the Unix epoch. + End int64 +} + +// NewSequenceDefinition creates a new [SequenceDefinition] from a start and +// end [time.Time] along with its interval [time.Duration]. +func NewSequenceDefinition(start, end time.Time, interval time.Duration) SequenceDefinition { + return SequenceDefinition{ + Start: start.UnixMilli(), + End: end.UnixMilli(), + Interval: interval, + } +} + +type sequenceGenDoFn struct{} + +func (fn *sequenceGenDoFn) CreateInitialRestriction(sd SequenceDefinition) offsetrange.Restriction { + totalOutputs := mtime.Time(sd.End).ToTime().Sub(mtime.Time(sd.Start).ToTime()) / sd.Interval + return offsetrange.Restriction{ + Start: int64(0), + End: int64(totalOutputs), + } +} + +func (fn *sequenceGenDoFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker { + return sdf.NewLockRTracker(offsetrange.NewTracker(rest)) +} + +func (fn *sequenceGenDoFn) RestrictionSize(_ SequenceDefinition, rest offsetrange.Restriction) float64 { + return rest.Size() +} + +func (fn *sequenceGenDoFn) SplitRestriction(_ SequenceDefinition, rest offsetrange.Restriction) []offsetrange.Restriction { + return []offsetrange.Restriction{rest} +} + +// TruncateRestriction immediately truncates the entire restrication. +func (fn *sequenceGenDoFn) TruncateRestriction(_ *sdf.LockRTracker, _ SequenceDefinition) offsetrange.Restriction { + return offsetrange.Restriction{} +} + +func (fn *sequenceGenDoFn) CreateWatermarkEstimator() *sdf.ManualWatermarkEstimator { + return &sdf.ManualWatermarkEstimator{} +} + +func (fn *sequenceGenDoFn) ProcessElement(ctx context.Context, we *sdf.ManualWatermarkEstimator, rt *sdf.LockRTracker, sd SequenceDefinition, emit func(beam.EventTime, int64)) (sdf.ProcessContinuation, error) { + currentOutputIndex := rt.GetRestriction().(offsetrange.Restriction).Start + currentOutputTimestamp := mtime.Time(sd.Start).ToTime().Add(sd.Interval * time.Duration(currentOutputIndex)) + currentTime := time.Now() + we.UpdateWatermark(currentOutputTimestamp) + for currentOutputTimestamp.Before(currentTime) { + if rt.TryClaim(currentOutputIndex) { + emit(mtime.FromTime(currentOutputTimestamp), currentOutputIndex) + currentOutputIndex += 1 + currentOutputTimestamp = mtime.Time(sd.Start).ToTime().Add(sd.Interval * time.Duration(currentOutputIndex)) + currentTime = time.Now() + we.UpdateWatermark(currentOutputTimestamp) + } else if err := rt.GetError(); err != nil || rt.IsDone() { + // Stop processing on error or completion + return sdf.StopProcessing(), rt.GetError() + } else { + return sdf.ResumeProcessingIn(sd.Interval), nil + } + } + + return sdf.ResumeProcessingIn(time.Until(currentOutputTimestamp)), nil +} + +// Impulse is a PTransform which generates a sequence of timestamped +// elements at fixed runtime intervals. If applyWindow is specified, each +// element will be assigned to its own fixed window of interval size. +// +// The transform behaves the same as [Sequence] transform, but can be +// used as the first transform in a pipeline. +// +// The following applies to the arguments. +// - if start is a zero value [time.Time], start is set to the current time +// - if start is after end, start is set to end +// - start and end are normalized with [mtime.Normalize] +// - if interval <= 0 or interval > end.Sub(start), interval is set to end.Sub(start) +// +// The PCollection<[]byte> generated by Impulse is unbounded. +func Impulse(s beam.Scope, start, end time.Time, interval time.Duration, applyWindow bool) beam.PCollection { + if start.IsZero() { + start = time.Now() + } + if start.After(end) { + start = end + } + start = mtime.Normalize(mtime.FromTime(start)).ToTime() + end = mtime.Normalize(mtime.FromTime(end)).ToTime() + if interval <= 0 || interval > end.Sub(start) { + interval = end.Sub(start) + } + + return genImpulse(s.Scope("periodic.Impulse"), start, end, interval, applyWindow, &sequenceGenDoFn{}) +} + +func genImpulse(s beam.Scope, start, end time.Time, interval time.Duration, applyWindow bool, fn *sequenceGenDoFn) beam.PCollection { + sd := SequenceDefinition{Interval: interval, Start: start.UnixMilli(), End: end.UnixMilli()} + imp := beam.Create(s.Scope("ImpulseElement"), sd) + seq := genSequence(s, imp, fn) + imps := beam.ParDo(s, sequenceToImpulse, seq) + if applyWindow { + return beam.WindowInto(s.Scope("ApplyWindowing"), + window.NewFixedWindows(interval), imps) + } + return imps +} + +func sequenceToImpulse(_ int64, emit func([]byte)) { + emit([]byte{}) +} + +// Sequence is a PTransform which generates a sequence of timestamped +// elements at fixed runtime intervals. +// +// The transform assigns each element a timestamp and will only output an +// element once the worker clock reach the output timestamp. Sequence is not +// able to guarantee that elements are output at the their exact timestamp, but +// it guarantees that elements will not be output prior to runtime timestamp. +// +// The transform will not output elements prior to the start time. +// +// Sequence receives [SequenceDefinition] elements and for each input element +// received, it will start generating output elements in the following pattern: +// +// - if element timestamp is less than current runtime then output element. +// - if element timestamp is greater than current runtime, wait until next +// element timestamp. +// +// The PCollection generated by Sequence is unbounded. +func Sequence(s beam.Scope, col beam.PCollection) beam.PCollection { + return genSequence(s.Scope("periodic.Sequence"), col, &sequenceGenDoFn{}) +} + +func genSequence(s beam.Scope, col beam.PCollection, fn *sequenceGenDoFn) beam.PCollection { + return beam.ParDo(s.Scope("GenSequence"), fn, col) +} diff --git a/sdks/go/pkg/beam/transforms/periodic/periodic_test.go b/sdks/go/pkg/beam/transforms/periodic/periodic_test.go new file mode 100644 index 000000000000..7553e1f818af --- /dev/null +++ b/sdks/go/pkg/beam/transforms/periodic/periodic_test.go @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package periodic + +import ( + "os" + "testing" + "time" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + _ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism" + "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" + "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" +) + +func TestMain(m *testing.M) { + os.Exit(ptest.MainRetWithDefault(m, "prism")) +} + +func TestSequence(t *testing.T) { + p, s := beam.NewPipelineWithRoot() + sd := SequenceDefinition{ + Interval: time.Second, + Start: 0, + End: time.Minute.Milliseconds(), + } + in := beam.Create(s, sd) + out := Sequence(s, in) + passert.Count(s, out, "SecondsInMinute", 60) + ptest.RunAndValidate(t, p) +} + +func TestImpulse(t *testing.T) { + p, s := beam.NewPipelineWithRoot() + interval := time.Second + start := time.Unix(0, 0) + end := start.Add(time.Minute) + out := Impulse(s, start, end, interval, false) + passert.Count(s, out, "SecondsInMinute", 60) + ptest.RunAndValidate(t, p) +}