[prism] Fusion base, reshuffle, cogbk. #27737

Merged · 6 commits · Jul 31, 2023
Changes from 5 commits
11 changes: 9 additions & 2 deletions sdks/go/pkg/beam/core/runtime/exec/translate.go
@@ -193,7 +193,11 @@ func newBuilder(desc *fnpb.ProcessBundleDescriptor) (*builder, error) {
 
 	input := unmarshalKeyedValues(transform.GetInputs())
 	for i, from := range input {
-		succ[from] = append(succ[from], linkID{id, i})
+		// We don't need to multiplex successors for ParDo side inputs,
+		// so we only do so for SDK-side Flattens.
+		if i == 0 || transform.GetSpec().GetUrn() == graphx.URNFlatten {
+			succ[from] = append(succ[from], linkID{id, i})
+		}
 	}
 	output := unmarshalKeyedValues(transform.GetOutputs())
 	for _, to := range output {
@@ -731,7 +735,10 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) {
 	}
 	// Strip PCollections from Expand nodes, as CoGBK metrics are handled by
 	// the DataSource that precedes them.
-	trueOut := out[0].(*PCollection).Out
+	trueOut := out[0]
+	if pcol, ok := trueOut.(*PCollection); ok {
+		trueOut = pcol.Out
+	}
 	b.units = b.units[:len(b.units)-1]
 	u = &Expand{UID: b.idgen.New(), ValueDecoders: decoders, Out: trueOut}
 
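Note on the translate.go change above. The following is a standalone sketch (not prism code; `linkID`, `xform`, and the Flatten URN constant are stand-ins) of the successor-map rule: only a transform's primary input (index 0) produces a successor edge, unless the transform is an SDK-side Flatten, in which case every input does. This is why a ParDo that lists the same PCollection as both main input and side input no longer multiplexes extra edges.

```go
package main

import "fmt"

// linkID identifies a consumer edge: which transform, and which of its inputs.
type linkID struct {
	transform string
	input     int
}

// xform is a minimal stand-in for a pipeline transform.
type xform struct {
	urn    string
	inputs []string // PCollection ids; the main input is index 0.
}

const urnFlatten = "beam:transform:flatten:v1" // assumed Flatten URN

// successors mirrors the rule in the diff: record an edge for the primary
// input, or for every input of a Flatten. Side inputs are delivered out of
// band, so they get no successor edge.
func successors(transforms map[string]xform) map[string][]linkID {
	succ := map[string][]linkID{}
	for id, t := range transforms {
		for i, from := range t.inputs {
			if i == 0 || t.urn == urnFlatten {
				succ[from] = append(succ[from], linkID{id, i})
			}
		}
	}
	return succ
}

func main() {
	succ := successors(map[string]xform{
		// A ParDo whose main input and side input are the same PCollection.
		"pardo": {urn: "beam:transform:pardo:v1", inputs: []string{"col0", "col0"}},
		// A Flatten consuming two PCollections.
		"flatten": {urn: urnFlatten, inputs: []string{"col0", "col1"}},
	})
	fmt.Println(len(succ["col0"])) // 2: one edge for the ParDo, one for the Flatten
	fmt.Println(len(succ["col1"])) // 1: the Flatten's second input
}
```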
@@ -570,7 +570,7 @@ func (ss *stageState) startBundle(watermark mtime.Time, genBundID func() string)
 
 	var toProcess, notYet []element
 	for _, e := range ss.pending {
-		if !ss.aggregate || ss.aggregate && ss.strat.EarliestCompletion(e.window) <= watermark {
+		if !ss.aggregate || ss.aggregate && ss.strat.EarliestCompletion(e.window) < watermark {
 			toProcess = append(toProcess, e)
 		} else {
 			notYet = append(notYet, e)
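Note on the one-character change above (`<=` becomes `<`): as we read it, an element whose window's earliest completion time exactly equals the current watermark is no longer eligible for aggregation; the watermark must strictly pass the end of the window. A minimal sketch of that boundary case, with stand-in int64 millisecond timestamps:

```go
package main

import "fmt"

// ready mirrors the readiness predicate in startBundle above.
func ready(aggregate bool, earliestCompletion, watermark int64) bool {
	return !aggregate || aggregate && earliestCompletion < watermark
}

func main() {
	// Window completes exactly at the watermark: under the old `<=` this
	// element would have been processed; under `<` it is held as "notYet".
	fmt.Println(ready(true, 100, 100)) // false
	// Once the watermark strictly passes the window, the element is ready.
	fmt.Println(ready(true, 100, 101)) // true
}
```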
27 changes: 15 additions & 12 deletions sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -60,7 +60,11 @@ func RunPipeline(j *jobservices.Job) {
 	j.SendMsg("running " + j.String())
 	j.Running()
 
-	executePipeline(j.RootCtx, wk, j)
+	err := executePipeline(j.RootCtx, wk, j)
+	if err != nil {
+		j.Failed(err)
+		return
+	}
 	j.SendMsg("pipeline completed " + j.String())
 
 	// Stop the worker.
@@ -126,14 +130,14 @@ func externalEnvironment(ctx context.Context, ep *pipepb.ExternalPayload, wk *wo
 type transformExecuter interface {
 	ExecuteUrns() []string
 	ExecuteWith(t *pipepb.PTransform) string
-	ExecuteTransform(tid string, t *pipepb.PTransform, comps *pipepb.Components, watermark mtime.Time, data [][]byte) *worker.B
+	ExecuteTransform(stageID, tid string, t *pipepb.PTransform, comps *pipepb.Components, watermark mtime.Time, data [][]byte) *worker.B
 }
 
 type processor struct {
 	transformExecuters map[string]transformExecuter
 }
 
-func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) {
+func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) error {
 	pipeline := j.Pipeline
 	comps := proto.Clone(pipeline.GetComponents()).(*pipepb.Components)
 
@@ -145,7 +149,8 @@ func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) {
 		Combine(CombineCharacteristic{EnableLifting: true}),
 		ParDo(ParDoCharacteristic{DisableSDF: true}),
 		Runner(RunnerCharacteristic{
-			SDKFlatten: false,
+			SDKFlatten:   false,
+			SDKReshuffle: false,
 		}),
 	}
 
@@ -175,10 +180,7 @@ func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) {
 	// TODO move this loop and code into the preprocessor instead.
 	stages := map[string]*stage{}
 	var impulses []string
-	for i, stage := range topo {
-		if len(stage.transforms) != 1 {
-			panic(fmt.Sprintf("unsupported stage[%d]: contains multiple transforms: %v; TODO: implement fusion", i, stage.transforms))
-		}
+	for _, stage := range topo {
 		tid := stage.transforms[0]
 		t := ts[tid]
 		urn := t.GetSpec().GetUrn()
@@ -255,16 +257,16 @@ func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) {
 			wk.Descriptors[stage.ID] = stage.desc
 		case wk.ID:
 			// Great! this is for this environment. // Broken abstraction.
-			buildStage(stage, tid, t, comps, wk)
+			buildDescriptor(stage, comps, wk)
 			stages[stage.ID] = stage
 			slog.Debug("pipelineBuild", slog.Group("stage", slog.String("ID", stage.ID), slog.String("transformName", t.GetUniqueName())))
 			outputs := maps.Keys(stage.OutputsToCoders)
 			sort.Strings(outputs)
-			em.AddStage(stage.ID, []string{stage.mainInputPCol}, stage.sides, outputs)
+			em.AddStage(stage.ID, []string{stage.primaryInput}, stage.sides, outputs)
 		default:
 			err := fmt.Errorf("unknown environment[%v]", t.GetEnvironmentId())
 			slog.Error("Execute", err)
-			panic(err)
+			return err
 		}
 	}
 
@@ -285,6 +287,7 @@ func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) {
 		}(rb)
 	}
 	slog.Info("pipeline done!", slog.String("job", j.String()))
+	return nil
 }
 
 func collectionPullDecoder(coldCId string, coders map[string]*pipepb.Coder, comps *pipepb.Components) func(io.Reader) []byte {
@@ -300,7 +303,7 @@ func getWindowValueCoders(comps *pipepb.Components, col *pipepb.PCollection, cod
 
 func getOnlyValue[K comparable, V any](in map[K]V) V {
 	if len(in) != 1 {
-		panic(fmt.Sprintf("expected single value map, had %v", len(in)))
+		panic(fmt.Sprintf("expected single value map, had %v - %v", len(in), in))
 	}
 	for _, v := range in {
 		return v
151 changes: 149 additions & 2 deletions sdks/go/pkg/beam/runners/prism/internal/execute_test.go
@@ -27,6 +27,7 @@ import (
 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/jobservices"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/universal"
@@ -319,6 +320,61 @@ func TestRunner_Pipelines(t *testing.T) {
 				Want: []int{16, 17, 18},
 			}, sum)
 		},
+	}, {
+		name: "sideinput_sameAsMainInput",
+		pipeline: func(s beam.Scope) {
+			imp := beam.Impulse(s)
+			col0 := beam.ParDo(s, dofn1, imp)
+			sum := beam.ParDo(s, dofn3x1, col0, beam.SideInput{Input: col0}, beam.SideInput{Input: col0})
+			beam.ParDo(s, &int64Check{
+				Name: "sum sideinput check",
+				Want: []int{13, 14, 15},
+			}, sum)
+		},
+	}, {
+		name: "sideinput_sameAsMainInput+Derived",
+		pipeline: func(s beam.Scope) {
+			imp := beam.Impulse(s)
+			col0 := beam.ParDo(s, dofn1, imp)
+			col1 := beam.ParDo(s, dofn2, col0)
+			// Doesn't matter which of col0 or col1 is used.
+			sum := beam.ParDo(s, dofn3x1, col0, beam.SideInput{Input: col0}, beam.SideInput{Input: col1})
+			beam.ParDo(s, &int64Check{
+				Name: "sum sideinput check",
+				Want: []int{16, 17, 18},
+			}, sum)
+		},
+	}, {
+		// Regression check: the main input must not receive duplicated data
+		// from its producing transform being executed twice.
+		name: "sideinput_2iterable1Data2",
+		pipeline: func(s beam.Scope) {
+			imp := beam.Impulse(s)
+			col0 := beam.ParDo(s, dofn1, imp)
+			col1 := beam.ParDo(s, dofn2, col0)
+			col2 := beam.ParDo(s, dofn2, col0)
+			// Doesn't matter which of col1 or col2 is used.
+			sum := beam.ParDo(s, dofn3x1, col0, beam.SideInput{Input: col2}, beam.SideInput{Input: col1})
+			beam.ParDo(s, &int64Check{
+				Name: "iter sideinput check",
+				Want: []int{19, 20, 21},
+			}, sum)
+		},
+	}, {
+		// Re-use the same side inputs sequentially (the two consumers should be in the same stage).
+		name: "sideinput_two_2iterable1Data",
+		pipeline: func(s beam.Scope) {
+			imp := beam.Impulse(s)
+			col0 := beam.ParDo(s, dofn1, imp)
+			sideIn1 := beam.ParDo(s, dofn1, imp)
+			sideIn2 := beam.ParDo(s, dofn1, imp)
+			col1 := beam.ParDo(s, dofn3x1, col0, beam.SideInput{Input: sideIn1}, beam.SideInput{Input: sideIn2})
+			sum := beam.ParDo(s, dofn3x1, col1, beam.SideInput{Input: sideIn1}, beam.SideInput{Input: sideIn2})
+			beam.ParDo(s, &int64Check{
+				Name: "check_sideinput_re-use",
+				Want: []int{25, 26, 27},
+			}, sum)
+		},
 	}, {
 		name: "combine_perkey",
 		pipeline: func(s beam.Scope) {
@@ -380,6 +436,30 @@ func TestRunner_Pipelines(t *testing.T) {
 			}, flat)
 			passert.NonEmpty(s, flat)
 		},
+	}, {
+		name: "gbk_into_gbk",
+		pipeline: func(s beam.Scope) {
+			imp := beam.Impulse(s)
+			col1 := beam.ParDo(s, dofnKV, imp)
+			gbk1 := beam.GroupByKey(s, col1)
+			col2 := beam.ParDo(s, dofnGBKKV, gbk1)
+			gbk2 := beam.GroupByKey(s, col2)
+			out := beam.ParDo(s, dofnGBK, gbk2)
+			passert.Equals(s, out, int64(9), int64(12))
+		},
+	}, {
+		name: "lperror_gbk_into_cogbk_shared_input",
+		pipeline: func(s beam.Scope) {
+			want := beam.CreateList(s, []int{0})
+			fruits := beam.CreateList(s, []int64{42, 42, 42})
+			fruitsKV := beam.AddFixedKey(s, fruits)
+
+			fruitsGBK := beam.GroupByKey(s, fruitsKV)
+			fooKV := beam.ParDo(s, toFoo, fruitsGBK)
+			fruitsFooCoGBK := beam.CoGroupByKey(s, fruitsKV, fooKV)
+			got := beam.ParDo(s, toID, fruitsFooCoGBK)
+			passert.Equals(s, got, want)
+		},
 	},
 }
 // TODO: Explicit DoFn Failure case.
@@ -429,8 +509,75 @@ func TestFailure(t *testing.T) {
 	if err == nil {
 		t.Fatalf("expected pipeline failure, but got a success")
 	}
-	// Job failure state reason isn't communicated with the state change over the API
-	// so we can't check for a reason here.
+	if want := "doFnFail: failing as intended"; !strings.Contains(err.Error(), want) {
+		t.Fatalf("expected pipeline failure with %q, but was %v", want, err)
+	}
 }
 
+func TestRunner_Passert(t *testing.T) {
+	initRunner(t)
+	tests := []struct {
+		name     string
+		pipeline func(s beam.Scope)
+		metrics  func(t *testing.T, pr beam.PipelineResult)
+	}{
+		{
+			name: "Empty",
+			pipeline: func(s beam.Scope) {
+				imp := beam.Impulse(s)
+				col1 := beam.ParDo(s, dofnEmpty, imp)
+				passert.Empty(s, col1)
+			},
+		}, {
+			name: "Equals-TwoEmpty",
+			pipeline: func(s beam.Scope) {
+				imp := beam.Impulse(s)
+				col1 := beam.ParDo(s, dofnEmpty, imp)
+				col2 := beam.ParDo(s, dofnEmpty, imp)
+				passert.Equals(s, col1, col2)
+			},
+		}, {
+			name: "Equals",
+			pipeline: func(s beam.Scope) {
+				imp := beam.Impulse(s)
+				col1 := beam.ParDo(s, dofn1, imp)
+				col2 := beam.ParDo(s, dofn1, imp)
+				passert.Equals(s, col1, col2)
+			},
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			p, s := beam.NewPipelineWithRoot()
+			test.pipeline(s)
+			pr, err := executeWithT(context.Background(), t, p)
+			if err != nil {
+				t.Fatal(err)
+			}
+			if test.metrics != nil {
+				test.metrics(t, pr)
+			}
+		})
+	}
+}
+
+func toFoo(et beam.EventTime, id int, _ func(*int64) bool) (int, string) {
+	return id, "ooo"
+}
+
+func toID(et beam.EventTime, id int, fruitIter func(*int64) bool, fooIter func(*string) bool) int {
+	var fruit int64
+	for fruitIter(&fruit) {
+	}
+	var foo string
+	for fooIter(&foo) {
+	}
+	return id
+}
+
+func init() {
+	register.Function3x2(toFoo)
+	register.Function4x1(toID)
+}
+
 // TODO: PCollection metrics tests, in particular for element counts, in multi transform pipelines
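Note on the new init() block above: the register.Function3x2 and register.Function4x1 calls pre-register toFoo (three inputs, two outputs) and toID (four inputs, one output) with the Beam Go SDK's register package; the NxM suffix encodes the function's input and output arity, and registration lets the runtime invoke these DoFns through generated shims rather than runtime reflection.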
70 changes: 65 additions & 5 deletions sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
@@ -41,8 +41,9 @@ import (
 // RunnerCharacteristic holds the configuration for Runner based transforms,
 // such as GBKs, Flattens.
 type RunnerCharacteristic struct {
-	SDKFlatten bool // Sets whether we should force an SDK side flatten.
-	SDKGBK     bool // Sets whether the GBK should be handled by the SDK, if possible by the SDK.
+	SDKFlatten   bool // Sets whether we should force an SDK side flatten.
+	SDKGBK       bool // Sets whether the GBK should be handled by the SDK, if possible by the SDK.
+	SDKReshuffle bool
 }
 
 func Runner(config any) *runner {
@@ -63,13 +64,72 @@ func (*runner) ConfigCharacteristic() reflect.Type {
 	return reflect.TypeOf((*RunnerCharacteristic)(nil)).Elem()
 }
 
+var _ transformPreparer = (*runner)(nil)
+
+func (*runner) PrepareUrns() []string {
+	return []string{urns.TransformReshuffle}
+}
+
+// PrepareTransform handles special processing with respect to runner transforms, like reshuffle.
+func (h *runner) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipepb.Components) (*pipepb.Components, []string) {
+	// TODO: Implement the windowing strategy the "backup" transforms used for Reshuffle.
+	// TODO: Implement a fusion break for reshuffles.
+
+	if h.config.SDKReshuffle {
+		panic("SDK side reshuffle not yet supported")
+	}
+
+	// A Reshuffle, in principle, is a no-op on the pipeline structure, WRT correctness.
+	// It could however affect performance, so it exists to tell the runner that this
+	// point in the pipeline needs a fusion break, to enable the pipeline to change its
+	// degree of parallelism.
+	//
+	// The change of parallelism goes both ways. It could allow for larger batch sizes,
+	// or enable smaller batch sizes downstream, if the work is in fact parallelizable.
+	//
+	// But for a single transform node per stage runner, we can elide it entirely,
+	// since the input collection and output collection types match.
+
+	// Get the input and output PCollections; there should be only one of each.
+	if len(t.GetOutputs()) != 1 {
+		panic("Expected single putput PCollection in reshuffle: " + prototext.Format(t))
+	}
+	if len(t.GetOutputs()) != 1 {
+		panic("Expected single putput PCollection in reshuffle: " + prototext.Format(t))
+	}

Review comment (Contributor):
Looks like you missed the input check here and didn't update the panic string for each.

Reply (Contributor Author):
lol. putput. Good catch!

+	inColID := getOnlyValue(t.GetInputs())
+	outColID := getOnlyValue(t.GetOutputs())
+
+	// We need to find all transforms that consume the output collection and
+	// replace them so they consume the input PCollection directly.
+
+	// Transforms to be removed from the pipeline.
+	toRemove := []string{}
+
+	for _, t := range comps.GetTransforms() {
+		for li, gi := range t.GetInputs() {
+			if gi == outColID {
+				// Rewire this consumer to read from the reshuffle's input.
+				t.GetInputs()[li] = inColID
+			}
+		}
+	}
+
+	// And remove all the reshuffle's subtransforms.
+	toRemove = append(toRemove, t.GetSubtransforms()...)
+
+	// Return no new components; just the transforms to remove.
+	return nil, toRemove
+}

 var _ transformExecuter = (*runner)(nil)
 
 func (*runner) ExecuteUrns() []string {
-	return []string{urns.TransformFlatten, urns.TransformGBK}
+	return []string{urns.TransformFlatten, urns.TransformGBK, urns.TransformReshuffle}
 }
 
-// ExecuteWith returns what environment the
+// ExecuteWith returns what environment the transform should execute in.
 func (h *runner) ExecuteWith(t *pipepb.PTransform) string {
 	urn := t.GetSpec().GetUrn()
 	if urn == urns.TransformFlatten && !h.config.SDKFlatten {
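Note on PrepareTransform above: a standalone sketch (not prism code; the ptransform shape and the ids are assumptions) of the reshuffle elision it performs. Every transform that consumed the reshuffle's output PCollection is rewired to consume the reshuffle's input directly, after which the reshuffle itself can be dropped from the graph:

```go
package main

import "fmt"

// ptransform is a minimal stand-in for a pipeline transform proto.
type ptransform struct {
	inputs  map[string]string // local name -> PCollection id
	outputs map[string]string
}

// elideReshuffle rewires all consumers of the reshuffle's output to its
// input, then deletes the reshuffle, mirroring the rewiring loop above.
func elideReshuffle(transforms map[string]*ptransform, reshuffleID string) {
	rs := transforms[reshuffleID]
	var inCol, outCol string
	for _, c := range rs.inputs {
		inCol = c // assumes exactly one input, as the checks above intend
	}
	for _, c := range rs.outputs {
		outCol = c // assumes exactly one output
	}
	for id, t := range transforms {
		if id == reshuffleID {
			continue
		}
		for local, col := range t.inputs {
			if col == outCol {
				t.inputs[local] = inCol
			}
		}
	}
	delete(transforms, reshuffleID)
}

func main() {
	transforms := map[string]*ptransform{
		"read":      {outputs: map[string]string{"o0": "colA"}},
		"reshuffle": {inputs: map[string]string{"i0": "colA"}, outputs: map[string]string{"o0": "colB"}},
		"write":     {inputs: map[string]string{"i0": "colB"}},
	}
	elideReshuffle(transforms, "reshuffle")
	fmt.Println(transforms["write"].inputs["i0"]) // colA: the reshuffle is bypassed
}
```

Returning nil new components plus a removal list, as the diff's PrepareTransform does, expresses the same outcome: nothing is added, and the reshuffle and its subtransforms drop out of the graph.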
@@ -82,7 +142,7 @@ func (h *runner) ExecuteWith(t *pipepb.PTransform) string {
 }
 
 // ExecuteTransform handles special processing with respect to runner specific transforms
-func (h *runner) ExecuteTransform(tid string, t *pipepb.PTransform, comps *pipepb.Components, watermark mtime.Time, inputData [][]byte) *worker.B {
+func (h *runner) ExecuteTransform(stageID, tid string, t *pipepb.PTransform, comps *pipepb.Components, watermark mtime.Time, inputData [][]byte) *worker.B {
 	urn := t.GetSpec().GetUrn()
 	var data [][]byte
 	var onlyOut string