Skip to content

Commit

Permalink
[prism] Regs for filter and stats transform tests. (apache#27586)
Browse files Browse the repository at this point in the history
Co-authored-by: lostluck <[email protected]>
  • Loading branch information
2 people authored and cushon committed May 24, 2024
1 parent 34ff8ee commit 560e491
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 244 deletions.
10 changes: 7 additions & 3 deletions sdks/go/pkg/beam/transforms/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/funcx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
)

//go:generate go install github.com/apache/beam/sdks/v2/go/cmd/starcgen
//go:generate starcgen --package=filter --identifiers=filterFn,mapFn,mergeFn
//go:generate go fmt
func init() {
register.DoFn2x0[beam.T, func(beam.T)]((*filterFn)(nil))
register.Function1x2(mapFn)
register.Function2x1(mergeFn)
register.Emitter1[beam.T]()
}

var (
sig = funcx.MakePredicate(beam.TType) // T -> bool
Expand Down
201 changes: 0 additions & 201 deletions sdks/go/pkg/beam/transforms/filter/filter.shims.go

This file was deleted.

17 changes: 17 additions & 0 deletions sdks/go/pkg/beam/transforms/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,28 @@ package filter_test
import (
"testing"

"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/filter"
)

func TestMain(m *testing.M) {
ptest.Main(m)
}

func init() {
register.Function1x1(alwaysTrue)
register.Function1x1(alwaysFalse)
register.Function1x1(isOne)
register.Function1x1(greaterThanOne)
}

func alwaysTrue(a int) bool { return true }
func alwaysFalse(a int) bool { return false }
func isOne(a int) bool { return a == 1 }
func greaterThanOne(a int) bool { return a > 1 }

func TestInclude(t *testing.T) {
tests := []struct {
in []int
Expand Down
9 changes: 9 additions & 0 deletions sdks/go/pkg/beam/transforms/stats/count_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,19 @@ import (
"testing"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
)

func TestMain(m *testing.M) {
ptest.Main(m)
}

func init() {
register.Function2x1(kvToCount)
}

type count struct {
Elm int
Count int
Expand Down
6 changes: 6 additions & 0 deletions sdks/go/pkg/beam/transforms/stats/max_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,16 @@ import (
"testing"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
)

func init() {
register.Function2x1(kvToStudent)
register.Function1x2(studentToKV)
}

type student struct {
Name string
Grade float64
Expand Down
8 changes: 7 additions & 1 deletion sdks/go/pkg/beam/transforms/stats/quantiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
)

func init() {
Expand All @@ -44,6 +45,9 @@ func init() {
beam.RegisterType(reflect.TypeOf((*shardElementsFn)(nil)).Elem())
beam.RegisterCoder(compactorsType, encodeCompactors, decodeCompactors)
beam.RegisterCoder(weightedElementType, encodeWeightedElement, decodeWeightedElement)

register.Function1x2(fixedKey)
register.Function2x1(makeWeightedElement)
}

// Opts contains settings used to configure how approximate quantiles are computed.
Expand Down Expand Up @@ -663,12 +667,14 @@ func makeWeightedElement(weight int, element beam.T) weightedElement {
return weightedElement{weight: weight, element: element}
}

func fixedKey(e beam.T) (int, beam.T) { return 1, e }

// ApproximateQuantiles computes approximate quantiles for the input PCollection<T>.
//
// The output PCollection contains a single element: a list of numQuantiles - 1 elements approximately splitting up the input collection into numQuantiles separate quantiles.
// For example, if numQuantiles = 2, the returned list would contain a single element such that approximately half of the input would be less than that element and half would be greater.
func ApproximateQuantiles(s beam.Scope, pc beam.PCollection, less any, opts Opts) beam.PCollection {
return ApproximateWeightedQuantiles(s, beam.ParDo(s, func(e beam.T) (int, beam.T) { return 1, e }, pc), less, opts)
return ApproximateWeightedQuantiles(s, beam.ParDo(s, fixedKey, pc), less, opts)
}

// reduce takes a PCollection<weightedElementWrapper> and returns a PCollection<*compactors>. The output PCollection may have at most shardSizes[len(shardSizes) - 1] compactors.
Expand Down
Loading

0 comments on commit 560e491

Please sign in to comment.