Skip to content

Commit

Permalink
[#24789][prism] add preprocessor and test (#25520)
Browse files Browse the repository at this point in the history
* [prism] add preprocessor and test

* [prism] preparer comment

* [prism] move preparer

---------

Co-authored-by: lostluck <[email protected]>
  • Loading branch information
lostluck and lostluck authored Feb 20, 2023
1 parent ca1ec25 commit 6667eb4
Show file tree
Hide file tree
Showing 3 changed files with 351 additions and 0 deletions.
22 changes: 22 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/execute.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

// stage represents a fused subgraph.
//
// This is a temporary, minimal implementation that exists so the runner can be
// landed across several smaller PRs.
type stage struct {
// transforms holds the IDs of the transforms that make up this stage.
transforms []string
}
148 changes: 148 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/preprocess.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
"sort"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/pipelinex"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"golang.org/x/exp/maps"
"golang.org/x/exp/slog"
)

// transformPreparer is an interface for handling different URNs in the
// preprocessor, largely for exchanging transforms for others. The replacements
// are added to the complete set of components in the pipeline.
type transformPreparer interface {
// PrepareUrns returns the Beam URNs that this handler deals with for preprocessing.
// The preprocessor registers the handler under each returned URN.
PrepareUrns() []string
// PrepareTransform takes the ID and proto of a PTransform, along with the
// pipeline's current components, and returns a set of new Components to merge
// into the pipeline, and a list of transform IDs to remove from the leaves and
// ignore during graph processing.
PrepareTransform(tid string, t *pipepb.PTransform, comps *pipepb.Components) (*pipepb.Components, []string)
}

// preprocessor retains configuration for preprocessing the
// graph, such as special handling for lifted combiners or
// other configuration.
type preprocessor struct {
// transformPreparers maps a Beam URN to the preparer registered to handle
// transforms with that URN.
transformPreparers map[string]transformPreparer
}

// newPreprocessor builds a preprocessor from the given preparers, indexing each
// one under every URN it reports from PrepareUrns. When two preparers report
// the same URN, the one that appears later in preps wins.
func newPreprocessor(preps []transformPreparer) *preprocessor {
	byUrn := make(map[string]transformPreparer)
	for i := range preps {
		handler := preps[i]
		urns := handler.PrepareUrns()
		for j := range urns {
			byUrn[urns[j]] = handler
		}
	}

	pre := new(preprocessor)
	pre.transformPreparers = byUrn
	return pre
}

