Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#23106] Add periodic.Sequence and periodic.Impulse transforms to Go SDK #25808

Merged
merged 16 commits into from
Mar 23, 2023

Conversation

hnnsgstfssn
Copy link
Contributor

@hnnsgstfssn hnnsgstfssn commented Mar 11, 2023

The new transforms extends support for the slowly updating side input pattern [1] as tracked in [2].

An attempt to mirror the logic of the Python implementation [3] has been made with minor idiomatic changes.

Java [4][5] and Python [6] have influenced the documentation and naming.

[1] https://beam.apache.org/documentation/patterns/side-inputs/
[2] #23106
[3] https://github.com/apache/beam/blob/v2.46.0/sdks/python/apache_beam/transforms/periodicsequence.py#L59
[4] https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/transforms/PeriodicSequence.html
[5] https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/transforms/PeriodicImpulse.html
[6] https://beam.apache.org/releases/pydoc/2.46.0/apache_beam.transforms.periodicsequence.html?highlight=periodicimpulse#module-apache_beam.transforms.periodicsequence

Please add a meaningful description for your change here


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI.

Copy link
Contributor Author

@hnnsgstfssn hnnsgstfssn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a first rough draft to use as a basis for initial discussion. I'm unsure how to properly test this without some guidance, and currently it does not drain properly.

}

