Skip to content

Commit

Permalink
Add support for WindowStrategy Pane and AllowedLatness features (#31806)
Browse files Browse the repository at this point in the history
  • Loading branch information
damondouglas authored Jul 9, 2024
1 parent de4645d commit 516bbc7
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 1 deletion.
4 changes: 4 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,10 @@ func gbkBytes(ws *pipepb.WindowingStrategy, wc, kc, vc *pipepb.Coder, toAggregat
outputTime = func(w typex.Window, et mtime.Time) mtime.Time {
return w.MaxTimestamp()
}
case pipepb.OutputTime_EARLIEST_IN_PANE, pipepb.OutputTime_LATEST_IN_PANE:
outputTime = func(w typex.Window, et mtime.Time) mtime.Time {
return et
}
default:
// TODO need to correct session logic if output time is different.
panic(fmt.Sprintf("unsupported OutputTime behavior: %v", ws.GetOutputTime()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync"
"sync/atomic"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
Expand Down Expand Up @@ -195,7 +196,8 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo

// Inspect Windowing strategies for unsupported features.
for wsID, ws := range job.Pipeline.GetComponents().GetWindowingStrategies() {
check("WindowingStrategy.AllowedLateness", ws.GetAllowedLateness(), int64(0))
check("WindowingStrategy.AllowedLateness", ws.GetAllowedLateness(), int64(0), mtime.MaxTimestamp.Milliseconds())

// Both Closing behaviors are identical without additional trigger firings.
check("WindowingStrategy.ClosingBehaviour", ws.GetClosingBehavior(), pipepb.ClosingBehavior_EMIT_IF_NONEMPTY, pipepb.ClosingBehavior_EMIT_ALWAYS)
check("WindowingStrategy.AccumulationMode", ws.GetAccumulationMode(), pipepb.AccumulationMode_DISCARDING)
Expand Down

0 comments on commit 516bbc7

Please sign in to comment.