// preProcessGraph takes the graph and preprocesses for consumption in bundles.
// The output is one single-transform stage per kept leaf transform, in the
// topological order of the transform ids.
//
// These are how transforms are related in graph form, but not the specific bundles themselves, which will come later.
//
// Handles awareness of composite transforms and similar. Ultimately, after this point
// the graph stops being a hypergraph, with composite transforms being treated as
// "leaves" downstream as needed.
//
// This is where Combines become lifted (if it makes sense, or is configured), and similar behaviors.
//
// comps is modified in place: any transforms, coders, PCollections, windowing
// strategies and environments returned by a transformPreparer are merged into
// it, replacing existing entries that share an ID. Component maps that are
// unset on comps are allocated on demand, so callers don't need to
// pre-initialize them.
func (p *preprocessor) preProcessGraph(comps *pipepb.Components) []*stage {
	ts := comps.GetTransforms()

	// TODO move this out of this part of the pre-processor?
	leaves := map[string]struct{}{}
	ignore := map[string]struct{}{}
	for tid, t := range ts {
		// Skip transforms that an earlier preparer asked to have removed.
		if _, ok := ignore[tid]; ok {
			continue
		}

		spec := t.GetSpec()
		if spec == nil {
			// Most composites don't have specs.
			slog.Debug("transform is missing a spec",
				slog.Group("transform", slog.String("ID", tid), slog.String("name", t.GetUniqueName())))
			continue
		}

		// Composite Transforms basically means needing to remove the "leaves" from the
		// handling set, and producing the new sub component transforms. The top level
		// composite should have enough information to produce the new sub transforms.
		// In particular, the inputs and outputs need to all be connected and matched up
		// so the topological sort still works out.
		h := p.transformPreparers[spec.GetUrn()]
		if h == nil {
			// If there's an unknown urn, and it's not composite, simply add it to the leaves.
			if len(t.GetSubtransforms()) == 0 {
				leaves[tid] = struct{}{}
			} else {
				slog.Info("composite transform has unknown urn",
					slog.Group("transform", slog.String("ID", tid),
						slog.String("name", t.GetUniqueName()),
						slog.String("urn", spec.GetUrn())))
			}
			continue
		}

		subs, toRemove := h.PrepareTransform(tid, t, comps)

		// Clear out unnecessary leaves from this composite for topological sort handling.
		for _, key := range toRemove {
			ignore[key] = struct{}{}
			delete(leaves, key)
		}

		// ts should be a clone, so we should be able to add new transforms into the map.
		// Adding entries while ranging over ts is permitted; a new entry may or may not
		// be visited by this loop, and is recorded as a leaf here either way.
		for subID, sub := range subs.GetTransforms() {
			leaves[subID] = struct{}{}
			ts[subID] = sub
		}

		// Merge the remaining component kinds. Each destination map is allocated on
		// demand, since writing into a nil map panics.
		if coders := subs.GetCoders(); len(coders) > 0 {
			if comps.Coders == nil {
				comps.Coders = make(map[string]*pipepb.Coder, len(coders))
			}
			for cid, c := range coders {
				comps.Coders[cid] = c
			}
		}
		if pcols := subs.GetPcollections(); len(pcols) > 0 {
			if comps.Pcollections == nil {
				comps.Pcollections = make(map[string]*pipepb.PCollection, len(pcols))
			}
			for nid, n := range pcols {
				comps.Pcollections[nid] = n
			}
		}
		// It's unlikely for these to change, but better to handle them now, to save a headache later.
		if wss := subs.GetWindowingStrategies(); len(wss) > 0 {
			if comps.WindowingStrategies == nil {
				comps.WindowingStrategies = make(map[string]*pipepb.WindowingStrategy, len(wss))
			}
			for wid, w := range wss {
				comps.WindowingStrategies[wid] = w
			}
		}
		if envs := subs.GetEnvironments(); len(envs) > 0 {
			if comps.Environments == nil {
				comps.Environments = make(map[string]*pipepb.Environment, len(envs))
			}
			for envid, env := range envs {
				comps.Environments[envid] = env
			}
		}
	}

	// Sort the kept leaves so the topological sort receives a deterministic
	// input, independent of map iteration order.
	keptLeaves := maps.Keys(leaves)
	sort.Strings(keptLeaves)
	topological := pipelinex.TopologicalSort(ts, keptLeaves)
	// slog arguments must be key/value pairs or Attrs; a bare value would be
	// logged under the "!BADKEY" key.
	slog.Debug("topological transform ordering", slog.Any("topological", topological))

	var stages []*stage
	for _, tid := range topological {
		stages = append(stages, &stage{
			transforms: []string{tid},
		})
	}
	return stages
}
181 changes: 181 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
"testing"

pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/google/go-cmp/cmp"
"google.golang.org/protobuf/testing/protocmp"
)

