diff --git a/sdks/go/pkg/beam/core/graph/fn.go b/sdks/go/pkg/beam/core/graph/fn.go index 25b846370fb5..54cc02e07b3d 100644 --- a/sdks/go/pkg/beam/core/graph/fn.go +++ b/sdks/go/pkg/beam/core/graph/fn.go @@ -1099,13 +1099,13 @@ func validateSdfElementT(fn *Fn, name string, method *funcx.Fn, num int, startIn for i := 0; i < num; i++ { idx := i + startIndex - if method.Param[idx].T != processFn.Param[pos+i].T { + if got, want := method.Param[i+startIndex].T, processFn.Param[pos+i].T; got != want { err := errors.Errorf("mismatched element type in method %v, param %v. got: %v, want: %v", - name, idx, method.Param[idx].T, processFn.Param[pos+i].T) + name, idx, got, want) return errors.SetTopLevelMsgf(err, "Mismatched element type in method %v, "+ "parameter at index %v. Got: %v, Want: %v (from method %v). "+ "Ensure that element parameters in SDF methods have consistent types with element parameters in %v.", - name, idx, method.Param[idx].T, processFn.Param[pos+i].T, processElementName, processElementName) + name, idx, got, want, processElementName, processElementName) } } return nil @@ -1250,7 +1250,7 @@ func validateStatefulWatermarkSig(fn *Fn, numMainIn int) error { "Ensure that all restrictions in an SDF are the same type.", initialWatermarkEstimatorStateName, 1, method.Param[1].T, restT, createTrackerName) } - if err := validateSdfElementT(fn, restrictionSizeName, method, numMainIn, 2); err != nil { + if err := validateSdfElementT(fn, initialWatermarkEstimatorStateName, method, numMainIn, 2); err != nil { return err } diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasink.go b/sdks/go/pkg/beam/core/runtime/exec/datasink.go index 36f2a5195ca2..6b39a2bb44f1 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasink.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasink.go @@ -77,7 +77,7 @@ func (n *DataSink) ProcessElement(ctx context.Context, value *FullValue, values return err } if err := n.enc.Encode(value, &b); err != nil { - return errors.WithContextf(err, "encoding element %v with coder %v", value, n.enc) + return errors.WithContextf(err, "encoding element %v with coder %v", value, n.Coder) } byteCount, err := n.w.Write(b.Bytes()) if err != nil {