Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#28126] plumb coder errors with better context. #28164

Merged
merged 5 commits into from
Aug 25, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions sdks/go/pkg/beam/runners/prism/internal/coders.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,12 @@ func isLeafCoder(c *pipepb.Coder) bool {
//
// PCollection coders are not inherently WindowValueCoder wrapped, and they are added by the runner
// for crossing the FnAPI boundary at data sources and data sinks.
func makeWindowedValueCoder(pID string, comps *pipepb.Components, coders map[string]*pipepb.Coder) string {
func makeWindowedValueCoder(pID string, comps *pipepb.Components, coders map[string]*pipepb.Coder) (string, error) {
col := comps.GetPcollections()[pID]
cID := lpUnknownCoders(col.GetCoderId(), coders, comps.GetCoders())
cID, err := lpUnknownCoders(col.GetCoderId(), coders, comps.GetCoders())
if err != nil {
return "", fmt.Errorf("makeWindowedValueCoder: couldn't process coder for pcollection %q %v: %w", pID, prototext.Format(col), err)
}
wcID := comps.GetWindowingStrategies()[col.GetWindowingStrategyId()].GetWindowCoderId()

// The runner needs to be defensive, and tell the SDK to Length Prefix
Expand All @@ -73,7 +76,7 @@ func makeWindowedValueCoder(pID string, comps *pipepb.Components, coders map[str
}
// Populate the coders to send with the new windowed value coder.
coders[wvcID] = wInC
return wvcID
return wvcID, nil
}

// makeWindowCoders makes the coder pair but behavior is ultimately determined by the strategy's windowFn.
Expand All @@ -94,22 +97,22 @@ func makeWindowCoders(wc *pipepb.Coder) (exec.WindowDecoder, exec.WindowEncoder)
// lpUnknownCoders takes a coder, and populates coders with any new coders
// coders that the runner needs to be safe, and speedy.
// It returns either the passed in coder id, or the new safe coder id.
func lpUnknownCoders(cID string, bundle, base map[string]*pipepb.Coder) string {
func lpUnknownCoders(cID string, bundle, base map[string]*pipepb.Coder) (string, error) {
// First check if we've already added the LP version of this coder to coders already.
lpcID := cID + "_lp"
// Check if we've done this one before.
if _, ok := bundle[lpcID]; ok {
return lpcID
return lpcID, nil
}
// All coders in the coders map have been processed.
if _, ok := bundle[cID]; ok {
return cID
return cID, nil
}
// Look up the canonical location.
c, ok := base[cID]
if !ok {
// We messed up somewhere.
panic(fmt.Sprint("unknown coder id:", cID))
return "", fmt.Errorf("lpUnknownCoders: coder %q not present in base map", cID)
}
// Add the original coder to the coders map.
bundle[cID] = c
Expand All @@ -124,7 +127,7 @@ func lpUnknownCoders(cID string, bundle, base map[string]*pipepb.Coder) string {
ComponentCoderIds: []string{cID},
}
bundle[lpcID] = lpc
return lpcID
return lpcID, nil
}
// We know we have a composite, so if we count this as a leaf, move everything to
// the coders map.
Expand All @@ -133,12 +136,15 @@ func lpUnknownCoders(cID string, bundle, base map[string]*pipepb.Coder) string {
for _, cc := range c.GetComponentCoderIds() {
bundle[cc] = base[cc]
}
return cID
return cID, nil
}
var needNewComposite bool
var comps []string
for _, cc := range c.GetComponentCoderIds() {
rcc := lpUnknownCoders(cc, bundle, base)
for i, cc := range c.GetComponentCoderIds() {
rcc, err := lpUnknownCoders(cc, bundle, base)
if err != nil {
return "", fmt.Errorf("lpUnknownCoders: couldn't handle component %d %q of %q %v:\n%w", i, cc, cID, prototext.Format(c), err)
}
if cc != rcc {
needNewComposite = true
}
Expand All @@ -150,9 +156,9 @@ func lpUnknownCoders(cID string, bundle, base map[string]*pipepb.Coder) string {
ComponentCoderIds: comps,
}
bundle[lpcID] = lpc
return lpcID
return lpcID, nil
}
return cID
return cID, nil
}

// reconcileCoders ensures that the bundle coders are primed with initial coders from
Expand Down
6 changes: 4 additions & 2 deletions sdks/go/pkg/beam/runners/prism/internal/coders_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func Test_isLeafCoder(t *testing.T) {
func Test_makeWindowedValueCoder(t *testing.T) {
coders := map[string]*pipepb.Coder{}

gotID := makeWindowedValueCoder("testPID", &pipepb.Components{
gotID, err := makeWindowedValueCoder("testPID", &pipepb.Components{
Pcollections: map[string]*pipepb.PCollection{
"testPID": {CoderId: "testCoderID"},
},
Expand All @@ -74,7 +74,9 @@ func Test_makeWindowedValueCoder(t *testing.T) {
},
},
}, coders)

if err != nil {
t.Errorf("makeWindowedValueCoder(...) = error %v, want nil", err)
}
if gotID == "" {
t.Errorf("makeWindowedValueCoder(...) = %v, want non-empty", gotID)
}
Expand Down
14 changes: 11 additions & 3 deletions sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,12 @@ func reElementResiduals(residuals [][]byte, inputInfo PColInfo, rb RunBundle) []
if err == io.EOF {
break
}
slog.Error("reElementResiduals: error decoding residual header", err, "bundle", rb)
panic("error decoding residual header")
slog.Error("reElementResiduals: error decoding residual header", "error", err, "bundle", rb)
panic("error decoding residual header:" + err.Error())
}
if len(ws) == 0 {
slog.Error("reElementResiduals: sdk provided a windowed value header 0 windows", "bundle", rb)
panic("error decoding residual header: sdk provided a windowed value header 0 windows")
}

for _, w := range ws {
Expand Down Expand Up @@ -332,9 +336,13 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol
if err == io.EOF {
break
}
slog.Error("PersistBundle: error decoding watermarks", err, "bundle", rb, slog.String("output", output))
slog.Error("PersistBundle: error decoding watermarks", "error", err, "bundle", rb, slog.String("output", output))
panic("error decoding watermarks")
}
if len(ws) == 0 {
slog.Error("PersistBundle: sdk provided a windowed value header 0 windows", "bundle", rb)
panic("error decoding residual header: sdk provided a windowed value header 0 windows")
}
// TODO: Optimize unnecessary copies. This is doubleteeing.
elmBytes := info.EDec(tee)
for _, w := range ws {
Expand Down
10 changes: 8 additions & 2 deletions sdks/go/pkg/beam/runners/prism/internal/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,13 +297,19 @@ func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) erro
}

func collectionPullDecoder(coldCId string, coders map[string]*pipepb.Coder, comps *pipepb.Components) func(io.Reader) []byte {
cID := lpUnknownCoders(coldCId, coders, comps.GetCoders())
cID, err := lpUnknownCoders(coldCId, coders, comps.GetCoders())
if err != nil {
panic(err)
}
return pullDecoder(coders[cID], coders)
}

func getWindowValueCoders(comps *pipepb.Components, col *pipepb.PCollection, coders map[string]*pipepb.Coder) (exec.WindowDecoder, exec.WindowEncoder) {
ws := comps.GetWindowingStrategies()[col.GetWindowingStrategyId()]
wcID := lpUnknownCoders(ws.GetWindowCoderId(), coders, comps.GetCoders())
wcID, err := lpUnknownCoders(ws.GetWindowCoderId(), coders, comps.GetCoders())
if err != nil {
panic(err)
}
return makeWindowCoders(coders[wcID])
}

Expand Down
15 changes: 12 additions & 3 deletions sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,18 @@ func (h *runner) ExecuteTransform(stageID, tid string, t *pipepb.PTransform, com
coders := map[string]*pipepb.Coder{}

// TODO assert this is a KV. It's probably fine, but we should fail anyway.
wcID := lpUnknownCoders(ws.GetWindowCoderId(), coders, comps.GetCoders())
kcID := lpUnknownCoders(kvc.GetComponentCoderIds()[0], coders, comps.GetCoders())
ecID := lpUnknownCoders(kvc.GetComponentCoderIds()[1], coders, comps.GetCoders())
wcID, err := lpUnknownCoders(ws.GetWindowCoderId(), coders, comps.GetCoders())
if err != nil {
panic(fmt.Errorf("ExecuteTransform[GBK] stage %v, transform %q %v: couldn't process window coder:\n%w", stageID, tid, prototext.Format(t), err))
}
kcID, err := lpUnknownCoders(kvc.GetComponentCoderIds()[0], coders, comps.GetCoders())
if err != nil {
panic(fmt.Errorf("ExecuteTransform[GBK] stage %v, transform %q %v: couldn't process key coder:\n%w", stageID, tid, prototext.Format(t), err))
}
ecID, err := lpUnknownCoders(kvc.GetComponentCoderIds()[1], coders, comps.GetCoders())
if err != nil {
panic(fmt.Errorf("ExecuteTransform[GBK] stage %v, transform %q %v: couldn't process value coder:\n%w", stageID, tid, prototext.Format(t), err))
}
reconcileCoders(coders, comps.GetCoders())

wc := coders[wcID]
Expand Down
19 changes: 15 additions & 4 deletions sdks/go/pkg/beam/runners/prism/internal/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker"
"golang.org/x/exp/maps"
"golang.org/x/exp/slog"
"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"
)

Expand Down Expand Up @@ -290,9 +291,12 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W) error {
sink2Col := map[string]string{}
col2Coders := map[string]engine.PColInfo{}
for _, o := range stg.outputs {
wOutCid := makeWindowedValueCoder(o.global, comps, coders)
sinkID := o.transform + "_" + o.local
col := comps.GetPcollections()[o.global]
wOutCid, err := makeWindowedValueCoder(o.global, comps, coders)
if err != nil {
return fmt.Errorf("buildDescriptor: failed to handle coder on stage %v for output %+v, pcol %q %v:\n%w", stg.ID, o, o.global, prototext.Format(col), err)
}
sinkID := o.transform + "_" + o.local
ed := collectionPullDecoder(col.GetCoderId(), coders, comps)
wDec, wEnc := getWindowValueCoders(comps, col, coders)
sink2Col[sinkID] = o.global
Expand All @@ -311,7 +315,10 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W) error {
for _, si := range stg.sideInputs {
col := comps.GetPcollections()[si.global]
oCID := col.GetCoderId()
nCID := lpUnknownCoders(oCID, coders, comps.GetCoders())
nCID, err := lpUnknownCoders(oCID, coders, comps.GetCoders())
if err != nil {
return fmt.Errorf("buildDescriptor: failed to handle coder on stage %v for side input %+v, pcol %q %v:\n%w", stg.ID, si, si.global, prototext.Format(col), err)
}

sides = append(sides, si.global)
if oCID != nCID {
Expand Down Expand Up @@ -339,9 +346,13 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W) error {
// This id is directly used for the source, but this also copies
// coders used by side inputs to the coders map for the bundle, so
// needs to be run for every ID.
wInCid := makeWindowedValueCoder(stg.primaryInput, comps, coders)

col := comps.GetPcollections()[stg.primaryInput]
wInCid, err := makeWindowedValueCoder(stg.primaryInput, comps, coders)
if err != nil {
return fmt.Errorf("buildDescriptor: failed to handle coder on stage %v for primary input, pcol %q %v:\n%w", stg.ID, stg.primaryInput, prototext.Format(col), err)
}

ed := collectionPullDecoder(col.GetCoderId(), coders, comps)
wDec, wEnc := getWindowValueCoders(comps, col, coders)
inputInfo := engine.PColInfo{
Expand Down