diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 5e9ff9ab80c6..68400e83106f 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -75,6 +75,7 @@
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Redistribute;
 import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SimpleFunction;
@@ -600,6 +601,9 @@ public static <K, V> Read<K, V> read() {
         .setDynamicRead(false)
         .setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime())
         .setConsumerPollingTimeout(2L)
+        .setRedistributed(false)
+        .setAllowDuplicates(false)
+        .setRedistributeNumKeys(0)
         .build();
   }
 
@@ -698,6 +702,15 @@ public abstract static class Read<K, V>
     @Pure
     public abstract boolean isDynamicRead();
 
+    @Pure
+    public abstract boolean isRedistributed();
+
+    @Pure
+    public abstract boolean isAllowDuplicates();
+
+    @Pure
+    public abstract int getRedistributeNumKeys();
+
     @Pure
     public abstract @Nullable Duration getWatchTopicPartitionDuration();
 
@@ -757,6 +770,12 @@ abstract Builder<K, V> setConsumerFactoryFn(
 
       abstract Builder<K, V> setWatchTopicPartitionDuration(Duration duration);
 
+      abstract Builder<K, V> setRedistributed(boolean withRedistribute);
+
+      abstract Builder<K, V> setAllowDuplicates(boolean allowDuplicates);
+
+      abstract Builder<K, V> setRedistributeNumKeys(int redistributeNumKeys);
+
       abstract Builder<K, V> setTimestampPolicyFactory(
           TimestampPolicyFactory timestampPolicyFactory);
 
@@ -852,6 +871,21 @@ static <K, V> void setupExternalBuilder(
       } else {
         builder.setConsumerPollingTimeout(2L);
       }
+
+      if (config.redistribute != null) {
+        builder.setRedistributed(config.redistribute);
+        if (config.redistributeNumKeys != null) {
+          builder.setRedistributeNumKeys((int) config.redistributeNumKeys);
+        }
+        if (config.allowDuplicates != null) {
+          builder.setAllowDuplicates(config.allowDuplicates);
+        }
+
+      } else {
+        builder.setRedistributed(false);
+        builder.setRedistributeNumKeys(0);
+        builder.setAllowDuplicates(false);
+      }
     }
 
     private static <T> Coder<T> resolveCoder(Class<Deserializer<T>> deserializer) {
@@ -916,6 +950,9 @@ public static class Configuration {
       private Boolean commitOffsetInFinalize;
       private Long consumerPollingTimeout;
       private String timestampPolicy;
+      private Integer redistributeNumKeys;
+      private Boolean redistribute;
+      private Boolean allowDuplicates;
 
       public void setConsumerConfig(Map<String, String> consumerConfig) {
         this.consumerConfig = consumerConfig;
@@ -960,6 +997,18 @@ public void setTimestampPolicy(String timestampPolicy) {
       public void setConsumerPollingTimeout(Long consumerPollingTimeout) {
         this.consumerPollingTimeout = consumerPollingTimeout;
       }
+
+      public void setRedistributeNumKeys(Integer redistributeNumKeys) {
+        this.redistributeNumKeys = redistributeNumKeys;
+      }
+
+      public void setRedistribute(Boolean redistribute) {
+        this.redistribute = redistribute;
+      }
+
+      public void setAllowDuplicates(Boolean allowDuplicates) {
+        this.allowDuplicates = allowDuplicates;
+      }
     }
   }
 
@@ -1007,6 +1056,30 @@ public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
       return toBuilder().setTopicPartitions(ImmutableList.copyOf(topicPartitions)).build();
     }
 
+    /**
+     * Sets a {@link Redistribute} transform that hints to the runner to try to redistribute the work evenly.
+     */
+    public Read<K, V> withRedistribute() {
+      if (getRedistributeNumKeys() == 0 && isRedistributed()) {
+        LOG.warn("This will create a key per record, which is sub-optimal for most use cases.");
+      }
+      return toBuilder().setRedistributed(true).build();
+    }
+
+    public Read<K, V> withAllowDuplicates(Boolean allowDuplicates) {
+      if (!isRedistributed()) {
+        LOG.warn("Setting this value without setting withRedistribute() will have no effect.");
+      }
+      return toBuilder().setAllowDuplicates(allowDuplicates).build();
+    }
+
+    public Read<K, V> withRedistributeNumKeys(int redistributeNumKeys) {
+      checkState(
+          isRedistributed(),
+          "withRedistributeNumKeys requires withRedistribute() to be enabled on the transform.");
+      return toBuilder().setRedistributeNumKeys(redistributeNumKeys).build();
+    }
+
     /**
      * Internally sets a {@link java.util.regex.Pattern} of topics to read from. All the partitions
      * from each of the matching topics are read.
@@ -1618,6 +1691,25 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
                 .withMaxNumRecords(kafkaRead.getMaxNumRecords());
       }
 
+      if (kafkaRead.isRedistributed()) {
+        // fail here instead.
+        checkArgument(
+            kafkaRead.isCommitOffsetsInFinalizeEnabled(),
+            "commitOffsetsInFinalize() can't be enabled with isRedistributed");
+        PCollection<KafkaRecord<K, V>> output = input.getPipeline().apply(transform);
+        if (kafkaRead.getRedistributeNumKeys() == 0) {
+          return output.apply(
+              "Insert Redistribute",
+              Redistribute.<KafkaRecord<K, V>>arbitrarily()
+                  .withAllowDuplicates(kafkaRead.isAllowDuplicates()));
+        } else {
+          return output.apply(
+              "Insert Redistribute with Shards",
+              Redistribute.<KafkaRecord<K, V>>arbitrarily()
+                  .withAllowDuplicates(kafkaRead.isAllowDuplicates())
+                  .withNumBuckets((int) kafkaRead.getRedistributeNumKeys()));
+        }
+      }
       return input.getPipeline().apply(transform);
     }
   }
@@ -1637,6 +1729,8 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
               .withKeyDeserializerProvider(kafkaRead.getKeyDeserializerProvider())
               .withValueDeserializerProvider(kafkaRead.getValueDeserializerProvider())
               .withManualWatermarkEstimator()
+              .withRedistribute()
+              .withAllowDuplicates() // must be set with withRedistribute option.
              .withTimestampPolicyFactory(kafkaRead.getTimestampPolicyFactory())
              .withCheckStopReadingFn(kafkaRead.getCheckStopReadingFn())
              .withConsumerPollingTimeout(kafkaRead.getConsumerPollingTimeout());
@@ -1650,6 +1744,15 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
         readTransform =
             readTransform.withBadRecordErrorHandler(kafkaRead.getBadRecordErrorHandler());
       }
+      if (kafkaRead.isRedistributed()) {
+        readTransform = readTransform.withRedistribute();
+      }
+      if (kafkaRead.isAllowDuplicates()) {
+        readTransform = readTransform.withAllowDuplicates();
+      }
+      if (kafkaRead.getRedistributeNumKeys() > 0) {
+        readTransform = readTransform.withRedistributeNumKeys(kafkaRead.getRedistributeNumKeys());
+      }
       PCollection<KafkaSourceDescriptor> output;
       if (kafkaRead.isDynamicRead()) {
         Set<String> topics = new HashSet<>();
@@ -1679,6 +1782,22 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
                 .apply(Impulse.create())
                 .apply(ParDo.of(new GenerateKafkaSourceDescriptor(kafkaRead)));
       }
+      if (kafkaRead.isRedistributed()) {
+        PCollection<KafkaRecord<K, V>> pcol =
+            output.apply(readTransform).setCoder(KafkaRecordCoder.of(keyCoder, valueCoder));
+        if (kafkaRead.getRedistributeNumKeys() == 0) {
+          return pcol.apply(
+              "Insert Redistribute",
+              Redistribute.<KafkaRecord<K, V>>arbitrarily()
+                  .withAllowDuplicates(kafkaRead.isAllowDuplicates()));
+        } else {
+          return pcol.apply(
+              "Insert Redistribute with Shards",
+              Redistribute.<KafkaRecord<K, V>>arbitrarily()
+                  .withAllowDuplicates(true)
+                  .withNumBuckets((int) kafkaRead.getRedistributeNumKeys()));
+        }
+      }
       return output.apply(readTransform).setCoder(KafkaRecordCoder.of(keyCoder, valueCoder));
     }
   }
@@ -2070,6 +2189,15 @@ public abstract static class ReadSourceDescriptors<K, V>
     @Pure
     abstract boolean isCommitOffsetEnabled();
 
+    @Pure
+    abstract boolean isRedistribute();
+
+    @Pure
+    abstract boolean isAllowDuplicates();
+
+    @Pure
+    abstract int getRedistributeNumKeys();
+
     @Pure
     abstract @Nullable TimestampPolicyFactory getTimestampPolicyFactory();
 
@@ -2136,6 +2264,12 @@ abstract ReadSourceDescriptors.Builder<K, V> setBadRecordErrorHandler(
 
       abstract ReadSourceDescriptors.Builder<K, V> setBounded(boolean bounded);
 
+      abstract ReadSourceDescriptors.Builder<K, V> setRedistribute(boolean withRedistribute);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setAllowDuplicates(boolean allowDuplicates);
+
+      abstract ReadSourceDescriptors.Builder<K, V> setRedistributeNumKeys(int redistributeNumKeys);
+
       abstract ReadSourceDescriptors<K, V> build();
     }
 
@@ -2148,6 +2282,9 @@ public static <K, V> ReadSourceDescriptors<K, V> read() {
           .setBadRecordRouter(BadRecordRouter.THROWING_ROUTER)
           .setBadRecordErrorHandler(new ErrorHandler.DefaultErrorHandler<>())
           .setConsumerPollingTimeout(2L)
+          .setRedistribute(false)
+          .setAllowDuplicates(false)
+          .setRedistributeNumKeys(0)
           .build()
           .withProcessingTime()
           .withMonotonicallyIncreasingWatermarkEstimator();
@@ -2307,6 +2444,19 @@ public ReadSourceDescriptors<K, V> withProcessingTime() {
           ReadSourceDescriptors.ExtractOutputTimestampFns.useProcessingTime());
     }
 
+    /** Enable Redistribute. */
+    public ReadSourceDescriptors<K, V> withRedistribute() {
+      return toBuilder().setRedistribute(true).build();
+    }
+
+    public ReadSourceDescriptors<K, V> withAllowDuplicates() {
+      return toBuilder().setAllowDuplicates(true).build();
+    }
+
+    public ReadSourceDescriptors<K, V> withRedistributeNumKeys(int redistributeNumKeys) {
+      return toBuilder().setRedistributeNumKeys(redistributeNumKeys).build();
+    }
+
     /** Use the creation time of {@link KafkaRecord} as the output timestamp. */
     public ReadSourceDescriptors<K, V> withCreateTime() {
       return withExtractOutputTimestampFn(
@@ -2497,6 +2647,12 @@ public PCollection<KafkaRecord<K, V>> expand(PCollection<KafkaSourceDescriptor> input) {
         }
       }
 
+      if (isRedistribute()) {
+        if (getRedistributeNumKeys() == 0) {
+          LOG.warn("This will create a key per record, which is sub-optimal for most use cases.");
+        }
+      }
+
       if (getConsumerConfig().get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) == null) {
         LOG.warn(
             "The bootstrapServers is not set. It must be populated through the KafkaSourceDescriptor during runtime otherwise the pipeline will fail.");
@@ -2527,7 +2683,7 @@ public PCollection<KafkaRecord<K, V>> expand(PCollection<KafkaSourceDescriptor> input) {
                           .getSchemaRegistry()
                           .getSchemaCoder(KafkaSourceDescriptor.class),
                       recordCoder));
-      if (isCommitOffsetEnabled() && !configuredKafkaCommit()) {
+      if (isCommitOffsetEnabled() && !configuredKafkaCommit() && !isRedistribute()) {
         outputWithDescriptor =
             outputWithDescriptor
                 .apply(Reshuffle.viaRandomKey())
@@ -2538,6 +2694,7 @@ public PCollection<KafkaRecord<K, V>> expand(PCollection<KafkaSourceDescriptor> input) {
                           .getSchemaRegistry()
                           .getSchemaCoder(KafkaSourceDescriptor.class),
                       recordCoder));
+
         PCollection<Void> unused = outputWithDescriptor.apply(new KafkaCommitOffset<K, V>(this));
         unused.setCoder(VoidCoder.of());
       }
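A minimal usage sketch (not part of the patch itself) of how the new `withRedistribute()`, `withRedistributeNumKeys(int)`, and `withAllowDuplicates(Boolean)` options on `KafkaIO.Read` compose; the broker address, topic name, and `byte[]` key/value types are placeholder assumptions.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class KafkaRedistributeReadSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Hypothetical broker/topic; the three redistribute calls are the API added by this change.
    PCollection<KafkaRecord<byte[], byte[]>> records =
        pipeline.apply(
            KafkaIO.<byte[], byte[]>read()
                .withBootstrapServers("broker-1:9092")
                .withTopic("events")
                .withKeyDeserializer(ByteArrayDeserializer.class)
                .withValueDeserializer(ByteArrayDeserializer.class)
                // withRedistribute() comes first; withRedistributeNumKeys() checks it via checkState.
                .withRedistribute()
                .withRedistributeNumKeys(16)
                .withAllowDuplicates(true));

    pipeline.run().waitUntilFinish();
  }
}
```

Per the checkArgument added in the legacy expand() path above, redistribution is not meant to be combined with commitOffsetsInFinalize(); leaving offset commits to Kafka's auto-commit is the assumption in this sketch.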
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java
index ce0434ee88d1..457e0003705e 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java
@@ -119,6 +119,24 @@ Object getDefaultValue() {
         return Long.valueOf(2);
       }
     },
+    REDISTRIBUTE_NUM_KEYS {
+      @Override
+      Object getDefaultValue() {
+        return Integer.valueOf(0);
+      }
+    },
+    REDISTRIBUTED {
+      @Override
+      Object getDefaultValue() {
+        return false;
+      }
+    },
+    ALLOW_DUPLICATES {
+      @Override
+      Object getDefaultValue() {
+        return false;
+      }
+    },
     ;
 
     private final @NonNull ImmutableSet<KafkaIOReadImplementation> supportedImplementations;
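The ReadSourceDescriptors path gains the same three switches, but as no-argument/int builders. A hedged sketch, assuming a `PCollection<KafkaSourceDescriptor>` produced upstream and `byte[]` payloads:

```java
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.KafkaSourceDescriptor;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class SourceDescriptorRedistributeSketch {
  // Applied to a PCollection<KafkaSourceDescriptor> built elsewhere (assumed to exist).
  static PCollection<KafkaRecord<byte[], byte[]>> readRedistributed(
      PCollection<KafkaSourceDescriptor> descriptors) {
    return descriptors.apply(
        KafkaIO.<byte[], byte[]>readSourceDescriptors()
            .withKeyDeserializer(ByteArrayDeserializer.class)
            .withValueDeserializer(ByteArrayDeserializer.class)
            // New toggles from this change; bootstrap servers come from the descriptors.
            .withRedistribute()
            .withRedistributeNumKeys(8)
            .withAllowDuplicates());
  }
}
```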
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
index 246fdd80d739..f021789a912c 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
@@ -108,7 +108,10 @@ public void testConstructKafkaRead() throws Exception {
                         Field.of("start_read_time", FieldType.INT64),
                         Field.of("commit_offset_in_finalize", FieldType.BOOLEAN),
                         Field.of("timestamp_policy", FieldType.STRING),
-                        Field.of("consumer_polling_timeout", FieldType.INT64)))
+                        Field.of("consumer_polling_timeout", FieldType.INT64),
+                        Field.of("redistribute_num_keys", FieldType.INT32),
+                        Field.of("redistribute", FieldType.BOOLEAN),
+                        Field.of("allow_duplicates", FieldType.BOOLEAN)))
                 .withFieldValue("topics", topics)
                 .withFieldValue("consumer_config", consumerConfig)
                 .withFieldValue("key_deserializer", keyDeserializer)
@@ -117,6 +120,9 @@ public void testConstructKafkaRead() throws Exception {
                 .withFieldValue("commit_offset_in_finalize", false)
                 .withFieldValue("timestamp_policy", "ProcessingTime")
                 .withFieldValue("consumer_polling_timeout", 5L)
+                .withFieldValue("redistribute_num_keys", 0)
+                .withFieldValue("redistribute", false)
+                .withFieldValue("allow_duplicates", false)
                 .build());
 
     RunnerApi.Components defaultInstance = RunnerApi.Components.getDefaultInstance();
@@ -237,7 +243,10 @@ public void testConstructKafkaReadWithoutMetadata() throws Exception {
                         Field.of("value_deserializer", FieldType.STRING),
                         Field.of("start_read_time", FieldType.INT64),
                         Field.of("commit_offset_in_finalize", FieldType.BOOLEAN),
-                        Field.of("timestamp_policy", FieldType.STRING)))
+                        Field.of("timestamp_policy", FieldType.STRING),
+                        Field.of("redistribute_num_keys", FieldType.INT32),
+                        Field.of("redistribute", FieldType.BOOLEAN),
+                        Field.of("allow_duplicates", FieldType.BOOLEAN)))
                 .withFieldValue("topics", topics)
                 .withFieldValue("consumer_config", consumerConfig)
                 .withFieldValue("key_deserializer", keyDeserializer)
@@ -245,6 +254,9 @@ public void testConstructKafkaReadWithoutMetadata() throws Exception {
                 .withFieldValue("start_read_time", startReadTime)
                 .withFieldValue("commit_offset_in_finalize", false)
                 .withFieldValue("timestamp_policy", "ProcessingTime")
+                .withFieldValue("redistribute_num_keys", 0)
+                .withFieldValue("redistribute", false)
+                .withFieldValue("allow_duplicates", false)
                 .build());
 
     RunnerApi.Components defaultInstance = RunnerApi.Components.getDefaultInstance();
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibilityTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibilityTest.java
index 2d306b0d7798..ae939d66c210 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibilityTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibilityTest.java
@@ -103,7 +103,9 @@ public void testPrimitiveKafkaIOReadPropertiesDefaultValueExistence() {
 
   private void testReadTransformCreationWithImplementationBoundProperties(
       Function<KafkaIO.Read<Integer, Long>, KafkaIO.Read<Integer, Long>> kafkaReadDecorator) {
-    p.apply(kafkaReadDecorator.apply(mkKafkaReadTransform(1000, null, new ValueAsTimestampFn())));
+    p.apply(
+        kafkaReadDecorator.apply(
+            mkKafkaReadTransform(1000, null, new ValueAsTimestampFn(), false, 0)));
     p.run();
   }
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 07e5b519c013..73aee5aeeef0 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -376,7 +376,7 @@ public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
 
   static KafkaIO.Read<Integer, Long> mkKafkaReadTransform(
       int numElements,
       @Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn) {
-    return mkKafkaReadTransform(numElements, numElements, timestampFn);
+    return mkKafkaReadTransform(numElements, numElements, timestampFn, false, 0);
   }
 
   /**
@@ -386,7 +386,9 @@ static KafkaIO.Read<Integer, Long> mkKafkaReadTransform(
   static KafkaIO.Read<Integer, Long> mkKafkaReadTransform(
       int numElements,
       @Nullable Integer maxNumRecords,
-      @Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn) {
+      @Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn,
+      @Nullable Boolean redistribute,
+      @Nullable Integer numKeys) {
     List<String> topics = ImmutableList.of("topic_a", "topic_b");
@@ -404,10 +406,16 @@ static KafkaIO.Read<Integer, Long> mkKafkaReadTransform(
     }
 
     if (timestampFn != null) {
-      return reader.withTimestampFn(timestampFn);
-    } else {
-      return reader;
+      reader = reader.withTimestampFn(timestampFn);
     }
+
+    if (redistribute) {
+      reader = reader.withRedistribute();
+      if (numKeys != null) {
+        reader = reader.withRedistributeNumKeys(numKeys);
+      }
+    }
+    return reader;
   }
 
   private static class AssertMultipleOf implements SerializableFunction<Iterable<Long>, Void> {
@@ -616,6 +624,42 @@ public void testRiskyConfigurationWarnsProperly() {
     p.run();
   }
 
+  @Test
+  public void testCommitOffsetsInFinalizeAndRedistributeErrors() {
+    thrown.expect(Exception.class);
+    thrown.expectMessage("commitOffsetsInFinalize() can't be enabled with isRedistributed");
+
+    int numElements = 1000;
+
+    PCollection<Long> input =
+        p.apply(
+                mkKafkaReadTransform(numElements, numElements, new ValueAsTimestampFn(), true, 0)
+                    .withConsumerConfigUpdates(
+                        ImmutableMap.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true))
+                    .withoutMetadata())
+            .apply(Values.create());
+
+    addCountingAsserts(input, numElements);
+    p.run();
+  }
+
+  @Test
+  public void testNumKeysIgnoredWithRedistributeNotEnabled() {
+    int numElements = 1000;
+
+    PCollection<Long> input =
+        p.apply(
+                mkKafkaReadTransform(numElements, numElements, new ValueAsTimestampFn(), false, 0)
+                    .withConsumerConfigUpdates(
+                        ImmutableMap.of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true))
+                    .withoutMetadata())
+            .apply(Values.create());
+
+    addCountingAsserts(input, numElements);
+
+    p.run();
+  }
+
   @Test
   public void testUnreachableKafkaBrokers() {
     // Expect an exception when the Kafka brokers are not reachable on the workers.
@@ -1905,7 +1949,7 @@ public void testUnboundedSourceStartReadTime() {
 
     PCollection<Long> input =
         p.apply(
-                mkKafkaReadTransform(numElements, maxNumRecords, new ValueAsTimestampFn())
+                mkKafkaReadTransform(numElements, maxNumRecords, new ValueAsTimestampFn(), false, 0)
                     .withStartReadTime(new Instant(startTime))
                     .withoutMetadata())
             .apply(Values.create());
@@ -1929,7 +1973,7 @@ public void testUnboundedSourceStartReadTimeException() {
     int startTime = numElements / 20;
 
     p.apply(
-            mkKafkaReadTransform(numElements, numElements, new ValueAsTimestampFn())
+            mkKafkaReadTransform(numElements, numElements, new ValueAsTimestampFn(), false, 0)
                 .withStartReadTime(new Instant(startTime))
                 .withoutMetadata())
         .apply(Values.create());
diff --git a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
index 9848e429e215..db7b172170a1 100644
--- a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
+++ b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
@@ -98,6 +98,9 @@ static class KafkaIOReadWithMetadataTranslator implements TransformPayloadTranslator<Read<?, ?>> {
           .addNullableLogicalTypeField("stop_read_time", new NanosInstant())
           .addBooleanField("is_commit_offset_finalize_enabled")
           .addBooleanField("is_dynamic_read")
+          .addBooleanField("redistribute")
+          .addBooleanField("allows_duplicates")
+          .addNullableInt32Field("redistribute_num_keys")
           .addNullableLogicalTypeField("watch_topic_partition_duration", new NanosDuration())
           .addByteArrayField("timestamp_policy_factory")
           .addNullableMapField("offset_consumer_config", FieldType.STRING, FieldType.BYTES)
@@ -215,6 +218,9 @@ public Row toConfigRow(Read<?, ?> transform) {
                 + " is not supported yet.");
       }
+      fieldValues.put("redistribute", transform.isRedistributed());
+      fieldValues.put("redistribute_num_keys", transform.getRedistributeNumKeys());
+      fieldValues.put("allows_duplicates", transform.isAllowDuplicates());
       return Row.withSchema(schema).withFieldValues(fieldValues).build();
     }
 
@@ -325,6 +331,22 @@ public Row toConfigRow(Read<?, ?> transform) {
       if (maxNumRecords != null) {
         transform = transform.withMaxNumRecords(maxNumRecords);
       }
+
+      Boolean isRedistributed = configRow.getBoolean("redistribute");
+      if (isRedistributed != null && isRedistributed) {
+        transform = transform.withRedistribute();
+        Integer redistributeNumKeys =
+            configRow.getValue("redistribute_num_keys") == null
+                ? Integer.valueOf(0)
+                : configRow.getInt32("redistribute_num_keys");
+        if (redistributeNumKeys != null && !redistributeNumKeys.equals(0)) {
+          transform = transform.withRedistributeNumKeys(redistributeNumKeys);
+        }
+        Boolean allowDuplicates = configRow.getBoolean("allows_duplicates");
+        if (allowDuplicates != null && allowDuplicates) {
+          transform = transform.withAllowDuplicates(allowDuplicates);
+        }
+      }
       Duration maxReadTime = configRow.getValue("max_read_time");
       if (maxReadTime != null) {
         transform =
diff --git a/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java b/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
index f69b9c3649b4..095702a5c6ff 100644
--- a/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
+++ b/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
@@ -64,6 +64,7 @@ public class KafkaIOTranslationTest {
     READ_TRANSFORM_SCHEMA_MAPPING.put("getMaxReadTime", "max_read_time");
     READ_TRANSFORM_SCHEMA_MAPPING.put("getStartReadTime", "start_read_time");
     READ_TRANSFORM_SCHEMA_MAPPING.put("getStopReadTime", "stop_read_time");
+    READ_TRANSFORM_SCHEMA_MAPPING.put("getRedistributeNumKeys", "redistribute_num_keys");
     READ_TRANSFORM_SCHEMA_MAPPING.put(
         "isCommitOffsetsInFinalizeEnabled", "is_commit_offset_finalize_enabled");
     READ_TRANSFORM_SCHEMA_MAPPING.put("isDynamicRead", "is_dynamic_read");
diff --git a/sdks/python/apache_beam/io/kafka.py b/sdks/python/apache_beam/io/kafka.py
index b4fd7d86e688..b19e9c22aa3c 100644
--- a/sdks/python/apache_beam/io/kafka.py
+++ b/sdks/python/apache_beam/io/kafka.py
@@ -82,19 +82,29 @@
 
 import typing
 
+import numpy as np
+
 from apache_beam.transforms.external import BeamJarExpansionService
 from apache_beam.transforms.external import ExternalTransform
 from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
 
 ReadFromKafkaSchema = typing.NamedTuple(
     'ReadFromKafkaSchema',
-    [('consumer_config', typing.Mapping[str, str]),
-     ('topics', typing.List[str]), ('key_deserializer', str),
-     ('value_deserializer', str), ('start_read_time', typing.Optional[int]),
-     ('max_num_records', typing.Optional[int]),
-     ('max_read_time', typing.Optional[int]),
-     ('commit_offset_in_finalize', bool), ('timestamp_policy', str),
-     ('consumer_polling_timeout', typing.Optional[int])])
+    [
+        ('consumer_config', typing.Mapping[str, str]),
+        ('topics', typing.List[str]),
+        ('key_deserializer', str),
+        ('value_deserializer', str),
+        ('start_read_time', typing.Optional[int]),
+        ('max_num_records', typing.Optional[int]),
+        ('max_read_time', typing.Optional[int]),
+        ('commit_offset_in_finalize', bool),
+        ('timestamp_policy', str),
+        ('consumer_polling_timeout', typing.Optional[int]),
+        ('redistribute', typing.Optional[bool]),
+        ('redistribute_num_keys', typing.Optional[np.int32]),
+        ('allow_duplicates', typing.Optional[bool]),
+    ])
 
 
 def default_io_expansion_service(append_args=None):
@@ -138,6 +148,9 @@ def __init__(
       consumer_polling_timeout=2,
       with_metadata=False,
       expansion_service=None,
+      redistribute=False,
+      redistribute_num_keys=np.int32(0),
+      allow_duplicates=False,
   ):
     """
     Initializes a read operation from Kafka.
@@ -172,6 +185,12 @@ def __init__(
         this only works when using default key and value deserializers where
         Java Kafka Reader reads keys and values as 'byte[]'.
     :param expansion_service: The address (host:port) of the ExpansionService.
+    :param redistribute: whether a Redistribute transform should be applied
+        immediately after the read.
+    :param redistribute_num_keys: Configures how many keys the Redistribute
+        spreads the data across.
+    :param allow_duplicates: whether the Redistribute transform allows for
+        duplicates (this serves solely as a hint to the underlying runner).
    """
    if timestamp_policy not in [ReadFromKafka.processing_time_policy,
                                ReadFromKafka.create_time_policy,
@@ -193,7 +212,10 @@ def __init__(
             start_read_time=start_read_time,
             commit_offset_in_finalize=commit_offset_in_finalize,
             timestamp_policy=timestamp_policy,
-            consumer_polling_timeout=consumer_polling_timeout)),
+            consumer_polling_timeout=consumer_polling_timeout,
+            redistribute=redistribute,
+            redistribute_num_keys=redistribute_num_keys,
+            allow_duplicates=allow_duplicates)),
         expansion_service or default_io_expansion_service())
diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
index f69ee1c24c4e..4dc2446fdd9d 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
@@ -252,6 +252,9 @@ def test_expand_kafka_read(self):
               'LongDeserializer',
               commit_offset_in_finalize=True,
               timestamp_policy=ReadFromKafka.create_time_policy,
+              redistribute=False,
+              redistribute_num_keys=0,
+              allow_duplicates=False,
               expansion_service=self.get_expansion_service()))
       self.assertTrue(
          'No resolvable bootstrap urls given in bootstrap.servers' in str(
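For completeness, a minimal sketch of the Python-side parameters added to ReadFromKafka above; the broker, topic, and surrounding pipeline are placeholders, and the values are forwarded to the Java expansion service as redistribute, redistribute_num_keys, and allow_duplicates.

```python
# Hypothetical usage of the new ReadFromKafka parameters (not part of the patch).
import numpy as np

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

with beam.Pipeline() as p:
  _ = (
      p
      | 'ReadEvents' >> ReadFromKafka(
          consumer_config={'bootstrap.servers': 'broker-1:9092'},
          topics=['events'],
          # New options introduced by this change.
          redistribute=True,
          redistribute_num_keys=np.int32(16),
          allow_duplicates=True))
```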