Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[prism] Align side input coders & fix progress + split hang #27572

Merged
merged 1 commit into from
Jul 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,8 +706,15 @@ func (ss *stageState) bundleReady(em *ElementManager) (mtime.Time, bool) {
}
ready := true
for _, side := range ss.sides {
pID := em.pcolParents[side]
parent := em.stages[pID]
pID, ok := em.pcolParents[side]
// These panics indicate pre-process/stage construction problems.
if !ok {
panic(fmt.Sprintf("stage[%v] no parent ID for side input %v", ss.ID, side))
}
parent, ok := em.stages[pID]
if !ok {
panic(fmt.Sprintf("stage[%v] no parent for side input %v, with parent ID %v", ss.ID, side, pID))
}
ow := parent.OutputWatermark()
if upstreamW > ow {
ready = false
Expand Down
78 changes: 57 additions & 21 deletions sdks/go/pkg/beam/runners/prism/internal/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,11 @@ progress:
progTick.Stop()
break progress // exit progress loop on close.
case <-progTick.C:
resp := b.Progress(wk)
resp, err := b.Progress(wk)
if err != nil {
slog.Debug("SDK Error from progress, aborting progress", "bundle", rb, "error", err.Error())
break progress
}
index, unknownIDs := j.ContributeTentativeMetrics(resp)
if len(unknownIDs) > 0 {
md := wk.MonitoringMetadata(unknownIDs)
Expand All @@ -125,7 +129,11 @@ progress:
slog.Debug("progress report", "bundle", rb, "index", index)
// Progress for the bundle hasn't advanced. Try splitting.
if previousIndex == index && !splitsDone {
sr := b.Split(wk, 0.5 /* fraction of remainder */, nil /* allowed splits */)
sr, err := b.Split(wk, 0.5 /* fraction of remainder */, nil /* allowed splits */)
if err != nil {
slog.Debug("SDK Error from split, aborting splits", "bundle", rb, "error", err.Error())
break progress
}
if sr.GetChannelSplits() == nil {
slog.Warn("split failed", "bundle", rb)
splitsDone = true
Expand Down Expand Up @@ -164,8 +172,8 @@ progress:
// Bundle has failed, fail the job.
// TODO add retries & clean up this logic. Channels are closed by the "runner" transforms.
if !ok && b.Error != "" {
slog.Error("job failed", "error", b.Error, "bundle", rb, "job", j)
j.Failed(fmt.Errorf("bundle failed: %v", b.Error))
slog.Error("job failed", "bundle", rb, "job", j)
j.Failed(fmt.Errorf("%v", b.Error))
return
}

Expand Down Expand Up @@ -245,31 +253,54 @@ func buildStage(s *stage, tid string, t *pipepb.PTransform, comps *pipepb.Compon
}
var inputInfo engine.PColInfo
var sides []string
localIdReplacements := map[string]string{}
globalIDReplacements := map[string]string{}
for local, global := range t.GetInputs() {
if _, ok := sis[local]; ok {
col := comps.GetPcollections()[global]
oCID := col.GetCoderId()
nCID := lpUnknownCoders(oCID, coders, comps.GetCoders())

sides = append(sides, global)
if oCID != nCID {
// Add a synthetic PCollection set with the new coder.
newGlobal := global + "_prismside"
comps.GetPcollections()[newGlobal] = &pipepb.PCollection{
DisplayData: col.GetDisplayData(),
UniqueName: col.GetUniqueName(),
CoderId: nCID,
IsBounded: col.GetIsBounded(),
WindowingStrategyId: col.WindowingStrategyId,
}
localIdReplacements[local] = newGlobal
globalIDReplacements[newGlobal] = global
}
continue
}
// This id is directly used for the source, but this also copies
// coders used by side inputs to the coders map for the bundle, so
// needs to be run for every ID.
wInCid := makeWindowedValueCoder(global, comps, coders)
_, ok := sis[local]
if ok {
sides = append(sides, global)
} else {
// this is the main input
transforms[s.inputTransformID] = sourceTransform(s.inputTransformID, portFor(wInCid, wk), global)
col := comps.GetPcollections()[global]
ed := collectionPullDecoder(col.GetCoderId(), coders, comps)
wDec, wEnc := getWindowValueCoders(comps, col, coders)
inputInfo = engine.PColInfo{
GlobalID: global,
WDec: wDec,
WEnc: wEnc,
EDec: ed,
}

// this is the main input
transforms[s.inputTransformID] = sourceTransform(s.inputTransformID, portFor(wInCid, wk), global)
col := comps.GetPcollections()[global]
ed := collectionPullDecoder(col.GetCoderId(), coders, comps)
wDec, wEnc := getWindowValueCoders(comps, col, coders)
inputInfo = engine.PColInfo{
GlobalID: global,
WDec: wDec,
WEnc: wEnc,
EDec: ed,
}
// We need to process all inputs to ensure we have all input coders, so we must continue.
}
// Update side inputs to point to new PCollection with any replaced coders.
for l, g := range localIdReplacements {
t.GetInputs()[l] = g
}

prepareSides, err := handleSideInputs(t, comps, coders, wk)
prepareSides, err := handleSideInputs(t, comps, coders, wk, globalIDReplacements)
if err != nil {
slog.Error("buildStage: handleSideInputs", err, slog.String("transformID", tid))
panic(err)
Expand Down Expand Up @@ -322,7 +353,7 @@ func buildStage(s *stage, tid string, t *pipepb.PTransform, comps *pipepb.Compon
}

// handleSideInputs ensures appropriate coders are available to the bundle, and prepares a function to stage the data.
func handleSideInputs(t *pipepb.PTransform, comps *pipepb.Components, coders map[string]*pipepb.Coder, wk *worker.W) (func(b *worker.B, tid string, watermark mtime.Time), error) {
func handleSideInputs(t *pipepb.PTransform, comps *pipepb.Components, coders map[string]*pipepb.Coder, wk *worker.W, replacements map[string]string) (func(b *worker.B, tid string, watermark mtime.Time), error) {
sis, err := getSideInputs(t)
if err != nil {
return nil, err
Expand All @@ -335,6 +366,11 @@ func handleSideInputs(t *pipepb.PTransform, comps *pipepb.Components, coders map
if !ok {
continue // This is the main input.
}
// Use the old global ID as the identifier for the data storage
// This matches what we do in the rest of the stage layer.
if oldGlobal, ok := replacements[global]; ok {
global = oldGlobal
}

// this is a side input
switch si.GetAccessPattern().GetUrn() {
Expand Down
21 changes: 15 additions & 6 deletions sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package worker

import (
"fmt"
"sync/atomic"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
Expand Down Expand Up @@ -144,18 +145,22 @@ func (b *B) Cleanup(wk *W) {
wk.mu.Unlock()
}

func (b *B) Progress(wk *W) *fnpb.ProcessBundleProgressResponse {
return wk.sendInstruction(&fnpb.InstructionRequest{
func (b *B) Progress(wk *W) (*fnpb.ProcessBundleProgressResponse, error) {
resp := wk.sendInstruction(&fnpb.InstructionRequest{
Request: &fnpb.InstructionRequest_ProcessBundleProgress{
ProcessBundleProgress: &fnpb.ProcessBundleProgressRequest{
InstructionId: b.InstID,
},
},
}).GetProcessBundleProgress()
})
if resp.GetError() != "" {
return nil, fmt.Errorf("progress[%v] error from SDK: %v", b.InstID, resp.GetError())
}
return resp.GetProcessBundleProgress(), nil
}

func (b *B) Split(wk *W, fraction float64, allowedSplits []int64) *fnpb.ProcessBundleSplitResponse {
return wk.sendInstruction(&fnpb.InstructionRequest{
func (b *B) Split(wk *W, fraction float64, allowedSplits []int64) (*fnpb.ProcessBundleSplitResponse, error) {
resp := wk.sendInstruction(&fnpb.InstructionRequest{
Request: &fnpb.InstructionRequest_ProcessBundleSplit{
ProcessBundleSplit: &fnpb.ProcessBundleSplitRequest{
InstructionId: b.InstID,
Expand All @@ -168,5 +173,9 @@ func (b *B) Split(wk *W, fraction float64, allowedSplits []int64) *fnpb.ProcessB
},
},
},
}).GetProcessBundleSplit()
})
if resp.GetError() != "" {
return nil, fmt.Errorf("split[%v] error from SDK: %v", b.InstID, resp.GetError())
}
return resp.GetProcessBundleSplit(), nil
}
22 changes: 13 additions & 9 deletions sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,11 +256,7 @@ func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error {
// TODO: Do more than assume these are ProcessBundleResponses.
wk.mu.Lock()
if b, ok := wk.activeInstructions[resp.GetInstructionId()]; ok {
// TODO. Better pipeline error handling.
if resp.Error != "" {
slog.LogAttrs(ctrl.Context(), slog.LevelError, "ctrl.Recv pipeline error",
slog.String("error", resp.GetError()))
}
// Error is handled in the resonse handler.
b.Respond(resp)
} else {
slog.Debug("ctrl.Recv: %v", resp)
Expand Down Expand Up @@ -327,12 +323,20 @@ func (wk *W) Data(data fnpb.BeamFnData_DataServer) error {
}
}()

for req := range wk.DataReqs {
if err := data.Send(req); err != nil {
slog.LogAttrs(context.TODO(), slog.LevelDebug, "data.Send error", slog.Any("error", err))
for {
select {
case req, ok := <-wk.DataReqs:
if !ok {
return nil
}
if err := data.Send(req); err != nil {
slog.LogAttrs(context.TODO(), slog.LevelDebug, "data.Send error", slog.Any("error", err))
}
case <-data.Context().Done():
slog.Debug("Data context canceled")
return data.Context().Err()
}
}
return nil
}

// State relays elements and timer bytes to SDKs and back again, coordinated via
Expand Down