From 2c59a748b91a0988e0b8e332f18bd9db072c3e95 Mon Sep 17 00:00:00 2001 From: Robert Burke Date: Thu, 20 Jul 2023 11:33:17 -0700 Subject: [PATCH] [prism] side input coders & progress hang (#27572) Co-authored-by: lostluck <13907733+lostluck@users.noreply.github.com> --- .../prism/internal/engine/elementmanager.go | 11 ++- .../pkg/beam/runners/prism/internal/stage.go | 78 ++++++++++++++----- .../runners/prism/internal/worker/bundle.go | 21 +++-- .../runners/prism/internal/worker/worker.go | 22 +++--- 4 files changed, 94 insertions(+), 38 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 89fececea108..95ad2e562d4c 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -706,8 +706,15 @@ func (ss *stageState) bundleReady(em *ElementManager) (mtime.Time, bool) { } ready := true for _, side := range ss.sides { - pID := em.pcolParents[side] - parent := em.stages[pID] + pID, ok := em.pcolParents[side] + // These panics indicate pre-process/stage construction problems. + if !ok { + panic(fmt.Sprintf("stage[%v] no parent ID for side input %v", ss.ID, side)) + } + parent, ok := em.stages[pID] + if !ok { + panic(fmt.Sprintf("stage[%v] no parent for side input %v, with parent ID %v", ss.ID, side, pID)) + } ow := parent.OutputWatermark() if upstreamW > ow { ready = false diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go index 7dbf8cf87e77..44f9c1e9d281 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/stage.go +++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go @@ -116,7 +116,11 @@ progress: progTick.Stop() break progress // exit progress loop on close. case <-progTick.C: - resp := b.Progress(wk) + resp, err := b.Progress(wk) + if err != nil { + slog.Debug("SDK Error from progress, aborting progress", "bundle", rb, "error", err.Error()) + break progress + } index, unknownIDs := j.ContributeTentativeMetrics(resp) if len(unknownIDs) > 0 { md := wk.MonitoringMetadata(unknownIDs) @@ -125,7 +129,11 @@ progress: slog.Debug("progress report", "bundle", rb, "index", index) // Progress for the bundle hasn't advanced. Try splitting. if previousIndex == index && !splitsDone { - sr := b.Split(wk, 0.5 /* fraction of remainder */, nil /* allowed splits */) + sr, err := b.Split(wk, 0.5 /* fraction of remainder */, nil /* allowed splits */) + if err != nil { + slog.Debug("SDK Error from split, aborting splits", "bundle", rb, "error", err.Error()) + break progress + } if sr.GetChannelSplits() == nil { slog.Warn("split failed", "bundle", rb) splitsDone = true @@ -164,8 +172,8 @@ progress: // Bundle has failed, fail the job. // TODO add retries & clean up this logic. Channels are closed by the "runner" transforms. if !ok && b.Error != "" { - slog.Error("job failed", "error", b.Error, "bundle", rb, "job", j) - j.Failed(fmt.Errorf("bundle failed: %v", b.Error)) + slog.Error("job failed", "bundle", rb, "job", j) + j.Failed(fmt.Errorf("%v", b.Error)) return } @@ -245,31 +253,54 @@ func buildStage(s *stage, tid string, t *pipepb.PTransform, comps *pipepb.Compon } var inputInfo engine.PColInfo var sides []string + localIdReplacements := map[string]string{} + globalIDReplacements := map[string]string{} for local, global := range t.GetInputs() { + if _, ok := sis[local]; ok { + col := comps.GetPcollections()[global] + oCID := col.GetCoderId() + nCID := lpUnknownCoders(oCID, coders, comps.GetCoders()) + + sides = append(sides, global) + if oCID != nCID { + // Add a synthetic PCollection set with the new coder. + newGlobal := global + "_prismside" + comps.GetPcollections()[newGlobal] = &pipepb.PCollection{ + DisplayData: col.GetDisplayData(), + UniqueName: col.GetUniqueName(), + CoderId: nCID, + IsBounded: col.GetIsBounded(), + WindowingStrategyId: col.WindowingStrategyId, + } + localIdReplacements[local] = newGlobal + globalIDReplacements[newGlobal] = global + } + continue + } // This id is directly used for the source, but this also copies // coders used by side inputs to the coders map for the bundle, so // needs to be run for every ID. wInCid := makeWindowedValueCoder(global, comps, coders) - _, ok := sis[local] - if ok { - sides = append(sides, global) - } else { - // this is the main input - transforms[s.inputTransformID] = sourceTransform(s.inputTransformID, portFor(wInCid, wk), global) - col := comps.GetPcollections()[global] - ed := collectionPullDecoder(col.GetCoderId(), coders, comps) - wDec, wEnc := getWindowValueCoders(comps, col, coders) - inputInfo = engine.PColInfo{ - GlobalID: global, - WDec: wDec, - WEnc: wEnc, - EDec: ed, - } + + // this is the main input + transforms[s.inputTransformID] = sourceTransform(s.inputTransformID, portFor(wInCid, wk), global) + col := comps.GetPcollections()[global] + ed := collectionPullDecoder(col.GetCoderId(), coders, comps) + wDec, wEnc := getWindowValueCoders(comps, col, coders) + inputInfo = engine.PColInfo{ + GlobalID: global, + WDec: wDec, + WEnc: wEnc, + EDec: ed, } // We need to process all inputs to ensure we have all input coders, so we must continue. } + // Update side inputs to point to new PCollection with any replaced coders. + for l, g := range localIdReplacements { + t.GetInputs()[l] = g + } - prepareSides, err := handleSideInputs(t, comps, coders, wk) + prepareSides, err := handleSideInputs(t, comps, coders, wk, globalIDReplacements) if err != nil { slog.Error("buildStage: handleSideInputs", err, slog.String("transformID", tid)) panic(err) @@ -322,7 +353,7 @@ func buildStage(s *stage, tid string, t *pipepb.PTransform, comps *pipepb.Compon } // handleSideInputs ensures appropriate coders are available to the bundle, and prepares a function to stage the data. -func handleSideInputs(t *pipepb.PTransform, comps *pipepb.Components, coders map[string]*pipepb.Coder, wk *worker.W) (func(b *worker.B, tid string, watermark mtime.Time), error) { +func handleSideInputs(t *pipepb.PTransform, comps *pipepb.Components, coders map[string]*pipepb.Coder, wk *worker.W, replacements map[string]string) (func(b *worker.B, tid string, watermark mtime.Time), error) { sis, err := getSideInputs(t) if err != nil { return nil, err @@ -335,6 +366,11 @@ func handleSideInputs(t *pipepb.PTransform, comps *pipepb.Components, coders map if !ok { continue // This is the main input. } + // Use the old global ID as the identifier for the data storage + // This matches what we do in the rest of the stage layer. + if oldGlobal, ok := replacements[global]; ok { + global = oldGlobal + } // this is a side input switch si.GetAccessPattern().GetUrn() { diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go index 58cc813d7108..30515fa6f6e8 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go @@ -16,6 +16,7 @@ package worker import ( + "fmt" "sync/atomic" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" @@ -144,18 +145,22 @@ func (b *B) Cleanup(wk *W) { wk.mu.Unlock() } -func (b *B) Progress(wk *W) *fnpb.ProcessBundleProgressResponse { - return wk.sendInstruction(&fnpb.InstructionRequest{ +func (b *B) Progress(wk *W) (*fnpb.ProcessBundleProgressResponse, error) { + resp := wk.sendInstruction(&fnpb.InstructionRequest{ Request: &fnpb.InstructionRequest_ProcessBundleProgress{ ProcessBundleProgress: &fnpb.ProcessBundleProgressRequest{ InstructionId: b.InstID, }, }, - }).GetProcessBundleProgress() + }) + if resp.GetError() != "" { + return nil, fmt.Errorf("progress[%v] error from SDK: %v", b.InstID, resp.GetError()) + } + return resp.GetProcessBundleProgress(), nil } -func (b *B) Split(wk *W, fraction float64, allowedSplits []int64) *fnpb.ProcessBundleSplitResponse { - return wk.sendInstruction(&fnpb.InstructionRequest{ +func (b *B) Split(wk *W, fraction float64, allowedSplits []int64) (*fnpb.ProcessBundleSplitResponse, error) { + resp := wk.sendInstruction(&fnpb.InstructionRequest{ Request: &fnpb.InstructionRequest_ProcessBundleSplit{ ProcessBundleSplit: &fnpb.ProcessBundleSplitRequest{ InstructionId: b.InstID, @@ -168,5 +173,9 @@ func (b *B) Split(wk *W, fraction float64, allowedSplits []int64) *fnpb.ProcessB }, }, }, - }).GetProcessBundleSplit() + }) + if resp.GetError() != "" { + return nil, fmt.Errorf("split[%v] error from SDK: %v", b.InstID, resp.GetError()) + } + return resp.GetProcessBundleSplit(), nil } diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go index 9767dec068fe..80bdadc51626 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go @@ -256,11 +256,7 @@ func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error { // TODO: Do more than assume these are ProcessBundleResponses. wk.mu.Lock() if b, ok := wk.activeInstructions[resp.GetInstructionId()]; ok { - // TODO. Better pipeline error handling. - if resp.Error != "" { - slog.LogAttrs(ctrl.Context(), slog.LevelError, "ctrl.Recv pipeline error", - slog.String("error", resp.GetError())) - } + // Error is handled in the resonse handler. b.Respond(resp) } else { slog.Debug("ctrl.Recv: %v", resp) @@ -327,12 +323,20 @@ func (wk *W) Data(data fnpb.BeamFnData_DataServer) error { } }() - for req := range wk.DataReqs { - if err := data.Send(req); err != nil { - slog.LogAttrs(context.TODO(), slog.LevelDebug, "data.Send error", slog.Any("error", err)) + for { + select { + case req, ok := <-wk.DataReqs: + if !ok { + return nil + } + if err := data.Send(req); err != nil { + slog.LogAttrs(context.TODO(), slog.LevelDebug, "data.Send error", slog.Any("error", err)) + } + case <-data.Context().Done(): + slog.Debug("Data context canceled") + return data.Context().Err() } } - return nil } // State relays elements and timer bytes to SDKs and back again, coordinated via