From e0f463c69d01d68a0c0be3cbdda711d5de6a297c Mon Sep 17 00:00:00 2001 From: Robert Burke Date: Mon, 9 Jan 2023 09:53:17 -0800 Subject: [PATCH] [#24931][Go SDK] Make element checkpoints independant (#24932) --- .../pkg/beam/core/runtime/exec/datasource.go | 49 ++++-- .../beam/core/runtime/exec/datasource_test.go | 155 +++++++++++++++++- sdks/go/pkg/beam/core/runtime/exec/plan.go | 25 +-- .../pkg/beam/core/runtime/exec/plan_test.go | 141 ++++++++++++++++ sdks/go/pkg/beam/core/runtime/exec/sdf.go | 24 +-- .../go/pkg/beam/core/runtime/exec/sdf_test.go | 30 ++++ sdks/go/pkg/beam/core/runtime/exec/unit.go | 2 +- .../pkg/beam/core/runtime/exec/unit_test.go | 14 +- .../pkg/beam/core/runtime/harness/harness.go | 32 ++-- sdks/go/pkg/beam/core/sdf/lock.go | 9 +- .../io/rtrackers/offsetrange/offsetrange.go | 4 + sdks/go/pkg/beam/runners/direct/impulse.go | 4 +- .../integration/primitives/checkpointing.go | 2 - 13 files changed, 419 insertions(+), 72 deletions(-) create mode 100644 sdks/go/pkg/beam/core/runtime/exec/plan_test.go diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go index 55b0b0a5cad0..9c4de0564c8e 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go @@ -127,10 +127,10 @@ func (r *byteCountReader) reset() int { } // Process opens the data source, reads and decodes data, kicking off element processing. -func (n *DataSource) Process(ctx context.Context) error { +func (n *DataSource) Process(ctx context.Context) ([]*Checkpoint, error) { r, err := n.source.OpenRead(ctx, n.SID) if err != nil { - return err + return nil, err } defer r.Close() n.PCol.resetSize() // initialize the size distribution for this bundle. @@ -154,23 +154,24 @@ func (n *DataSource) Process(ctx context.Context) error { cp = MakeElementDecoder(c) } + var checkpoints []*Checkpoint for { if n.incrementIndexAndCheckSplit() { - return nil + break } // TODO(lostluck) 2020/02/22: Should we include window headers or just count the element sizes? ws, t, pn, err := DecodeWindowedValueHeader(wc, r) if err != nil { if err == io.EOF { - return nil + break } - return errors.Wrap(err, "source failed") + return nil, errors.Wrap(err, "source failed") } // Decode key or parallel element. pe, err := cp.Decode(&bcr) if err != nil { - return errors.Wrap(err, "source decode failed") + return nil, errors.Wrap(err, "source decode failed") } pe.Timestamp = t pe.Windows = ws @@ -180,18 +181,32 @@ func (n *DataSource) Process(ctx context.Context) error { for _, cv := range cvs { values, err := n.makeReStream(ctx, cv, &bcr, len(cvs) == 1 && n.singleIterate) if err != nil { - return err + return nil, err } valReStreams = append(valReStreams, values) } if err := n.Out.ProcessElement(ctx, pe, valReStreams...); err != nil { - return err + return nil, err } // Collect the actual size of the element, and reset the bytecounter reader. n.PCol.addSize(int64(bcr.reset())) bcr.reader = r + + // Check if there's a continuation and return residuals + // Needs to be done immeadiately after processing to not lose the element. + if c := n.getProcessContinuation(); c != nil { + cp, err := n.checkpointThis(c) + if err != nil { + // Errors during checkpointing should fail a bundle. + return nil, err + } + if cp != nil { + checkpoints = append(checkpoints, cp) + } + } } + return checkpoints, nil } func (n *DataSource) makeReStream(ctx context.Context, cv ElementDecoder, bcr *byteCountReader, onlyStream bool) (ReStream, error) { @@ -397,18 +412,22 @@ func (n *DataSource) makeEncodeElms() func([]*FullValue) ([][]byte, error) { return encodeElms } +type Checkpoint struct { + SR SplitResult + Reapply time.Duration +} + // Checkpoint attempts to split an SDF that has self-checkpointed (e.g. returned a // ProcessContinuation) and needs to be resumed later. If the underlying DoFn is not // splittable or has not returned a resuming continuation, the function returns an empty // SplitResult, a negative resumption time, and a false boolean to indicate that no split // occurred. -func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) { +func (n *DataSource) checkpointThis(pc sdf.ProcessContinuation) (*Checkpoint, error) { n.mu.Lock() defer n.mu.Unlock() - pc := n.getProcessContinuation() if pc == nil || !pc.ShouldResume() { - return SplitResult{}, -1 * time.Minute, false, nil + return nil, nil } su := SplittableUnit(n.Out.(*ProcessSizedElementsAndRestrictions)) @@ -418,17 +437,17 @@ func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) { // Checkpointing is functionally a split at fraction 0.0 rs, err := su.Checkpoint() if err != nil { - return SplitResult{}, -1 * time.Minute, false, err + return nil, err } if len(rs) == 0 { - return SplitResult{}, -1 * time.Minute, false, nil + return nil, nil } encodeElms := n.makeEncodeElms() rsEnc, err := encodeElms(rs) if err != nil { - return SplitResult{}, -1 * time.Minute, false, err + return nil, err } res := SplitResult{ @@ -437,7 +456,7 @@ func (n *DataSource) Checkpoint() (SplitResult, time.Duration, bool, error) { InId: su.GetInputId(), OW: ow, } - return res, pc.ResumeDelay(), true, nil + return &Checkpoint{SR: res, Reapply: pc.ResumeDelay()}, nil } // Split takes a sorted set of potential split indices and a fraction of the diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go index 19f639b31d07..64a37739b24f 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go @@ -16,18 +16,25 @@ package exec import ( + "bytes" "context" "fmt" "io" "math" + "reflect" "testing" "time" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/coderx" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -314,7 +321,10 @@ func TestDataSource_Split(t *testing.T) { t.Fatalf("error in Split: got primary index = %v, want %v ", got, want) } - runOnRoots(ctx, t, p, "Process", Root.Process) + runOnRoots(ctx, t, p, "Process", func(root Root, ctx context.Context) error { + _, err := root.Process(ctx) + return err + }) runOnRoots(ctx, t, p, "FinishBundle", Root.FinishBundle) validateSource(t, out, source, makeValues(test.expected...)) @@ -449,7 +459,10 @@ func TestDataSource_Split(t *testing.T) { if got, want := splitRes.PI, test.splitIdx-1; got != want { t.Fatalf("error in Split: got primary index = %v, want %v ", got, want) } - runOnRoots(ctx, t, p, "Process", Root.Process) + runOnRoots(ctx, t, p, "Process", func(root Root, ctx context.Context) error { + _, err := root.Process(ctx) + return err + }) runOnRoots(ctx, t, p, "FinishBundle", Root.FinishBundle) validateSource(t, out, source, makeValues(test.expected...)) @@ -582,7 +595,10 @@ func TestDataSource_Split(t *testing.T) { if sr, err := p.Split(ctx, SplitPoints{Splits: []int64{0}, Frac: -1}); err != nil || !sr.Unsuccessful { t.Fatalf("p.Split(active) = %v,%v want unsuccessful split & nil err", sr, err) } - runOnRoots(ctx, t, p, "Process", Root.Process) + runOnRoots(ctx, t, p, "Process", func(root Root, ctx context.Context) error { + _, err := root.Process(ctx) + return err + }) if sr, err := p.Split(ctx, SplitPoints{Splits: []int64{0}, Frac: -1}); err != nil || !sr.Unsuccessful { t.Fatalf("p.Split(active, unable to get desired split) = %v,%v want unsuccessful split & nil err", sr, err) } @@ -858,6 +874,139 @@ func TestSplitHelper(t *testing.T) { }) } +func TestCheckpointing(t *testing.T) { + t.Run("nil", func(t *testing.T) { + cps, err := (&DataSource{}).checkpointThis(nil) + if err != nil { + t.Fatalf("checkpointThis() = %v, %v", cps, err) + } + }) + t.Run("Stop", func(t *testing.T) { + cps, err := (&DataSource{}).checkpointThis(sdf.StopProcessing()) + if err != nil { + t.Fatalf("checkpointThis() = %v, %v", cps, err) + } + }) + t.Run("Delay_no_residuals", func(t *testing.T) { + wesInv, _ := newWatermarkEstimatorStateInvoker(nil) + root := &DataSource{ + Out: &ProcessSizedElementsAndRestrictions{ + PDo: &ParDo{}, + wesInv: wesInv, + rt: offsetrange.NewTracker(offsetrange.Restriction{}), + elm: &FullValue{ + Windows: window.SingleGlobalWindow, + }, + }, + } + cp, err := root.checkpointThis(sdf.ResumeProcessingIn(time.Second * 13)) + if err != nil { + t.Fatalf("checkpointThis() = %v, %v, want nil", cp, err) + } + if cp != nil { + t.Fatalf("checkpointThis() = %v, want nil", cp) + } + }) + dfn, err := graph.NewDoFn(&CheckpointingSdf{delay: time.Minute}, graph.NumMainInputs(graph.MainSingle)) + if err != nil { + t.Fatalf("invalid function: %v", err) + } + + intCoder, _ := coderx.NewVarIntZ(reflectx.Int) + ERSCoder := coder.NewKV([]*coder.Coder{ + coder.NewKV([]*coder.Coder{ + coder.CoderFrom(intCoder), // Element + coder.NewKV([]*coder.Coder{ + coder.NewR(typex.New(reflect.TypeOf((*offsetrange.Restriction)(nil)).Elem())), // Restriction + coder.NewBool(), // Watermark State + }), + }), + coder.NewDouble(), // Size + }) + wvERSCoder := coder.NewW( + ERSCoder, + coder.NewGlobalWindow(), + ) + + rest := offsetrange.Restriction{Start: 1, End: 10} + value := &FullValue{ + Elm: &FullValue{ + Elm: 42, + Elm2: &FullValue{ + Elm: rest, // Restriction + Elm2: false, // Watermark State falsie + }, + }, + Elm2: rest.Size(), + Windows: window.SingleGlobalWindow, + Timestamp: mtime.MaxTimestamp, + Pane: typex.NoFiringPane(), + } + t.Run("Delay_residuals_Process", func(t *testing.T) { + ctx := context.Background() + wesInv, _ := newWatermarkEstimatorStateInvoker(nil) + rest := offsetrange.Restriction{Start: 1, End: 10} + root := &DataSource{ + Coder: wvERSCoder, + Out: &ProcessSizedElementsAndRestrictions{ + PDo: &ParDo{ + Fn: dfn, + Out: []Node{&Discard{}}, + }, + TfId: "testTransformID", + wesInv: wesInv, + rt: offsetrange.NewTracker(rest), + }, + } + if err := root.Up(ctx); err != nil { + t.Fatalf("invalid function: %v", err) + } + if err := root.Out.Up(ctx); err != nil { + t.Fatalf("invalid function: %v", err) + } + + enc := MakeElementEncoder(wvERSCoder) + var buf bytes.Buffer + + // We encode the element several times to ensure we don't + // drop any residuals, the root of issue #24931. + wantCount := 3 + for i := 0; i < wantCount; i++ { + if err := enc.Encode(value, &buf); err != nil { + t.Fatalf("couldn't encode value: %v", err) + } + } + + if err := root.StartBundle(ctx, "testBund", DataContext{ + Data: &TestDataManager{ + R: io.NopCloser(&buf), + }, + }, + ); err != nil { + t.Fatalf("invalid function: %v", err) + } + cps, err := root.Process(ctx) + if err != nil { + t.Fatalf("Process() = %v, %v, want nil", cps, err) + } + if got, want := len(cps), wantCount; got != want { + t.Fatalf("Process() = len %v checkpoints, want %v", got, want) + } + // Check each checkpoint has the expected values. + for _, cp := range cps { + if got, want := cp.Reapply, time.Minute; got != want { + t.Errorf("Process(delay(%v)) delay = %v, want %v", want, got, want) + } + if got, want := cp.SR.TId, root.Out.(*ProcessSizedElementsAndRestrictions).TfId; got != want { + t.Errorf("Process() transformID = %v, want %v", got, want) + } + if got, want := cp.SR.InId, "i0"; got != want { + t.Errorf("Process() transformID = %v, want %v", got, want) + } + } + }) +} + func runOnRoots(ctx context.Context, t *testing.T, p *Plan, name string, mthd func(Root, context.Context) error) { t.Helper() for i, root := range p.roots { diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan.go b/sdks/go/pkg/beam/core/runtime/exec/plan.go index 0189de51c7f8..7958cf382383 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/plan.go +++ b/sdks/go/pkg/beam/core/runtime/exec/plan.go @@ -31,11 +31,12 @@ import ( // from a part of a pipeline. A plan can be used to process multiple bundles // serially. type Plan struct { - id string // id of the bundle descriptor for this plan - roots []Root - units []Unit - pcols []*PCollection - bf *bundleFinalizer + id string // id of the bundle descriptor for this plan + roots []Root + units []Unit + pcols []*PCollection + bf *bundleFinalizer + checkpoints []*Checkpoint status Status @@ -126,7 +127,11 @@ func (p *Plan) Execute(ctx context.Context, id string, manager DataContext) erro } } for _, root := range p.roots { - if err := callNoPanic(ctx, root.Process); err != nil { + if err := callNoPanic(ctx, func(ctx context.Context) error { + cps, err := root.Process(ctx) + p.checkpoints = cps + return err + }); err != nil { p.status = Broken return errors.Wrapf(err, "while executing Process for %v", p) } @@ -281,9 +286,7 @@ func (p *Plan) Split(ctx context.Context, s SplitPoints) (SplitResult, error) { } // Checkpoint attempts to split an SDF if the DoFn self-checkpointed. -func (p *Plan) Checkpoint() (SplitResult, time.Duration, bool, error) { - if p.source != nil { - return p.source.Checkpoint() - } - return SplitResult{}, -1 * time.Minute, false, nil +func (p *Plan) Checkpoint() []*Checkpoint { + defer func() { p.checkpoints = nil }() + return p.checkpoints } diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan_test.go b/sdks/go/pkg/beam/core/runtime/exec/plan_test.go new file mode 100644 index 000000000000..348e0a44734e --- /dev/null +++ b/sdks/go/pkg/beam/core/runtime/exec/plan_test.go @@ -0,0 +1,141 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exec + +import ( + "fmt" + "testing" + "time" + + "github.com/google/go-cmp/cmp" +) + +func TestPlan_Checkpoint(t *testing.T) { + var p Plan + want := []*Checkpoint{{Reapply: time.Hour}} + p.checkpoints = want + if got := p.Checkpoint(); !cmp.Equal(got, want) { + t.Errorf("p.Checkpoint() = %v, want %v", got, want) + } + if p.checkpoints != nil { + t.Errorf("p.Checkpoint() didn't nil checkpoints field") + } +} + +func TestPlan_BundleFinalizers(t *testing.T) { + newPlan := func() Plan { + var p Plan + p.status = Up + return p + } + t.Run("NoCallbacks", func(t *testing.T) { + p := newPlan() + p.bf = &bundleFinalizer{} + if err := p.Finalize(); err != nil { + t.Errorf("p.Finalize() = %v, want nil", err) + } + // Expiration time is no longer set to default + if got, want := p.GetExpirationTime(), (time.Time{}); want.Equal(got) { + t.Errorf("p.GetExpirationTime() = %v, want %v", got, want) + } + }) + + t.Run("Callbacks", func(t *testing.T) { + p := newPlan() + initialDeadline := time.Now().Add(time.Hour) + + var callCount int + inc := func() error { + callCount++ + return nil + } + p.bf = &bundleFinalizer{ + callbacks: []bundleFinalizationCallback{ + {callback: inc, validUntil: initialDeadline}, + {callback: inc, validUntil: initialDeadline}, + {callback: inc, validUntil: initialDeadline}, + }, + } + if err := p.Finalize(); err != nil { + t.Errorf("p.Finalize() = %v, want nil", err) + } + // Expiration time is no longer set to default + if got, want := p.GetExpirationTime(), (time.Time{}); want.Equal(got) { + t.Errorf("p.GetExpirationTime() = %v, want %v", got, want) + } + if got, want := callCount, 3; got != want { + t.Errorf("p.Finalize() didn't call all finalizers, got %v, want %v", got, want) + } + }) + t.Run("Callbacks_expired", func(t *testing.T) { + p := newPlan() + initialDeadline := time.Now().Add(-time.Hour) + + var callCount int + inc := func() error { + callCount++ + return nil + } + p.bf = &bundleFinalizer{ + callbacks: []bundleFinalizationCallback{ + {callback: inc, validUntil: initialDeadline}, + {callback: inc, validUntil: initialDeadline}, + {callback: inc, validUntil: initialDeadline}, + }, + } + if err := p.Finalize(); err != nil { + t.Errorf("p.Finalize() = %v, want nil", err) + } + // Expiration time is no longer set to default + if got, want := p.GetExpirationTime(), (time.Time{}); want.Equal(got) { + t.Errorf("p.GetExpirationTime() = %v, want %v", got, want) + } + if got, want := callCount, 0; got != want { + t.Errorf("p.Finalize() didn't call all finalizers, got %v, want %v", got, want) + } + }) + + t.Run("Callbacks_failures", func(t *testing.T) { + p := newPlan() + initialDeadline := time.Now().Add(time.Hour) + + var callCount int + inc := func() error { + callCount++ + if callCount == 1 { + return fmt.Errorf("unable to call") + } + return nil + } + p.bf = &bundleFinalizer{ + callbacks: []bundleFinalizationCallback{ + {callback: inc, validUntil: initialDeadline}, + {callback: inc, validUntil: initialDeadline}, + {callback: inc, validUntil: initialDeadline}, + }, + } + if err := p.Finalize(); err == nil { + t.Errorf("p.Finalize() = %v, want an error", err) + } + if got, want := callCount, 3; got != want { + t.Errorf("p.Finalize() didn't call all finalizers, got %v, want %v", got, want) + } + if len(p.bf.callbacks) != 1 { + t.Errorf("p.Finalize() didn't preserve failed callbacks") + } + }) + +} diff --git a/sdks/go/pkg/beam/core/runtime/exec/sdf.go b/sdks/go/pkg/beam/core/runtime/exec/sdf.go index d4b1b32d257d..e22496eae6e5 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/sdf.go +++ b/sdks/go/pkg/beam/core/runtime/exec/sdf.go @@ -459,18 +459,18 @@ func (n *ProcessSizedElementsAndRestrictions) StartBundle(ctx context.Context, i // // Input Diagram: // -// *FullValue { -// Elm: *FullValue { -// Elm: *FullValue (KV input) or InputType (single-element input) -// Elm2: *FullValue { -// Elm: Restriction -// Elm2: Watermark estimator state -// } -// } -// Elm2: float64 (size) -// Windows -// Timestamps -// } +// *FullValue { +// Elm: *FullValue { +// Elm: *FullValue (KV input) or InputType (single-element input) +// Elm2: *FullValue { +// Elm: Restriction +// Elm2: Watermark estimator state +// } +// } +// Elm2: float64 (size) +// Windows +// Timestamps +// } // // ProcessElement then creates a restriction tracker from the stored restriction // and processes each element using the underlying ParDo and adding the diff --git a/sdks/go/pkg/beam/core/runtime/exec/sdf_test.go b/sdks/go/pkg/beam/core/runtime/exec/sdf_test.go index 6520b9dfe597..414f28553a81 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/sdf_test.go +++ b/sdks/go/pkg/beam/core/runtime/exec/sdf_test.go @@ -1524,6 +1524,36 @@ func (fn *WindowBlockingSdf) ProcessElement(w typex.Window, rt *sdf.LockRTracker emit(elm) } +// CheckpointingSdf is a very basic checkpointing DoFn that always +// returns a processing continuation. +type CheckpointingSdf struct { + delay time.Duration +} + +// CreateInitialRestriction creates a four-element offset range. +func (fn *CheckpointingSdf) CreateInitialRestriction(_ int) offsetrange.Restriction { + return offsetrange.Restriction{Start: 0, End: 4} +} + +// SplitRestriction is a no-op, and does not split. +func (fn *CheckpointingSdf) SplitRestriction(_ int, rest offsetrange.Restriction) []offsetrange.Restriction { + return []offsetrange.Restriction{rest} +} + +// RestrictionSize defers to the default offset range restriction size. +func (fn *CheckpointingSdf) RestrictionSize(_ int, rest offsetrange.Restriction) float64 { + return rest.Size() +} + +// CreateTracker creates a LockRTracker wrapping an offset range RTracker. +func (fn *CheckpointingSdf) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker { + return sdf.NewLockRTracker(offsetrange.NewTracker(rest)) +} + +func (fn *CheckpointingSdf) ProcessElement(rt *sdf.LockRTracker, elm int, emit func(int)) sdf.ProcessContinuation { + return sdf.ResumeProcessingIn(fn.delay) +} + // SplittableUnitRTracker is a VetRTracker with some added behavior needed for // TestAsSplittableUnit. type SplittableUnitRTracker struct { diff --git a/sdks/go/pkg/beam/core/runtime/exec/unit.go b/sdks/go/pkg/beam/core/runtime/exec/unit.go index 04635086ab12..ac907d98f75b 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/unit.go +++ b/sdks/go/pkg/beam/core/runtime/exec/unit.go @@ -57,7 +57,7 @@ type Root interface { // Process processes the entire source, notably emitting elements to // downstream nodes. - Process(ctx context.Context) error + Process(ctx context.Context) ([]*Checkpoint, error) } // ElementProcessor presents a component that can process an element. diff --git a/sdks/go/pkg/beam/core/runtime/exec/unit_test.go b/sdks/go/pkg/beam/core/runtime/exec/unit_test.go index 9d0137a0eddd..885294ed3970 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/unit_test.go +++ b/sdks/go/pkg/beam/core/runtime/exec/unit_test.go @@ -125,6 +125,8 @@ type FixedRoot struct { Out Node } +var _ Root = (*FixedRoot)(nil) + func (n *FixedRoot) ID() UnitID { return n.UID } @@ -137,13 +139,13 @@ func (n *FixedRoot) StartBundle(ctx context.Context, id string, data DataContext return n.Out.StartBundle(ctx, id, data) } -func (n *FixedRoot) Process(ctx context.Context) error { +func (n *FixedRoot) Process(ctx context.Context) ([]*Checkpoint, error) { for _, elm := range n.Elements { if err := n.Out.ProcessElement(ctx, &elm.Key, elm.Values...); err != nil { - return err + return nil, err } } - return nil + return nil, nil } func (n *FixedRoot) FinishBundle(ctx context.Context) error { @@ -186,13 +188,13 @@ func (n *BenchRoot) StartBundle(ctx context.Context, id string, data DataContext return n.Out.StartBundle(ctx, id, data) } -func (n *BenchRoot) Process(ctx context.Context) error { +func (n *BenchRoot) Process(ctx context.Context) ([]*Checkpoint, error) { for elm := range n.Elements { if err := n.Out.ProcessElement(ctx, &elm.Key, elm.Values...); err != nil { - return err + return nil, err } } - return nil + return nil, nil } func (n *BenchRoot) FinishBundle(ctx context.Context) error { diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go index 6b2386cc2e38..c260a46c80ee 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/harness.go +++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go @@ -415,6 +415,7 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe mons, pylds := monitoring(plan, store, c.runnerCapabilities[URNMonitoringInfoShortID]) + checkpoints := plan.Checkpoint() requiresFinalization := false // Move the plan back to the candidate state c.mu.Lock() @@ -447,21 +448,19 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe } } - // Check if the underlying DoFn self-checkpointed. - sr, delay, checkpointed, checkErr := plan.Checkpoint() - var rRoots []*fnpb.DelayedBundleApplication - if checkpointed { - rRoots = make([]*fnpb.DelayedBundleApplication, len(sr.RS)) - for i, r := range sr.RS { - rRoots[i] = &fnpb.DelayedBundleApplication{ - Application: &fnpb.BundleApplication{ - TransformId: sr.TId, - InputId: sr.InId, - Element: r, - OutputWatermarks: sr.OW, - }, - RequestedTimeDelay: durationpb.New(delay), + if len(checkpoints) > 0 { + for _, cp := range checkpoints { + for _, r := range cp.SR.RS { + rRoots = append(rRoots, &fnpb.DelayedBundleApplication{ + Application: &fnpb.BundleApplication{ + TransformId: cp.SR.TId, + InputId: cp.SR.InId, + Element: r, + OutputWatermarks: cp.SR.OW, + }, + RequestedTimeDelay: durationpb.New(cp.Reapply), + }) } } } @@ -477,11 +476,6 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe if err != nil { return fail(ctx, instID, "process bundle failed for instruction %v using plan %v : %v", instID, bdID, err) } - - if checkErr != nil { - return fail(ctx, instID, "process bundle failed at checkpointing for instruction %v using plan %v : %v", instID, bdID, checkErr) - } - return &fnpb.InstructionResponse{ InstructionId: string(instID), Response: &fnpb.InstructionResponse_ProcessBundle{ diff --git a/sdks/go/pkg/beam/core/sdf/lock.go b/sdks/go/pkg/beam/core/sdf/lock.go index ea26129d34c0..eecae0ef4fd4 100644 --- a/sdks/go/pkg/beam/core/sdf/lock.go +++ b/sdks/go/pkg/beam/core/sdf/lock.go @@ -15,7 +15,10 @@ package sdf -import "sync" +import ( + "fmt" + "sync" +) // NewLockRTracker creates a LockRTracker initialized with the specified // restriction tracker as its underlying restriction tracker. @@ -92,3 +95,7 @@ func (rt *LockRTracker) IsBounded() bool { } return true } + +func (rt *LockRTracker) String() string { + return fmt.Sprintf("LockRTracker(%v)", rt.Rt) +} diff --git a/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go b/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go index 3aaf48bc0af7..ad8da0fa90b3 100644 --- a/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go +++ b/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go @@ -131,6 +131,10 @@ type Tracker struct { err error } +func (tracker *Tracker) String() string { + return fmt.Sprintf("[%v,%v) c: %v, a.: %v, stopped: %v, err: %v", tracker.rest.Start, tracker.rest.End, tracker.claimed, tracker.attempted, tracker.stopped, tracker.err) +} + // NewTracker is a constructor for an Tracker given a start and end range. func NewTracker(rest Restriction) *Tracker { return &Tracker{ diff --git a/sdks/go/pkg/beam/runners/direct/impulse.go b/sdks/go/pkg/beam/runners/direct/impulse.go index 1d6f78d06200..c2348474a0ba 100644 --- a/sdks/go/pkg/beam/runners/direct/impulse.go +++ b/sdks/go/pkg/beam/runners/direct/impulse.go @@ -43,13 +43,13 @@ func (n *Impulse) StartBundle(ctx context.Context, id string, data exec.DataCont return n.Out.StartBundle(ctx, id, data) } -func (n *Impulse) Process(ctx context.Context) error { +func (n *Impulse) Process(ctx context.Context) ([]*exec.Checkpoint, error) { value := &exec.FullValue{ Windows: window.SingleGlobalWindow, Timestamp: mtime.Now(), Elm: n.Value, } - return n.Out.ProcessElement(ctx, value) + return nil, n.Out.ProcessElement(ctx, value) } func (n *Impulse) FinishBundle(ctx context.Context) error { diff --git a/sdks/go/test/integration/primitives/checkpointing.go b/sdks/go/test/integration/primitives/checkpointing.go index 1a52f9d8aeb8..ae61f3186294 100644 --- a/sdks/go/test/integration/primitives/checkpointing.go +++ b/sdks/go/test/integration/primitives/checkpointing.go @@ -97,8 +97,6 @@ func (fn *selfCheckpointingDoFn) ProcessElement(rt *sdf.LockRTracker, _ []byte, // Checkpoints is a small test pipeline to establish the correctness of the simple test case. func Checkpoints(s beam.Scope) { - beam.Init() - s.Scope("checkpoint") out := beam.ParDo(s, &selfCheckpointingDoFn{}, beam.Impulse(s)) passert.Count(s, out, "num ints", 10)