Skip to content

Commit

Permalink
Correctly chose earliest or latest in pane. (#31979)
Browse files Browse the repository at this point in the history
Co-authored-by: lostluck <[email protected]>
  • Loading branch information
lostluck and lostluck authored Jul 29, 2024
1 parent 0a65b4b commit 9e431b4
Showing 1 changed file with 24 additions and 8 deletions.
32 changes: 24 additions & 8 deletions sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,20 +291,32 @@ func windowingStrategy(comps *pipepb.Components, tid string) *pipepb.WindowingSt

// gbkBytes re-encodes gbk inputs in a gbk result.
func gbkBytes(ws *pipepb.WindowingStrategy, wc, kc, vc *pipepb.Coder, toAggregate [][]byte, coders map[string]*pipepb.Coder, watermark mtime.Time) []byte {
var outputTime func(typex.Window, mtime.Time) mtime.Time
// Pick how the timestamp of the aggregated output is computed.
var outputTime func(typex.Window, mtime.Time, mtime.Time) mtime.Time
switch ws.GetOutputTime() {
case pipepb.OutputTime_END_OF_WINDOW:
outputTime = func(w typex.Window, et mtime.Time) mtime.Time {
outputTime = func(w typex.Window, _, _ mtime.Time) mtime.Time {
return w.MaxTimestamp()
}
case pipepb.OutputTime_EARLIEST_IN_PANE, pipepb.OutputTime_LATEST_IN_PANE:
outputTime = func(w typex.Window, et mtime.Time) mtime.Time {
return et
case pipepb.OutputTime_EARLIEST_IN_PANE:
outputTime = func(_ typex.Window, cur, et mtime.Time) mtime.Time {
if et < cur {
return et
}
return cur
}
case pipepb.OutputTime_LATEST_IN_PANE:
outputTime = func(_ typex.Window, cur, et mtime.Time) mtime.Time {
if et > cur {
return et
}
return cur
}
default:
// TODO need to correct session logic if output time is different.
panic(fmt.Sprintf("unsupported OutputTime behavior: %v", ws.GetOutputTime()))
}

wDec, wEnc := makeWindowCoders(wc)

type keyTime struct {
Expand Down Expand Up @@ -340,14 +352,18 @@ func gbkBytes(ws *pipepb.WindowingStrategy, wc, kc, vc *pipepb.Coder, toAggregat
key := string(keyByt)
value := vd(buf)
for _, w := range ws {
ft := outputTime(w, tm)
wk, ok := windows[w]
if !ok {
wk = make(map[string]keyTime)
windows[w] = wk
}
kt := wk[key]
kt.time = ft
kt, ok := wk[key]
if !ok {
// If the window+key map doesn't have a value, inititialize time with the element time.
// This allows earliest or latest to work properly in the outputTime function's first use.
kt.time = tm
}
kt.time = outputTime(w, kt.time, tm)
kt.key = keyByt
kt.w = w
kt.values = append(kt.values, value)
Expand Down

0 comments on commit 9e431b4

Please sign in to comment.