Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#23106] Add periodic.Sequence and periodic.Impulse transforms to Go SDK #25808

Merged
merged 16 commits into from
Mar 23, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
## New Features / Improvements

* The Flink runner now supports Flink 1.16.x ([#25046](https://github.com/apache/beam/issues/25046)).
* The Go SDK adds new transforms periodic.Impulse and periodic.Sequence that extend support for slowly updating side input patterns. ([#23106](https://github.com/apache/beam/issues/23106))

## Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package main

import (
"context"
"flag"
"strings"
"time"

"cloud.google.com/go/pubsub"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window/trigger"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/pubsubio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/periodic"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/pubsubx"
)

func init() {
	// Helper types used by the DoFns below: the string iterator that process
	// reads its side input through, and the (int, string) emitter used by
	// update.
	register.Iter1[string]()
	register.Emitter2[int, string]()

	// The two DoFns of this example; both take four inputs and return nothing.
	register.Function4x0(process)
	register.Function4x0(update)
}

// update simulates an external call to get data for the side input.
func update(ctx context.Context, t beam.EventTime, i int64, emit func(int, string)) {
log.Infof(ctx, "Making external call %d at %s", i, t.ToTime().Format(time.RFC3339))
hnnsgstfssn marked this conversation as resolved.
Show resolved Hide resolved

// zero is the key used in beam.AddFixedKey which will be applied on the main input.
id, externalData := 0, "some fake data that changed at "+time.Now().Format(time.RFC3339)

emit(id, externalData)
}

// process stands in for the processing of one main-input element. It looks up
// the side input under the element's key k, drains every value found there,
// and logs the element together with those values joined by commas.
func process(ctx context.Context, k int, v []byte, side func(int) func(*string) bool) {
	log.Infof(ctx, "Processing (key:%d,value:%q)", k, v)

	next := side(k)

	var collected []string
	for {
		var datum string
		if !next(&datum) {
			break
		}
		collected = append(collected, datum)
	}

	joined := strings.Join(collected, ",")
	log.Infof(ctx, "Processing (key:%d,value:%q) with external data %q", k, v, joined)
}

// fatalf does nothing when err is nil. Otherwise it hands format and args to
// log.Fatalf. The error itself is not added to the message automatically;
// callers pass it in args when they want it printed.
func fatalf(err error, format string, args ...interface{}) {
	if err == nil {
		return
	}
	log.Fatalf(context.TODO(), format, args...)
}

func main() {
var inputTopic, periodicSequenceStart, periodicSequenceEnd string
var periodicSequenceInterval time.Duration

now := time.Now()

flag.StringVar(&periodicSequenceStart, "periodic_sequence_start", now.Add(-1*time.Hour).Format(time.RFC3339),
"The time at which to start the periodic sequence.")

flag.StringVar(&periodicSequenceEnd, "periodic_sequence_end", now.Add(100*time.Hour).Format(time.RFC3339),
"The time at which to end the periodic sequence.")

flag.DurationVar(&periodicSequenceInterval, "periodic_sequence_interval", 1*time.Minute,
"The interval between periodic sequence output.")

flag.StringVar(&inputTopic, "input_topic", "input",
"The PubSub topic from which to read the main input data.")
hnnsgstfssn marked this conversation as resolved.
Show resolved Hide resolved

flag.Parse()
beam.Init()
ctx := context.Background()
p, s := beam.NewPipelineWithRoot()

project := gcpopts.GetProject(ctx)
client, err := pubsub.NewClient(ctx, project)
fatalf(err, "Failed to create client: %v", err)
_, err = pubsubx.EnsureTopic(ctx, client, inputTopic)
fatalf(err, "Failed to ensure topic: %v", err)

mainInput := beam.WindowInto(
s,
window.NewFixedWindows(periodicSequenceInterval),
beam.AddFixedKey( // simulate keyed data by adding a fixed key
s,
pubsubio.Read(
s,
project,
inputTopic,
nil,
),
),
beam.Trigger(trigger.Repeat(trigger.Always())),
beam.PanesDiscard(),
)
hnnsgstfssn marked this conversation as resolved.
Show resolved Hide resolved

startTime, _ := time.Parse(time.RFC3339, periodicSequenceStart)
endTime, _ := time.Parse(time.RFC3339, periodicSequenceEnd)
sideInput := beam.WindowInto(s, window.NewFixedWindows(periodicSequenceInterval),
beam.ParDo(
s,
update,
periodic.Impulse(
s,
startTime,
endTime,
periodicSequenceInterval,
),
),
beam.Trigger(trigger.Repeat(trigger.Always())),
beam.PanesDiscard(),
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The nesting here has the added drawback of hiding the core of the example.

Suggested change
sideInput := beam.WindowInto(s, window.NewFixedWindows(periodicSequenceInterval),
beam.ParDo(
s,
update,
periodic.Impulse(
s,
startTime,
endTime,
periodicSequenceInterval,
),
),
beam.Trigger(trigger.Repeat(trigger.Always())),
beam.PanesDiscard(),
)
// Generate an impulse every period.
periodicImp := periodic.Impulse(s, startTime, endTime, periodicSequenceInterval)
// Use the impulse to trigger some other ordinary transform.
updatedImp := beam.ParDo(s, update, periodicImp)
// Window for use as a side input, to allow the input to change with windows.
sideInput := beam.WindowInto(s, window.NewFixedWindows(periodicSequenceInterval),
updatedImp,
beam.Trigger(trigger.Repeat(trigger.Always())),
beam.PanesDiscard(),
)

I'll note that the window for the side input is usually going to be larger than the window for the main processing. While this isn't wrong, the usual goal of the pattern is a situation like allowing files that change hourly to be read in once each hour, and letting the more frequent data re-use the cached, read-in file. (Granted, this behavior isn't yet enabled by default in the Go SDK, but that's an aside.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought of making the side input window larger. Do you think it is worth making that change, taking another configuration to specify the side input window size?

I'm also curious to know what you mean by

this behavior isn't yet enabled by default in the Go SDK

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wasn't able to apply this suggestion after having changed the periodic.Impulse signature, but have applied the same change in a separate commit. Thanks!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WRT the cache:

Beam's execution abstraction, the FnAPI, has provisions for caching data across bundles from the StateAPI on the SDK side, in order to avoid repeated deserialization and additional round trips to the primary store of the runner to fetch state data. Side inputs also come across the StateAPI. This is particularly valuable for streaming jobs, as typically a single "SDK harness" is responsible for the same key all the time, so for tight windows it would look up the same data from the side inputs.


WRT not applying after the impulse change. Makes sense since an int64 was being received from the "update" function.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't address the larger side input window. Since it's an example, we don't need to over-configure things. I'd make the window 5 times larger to demonstrate the paradigm, and add an explicit comment that that's what the larger window is for.


beam.ParDo0(s, process, mainInput,
beam.SideInput{
Input: sideInput,
},
)

if _, err := beam.Run(context.Background(), "dataflow", p); err != nil {
log.Exitf(ctx, "Failed to run job: %v", err)
}
}
211 changes: 211 additions & 0 deletions sdks/go/pkg/beam/transforms/periodic/periodic.go
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be able to add a short unit test if using the prism runner directly.

While the runner isn't fully complete yet, it does run and execute ProcessContinuation transforms and watermarks!

It just doesn't do the splitting just yet, or actually "wait" for any process continuations at the moment. But when the "sequence" is done, it will terminate, so we can add a test with period of a second, a duration of a minute, and then count that we're getting 60 elements out of the transform. (Small risk of getting 59 instead, as a flake...)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's great! I couldn't get it working on the direct runner, but didn't try using the new prism explicitly. Have now added two tests TestImpulse and TestSequence and it looks like that's working. Let me know what you think.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The direct runner (and basically all non-portable runners) is not great, since it misaligns expectations for users when they move to executing on a "real" runner, like Flink or Dataflow. There's also no "spec" about what a Direct Runner should do to properly test things, so the Go Direct Runner is missing a number of features that the Java and Python ones have.

Essentially Prism is going to replace the Go Direct Runner as the default runner for the Go SDK at least, and hopefully make testing and local runs for all facets of Beam easier. The prism Readme has a bunch of the vision, and desired goals, but I gave a talk at last year's Beam Summit, about the motivations, especially in all the "hidden" bits of beam that users usually don't interact with, but are affected by.

https://www.youtube.com/watch?v=G4lbkvAG6xk

Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package periodic contains transformations for generating periodic sequences.
package periodic

import (
"context"
"fmt"
"math"
"reflect"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
)

func init() {
register.DoFn5x2[context.Context, *sdf.ManualWatermarkEstimator, *sdf.LockRTracker, SequenceDefinition,
func(beam.EventTime, int64),
sdf.ProcessContinuation, error](&sequenceGenDoFn{})
register.Emitter2[beam.EventTime, int64]()
beam.RegisterType(reflect.TypeOf(SequenceDefinition{}))
}

// SequenceDefinition holds the configuration for generating a sequence of
// timestamped elements at an interval.
type SequenceDefinition struct {
	// Interval is the spacing between consecutive output timestamps; output
	// number i is stamped Start + i*Interval.
	Interval time.Duration
	// Start is the timestamp of the first output element.
	Start time.Time
	// End bounds the sequence: the number of outputs is derived from the
	// span between Start and End divided by Interval.
	End time.Time
}

type sequenceGenDoFn struct {
now func() time.Time
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While playing around locally I've added this way to mock the time.Now function. If it ends up being unused I'm happy to drop it if that makes more sense.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will work in the direct runner, but that's because the direct runner won't successfully run the example or anything local. It would be better to fold things into the sequence definition to enable appropriate testing behavior.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed I struggled with the direct runner, but have now added two working tests that use the prism runner. I am struggling to understand how to fold this into the sequence definition as you suggest, to properly test it. Instead I went ahead and removed this entirely and the DoFn uses time.Now.

I am happy to leave it there for now. If you want to expand on how to fold it into the definition and test it I am happy to add this as well.

}

// Setup makes sure the DoFn has a clock: when no now function has been
// supplied it falls back to time.Now. A clock that is already set is kept.
func (fn *sequenceGenDoFn) Setup() {
	if fn.now != nil {
		return
	}
	fn.now = time.Now
}

func (fn *sequenceGenDoFn) CreateInitialRestriction(sd SequenceDefinition) offsetrange.Restriction {
totalOutputs := math.Ceil(float64(sd.End.Sub(sd.Start) / sd.Interval))
hnnsgstfssn marked this conversation as resolved.
Show resolved Hide resolved
return offsetrange.Restriction{
Start: int64(0),
End: int64(totalOutputs),
}
}

// CreateTracker builds an offset-range tracker for rest and wraps it in a
// LockRTracker.
func (fn *sequenceGenDoFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
	tracker := offsetrange.NewTracker(rest)
	return sdf.NewLockRTracker(tracker)
}

// RestrictionSize reports the size of rest as computed by the restriction
// itself; the sequence definition does not influence the result.
func (fn *sequenceGenDoFn) RestrictionSize(_ SequenceDefinition, rest offsetrange.Restriction) float64 {
	size := rest.Size()
	return size
}

// SplitRestriction performs no initial splitting: it returns rest unchanged as
// the only element of the result.
func (fn *sequenceGenDoFn) SplitRestriction(_ SequenceDefinition, rest offsetrange.Restriction) []offsetrange.Restriction {
	parts := make([]offsetrange.Restriction, 0, 1)
	return append(parts, rest)
}

// TruncateRestriction immediately truncates the entire restrication.
func (fn *sequenceGenDoFn) TruncateRestriction(_ *sdf.LockRTracker, _ SequenceDefinition) offsetrange.Restriction {
return offsetrange.Restriction{}
hnnsgstfssn marked this conversation as resolved.
Show resolved Hide resolved
}

func (fn *sequenceGenDoFn) CreateWatermarkEstimator() *sdf.ManualWatermarkEstimator {
return &sdf.ManualWatermarkEstimator{}
hnnsgstfssn marked this conversation as resolved.
Show resolved Hide resolved
}

func (fn *sequenceGenDoFn) ProcessElement(ctx context.Context, we *sdf.ManualWatermarkEstimator, rt *sdf.LockRTracker, sd SequenceDefinition, emit func(beam.EventTime, int64)) (sdf.ProcessContinuation, error) {
currentOutputIndex := rt.GetRestriction().(offsetrange.Restriction).Start
currentOutputTimestamp := sd.Start.Add(sd.Interval * time.Duration(currentOutputIndex))
currentTime := fn.now()
we.UpdateWatermark(currentOutputTimestamp)
for currentOutputTimestamp.Before(currentTime) {
if rt.TryClaim(currentOutputIndex) {
lostluck marked this conversation as resolved.
Show resolved Hide resolved
emit(mtime.FromTime(currentOutputTimestamp), currentOutputTimestamp.UnixMilli())
currentOutputIndex += 1
currentOutputTimestamp = sd.Start.Add(sd.Interval * time.Duration(currentOutputIndex))
currentTime = fn.now()
we.UpdateWatermark(currentOutputTimestamp)
} else if err := rt.GetError(); err != nil || rt.IsDone() {
// Stop processing on error or completion
return sdf.StopProcessing(), rt.GetError()
} else {
return sdf.ResumeProcessingIn(sd.Interval), nil
}
}

return sdf.ResumeProcessingIn(time.Until(currentOutputTimestamp)), nil
}

// impulseConfig collects the settings that ImpulseOption functions can change.
type impulseConfig struct {
	// ApplyWindow, when true, makes genImpulse window its output into fixed
	// windows sized by the impulse interval.
	ApplyWindow bool

	// now is an optional clock override that Impulse passes on to the sequence
	// DoFn; when it is nil the DoFn's Setup falls back to time.Now.
	now func() time.Time
}

// impulseOption is the unexported function type behind [ImpulseOption]. It
// receives the impulseConfig to modify and may return an error, which Impulse
// treats as an invalid option.
type impulseOption func(*impulseConfig) error

// ImpulseOption is a function that configures an [Impulse] transform.
type ImpulseOption = impulseOption
hnnsgstfssn marked this conversation as resolved.
Show resolved Hide resolved

// WithApplyWindow configures the [Impulse] transform to apply a fixed window
// transform to the output PCollection.
func WithApplyWindow() ImpulseOption {
hnnsgstfssn marked this conversation as resolved.
Show resolved Hide resolved
return func(o *impulseConfig) error {
o.ApplyWindow = true
return nil
}
}

func withNowFunc(now func() time.Time) ImpulseOption {
hnnsgstfssn marked this conversation as resolved.
Show resolved Hide resolved
return func(o *impulseConfig) error {
o.now = now
return nil
}
}

// Impulse is a PTransform which generates a sequence of timestamped
// elements at fixed runtime intervals. If [WithApplyWindow] is specified, each
// element will be assigned to its own fixed window of interval size.
//
// The transform behaves the same as [Sequence] transform, but can be
// used as the first transform in a pipeline.
//
// The following applies to the arguments.
// - if interval <= 0, interval is set to [math.MaxInt64]
// - if start is a zero value [time.Time], start is set to the current time
// - if start is after end, start is set to end
//
// The PCollection generated by Impulse is unbounded and the output elements
// are the [time.UnixMilli] int64 values of the output timestamp.
func Impulse(s beam.Scope, start, end time.Time, interval time.Duration, opts ...ImpulseOption) beam.PCollection {
if interval <= 0 {
interval = math.MaxInt64
}
if start.IsZero() {
start = time.Now()
}
if start.After(end) {
start = end
}
hnnsgstfssn marked this conversation as resolved.
Show resolved Hide resolved

conf := impulseConfig{}

for _, opt := range opts {
if err := opt(&conf); err != nil {
panic(fmt.Errorf("periodic.Impulse: invalid option: %v", err))
}
}

return genImpulse(s.Scope("periodic.Impulse"), start, end, interval, conf, &sequenceGenDoFn{now: conf.now})
}

// genImpulse builds the impulse pipeline fragment: it creates a single
// SequenceDefinition element from start, end and interval, feeds it to the
// sequence DoFn fn, and, when conf.ApplyWindow is set, windows the result into
// fixed windows of interval size.
func genImpulse(s beam.Scope, start, end time.Time, interval time.Duration, conf impulseConfig, fn *sequenceGenDoFn) beam.PCollection {
	def := SequenceDefinition{
		Interval: interval,
		Start:    start,
		End:      end,
	}
	seed := beam.Create(s.Scope("ImpulseElement"), def)
	out := genSequence(s, seed, fn)

	if !conf.ApplyWindow {
		return out
	}
	windowing := window.NewFixedWindows(interval)
	return beam.WindowInto(s.Scope("ApplyWindowing"), windowing, out)
}

// Sequence is a PTransform which generates a sequence of timestamped
// elements at fixed runtime intervals.
//
// The transform assigns each element a timestamp and will only output an
// element once the worker clock reaches the output timestamp. Sequence is not
// able to guarantee that elements are output at their exact timestamp, but
// it guarantees that elements will not be output prior to their runtime
// timestamp.
//
// The transform will not output elements prior to the start time.
//
// Sequence receives [SequenceDefinition] elements and, for each input element
// received, it will start generating output elements in the following pattern:
//
//   - if the element timestamp is less than the current runtime, output the
//     element.
//   - if the element timestamp is greater than the current runtime, wait until
//     the next element timestamp.
//
// The PCollection generated by Sequence is unbounded and the output elements
// are the [time.Time.UnixMilli] int64 values of the output timestamp.
func Sequence(s beam.Scope, col beam.PCollection) beam.PCollection {
	scoped := s.Scope("periodic.Sequence")
	return genSequence(scoped, col, &sequenceGenDoFn{})
}

// genSequence applies the sequence DoFn fn to col as a ParDo inside a
// "GenSequence" sub-scope of s and returns the resulting PCollection.
func genSequence(s beam.Scope, col beam.PCollection, fn *sequenceGenDoFn) beam.PCollection {
	scoped := s.Scope("GenSequence")
	return beam.ParDo(scoped, fn, col)
}