Skip to content

Commit

Permalink
[apache#28126] plumb coder errors with better context.
Browse files Browse the repository at this point in the history
  • Loading branch information
lostluck committed Aug 25, 2023
1 parent b2d1c60 commit 147434c
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 24 deletions.
32 changes: 19 additions & 13 deletions sdks/go/pkg/beam/runners/prism/internal/coders.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,12 @@ func isLeafCoder(c *pipepb.Coder) bool {
//
// PCollection coders are not inherently WindowValueCoder wrapped, and they are added by the runner
// for crossing the FnAPI boundary at data sources and data sinks.
func makeWindowedValueCoder(pID string, comps *pipepb.Components, coders map[string]*pipepb.Coder) string {
func makeWindowedValueCoder(pID string, comps *pipepb.Components, coders map[string]*pipepb.Coder) (string, error) {
col := comps.GetPcollections()[pID]
cID := lpUnknownCoders(col.GetCoderId(), coders, comps.GetCoders())
cID, err := lpUnknownCoders(col.GetCoderId(), coders, comps.GetCoders())
if err != nil {
return "", fmt.Errorf("makeWindowedValueCoder: couldn't process coder for pcollection %q %v: %w", pID, prototext.Format(col), err)
}
wcID := comps.GetWindowingStrategies()[col.GetWindowingStrategyId()].GetWindowCoderId()

// The runner needs to be defensive, and tell the SDK to Length Prefix
Expand All @@ -73,7 +76,7 @@ func makeWindowedValueCoder(pID string, comps *pipepb.Components, coders map[str
}
// Populate the coders to send with the new windowed value coder.
coders[wvcID] = wInC
return wvcID
return wvcID, nil
}

// makeWindowCoders makes the coder pair but behavior is ultimately determined by the strategy's windowFn.
Expand All @@ -94,22 +97,22 @@ func makeWindowCoders(wc *pipepb.Coder) (exec.WindowDecoder, exec.WindowEncoder)
// lpUnknownCoders takes a coder, and populates coders with any new coders
// coders that the runner needs to be safe, and speedy.
// It returns either the passed in coder id, or the new safe coder id.
func lpUnknownCoders(cID string, bundle, base map[string]*pipepb.Coder) string {
func lpUnknownCoders(cID string, bundle, base map[string]*pipepb.Coder) (string, error) {
// First check if we've already added the LP version of this coder to coders already.
lpcID := cID + "_lp"
// Check if we've done this one before.
if _, ok := bundle[lpcID]; ok {
return lpcID
return lpcID, nil
}
// All coders in the coders map have been processed.
if _, ok := bundle[cID]; ok {
return cID
return cID, nil
}
// Look up the canonical location.
c, ok := base[cID]
if !ok {
// We messed up somewhere.
panic(fmt.Sprint("unknown coder id:", cID))
return "", fmt.Errorf("lpUnknownCoders: coder %q not present in base map", cID)
}
// Add the original coder to the coders map.
bundle[cID] = c
Expand All @@ -124,7 +127,7 @@ func lpUnknownCoders(cID string, bundle, base map[string]*pipepb.Coder) string {
ComponentCoderIds: []string{cID},
}
bundle[lpcID] = lpc
return lpcID
return lpcID, nil
}
// We know we have a composite, so if we count this as a leaf, move everything to
// the coders map.
Expand All @@ -133,12 +136,15 @@ func lpUnknownCoders(cID string, bundle, base map[string]*pipepb.Coder) string {
for _, cc := range c.GetComponentCoderIds() {
bundle[cc] = base[cc]
}
return cID
return cID, nil
}
var needNewComposite bool
var comps []string
for _, cc := range c.GetComponentCoderIds() {
rcc := lpUnknownCoders(cc, bundle, base)
for i, cc := range c.GetComponentCoderIds() {
rcc, err := lpUnknownCoders(cc, bundle, base)
if err != nil {
return "", fmt.Errorf("lpUnknownCoders: couldn't handle component %d %q of %q %v:\n%w", i, cc, cID, prototext.Format(c), err)
}
if cc != rcc {
needNewComposite = true
}
Expand All @@ -150,9 +156,9 @@ func lpUnknownCoders(cID string, bundle, base map[string]*pipepb.Coder) string {
ComponentCoderIds: comps,
}
bundle[lpcID] = lpc
return lpcID
return lpcID, nil
}
return cID
return cID, nil
}

// reconcileCoders ensures that the bundle coders are primed with initial coders from
Expand Down
6 changes: 4 additions & 2 deletions sdks/go/pkg/beam/runners/prism/internal/coders_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func Test_isLeafCoder(t *testing.T) {
func Test_makeWindowedValueCoder(t *testing.T) {
coders := map[string]*pipepb.Coder{}

gotID := makeWindowedValueCoder("testPID", &pipepb.Components{
gotID, err := makeWindowedValueCoder("testPID", &pipepb.Components{
Pcollections: map[string]*pipepb.PCollection{
"testPID": {CoderId: "testCoderID"},
},
Expand All @@ -74,7 +74,9 @@ func Test_makeWindowedValueCoder(t *testing.T) {
},
},
}, coders)

if err != nil {
t.Errorf("makeWindowedValueCoder(...) = error %v, want nil", err)
}
if gotID == "" {
t.Errorf("makeWindowedValueCoder(...) = %v, want non-empty", gotID)
}
Expand Down
10 changes: 8 additions & 2 deletions sdks/go/pkg/beam/runners/prism/internal/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,13 +297,19 @@ func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) erro
}

func collectionPullDecoder(coldCId string, coders map[string]*pipepb.Coder, comps *pipepb.Components) func(io.Reader) []byte {
cID := lpUnknownCoders(coldCId, coders, comps.GetCoders())
cID, err := lpUnknownCoders(coldCId, coders, comps.GetCoders())
if err != nil {
panic(err)
}
return pullDecoder(coders[cID], coders)
}

func getWindowValueCoders(comps *pipepb.Components, col *pipepb.PCollection, coders map[string]*pipepb.Coder) (exec.WindowDecoder, exec.WindowEncoder) {
ws := comps.GetWindowingStrategies()[col.GetWindowingStrategyId()]
wcID := lpUnknownCoders(ws.GetWindowCoderId(), coders, comps.GetCoders())
wcID, err := lpUnknownCoders(ws.GetWindowCoderId(), coders, comps.GetCoders())
if err != nil {
panic(err)
}
return makeWindowCoders(coders[wcID])
}

Expand Down
15 changes: 12 additions & 3 deletions sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,18 @@ func (h *runner) ExecuteTransform(stageID, tid string, t *pipepb.PTransform, com
coders := map[string]*pipepb.Coder{}

// TODO assert this is a KV. It's probably fine, but we should fail anyway.
wcID := lpUnknownCoders(ws.GetWindowCoderId(), coders, comps.GetCoders())
kcID := lpUnknownCoders(kvc.GetComponentCoderIds()[0], coders, comps.GetCoders())
ecID := lpUnknownCoders(kvc.GetComponentCoderIds()[1], coders, comps.GetCoders())
wcID, err := lpUnknownCoders(ws.GetWindowCoderId(), coders, comps.GetCoders())
if err != nil {
panic(fmt.Errorf("ExecuteTransform[GBK] stage %v, transform %q %v: couldn't process window coder:\n%w", stageID, tid, prototext.Format(t), err))
}
kcID, err := lpUnknownCoders(kvc.GetComponentCoderIds()[0], coders, comps.GetCoders())
if err != nil {
panic(fmt.Errorf("ExecuteTransform[GBK] stage %v, transform %q %v: couldn't process key coder:\n%w", stageID, tid, prototext.Format(t), err))
}
ecID, err := lpUnknownCoders(kvc.GetComponentCoderIds()[1], coders, comps.GetCoders())
if err != nil {
panic(fmt.Errorf("ExecuteTransform[GBK] stage %v, transform %q %v: couldn't process value coder:\n%w", stageID, tid, prototext.Format(t), err))
}
reconcileCoders(coders, comps.GetCoders())

wc := coders[wcID]
Expand Down
19 changes: 15 additions & 4 deletions sdks/go/pkg/beam/runners/prism/internal/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker"
"golang.org/x/exp/maps"
"golang.org/x/exp/slog"
"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"
)

Expand Down Expand Up @@ -290,9 +291,12 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W) error {
sink2Col := map[string]string{}
col2Coders := map[string]engine.PColInfo{}
for _, o := range stg.outputs {
wOutCid := makeWindowedValueCoder(o.global, comps, coders)
sinkID := o.transform + "_" + o.local
col := comps.GetPcollections()[o.global]
wOutCid, err := makeWindowedValueCoder(o.global, comps, coders)
if err != nil {
return fmt.Errorf("buildDescriptor: failed to handle coder on stage %v for output %+v, pcol %q %v:\n%w", stg.ID, o, o.global, prototext.Format(col), err)
}
sinkID := o.transform + "_" + o.local
ed := collectionPullDecoder(col.GetCoderId(), coders, comps)
wDec, wEnc := getWindowValueCoders(comps, col, coders)
sink2Col[sinkID] = o.global
Expand All @@ -311,7 +315,10 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W) error {
for _, si := range stg.sideInputs {
col := comps.GetPcollections()[si.global]
oCID := col.GetCoderId()
nCID := lpUnknownCoders(oCID, coders, comps.GetCoders())
nCID, err := lpUnknownCoders(oCID, coders, comps.GetCoders())
if err != nil {
return fmt.Errorf("buildDescriptor: failed to handle coder on stage %v for side input %+v, pcol %q %v:\n%w", stg.ID, si, si.global, prototext.Format(col), err)
}

sides = append(sides, si.global)
if oCID != nCID {
Expand Down Expand Up @@ -339,9 +346,13 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W) error {
// This id is directly used for the source, but this also copies
// coders used by side inputs to the coders map for the bundle, so
// needs to be run for every ID.
wInCid := makeWindowedValueCoder(stg.primaryInput, comps, coders)

col := comps.GetPcollections()[stg.primaryInput]
wInCid, err := makeWindowedValueCoder(stg.primaryInput, comps, coders)
if err != nil {
return fmt.Errorf("buildDescriptor: failed to handle coder on stage %v for primary input, pcol %q %v:\n%w", stg.ID, stg.primaryInput, prototext.Format(col), err)
}

ed := collectionPullDecoder(col.GetCoderId(), coders, comps)
wDec, wEnc := getWindowValueCoders(comps, col, coders)
inputInfo := engine.PColInfo{
Expand Down

0 comments on commit 147434c

Please sign in to comment.