Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#23106] Add periodic.Sequence and periodic.Impulse transforms to Go SDK #25808

Merged
merged 16 commits into from
Mar 23, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
## New Features / Improvements

* The Flink runner now supports Flink 1.16.x ([#25046](https://github.com/apache/beam/issues/25046)).
* The Go SDK adds new transforms periodic.Impulse and periodic.Sequence that extends support for slowly updating side input patterns. ([#23106](https://github.com/apache/beam/issues/23106))

## Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
hnnsgstfssn marked this conversation as resolved.
Show resolved Hide resolved

import (
"context"
"flag"
"strings"
"time"

"cloud.google.com/go/pubsub"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window/trigger"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/pubsubio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/periodic"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/pubsubx"
)

func init() {
register.Function4x0(update)
register.Function4x0(process)
register.Emitter2[int, string]()
register.Iter1[string]()
}

// update simulates an external call to get data for the side input.
func update(ctx context.Context, t beam.EventTime, i int64, emit func(int, string)) {
log.Infof(ctx, "Making external call %d at %s", i, t.ToTime().Format(time.RFC3339))
hnnsgstfssn marked this conversation as resolved.
Show resolved Hide resolved

// zero is the key used in beam.AddFixedKey which will be applied on the main input.
id, externalData := 0, "some fake data that changed at "+time.Now().Format(time.RFC3339)

emit(id, externalData)
}

// process simulates processing of main input. It reads side input by key
func process(ctx context.Context, k int, v []byte, side func(int) func(*string) bool) {
log.Infof(ctx, "Processing (key:%d,value:%q)", k, v)

iter := side(k)

var externalData []string
var externalDatum string
for iter(&externalDatum) {
externalData = append(externalData, externalDatum)
}

log.Infof(ctx, "Processing (key:%d,value:%q) with external data %q", k, v, strings.Join(externalData, ","))
}

func fatalf(err error, format string, args ...interface{}) {
if err != nil {
log.Fatalf(context.TODO(), format, args...)
}
}

func main() {
var inputTopic, periodicSequenceStart, periodicSequenceEnd string
var periodicSequenceInterval time.Duration

now := time.Now()

flag.StringVar(&periodicSequenceStart, "periodic_sequence_start", now.Add(-1*time.Hour).Format(time.RFC3339),
"The time at which to start the periodic sequence.")

flag.StringVar(&periodicSequenceEnd, "periodic_sequence_end", now.Add(100*time.Hour).Format(time.RFC3339),
"The time at which to end the periodic sequence.")

flag.DurationVar(&periodicSequenceInterval, "periodic_sequence_interval", 1*time.Minute,
"The interval between periodic sequence output.")

flag.StringVar(&inputTopic, "input_topic", "input",
"The PubSub topic from which to read the main input data.")
hnnsgstfssn marked this conversation as resolved.
Show resolved Hide resolved

flag.Parse()
beam.Init()
ctx := context.Background()
p, s := beam.NewPipelineWithRoot()

project := gcpopts.GetProject(ctx)
client, err := pubsub.NewClient(ctx, project)
fatalf(err, "Failed to create client: %v", err)
_, err = pubsubx.EnsureTopic(ctx, client, inputTopic)
fatalf(err, "Failed to ensure topic: %v", err)

mainInput := beam.WindowInto(
s,
window.NewFixedWindows(periodicSequenceInterval),
beam.AddFixedKey( // simulate keyed data by adding a fixed key
s,
pubsubio.Read(
s,
project,
inputTopic,
nil,
),
),
beam.Trigger(trigger.Repeat(trigger.Always())),
beam.PanesDiscard(),
)
hnnsgstfssn marked this conversation as resolved.
Show resolved Hide resolved

startTime, _ := time.Parse(time.RFC3339, periodicSequenceStart)
endTime, _ := time.Parse(time.RFC3339, periodicSequenceEnd)
sideInput := beam.WindowInto(s, window.NewFixedWindows(periodicSequenceInterval),
beam.ParDo(
s,
update,
periodic.Impulse(
s,
startTime,
endTime,
periodicSequenceInterval,
false,
),
),
beam.Trigger(trigger.Repeat(trigger.Always())),
beam.PanesDiscard(),
)

beam.ParDo0(s, process, mainInput,
beam.SideInput{
Input: sideInput,
},
)

if _, err := beam.Run(context.Background(), "dataflow", p); err != nil {
log.Exitf(ctx, "Failed to run job: %v", err)
}
}
170 changes: 170 additions & 0 deletions sdks/go/pkg/beam/transforms/periodic/periodic.go
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be able to add a short unit test if using the prism runner directly.

While the runner isn't fully complete yet, it does run and execute ProcessContinuation transforms and watermarks!

It just doesn't do the splitting just yet, or actually "wait" for any process continuations at the moment. But when the "sequence" is done, it will terminate, so we can add a test with period of a second, a duration of a minute, and then count that we're getting 60 elements out of the transform. (Small risk of getting 59 instead, as a flake...)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's great! I couldn't get it working on the direct runner, but didn't try using the new prism explicitly. Have now added two tests TestImpulse and TestSequence and it looks like that's working. Let me know what you think.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The direct runner (and basically all non-portable runners), are not great, since they mis-align expectations for users when they move to executing on a "real" runner, like Flink or Dataflow. There's also no "spec" about what a Direct Runner should do to properly test things, so the Go Direct Runner is missing a number of features that the Java and Python ones use.

Essentially Prism is going to replace the Go Direct Runner as the default runner for the Go SDK at least, and hopefully make testing and local runs for all facets of Beam easier. The prism Readme has a bunch of the vision, and desired goals, but I gave a talk at last year's Beam Summit, about the motivations, especially in all the "hidden" bits of beam that users usually don't interact with, but are affected by.

https://www.youtube.com/watch?v=G4lbkvAG6xk

Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package periodic contains transformations for generating periodic sequences.
package periodic

import (
"context"
"reflect"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
)

func init() {
register.DoFn5x2[context.Context, *sdf.ManualWatermarkEstimator, *sdf.LockRTracker, SequenceDefinition,
func(beam.EventTime, []byte),
sdf.ProcessContinuation, error](&sequenceGenDoFn{})
register.Emitter2[beam.EventTime, []byte]()
beam.RegisterType(reflect.TypeOf(SequenceDefinition{}))
}

// SequenceDefinition holds the configuration for generating a sequence of
// timestamped elements at an interval.
type SequenceDefinition struct {
Interval time.Duration
Start int64
End int64
hnnsgstfssn marked this conversation as resolved.
Show resolved Hide resolved
}

type sequenceGenDoFn struct{}

func (fn *sequenceGenDoFn) Setup() {
}

func (fn *sequenceGenDoFn) CreateInitialRestriction(sd SequenceDefinition) offsetrange.Restriction {
totalOutputs := mtime.Time(sd.End).ToTime().Sub(mtime.Time(sd.Start).ToTime()) / sd.Interval
return offsetrange.Restriction{
Start: int64(0),
End: int64(totalOutputs),
}
}

func (fn *sequenceGenDoFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
}

func (fn *sequenceGenDoFn) RestrictionSize(_ SequenceDefinition, rest offsetrange.Restriction) float64 {
return rest.Size()
}

func (fn *sequenceGenDoFn) SplitRestriction(_ SequenceDefinition, rest offsetrange.Restriction) []offsetrange.Restriction {
return []offsetrange.Restriction{rest}
}

// TruncateRestriction immediately truncates the entire restrication.
func (fn *sequenceGenDoFn) TruncateRestriction(_ *sdf.LockRTracker, _ SequenceDefinition) offsetrange.Restriction {
return offsetrange.Restriction{}
hnnsgstfssn marked this conversation as resolved.
Show resolved Hide resolved
}

func (fn *sequenceGenDoFn) CreateWatermarkEstimator() *sdf.ManualWatermarkEstimator {
return &sdf.ManualWatermarkEstimator{}
hnnsgstfssn marked this conversation as resolved.
Show resolved Hide resolved
}

func (fn *sequenceGenDoFn) ProcessElement(ctx context.Context, we *sdf.ManualWatermarkEstimator, rt *sdf.LockRTracker, sd SequenceDefinition, emit func(beam.EventTime, []byte)) (sdf.ProcessContinuation, error) {
currentOutputIndex := rt.GetRestriction().(offsetrange.Restriction).Start
currentOutputTimestamp := mtime.Time(sd.Start).ToTime().Add(sd.Interval * time.Duration(currentOutputIndex))
currentTime := time.Now()
we.UpdateWatermark(currentOutputTimestamp)
for currentOutputTimestamp.Before(currentTime) {
if rt.TryClaim(currentOutputIndex) {
lostluck marked this conversation as resolved.
Show resolved Hide resolved
emit(mtime.FromTime(currentOutputTimestamp), []byte{})
currentOutputIndex += 1
currentOutputTimestamp = mtime.Time(sd.Start).ToTime().Add(sd.Interval * time.Duration(currentOutputIndex))
currentTime = time.Now()
we.UpdateWatermark(currentOutputTimestamp)
} else if err := rt.GetError(); err != nil || rt.IsDone() {
// Stop processing on error or completion
return sdf.StopProcessing(), rt.GetError()
} else {
return sdf.ResumeProcessingIn(sd.Interval), nil
}
}

return sdf.ResumeProcessingIn(time.Until(currentOutputTimestamp)), nil
}

// Impulse is a PTransform which generates a sequence of timestamped
// elements at fixed runtime intervals. If applyWindow is specified, each
// element will be assigned to its own fixed window of interval size.
//
// The transform behaves the same as [Sequence] transform, but can be
// used as the first transform in a pipeline.
//
// The following applies to the arguments.
// - if start is a zero value [time.Time], start is set to the current time
// - if start is after end, start is set to end
// - start and end are normalized with [mtime.Normalize]
// - if interval <= 0 or interval > end.Sub(start), interval is set to end.Sub(start)
//
// The PCollection<[]byte> generated by Impulse is unbounded.
func Impulse(s beam.Scope, start, end time.Time, interval time.Duration, applyWindow bool) beam.PCollection {
if start.IsZero() {
start = time.Now()
}
if start.After(end) {
start = end
}
hnnsgstfssn marked this conversation as resolved.
Show resolved Hide resolved
start = mtime.Normalize(mtime.FromTime(start)).ToTime()
end = mtime.Normalize(mtime.FromTime(end)).ToTime()
if interval <= 0 || interval > end.Sub(start) {
interval = end.Sub(start)
}

return genImpulse(s.Scope("periodic.Impulse"), start, end, interval, applyWindow, &sequenceGenDoFn{})
}

func genImpulse(s beam.Scope, start, end time.Time, interval time.Duration, applyWindow bool, fn *sequenceGenDoFn) beam.PCollection {
sd := SequenceDefinition{Interval: interval, Start: start.UnixMilli(), End: end.UnixMilli()}
imp := beam.Create(s.Scope("ImpulseElement"), sd)
col := genSequence(s, imp, fn)
if applyWindow {
return beam.WindowInto(s.Scope("ApplyWindowing"),
window.NewFixedWindows(interval), col)
}
return col
}

// Sequence is a PTransform which generates a sequence of timestamped
// elements at fixed runtime intervals.
//
// The transform assigns each element a timestamp and will only output an
// element once the worker clock reach the output timestamp. Sequence is not
// able to guarantee that elements are output at the their exact timestamp, but
// it guarantees that elements will not be output prior to runtime timestamp.
//
// The transform will not output elements prior to the start time.
//
// Sequence receives [SequenceDefinition] elements and for each input element
// received, it will start generating output elements in the following pattern:
//
// - if element timestamp is less than current runtime then output element.
// - if element timestamp is greater than current runtime, wait until next
// element timestamp.
//
// The PCollection<[]byte> generated by Sequence is unbounded.
func Sequence(s beam.Scope, col beam.PCollection) beam.PCollection {
return genSequence(s.Scope("periodic.Sequence"), col, &sequenceGenDoFn{})
}

func genSequence(s beam.Scope, col beam.PCollection, fn *sequenceGenDoFn) beam.PCollection {
return beam.ParDo(s.Scope("GenSequence"), fn, col)
}
57 changes: 57 additions & 0 deletions sdks/go/pkg/beam/transforms/periodic/periodic_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package periodic

import (
"context"
"testing"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
)

func TestSequence(t *testing.T) {
p, s := beam.NewPipelineWithRoot()
sd := SequenceDefinition{
Interval: time.Second,
Start: 0,
End: time.Minute.Milliseconds(),
}
in := beam.Create(s, sd)
out := Sequence(s, in)
passert.Count(s, out, "SecondsInMinute", 60)
beam.Init()
hnnsgstfssn marked this conversation as resolved.
Show resolved Hide resolved
_, err := prism.Execute(context.Background(), p)
if err != nil {
t.Fatalf("Failed to execute job: %v", err)
}
}

func TestImpulse(t *testing.T) {
p, s := beam.NewPipelineWithRoot()
interval := time.Second
start := time.Unix(0, 0)
end := start.Add(time.Minute)
out := Impulse(s, start, end, interval, false)
passert.Count(s, out, "SecondsInMinute", 60)
beam.Init()
_, err := prism.Execute(context.Background(), p)
if err != nil {
t.Fatalf("Failed to execute job: %v", err)
}
}