-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-10976] Bundle finalization: Harness and some exec changes #16980
[BEAM-10976] Bundle finalization: Harness and some exec changes #16980
Conversation
Codecov Report
@@ Coverage Diff @@
## master #16980 +/- ##
==========================================
+ Coverage 73.05% 73.74% +0.68%
==========================================
Files 660 667 +7
Lines 86785 87356 +571
==========================================
+ Hits 63402 64420 +1018
+ Misses 22384 21829 -555
- Partials 999 1107 +108
Flags with carried forward coverage won't be shown. Click here to find out more.
Continue to review full report at Codecov.
|
R: @lostluck |
@@ -220,6 +226,49 @@ func (n *ParDo) FinishBundle(_ context.Context) error { | |||
return nil | |||
} | |||
|
|||
func (n *ParDo) FinalizeBundle(ctx context.Context) error { | |||
failedIndices := []int{} | |||
for idx, bfc := range n.bf.callbacks { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any reason the callbacks have to live in each ParDo, rather than an independant structure that's referenced in the ParDos at Plan construction time, that can be swapped out/cleared when the plan is re-used/garbage collected?
That would avoid the plumbing creep where every implementation of Unit needs new methods just to get down to the ParDos that may have callbacks.
You'll note that in Go, that it's not a great idea to simply add new methods to an interface, as it will break everything that's trying to implement the interface.
I'll also point out that we have a pattern that does this, like how we collect metrics from PCollections, by pre-collecting all the PCollections in a plan rather than plumbing a new path all the way through: See
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/plan.go#L59
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/translate.go#L82
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I definitely agree that this is a better approach and will update accordingly. With that said:
You'll note that in Go, that it's not a great idea to simply add new methods to an interface, as it will break everything that's trying to implement the interface.
Something I'm a little unclear on is what guarantees we actually make about compatibility outside of the beam
package. Are we basically saying that we can't change any of our internal interfaces/function definitions? As best I could tell, Unit
isn't something we actively expose to users through any normal path. I know we don't at this time have a true concept of internal for most of our stuff, but I'm curious how we make decisions on where its ok to make changes like this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated! Please take another look when you have a chance. I did the same set of manual testing with these changes integrated into the other ones
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still need to look at the new code but to answer your question:
WRT Compatibility, as a rule, we no longer make breaking changes to the main beam package.
Attached to this are things the main beam package make use of or that users use to build their own DoFns (eg the typex.Window interface, event time etc.)
We strongly avoid making changes to exported surfaces of our internal packages, but can if it's the best way to do things.
We have a statement about compatibility in the release announcement but this too is woefully non-specific. It would be best if we didn't force this on our users, but that's a larger set of refactorings.
In this specific case, extending the Unit interface would at worse also require changes to the direct runner implementations. Technically the direct runner is the only reason that interface is exposed at all (and frankly for most of the exec package anyway...) (This is another reason I'm not a fan of our direct runner).
As a rule for Go, adding methods to an interface is generally (but not always) the Wrong Move. It's inherently a breaking change. Interfaces are best when they're small. Especially if the cases are optional like here, it's often better to define an additional interface and type assert to check if it implements it. Now that alone wouldn't solve this plumbing necessary here.
In this case, it's a category error: One is treating the interface like a Java abstract base class, where we can have a default implementation, propagated to all subclasses automatically. However, outside of Type Embedding, there's no such feature in Go, and type embedding itself needs careful design to make use of it. In particular since it plays havok with trying to do things with reflection on the embedded type...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I need to take a second look at the actual finalization logic tomorrow, but overall this is looking good.
Don't forget to update the design doc with implementation details, for later reference.
We may want to actually cobble together a contribution guide for different kinds of features, so we don't keep running into the same implementation trickiness...
R: @youngoli - adding you in Rebo's absence. I think we were pretty close on this PR anyways, I've addressed all of rebo's initial comments |
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a bunch of comments on this, but they're all small things. I think you and Robert already polished up the core architecture on this, that all looked good to me.
Thanks for the review @youngoli - several good catches in there. I think all your feedback should be addressed, please take another look when you have a chance! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All that's left are some nits. I'm being a bit pedantic here, but these small documentation nits tend to get ignored once they're merged in so I'd prefer to get them out of the way now.
Co-authored-by: Daniel Oliveira <[email protected]>
177691b
to
36e1450
Compare
…he#16980) * Bundle finalization harness side changes * Add testing * Iterate over pardos directly * Track bundlefinalizer in plan.go not pardo * Remove outdated test * Fix pointer issue * Update todos to reference jiras * Cleanup from feedback * Doc nit Co-authored-by: Daniel Oliveira <[email protected]> * GetExpirationTime comment Co-authored-by: github-actions <[email protected]> Co-authored-by: Daniel Oliveira <[email protected]>
…r transaction boundaries and transaction ID ordering. * Added integration test for transaction boundaries and transaction ID ordering. Made small fixes in ordered by key integration test. * [BEAM-9150] Fix beam_PostRelease_Python_Candidate (python RC validation scripts) (#16955) * Use default context output rather than outputWithTimestamp for ElasticsearchIO * Palo Alto case study - fix link * [BEAM-12777] Removed current docs version redirect * Merge pull request #16850: [BEAM-11205] Upgrade Libraries BOM dependencies to 24.3.0 * Update GCP Libraries BOM version to 24.3.0 * Update associated dependencies * Merge pull request #16484 from [BEAM-13633] [Playground] Implement method to get a default example for each SDKs * Implement method to get a default example for each SDKs * Add error handling * Added saving of precompiled objects catalog to cache at the server startup * Added caching of the catalog only in case of unspecified SDK * Update regarding comments * Update regarding comments * Simplified logging regarding comment * Get defaultExamplePath from the corresponding config * Refactoring code * Add the `link` field to response * Remove gjson; Resolve conflicts; * Refactoring code * Getting default precompiled object from cache * Refactoring code * Added saving of precompiled objects catalog to cache at the server startup * Added caching of the catalog only in case of unspecified SDK * Update regarding comments * Update regarding comments * Simplified logging regarding comment * Updates regarding comments * Update for environment_service_test.go * Get default example from catalog * GetCatalogFromCacheOrStorage method * Update licenses * Update licenses; Resolve conflicts; * [BEAM-13633][Playground] Change saving default precompiled objects to the cache * [BEAM-13633][Playground] Change logic of saving and receiving info about default precompiled objects * [BEAM-13633][Playground] Separate for each sdk * [BEAM-13633][Playground] regenerate proto files * Add code of the default example to response * Revert "Add code of the default example to response" This reverts commit da6baa0. * Refactoring code * Refactoring code; Add test; * Edit commentaries * Refactoring code * Add bucket name to methods Co-authored-by: Artur Khanin <[email protected]> Co-authored-by: AydarZaynutdinov <[email protected]> Co-authored-by: Pavel Avilov <pavel.avilov> * Add 2022 events blog post (#16975) * Clean up Go formatter suggestions (#16973) * [BEAM-14012] Add go fmt to Github Actions (#16978) * [BEAM-13911] Add basic tests to Go direct runner. (#16979) * [BEAM-13960] Add support for more types when converting from between row and proto (#16875) * Adding schema support. * Addressing feedback. * Bump org.mongodb:mongo-java-driver to 3.12.10 * [BEAM-13973] Link Dataproc Flink master URLs to the InteractiveRunner when FlinkRunner is used (#16904) * [BEAM-13925] Turn pr bot on for go prs (#16984) * [BEAM-13964] Bump kotlin to 1.6.x (#16882) * [BEAM-13964] Bump kotlin to 1.6.x * [BEAM-13964] Bump kotlin to 1.6.x * [BEAM-13964] fix warnings in Kotlin compilation * Skipping flaky sad-path tests for Spanner changestreams * Merge pull request #16906: [BEAM-13974] Handle idle Storage Api streams * Merge pull request #16562 from [BEAM-13051][D] Enable pylint warnings (no-name-in-module/no-value-for-parameter) * [BEAM-13051] Pylint no-name-in-module and no-value-for-parameter warnings enabled * [BEAM-13051] Fixed no-value-for-parameter warning for missing default values * [BEAM-13051] Fixed parameters warnings * [BEAM-13925] A couple small pr-bot bug fixes (#16996) * [BEAM-14029] Add getter, setter for target maven repo (#16995) * [BEAM-13903] Improve coverage of metricsx package (#16994) * [BEAM-13892] Improve coverage of avroio package (#16990) * [adhoc] Prepare aws2 ClientConfiguration for json serialization and cleanup AWS Module (#16894) * [adhoc] Prepare aws2 ClientConfiguration and related classes for json serialization and cleanup AWS Module * Merge pull request #16879 from [BEAM-12164] Add javadocs to SpannerConfig * Add tests and config for retry * lint * add tests * lint * Delete tests not passing * Rebase on apache beam master * review changes * review changes * add javadocs to SpannerConfig * revert * add full stops * [Cleanup] Update pre-v2 go package references (#17002) * [BEAM-13885] Add unit tests to window package (#16971) * Merge pull request #16891 from [BEAM-13872] [Playground] Increase test coverage for the code_processing package * Increase test coverage for the code_processing package * Refactoring code * Add test cases with mock cache * Add test for processCompileSuccess method * Update test names * Refactoring code * Merge pull request #16912 from [BEAM-13878] [Playground] Increase test coverage for the fs_tool package * Increase test coverage for the fs_tool package * Rename folder * Remove useless variable * Update test names * Merge pull request #16946 from [BEAM-13873] [Playground] Increase test coverage for the environment package * Increase test coverage for the environment package * Update test names * Refactoring code * Add bucket name to method * [BEAM-13999] playground - support vertical orientation for graph * [BEAM-13951] Update mass_comment.py list of Run commands (#16889) * BEAM-13951: Sort run command list * BEAM-13951: Update list * fixup! BEAM-13951: Update list * [BEAM-10652] Allow Clustering without Partition in BigQuery (#16578) * [BEAM-10652] removed check that blocked clustering without partitioning * [BEAM-10652] allow clustering without requiring partition * newline * added needed null * remove testClusteringThrowsWithoutPartitioning * update clustering * formatting * now compiles * passes spotless * update doc * focus on single test * spotless * run all ITs * spotless * testing with time partitioning * checking * set clustering independant of partitioning * remove timepart from it * spotless * removed test * added TODO * removed block of unneded code/comment * remove override to v3 coder * Spotless cleanup * re-add override to v3 coder * spotless * adding checksum ( wrong value ) * added needed query var * use tableName as var * DATASET NAME * project name in query * update query * change tests * remove unneeded imports * remove rest of forgotten * add rows * 16000 bytes * bigint * streaming test * spotless * methods * end stream * stream method and naming * nostream * streaming * streamingoptions * without streaming example * string column instead of date -- related to BEAM-13753 * mor strings * spotless * revert, only DEFAULT and FILE_LOADS * [BEAM-13857] Add K:V flags for expansion service jars and addresses to Go ITs. (#16908) Adds functionality for running jars to the Go integration test framework, and uses this functionality to implement handling of K:V flags for providing expansion service jars and addresses to the test framework. This means that tests can simply get the address of an expansion service with the appropriate label, and this feature will handle running a jar if necessary, or just using the passed in endpoint otherwise. * BEAM-14011 fix s3 filesystem multipart copy * Merge pull request #16842 from [BEAM-13932][Playground] Container's user privileges * [BEAM-13932][Playground] Change Dockerfiles * [BEAM-13932][Playground] Update proxy and permissions for the container's user * [BEAM-13932][Playground] Update permissions for the container's user for scio * Doc updates and blog post for 2.37.0 (#16887) * Doc updates and blog post for 2.37.0 * Add BEAM-13980 to known issues * Update dates * Drop known issue (fix cherrypicked) * Add license * Add missing # * Remove resolved issue in docs + update class path on sample (#17018) * [BEAM-14016] Fixed flaky postcommit test (#17009) Fixed SpannerWriteIntegrationTest.test_spanner_update by fixing the metric exporter usage in spannerio. * [BEAM-13925] months in date constructor are 0 indexed * [BEAM-13947] Add split() and rsplit(), non-deferred column operations on categorical columns (#16677) * Add split/rsplit; Need to refactor regex * Support Regex; Refactor tests * Remove debugger * fix grammar * Fix passing regex arg * Reorder imports * Address PR comments; Simplify kwargs * Simplify getting columns for split_cat * Update doctests to skip expand=True operations * Fix missing doctest * py: Import beam plugins before starting SdkHarness * BEAM-14026 - Fixes bug related to Unnesting nested rows in an array (#16988) * Suggested changes to handle nested row in an array * Beam-14026 Suggested changes to handle nested row in an array * Beam-14026 Enhanced by segregating the code from getBaseValues enhanced test case and example. * Beam-14026 The code is moved from Row to avoid impact to the public interface. The code is moved to BeamUnnestRel.java since its the caller class. The Example code was duplicate, hence dropped. build.gradle updated with the removal of example code. * Remove resolved issue in notebook * Bump numpy bound to include 1.22 and regenerate container deps. * [BEAM-13925] Add ability to get metrics on pr-bot performance (#16985) * Add script to get metrics on pr-bot performance * Respond to feedback * fix bad condition * [BEAM-11085] Test that windows are correctly observed in DoFns * Give pr bot write permissions on pr update * Adding a logical type for Schemas using proto serialization. (#16940) * BEAM-13765 missing PAssert methods (#16668) * [BEAM-13909] improve coverage of Provision package (#17014) * improve coverage of provision package * updated comments * [BEAM-14050] Update taxi.go example instructions * Merge pull request #17027: [BEAM-11205] Upgrade GCP Libraries BOM dependencies to 24.4.0 * [BEAM-13709] Inconsistent behavior when parsing boolean flags across different APIs in Python SDK (#16929) * Update dataflow API client. * Instructions for updating apitools generated files. * [BEAM-10976] Bundle finalization: Harness and some exec changes (#16980) * Bundle finalization harness side changes * Add testing * Iterate over pardos directly * Track bundlefinalizer in plan.go not pardo * Remove outdated test * Fix pointer issue * Update todos to reference jiras * Cleanup from feedback * Doc nit Co-authored-by: Daniel Oliveira <[email protected]> * GetExpirationTime comment Co-authored-by: github-actions <[email protected]> Co-authored-by: Daniel Oliveira <[email protected]> * Merge pull request #16976 from [BEAM-14010] [Website] Add Playground section to the Home page * [BEAM-14010] [Website] Add Playground section to the Home page * Update button to "Try Playground" Co-authored-by: Aydar Zainutdinov <[email protected]> * [BEAM-14010] [Website] change button name * [BEAM-14010] [Website] align header to center * [BEAM-14010] [Website] change link Co-authored-by: Alex Kosolapov <[email protected]> Co-authored-by: Aydar Zainutdinov <[email protected]> * [BEAM-12447] Upgrade cloud build client and add/cleanup options (#17032) * Merge pull request #17036 from [BEAM-12164] Convert all static instances to be transient in the connector in order to enable concurrent testing * Convert all static instances to be transient in the connector in order to enable concurrent testing * Initialized fields to null * nullness * Suppress uninitialized warnings * Remove resetting dao factory fields in SpannerChangeStreamErrorTest.java * Add validation package * fix variable reference (#16991) * Committed changes * Print more logging * More logging * Made pipelines streaming * Made small fixes * Small fixes * Ran spotless Apply Co-authored-by: emily <[email protected]> Co-authored-by: egalpin <[email protected]> Co-authored-by: Aydar Farrakhov <[email protected]> Co-authored-by: Miguel Hernandez <[email protected]> Co-authored-by: Benjamin Gonzalez <[email protected]> Co-authored-by: Pavel Avilov <[email protected]> Co-authored-by: Artur Khanin <[email protected]> Co-authored-by: AydarZaynutdinov <[email protected]> Co-authored-by: Ahmet Altay <[email protected]> Co-authored-by: Jack McCluskey <[email protected]> Co-authored-by: Robert Burke <[email protected]> Co-authored-by: laraschmidt <[email protected]> Co-authored-by: Alexey Romanenko <[email protected]> Co-authored-by: Victor <[email protected]> Co-authored-by: Danny McCormick <[email protected]> Co-authored-by: Masato Nakamura <[email protected]> Co-authored-by: Pablo Estrada <[email protected]> Co-authored-by: reuvenlax <[email protected]> Co-authored-by: Miguel Hernandez <[email protected]> Co-authored-by: Moritz Mack <[email protected]> Co-authored-by: Zoe <[email protected]> Co-authored-by: Brian Hulette <[email protected]> Co-authored-by: brucearctor <[email protected]> Co-authored-by: Daniel Oliveira <[email protected]> Co-authored-by: sp029619 <[email protected]> Co-authored-by: David Cavazos <[email protected]> Co-authored-by: Ning Kang <[email protected]> Co-authored-by: github-actions <[email protected]> Co-authored-by: Andy Ye <[email protected]> Co-authored-by: Rahul Iyer <[email protected]> Co-authored-by: abhijeet-lele <[email protected]> Co-authored-by: Valentyn Tymofieiev <[email protected]> Co-authored-by: Marcin Kuthan <[email protected]> Co-authored-by: Ritesh Ghorse <[email protected]> Co-authored-by: Jack McCluskey <[email protected]> Co-authored-by: ansh0l <[email protected]> Co-authored-by: Anand Inguva <[email protected]> Co-authored-by: Robert Bradshaw <[email protected]> Co-authored-by: Daniel Oliveira <[email protected]> Co-authored-by: bullet03 <[email protected]> Co-authored-by: Alex Kosolapov <[email protected]> Co-authored-by: Yichi Zhang <[email protected]> Co-authored-by: Nancy Xu <[email protected]>
This is part 1 of 2 to add bundle finalization support to the Go Sdk.
Summary of Overall Changes
Bundle finalization enables a DoFn to perform side effects after a runner has acknowledged that it has durably persisted the output. Right now, Java and Python support bundle finalization by allowing a user to register a callback function which is invoked when the runner acknowledges that it has persisted all output, but Go does not have any such support. This is part of a larger change to add support to the Go Sdk as outlined in this design doc.
I've completed all the changes (sans some better testing on the parts not in this PR), you can see the remaining files in the diff here - https://github.com/apache/beam/compare/master...damccorm:users/damccorm/bundle-finalization?expand=1
Summary of Changes in this PR
This PR adds most of the non user facing changes needed to enable this change. There are basically 2 major components:
Additional testing done
On top of the units added, using my full implementation (not just this partial one) I also was able to run an E2E example on Dataflow (FWIW, not all runners have finalization support but I found that Dataflow does). In that example, I hijacked the wordcount example and added a bundleFinalizer to write a file to persistent storage for each line that had at least 3 words (chosen pretty randomly to minimize the chances of collisions). I'll omit the whole sample since its long, but it produced a bunch of files like this:
This indeed ran after the other data was persisted
Next Steps
After this, I'll add the user facing functionality in a follow up pr, along with testing for that and 1+ integration test for the whole flow.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username
).[BEAM-XXX] Fixes bug in ApproximateQuantiles
, where you replaceBEAM-XXX
with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.