
Disable KafkaIO SDF while it is tested and fixed #22261

Closed · wants to merge 27 commits
Commits
2bf47c9
21730 add warning message
johnjcasey Jul 13, 2022
4ae0939
add python warnings
johnjcasey Jul 13, 2022
4ef4c00
add python warnings
johnjcasey Jul 13, 2022
8b20e60
Merge remote-tracking branch 'origin/master' into add-kafka-sdf-comment
johnjcasey Jul 13, 2022
476cc54
Merge remote-tracking branch 'origin/master' into add-kafka-sdf-comment
johnjcasey Jul 14, 2022
aeb6b6e
update comments
johnjcasey Jul 14, 2022
ac6c369
fix indentation for pydoc
johnjcasey Jul 14, 2022
1f55b1a
Merge remote-tracking branch 'origin/add-kafka-sdf-comment' into add-…
johnjcasey Jul 14, 2022
5b0e77a
Merge remote-tracking branch 'origin/master' into add-kafka-sdf-comment
johnjcasey Jul 14, 2022
ba3f3b8
update comment
johnjcasey Jul 15, 2022
b408303
remove python comments
johnjcasey Jul 15, 2022
fa33138
Merge remote-tracking branch 'origin/add-kafka-sdf-comment' into add-…
johnjcasey Jul 15, 2022
0d01374
Merge remote-tracking branch 'origin/master' into add-kafka-sdf-comment
johnjcasey Jul 15, 2022
d3997c0
Merge remote-tracking branch 'origin/master' into add-kafka-sdf-comment
johnjcasey Jul 15, 2022
6daf3a0
revert Cham's changes
johnjcasey Jul 15, 2022
7670410
Merge remote-tracking branch 'origin/add-kafka-sdf-comment' into add-…
johnjcasey Jul 15, 2022
d6345ad
Disable kafka sdf by adding "use_unbounded_sdf_wrapper"
johnjcasey Jul 15, 2022
e44d2dd
add link to higher level issue
johnjcasey Jul 15, 2022
bf4e978
unrevert changes to external.py
johnjcasey Jul 15, 2022
7ddb931
Merge remote-tracking branch 'origin/add-kafka-sdf-comment' into add-…
johnjcasey Jul 15, 2022
beb3383
exclude SDF specific tests
johnjcasey Jul 15, 2022
6ec8808
Merge remote-tracking branch 'origin/add-kafka-sdf-comment' into add-…
johnjcasey Jul 15, 2022
21af25c
temporarily remove kafka specific tests
johnjcasey Jul 15, 2022
0f8a586
change workaround to just use the unbounded reader, instead of settin…
johnjcasey Jul 20, 2022
4d3cb12
remove method temporarily
johnjcasey Jul 20, 2022
7b1bfae
run spotless
johnjcasey Jul 20, 2022
0cb617e
Merge remote-tracking branch 'origin/master' into add-kafka-sdf-comment
johnjcasey Jul 21, 2022
@@ -543,6 +543,11 @@
* For any significant updates to this I/O connector, please consider involving
* corresponding code reviewers mentioned <a
* href="https://github.com/apache/beam/blob/master/sdks/java/io/kafka/OWNERS">here</a>.
*
* <h1>Reading from Kafka SDF is currently broken, as re-starting the pipeline will cause the
* consumer to start from scratch. See <a
* href="https://github.com/apache/beam/issues/21730">this</a>. Current workaround is to use
* --experimental_option=use_unbounded_sdf_wrapper to use the Unbounded implementation</h1>
Review comment (Contributor):
"For runners that require SDF, current workaround is to use ..."

Also, pls confirm that this works for Java pipelines that use Dataflow Runner v2.

*/
@Experimental(Kind.SOURCE_SINK)
@SuppressWarnings({
@@ -1357,6 +1362,10 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
&& runnerPrefersLegacyRead(input.getPipeline().getOptions()))) {
return input.apply(new ReadFromKafkaViaUnbounded<>(this, keyCoder, valueCoder));
}
LOG.warn(
"Reading from Kafka SDF is currently broken, as re-starting the pipeline will cause the consumer to start from scratch."
+ " See https://github.com/apache/beam/issues/21730 . "
+ "Current workaround is to use --experimental_option=use_unbounded_sdf_wrapper to use the Unbounded implementation");
return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder, valueCoder));
}
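The dispatch in `expand()` above can be sketched as a stdlib-only toy: use the legacy `UnboundedSource` read when the `use_unbounded_sdf_wrapper` experiment is set (or the runner prefers legacy reads), otherwise warn and fall through to the SDF path. The function and return-value names here are illustrative only; the real code reads experiments from `PipelineOptions` and returns composite `PTransform`s.

```python
# Illustrative sketch of the Read.expand() dispatch above; not the Beam API.
import logging

def choose_read_implementation(experiments, runner_prefers_legacy_read):
    # Workaround path: legacy UnboundedSource-based read.
    if "use_unbounded_sdf_wrapper" in experiments or runner_prefers_legacy_read:
        return "ReadFromKafkaViaUnbounded"
    # Default path: SDF read, currently broken across pipeline restarts.
    logging.warning(
        "Reading from Kafka SDF is currently broken; restarting the pipeline "
        "causes the consumer to start from scratch. "
        "See https://github.com/apache/beam/issues/21730")
    return "ReadFromKafkaViaSDF"

print(choose_read_implementation(["use_unbounded_sdf_wrapper"], False))
# → ReadFromKafkaViaUnbounded
print(choose_read_implementation([], False))
# → ReadFromKafkaViaSDF
```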

23 changes: 22 additions & 1 deletion sdks/python/apache_beam/io/kafka.py
@@ -76,16 +76,33 @@

For more information specific to Flink runner see:
- https://beam.apache.org/documentation/runners/flink/

Review comment (Contributor): I think we can remove Python updates since we made UnboundedSource wrapped SDF Kafka the default for now: #22286

Reading via Kafka SDF is currently broken, and will cause the pipeline
to re-read all data on Kafka whenever restarted.
See https://github.com/apache/beam/issues/21730.
Current workaround is:
Start an expansion service with experiment "use_unbounded_sdf_wrapper"
#pylint: disable=line-too-long
java -jar sdks/java/io/expansion-service/build/libs/beam-sdks-java-io-expansion-service-2.41.0-SNAPSHOT.jar 12345 --experiments=use_unbounded_sdf_wrapper
#pylint: enable=line-too-long

Update transforms in kafka.py to use this expansion service.
https://github.com/apache/beam/blob/2c8e7eb7a39cbe3a1678a5c6b8b3f8700d4d8706/sdks/python/apache_beam/io/kafka.py#L189

Use instructions in Kafka example to run this from a Git clone.
https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/kafkataxi
"""

# pytype: skip-file

import logging
import typing

from apache_beam.transforms.external import BeamJarExpansionService
from apache_beam.transforms.external import ExternalTransform
from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder

_LOGGER = logging.getLogger(__name__)

ReadFromKafkaSchema = typing.NamedTuple(
'ReadFromKafkaSchema',
[('consumer_config', typing.Mapping[str, str]),
@@ -166,6 +183,10 @@ def __init__(
Java Kafka Reader reads keys and values as 'byte[]'.
:param expansion_service: The address (host:port) of the ExpansionService.
"""
_LOGGER.warning(
    "Reading from Kafka via SDF is currently broken, and the pipeline will "
    "re-read all data on the topic whenever the pipeline is restarted. "
    "See kafka.py for workaround instructions.")
if timestamp_policy not in [ReadFromKafka.processing_time_policy,
ReadFromKafka.create_time_policy,
ReadFromKafka.log_append_time]: