Disable KafkaIO SDF while it is tested and fixed #22261
Conversation
* <h1>Reading from Kafka SDF is currently broken, as re-starting the pipeline will cause the
* consumer to start from scratch. See <a
* href="https://github.com/apache/beam/issues/21730">this</a>. Current workaround is to use
* --experimental_option=use_deprecated_read to use the Unbounded implementation</h1>
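The workaround described in this Javadoc relies on passing an experiment flag on the command line. As a rough, self-contained illustration of the flag syntax (this is not Beam's actual PipelineOptionsFactory; the class and method names here are hypothetical), collecting such flags might look like:

```java
import java.util.HashSet;
import java.util.Set;

public class ExperimentFlags {
  // Hypothetical sketch: collect the values of any --experiments=... args
  // so downstream code can check for a workaround flag like
  // use_deprecated_read. Beam itself parses these with
  // PipelineOptionsFactory and checks them via ExperimentalOptions.
  static Set<String> parseExperiments(String[] args) {
    Set<String> experiments = new HashSet<>();
    for (String arg : args) {
      if (arg.startsWith("--experiments=")) {
        for (String e : arg.substring("--experiments=".length()).split(",")) {
          experiments.add(e.trim());
        }
      }
    }
    return experiments;
  }
}
```

In Beam itself, experiments are checked with ExperimentalOptions.hasExperiment (as seen later in this thread); this sketch only mirrors the command-line shape of the workaround.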
I don't think this will work for Dataflow Runner v2 Java pipelines. Can you try?
"use_unbounded_sdf_wrapper" should work but I only tried it for x-lang.
will do once your change is merged
Run Spotless Precommit
Run Java PreCommit
Run PythonDocs PreCommit
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment
Codecov Report
@@            Coverage Diff            @@
##           master   #22261   +/-   ##
=======================================
  Coverage   74.17%   74.17%
=======================================
  Files         706      704       -2
  Lines       93190    93159      -31
=======================================
- Hits        69124    69105      -19
+ Misses      22798    22787      -11
+ Partials     1268     1267       -1

Flags with carried forward coverage won't be shown.
Assigning reviewers. If you would like to opt out of this review, comment R: @pabloem for label python. Available commands:
The PR bot will only process comments in the main thread (not review comments).
Run Portable_Python PreCommit
Run Python PreCommit
Run PythonDocs PreCommit
pydoc precommit failure is due to
The codecov report covers exclusively unrelated changes, so there is no need to interact with it.
R: @chamikaramj
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control |
Thanks.
sdks/python/apache_beam/io/kafka.py
Outdated
@@ -76,16 +76,33 @@

  For more information specific to Flink runner see:
- https://beam.apache.org/documentation/runners/flink/

  Reading via Kafka SDF is currently broken, and will cause the pipeline
I think we can remove the Python updates since we made UnboundedSource-wrapped SDF Kafka the default for now: #22286
* <h1>Reading from Kafka SDF is currently broken, as re-starting the pipeline will cause the
* consumer to start from scratch. See <a
* href="https://github.com/apache/beam/issues/21730">this</a>. Current workaround is to use
* --experimental_option=use_unbounded_sdf_wrapper to use the Unbounded implementation</h1>
"For runners that require SDF, current workaround is to use ..."
Also, pls confirm that this works for Java pipelines that use Dataflow Runner v2.
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
Thanks. LGTM other than one comment.
@@ -706,27 +706,12 @@ class JavaJarExpansionService(object):
  This can be passed into an ExternalTransform as the expansion_service
  argument which will spawn a subprocess using this jar to expand the
  transform.

  Args:
Please don't revert changes to external.py since this feature and doc updates are useful in general.
…kafka-sdf-comment
…kafka-sdf-comment
…g an experimental option
Run Python_PVR_Flink PreCommit
Run Java PreCommit
LGTM. Thanks.
Run Python_PVR_Flink PreCommit
  return input.apply(new ReadFromKafkaViaUnbounded<>(this, keyCoder, valueCoder));
}
return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder, valueCoder));
// Reading from Kafka SDF is currently broken, as re-starting the pipeline will cause the
This isn't a valid reason to completely disable Kafka SDF. What if the consumer is totally fine with starting from scratch? I have a business need that requires scanning time ranges (which is only supported by SDF) without caring about any previous consumer offset. Disable it ONLY if group.id is provided.
Lines 93 to 94 in 367173f
START_READ_TIME,
STOP_READ_TIME(SDF),
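The START_READ_TIME / STOP_READ_TIME properties cited here bound a read to a timestamp window, which is the SDF-only capability the commenter needs. A hypothetical, self-contained sketch of such a window check (illustrative only, not KafkaIO's actual code):

```java
import java.time.Instant;

public class TimeRangeBound {
  // Hypothetical sketch: decide whether a record with the given timestamp
  // falls inside the configured [startReadTime, stopReadTime) window.
  // A null bound means that side of the window is open.
  static boolean inReadRange(Instant ts, Instant startReadTime, Instant stopReadTime) {
    boolean afterStart = startReadTime == null || !ts.isBefore(startReadTime);
    boolean beforeStop = stopReadTime == null || ts.isBefore(stopReadTime);
    return afterStart && beforeStop;
  }
}
```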
Basically just add to the whole chunk of code you commented here a check like this:
|| ExperimentalOptions.hasExperiment(
input.getPipeline().getOptions(), "use_unbounded_sdf_wrapper")
|| getConsumerConfig().get(ConsumerConfig.GROUP_ID_CONFIG) != null
|| compatibility.supportsOnly(KafkaIOReadImplementation.LEGACY)
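Standing alone, that condition can be sketched as plain logic. The helper below is hypothetical (KafkaIO's real dispatch uses Beam's ExperimentalOptions and Kafka's ConsumerConfig, as in the snippet above), but it captures the three escape hatches being proposed: the use_unbounded_sdf_wrapper experiment, a configured group.id, and a runner that supports only the legacy read:

```java
import java.util.Map;
import java.util.Set;

public class KafkaReadDispatch {
  // Hypothetical sketch of the review suggestion: fall back to the legacy
  // UnboundedSource-based read only when one of these conditions holds;
  // otherwise keep the SDF read enabled.
  static boolean useLegacyRead(
      Set<String> experiments,
      Map<String, Object> consumerConfig,
      boolean supportsOnlyLegacy) {
    return experiments.contains("use_unbounded_sdf_wrapper")
        || consumerConfig.get("group.id") != null
        || supportsOnlyLegacy;
  }
}
```

With this shape, pipelines without a group.id and without the experiment would still get the SDF read, which is exactly the narrower disable the commenter is asking for.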
This is temporary. We primarily want to make sure that a new user won't run into this problem, and we intend to fix this as rapidly as possible. If you have a less typical use case, it will still work on existing versions of Beam while we try to get this fixed.
If this gets merged into master and gets released it's not temporary. Totally removing the SDF support is a breaking change. It should be as minimally breaking as possible. I have shown one precondition that indicates it works just fine even in this bugged state. There could be even more.
If you have this use case, there are probably others. We won't disable this at all, then.
See #22303
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

- Choose reviewer(s) and mention them in a comment (R: @username).
- Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
- Update CHANGES.md with noteworthy changes.

See the Contributor Guide for more tips on how to make the review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.