From 560e4917a4b231f22a6df07ac94b3dff1ad65cb9 Mon Sep 17 00:00:00 2001 From: Robert Burke Date: Fri, 21 Jul 2023 06:59:02 -0700 Subject: [PATCH] [prism] Regs for filter and stats transform tests. (#27586) Co-authored-by: lostluck <13907733+lostluck@users.noreply.github.com> --- sdks/go/pkg/beam/transforms/filter/filter.go | 10 +- .../beam/transforms/filter/filter.shims.go | 201 ------------------ .../pkg/beam/transforms/filter/filter_test.go | 17 ++ .../pkg/beam/transforms/stats/count_test.go | 9 + sdks/go/pkg/beam/transforms/stats/max_test.go | 6 + .../go/pkg/beam/transforms/stats/quantiles.go | 8 +- .../beam/transforms/stats/quantiles_test.go | 51 ++--- 7 files changed, 58 insertions(+), 244 deletions(-) delete mode 100644 sdks/go/pkg/beam/transforms/filter/filter.shims.go diff --git a/sdks/go/pkg/beam/transforms/filter/filter.go b/sdks/go/pkg/beam/transforms/filter/filter.go index 913e7355c30d..997eec5eb4ef 100644 --- a/sdks/go/pkg/beam/transforms/filter/filter.go +++ b/sdks/go/pkg/beam/transforms/filter/filter.go @@ -21,11 +21,15 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/funcx" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" ) -//go:generate go install github.com/apache/beam/sdks/v2/go/cmd/starcgen -//go:generate starcgen --package=filter --identifiers=filterFn,mapFn,mergeFn -//go:generate go fmt +func init() { + register.DoFn2x0[beam.T, func(beam.T)]((*filterFn)(nil)) + register.Function1x2(mapFn) + register.Function2x1(mergeFn) + register.Emitter1[beam.T]() +} var ( sig = funcx.MakePredicate(beam.TType) // T -> bool diff --git a/sdks/go/pkg/beam/transforms/filter/filter.shims.go b/sdks/go/pkg/beam/transforms/filter/filter.shims.go deleted file mode 100644 index b0d18233ab18..000000000000 --- a/sdks/go/pkg/beam/transforms/filter/filter.shims.go +++ /dev/null @@ -1,201 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one or more -// contributor license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright ownership. -// The ASF licenses this file to You under the Apache License, Version 2.0 -// (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Code generated by starcgen. DO NOT EDIT. -// File: filter.shims.go - -package filter - -import ( - "context" - "reflect" - - // Library imports - "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime" - "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec" - "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/schema" - "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" - "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" - "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx" -) - -func init() { - runtime.RegisterFunction(mapFn) - runtime.RegisterFunction(mergeFn) - runtime.RegisterType(reflect.TypeOf((*filterFn)(nil)).Elem()) - schema.RegisterType(reflect.TypeOf((*filterFn)(nil)).Elem()) - reflectx.RegisterStructWrapper(reflect.TypeOf((*filterFn)(nil)).Elem(), wrapMakerFilterFn) - reflectx.RegisterFunc(reflect.TypeOf((*func(int, int) int)(nil)).Elem(), funcMakerIntIntГInt) - reflectx.RegisterFunc(reflect.TypeOf((*func(typex.T, func(typex.T)))(nil)).Elem(), funcMakerTypex۰TEmitTypex۰TГ) - reflectx.RegisterFunc(reflect.TypeOf((*func(typex.T) (typex.T, int))(nil)).Elem(), funcMakerTypex۰TГTypex۰TInt) - reflectx.RegisterFunc(reflect.TypeOf((*func())(nil)).Elem(), funcMakerГ) - exec.RegisterEmitter(reflect.TypeOf((*func(typex.T))(nil)).Elem(), emitMakerTypex۰T) -} - -func wrapMakerFilterFn(fn any) map[string]reflectx.Func { - dfn := fn.(*filterFn) - return map[string]reflectx.Func{ - "ProcessElement": reflectx.MakeFunc(func(a0 typex.T, a1 func(typex.T)) { dfn.ProcessElement(a0, a1) }), - "Setup": reflectx.MakeFunc(func() { dfn.Setup() }), - } -} - -type callerIntIntГInt struct { - fn func(int, int) int -} - -func funcMakerIntIntГInt(fn any) reflectx.Func { - f := fn.(func(int, int) int) - return &callerIntIntГInt{fn: f} -} - -func (c *callerIntIntГInt) Name() string { - return reflectx.FunctionName(c.fn) -} - -func (c *callerIntIntГInt) Type() reflect.Type { - return reflect.TypeOf(c.fn) -} - -func (c *callerIntIntГInt) Call(args []any) []any { - out0 := c.fn(args[0].(int), args[1].(int)) - return []any{out0} -} - -func (c *callerIntIntГInt) Call2x1(arg0, arg1 any) any { - return c.fn(arg0.(int), arg1.(int)) -} - -type callerTypex۰TEmitTypex۰TГ struct { - fn func(typex.T, func(typex.T)) -} - -func funcMakerTypex۰TEmitTypex۰TГ(fn any) reflectx.Func { - f := fn.(func(typex.T, func(typex.T))) - return &callerTypex۰TEmitTypex۰TГ{fn: f} -} - -func (c *callerTypex۰TEmitTypex۰TГ) Name() string { - return reflectx.FunctionName(c.fn) -} - -func (c *callerTypex۰TEmitTypex۰TГ) Type() reflect.Type { - return reflect.TypeOf(c.fn) -} - -func (c *callerTypex۰TEmitTypex۰TГ) Call(args []any) []any { - c.fn(args[0].(typex.T), args[1].(func(typex.T))) - return []any{} -} - -func (c *callerTypex۰TEmitTypex۰TГ) Call2x0(arg0, arg1 any) { - c.fn(arg0.(typex.T), arg1.(func(typex.T))) -} - -type callerTypex۰TГTypex۰TInt struct { - fn func(typex.T) (typex.T, int) -} - -func funcMakerTypex۰TГTypex۰TInt(fn any) reflectx.Func { - f := fn.(func(typex.T) (typex.T, int)) - return &callerTypex۰TГTypex۰TInt{fn: f} -} - -func (c *callerTypex۰TГTypex۰TInt) Name() string { - return reflectx.FunctionName(c.fn) -} - -func (c *callerTypex۰TГTypex۰TInt) Type() reflect.Type { - return reflect.TypeOf(c.fn) -} - -func (c *callerTypex۰TГTypex۰TInt) Call(args []any) []any { - out0, out1 := c.fn(args[0].(typex.T)) - return []any{out0, out1} -} - -func (c *callerTypex۰TГTypex۰TInt) Call1x2(arg0 any) (any, any) { - return c.fn(arg0.(typex.T)) -} - -type callerГ struct { - fn func() -} - -func funcMakerГ(fn any) reflectx.Func { - f := fn.(func()) - return &callerГ{fn: f} -} - -func (c *callerГ) Name() string { - return reflectx.FunctionName(c.fn) -} - -func (c *callerГ) Type() reflect.Type { - return reflect.TypeOf(c.fn) -} - -func (c *callerГ) Call(args []any) []any { - c.fn() - return []any{} -} - -func (c *callerГ) Call0x0() { - c.fn() -} - -type emitNative struct { - n exec.ElementProcessor - fn any - est *sdf.WatermarkEstimator - - ctx context.Context - ws []typex.Window - et typex.EventTime - value exec.FullValue -} - -func (e *emitNative) Init(ctx context.Context, ws []typex.Window, et typex.EventTime) error { - e.ctx = ctx - e.ws = ws - e.et = et - return nil -} - -func (e *emitNative) Value() any { - return e.fn -} - -func (e *emitNative) AttachEstimator(est *sdf.WatermarkEstimator) { - e.est = est -} - -func emitMakerTypex۰T(n exec.ElementProcessor) exec.ReusableEmitter { - ret := &emitNative{n: n} - ret.fn = ret.invokeTypex۰T - return ret -} - -func (e *emitNative) invokeTypex۰T(val typex.T) { - e.value = exec.FullValue{Windows: e.ws, Timestamp: e.et, Elm: val} - if e.est != nil { - (*e.est).(sdf.TimestampObservingEstimator).ObserveTimestamp(e.et.ToTime()) - } - if err := e.n.ProcessElement(e.ctx, &e.value); err != nil { - panic(err) - } -} - -// DO NOT MODIFY: GENERATED CODE diff --git a/sdks/go/pkg/beam/transforms/filter/filter_test.go b/sdks/go/pkg/beam/transforms/filter/filter_test.go index 9cc5a526af9c..96b4cbe12d79 100644 --- a/sdks/go/pkg/beam/transforms/filter/filter_test.go +++ b/sdks/go/pkg/beam/transforms/filter/filter_test.go @@ -18,11 +18,28 @@ package filter_test import ( "testing" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/filter" ) +func TestMain(m *testing.M) { + ptest.Main(m) +} + +func init() { + register.Function1x1(alwaysTrue) + register.Function1x1(alwaysFalse) + register.Function1x1(isOne) + register.Function1x1(greaterThanOne) +} + +func alwaysTrue(a int) bool { return true } +func alwaysFalse(a int) bool { return false } +func isOne(a int) bool { return a == 1 } +func greaterThanOne(a int) bool { return a > 1 } + func TestInclude(t *testing.T) { tests := []struct { in []int diff --git a/sdks/go/pkg/beam/transforms/stats/count_test.go b/sdks/go/pkg/beam/transforms/stats/count_test.go index 23627a92f799..be6ce950e20a 100644 --- a/sdks/go/pkg/beam/transforms/stats/count_test.go +++ b/sdks/go/pkg/beam/transforms/stats/count_test.go @@ -20,10 +20,19 @@ import ( "testing" "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" ) +func TestMain(m *testing.M) { + ptest.Main(m) +} + +func init() { + register.Function2x1(kvToCount) +} + type count struct { Elm int Count int diff --git a/sdks/go/pkg/beam/transforms/stats/max_test.go b/sdks/go/pkg/beam/transforms/stats/max_test.go index af817527dc91..531792e70f58 100644 --- a/sdks/go/pkg/beam/transforms/stats/max_test.go +++ b/sdks/go/pkg/beam/transforms/stats/max_test.go @@ -19,10 +19,16 @@ import ( "testing" "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" ) +func init() { + register.Function2x1(kvToStudent) + register.Function1x2(studentToKV) +} + type student struct { Name string Grade float64 diff --git a/sdks/go/pkg/beam/transforms/stats/quantiles.go b/sdks/go/pkg/beam/transforms/stats/quantiles.go index 79a66b58e1f0..6d2baa8b5e99 100644 --- a/sdks/go/pkg/beam/transforms/stats/quantiles.go +++ b/sdks/go/pkg/beam/transforms/stats/quantiles.go @@ -31,6 +31,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" ) func init() { @@ -44,6 +45,9 @@ func init() { beam.RegisterType(reflect.TypeOf((*shardElementsFn)(nil)).Elem()) beam.RegisterCoder(compactorsType, encodeCompactors, decodeCompactors) beam.RegisterCoder(weightedElementType, encodeWeightedElement, decodeWeightedElement) + + register.Function1x2(fixedKey) + register.Function2x1(makeWeightedElement) } // Opts contains settings used to configure how approximate quantiles are computed. @@ -663,12 +667,14 @@ func makeWeightedElement(weight int, element beam.T) weightedElement { return weightedElement{weight: weight, element: element} } +func fixedKey(e beam.T) (int, beam.T) { return 1, e } + // ApproximateQuantiles computes approximate quantiles for the input PCollection. // // The output PCollection contains a single element: a list of numQuantiles - 1 elements approximately splitting up the input collection into numQuantiles separate quantiles. // For example, if numQuantiles = 2, the returned list would contain a single element such that approximately half of the input would be less than that element and half would be greater. func ApproximateQuantiles(s beam.Scope, pc beam.PCollection, less any, opts Opts) beam.PCollection { - return ApproximateWeightedQuantiles(s, beam.ParDo(s, func(e beam.T) (int, beam.T) { return 1, e }, pc), less, opts) + return ApproximateWeightedQuantiles(s, beam.ParDo(s, fixedKey, pc), less, opts) } // reduce takes a PCollection and returns a PCollection<*compactors>. The output PCollection may have at most shardSizes[len(shardSizes) - 1] compactors. diff --git a/sdks/go/pkg/beam/transforms/stats/quantiles_test.go b/sdks/go/pkg/beam/transforms/stats/quantiles_test.go index c03620d0b9b7..1e389eed128b 100644 --- a/sdks/go/pkg/beam/transforms/stats/quantiles_test.go +++ b/sdks/go/pkg/beam/transforms/stats/quantiles_test.go @@ -16,46 +16,19 @@ package stats import ( - "reflect" "testing" "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx" + "github.com/apache/beam/sdks/v2/go/pkg/beam/register" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest" "github.com/google/go-cmp/cmp" ) func init() { - beam.RegisterFunction(weightedElementToKv) - - // In practice, this runs faster than plain reflection. - // TODO(https://github.com/apache/beam/issues/20271): Remove once collisions don't occur for starcgen over test code and an equivalent is generated for us. - reflectx.RegisterFunc(reflect.ValueOf(less).Type(), func(_ any) reflectx.Func { - return newIntLess() - }) -} - -type intLess struct { - name string - t reflect.Type -} - -func newIntLess() *intLess { - return &intLess{ - name: reflectx.FunctionName(reflect.ValueOf(less).Interface()), - t: reflect.ValueOf(less).Type(), - } -} - -func (i *intLess) Name() string { - return i.name -} -func (i *intLess) Type() reflect.Type { - return i.t -} -func (i *intLess) Call(args []any) []any { - return []any{args[0].(int) < args[1].(int)} + register.Function1x2(weightedElementToKv) + register.Function2x1(less) } func less(a, b int) bool { @@ -68,7 +41,7 @@ func TestLargeQuantiles(t *testing.T) { for i := 0; i < numElements; i++ { inputSlice = append(inputSlice, i) } - p, s, input, expected := ptest.CreateList2(inputSlice, [][]int{[]int{10006, 19973}}) + p, s, input, expected := ptest.CreateList2(inputSlice, [][]int{{10006, 19973}}) quantiles := ApproximateQuantiles(s, input, less, Opts{ K: 200, NumQuantiles: 3, @@ -85,7 +58,7 @@ func TestLargeQuantilesReversed(t *testing.T) { for i := numElements - 1; i >= 0; i-- { inputSlice = append(inputSlice, i) } - p, s, input, expected := ptest.CreateList2(inputSlice, [][]int{[]int{9985, 19959}}) + p, s, input, expected := ptest.CreateList2(inputSlice, [][]int{{9985, 19959}}) quantiles := ApproximateQuantiles(s, input, less, Opts{ K: 200, NumQuantiles: 3, @@ -103,8 +76,8 @@ func TestBasicQuantiles(t *testing.T) { Expected [][]int }{ {[]int{}, [][]int{}}, - {[]int{1}, [][]int{[]int{1}}}, - {[]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20}, [][]int{[]int{6, 13}}}, + {[]int{1}, [][]int{{1}}}, + {[]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20}, [][]int{{6, 13}}}, } for _, test := range tests { @@ -180,7 +153,7 @@ func TestMerging(t *testing.T) { K: 3, NumberOfCompactions: 1, Compactors: []compactor{{ - sorted: [][]beam.T{[]beam.T{1}, []beam.T{2}, []beam.T{3}}, + sorted: [][]beam.T{{1}, {2}, {3}}, unsorted: []beam.T{6, 5, 4}, capacity: 4, }}, @@ -191,7 +164,7 @@ func TestMerging(t *testing.T) { NumberOfCompactions: 1, Compactors: []compactor{ { - sorted: [][]beam.T{[]beam.T{7}, []beam.T{8}, []beam.T{9}}, + sorted: [][]beam.T{{7}, {8}, {9}}, unsorted: []beam.T{12, 11, 10}, capacity: 4}, }, @@ -205,7 +178,7 @@ func TestMerging(t *testing.T) { Compactors: []compactor{ {capacity: 4}, { - sorted: [][]beam.T{[]beam.T{1, 3, 5, 7, 9, 11}}, + sorted: [][]beam.T{{1, 3, 5, 7, 9, 11}}, capacity: 4, }, }, @@ -222,12 +195,12 @@ func TestCompactorsEncoding(t *testing.T) { Compactors: []compactor{ { capacity: 4, - sorted: [][]beam.T{[]beam.T{1, 2}}, + sorted: [][]beam.T{{1, 2}}, unsorted: []beam.T{3, 4}, }, { capacity: 4, - sorted: [][]beam.T{[]beam.T{5, 6}}, + sorted: [][]beam.T{{5, 6}}, unsorted: []beam.T{7, 8}, }, },