Disable KafkaIO SDF while it is tested and fixed #22261

Closed

wants to merge 27 commits

Commits (27)
2bf47c9
21730 add warning message
johnjcasey Jul 13, 2022
4ae0939
add python warnings
johnjcasey Jul 13, 2022
4ef4c00
add python warnings
johnjcasey Jul 13, 2022
8b20e60
Merge remote-tracking branch 'origin/master' into add-kafka-sdf-comment
johnjcasey Jul 13, 2022
476cc54
Merge remote-tracking branch 'origin/master' into add-kafka-sdf-comment
johnjcasey Jul 14, 2022
aeb6b6e
update comments
johnjcasey Jul 14, 2022
ac6c369
fix indentation for pydoc
johnjcasey Jul 14, 2022
1f55b1a
Merge remote-tracking branch 'origin/add-kafka-sdf-comment' into add-…
johnjcasey Jul 14, 2022
5b0e77a
Merge remote-tracking branch 'origin/master' into add-kafka-sdf-comment
johnjcasey Jul 14, 2022
ba3f3b8
update comment
johnjcasey Jul 15, 2022
b408303
remove python comments
johnjcasey Jul 15, 2022
fa33138
Merge remote-tracking branch 'origin/add-kafka-sdf-comment' into add-…
johnjcasey Jul 15, 2022
0d01374
Merge remote-tracking branch 'origin/master' into add-kafka-sdf-comment
johnjcasey Jul 15, 2022
d3997c0
Merge remote-tracking branch 'origin/master' into add-kafka-sdf-comment
johnjcasey Jul 15, 2022
6daf3a0
revert Cham's changes
johnjcasey Jul 15, 2022
7670410
Merge remote-tracking branch 'origin/add-kafka-sdf-comment' into add-…
johnjcasey Jul 15, 2022
d6345ad
Disable kafka sdf by adding "use_unbounded_sdf_wrapper"
johnjcasey Jul 15, 2022
e44d2dd
add link to higher level issue
johnjcasey Jul 15, 2022
bf4e978
unrevert changes to external.py
johnjcasey Jul 15, 2022
7ddb931
Merge remote-tracking branch 'origin/add-kafka-sdf-comment' into add-…
johnjcasey Jul 15, 2022
beb3383
exclude SDF specific tests
johnjcasey Jul 15, 2022
6ec8808
Merge remote-tracking branch 'origin/add-kafka-sdf-comment' into add-…
johnjcasey Jul 15, 2022
21af25c
temporarily remove kafka specific tests
johnjcasey Jul 15, 2022
0f8a586
change workaround to just use the unbounded reader, instead of settin…
johnjcasey Jul 20, 2022
4d3cb12
remove method temporarily
johnjcasey Jul 20, 2022
7b1bfae
run spotless
johnjcasey Jul 20, 2022
0cb617e
Merge remote-tracking branch 'origin/master' into add-kafka-sdf-comment
johnjcasey Jul 21, 2022
4 changes: 3 additions & 1 deletion sdks/java/io/kafka/build.gradle
@@ -145,7 +145,9 @@ kafkaVersions.each {kv ->

filter {
excludeTestsMatching "*InStreaming"
if (!(kv.key in sdfKafkaVersions)) excludeTestsMatching "*DynamicPartitions" //admin client create partitions does not exist in kafka 0.11.0.3 and kafka sdf does not appear to work for kafka versions <2.0.1
// with SDF disabled, all dynamic partition tests will fail. See https://github.com/apache/beam/issues/22303
excludeTestsMatching "*DynamicPartitions"
// if (!(kv.key in sdfKafkaVersions)) excludeTestsMatching "*DynamicPartitions" //admin client create partitions does not exist in kafka 0.11.0.3 and kafka sdf does not appear to work for kafka versions <2.0.1
}
}
}
@@ -59,7 +59,6 @@
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.io.kafka.KafkaIOReadImplementationCompatibility.KafkaIOReadImplementation;
import org.apache.beam.sdk.io.kafka.KafkaIOReadImplementationCompatibility.KafkaIOReadImplementationCompatibilityException;
import org.apache.beam.sdk.io.kafka.KafkaIOReadImplementationCompatibility.KafkaIOReadImplementationCompatibilityResult;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -1336,28 +1335,34 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
Coder<K> keyCoder = getKeyCoder(coderRegistry);
Coder<V> valueCoder = getValueCoder(coderRegistry);

final KafkaIOReadImplementationCompatibilityResult compatibility =
KafkaIOReadImplementationCompatibility.getCompatibility(this);

// For a number of cases, we prefer using the UnboundedSource Kafka over the new SDF-based
// Kafka source, for example,
// * Experiments 'beam_fn_api_use_deprecated_read' and use_deprecated_read will result in
// legacy UnboundeSource being used.
// * Experiment 'use_unbounded_sdf_wrapper' will result in legacy UnboundeSource being used
// but will be wrapped by an SDF.
// * Some runners or selected features may not be compatible with SDF-based Kafka.
if (ExperimentalOptions.hasExperiment(
input.getPipeline().getOptions(), "beam_fn_api_use_deprecated_read")
|| ExperimentalOptions.hasExperiment(
input.getPipeline().getOptions(), "use_deprecated_read")
|| ExperimentalOptions.hasExperiment(
input.getPipeline().getOptions(), "use_unbounded_sdf_wrapper")
|| compatibility.supportsOnly(KafkaIOReadImplementation.LEGACY)
|| (compatibility.supports(KafkaIOReadImplementation.LEGACY)
&& runnerPrefersLegacyRead(input.getPipeline().getOptions()))) {
return input.apply(new ReadFromKafkaViaUnbounded<>(this, keyCoder, valueCoder));
}
return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder, valueCoder));
// Reading from Kafka SDF is currently broken, as re-starting the pipeline will cause the
// consumer to start from scratch. See https://github.com/apache/beam/issues/21730.
// https://github.com/apache/beam/issues/22303 is the task to try and fix Kafka SDF overall,
// as it appears to have other issues as well.
return input.apply(new ReadFromKafkaViaUnbounded<>(this, keyCoder, valueCoder));

Review thread on the lines above (johnjcasey marked this conversation as resolved):

Contributor:
This isn't a valid reason to completely disable Kafka SDF. What if the consumer is totally fine with starting from scratch? I have a business need that requires scanning time ranges - which is only supported by SDF - without caring about any previous consumer offset. Disable it ONLY if group.id is provided.

Contributor:
Basically, just add a check like this to the whole chunk of code you commented out here:

          || ExperimentalOptions.hasExperiment(
              input.getPipeline().getOptions(), "use_unbounded_sdf_wrapper")
          || getConsumerConfig().get(ConsumerConfig.GROUP_ID_CONFIG) != null
          || compatibility.supportsOnly(KafkaIOReadImplementation.LEGACY)

Contributor Author:
This is temporary. We primarily want to make sure that a new user won't run into this problem. We intend to fix this as rapidly as possible. If you have a less typical use case, it will still work on existing versions of Beam while we try to get this fixed.

Contributor:
If this gets merged into master and released, it's not temporary. Totally removing the SDF support is a breaking change; it should be as minimally breaking as possible. I have shown one precondition under which it works just fine even in this bugged state. There could be even more.

Contributor Author:
If you have this use case, there are probably others. We won't disable this at all then.

// final KafkaIOReadImplementationCompatibilityResult compatibility =
// KafkaIOReadImplementationCompatibility.getCompatibility(this);

// // For a number of cases, we prefer using the UnboundedSource Kafka over the new SDF-based
// // Kafka source, for example,
// // * Experiments 'beam_fn_api_use_deprecated_read' and use_deprecated_read will result in
// // legacy UnboundeSource being used.
// // * Experiment 'use_unbounded_sdf_wrapper' will result in legacy UnboundeSource being used
// // but will be wrapped by an SDF.
// // * Some runners or selected features may not be compatible with SDF-based Kafka.
// if (ExperimentalOptions.hasExperiment(
// input.getPipeline().getOptions(), "beam_fn_api_use_deprecated_read")
// || ExperimentalOptions.hasExperiment(
// input.getPipeline().getOptions(), "use_deprecated_read")
// || ExperimentalOptions.hasExperiment(
// input.getPipeline().getOptions(), "use_unbounded_sdf_wrapper")
// || compatibility.supportsOnly(KafkaIOReadImplementation.LEGACY)
// || (compatibility.supports(KafkaIOReadImplementation.LEGACY)
// && runnerPrefersLegacyRead(input.getPipeline().getOptions()))) {
// return input.apply(new ReadFromKafkaViaUnbounded<>(this, keyCoder, valueCoder));
// }
// return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder, valueCoder));
}

private void warnAboutUnsafeConfigurations(PBegin input) {
@@ -1394,21 +1399,23 @@ public interface FakeFlinkPipelineOptions extends PipelineOptions {
void setCheckpointingInterval(Long interval);
}

private boolean runnerPrefersLegacyRead(PipelineOptions options) {
// Only Dataflow runner requires sdf read at this moment. For other non-portable runners, if
// it doesn't specify use_sdf_read, it will use legacy read regarding to performance concern.
// TODO(https://github.com/apache/beam/issues/20530): Remove this special check when we
// address performance issue.
if (ExperimentalOptions.hasExperiment(options, "use_sdf_read")) {
return false;
}
if (options.getRunner().getName().startsWith("org.apache.beam.runners.dataflow.")) {
return false;
} else if (ExperimentalOptions.hasExperiment(options, "beam_fn_api")) {
return false;
}
return true;
}
// private boolean runnerPrefersLegacyRead(PipelineOptions options) {
// // Only Dataflow runner requires sdf read at this moment. For other non-portable runners,
// if
// // it doesn't specify use_sdf_read, it will use legacy read regarding to performance
// concern.
// // TODO(https://github.com/apache/beam/issues/20530): Remove this special check when we
// // address performance issue.
// if (ExperimentalOptions.hasExperiment(options, "use_sdf_read")) {
// return false;
// }
// if (options.getRunner().getName().startsWith("org.apache.beam.runners.dataflow.")) {
// return false;
// } else if (ExperimentalOptions.hasExperiment(options, "beam_fn_api")) {
// return false;
// }
// return true;
// }

/**
* A {@link PTransformOverride} for runners to swap {@link ReadFromKafkaViaSDF} to legacy Kafka
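For context on the experiment flags referenced in the (now commented-out) selection logic in this file's diff: a user opts a pipeline into the legacy UnboundedSource-based read by passing one of those experiments through the pipeline options. The following is a minimal sketch, not part of this PR, relevant on Beam releases where the SDF read is still the default; the broker address, topic name, and deserializer choices are placeholder assumptions.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class LegacyKafkaReadExample {
  public static void main(String[] args) {
    // The "use_deprecated_read" experiment is one of the flags checked by KafkaIO's
    // (pre-change) implementation selection logic; it forces the legacy unbounded read.
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs("--experiments=use_deprecated_read").create();
    Pipeline pipeline = Pipeline.create(options);
    pipeline.apply(
        KafkaIO.<Long, String>read()
            .withBootstrapServers("localhost:9092") // placeholder broker
            .withTopic("events")                    // placeholder topic
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class));
    // Streaming pipeline: this will run until cancelled.
    pipeline.run().waitUntilFinish();
  }
}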
@@ -19,7 +19,6 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;

import java.io.IOException;
@@ -66,24 +65,26 @@
})
public class KafkaIOExternalTest {

private void verifyKafkaReadComposite(
RunnerApi.PTransform kafkaSDFReadComposite, ExpansionApi.ExpansionResponse result)
throws Exception {
assertThat(
kafkaSDFReadComposite.getSubtransformsList(),
Matchers.hasItem(MatchesPattern.matchesPattern(".*Impulse.*")));
assertThat(
kafkaSDFReadComposite.getSubtransformsList(),
Matchers.hasItem(MatchesPattern.matchesPattern(".*GenerateKafkaSourceDescriptor.*")));
assertThat(
kafkaSDFReadComposite.getSubtransformsList(),
Matchers.hasItem(MatchesPattern.matchesPattern(".*ReadSourceDescriptors.*")));
RunnerApi.PTransform kafkaSdfParDo =
result.getComponents().getTransformsOrThrow(kafkaSDFReadComposite.getSubtransforms(2));
RunnerApi.ParDoPayload parDoPayload =
RunnerApi.ParDoPayload.parseFrom(kafkaSdfParDo.getSpec().getPayload());
assertNotNull(parDoPayload.getRestrictionCoderId());
}
// TODO: fix Kafka SDF https://github.com/apache/beam/issues/22303

// private void verifyKafkaReadComposite(
// RunnerApi.PTransform kafkaSDFReadComposite, ExpansionApi.ExpansionResponse result)
// throws Exception {
// assertThat(
// kafkaSDFReadComposite.getSubtransformsList(),
// Matchers.hasItem(MatchesPattern.matchesPattern(".*Impulse.*")));
// assertThat(
// kafkaSDFReadComposite.getSubtransformsList(),
// Matchers.hasItem(MatchesPattern.matchesPattern(".*GenerateKafkaSourceDescriptor.*")));
// assertThat(
// kafkaSDFReadComposite.getSubtransformsList(),
// Matchers.hasItem(MatchesPattern.matchesPattern(".*ReadSourceDescriptors.*")));
// RunnerApi.PTransform kafkaSdfParDo =
// result.getComponents().getTransformsOrThrow(kafkaSDFReadComposite.getSubtransforms(2));
// RunnerApi.ParDoPayload parDoPayload =
// RunnerApi.ParDoPayload.parseFrom(kafkaSdfParDo.getSpec().getPayload());
// assertNotNull(parDoPayload.getRestrictionCoderId());
// }

@Test
public void testConstructKafkaRead() throws Exception {
@@ -152,12 +153,14 @@ public void testConstructKafkaRead() throws Exception {
assertThat(transform.getInputsCount(), Matchers.is(0));
assertThat(transform.getOutputsCount(), Matchers.is(1));

RunnerApi.PTransform kafkaReadComposite =
result.getComponents().getTransformsOrThrow(transform.getSubtransforms(0));
// TODO: fix Kafka SDF https://github.com/apache/beam/issues/22303

verifyKafkaReadComposite(
result.getComponents().getTransformsOrThrow(kafkaReadComposite.getSubtransforms(0)),
result);
// RunnerApi.PTransform kafkaReadComposite =
// result.getComponents().getTransformsOrThrow(transform.getSubtransforms(0));

// verifyKafkaReadComposite(
// result.getComponents().getTransformsOrThrow(kafkaReadComposite.getSubtransforms(0)),
// result);
}

@Test
@@ -281,9 +284,11 @@ public void testConstructKafkaReadWithoutMetadata() throws Exception {
result.getComponents().getTransformsOrThrow(transform.getSubtransforms(0));
result.getComponents().getTransformsOrThrow(kafkaReadComposite.getSubtransforms(0));

verifyKafkaReadComposite(
result.getComponents().getTransformsOrThrow(kafkaReadComposite.getSubtransforms(0)),
result);
// TODO: fix Kafka SDF https://github.com/apache/beam/issues/22303

// verifyKafkaReadComposite(
// result.getComponents().getTransformsOrThrow(kafkaReadComposite.getSubtransforms(0)),
// result);
}

@Test
@@ -123,10 +123,12 @@ public void testReadTransformCreationWithLegacyImplementationBoundProperty() {
testReadTransformCreationWithImplementationBoundProperties(legacyDecoratorFunction());
}

@Test
public void testReadTransformCreationWithSdfImplementationBoundProperty() {
testReadTransformCreationWithImplementationBoundProperties(sdfDecoratorFunction());
}
// TODO: fix Kafka SDF https://github.com/apache/beam/issues/22303

// @Test
// public void testReadTransformCreationWithSdfImplementationBoundProperty() {
// testReadTransformCreationWithImplementationBoundProperties(sdfDecoratorFunction());
// }

@Test
public void testReadTransformCreationWithBothImplementationBoundProperties() {
12 changes: 3 additions & 9 deletions sdks/python/apache_beam/io/kafka.py
@@ -96,9 +96,8 @@
('commit_offset_in_finalize', bool), ('timestamp_policy', str)])


def default_io_expansion_service(append_args=None):
return BeamJarExpansionService(
'sdks:java:io:expansion-service:shadowJar', append_args=append_args)
def default_io_expansion_service():
return BeamJarExpansionService('sdks:java:io:expansion-service:shadowJar')


class ReadFromKafka(ExternalTransform):
@@ -187,12 +186,7 @@ def __init__(
start_read_time=start_read_time,
commit_offset_in_finalize=commit_offset_in_finalize,
timestamp_policy=timestamp_policy)),
expansion_service or default_io_expansion_service(
append_args=['--experiments=use_unbounded_sdf_wrapper']))
# TODO(https://github.com/apache/beam/issues/21730): remove
# 'use_unbounded_sdf_wrapper' which opts default expansion
# service into using SDF wrapped legacy Kafka source instead of pure SDF
# Kafka source.
expansion_service or default_io_expansion_service())


WriteToKafkaSchema = typing.NamedTuple(
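After this change, default_io_expansion_service() no longer appends the use_unbounded_sdf_wrapper experiment, since the Java side now always falls back to the unbounded reader; from the Python user's perspective, ReadFromKafka usage is unchanged. A minimal sketch follows, not part of this PR: the bootstrap server and topic are placeholder assumptions, and running it requires a Java runtime so the default expansion service jar can be started.

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions


def run():
    # No expansion_service argument is passed, so the Beam-provided Java
    # expansion service jar (default_io_expansion_service) is used.
    with beam.Pipeline(options=PipelineOptions()) as pipeline:
        _ = (
            pipeline
            | 'ReadKafka' >> ReadFromKafka(
                consumer_config={'bootstrap.servers': 'localhost:9092'},  # placeholder broker
                topics=['my-topic'])  # placeholder topic
            | 'Print' >> beam.Map(print))


if __name__ == '__main__':
    run()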