type sequenceGenDoFn struct {
now func() time.Time
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While playing around locally I've added this way to mock the time.Now function. If it ends up being unused I'm happy to drop it if that makes more sense.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will work in the direct runner, but that's because the direct runner won't successfully run the example or anything local. It would be better to fold things into the sequence definition to enable appropriate testing behavior.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed I struggled with the direct runner, but have now added two working tests that uses the prism runner. I am struggling to understand how to fold this into the sequence definition as you suggest, to properly test it. Instead I went ahead and removed this entirely and the DoFn uses time.Now.

I am happy to leave it there for now. If you want to expand on how to fold it into the definition and test it I am happy to add this as well.

sdks/go/pkg/beam/transforms/periodic/periodic.go Outdated Show resolved Hide resolved
@hnnsgstfssn
Copy link
Contributor Author

The draining failure does not happen on latest stable Beam 2.45.0, but does happen on later versions. As of writing 2.46.0 and 2.47.0.dev had the example pipeline fail to drain.

@hnnsgstfssn
Copy link
Contributor Author

Note that the example was added mainly for illustration of the code used to manually test this PR on Dataflow. Let me know if it is useful to keep it, or if I should drop it from the PR.

@hnnsgstfssn
Copy link
Contributor Author

Considering it runs on latest stable as of writing, I'll mark this as ready for review.

I'm still expecting to potentially add tests, if there is a good way to test these streaming features, but it may be acceptable to have manually verified the runs example successfully on Dataflow.

@hnnsgstfssn hnnsgstfssn marked this pull request as ready for review March 11, 2023 14:21
@hnnsgstfssn
Copy link
Contributor Author

R: @lostluck

@github-actions
Copy link
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

@Abacn
Copy link
Contributor

Abacn commented Mar 11, 2023

Thanks for the contribution! This is a very useful transform. For the drain issue, Python and Java implementation also did not drain until recent fix (#23765 #25716). Basically the transform should (i) update watermark when advance and (ii) truncate immediately when truncate is called (happens in drain)

@codecov
Copy link

codecov bot commented Mar 11, 2023

Codecov Report

Merging #25808 (1b05c69) into master (b1ea4d3) will decrease coverage by 0.02%.
The diff coverage is 67.94%.

@@            Coverage Diff             @@
##           master   #25808      +/-   ##
==========================================
- Coverage   72.80%   72.79%   -0.02%     
==========================================
  Files         775      777       +2     
  Lines      102948   103041      +93     
==========================================
+ Hits        74956    75006      +50     
- Misses      26537    26574      +37     
- Partials     1455     1461       +6     
Flag Coverage Δ
go 53.95% <67.94%> (-0.01%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
sdks/go/pkg/beam/transforms/periodic/periodic.go 67.94% <67.94%> (ø)

... and 13 files with indirect coverage changes

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

@hnnsgstfssn
Copy link
Contributor Author

For the drain issue, Python and Java implementation also did not drain until recent fix (#23765 #25716). Basically the transform should (i) update watermark when advance and (ii) truncate immediately when truncate is called (happens in drain)

Ah! I didn't realize those fixes [1][2] for Python were recent- thanks for highlight it. I tried including both (i) and (ii) here i.e. implementing TruncateRestriction and CreateWatermarkEstimator and updating the watermark in ProcessElement, but might not be doing it correctly.

[1] 30b2617
[2] 7dba78d

@lostluck
Copy link
Contributor

Thanks for your patience! I'm still getting caught up from being on vacation. I'm intending to get this review done today or tomorrow.

@lostluck
Copy link
Contributor

This is failing our Staticheck action. Not hard to fix at least.

https://github.com/apache/beam/actions/runs/4392151772/jobs/7693061461?pr=25808

Error: transforms/periodic/periodic.go:61:18: calling math.Ceil on a converted integer is pointless (SA4015)
Error: transforms/periodic/periodic.go:132:6: func withNowFunc is unused (U1000)
Error: Process completed with exit code 1.

@lostluck lostluck changed the title Add periodic.Sequence and periodic.Impulse transforms [#23106] Add periodic.Sequence and periodic.Impulse transforms to Go SDK Mar 19, 2023
Copy link
Contributor

@lostluck lostluck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is great! Thanks! I do have comments. Thanks for such a valuable first contribution to Beam! :)

I have a few commons on example formatting and noting that we should use the new Prism runner to be able to have unit test pipelines for periodic. I think I've covered your other questions too.

Comment on lines 121 to 134
sideInput := beam.WindowInto(s, window.NewFixedWindows(periodicSequenceInterval),
beam.ParDo(
s,
update,
periodic.Impulse(
s,
startTime,
endTime,
periodicSequenceInterval,
),
),
beam.Trigger(trigger.Repeat(trigger.Always())),
beam.PanesDiscard(),
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The nesting here, has the added deficit of hiding the core of the example.

Suggested change
sideInput := beam.WindowInto(s, window.NewFixedWindows(periodicSequenceInterval),
beam.ParDo(
s,
update,
periodic.Impulse(
s,
startTime,
endTime,
periodicSequenceInterval,
),
),
beam.Trigger(trigger.Repeat(trigger.Always())),
beam.PanesDiscard(),
)
// Generate an impulse every period.
periodicImp := periodic.Impulse(s, startTime, endTime, periodicSequenceInterval)
// Use the impulse to trigger some other ordinary transform.
updatedImp := beam.ParDo(s, update, periodicImp)
// Window for use as a side input, to allow the input to change with windows.
sideInput := beam.WindowInto(s, window.NewFixedWindows(periodicSequenceInterval),
updatedImp,
beam.Trigger(trigger.Repeat(trigger.Always())),
beam.PanesDiscard(),
)

I'll note that the window for the side input is usually going to be larger than the window for the main processing. While this isn't wrong, the usualy goal around the pattern is a situation like allowing files that change hourly get read in once each hour, and have the more frequent data able to re-use the cached read in file. (Granted, this behavior isn't yet enabled by default in the Go SDK, but that's an aside).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought of making the side input window larger. Do you think it is worth making that change, taking another configuration to specify the side input window size?

I'm also curious to know what you mean by

this behavior isn't yet enabled by default in the Go SDK

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wasn't able to apply this suggestion after having changed the periodic.Impulse signature, but have applied the same change in a separate commit. Thanks!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WRT the cache:

Beam's execution abstraction, the FnAPI, has provisions for caching data cross bundle from the StateAPI on the SDK side, in order to avoid repeated deserialization, and additional round trips to the primary store of the runner to fetch state data. Side inputs also come across the StateAPI. In particular, very valuable for streaming jobs, as typically a single "SDK harness" is usually responsible for the same key all the time, so for tight windows it would look up the same data from the side inputs.


WRT not applying after the impulse change. Makes sense since an int64 was being received from the "update" function.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't address the larger side input window. Since it's an example, we don't need to over configure things. I'd make the window 5 times larger to demonstrate the paradigm, and an explicit comment that's what the larger window is for.

sdks/go/pkg/beam/transforms/periodic/periodic.go Outdated Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be able to add a short unit test if using the prism runner directly.

While the runner isn't fully complete yet, it does run and execute ProcessContinuation transforms and watermarks!

It just doesn't do the splitting just yet, or actually "wait" for any process continuations at the moment. But when the "sequence" is done, it will terminate, so we can add a test with period of a second, a duration of a minute, and then count that we're getting 60 elements out of the transform. (Small risk of getting 59 instead, as a flake...)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's great! I couldn't get it working on the direct runner, but didn't try using the new prism explicitly. Have now added two tests TestImpulse and TestSequence and it looks like that's working. Let me know what you think.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The direct runner (and basically all non-portable runners), are not great, since they mis-align expectations for users when they move to executing on a "real" runner, like Flink or Dataflow. There's also no "spec" about what a Direct Runner should do to properly test things, so the Go Direct Runner is missing a number of features that the Java and Python ones use.

Essentially Prism is going to replace the Go Direct Runner as the default runner for the Go SDK at least, and hopefully make testing and local runs for all facets of Beam easier. The prism Readme has a bunch of the vision, and desired goals, but I gave a talk at last year's Beam Summit, about the motivations, especially in all the "hidden" bits of beam that users usually don't interact with, but are affected by.

https://www.youtube.com/watch?v=G4lbkvAG6xk

sdks/go/pkg/beam/transforms/periodic/periodic.go Outdated Show resolved Hide resolved
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's great! I couldn't get it working on the direct runner, but didn't try using the new prism explicitly. Have now added two tests TestImpulse and TestSequence and it looks like that's working. Let me know what you think.

}

type sequenceGenDoFn struct {
now func() time.Time
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed I struggled with the direct runner, but have now added two working tests that uses the prism runner. I am struggling to understand how to fold this into the sequence definition as you suggest, to properly test it. Instead I went ahead and removed this entirely and the DoFn uses time.Now.

I am happy to leave it there for now. If you want to expand on how to fold it into the definition and test it I am happy to add this as well.

sdks/go/pkg/beam/transforms/periodic/periodic.go Outdated Show resolved Hide resolved
Comment on lines 121 to 134
sideInput := beam.WindowInto(s, window.NewFixedWindows(periodicSequenceInterval),
beam.ParDo(
s,
update,
periodic.Impulse(
s,
startTime,
endTime,
periodicSequenceInterval,
),
),
beam.Trigger(trigger.Repeat(trigger.Always())),
beam.PanesDiscard(),
)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought of making the side input window larger. Do you think it is worth making that change, taking another configuration to specify the side input window size?

I'm also curious to know what you mean by

this behavior isn't yet enabled by default in the Go SDK

sdks/go/pkg/beam/transforms/periodic/periodic.go Outdated Show resolved Hide resolved
sdks/go/pkg/beam/transforms/periodic/periodic.go Outdated Show resolved Hide resolved
@hnnsgstfssn
Copy link
Contributor Author

The example is still not able to successfully drain, failing with the following message. I'm unsure why, since as far as I can tell it is both (i) updating the watermark when advancing and (ii) truncating immediately.

"Error message from worker: generic::unknown: process bundle failed for instruction process_bundle-13-7 using plan drain-S02-11 : panic: runtime error: index out of range [2] with length 2
Full error:
while executing Process for Plan[drain-S02-11]:
2: DataSink[S[ptransform-9@localhost:12371]] Coder:W;coder-50<KV;coder-51<int[varintz;c2];coder-52,string;coder-53>>!IWC
3: PCollection[pcollection-32] Out:[2]
4: WindowInto[FIX[1m0s]]. Out:2
5: PCollection[pcollection-26] Out:[4]
6: ParDo[main.update] Out:[5] Sig: func(context.Context, mtime.Time, []uint8, func(int, string))
7: PCollection[pcollection-22] Out:[6]
8: SDF.ProcessSizedElementsAndRestrictions[periodic.sequenceGenDoFn] UID:8 Out:[7]
9: PCollection[pcollection-12-truncate-output] Out:[8]
10: SDF.TruncateSizedRestriction[periodic.sequenceGenDoFn] UID:10 Out:[9]
1: DataSource[S[ptransform-8@localhost:12371], 0] Out:10 Coder:W;coder-38<KV;coder-39<KV;coder-40<LP;coder-41<R[periodic.SequenceDefinition]>,KV;coder-42<offsetrange.Restriction[offsetrange.Restriction;c9];coder-43,bool;coder-44>>,double;coder-45>>!GWC 
	caused by:
panic: runtime error: index out of range [2] with length 2 goroutine 70 [running]:
runtime/debug.Stack()
	/usr/lib/go/src/runtime/debug/stack.go:24 +0x65
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.callNoPanic.func1()
	/home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:58 +0xa5
panic({0xfb1ae0, 0xc000137290})
	/usr/lib/go/src/runtime/panic.go:884 +0x213
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*trInvoker).Invoke(0xc0001dbce0?, {0x11f39f0?, 0xc0000415c0?}, {0xf8c160?, 0xc000011e60?}, 0xc0001dbce0?)
	/home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:320 +0x1b3
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*TruncateSizedRestriction).ProcessElement(0xc000041540, {0x11f39f0, 0xc0000415c0}, 0xc0001dba40, {0x0, 0x0, 0x0})
	/home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/exec/sdf.go:345 +0x118
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*DataSource).Process(0xc000497040, {0x11f39f0, 0xc0000415c0})
	/home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/exec/datasource.go:189 +0x510
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*Plan).Execute.func2({0x11f39f0?, 0xc0000415c0?})
	/home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:131 +0x42
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.callNoPanic({0x11f39f0?, 0xc0000415c0?}, 0x4172ed?)
	/home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:62 +0x6c
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*Plan).Execute(0xc0004b39e0, {0x11f39f0, 0xc0000415c0}, {0xc0001370f8, 0x13}, {{0x11ebaa0?, 0xc000111f20?}, {0x1208f98?, 0xc00038c7e0?}})
	/home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:130 +0x3da
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness.(*control).handleInstruction(0xc000258000, {0x11f38d8, 0xc0004d93e0}, 0xc000338370)
	/home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:407 +0xab7
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness.Main.func4({0x11f38d8, 0xc0004d93e0}, 0xc000338370)
	/home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:193 +0x19d
created by github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness.Main
	/home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:212 +0xfed
"

@hnnsgstfssn hnnsgstfssn requested a review from lostluck March 20, 2023 17:12
@lostluck
Copy link
Contributor

The drain panic is outside of your code. From here which means it's invoking a Drain for a KV for some reason... But sequenceGenDoFn isn't a KV, it does have restrictions though.

OK the thing that's failing here is that a recent change to support context.Context as a parameter into all the SDF methods changed how this one executes.

In particular, it moved from determining what to pass into the function based on its signature to based on what's input. But that's not how this Transform needs to work.

"old code"
https://github.com/apache/beam/blame/9fb9d2cb415e903e4222021f8848bf4420157b2a/sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go#L337

Ooof. This is why I want prism to implement all of Beam... including Drains, so this stuff can be tested without involving Dataflow.


The fix is to change the trInvoker method a little, since it's generally passing in the wrong thing anyway. https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/sdf.go#L296

The problem we're running into is deciding whether the Main Input is a KV or not, so the output can be expanded out. This is something unique to the Go SDK since we didn't have generics to lean on in the original design to have a "real" KV.

Basically, we are seeing a *FullValue{ Elm: Input, Elm2: Restriction}.

So the right place to change is actually what's passed into the invoker in exec/sdf.go.

