Skip to content

Commit

Permalink
[#29772][Go SDK] Add EventTime Timer tests. (#29829)
Browse files Browse the repository at this point in the history
  • Loading branch information
lostluck authored Dec 28, 2023
1 parent 8f58ad1 commit 783c72a
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 6 deletions.
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/runtime/exec/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -798,7 +798,7 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) {
if err != nil {
return nil, err
}
u = &MapWindows{UID: b.idgen.New(), Fn: mapper, Out: out[0]}
u = &MapWindows{UID: b.idgen.New(), Fn: mapper, Out: out[0], FnUrn: fn.GetUrn()}

case graphx.URNFlatten:
u = &Flatten{UID: b.idgen.New(), N: len(transform.Inputs), Out: out[0]}
Expand Down
18 changes: 13 additions & 5 deletions sdks/go/pkg/beam/core/runtime/exec/window.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ type MapWindows struct {
UID UnitID
Fn WindowMapper
Out Node

FnUrn string // Keep the urn for debugging purposes.
}

// ID returns the UnitID for this unit.
Expand All @@ -122,11 +124,17 @@ func (m *MapWindows) StartBundle(ctx context.Context, id string, data DataContex
}

func (m *MapWindows) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error {
w, ok := elm.Elm2.(window.IntervalWindow)
if !ok {
return errors.Errorf("not an IntervalWindow, got %T", elm.Elm2)
// MapWindows ends up with the wrappedDecode path, which can pass the value window through the
// Window field. Use that as the default for resilience to a change to match the coder correctly.
win := elm.Windows[0]
if elm.Elm2 != nil {
w, ok := elm.Elm2.(typex.Window)
if !ok {
return errors.Errorf("not a Window Value, got %T", elm.Elm2)
}
win = w
}
newW, err := m.Fn.MapWindow(w)
newW, err := m.Fn.MapWindow(win)
if err != nil {
return err
}
Expand All @@ -151,7 +159,7 @@ func (m *MapWindows) Down(_ context.Context) error {
}

func (m *MapWindows) String() string {
return fmt.Sprintf("MapWindows[%v]. Out:%v", m.Fn, m.Out.ID())
return fmt.Sprintf("MapWindows[%v]. Out:%v", m.FnUrn, m.Out.ID())
}

// WindowMapper defines an interface maps windows from a main input window space
Expand Down
15 changes: 15 additions & 0 deletions sdks/go/test/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ var portableFilters = []string{
"TestMapStateClear",
"TestSetState",
"TestSetStateClear",

// The portable runner does not appear to support timers. (extra elements)
"TestTimers.*",
}

var prismFilters = []string{
Expand All @@ -158,6 +161,9 @@ var prismFilters = []string{
"TestFhirIO.*",
// OOMs currently only lead to heap dumps on Dataflow runner
"TestOomParDo",

// The prism runner does not support timers https://github.com/apache/beam/issues/29772.
"TestTimers.*",
}

var flinkFilters = []string{
Expand All @@ -181,6 +187,9 @@ var flinkFilters = []string{
"TestMapStateClear",
"TestSetStateClear",
"TestSetState",

// Flink does not appear to support timers. (missing timer elements)
"TestTimers.*",
}

var samzaFilters = []string{
Expand Down Expand Up @@ -219,6 +228,9 @@ var samzaFilters = []string{
"TestSetStateClear",
// TODO(https://github.com/apache/beam/issues/26126): Java runner issue (AcitveBundle has no regsitered handler)
"TestDebeziumIO_BasicRead",

// Samza does not appear to support timers. (missing timer elements)
"TestTimers.*",
}

var sparkFilters = []string{
Expand Down Expand Up @@ -250,6 +262,9 @@ var sparkFilters = []string{
"TestMapStateClear",
"TestSetStateClear",
"TestSetState",

// Spark does not appear to support timers. (Missing all elements)
"TestTimers.*",
}

var dataflowFilters = []string{
Expand Down
152 changes: 152 additions & 0 deletions sdks/go/test/integration/primitives/timers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package primitives

import (
"context"
"strconv"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/periodic"
)

// Based on https://github.com/apache/beam/blob/master/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java

func init() {
register.DoFn2x0[[]byte, func(string, int)](&inputFn[string, int]{})
register.DoFn6x0[beam.Window, state.Provider, timers.Provider, string, int, func(kv[string, int])](&eventTimeFn{})
register.Emitter2[string, int]()
register.Emitter1[kv[string, int]]()
}

type kv[K, V any] struct {
Key K
Value V
}

func kvfn[K, V any](k K, v V) kv[K, V] {
return kv[K, V]{k, v}
}

type inputFn[K, V any] struct {
Inputs []kv[K, V]
}

func (fn *inputFn[K, V]) ProcessElement(_ []byte, emit func(K, V)) {
for _, in := range fn.Inputs {
emit(in.Key, in.Value)
}
}

type eventTimeFn struct {
Callback timers.EventTime
MyKey state.Value[string]

Offset int
TimerOutput int
}

func (fn *eventTimeFn) ProcessElement(w beam.Window, sp state.Provider, tp timers.Provider, key string, value int, emit func(kv[string, int])) {
fn.Callback.Set(tp, w.MaxTimestamp().ToTime())
fn.MyKey.Write(sp, key)
emit(kvfn(key, value+fn.Offset))
}

func (fn *eventTimeFn) OnTimer(ctx context.Context, ts beam.EventTime, sp state.Provider, tp timers.Provider, key string, timer timers.Context, emit func(kv[string, int])) {
switch timer.Family {
case fn.Callback.Family:
switch timer.Tag {
case "":
read, ok, err := fn.MyKey.Read(sp)
if err != nil {
panic(err)
}
if !ok {
panic("State must be set.")
}
emit(kvfn(read, fn.TimerOutput))
default:
panic("unexpected timer tag: " + timer.Family + " tag:" + timer.Tag + " want: \"\", for key:" + key)
}
default:
if fn.Callback.Family != timer.Family || timer.Tag != "" {
panic("unexpected timer family: " + timer.Family + " tag:" + timer.Tag + " want: " + fn.Callback.Family + ", for key:" + key)
}
}
}

// TimersEventTime takes in an impulse transform and produces a pipeline construction
// function that validates EventTime timers.
//
// The impulse is provided outside to swap between a bounded impulse, and
// an unbounded one, because the Go SDK uses that to determine if a pipeline
// is "streaming" or not. This matters at least for executions on Dataflow.
//
// The test produces some number of key value elements, with various event time offsets,
// expecting that a single timer event is what fires. The test validates that all the
// input elements, and the timer elements have been emitted.
func timersEventTimePipelineBuilder(makeImp func(s beam.Scope) beam.PCollection) func(s beam.Scope) {
return func(s beam.Scope) {
var inputs, wantOutputs []kv[string, int]

offset := 5000
timerOutput := 4093

numKeys := 50
numDuplicateTimers := 15

for key := 0; key < numKeys; key++ {
k := strconv.Itoa(key)
wantOutputs = append(wantOutputs, kvfn(k, timerOutput))

for i := 0; i < numDuplicateTimers; i++ {
inputs = append(inputs, kvfn(k, i))
wantOutputs = append(wantOutputs, kvfn(k, i+offset))
}
}

imp := makeImp(s)

keyed := beam.ParDo(s, &inputFn[string, int]{
Inputs: inputs,
}, imp)
times := beam.ParDo(s, &eventTimeFn{
Offset: offset,
TimerOutput: timerOutput,
Callback: timers.InEventTime("Callback"),
MyKey: state.MakeValueState[string]("MyKey"),
}, keyed)
passert.EqualsList(s, times, wantOutputs)
}
}

// TimersEventTime_Bounded validates event time timers in a bounded pipeline.
func TimersEventTime_Bounded(s beam.Scope) {
timersEventTimePipelineBuilder(beam.Impulse)(s)
}

// TimersEventTime_Bounded validates event time timers in an unbounded pipeline.
func TimersEventTime_Unbounded(s beam.Scope) {
timersEventTimePipelineBuilder(func(s beam.Scope) beam.PCollection {
now := time.Now()
return periodic.Impulse(s, now, now.Add(10*time.Second), 0, false)
})(s)
}
35 changes: 35 additions & 0 deletions sdks/go/test/integration/primitives/timers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package primitives

import (
"testing"

"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
"github.com/apache/beam/sdks/v2/go/test/integration"
)

func TestTimers_EventTime_Bounded(t *testing.T) {
integration.CheckFilters(t)
ptest.BuildAndRun(t, TimersEventTime_Bounded)
}

func TestTimers_EventTime_Unbounded(t *testing.T) {
integration.CheckFilters(t)
ptest.BuildAndRun(t, TimersEventTime_Unbounded)
}

// TODO(https://github.com/apache/beam/issues/29772): Add ProcessingTime Timer tests.

0 comments on commit 783c72a

Please sign in to comment.