diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go index d045ced58f2d..e6fe28714b7f 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/stage.go +++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go @@ -374,36 +374,6 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W) error { return nil } -// handleSideInputs ensures appropriate coders are available to the bundle, and prepares a function to stage the data. -func handleSideInputs(tid string, t *pipepb.PTransform, comps *pipepb.Components, coders map[string]*pipepb.Coder, wk *worker.W, replacements map[string]string) (func(b *worker.B, tid string, watermark mtime.Time), error) { - sis, err := getSideInputs(t) - if err != nil { - return nil, err - } - var prepSides []func(b *worker.B, watermark mtime.Time) - - // Get WindowedValue Coders for the transform's input and output PCollections. - for local, global := range t.GetInputs() { - _, ok := sis[local] - if !ok { - continue // This is the main input. - } - if oldGlobal, ok := replacements[global]; ok { - global = oldGlobal - } - prepSide, err := handleSideInput(tid, local, global, comps, coders, wk) - if err != nil { - return nil, err - } - prepSides = append(prepSides, prepSide) - } - return func(b *worker.B, tid string, watermark mtime.Time) { - for _, prep := range prepSides { - prep(b, watermark) - } - }, nil -} - // handleSideInput returns a closure that will look up the data for a side input appropriate for the given watermark. func handleSideInput(tid, local, global string, comps *pipepb.Components, coders map[string]*pipepb.Coder, wk *worker.W) (func(b *worker.B, watermark mtime.Time), error) { t := comps.GetTransforms()[tid]