From d5aea2284ff6ad15b9843fd4a4d8e90a1d959c1a Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Fri, 10 Feb 2023 10:08:19 -0800 Subject: [PATCH 1/5] [prism] coders + initial test file. --- .../pkg/beam/runners/prism/internal/coders.go | 246 ++++++++++++++++++ .../runners/prism/internal/coders_test.go | 60 +++++ 2 files changed, 306 insertions(+) create mode 100644 sdks/go/pkg/beam/runners/prism/internal/coders.go create mode 100644 sdks/go/pkg/beam/runners/prism/internal/coders_test.go diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders.go b/sdks/go/pkg/beam/runners/prism/internal/coders.go new file mode 100644 index 000000000000..dd31ac916388 --- /dev/null +++ b/sdks/go/pkg/beam/runners/prism/internal/coders.go @@ -0,0 +1,246 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "bytes" + "fmt" + "io" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/ioutilx" + pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" + "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns" + "golang.org/x/exp/slog" + "google.golang.org/protobuf/encoding/prototext" +) + +// leafCoders lists coder urns the runner knows how to manipulate. +// In particular, ones that won't be a problem to parse, in general +// because they have a known total size. +var leafCoders = map[string]struct{}{ + urns.CoderBytes: {}, + urns.CoderStringUTF8: {}, + urns.CoderLengthPrefix: {}, + urns.CoderVarInt: {}, + urns.CoderDouble: {}, + urns.CoderBool: {}, + urns.CoderGlobalWindow: {}, + urns.CoderIntervalWindow: {}, +} + +func isLeafCoder(c *pipepb.Coder) bool { + _, ok := leafCoders[c.GetSpec().GetUrn()] + return ok +} + +func makeWindowedValueCoder(t *pipepb.PTransform, pID string, comps *pipepb.Components, coders map[string]*pipepb.Coder) string { + col := comps.GetPcollections()[pID] + cID := lpUnknownCoders(col.GetCoderId(), coders, comps.GetCoders()) + wcID := comps.GetWindowingStrategies()[col.GetWindowingStrategyId()].GetWindowCoderId() + + // The runner needs to be defensive, and tell the SDK to Length Prefix + // any coders that it doesn't understand. + // So here, we look at the coder and its components, and produce + // new coders that we know how to deal with. + + // Produce ID for the Windowed Value Coder + wvcID := "cwv_" + pID + wInC := &pipepb.Coder{ + Spec: &pipepb.FunctionSpec{ + Urn: urns.CoderWindowedValue, + }, + ComponentCoderIds: []string{cID, wcID}, + } + // Populate the coders to send with the new windowed value coder. + coders[wvcID] = wInC + return wvcID +} + +// makeWindowCoders makes the coder pair but behavior is ultimately determined by the strategy's windowFn. +func makeWindowCoders(wc *pipepb.Coder) (exec.WindowDecoder, exec.WindowEncoder) { + var cwc *coder.WindowCoder + switch wc.GetSpec().GetUrn() { + case urns.CoderGlobalWindow: + cwc = coder.NewGlobalWindow() + case urns.CoderIntervalWindow: + cwc = coder.NewIntervalWindow() + default: + slog.Log(slog.LevelError, "makeWindowCoders: unknown urn", slog.String("urn", wc.GetSpec().GetUrn())) + panic(fmt.Sprintf("makeWindowCoders, unknown urn: %v", prototext.Format(wc))) + } + return exec.MakeWindowDecoder(cwc), exec.MakeWindowEncoder(cwc) +} + +// lpUnknownCoders takes a coder, and populates coders with any new coders +// coders that the runner needs to be safe, and speedy. +// It returns either the passed in coder id, or the new safe coder id. +func lpUnknownCoders(cID string, coders, base map[string]*pipepb.Coder) string { + // First check if we've already added the LP version of this coder to coders already. + lpcID := cID + "_lp" + // Check if we've done this one before. + if _, ok := coders[lpcID]; ok { + return lpcID + } + // All coders in the coders map have been processed. + if _, ok := coders[cID]; ok { + return cID + } + // Look up the cannon location. + c, ok := base[cID] + if !ok { + // We messed up somewhere. + panic(fmt.Sprint("unknown coder id:", cID)) + } + // Add the original coder to the coders map. + coders[cID] = c + // If we don't know this coder, and it has no sub components, + // we must LP it, and we return the LP'd version. + if len(c.GetComponentCoderIds()) == 0 && !isLeafCoder(c) { + lpc := &pipepb.Coder{ + Spec: &pipepb.FunctionSpec{ + Urn: urns.CoderLengthPrefix, + }, + ComponentCoderIds: []string{cID}, + } + coders[lpcID] = lpc + return lpcID + } + var needNewComposite bool + var comps []string + for _, cc := range c.GetComponentCoderIds() { + rcc := lpUnknownCoders(cc, coders, base) + if cc != rcc { + needNewComposite = true + } + comps = append(comps, rcc) + } + if needNewComposite { + lpc := &pipepb.Coder{ + Spec: c.GetSpec(), + ComponentCoderIds: comps, + } + coders[lpcID] = lpc + return lpcID + } + return cID +} + +// reconcileCoders, has coders is primed with initial coders. +func reconcileCoders(coders, base map[string]*pipepb.Coder) { + for { + var comps []string + for _, c := range coders { + for _, ccid := range c.GetComponentCoderIds() { + if _, ok := coders[ccid]; !ok { + // We don't have the coder yet, so in we go. + comps = append(comps, ccid) + } + } + } + if len(comps) == 0 { + return + } + for _, ccid := range comps { + c, ok := base[ccid] + if !ok { + panic(fmt.Sprintf("unknown coder id during reconciliation: %v", ccid)) + } + coders[ccid] = c + } + } +} + +func kvcoder(comps *pipepb.Components, tid string) *pipepb.Coder { + t := comps.GetTransforms()[tid] + var inputPColID string + for _, pcolID := range t.GetInputs() { + inputPColID = pcolID + } + pcol := comps.GetPcollections()[inputPColID] + return comps.GetCoders()[pcol.GetCoderId()] +} + +// pullDecoder return a function that will extract the bytes +// for the associated coder. +func pullDecoder(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(io.Reader) []byte { + urn := c.GetSpec().GetUrn() + switch urn { + // Anything length prefixed can be treated as opaque. + case urns.CoderBytes, urns.CoderStringUTF8, urns.CoderLengthPrefix: + return func(r io.Reader) []byte { + var buf bytes.Buffer + tr := io.TeeReader(r, &buf) + l, _ := coder.DecodeVarInt(tr) + ioutilx.ReadN(tr, int(l)) + return buf.Bytes() + } + case urns.CoderVarInt: + return func(r io.Reader) []byte { + var buf bytes.Buffer + tr := io.TeeReader(r, &buf) + coder.DecodeVarInt(tr) + return buf.Bytes() + } + case urns.CoderBool: + return func(r io.Reader) []byte { + if v, _ := coder.DecodeBool(r); v { + return []byte{1} + } + return []byte{0} + } + case urns.CoderDouble: + return func(r io.Reader) []byte { + var buf bytes.Buffer + tr := io.TeeReader(r, &buf) + coder.DecodeDouble(tr) + return buf.Bytes() + } + case urns.CoderIterable: + ccids := c.GetComponentCoderIds() + ed := pullDecoder(coders[ccids[0]], coders) + // TODO-rejigger all of these to avoid all the wasteful byte copies. + // The utility of the io interfaces strike again! + return func(r io.Reader) []byte { + var buf bytes.Buffer + tr := io.TeeReader(r, &buf) + l, _ := coder.DecodeInt32(tr) + for i := int32(0); i < l; i++ { + ed(tr) + } + return buf.Bytes() + } + + case urns.CoderKV: + ccids := c.GetComponentCoderIds() + kd := pullDecoder(coders[ccids[0]], coders) + vd := pullDecoder(coders[ccids[1]], coders) + // TODO-rejigger all of these to avoid all the wasteful byte copies. + // The utility of the io interfaces strike again! + return func(r io.Reader) []byte { + var buf bytes.Buffer + tr := io.TeeReader(r, &buf) + kd(tr) + vd(tr) + return buf.Bytes() + } + case urns.CoderRow: + panic(fmt.Sprintf("Runner forgot to LP this Row Coder. %v", prototext.Format(c))) + default: + panic(fmt.Sprintf("unknown coder urn key: %v", urn)) + } +} diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders_test.go b/sdks/go/pkg/beam/runners/prism/internal/coders_test.go new file mode 100644 index 000000000000..f7e0ea005db0 --- /dev/null +++ b/sdks/go/pkg/beam/runners/prism/internal/coders_test.go @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "testing" + + pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" + "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns" +) + +func Test_isLeafCoder(t *testing.T) { + tests := []struct { + urn string + isLeaf bool + }{ + {urns.CoderBytes, true}, + {urns.CoderStringUTF8, true}, + {urns.CoderLengthPrefix, true}, + {urns.CoderVarInt, true}, + {urns.CoderDouble, true}, + {urns.CoderBool, true}, + {urns.CoderGlobalWindow, true}, + {urns.CoderIntervalWindow, true}, + {urns.CoderIterable, false}, + {urns.CoderRow, false}, + {urns.CoderKV, false}, + } + for _, test := range tests { + undertest := &pipepb.Coder{ + Spec: &pipepb.FunctionSpec{ + Urn: test.urn, + }, + } + if got, want := isLeafCoder(undertest), test.isLeaf; got != want { + t.Errorf("isLeafCoder(%v) = %v, want %v", test.urn, got, want) + } + } +} + +func Test_makeWindowedValueCoder(t *testing.T) { + +} + +func Test_pullDecoder(t *testing.T) { + +} From 478c2c4f21be9b1bd1f10ebf0418a07a32164cd9 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Tue, 14 Feb 2023 13:04:49 -0800 Subject: [PATCH 2/5] [prism] internal/coders.go --- .../pkg/beam/runners/prism/internal/coders.go | 45 ++-- .../runners/prism/internal/coders_test.go | 219 +++++++++++++++++- 2 files changed, 248 insertions(+), 16 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders.go b/sdks/go/pkg/beam/runners/prism/internal/coders.go index dd31ac916388..40acda00e85d 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/coders.go +++ b/sdks/go/pkg/beam/runners/prism/internal/coders.go @@ -48,7 +48,11 @@ func isLeafCoder(c *pipepb.Coder) bool { return ok } -func makeWindowedValueCoder(t *pipepb.PTransform, pID string, comps *pipepb.Components, coders map[string]*pipepb.Coder) string { +// makeWindowedValueCoder gets the coder for the PCollection, renders it safe, and adds it to the coders map. +// +// PCollection coders are not inherently WindowValueCoder wrapped, and they are added by the runner +// for crossing the FnAPI boundary at data sources and data sinks. +func makeWindowedValueCoder(pID string, comps *pipepb.Components, coders map[string]*pipepb.Coder) string { col := comps.GetPcollections()[pID] cID := lpUnknownCoders(col.GetCoderId(), coders, comps.GetCoders()) wcID := comps.GetWindowingStrategies()[col.GetWindowingStrategyId()].GetWindowCoderId() @@ -89,41 +93,51 @@ func makeWindowCoders(wc *pipepb.Coder) (exec.WindowDecoder, exec.WindowEncoder) // lpUnknownCoders takes a coder, and populates coders with any new coders // coders that the runner needs to be safe, and speedy. // It returns either the passed in coder id, or the new safe coder id. -func lpUnknownCoders(cID string, coders, base map[string]*pipepb.Coder) string { +func lpUnknownCoders(cID string, bundle, base map[string]*pipepb.Coder) string { // First check if we've already added the LP version of this coder to coders already. lpcID := cID + "_lp" // Check if we've done this one before. - if _, ok := coders[lpcID]; ok { + if _, ok := bundle[lpcID]; ok { return lpcID } // All coders in the coders map have been processed. - if _, ok := coders[cID]; ok { + if _, ok := bundle[cID]; ok { return cID } - // Look up the cannon location. + // Look up the canonical location. c, ok := base[cID] if !ok { // We messed up somewhere. panic(fmt.Sprint("unknown coder id:", cID)) } // Add the original coder to the coders map. - coders[cID] = c + bundle[cID] = c // If we don't know this coder, and it has no sub components, // we must LP it, and we return the LP'd version. - if len(c.GetComponentCoderIds()) == 0 && !isLeafCoder(c) { + leaf := isLeafCoder(c) + if len(c.GetComponentCoderIds()) == 0 && !leaf { lpc := &pipepb.Coder{ Spec: &pipepb.FunctionSpec{ Urn: urns.CoderLengthPrefix, }, ComponentCoderIds: []string{cID}, } - coders[lpcID] = lpc + bundle[lpcID] = lpc return lpcID } + // We know we have a composite, so if we count this as a leaf, move everything to + // the coders map. + if leaf { + // Copy the components from the base. + for _, cc := range c.GetComponentCoderIds() { + bundle[cc] = base[cc] + } + return cID + } var needNewComposite bool var comps []string for _, cc := range c.GetComponentCoderIds() { - rcc := lpUnknownCoders(cc, coders, base) + rcc := lpUnknownCoders(cc, bundle, base) if cc != rcc { needNewComposite = true } @@ -134,19 +148,20 @@ func lpUnknownCoders(cID string, coders, base map[string]*pipepb.Coder) string { Spec: c.GetSpec(), ComponentCoderIds: comps, } - coders[lpcID] = lpc + bundle[lpcID] = lpc return lpcID } return cID } -// reconcileCoders, has coders is primed with initial coders. -func reconcileCoders(coders, base map[string]*pipepb.Coder) { +// reconcileCoders ensures coders is primed with initial coders from +// the base pipeline components. +func reconcileCoders(bundle, base map[string]*pipepb.Coder) { for { var comps []string - for _, c := range coders { + for _, c := range bundle { for _, ccid := range c.GetComponentCoderIds() { - if _, ok := coders[ccid]; !ok { + if _, ok := bundle[ccid]; !ok { // We don't have the coder yet, so in we go. comps = append(comps, ccid) } @@ -160,7 +175,7 @@ func reconcileCoders(coders, base map[string]*pipepb.Coder) { if !ok { panic(fmt.Sprintf("unknown coder id during reconciliation: %v", ccid)) } - coders[ccid] = c + bundle[ccid] = c } } } diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders_test.go b/sdks/go/pkg/beam/runners/prism/internal/coders_test.go index f7e0ea005db0..12785b2decf9 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/coders_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/coders_test.go @@ -16,10 +16,16 @@ package internal import ( + "bytes" "testing" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns" + "github.com/google/go-cmp/cmp" + "google.golang.org/protobuf/testing/protocmp" ) func Test_isLeafCoder(t *testing.T) { @@ -52,9 +58,220 @@ func Test_isLeafCoder(t *testing.T) { } func Test_makeWindowedValueCoder(t *testing.T) { + coders := map[string]*pipepb.Coder{} + gotID := makeWindowedValueCoder("testPID", &pipepb.Components{ + Pcollections: map[string]*pipepb.PCollection{ + "testPID": {CoderId: "testCoderID"}, + }, + Coders: map[string]*pipepb.Coder{ + "testCoderID": { + Spec: &pipepb.FunctionSpec{ + Urn: urns.CoderBool, + }, + }, + }, + }, coders) + + if gotID == "" { + t.Errorf("makeWindowedValueCoder(...) = %v, want non-empty", gotID) + } + got := coders[gotID] + if got == nil { + t.Errorf("makeWindowedValueCoder(...) = ID %v, had nil entry", gotID) + } + if got.GetSpec().GetUrn() != urns.CoderWindowedValue { + t.Errorf("makeWindowedValueCoder(...) = ID %v, had nil entry", gotID) + } +} + +func Test_makeWindowCoders(t *testing.T) { + tests := []struct { + urn string + window typex.Window + }{ + {urns.CoderGlobalWindow, window.GlobalWindow{}}, + {urns.CoderIntervalWindow, window.IntervalWindow{ + Start: mtime.MinTimestamp, + End: mtime.MaxTimestamp, + }}, + } + for _, test := range tests { + undertest := &pipepb.Coder{ + Spec: &pipepb.FunctionSpec{ + Urn: test.urn, + }, + } + dec, enc := makeWindowCoders(undertest) + + // Validate we're getting a round trip coder. + var buf bytes.Buffer + if err := enc.EncodeSingle(test.window, &buf); err != nil { + t.Errorf("encoder[%v].EncodeSingle(%v) = %v, want nil", test.urn, test.window, err) + } + got, err := dec.DecodeSingle(&buf) + if err != nil { + t.Errorf("decoder[%v].DecodeSingle(%v) = %v, want nil", test.urn, test.window, err) + } + + if want := test.window; got != want { + t.Errorf("makeWindowCoders(%v) didn't round trip: got %v, want %v", test.urn, got, want) + } + } +} + +func Test_lpUnknownCoders(t *testing.T) { + tests := []struct { + name string + urn string + components []string + bundle, base map[string]*pipepb.Coder + want map[string]*pipepb.Coder + }{ + {"alreadyProcessed", + urns.CoderBool, nil, + map[string]*pipepb.Coder{ + "test": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBool}}, + }, + map[string]*pipepb.Coder{}, + map[string]*pipepb.Coder{ + "test": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBool}}, + }, + }, + {"alreadyProcessedLP", + urns.CoderBool, nil, + map[string]*pipepb.Coder{ + "test_lp": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderLengthPrefix}, ComponentCoderIds: []string{"test"}}, + "test": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBool}}, + }, + map[string]*pipepb.Coder{}, + map[string]*pipepb.Coder{ + "test_lp": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderLengthPrefix}, ComponentCoderIds: []string{"test"}}, + "test": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBool}}, + }, + }, + {"noNeedForLP", + urns.CoderBool, nil, + map[string]*pipepb.Coder{}, + map[string]*pipepb.Coder{}, + map[string]*pipepb.Coder{ + "test": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBool}}, + }, + }, + {"needLP", + urns.CoderRow, nil, + map[string]*pipepb.Coder{}, + map[string]*pipepb.Coder{}, + map[string]*pipepb.Coder{ + "test_lp": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderLengthPrefix}, ComponentCoderIds: []string{"test"}}, + "test": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderRow}}, + }, + }, + {"needLP_recurse", + urns.CoderKV, []string{"k", "v"}, + map[string]*pipepb.Coder{}, + map[string]*pipepb.Coder{ + "k": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderRow}}, + "v": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBool}}, + }, + map[string]*pipepb.Coder{ + "test_lp": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderKV}, ComponentCoderIds: []string{"k_lp", "v"}}, + "test": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderKV}, ComponentCoderIds: []string{"k", "v"}}, + "k_lp": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderLengthPrefix}, ComponentCoderIds: []string{"k"}}, + "k": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderRow}}, + "v": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBool}}, + }, + }, + {"alreadyLP", urns.CoderLengthPrefix, []string{"k"}, + map[string]*pipepb.Coder{}, + map[string]*pipepb.Coder{ + "k": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderRow}}, + }, + map[string]*pipepb.Coder{ + "test": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderLengthPrefix}, ComponentCoderIds: []string{"k"}}, + "k": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderRow}}, + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + // Add the initial coder to base. + test.base["test"] = &pipepb.Coder{ + Spec: &pipepb.FunctionSpec{Urn: test.urn}, + ComponentCoderIds: test.components, + } + + lpUnknownCoders("test", test.bundle, test.base) + + if d := cmp.Diff(test.want, test.bundle, protocmp.Transform()); d != "" { + t.Fatalf("lpUnknownCoders(%v); (-want, +got):\n%v", test.urn, d) + } + }) + } } -func Test_pullDecoder(t *testing.T) { +func Test_reconcileCoders(t *testing.T) { + tests := []struct { + name string + bundle, base map[string]*pipepb.Coder + want map[string]*pipepb.Coder + }{ + {name: "noChanges", + bundle: map[string]*pipepb.Coder{ + "a": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBool}}, + }, + base: map[string]*pipepb.Coder{ + "a": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBool}}, + "b": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBytes}}, + "c": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderStringUTF8}}, + }, + want: map[string]*pipepb.Coder{ + "a": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBool}}, + }, + }, + {name: "KV", + bundle: map[string]*pipepb.Coder{ + "kv": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderKV}, ComponentCoderIds: []string{"k", "v"}}, + }, + base: map[string]*pipepb.Coder{ + "kv": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderKV}, ComponentCoderIds: []string{"k", "v"}}, + "k": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBool}}, + "v": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBool}}, + }, + want: map[string]*pipepb.Coder{ + "kv": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderKV}, ComponentCoderIds: []string{"k", "v"}}, + "k": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBool}}, + "v": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBool}}, + }, + }, + {name: "KV-nested", + bundle: map[string]*pipepb.Coder{ + "kv": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderKV}, ComponentCoderIds: []string{"k", "v"}}, + }, + base: map[string]*pipepb.Coder{ + "kv": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderKV}, ComponentCoderIds: []string{"k", "v"}}, + "k": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderKV}, ComponentCoderIds: []string{"a", "b"}}, + "v": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBool}}, + "a": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBytes}}, + "b": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderRow}}, + "c": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderStringUTF8}}, + }, + want: map[string]*pipepb.Coder{ + "kv": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderKV}, ComponentCoderIds: []string{"k", "v"}}, + "k": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderKV}, ComponentCoderIds: []string{"a", "b"}}, + "v": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBool}}, + "a": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBytes}}, + "b": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderRow}}, + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + reconcileCoders(test.bundle, test.base) + if d := cmp.Diff(test.want, test.bundle, protocmp.Transform()); d != "" { + t.Fatalf("reconcileCoders(...); (-want, +got):\n%v", d) + } + }) + } } From 1d6e787b449a67006600a487d632b68b188381b4 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Tue, 14 Feb 2023 14:28:15 -0800 Subject: [PATCH 3/5] [prism] finish coder coverage --- .../pkg/beam/runners/prism/internal/coders.go | 78 +++++--------- .../runners/prism/internal/coders_test.go | 100 ++++++++++++++++++ 2 files changed, 128 insertions(+), 50 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders.go b/sdks/go/pkg/beam/runners/prism/internal/coders.go index 40acda00e85d..63e5802b559a 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/coders.go +++ b/sdks/go/pkg/beam/runners/prism/internal/coders.go @@ -180,78 +180,56 @@ func reconcileCoders(bundle, base map[string]*pipepb.Coder) { } } -func kvcoder(comps *pipepb.Components, tid string) *pipepb.Coder { - t := comps.GetTransforms()[tid] - var inputPColID string - for _, pcolID := range t.GetInputs() { - inputPColID = pcolID - } - pcol := comps.GetPcollections()[inputPColID] - return comps.GetCoders()[pcol.GetCoderId()] -} - // pullDecoder return a function that will extract the bytes // for the associated coder. func pullDecoder(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(io.Reader) []byte { + dec := pullDecoderNoAlloc(c, coders) + return func(r io.Reader) []byte { + var buf bytes.Buffer + tr := io.TeeReader(r, &buf) + dec(tr) + return buf.Bytes() + } +} + +func pullDecoderNoAlloc(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(io.Reader) { urn := c.GetSpec().GetUrn() switch urn { // Anything length prefixed can be treated as opaque. case urns.CoderBytes, urns.CoderStringUTF8, urns.CoderLengthPrefix: - return func(r io.Reader) []byte { - var buf bytes.Buffer - tr := io.TeeReader(r, &buf) - l, _ := coder.DecodeVarInt(tr) - ioutilx.ReadN(tr, int(l)) - return buf.Bytes() + return func(r io.Reader) { + l, _ := coder.DecodeVarInt(r) + ioutilx.ReadN(r, int(l)) } case urns.CoderVarInt: - return func(r io.Reader) []byte { - var buf bytes.Buffer - tr := io.TeeReader(r, &buf) - coder.DecodeVarInt(tr) - return buf.Bytes() + return func(r io.Reader) { + coder.DecodeVarInt(r) } case urns.CoderBool: - return func(r io.Reader) []byte { - if v, _ := coder.DecodeBool(r); v { - return []byte{1} - } - return []byte{0} + return func(r io.Reader) { + coder.DecodeBool(r) } case urns.CoderDouble: - return func(r io.Reader) []byte { - var buf bytes.Buffer - tr := io.TeeReader(r, &buf) - coder.DecodeDouble(tr) - return buf.Bytes() + return func(r io.Reader) { + coder.DecodeDouble(r) } case urns.CoderIterable: ccids := c.GetComponentCoderIds() - ed := pullDecoder(coders[ccids[0]], coders) - // TODO-rejigger all of these to avoid all the wasteful byte copies. - // The utility of the io interfaces strike again! - return func(r io.Reader) []byte { - var buf bytes.Buffer - tr := io.TeeReader(r, &buf) - l, _ := coder.DecodeInt32(tr) + ed := pullDecoderNoAlloc(coders[ccids[0]], coders) + return func(r io.Reader) { + l, _ := coder.DecodeInt32(r) for i := int32(0); i < l; i++ { - ed(tr) + ed(r) } - return buf.Bytes() } case urns.CoderKV: ccids := c.GetComponentCoderIds() - kd := pullDecoder(coders[ccids[0]], coders) - vd := pullDecoder(coders[ccids[1]], coders) - // TODO-rejigger all of these to avoid all the wasteful byte copies. - // The utility of the io interfaces strike again! - return func(r io.Reader) []byte { - var buf bytes.Buffer - tr := io.TeeReader(r, &buf) - kd(tr) - vd(tr) - return buf.Bytes() + kd := pullDecoderNoAlloc(coders[ccids[0]], coders) + vd := pullDecoderNoAlloc(coders[ccids[1]], coders) + return func(r io.Reader) { + kd(r) + vd(r) } case urns.CoderRow: panic(fmt.Sprintf("Runner forgot to LP this Row Coder. %v", prototext.Format(c))) diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders_test.go b/sdks/go/pkg/beam/runners/prism/internal/coders_test.go index 12785b2decf9..ad6e36496286 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/coders_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/coders_test.go @@ -17,6 +17,8 @@ package internal import ( "bytes" + "encoding/binary" + "math" "testing" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" @@ -275,3 +277,101 @@ func Test_reconcileCoders(t *testing.T) { }) } } + +func Test_pullDecoder(t *testing.T) { + + doubleBytes := make([]byte, 8) + binary.BigEndian.PutUint64(doubleBytes, math.Float64bits(math.SqrtPi)) + + tests := []struct { + name string + coder *pipepb.Coder + coders map[string]*pipepb.Coder + input []byte + }{ + { + "bytes", + &pipepb.Coder{ + Spec: &pipepb.FunctionSpec{ + Urn: urns.CoderBytes, + }, + }, + map[string]*pipepb.Coder{}, + []byte{3, 1, 2, 3}, + }, { + "varint", + &pipepb.Coder{ + Spec: &pipepb.FunctionSpec{ + Urn: urns.CoderVarInt, + }, + }, + map[string]*pipepb.Coder{}, + []byte{255, 3}, + }, { + "bool", + &pipepb.Coder{ + Spec: &pipepb.FunctionSpec{ + Urn: urns.CoderBool, + }, + }, + map[string]*pipepb.Coder{}, + []byte{1}, + }, { + "double", + &pipepb.Coder{ + Spec: &pipepb.FunctionSpec{ + Urn: urns.CoderDouble, + }, + }, + map[string]*pipepb.Coder{}, + doubleBytes, + }, { + "iterable", + &pipepb.Coder{ + Spec: &pipepb.FunctionSpec{ + Urn: urns.CoderIterable, + }, + ComponentCoderIds: []string{"elm"}, + }, + map[string]*pipepb.Coder{ + "elm": &pipepb.Coder{ + Spec: &pipepb.FunctionSpec{ + Urn: urns.CoderVarInt, + }, + }, + }, + []byte{4, 0, 1, 2, 3}, + }, { + "kv", + &pipepb.Coder{ + Spec: &pipepb.FunctionSpec{ + Urn: urns.CoderKV, + }, + ComponentCoderIds: []string{"key", "value"}, + }, + map[string]*pipepb.Coder{ + "key": &pipepb.Coder{ + Spec: &pipepb.FunctionSpec{ + Urn: urns.CoderVarInt, + }, + }, + "value": &pipepb.Coder{ + Spec: &pipepb.FunctionSpec{ + Urn: urns.CoderBool, + }, + }, + }, + []byte{3, 0}, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + dec := pullDecoder(test.coder, test.coders) + buf := bytes.NewBuffer(test.input) + got := dec(buf) + if !bytes.EqualFold(test.input, got) { + t.Fatalf("pullDecoder(%v)(...) = %v, want %v", test.coder, got, test.input) + } + }) + } +} From 949c535304dd3b620de60db156971e50bc8e61ce Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Tue, 14 Feb 2023 14:39:36 -0800 Subject: [PATCH 4/5] [prism] no-alloc doc --- sdks/go/pkg/beam/runners/prism/internal/coders.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders.go b/sdks/go/pkg/beam/runners/prism/internal/coders.go index 63e5802b559a..6bb3e284e6a9 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/coders.go +++ b/sdks/go/pkg/beam/runners/prism/internal/coders.go @@ -181,7 +181,8 @@ func reconcileCoders(bundle, base map[string]*pipepb.Coder) { } // pullDecoder return a function that will extract the bytes -// for the associated coder. +// for the associated coder. Uses a buffer and a TeeReader to extract the original +// bytes from when decoding elements. func pullDecoder(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(io.Reader) []byte { dec := pullDecoderNoAlloc(c, coders) return func(r io.Reader) []byte { @@ -192,6 +193,9 @@ func pullDecoder(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(io.Reade } } +// pullDecoderNoAlloc returns a function that decodes a single eleemnt of the given coder. +// Intended to only be used as an internal function for pullDecoder, which will use a io.TeeReader +// to extract the bytes. func pullDecoderNoAlloc(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(io.Reader) { urn := c.GetSpec().GetUrn() switch urn { From b6d291f27ec98b42321c739f5681d9605aed0e4b Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Wed, 15 Feb 2023 13:16:21 -0800 Subject: [PATCH 5/5] review commit --- sdks/go/pkg/beam/runners/prism/internal/coders.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders.go b/sdks/go/pkg/beam/runners/prism/internal/coders.go index 6bb3e284e6a9..d88ed763f804 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/coders.go +++ b/sdks/go/pkg/beam/runners/prism/internal/coders.go @@ -154,7 +154,7 @@ func lpUnknownCoders(cID string, bundle, base map[string]*pipepb.Coder) string { return cID } -// reconcileCoders ensures coders is primed with initial coders from +// reconcileCoders ensures that the bundle coders are primed with initial coders from // the base pipeline components. func reconcileCoders(bundle, base map[string]*pipepb.Coder) { for {