// Test_preprocessor_preProcessGraph checks both results of preProcessGraph:
// the stages it returns, and the in-place changes it makes to the input
// components. Each case runs against a preprocessor configured with a single
// testPreparer, which handles the "test_urn" URN.
func Test_preprocessor_preProcessGraph(t *testing.T) {
	tests := []struct {
		name  string
		input *pipepb.Components

		wantComponents *pipepb.Components
		wantStages     []*stage
	}{
		{
			// No preparer is registered for "defaultUrn", so the transform is kept as a leaf.
			name: "noPreparer",
			input: &pipepb.Components{
				Transforms: map[string]*pipepb.PTransform{
					"e1": {
						UniqueName: "e1",
						Spec: &pipepb.FunctionSpec{
							Urn: "defaultUrn",
						},
					},
				},
			},

			wantStages: []*stage{{transforms: []string{"e1"}}},
			wantComponents: &pipepb.Components{
				Transforms: map[string]*pipepb.PTransform{
					"e1": {
						UniqueName: "e1",
						Spec: &pipepb.FunctionSpec{
							Urn: "defaultUrn",
						},
					},
				},
			},
		}, {
			// testPreparer swaps "e1" for "e1_early" and "e1_late" and adds
			// their supporting components.
			name: "preparer",
			input: &pipepb.Components{
				Transforms: map[string]*pipepb.PTransform{
					"e1": {
						UniqueName: "e1",
						Spec: &pipepb.FunctionSpec{
							Urn: "test_urn",
						},
					},
				},
				// Initialize maps because they always are by proto unmarshallers.
				Pcollections:        map[string]*pipepb.PCollection{},
				WindowingStrategies: map[string]*pipepb.WindowingStrategy{},
				Coders:              map[string]*pipepb.Coder{},
				Environments:        map[string]*pipepb.Environment{},
			},

			wantStages: []*stage{{transforms: []string{"e1_early"}}, {transforms: []string{"e1_late"}}},
			wantComponents: &pipepb.Components{
				Transforms: map[string]*pipepb.PTransform{
					// Original is always kept
					"e1": {
						UniqueName: "e1",
						Spec: &pipepb.FunctionSpec{
							Urn: "test_urn",
						},
					},
					"e1_early": {
						UniqueName: "e1_early",
						Spec: &pipepb.FunctionSpec{
							Urn: "defaultUrn",
						},
						Outputs:       map[string]string{"i0": "pcol1"},
						EnvironmentId: "env1",
					},
					"e1_late": {
						UniqueName: "e1_late",
						Spec: &pipepb.FunctionSpec{
							Urn: "defaultUrn",
						},
						Inputs:        map[string]string{"i0": "pcol1"},
						EnvironmentId: "env1",
					},
				},
				Pcollections: map[string]*pipepb.PCollection{
					"pcol1": {
						UniqueName:          "pcol1",
						CoderId:             "coder1",
						WindowingStrategyId: "ws1",
					},
				},
				Coders: map[string]*pipepb.Coder{
					"coder1": {Spec: &pipepb.FunctionSpec{Urn: "coder1"}},
				},
				WindowingStrategies: map[string]*pipepb.WindowingStrategy{
					"ws1": {WindowCoderId: "global"},
				},
				Environments: map[string]*pipepb.Environment{
					"env1": {Urn: "env1"},
				},
			},
		},
	}
	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			pre := newPreprocessor([]transformPreparer{&testPreparer{}})

			gotStages := pre.preProcessGraph(test.input)
			if diff := cmp.Diff(test.wantStages, gotStages, cmp.AllowUnexported(stage{})); diff != "" {
				t.Errorf("preProcessGraph(%q) stages diff (-want,+got)\n%v", test.name, diff)
			}

			// preProcessGraph mutates its input in place, so test.input is the "got"
			// value here. Pass want first so the diff matches the (-want,+got) label.
			if diff := cmp.Diff(test.wantComponents, test.input, protocmp.Transform()); diff != "" {
				t.Errorf("preProcessGraph(%q) components diff (-want,+got)\n%v", test.name, diff)
			}
		})
	}
}

// testPreparer is a stateless transformPreparer stub used by the preprocessor tests.
type testPreparer struct{}

// PrepareUrns reports the single URN this stub handles.
func (p *testPreparer) PrepareUrns() []string {
	const handledUrn = "test_urn"
	urns := []string{handledUrn}
	return urns
}

// PrepareTransform ignores its arguments and always returns the same fixed
// replacement: two transforms, "e1_early" and "e1_late", connected through the
// PCollection "pcol1", plus the coder, windowing strategy and environment they
// reference. It asks for the transform "e1" to be removed.
func (p *testPreparer) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipepb.Components) (*pipepb.Components, []string) {
	// Producer half: writes to pcol1.
	early := &pipepb.PTransform{
		UniqueName:    "e1_early",
		Spec:          &pipepb.FunctionSpec{Urn: "defaultUrn"},
		Outputs:       map[string]string{"i0": "pcol1"},
		EnvironmentId: "env1",
	}
	// Consumer half: reads from pcol1.
	late := &pipepb.PTransform{
		UniqueName:    "e1_late",
		Spec:          &pipepb.FunctionSpec{Urn: "defaultUrn"},
		Inputs:        map[string]string{"i0": "pcol1"},
		EnvironmentId: "env1",
	}
	link := &pipepb.PCollection{
		UniqueName:          "pcol1",
		CoderId:             "coder1",
		WindowingStrategyId: "ws1",
	}

	replacement := &pipepb.Components{
		Transforms: map[string]*pipepb.PTransform{
			"e1_early": early,
			"e1_late":  late,
		},
		Pcollections: map[string]*pipepb.PCollection{
			"pcol1": link,
		},
		Coders: map[string]*pipepb.Coder{
			"coder1": {Spec: &pipepb.FunctionSpec{Urn: "coder1"}},
		},
		WindowingStrategies: map[string]*pipepb.WindowingStrategy{
			"ws1": {WindowCoderId: "global"},
		},
		Environments: map[string]*pipepb.Environment{
			"env1": {Urn: "env1"},
		},
	}
	removed := []string{"e1"}
	return replacement, removed
}

0 comments on commit 6667eb4

Please sign in to comment.