From 977e53104f493124101c8f06d63cca0ec0994990 Mon Sep 17 00:00:00 2001 From: Robert Burke Date: Fri, 17 Feb 2023 19:18:07 -0800 Subject: [PATCH] [prism] add windowing strategy (#25518) Co-authored-by: lostluck <13907733+lostluck@users.noreply.github.com> --- .../runners/prism/internal/engine/strategy.go | 50 +++++++++++++++++++ .../prism/internal/engine/strategy_test.go | 45 +++++++++++++++++ 2 files changed, 95 insertions(+) create mode 100644 sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go create mode 100644 sdks/go/pkg/beam/runners/prism/internal/engine/strategy_test.go diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go new file mode 100644 index 000000000000..44e6064958c0 --- /dev/null +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package engine + +import ( + "fmt" + "time" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" +) + +type winStrat interface { + EarliestCompletion(typex.Window) mtime.Time +} + +type defaultStrat struct{} + +func (ws defaultStrat) EarliestCompletion(w typex.Window) mtime.Time { + return w.MaxTimestamp() +} + +func (defaultStrat) String() string { + return "default" +} + +type sessionStrat struct { + GapSize time.Duration +} + +func (ws sessionStrat) EarliestCompletion(w typex.Window) mtime.Time { + return w.MaxTimestamp().Add(ws.GapSize) +} + +func (ws sessionStrat) String() string { + return fmt.Sprintf("session[GapSize:%v]", ws.GapSize) +} diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy_test.go b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy_test.go new file mode 100644 index 000000000000..9d558396f806 --- /dev/null +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy_test.go @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package engine + +import ( + "testing" + "time" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" +) + +func TestEarliestCompletion(t *testing.T) { + tests := []struct { + strat winStrat + input typex.Window + want mtime.Time + }{ + {defaultStrat{}, window.GlobalWindow{}, mtime.EndOfGlobalWindowTime}, + {defaultStrat{}, window.IntervalWindow{Start: 0, End: 4}, 3}, + {defaultStrat{}, window.IntervalWindow{Start: mtime.MinTimestamp, End: mtime.MaxTimestamp}, mtime.MaxTimestamp - 1}, + {sessionStrat{}, window.IntervalWindow{Start: 0, End: 4}, 3}, + {sessionStrat{GapSize: 3 * time.Millisecond}, window.IntervalWindow{Start: 0, End: 4}, 6}, + } + + for _, test := range tests { + if got, want := test.strat.EarliestCompletion(test.input), test.want; got != want { + t.Errorf("%v.EarliestCompletion(%v)) = %v, want %v", test.strat, test.input, got, want) + } + } +}