Skip to content

Commit

Permalink
[Go SDK] Dataframe API wrapper (#23450)
Browse files Browse the repository at this point in the history
  • Loading branch information
riteshghorse authored Oct 18, 2022
1 parent 107a43d commit 78e1c0a
Show file tree
Hide file tree
Showing 7 changed files with 343 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
## New Features / Improvements

* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Dataframe wrapper added in Go SDK via Cross-Language (requires manually starting the Python expansion service). (Go) ([#23384](https://github.com/apache/beam/issues/23384)).
* Name all Java threads to aid in debugging ([#23049](https://github.com/apache/beam/issues/23049)).

## Breaking Changes
Expand Down
17 changes: 17 additions & 0 deletions sdks/go/pkg/beam/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,23 @@ func RegisterSchemaProvider(rt reflect.Type, provider interface{}) {
coder.RegisterSchemaProviders(rt, p.BuildEncoder, p.BuildDecoder)
}

// RegisterSchemaProviderWithURN is for internal use only. Users are recommended to use
// beam.RegisterSchemaProvider() instead.
//
// RegisterSchemaProviderWithURN registers a new schema provider for a new logical type defined
// in pkg/beam/model/pipeline_v1/schema.pb.go. The logical type is built from the given
// urn, rt, and the storage type the provider reports for rt.
//
// provider must implement SchemaProvider. The function panics if it does not, or if
// the provider's FromLogicalType returns an error for rt.
//
// RegisterSchemaProviderWithURN must be called before beam.Init(), and conventionally
// is called in a package init() function.
func RegisterSchemaProviderWithURN(rt reflect.Type, provider interface{}, urn string) {
	p, ok := provider.(SchemaProvider)
	if !ok {
		panic(fmt.Sprintf("beam.RegisterSchemaProviderWithURN: provider of type %T for %v does not implement beam.SchemaProvider", provider, rt))
	}
	st, err := p.FromLogicalType(rt)
	if err != nil {
		// Surface the provider's own error so the cause is visible at registration time.
		panic(fmt.Sprintf("beam.RegisterSchemaProviderWithURN: schema type provider for %v, doesn't support that type: %v", rt, err))
	}
	schema.RegisterLogicalType(schema.ToLogicalType(urn, rt, st))
	coder.RegisterSchemaProviders(rt, p.BuildEncoder, p.BuildDecoder)
}

// SchemaProvider specializes schema handling for complex types, including conversion to a
// valid schema base type,
//
Expand Down
87 changes: 87 additions & 0 deletions sdks/go/pkg/beam/transforms/xlang/dataframe/dataframe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package dataframe is a wrapper for DataframeTransform defined in Apache Beam Python SDK.
// An expansion service for python external transforms can be started by running
// $ python -m apache_beam.runners.portability.expansion_service_main -p $PORT_FOR_EXPANSION_SERVICE
package dataframe

import (
"reflect"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/xlang/python"
)

func init() {
beam.RegisterType(reflect.TypeOf((*config)(nil)).Elem())
beam.RegisterType(reflect.TypeOf((*kwargs)(nil)).Elem())
beam.RegisterType(reflect.TypeOf((*argStruct)(nil)).Elem())
}

// kwargs holds the keyword arguments passed to the Python DataframeTransform
// via the external transform payload.
type kwargs struct {
// Fn is the Python callable source supplied by the caller of Transform; tagged "func".
Fn python.CallableSource `beam:"func"`
// IncludeIndexes is switched on by the WithIndexes option; tagged "include_indexes".
IncludeIndexes bool `beam:"include_indexes"`
}

// argStruct is the empty positional-argument type used when instantiating
// the Python external transform; this wrapper only passes keyword arguments.
type argStruct struct{}

// config collects the settings for a single Transform call.
type config struct {
// dpl is the keyword-argument payload forwarded to the Python transform.
dpl kwargs
// expansionAddr is the expansion service address; Transform panics when it is empty.
expansionAddr string
}

// configOption mutates a config; Transform applies each option in the order given.
type configOption func(*config)

// WithExpansionAddr returns an option that records expansionAddr as the
// address of the Python expansion service to use.
func WithExpansionAddr(expansionAddr string) configOption {
	setAddr := func(cfg *config) {
		cfg.expansionAddr = expansionAddr
	}
	return setAddr
}

// WithIndexes returns an option that turns on the include_indexes keyword
// argument of DataframeTransform.
func WithIndexes() configOption {
	enable := func(cfg *config) {
		cfg.dpl.IncludeIndexes = true
	}
	return enable
}

// Transform is a multi-language wrapper for a Python DataframeTransform with a given lambda function.
//
// fn is the Python callable source (for example a lambda) and is required. col is the
// input PCollection and outT is the Go type of the elements of the returned PCollection.
//
// An expansion service address must be supplied with dataframe.WithExpansionAddr();
// Transform panics if none is provided. Including indexes in the dataframe can be
// requested with dataframe.WithIndexes().
func Transform(s beam.Scope, fn string, col beam.PCollection, outT reflect.Type, opts ...configOption) beam.PCollection {
	// Scope returns the named sub-scope rather than modifying s, so the result
	// has to be assigned for the transform to be placed under it.
	s = s.Scope("xlang.python.DataframeTransform")
	cfg := config{
		dpl: kwargs{Fn: python.CallableSource(fn)},
	}
	for _, opt := range opts {
		opt(&cfg)
	}

	// TODO: load automatic expansion service here
	if cfg.expansionAddr == "" {
		panic("no expansion service address provided for dataframe.Transform(), pass dataframe.WithExpansionAddr(address) as a param.")
	}

	pet := python.NewExternalTransform[argStruct, kwargs]("apache_beam.dataframe.transforms.DataframeTransform")
	pet.WithKwargs(cfg.dpl)
	pl := beam.CrossLanguagePayload(pet)
	result := beam.CrossLanguage(s, "beam:transforms:python:fully_qualified_named", pl, cfg.expansionAddr, beam.UnnamedInput(col), beam.UnnamedOutput(typex.New(outT)))
	return result[beam.UnnamedOutputTag()]
}
129 changes: 129 additions & 0 deletions sdks/go/pkg/beam/transforms/xlang/python/external.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package python contains data structures required for python external transforms in a multilanguage pipeline.
package python

import (
"fmt"
"io"
"reflect"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
)

const (
// pythonCallableUrn is the URN under which the CallableSource schema provider
// is registered in this package's init function.
pythonCallableUrn = "beam:logical_type:python_callable:v1"
)

var (
// pcsType is the reflect.Type of CallableSource.
pcsType = reflect.TypeOf((*CallableSource)(nil)).Elem()
// pcsStorageType is the storage type FromLogicalType reports for CallableSource.
pcsStorageType = reflectx.String
)

func init() {
beam.RegisterType(pcsType)
beam.RegisterSchemaProviderWithURN(pcsType, &callableSourceProvider{}, pythonCallableUrn)
}

// CallableSource is a wrapper object storing a Python function definition
// that can be evaluated to Python callables in Python SDK.
//
// The snippet of Python code can be a valid Python expression such as
//
//	lambda x: x * x
//	str.upper
//
// a fully qualified name such as
//
//	math.sin
//
// or a complete multi-line function or class definition such as
//
//	def foo(x):
//	    ...
//	class Foo:
//	    ...
//
// Any lines preceding the function definition are first evaluated to provide context in which to
// define the function which can be useful to declare imports or any other needed values, e.g.
//
//	import math
//
//	def helper(x):
//	    return x * x
//
//	def func(y):
//	    return helper(y) + y
//
// in which case `func` would get applied to each element.
type CallableSource string

// callableSourceProvider implements the SchemaProvider interface for the
// CallableSource logical type.
type callableSourceProvider struct{}

// FromLogicalType reports the storage type backing the CallableSource logical
// type. Any rt other than CallableSource yields an error.
func (p *callableSourceProvider) FromLogicalType(rt reflect.Type) (reflect.Type, error) {
	if rt == pcsType {
		return pcsStorageType, nil
	}
	return nil, fmt.Errorf("unable to provide schema.LogicalType for type %v, want %v", rt, pcsType)
}

// BuildEncoder returns an encoder for the CallableSource logical type.
//
// It returns an error if rt is not the CallableSource type. The returned
// encoder writes the source as a UTF-8 string to w, and returns an error
// (instead of panicking) when handed a value that is not a CallableSource.
func (p *callableSourceProvider) BuildEncoder(rt reflect.Type) (func(interface{}, io.Writer) error, error) {
	if _, err := p.FromLogicalType(rt); err != nil {
		return nil, err
	}

	return func(iface interface{}, w io.Writer) error {
		v, ok := iface.(CallableSource)
		if !ok {
			// The encoder already has an error result; report the mismatch through it.
			return fmt.Errorf("unable to encode value of type %T, want %v", iface, pcsType)
		}
		return coder.EncodeStringUTF8(string(v), w)
	}, nil
}

// BuildDecoder returns a decoder for the CallableSource logical type, or an
// error if rt is not the CallableSource type. The returned decoder reads a
// UTF-8 string from the reader and converts it to a CallableSource.
func (p *callableSourceProvider) BuildDecoder(rt reflect.Type) (func(io.Reader) (interface{}, error), error) {
	_, err := p.FromLogicalType(rt)
	if err != nil {
		return nil, err
	}

	decode := func(r io.Reader) (interface{}, error) {
		src, decErr := coder.DecodeStringUTF8(r)
		if decErr != nil {
			return nil, decErr
		}
		return CallableSource(src), nil
	}
	return decode, nil
}

// NewExternalTransform creates a new instance for python external transform. It accepts two types:
//
//	A: used for normal arguments
//	K: used for keyword arguments
//
// constructor is stored as the transform's Constructor; Args and Kwargs start
// out as the zero values of A and K.
func NewExternalTransform[A, K any](constructor string) *pythonExternalTransform[A, K] {
	return &pythonExternalTransform[A, K]{Constructor: constructor}
}

// pythonExternalTransform holds the details required for an External Python Transform.
type pythonExternalTransform[A, K any] struct {
	// Constructor is the constructor string given to NewExternalTransform.
	Constructor string `beam:"constructor"`
	// Args holds the positional arguments.
	Args A `beam:"args"`
	// Kwargs holds the keyword arguments.
	Kwargs K `beam:"kwargs"`
}

// WithArgs adds arguments to the External Python Transform.
//
// args must have dynamic type A; WithArgs panics with a descriptive message
// otherwise and leaves the transform unchanged.
func (p *pythonExternalTransform[A, K]) WithArgs(args any) {
	a, ok := args.(A)
	if !ok {
		panic(fmt.Sprintf("python.WithArgs: got args of type %T, want %v", args, reflect.TypeOf((*A)(nil)).Elem()))
	}
	p.Args = a
}

// WithKwargs adds keyword arguments to the External Python Transform.
//
// kwargs must have dynamic type K; WithKwargs panics with a descriptive
// message otherwise and leaves the transform unchanged.
func (p *pythonExternalTransform[A, K]) WithKwargs(kwargs any) {
	k, ok := kwargs.(K)
	if !ok {
		panic(fmt.Sprintf("python.WithKwargs: got kwargs of type %T, want %v", kwargs, reflect.TypeOf((*K)(nil)).Elem()))
	}
	p.Kwargs = k
}
1 change: 1 addition & 0 deletions sdks/go/test/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ var directFilters = []string{
"TestDebeziumIO_BasicRead",
"TestJDBCIO_BasicReadWrite",
"TestJDBCIO_PostgresReadWrite",
"TestDataframe",
// Triggers, Panes are not yet supported
"TestTrigger.*",
"TestPanes",
Expand Down
48 changes: 48 additions & 0 deletions sdks/go/test/integration/transforms/xlang/dataframe/dataframe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dataframe

import (
"reflect"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/xlang/dataframe"
)

func init() {
beam.RegisterType(reflect.TypeOf((*TestRow)(nil)).Elem())
}

// TestRow is the element type used for both the input and the output of the
// dataframe transform in DataframeTransform.
type TestRow struct {
// A is the column the test's lambda groups by; tagged "a".
A int64 `beam:"a"`
// B is tagged "b".
B int32 `beam:"b"`
}

// DataframeTransform builds a pipeline that feeds three TestRow values through
// dataframe.Transform with the lambda "df.groupby('a').sum()" (indexes
// included), using the expansion service at expansionAddr, and asserts on the
// resulting rows. The pipeline is returned unexecuted.
func DataframeTransform(expansionAddr string) *beam.Pipeline {
	p, s := beam.NewPipelineWithRoot()

	// Two input rows share A == 100; one has A == 200.
	first100 := TestRow{A: 100, B: 1}
	second100 := TestRow{A: 100, B: 2}
	only200 := TestRow{A: 200, B: 4}

	// Expected row for A == 100: B is 1 + 2. The A == 200 row is expected as-is.
	summed100 := TestRow{A: 100, B: 3}

	rowType := reflect.TypeOf((*TestRow)(nil)).Elem()
	input := beam.Create(s, first100, second100, only200)
	outCol := dataframe.Transform(
		s,
		"lambda df: df.groupby('a').sum()",
		input,
		rowType,
		dataframe.WithExpansionAddr(expansionAddr),
		dataframe.WithIndexes(),
	)

	passert.Equals(s, outCol, summed100, only200)
	return p
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dataframe

import (
"flag"
"log"
"testing"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/universal"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
"github.com/apache/beam/sdks/v2/go/test/integration"
)

// expansionAddr is populated in TestMain with the address of the expansion
// service labelled "python_transform"; it stays empty when that lookup fails.
var expansionAddr string

// checkFlags skips the calling test when no expansion address was obtained.
func checkFlags(t *testing.T) {
	if expansionAddr != "" {
		return
	}
	t.Skip("No python transform expansion address provided.")
}

// TestDataframe applies the integration filters and the expansion-address
// check, then runs and validates the pipeline built by DataframeTransform.
func TestDataframe(t *testing.T) {
	integration.CheckFilters(t)
	checkFlags(t)
	ptest.RunAndValidate(t, DataframeTransform(expansionAddr))
}

// TestMain parses flags, initializes beam, looks up the "python_transform"
// expansion service address, and then runs the tests through ptest. The
// expansion services are shut down when TestMain returns.
func TestMain(m *testing.M) {
	flag.Parse()
	beam.Init()

	services := integration.NewExpansionServices()
	defer func() { services.Shutdown() }()

	// A failed lookup is only logged; expansionAddr then stays empty and
	// checkFlags skips the test.
	if addr, err := services.GetAddr("python_transform"); err == nil {
		expansionAddr = addr
	} else {
		log.Printf("skipping missing expansion service: %v", err)
	}

	ptest.MainRet(m)
}

0 comments on commit 78e1c0a

Please sign in to comment.