[#23106] Add periodic.Sequence and periodic.Impulse transforms to Go SDK #25808
The new transforms extend support for the slowly updating side input pattern [1], as tracked in [2]. An attempt has been made to mirror the logic of the Python implementation [3], with minor idiomatic changes. Java [4][5] and Python [6] have influenced the documentation and naming.
[1] https://beam.apache.org/documentation/patterns/side-inputs/
[2] apache#23106
[3] https://github.com/apache/beam/blob/v2.46.0/sdks/python/apache_beam/transforms/periodicsequence.py#L59
[4] https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/transforms/PeriodicSequence.html
[5] https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/transforms/PeriodicImpulse.html
[6] https://beam.apache.org/releases/pydoc/2.46.0/apache_beam.transforms.periodicsequence.html?highlight=periodicimpulse#module-apache_beam.transforms.periodicsequence
This is a first rough draft to use as a basis for initial discussion. I'm unsure how to properly test this without some guidance, and currently it does not drain properly.
}

type sequenceGenDoFn struct {
	now func() time.Time
While playing around locally I've added this way to mock the time.Now function. If it ends up being unused I'm happy to drop it if that makes more sense.
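The idea behind the `now` field can be sketched in isolation. This is a minimal, self-contained illustration and not the PR's code: only the `now func() time.Time` field comes from the diff above, while `clockedGen`, `newClockedGen`, `fixedClock`, and `due` are hypothetical names introduced for the example.

```go
package main

import (
	"fmt"
	"time"
)

// clockedGen is a hypothetical stand-in for a type that needs the current time.
// Only the now field mirrors the diff above.
type clockedGen struct {
	now func() time.Time
}

// newClockedGen returns a generator that uses the real clock by default.
func newClockedGen() *clockedGen {
	return &clockedGen{now: time.Now}
}

// fixedClock returns a clock that always reports the given Unix second.
func fixedClock(unixSec int64) func() time.Time {
	return func() time.Time { return time.Unix(unixSec, 0) }
}

// due reports whether an element scheduled for the given Unix second
// may be emitted yet, according to the injected clock.
func (g *clockedGen) due(scheduledUnixSec int64) bool {
	return !g.now().Before(time.Unix(scheduledUnixSec, 0))
}

func main() {
	g := newClockedGen()
	g.now = fixedClock(100) // swap in a deterministic clock for a test
	fmt.Println(g.due(99), g.due(100), g.due(101)) // prints "true true false"
}
```

With the clock fixed, the scheduling decision becomes deterministic within a single process, which is all this sketch is meant to show.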
This will work in the direct runner, but that's because the direct runner won't successfully run the example or anything local. It would be better to fold things into the sequence definition to enable appropriate testing behavior.
Indeed I struggled with the direct runner, but have now added two working tests that use the prism runner. I am struggling to understand how to fold this into the sequence definition as you suggest, to properly test it. Instead I went ahead and removed this entirely, and the DoFn uses time.Now.
I am happy to leave it there for now. If you want to expand on how to fold it into the definition and test it, I am happy to add this as well.
The draining failure does not happen on the latest stable Beam, 2.45.0, but does happen on later versions. As of writing, 2.46.0 and 2.47.0.dev had the example pipeline fail to drain.
Note that the example was added mainly to illustrate the code used to manually test this PR on Dataflow. Let me know if it is useful to keep it, or if I should drop it from the PR.
Considering it runs on the latest stable as of writing, I'll mark this as ready for review. I'm still expecting to potentially add tests, if there is a good way to test these streaming features, but it may be acceptable to have manually verified that the example runs successfully on Dataflow.
R: @lostluck
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
Thanks for the contribution! This is a very useful transform. For the drain issue, the Python and Java implementations also did not drain until a recent fix (#23765 #25716). Basically, the transform should (i) update the watermark when it advances and (ii) truncate immediately when truncate is called (which happens in drain).
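The two requirements, (i) and (ii), can be illustrated with a small model. This is a hedged sketch only: it does not use the Beam SDF API, and `restriction`, `periodicState`, `advance`, and `truncate` are hypothetical names chosen for the illustration, with timestamps assumed to be milliseconds.

```go
package main

import "fmt"

// restriction is a hypothetical half-open range [start, end) of output
// timestamps in milliseconds.
type restriction struct {
	start, end int64
}

// periodicState is a hypothetical model of what a periodic source tracks
// between outputs. It is not a Beam type.
type periodicState struct {
	rest      restriction
	interval  int64 // milliseconds between outputs
	next      int64 // next timestamp to emit
	watermark int64 // lower bound on the timestamps of future outputs
}

func newPeriodicState(start, end, interval int64) *periodicState {
	return &periodicState{
		rest:      restriction{start: start, end: end},
		interval:  interval,
		next:      start,
		watermark: start,
	}
}

// advance emits the next timestamp if any remain and, per (i), moves the
// watermark forward along with it.
func (p *periodicState) advance() (ts int64, ok bool) {
	if p.next >= p.rest.end {
		return 0, false
	}
	ts = p.next
	p.next += p.interval
	p.watermark = p.next // nothing earlier than the next output will follow
	return ts, true
}

// truncate models a drain: per (ii), cut the restriction at the current
// position immediately so that no further outputs remain.
func (p *periodicState) truncate() {
	p.rest.end = p.next
}

func main() {
	p := newPeriodicState(0, 10_000, 1_000)
	for i := 0; i < 3; i++ {
		ts, _ := p.advance()
		fmt.Println("emitted", ts, "watermark", p.watermark)
	}
	p.truncate()
	_, ok := p.advance()
	fmt.Println("more output after truncate:", ok)
}
```

In this model a source that skipped the watermark update would hold downstream windows open, and one that ignored truncate would keep emitting until the original end, which is the non-draining behaviour described above.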
Codecov Report
@@ Coverage Diff @@
## master #25808 +/- ##
==========================================
- Coverage 72.80% 72.79% -0.02%
==========================================
Files 775 777 +2
Lines 102948 103041 +93
==========================================
+ Hits 74956 75006 +50
- Misses 26537 26574 +37
- Partials 1455 1461 +6
... and 13 files with indirect coverage changes
Ah! I didn't realize those fixes [1][2] for Python were recent; thanks for highlighting it. I tried including both (i) and (ii) here i.e. implementing
Thanks for your patience! I'm still getting caught up from being on vacation. I'm intending to get this review done today or tomorrow.
This is failing our Staticcheck action. Not hard to fix at least. https://github.com/apache/beam/actions/runs/4392151772/jobs/7693061461?pr=25808
This is great! Thanks! I do have comments. Thanks for such a valuable first contribution to Beam! :)
I have a few comments on example formatting, and a note that we should use the new Prism runner to be able to have unit test pipelines for periodic. I think I've covered your other questions too.
sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go
	sideInput := beam.WindowInto(s, window.NewFixedWindows(periodicSequenceInterval),
		beam.ParDo(
			s,
			update,
			periodic.Impulse(
				s,
				startTime,
				endTime,
				periodicSequenceInterval,
			),
		),
		beam.Trigger(trigger.Repeat(trigger.Always())),
		beam.PanesDiscard(),
	)
The nesting here has the added deficit of hiding the core of the example.
Suggested change:
	// Generate an impulse every period.
	periodicImp := periodic.Impulse(s, startTime, endTime, periodicSequenceInterval)
	// Use the impulse to trigger some other ordinary transform.
	updatedImp := beam.ParDo(s, update, periodicImp)
	// Window for use as a side input, to allow the input to change with windows.
	sideInput := beam.WindowInto(s, window.NewFixedWindows(periodicSequenceInterval),
		updatedImp,
		beam.Trigger(trigger.Repeat(trigger.Always())),
		beam.PanesDiscard(),
	)
I'll note that the window for the side input is usually going to be larger than the window for the main processing. While this isn't wrong, the usual goal of the pattern is a situation like letting files that change hourly be read in once each hour, with the more frequent data able to reuse the cached read-in file. (Granted, this behavior isn't yet enabled by default in the Go SDK, but that's an aside.)
I thought of making the side input window larger. Do you think it is worth making that change, taking another configuration to specify the side input window size?
I'm also curious to know what you mean by "this behavior isn't yet enabled by default in the Go SDK".
Wasn't able to apply this suggestion after having changed the periodic.Impulse signature, but have applied the same change in a separate commit. Thanks!
WRT the cache:
Beam's execution abstraction, the FnAPI, has provisions for caching data from the StateAPI across bundles on the SDK side, in order to avoid repeated deserialization and additional round trips to the runner's primary store to fetch state data. Side inputs also come across the StateAPI. This is particularly valuable for streaming jobs, since a single "SDK harness" is typically responsible for the same key all the time, so for tight windows it would look up the same data from the side inputs.
WRT the suggestion not applying after the impulse change: makes sense, since an int64 was being received by the "update" function.
I didn't address the larger side input window. Since it's an example, we don't need to over-configure things. I'd make the window 5 times larger to demonstrate the paradigm, and add an explicit comment that that's what the larger window is for.
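The effect of a 5 times larger side input window can be worked through with plain arithmetic. This is a sketch under stated assumptions: both windowings are aligned fixed windows, a main-input window reads the side-input window that contains it, and timestamps are non-negative milliseconds. The `windowStart` helper and the one-minute main window are hypothetical, chosen only for illustration.

```go
package main

import "fmt"

// windowStart returns the start of the fixed window of the given size that
// contains ts. Timestamps and sizes are in milliseconds; ts is assumed to be
// non-negative.
func windowStart(ts, size int64) int64 {
	return ts - ts%size
}

func main() {
	const mainWindow = int64(60_000)  // hypothetical 1 minute main-input window
	const sideWindow = 5 * mainWindow // side input window made 5 times larger

	// Walk through ten consecutive main windows and show which side input
	// window each one falls into.
	for ts := int64(0); ts < 2*sideWindow; ts += mainWindow {
		fmt.Printf("main window %6d -> side input window %6d\n",
			windowStart(ts, mainWindow), windowStart(ts, sideWindow))
	}
}
```

Five consecutive main windows map to the same side input window, so the side input value would only need to change once per five main windows.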
We should be able to add a short unit test if using the prism runner directly.
While the runner isn't fully complete yet, it does run and execute ProcessContinuation transforms and watermarks!
It just doesn't do the splitting just yet, or actually "wait" for any process continuations at the moment. But when the "sequence" is done, it will terminate, so we can add a test with a period of a second and a duration of a minute, and then check that we're getting 60 elements out of the transform. (Small risk of getting 59 instead, as a flake...)
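The expected element count for such a test follows from simple arithmetic. The sketch below assumes the sequence emits at the start time and every period after it, for timestamps strictly before the end time; `expectedCount` is a hypothetical helper and not part of the PR, and it works in milliseconds.

```go
package main

import "fmt"

// expectedCount returns how many elements a periodic sequence emits, assuming
// it emits at startMs, startMs+periodMs, ... for timestamps strictly before
// endMs. All values are in milliseconds.
func expectedCount(startMs, endMs, periodMs int64) int64 {
	if periodMs <= 0 || endMs <= startMs {
		return 0
	}
	span := endMs - startMs
	// Ceiling division: a partial final period still contributes the element
	// emitted at its start.
	return (span + periodMs - 1) / periodMs
}

func main() {
	// A period of one second over a duration of one minute.
	fmt.Println(expectedCount(0, 60_000, 1_000)) // prints 60
}
```

A test could compare the number of elements coming out of the transform against this value.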
That's great! I couldn't get it working on the direct runner, but didn't try using the new prism explicitly. Have now added two tests, TestImpulse and TestSequence, and it looks like that's working. Let me know what you think.
The direct runner (and basically all non-portable runners) is not great, since it misaligns users' expectations when they move to executing on a "real" runner, like Flink or Dataflow. There's also no "spec" about what a Direct Runner should do to properly test things, so the Go Direct Runner is missing a number of features that the Java and Python ones have.
Essentially, Prism is going to replace the Go Direct Runner as the default runner for the Go SDK at least, and hopefully make testing and local runs for all facets of Beam easier. The prism Readme has a bunch of the vision and desired goals, but I also gave a talk at last year's Beam Summit about the motivations, especially all the "hidden" bits of Beam that users usually don't interact with but are affected by.
Co-authored-by: Robert Burke <[email protected]>
The example is still not able to successfully drain, failing with the following message. I'm unsure why, since as far as I can tell it is both (i) updating the watermark when advancing and (ii) truncating immediately.
The drain panic is outside of your code. From here which means it's invoking a Drain for a KV for some reason... But sequenceGenDoFn isn't a KV, it does have restrictions though.
OK the thing that's failing here is that a recent change to support context.Context as a parameter into all the SDF methods changed how this one executes. In particular, it moved from determining what to pass into the function based on its signature to based on what's input. But that's not how this Transform needs to work.
Ooof. This is why I want prism to implement all of Beam... including Drains, so this stuff can be tested without involving Dataflow.
The fix is to change the trInvoker method a little, since it's generally passing in the wrong thing anyway. https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/sdf.go#L296
The problem we're running into is deciding whether the Main Input is a KV or not, so the output can be expanded out. This is something unique to the Go SDK since we didn't have generics to lean on in the original design to have a "real" KV.
Basically, we are seeing a *FullValue{ Elm: Input, Elm2: Restriction}. So the right place to change is actually what's passed into the invoker in exec/sdf.go. There's already handling code there...
Let me think about this for a bit, since I need to follow what's happening in the code...
It's very pleasing that we can get coverage for this! https://app.codecov.io/gh/apache/beam/pull/25808 Can't help on the drain side yet, but we'll get there.
OK, I think if this is patched in, it should resolve the drain issue. (Specifically the sdf.go file; the test doesn't need changing.) If that works, please review/approve/lgtm that change and I'll merge it in, and we can unblock Periodic impulse!
The other PR with the fix is now merged in. Let me know when the test changes are made and I'll do one last pass. If it's good then, I'll merge this in! Thanks again for your contribution and patience.
I believe that the last points raised should now be addressed. Please take one last pass when you have a chance, and a big thanks for your input and help on this, much appreciated :)
Just one suggestion around documentation of SequenceDefinition and a helper constructor function for it and otherwise LGTM.
Thanks again!
Thank you very much! This is very exciting.