diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go index 65280ef6b930..1e30d4258507 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go @@ -494,6 +494,7 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) ([]string, error) { m.requirements[URNRequiresSplittableDoFn] = true } if _, ok := edge.Edge.DoFn.ProcessElementFn().BundleFinalization(); ok { + payload.RequestsFinalization = true m.requirements[URNRequiresBundleFinalization] = true } if _, ok := edge.Edge.DoFn.ProcessElementFn().StateProvider(); ok { diff --git a/sdks/go/pkg/beam/core/typex/special.go b/sdks/go/pkg/beam/core/typex/special.go index edc1249fe763..af36ba92d280 100644 --- a/sdks/go/pkg/beam/core/typex/special.go +++ b/sdks/go/pkg/beam/core/typex/special.go @@ -69,8 +69,18 @@ type Window interface { Equals(o Window) bool } -// BundleFinalization allows registering callbacks to be performed after the runner durably persists bundle results. +// BundleFinalization allows registering callbacks for the runner to invoke after the bundle completes and the runner +// commits the output. Parameter is accessible during DoFn StartBundle, ProcessElement, FinishBundle. +// However, if your DoFn implementation requires BundleFinalization in StartBundle or FinishBundle, it is needed in the +// ProcessElement signature, even if not invoked, +// Common use cases for BundleFinalization would be to perform work after elements in a bundle have been processed. +// See beam.ParDo for documentation on these DoFn lifecycle methods. type BundleFinalization interface { + + // RegisterCallback registers the runner to invoke func() after the runner persists the bundle of processed elements. + // The time.Duration configures the callback expiration, after which the runner will not invoke func(). + // Returning error communicates to the runner that bundle finalization failed and the runner may choose to attempt + // finalization again. RegisterCallback(time.Duration, func() error) } diff --git a/sdks/go/pkg/beam/forward.go b/sdks/go/pkg/beam/forward.go index 210c39ab4e49..b2f610b703e9 100644 --- a/sdks/go/pkg/beam/forward.go +++ b/sdks/go/pkg/beam/forward.go @@ -204,6 +204,7 @@ type Window = typex.Window // BundleFinalization represents the parameter used to register callbacks to // be run once the runner has durably persisted output for a bundle. +// See typex.BundleFinalization for more details. type BundleFinalization = typex.BundleFinalization // These are the reflect.Type instances of the universal types, which are used diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go b/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go index 2d3425af33c6..13e9b6f1b79d 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go +++ b/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go @@ -78,11 +78,7 @@ func (h *pardo) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipepb } // Lets check for and remove anything that makes things less simple. - if pdo.OnWindowExpirationTimerFamilySpec == "" && - !pdo.RequestsFinalization && - !pdo.RequiresStableInput && - !pdo.RequiresTimeSortedInput && - pdo.RestrictionCoderId == "" { + if pdo.RestrictionCoderId == "" { // Which inputs are Side inputs don't change the graph further, // so they're not included here. Any nearly any ParDo can have them. diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go index 6cde48ded9ac..1407feafe325 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go @@ -44,6 +44,7 @@ import ( var supportedRequirements = map[string]struct{}{ urns.RequirementSplittableDoFn: {}, urns.RequirementStatefulProcessing: {}, + urns.RequirementBundleFinalization: {}, } // TODO, move back to main package, and key off of executor handlers? diff --git a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go index ed7f168e36ee..7de32f85b7ee 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go +++ b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go @@ -445,6 +445,7 @@ func finalizeStage(stg *stage, comps *pipepb.Components, pipelineFacts *fusionFa if err := (proto.UnmarshalOptions{}).Unmarshal(t.GetSpec().GetPayload(), pardo); err != nil { return fmt.Errorf("unable to decode ParDoPayload for %v", link.Transform) } + stg.finalize = pardo.RequestsFinalization if len(pardo.GetTimerFamilySpecs())+len(pardo.GetStateSpecs())+len(pardo.GetOnWindowExpirationTimerFamilySpec()) > 0 { stg.stateful = true } diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go index 24cfc750933d..f33754b2ca0a 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/stage.go +++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "io" + "runtime/debug" "sync/atomic" "time" @@ -63,6 +64,7 @@ type stage struct { sideInputs []engine.LinkID // Non-parallel input PCollections and their consumers internalCols []string // PCollections that escape. Used for precise coder sending. envID string + finalize bool stateful bool // hasTimers indicates the transform+timerfamily pairs that need to be waited on for // the stage to be considered complete. @@ -106,7 +108,7 @@ func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, c defer func() { // Convert execution panics to errors to fail the bundle. if e := recover(); e != nil { - err = fmt.Errorf("panic in stage.Execute bundle processing goroutine: %v, stage: %+v", e, s) + err = fmt.Errorf("panic in stage.Execute bundle processing goroutine: %v, stage: %+v,stackTrace:\n%s", e, s, debug.Stack()) } }() slog.Debug("Execute: starting bundle", "bundle", rb) @@ -322,6 +324,14 @@ progress: slog.Debug("returned empty residual application", "bundle", rb, slog.Int("numResiduals", l), slog.String("pcollection", s.primaryInput)) } em.PersistBundle(rb, s.OutputsToCoders, b.OutputData, s.inputInfo, residuals) + if s.finalize { + _, err := b.Finalize(ctx, wk) + if err != nil { + slog.Error("SDK Error from bundle finalization", "bundle", rb, "error", err.Error()) + panic(err) + } + slog.Info("finalized bundle", "bundle", rb) + } b.OutputData = engine.TentativeData{} // Clear the data. return nil } diff --git a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go index 6afb04521af0..f8917c72ccde 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go @@ -83,6 +83,7 @@ func TestImplemented(t *testing.T) { {pipeline: primitives.Checkpoints}, {pipeline: primitives.CoGBK}, {pipeline: primitives.ReshuffleKV}, + {pipeline: primitives.ParDoProcessElementBundleFinalizer}, // The following have been "allowed" to unblock further development // But it's not clear these tests truly validate the expected behavior diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go index 50e427ca36f5..3ccafdb81e9a 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go @@ -206,6 +206,17 @@ func (b *B) Cleanup(wk *W) { wk.mu.Unlock() } +func (b *B) Finalize(ctx context.Context, wk *W) (*fnpb.FinalizeBundleResponse, error) { + resp := wk.sendInstruction(ctx, &fnpb.InstructionRequest{ + Request: &fnpb.InstructionRequest_FinalizeBundle{ + FinalizeBundle: &fnpb.FinalizeBundleRequest{ + InstructionId: b.InstID, + }, + }, + }) + return resp.GetFinalizeBundle(), nil +} + // Progress sends a progress request for the given bundle to the passed in worker, blocking on the response. func (b *B) Progress(ctx context.Context, wk *W) (*fnpb.ProcessBundleProgressResponse, error) { resp := wk.sendInstruction(ctx, &fnpb.InstructionRequest{ diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go index aec69036eeb5..de782daa2d5d 100644 --- a/sdks/go/test/integration/integration.go +++ b/sdks/go/test/integration/integration.go @@ -104,6 +104,9 @@ var directFilters = []string{ "TestSetState", "TestSetStateClear", "TestTimers.*", // no timer support for the go direct runner. + + // no support for BundleFinalizer + "TestParDoBundleFinalizer.*", } var portableFilters = []string{ @@ -134,6 +137,9 @@ var portableFilters = []string{ // The portable runner does not uniquify timers. (data elements re-fired) "TestTimers.*", + + // no support for BundleFinalizer + "TestParDoBundleFinalizer.*", } var prismFilters = []string{ @@ -190,6 +196,9 @@ var flinkFilters = []string{ "TestTimers_EventTime_Unbounded", // (failure when comparing on side inputs (NPE on window lookup)) "TestTimers_ProcessingTime.*", // Flink doesn't support processing time timers. + + // no support for BundleFinalizer + "TestParDoBundleFinalizer.*", } var samzaFilters = []string{ @@ -231,6 +240,9 @@ var samzaFilters = []string{ // Samza does not support state. "TestTimers.*", + + // no support for BundleFinalizer + "TestParDoBundleFinalizer.*", } var sparkFilters = []string{ @@ -265,6 +277,9 @@ var sparkFilters = []string{ "TestTimers_EventTime_Unbounded", // Side inputs in executable stage not supported. "TestTimers_ProcessingTime_Infinity", // Spark doesn't support test stream. + + // no support for BundleFinalizer + "TestParDoBundleFinalizer.*", } var dataflowFilters = []string{ diff --git a/sdks/go/test/integration/primitives/pardo.go b/sdks/go/test/integration/primitives/pardo.go index 2c2383ea90ba..dc59d8f67b80 100644 --- a/sdks/go/test/integration/primitives/pardo.go +++ b/sdks/go/test/integration/primitives/pardo.go @@ -18,6 +18,8 @@ package primitives import ( "flag" "fmt" + "sync/atomic" + "time" "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/register" @@ -32,6 +34,9 @@ func init() { register.Function3x2(asymJoinFn) register.Function5x0(splitByName) register.Function2x0(emitPipelineOptions) + register.DoFn2x0[beam.BundleFinalization, []byte]((*processElemBundleFinalizer)(nil)) + register.DoFn2x0[beam.BundleFinalization, []byte]((*finalizerInFinishBundle)(nil)) + register.DoFn2x0[beam.BundleFinalization, []byte]((*finalizerInAll)(nil)) register.Iter1[int]() register.Iter2[int, int]() @@ -192,3 +197,78 @@ func emitPipelineOptions(_ []byte, emit func(string)) { emit(fmt.Sprintf("%s: %s", "B", beam.PipelineOptions.Get("B"))) emit(fmt.Sprintf("%s: %s", "C", beam.PipelineOptions.Get("C"))) } + +var CountInvokeBundleFinalizer atomic.Int32 + +const ( + BundleFinalizerStart = 1 + BundleFinalizerProcess = 2 + BundleFinalizerFinish = 4 +) + +// ParDoProcessElementBundleFinalizer creates a beam.Pipeline with a beam.ParDo0 that processes a DoFn with a +// beam.BundleFinalization in its ProcessElement method. +func ParDoProcessElementBundleFinalizer(s beam.Scope) { + imp := beam.Impulse(s) + beam.ParDo0(s, &processElemBundleFinalizer{}, imp) +} + +type processElemBundleFinalizer struct { +} + +func (fn *processElemBundleFinalizer) ProcessElement(bf beam.BundleFinalization, _ []byte) { + bf.RegisterCallback(time.Second, func() error { + CountInvokeBundleFinalizer.Add(BundleFinalizerProcess) + return nil + }) +} + +// ParDoFinishBundleFinalizer creates a beam.Pipeline with a beam.ParDo0 that processes a DoFn containing a noop +// beam.BundleFinalization in its ProcessElement method and a beam.BundleFinalization in its FinishBundle method. +func ParDoFinishBundleFinalizer(s beam.Scope) { + imp := beam.Impulse(s) + beam.ParDo0(s, &finalizerInFinishBundle{}, imp) +} + +type finalizerInFinishBundle struct{} + +// ProcessElement requires beam.BundleFinalization in its method signature in order for FinishBundle's +// beam.BundleFinalization to be invoked. +func (fn *finalizerInFinishBundle) ProcessElement(_ beam.BundleFinalization, _ []byte) {} + +func (fn *finalizerInFinishBundle) FinishBundle(bf beam.BundleFinalization) { + bf.RegisterCallback(time.Second, func() error { + CountInvokeBundleFinalizer.Add(BundleFinalizerFinish) + return nil + }) +} + +// ParDoFinalizerInAll creates a beam.Pipeline with a beam.ParDo0 that processes a DoFn containing a beam.BundleFinalization +// in all three lifecycle methods StartBundle, ProcessElement, FinishBundle. +func ParDoFinalizerInAll(s beam.Scope) { + imp := beam.Impulse(s) + beam.ParDo0(s, &finalizerInAll{}, imp) +} + +type finalizerInAll struct{} + +func (fn *finalizerInAll) StartBundle(bf beam.BundleFinalization) { + bf.RegisterCallback(time.Second, func() error { + CountInvokeBundleFinalizer.Add(BundleFinalizerStart) + return nil + }) +} + +func (fn *finalizerInAll) ProcessElement(bf beam.BundleFinalization, _ []byte) { + bf.RegisterCallback(time.Second, func() error { + CountInvokeBundleFinalizer.Add(BundleFinalizerProcess) + return nil + }) +} + +func (fn *finalizerInAll) FinishBundle(bf beam.BundleFinalization) { + bf.RegisterCallback(time.Second, func() error { + CountInvokeBundleFinalizer.Add(BundleFinalizerFinish) + return nil + }) +} diff --git a/sdks/go/test/integration/primitives/pardo_test.go b/sdks/go/test/integration/primitives/pardo_test.go index d2ad57b350b3..aa6cb3de2008 100644 --- a/sdks/go/test/integration/primitives/pardo_test.go +++ b/sdks/go/test/integration/primitives/pardo_test.go @@ -18,6 +18,8 @@ package primitives import ( "testing" + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" "github.com/apache/beam/sdks/v2/go/test/integration" ) @@ -46,3 +48,44 @@ func TestParDoPipelineOptions(t *testing.T) { integration.CheckFilters(t) ptest.RunAndValidate(t, ParDoPipelineOptions()) } + +func TestParDoBundleFinalizer(t *testing.T) { + integration.CheckFilters(t) + if !jobopts.IsLoopback() { + t.Skip("Only Loopback mode is supported") + } + for _, tt := range []struct { + name string + pipelineFn func(s beam.Scope) + want int32 + }{ + { + name: "InProcessElement", + pipelineFn: ParDoProcessElementBundleFinalizer, + want: BundleFinalizerProcess, + }, + { + name: "InFinishBundle", + pipelineFn: ParDoFinishBundleFinalizer, + want: BundleFinalizerFinish, + }, + { + name: "InStartProcessFinishBundle", + pipelineFn: ParDoFinalizerInAll, + want: BundleFinalizerStart + BundleFinalizerProcess + BundleFinalizerFinish, + }, + } { + t.Run(tt.name, func(t *testing.T) { + CountInvokeBundleFinalizer.Store(0) + p, s := beam.NewPipelineWithRoot() + tt.pipelineFn(s) + _, err := ptest.RunWithMetrics(p) + if err != nil { + t.Fatalf("Failed to execute job: %v", err) + } + if got := CountInvokeBundleFinalizer.Load(); got != tt.want { + t.Errorf("BundleFinalization RegisterCallback not invoked as expected via proxy counts, got: %v, want: %v", got, tt.want) + } + }) + } +}