There's already handling code there... Let me think about this for a bit, since I need to follow what's happening in the code...

@lostluck
Copy link
Contributor

It's very pleasing that we can get coverage for this! https://app.codecov.io/gh/apache/beam/pull/25808

Can't help on the drain side yet, but we'll get there.

@lostluck
Copy link
Contributor

OK, I think if this is patched in, it should resolve the drain issue. (Specifically the sdf.go file, the test doesn't need changing).

#25908

If that works, please review/approve/lgtm that chagne and I'll merge it in, and we can unblock Periodic impulse!

@lostluck
Copy link
Contributor

lostluck commented Mar 20, 2023

Other PR with fix is now merged in. Let me know when the test changes are made and I'll do one last pass. If it's good then, I'll merge this in!

Thanks again for your contribution and patience.

@hnnsgstfssn hnnsgstfssn requested a review from lostluck March 21, 2023 00:04
@hnnsgstfssn
Copy link
Contributor Author

I believe that the last points raised should now be addressed. Please take one last pass when you have a chance, and a big thanks for your input and help on this- much appreciated :)

Copy link
Contributor

@lostluck lostluck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just one suggestion around documentation of SequenceDefinition and a helper constructor function for it and otherwise LGTM.

Thanks again!

sdks/go/pkg/beam/transforms/periodic/periodic.go Outdated Show resolved Hide resolved
Copy link
Contributor

@lostluck lostluck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you very much! This is very exciting.

@lostluck lostluck merged commit 16cd2a9 into apache:master Mar 23, 2023
@Abacn Abacn mentioned this pull request Mar 24, 2023
3 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants