From 37955a956a3d8b4b3f721c797bc7971e002c1d2a Mon Sep 17 00:00:00 2001 From: Robert Burke Date: Thu, 29 Dec 2022 13:21:51 -0800 Subject: [PATCH 1/3] [#24789] Spot fix fullvalue wrapping for SDF. --- sdks/go/pkg/beam/core/runtime/exec/sdf.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/sdf.go b/sdks/go/pkg/beam/core/runtime/exec/sdf.go index be4f491b04cf..3a1812f06027 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/sdf.go +++ b/sdks/go/pkg/beam/core/runtime/exec/sdf.go @@ -190,7 +190,8 @@ func (n *SplitAndSizeRestrictions) StartBundle(ctx context.Context, id string, d func (n *SplitAndSizeRestrictions) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error { rest := elm.Elm2.(*FullValue).Elm ws := elm.Elm2.(*FullValue).Elm2 - mainElm := elm.Elm.(*FullValue) + + mainElm := convertIfNeeded(elm.Elm, &FullValue{}) splitRests := n.splitInv.Invoke(mainElm, rest) From 152368b8a535ac9ca45d70426d373a132ff28cbb Mon Sep 17 00:00:00 2001 From: Robert Burke Date: Thu, 29 Dec 2022 13:28:08 -0800 Subject: [PATCH 2/3] Fix indenting on diagrams --- sdks/go/pkg/beam/core/runtime/exec/sdf.go | 38 +++++++++++------------ 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/sdf.go b/sdks/go/pkg/beam/core/runtime/exec/sdf.go index 3a1812f06027..62af4c2e1ab2 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/sdf.go +++ b/sdks/go/pkg/beam/core/runtime/exec/sdf.go @@ -159,13 +159,13 @@ func (n *SplitAndSizeRestrictions) StartBundle(ctx context.Context, id string, d // Input Diagram: // // *FullValue { -// Elm: *FullValue (original input) -// Elm2: *FullValue { -// Elm: Restriction -// Elm2: Watermark estimator state -// } -// Windows -// Timestamps +// Elm: *FullValue (original input) +// Elm2: *FullValue { +// Elm: Restriction +// Elm2: Watermark estimator state +// } +// Windows +// Timestamps // } // // ProcessElement splits the given restriction into one or more restrictions and @@ -175,18 +175,18 @@ func (n *SplitAndSizeRestrictions) StartBundle(ctx context.Context, id string, d // // Output Diagram: // -// *FullValue { -// Elm: *FullValue { -// Elm: *FullValue (original input) -// Elm2: *FullValue { -// Elm: Restriction -// Elm2: Watermark estimator state -// } -// } -// Elm2: float64 (size) -// Windows -// Timestamps -// } +// *FullValue { +// Elm: *FullValue { +// Elm: *FullValue (original input) +// Elm2: *FullValue { +// Elm: Restriction +// Elm2: Watermark estimator state +// } +// } +// Elm2: float64 (size) +// Windows +// Timestamps +// } func (n *SplitAndSizeRestrictions) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error { rest := elm.Elm2.(*FullValue).Elm ws := elm.Elm2.(*FullValue).Elm2 From 8408321e96f1c47f327fcfb7ee3cd8a7fc649098 Mon Sep 17 00:00:00 2001 From: Robert Burke Date: Thu, 29 Dec 2022 13:28:43 -0800 Subject: [PATCH 3/3] Reinstate comment --- sdks/go/pkg/beam/core/runtime/exec/sdf.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdks/go/pkg/beam/core/runtime/exec/sdf.go b/sdks/go/pkg/beam/core/runtime/exec/sdf.go index 62af4c2e1ab2..d4b1b32d257d 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/sdf.go +++ b/sdks/go/pkg/beam/core/runtime/exec/sdf.go @@ -191,6 +191,8 @@ func (n *SplitAndSizeRestrictions) ProcessElement(ctx context.Context, elm *Full rest := elm.Elm2.(*FullValue).Elm ws := elm.Elm2.(*FullValue).Elm2 + // If receiving directly from a datasource, + // the element may not be wrapped in a *FullValue mainElm := convertIfNeeded(elm.Elm, &FullValue{}) splitRests := n.splitInv.Invoke(mainElm, rest)