diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders.go b/sdks/go/pkg/beam/runners/prism/internal/coders.go new file mode 100644 index 000000000000..d88ed763f804 --- /dev/null +++ b/sdks/go/pkg/beam/runners/prism/internal/coders.go @@ -0,0 +1,243 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "bytes" + "fmt" + "io" + + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/ioutilx" + pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1" + "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns" + "golang.org/x/exp/slog" + "google.golang.org/protobuf/encoding/prototext" +) + +// leafCoders lists coder urns the runner knows how to manipulate. +// In particular, ones that won't be a problem to parse, in general +// because they have a known total size. 
+var leafCoders = map[string]struct{}{
+	urns.CoderBytes:          {},
+	urns.CoderStringUTF8:     {},
+	urns.CoderLengthPrefix:   {},
+	urns.CoderVarInt:         {},
+	urns.CoderDouble:         {},
+	urns.CoderBool:           {},
+	urns.CoderGlobalWindow:   {},
+	urns.CoderIntervalWindow: {},
+}
+
+// isLeafCoder reports whether the urn of the given coder's spec
+// is one of the keys of leafCoders.
+func isLeafCoder(c *pipepb.Coder) bool {
+	urn := c.GetSpec().GetUrn()
+	_, known := leafCoders[urn]
+	return known
+}
+
+// makeWindowedValueCoder registers a windowed value coder for the PCollection pID
+// in the coders map, and returns the ID it was stored under ("cwv_" + pID).
+//
+// PCollection coders don't come wrapped in a windowed value coder. The runner adds
+// that wrapping itself, since it's needed when crossing the FnAPI boundary at data
+// sources and data sinks.
+func makeWindowedValueCoder(pID string, comps *pipepb.Components, coders map[string]*pipepb.Coder) string {
+	pcol := comps.GetPcollections()[pID]
+
+	// Be defensive: lpUnknownCoders walks the element coder and its components,
+	// copying them into coders and substituting length prefixed variants for
+	// anything the runner doesn't understand.
+	elemCoderID := lpUnknownCoders(pcol.GetCoderId(), coders, comps.GetCoders())
+
+	// The window coder comes from the PCollection's windowing strategy.
+	strategy := comps.GetWindowingStrategies()[pcol.GetWindowingStrategyId()]
+	windowCoderID := strategy.GetWindowCoderId()
+
+	// Store the new windowed value coder alongside the coders to send.
+	id := "cwv_" + pID
+	coders[id] = &pipepb.Coder{
+		Spec:              &pipepb.FunctionSpec{Urn: urns.CoderWindowedValue},
+		ComponentCoderIds: []string{elemCoderID, windowCoderID},
+	}
+	return id
+}
+
+// makeWindowCoders makes the coder pair but behavior is ultimately determined by the strategy's windowFn.
+func makeWindowCoders(wc *pipepb.Coder) (exec.WindowDecoder, exec.WindowEncoder) {
+	var cwc *coder.WindowCoder
+	switch wc.GetSpec().GetUrn() {
+	case urns.CoderGlobalWindow:
+		cwc = coder.NewGlobalWindow()
+	case urns.CoderIntervalWindow:
+		cwc = coder.NewIntervalWindow()
+	default:
+		// Only global and interval window coders are handled here;
+		// anything else is logged and then panics.
+		slog.Log(slog.LevelError, "makeWindowCoders: unknown urn", slog.String("urn", wc.GetSpec().GetUrn()))
+		panic(fmt.Sprintf("makeWindowCoders, unknown urn: %v", prototext.Format(wc)))
+	}
+	return exec.MakeWindowDecoder(cwc), exec.MakeWindowEncoder(cwc)
+}
+
+// lpUnknownCoders takes a coder ID, and populates bundle with any new
+// coders that the runner needs to be safe, and speedy.
+//
+// cID is looked up in base, the canonical set of coders, and copied into bundle.
+// Coders the runner doesn't know how to parse are wrapped in a length prefix coder,
+// which is stored in bundle under cID + "_lp". Composites with a replaced component
+// are likewise copied to cID + "_lp", with their component IDs updated.
+//
+// It returns either the passed in coder id, or the new safe coder id.
+// It panics if cID is in neither bundle nor base.
+func lpUnknownCoders(cID string, bundle, base map[string]*pipepb.Coder) string {
+	// First check if we've added the LP version of this coder to the bundle already.
+	lpcID := cID + "_lp"
+	if _, ok := bundle[lpcID]; ok {
+		return lpcID
+	}
+	// All coders in the bundle map have been processed.
+	if _, ok := bundle[cID]; ok {
+		return cID
+	}
+	// Look up the canonical location.
+	c, ok := base[cID]
+	if !ok {
+		// We messed up somewhere.
+		panic(fmt.Sprintf("unknown coder id: %v", cID))
+	}
+	// Add the original coder to the bundle map.
+	bundle[cID] = c
+	// If we don't know this coder, and it has no sub components,
+	// we must LP it, and we return the LP'd version.
+	leaf := isLeafCoder(c)
+	if len(c.GetComponentCoderIds()) == 0 && !leaf {
+		bundle[lpcID] = &pipepb.Coder{
+			Spec: &pipepb.FunctionSpec{
+				Urn: urns.CoderLengthPrefix,
+			},
+			ComponentCoderIds: []string{cID},
+		}
+		return lpcID
+	}
+	// A leaf can be parsed without inspecting its components, so they don't need
+	// to be made safe. Copy the direct components over from base unchanged.
+	// Nested components are not followed here.
+	if leaf {
+		for _, cc := range c.GetComponentCoderIds() {
+			bundle[cc] = base[cc]
+		}
+		return cID
+	}
+	// Otherwise this is a composite the runner parses through, so each component
+	// must be made safe in turn.
+	var needNewComposite bool
+	comps := make([]string, 0, len(c.GetComponentCoderIds()))
+	for _, cc := range c.GetComponentCoderIds() {
+		rcc := lpUnknownCoders(cc, bundle, base)
+		if cc != rcc {
+			needNewComposite = true
+		}
+		comps = append(comps, rcc)
+	}
+	if needNewComposite {
+		// At least one component was replaced, so add a copy of
+		// the composite that refers to the replacement IDs.
+		bundle[lpcID] = &pipepb.Coder{
+			Spec:              c.GetSpec(),
+			ComponentCoderIds: comps,
+		}
+		return lpcID
+	}
+	return cID
+}
+
+// reconcileCoders ensures that the bundle coders are primed with initial coders from
+// the base pipeline components.
+//
+// Every component coder ID referenced by a coder in bundle, but missing from bundle,
+// is copied over from base. This repeats until no references are missing.
+// It panics if a referenced coder ID is not in base.
+func reconcileCoders(bundle, base map[string]*pipepb.Coder) {
+	for {
+		// Collect the missing IDs first, so the bundle isn't modified while it's iterated over.
+		var comps []string
+		for _, c := range bundle {
+			for _, ccid := range c.GetComponentCoderIds() {
+				if _, ok := bundle[ccid]; !ok {
+					// We don't have the coder yet, so in we go.
+					comps = append(comps, ccid)
+				}
+			}
+		}
+		if len(comps) == 0 {
+			return
+		}
+		// The newly added coders may have missing components of their own,
+		// which are picked up by the next pass.
+		for _, ccid := range comps {
+			c, ok := base[ccid]
+			if !ok {
+				panic(fmt.Sprintf("unknown coder id during reconciliation: %v", ccid))
+			}
+			bundle[ccid] = c
+		}
+	}
+}
+
+// pullDecoder returns a function that will extract the bytes
+// for the associated coder. Uses a buffer and a TeeReader to capture the original
+// bytes that are read while decoding a single element.
+func pullDecoder(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(io.Reader) []byte {
+	dec := pullDecoderNoAlloc(c, coders)
+	return func(r io.Reader) []byte {
+		var buf bytes.Buffer
+		// Everything dec reads from r is also written to buf.
+		tr := io.TeeReader(r, &buf)
+		dec(tr)
+		return buf.Bytes()
+	}
+}
+
+// pullDecoderNoAlloc returns a function that decodes a single element of the given coder.
+// Intended to only be used as an internal function for pullDecoder, which will use a io.TeeReader
+// to extract the bytes.
+//
+// Component coders are looked up by ID in coders. It panics for the row coder,
+// and for any coder urn not handled in the switch.
+//
+// NOTE(review): the returned functions discard all decoding errors, so a truncated
+// or malformed stream isn't reported to the caller.
+func pullDecoderNoAlloc(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(io.Reader) {
+	urn := c.GetSpec().GetUrn()
+	switch urn {
+	// Anything length prefixed can be treated as opaque.
+	case urns.CoderBytes, urns.CoderStringUTF8, urns.CoderLengthPrefix:
+		return func(r io.Reader) {
+			// A varint length, followed by that many bytes.
+			l, _ := coder.DecodeVarInt(r)
+			ioutilx.ReadN(r, int(l))
+		}
+	case urns.CoderVarInt:
+		return func(r io.Reader) {
+			coder.DecodeVarInt(r)
+		}
+	case urns.CoderBool:
+		return func(r io.Reader) {
+			coder.DecodeBool(r)
+		}
+	case urns.CoderDouble:
+		return func(r io.Reader) {
+			coder.DecodeDouble(r)
+		}
+	case urns.CoderIterable:
+		ccids := c.GetComponentCoderIds()
+		ed := pullDecoderNoAlloc(coders[ccids[0]], coders)
+		return func(r io.Reader) {
+			// An int32 element count, followed by that many elements.
+			// NOTE(review): a negative count reads no elements at all - confirm
+			// that iterables without a known length never reach this decoder.
+			l, _ := coder.DecodeInt32(r)
+			for i := int32(0); i < l; i++ {
+				ed(r)
+			}
+		}
+
+	case urns.CoderKV:
+		ccids := c.GetComponentCoderIds()
+		kd := pullDecoderNoAlloc(coders[ccids[0]], coders)
+		vd := pullDecoderNoAlloc(coders[ccids[1]], coders)
+		return func(r io.Reader) {
+			// The key is decoded first, then the value.
+			kd(r)
+			vd(r)
+		}
+	case urns.CoderRow:
+		panic(fmt.Sprintf("Runner forgot to LP this Row Coder. %v", prototext.Format(c)))
+	default:
+		panic(fmt.Sprintf("unknown coder urn key: %v", urn))
+	}
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/coders_test.go b/sdks/go/pkg/beam/runners/prism/internal/coders_test.go
new file mode 100644
index 000000000000..ad6e36496286
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/coders_test.go
@@ -0,0 +1,377 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+	"bytes"
+	"encoding/binary"
+	"math"
+	"testing"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
+	"github.com/google/go-cmp/cmp"
+	"google.golang.org/protobuf/testing/protocmp"
+)
+
+// Test_isLeafCoder validates which coder urns are treated as leaves.
+func Test_isLeafCoder(t *testing.T) {
+	tests := []struct {
+		urn    string
+		isLeaf bool
+	}{
+		{urns.CoderBytes, true},
+		{urns.CoderStringUTF8, true},
+		{urns.CoderLengthPrefix, true},
+		{urns.CoderVarInt, true},
+		{urns.CoderDouble, true},
+		{urns.CoderBool, true},
+		{urns.CoderGlobalWindow, true},
+		{urns.CoderIntervalWindow, true},
+		{urns.CoderIterable, false},
+		{urns.CoderRow, false},
+		{urns.CoderKV, false},
+	}
+	for _, test := range tests {
+		undertest := &pipepb.Coder{
+			Spec: &pipepb.FunctionSpec{
+				Urn: test.urn,
+			},
+		}
+		if got, want := isLeafCoder(undertest), test.isLeaf; got != want {
+			t.Errorf("isLeafCoder(%v) = %v, want %v", test.urn, got, want)
+		}
+	}
+}
+
+// Test_makeWindowedValueCoder validates that a windowed value coder
+// is added to the coders map under the returned ID.
+func Test_makeWindowedValueCoder(t *testing.T) {
+	coders := map[string]*pipepb.Coder{}
+
+	gotID := makeWindowedValueCoder("testPID", &pipepb.Components{
+		Pcollections: map[string]*pipepb.PCollection{
+			"testPID": {CoderId: "testCoderID"},
+		},
+		Coders: map[string]*pipepb.Coder{
+			"testCoderID": {
+				Spec: &pipepb.FunctionSpec{
+					Urn: urns.CoderBool,
+				},
+			},
+		},
+	}, coders)
+
+	if gotID == "" {
+		t.Errorf("makeWindowedValueCoder(...) = %v, want non-empty", gotID)
+	}
+	got := coders[gotID]
+	if got == nil {
+		// Nothing further can be checked without an entry.
+		t.Fatalf("makeWindowedValueCoder(...) = ID %v, had nil entry", gotID)
+	}
+	if gotUrn, wantUrn := got.GetSpec().GetUrn(), urns.CoderWindowedValue; gotUrn != wantUrn {
+		t.Errorf("makeWindowedValueCoder(...) = ID %v, had urn %v, want %v", gotID, gotUrn, wantUrn)
+	}
+}
+
+// Test_makeWindowCoders validates that the returned window coder pairs round trip a window.
+func Test_makeWindowCoders(t *testing.T) {
+	tests := []struct {
+		urn    string
+		window typex.Window
+	}{
+		{urns.CoderGlobalWindow, window.GlobalWindow{}},
+		{urns.CoderIntervalWindow, window.IntervalWindow{
+			Start: mtime.MinTimestamp,
+			End:   mtime.MaxTimestamp,
+		}},
+	}
+	for _, test := range tests {
+		undertest := &pipepb.Coder{
+			Spec: &pipepb.FunctionSpec{
+				Urn: test.urn,
+			},
+		}
+		dec, enc := makeWindowCoders(undertest)
+
+		// Validate we're getting a round trip coder.
+		var buf bytes.Buffer
+		if err := enc.EncodeSingle(test.window, &buf); err != nil {
+			t.Errorf("encoder[%v].EncodeSingle(%v) = %v, want nil", test.urn, test.window, err)
+		}
+		got, err := dec.DecodeSingle(&buf)
+		if err != nil {
+			t.Errorf("decoder[%v].DecodeSingle(%v) = %v, want nil", test.urn, test.window, err)
+		}
+
+		if want := test.window; got != want {
+			t.Errorf("makeWindowCoders(%v) didn't round trip: got %v, want %v", test.urn, got, want)
+		}
+	}
+}
+
+// Test_lpUnknownCoders validates the contents of the bundle coder map after
+// processing the coder "test", which is built from each case's urn and components.
+func Test_lpUnknownCoders(t *testing.T) {
+	tests := []struct {
+		name         string
+		urn          string
+		components   []string
+		bundle, base map[string]*pipepb.Coder
+		want         map[string]*pipepb.Coder
+	}{
+		{
+			name: "alreadyProcessed",
+			urn:  urns.CoderBool,
+			bundle: map[string]*pipepb.Coder{
+				"test": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBool}},
+			},
+			base: map[string]*pipepb.Coder{},
+			want: map[string]*pipepb.Coder{
+				"test": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBool}},
+			},
+		},
+		{
+			name: "alreadyProcessedLP",
+			urn:  urns.CoderBool,
+			bundle: map[string]*pipepb.Coder{
+				"test_lp": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderLengthPrefix}, ComponentCoderIds: []string{"test"}},
+				"test":    {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBool}},
+			},
+			base: map[string]*pipepb.Coder{},
+			want: map[string]*pipepb.Coder{
+				"test_lp": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderLengthPrefix}, ComponentCoderIds: []string{"test"}},
+				"test":    {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBool}},
+			},
+		},
+		{
+			name:   "noNeedForLP",
+			urn:    urns.CoderBool,
+			bundle: map[string]*pipepb.Coder{},
+			base:   map[string]*pipepb.Coder{},
+			want: map[string]*pipepb.Coder{
+				"test": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBool}},
+			},
+		},
+		{
+			name:   "needLP",
+			urn:    urns.CoderRow,
+			bundle: map[string]*pipepb.Coder{},
+			base:   map[string]*pipepb.Coder{},
+			want: map[string]*pipepb.Coder{
+				"test_lp": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderLengthPrefix}, ComponentCoderIds: []string{"test"}},
+				"test":    {Spec: &pipepb.FunctionSpec{Urn: urns.CoderRow}},
+			},
+		},
+		{
+			name:       "needLP_recurse",
+			urn:        urns.CoderKV,
+			components: []string{"k", "v"},
+			bundle:     map[string]*pipepb.Coder{},
+			base: map[string]*pipepb.Coder{
+				"k": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderRow}},
+				"v": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBool}},
+			},
+			want: map[string]*pipepb.Coder{
+				"test_lp": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderKV}, ComponentCoderIds: []string{"k_lp", "v"}},
+				"test":    {Spec: &pipepb.FunctionSpec{Urn: urns.CoderKV}, ComponentCoderIds: []string{"k", "v"}},
+				"k_lp":    {Spec: &pipepb.FunctionSpec{Urn: urns.CoderLengthPrefix}, ComponentCoderIds: []string{"k"}},
+				"k":       {Spec: &pipepb.FunctionSpec{Urn: urns.CoderRow}},
+				"v":       {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBool}},
+			},
+		},
+		{
+			name:       "alreadyLP",
+			urn:        urns.CoderLengthPrefix,
+			components: []string{"k"},
+			bundle:     map[string]*pipepb.Coder{},
+			base: map[string]*pipepb.Coder{
+				"k": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderRow}},
+			},
+			want: map[string]*pipepb.Coder{
+				"test": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderLengthPrefix}, ComponentCoderIds: []string{"k"}},
+				"k":    {Spec: &pipepb.FunctionSpec{Urn: urns.CoderRow}},
+			},
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			// Add the initial coder to base.
+			test.base["test"] = &pipepb.Coder{
+				Spec:              &pipepb.FunctionSpec{Urn: test.urn},
+				ComponentCoderIds: test.components,
+			}
+
+			lpUnknownCoders("test", test.bundle, test.base)
+
+			if d := cmp.Diff(test.want, test.bundle, protocmp.Transform()); d != "" {
+				t.Fatalf("lpUnknownCoders(%v); (-want, +got):\n%v", test.urn, d)
+			}
+		})
+	}
+}
+
+// Test_reconcileCoders validates that component coders referenced from the bundle
+// are copied over from base, and that unreferenced coders are left out.
+func Test_reconcileCoders(t *testing.T) {
+	tests := []struct {
+		name         string
+		bundle, base map[string]*pipepb.Coder
+		want         map[string]*pipepb.Coder
+	}{
+		{name: "noChanges",
+			bundle: map[string]*pipepb.Coder{
+				"a": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBool}},
+			},
+			base: map[string]*pipepb.Coder{
+				"a": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBool}},
+				"b": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBytes}},
+				"c": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderStringUTF8}},
+			},
+			want: map[string]*pipepb.Coder{
+				"a": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBool}},
+			},
+		},
+		{name: "KV",
+			bundle: map[string]*pipepb.Coder{
+				"kv": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderKV}, ComponentCoderIds: []string{"k", "v"}},
+			},
+			base: map[string]*pipepb.Coder{
+				"kv": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderKV}, ComponentCoderIds: []string{"k", "v"}},
+				"k":  {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBool}},
+				"v":  {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBool}},
+			},
+			want: map[string]*pipepb.Coder{
+				"kv": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderKV}, ComponentCoderIds: []string{"k", "v"}},
+				"k":  {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBool}},
+				"v":  {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBool}},
+			},
+		},
+		{name: "KV-nested",
+			bundle: map[string]*pipepb.Coder{
+				"kv": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderKV}, ComponentCoderIds: []string{"k", "v"}},
+			},
+			base: map[string]*pipepb.Coder{
+				"kv": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderKV}, ComponentCoderIds: []string{"k", "v"}},
+				"k":  {Spec: &pipepb.FunctionSpec{Urn: urns.CoderKV}, ComponentCoderIds: []string{"a", "b"}},
+				"v":  {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBool}},
+				"a":  {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBytes}},
+				"b":  {Spec: &pipepb.FunctionSpec{Urn: urns.CoderRow}},
+				"c":  {Spec: &pipepb.FunctionSpec{Urn: urns.CoderStringUTF8}},
+			},
+			want: map[string]*pipepb.Coder{
+				"kv": {Spec: &pipepb.FunctionSpec{Urn: urns.CoderKV}, ComponentCoderIds: []string{"k", "v"}},
+				"k":  {Spec: &pipepb.FunctionSpec{Urn: urns.CoderKV}, ComponentCoderIds: []string{"a", "b"}},
+				"v":  {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBool}},
+				"a":  {Spec: &pipepb.FunctionSpec{Urn: urns.CoderBytes}},
+				"b":  {Spec: &pipepb.FunctionSpec{Urn: urns.CoderRow}},
+			},
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			reconcileCoders(test.bundle, test.base)
+
+			if d := cmp.Diff(test.want, test.bundle, protocmp.Transform()); d != "" {
+				t.Fatalf("reconcileCoders(...); (-want, +got):\n%v", d)
+			}
+		})
+	}
+}
+
+// Test_pullDecoder validates that decoding exactly one encoded element
+// returns the very bytes that made up that element.
+func Test_pullDecoder(t *testing.T) {
+	doubleBytes := make([]byte, 8)
+	binary.BigEndian.PutUint64(doubleBytes, math.Float64bits(math.SqrtPi))
+
+	tests := []struct {
+		name   string
+		coder  *pipepb.Coder
+		coders map[string]*pipepb.Coder
+		input  []byte
+	}{
+		{
+			"bytes",
+			&pipepb.Coder{
+				Spec: &pipepb.FunctionSpec{
+					Urn: urns.CoderBytes,
+				},
+			},
+			map[string]*pipepb.Coder{},
+			[]byte{3, 1, 2, 3},
+		}, {
+			"varint",
+			&pipepb.Coder{
+				Spec: &pipepb.FunctionSpec{
+					Urn: urns.CoderVarInt,
+				},
+			},
+			map[string]*pipepb.Coder{},
+			[]byte{255, 3},
+		}, {
+			"bool",
+			&pipepb.Coder{
+				Spec: &pipepb.FunctionSpec{
+					Urn: urns.CoderBool,
+				},
+			},
+			map[string]*pipepb.Coder{},
+			[]byte{1},
+		}, {
+			"double",
+			&pipepb.Coder{
+				Spec: &pipepb.FunctionSpec{
+					Urn: urns.CoderDouble,
+				},
+			},
+			map[string]*pipepb.Coder{},
+			doubleBytes,
+		}, {
+			"iterable",
+			&pipepb.Coder{
+				Spec: &pipepb.FunctionSpec{
+					Urn: urns.CoderIterable,
+				},
+				ComponentCoderIds: []string{"elm"},
+			},
+			map[string]*pipepb.Coder{
+				"elm": {
+					Spec: &pipepb.FunctionSpec{
+						Urn: urns.CoderVarInt,
+					},
+				},
+			},
+			// The element count is a fixed width big endian int32, not a varint:
+			// a count of 4, followed by the four varint elements.
+			[]byte{0, 0, 0, 4, 0, 1, 2, 3},
+		}, {
+			"kv",
+			&pipepb.Coder{
+				Spec: &pipepb.FunctionSpec{
+					Urn: urns.CoderKV,
+				},
+				ComponentCoderIds: []string{"key", "value"},
+			},
+			map[string]*pipepb.Coder{
+				"key": {
+					Spec: &pipepb.FunctionSpec{
+						Urn: urns.CoderVarInt,
+					},
+				},
+				"value": {
+					Spec: &pipepb.FunctionSpec{
+						Urn: urns.CoderBool,
+					},
+				},
+			},
+			[]byte{3, 0},
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			dec := pullDecoder(test.coder, test.coders)
+			buf := bytes.NewBuffer(test.input)
+			got := dec(buf)
+			// The extracted bytes must match exactly. bytes.EqualFold would
+			// compare them as case insensitive UTF-8 text instead.
+			if !bytes.Equal(test.input, got) {
+				t.Fatalf("pullDecoder(%v)(...) = %v, want %v", test.coder, got, test.input)
+			}
+		})
+	}
+}