From 6d580cb7a936811117a85da79d60b1ec09a1accc Mon Sep 17 00:00:00 2001
From: Raghu Angadi
Date: Thu, 1 Dec 2016 12:58:12 -0800
Subject: [PATCH 1/6] KafkaIO should return one split for each partition.

A partition is the actual unit of parallelism for a Kafka topic.

The desiredNumSplits that Dataflow passes to a custom source is quite low
when maxNumWorkers is set: it asks for just one split for each of the
workers. This limits the use of CPU cores on the workers, essentially making
autoscaling use more resources without improving performance.

This includes a hack to force a single split in many unit tests, since
DirectPipelineRunner and InProcessPipelineRunner don't seem to read from
more than one split.
---
 .../cloud/dataflow/contrib/kafka/KafkaIO.java | 52 +++++++++---------
 .../dataflow/contrib/kafka/KafkaIOTest.java | 54 +++++++++++--------
 2 files changed, 59 insertions(+), 47 deletions(-)

diff --git a/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java
index a18062da56..0d83ee45bb 100644
--- a/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java
+++ b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java
@@ -44,7 +44,6 @@
 import com.google.cloud.dataflow.sdk.values.PInput;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
-import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.ImmutableList;
@@ -731,7 +730,6 @@ public List> generateInitialSplits(

     // (a) fetch partitions for each topic
     // (b) sort by
-    // (c) round-robin assign the partitions to splits

     if (partitions.isEmpty()) {
       try (Consumer consumer = consumerFactoryFn.apply(consumerConfig)) {
@@ -754,43 +752,45 @@ public int compare(TopicPartition tp1, TopicPartition tp2) {
       }
     });

-    checkArgument(desiredNumSplits > 0);
     checkState(
         partitions.size() > 0,
         "Could not find any partitions. Please check Kafka configuration and topic names");

-    int numSplits = Math.min(desiredNumSplits, partitions.size());
-    List> assignments = new ArrayList<>(numSplits);
-
-    for (int i = 0; i < numSplits; i++) {
-      assignments.add(new ArrayList());
-    }
-    for (int i = 0; i < partitions.size(); i++) {
-      assignments.get(i % numSplits).add(partitions.get(i));
-    }
-
-    List> result = new ArrayList<>(numSplits);
-
-    for (int i = 0; i < numSplits; i++) {
-      List assignedToSplit = assignments.get(i);
-
-      LOG.info(
-          "Partitions assigned to split {} (total {}): {}",
-          i,
-          assignedToSplit.size(),
-          Joiner.on(",").join(assignedToSplit));
+    List> result = new ArrayList<>(partitions.size());

+    if (Boolean.TRUE.equals(consumerConfig.get("force.single.split.for.dastaflow.tests"))) {
+      // Work around/hack for Direct/InProcess runner in Dataflow not supporting multiple splits.
+      LOG.info("Reading all the partitions in single split");
       result.add(
-          new UnboundedKafkaSource(
-              i,
+          new UnboundedKafkaSource<>(
+              0,
               this.topics,
-              assignedToSplit,
+              partitions,
               this.keyCoder,
               this.valueCoder,
               this.timestampFn,
               this.watermarkFn,
               this.consumerFactoryFn,
               this.consumerConfig));
+    } else {
+      // one split for each partition.
+ for (int i = 0; i < partitions.size(); i++) { + TopicPartition partition = partitions.get(i); + + LOG.info("Partition assigned to split {} : {}", i, partition); + + result.add( + new UnboundedKafkaSource<>( + i, + this.topics, + ImmutableList.of(partition), + this.keyCoder, + this.valueCoder, + this.timestampFn, + this.watermarkFn, + this.consumerFactoryFn, + this.consumerConfig)); + } } return result; diff --git a/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java b/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java index af8d674831..aeae6cdb4d 100644 --- a/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java +++ b/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java @@ -126,7 +126,7 @@ private static MockConsumer mkMockConsumer( tp.topic(), tp.partition(), offsets[pIdx]++, - ByteBuffer.wrap(new byte[8]).putInt(i).array(), + ByteBuffer.wrap(new byte[4]).putInt(i).array(), // key is 4 byte record id ByteBuffer.wrap(new byte[8]).putLong(i).array())); // value is 8 byte record id } @@ -172,11 +172,12 @@ public Consumer apply(Map config) { } /** - * Creates a consumer with two topics, with 5 partitions each. numElements are (round-robin) - * assigned all the 10 partitions. + * Creates a consumer with two topics, with 10 partitions each. numElements are (round-robin) + * assigned all the 20 partitions. */ private static KafkaIO.TypedRead mkKafkaReadTransform( - int numElements, @Nullable SerializableFunction, Instant> timestampFn) { + int numElements, boolean forceSingleSplit, + @Nullable SerializableFunction, Instant> timestampFn) { List topics = ImmutableList.of("topic_a", "topic_b"); @@ -189,6 +190,11 @@ private static KafkaIO.TypedRead mkKafkaReadTransform( .withValueCoder(BigEndianLongCoder.of()) .withMaxNumRecords(numElements); + if (forceSingleSplit) { + reader = reader.updateConsumerProperties( + ImmutableMap.of("force.single.split.for.dataflow.tests", true)); + } + if (timestampFn != null) { return reader.withTimestampFn(timestampFn); } else { @@ -232,11 +238,13 @@ public static void addCountingAsserts(PCollection input, long numElements) @Test public void testUnboundedSource() { Pipeline p = TestPipeline.create(); + int numElements = 1000; PCollection input = p - .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata()) + .apply(mkKafkaReadTransform(numElements, true, + new ValueAsTimestampFn()).withoutMetadata()) .apply(Values.create()); addCountingAsserts(input, numElements); @@ -282,7 +290,8 @@ public void testUnboundedSourceTimestamps() { PCollection input = p - .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata()) + .apply(mkKafkaReadTransform(numElements, true, + new ValueAsTimestampFn()).withoutMetadata()) .apply(Values.create()); addCountingAsserts(input, numElements); @@ -309,16 +318,16 @@ public void processElement(ProcessContext ctx) throws Exception { public void testUnboundedSourceSplits() throws Exception { Pipeline p = TestPipeline.create(); int numElements = 1000; - int numSplits = 10; + int numSplits = 20; + // create source with 20 partitions. UnboundedSource, ?> initial = - mkKafkaReadTransform(numElements, null).makeSource(); - List< - ? 
extends
-                UnboundedSource<
-                    com.google.cloud.dataflow.contrib.kafka.KafkaRecord, ?>>
-        splits = initial.generateInitialSplits(numSplits, p.getOptions());
-    assertEquals("Expected exact splitting", numSplits, splits.size());
+        mkKafkaReadTransform(numElements, false, null).makeSource();
+
+    List, ?>> splits =
+        initial.generateInitialSplits(1, p.getOptions());
+
+    assertEquals("KafkaIO should ignore desiredNumSplits", numSplits, splits.size());

     long elementsPerSplit = numElements / numSplits;
     assertEquals("Expected even splits", numElements, elementsPerSplit * numSplits);
@@ -362,14 +371,14 @@ private static void advanceOnce(UnboundedReader reader) throws IOException {
   public void testUnboundedSourceCheckpointMark() throws Exception {
     int numElements = 85; // 85 to make sure some partitions have more records than other.

-    // create a single split:
+    // take the first split.
     UnboundedSource<
             com.google.cloud.dataflow.contrib.kafka.KafkaRecord,
             com.google.cloud.dataflow.contrib.kafka.KafkaCheckpointMark>
         source =
-            mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
+            mkKafkaReadTransform(numElements, true, new ValueAsTimestampFn())
                 .makeSource()
-                .generateInitialSplits(1, PipelineOptionsFactory.fromArgs(new String[0]).create())
+                .generateInitialSplits(5, PipelineOptionsFactory.fromArgs(new String[0]).create())
                 .get(0);

     UnboundedReader> reader =
@@ -429,7 +438,8 @@ public void testSink() throws Exception {
     String topic = "test";

     pipeline
-        .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())
+        .apply(mkKafkaReadTransform(numElements, true,
+            new ValueAsTimestampFn()).withoutMetadata())
         .apply(
             com.google.cloud.dataflow.contrib.kafka.KafkaIO.write()
                 .withBootstrapServers("none")
@@ -461,7 +471,8 @@ public void testValuesSink() throws Exception {
     String topic = "test";

     pipeline
-        .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())
+        .apply(mkKafkaReadTransform(numElements, true,
+            new ValueAsTimestampFn()).withoutMetadata())
         .apply(Values.create()) // there are no keys
         .apply(
             com.google.cloud.dataflow.contrib.kafka.KafkaIO.write()
@@ -487,7 +498,7 @@ public void testSinkWithSendErrors() throws Throwable {

     // TODO: Ideally we want the pipeline to run to completion by retrying bundles that fail.
     // We limit the number of errors injected to 10 below. This would reflect a real streaming
-    // pipeline. But I am sure how to achieve that. For now expect an exception:
+    // pipeline. But I am not sure how to achieve that. For now expect an exception:

     thrown.expect(InjectedErrorException.class);
     thrown.expectMessage("Injected Error #1");
@@ -504,7 +515,8 @@ public void testSinkWithSendErrors() throws Throwable {

     new ProducerSendCompletionThread(10, 100).start();

     pipeline
-        .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())
+        .apply(mkKafkaReadTransform(numElements, true,
+            new ValueAsTimestampFn()).withoutMetadata())
         .apply(
             com.google.cloud.dataflow.contrib.kafka.KafkaIO.write()
                 .withBootstrapServers("none")

From cdd5f5fca9f08dbf211f0303d81510f473167401 Mon Sep 17 00:00:00 2001
From: Raghu Angadi
Date: Thu, 1 Dec 2016 15:01:16 -0800
Subject: [PATCH 2/6] Based on Thomas' comment.

DirectPipelineRunner does not call generateInitialSplits(). Rather than
forcing a single split through a special config, force it when
generateInitialSplits() is invoked from within KafkaIO itself.
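For a sense of scale behind patches 1 and 2: the old code capped the number of splits at desiredNumSplits, while the new code returns one split per partition. A small standalone sketch of that arithmetic (not part of the series; the partition and worker counts below are made up):

public class SplitCountDemo {
  public static void main(String[] args) {
    int numPartitions = 20;   // partitions across the subscribed topics
    int desiredNumSplits = 3; // roughly what Dataflow asks for when maxNumWorkers = 3

    // Old behavior: partitions shared round-robin over at most desiredNumSplits splits.
    int oldNumSplits = Math.min(desiredNumSplits, numPartitions);

    // New behavior: one split per partition; desiredNumSplits is ignored.
    int newNumSplits = numPartitions;

    // Prints "old: 3 splits, new: 20 splits" -- only 3 readers before, 20 after.
    System.out.println("old: " + oldNumSplits + " splits, new: " + newNumSplits + " splits");
  }
}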
--- .../cloud/dataflow/contrib/kafka/KafkaIO.java | 51 ++++++++++--------- .../dataflow/contrib/kafka/KafkaIOTest.java | 39 +++++--------- 2 files changed, 38 insertions(+), 52 deletions(-) diff --git a/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java index 0d83ee45bb..f5422d33d6 100644 --- a/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java +++ b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java @@ -756,41 +756,41 @@ public int compare(TopicPartition tp1, TopicPartition tp2) { partitions.size() > 0, "Could not find any partitions. Please check Kafka configuration and topic names"); + if (desiredNumSplits < 0) { + // special case for DirectPipelineRunner (used in tests) in Dataflow. Return single split. + return ImmutableList.of( + new UnboundedKafkaSource<>( + 0, + this.topics, + partitions, + this.keyCoder, + this.valueCoder, + this.timestampFn, + this.watermarkFn, + this.consumerFactoryFn, + this.consumerConfig) + ); + } + List> result = new ArrayList<>(partitions.size()); - if (Boolean.TRUE.equals(consumerConfig.get("force.single.split.for.dastaflow.tests"))) { - // Work around/hack for Direct/InProcess runner in Dataflow not supporting multiple splits. - LOG.info("Reading all the partitions in single split"); + // one split for each partition. + for (int i = 0; i < partitions.size(); i++) { + TopicPartition partition = partitions.get(i); + + LOG.info("Partition assigned to split {} : {}", i, partition); + result.add( new UnboundedKafkaSource<>( - 0, + i, this.topics, - partitions, + ImmutableList.of(partition), this.keyCoder, this.valueCoder, this.timestampFn, this.watermarkFn, this.consumerFactoryFn, this.consumerConfig)); - } else { - // one split for each partition. - for (int i = 0; i < partitions.size(); i++) { - TopicPartition partition = partitions.get(i); - - LOG.info("Partition assigned to split {} : {}", i, partition); - - result.add( - new UnboundedKafkaSource<>( - i, - this.topics, - ImmutableList.of(partition), - this.keyCoder, - this.valueCoder, - this.timestampFn, - this.watermarkFn, - this.consumerFactoryFn, - this.consumerConfig)); - } } return result; @@ -803,8 +803,9 @@ public UnboundedKafkaReader createReader( if (assignedPartitions.isEmpty()) { LOG.warn("Looks like generateSplits() is not called. Generate single split."); try { + return new UnboundedKafkaReader( - generateInitialSplits(1, options).get(0), checkpointMark); + generateInitialSplits(-1, options).get(0), checkpointMark); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java b/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java index aeae6cdb4d..7cfa8a0e94 100644 --- a/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java +++ b/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java @@ -126,7 +126,7 @@ private static MockConsumer mkMockConsumer( tp.topic(), tp.partition(), offsets[pIdx]++, - ByteBuffer.wrap(new byte[4]).putInt(i).array(), + ByteBuffer.wrap(new byte[8]).putInt(i).array(), // key is 4 byte record id ByteBuffer.wrap(new byte[8]).putLong(i).array())); // value is 8 byte record id } @@ -176,8 +176,7 @@ public Consumer apply(Map config) { * assigned all the 20 partitions. 
*/ private static KafkaIO.TypedRead mkKafkaReadTransform( - int numElements, boolean forceSingleSplit, - @Nullable SerializableFunction, Instant> timestampFn) { + int numElements, @Nullable SerializableFunction, Instant> timestampFn) { List topics = ImmutableList.of("topic_a", "topic_b"); @@ -190,11 +189,6 @@ private static KafkaIO.TypedRead mkKafkaReadTransform( .withValueCoder(BigEndianLongCoder.of()) .withMaxNumRecords(numElements); - if (forceSingleSplit) { - reader = reader.updateConsumerProperties( - ImmutableMap.of("force.single.split.for.dataflow.tests", true)); - } - if (timestampFn != null) { return reader.withTimestampFn(timestampFn); } else { @@ -238,13 +232,11 @@ public static void addCountingAsserts(PCollection input, long numElements) @Test public void testUnboundedSource() { Pipeline p = TestPipeline.create(); - int numElements = 1000; PCollection input = p - .apply(mkKafkaReadTransform(numElements, true, - new ValueAsTimestampFn()).withoutMetadata()) + .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata()) .apply(Values.create()); addCountingAsserts(input, numElements); @@ -290,8 +282,7 @@ public void testUnboundedSourceTimestamps() { PCollection input = p - .apply(mkKafkaReadTransform(numElements, true, - new ValueAsTimestampFn()).withoutMetadata()) + .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata()) .apply(Values.create()); addCountingAsserts(input, numElements); @@ -320,13 +311,10 @@ public void testUnboundedSourceSplits() throws Exception { int numElements = 1000; int numSplits = 20; - // create source with 20 partitions. UnboundedSource, ?> initial = - mkKafkaReadTransform(numElements, false, null).makeSource(); - + mkKafkaReadTransform(numElements, null).makeSource(); List, ?>> splits = initial.generateInitialSplits(1, p.getOptions()); - assertEquals("KafkaIO should ignore desiredNumSplits", numSplits, splits.size()); long elementsPerSplit = numElements / numSplits; @@ -371,14 +359,14 @@ private static void advanceOnce(UnboundedReader reader) throws IOException { public void testUnboundedSourceCheckpointMark() throws Exception { int numElements = 85; // 85 to make sure some partitions have more records than other. - // take the first split. 
+    // create a single split:
     UnboundedSource<
             com.google.cloud.dataflow.contrib.kafka.KafkaRecord,
             com.google.cloud.dataflow.contrib.kafka.KafkaCheckpointMark>
         source =
-            mkKafkaReadTransform(numElements, true, new ValueAsTimestampFn())
+            mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
                 .makeSource()
-                .generateInitialSplits(5, PipelineOptionsFactory.fromArgs(new String[0]).create())
+                .generateInitialSplits(-1, PipelineOptionsFactory.fromArgs(new String[0]).create())
                 .get(0);

     UnboundedReader> reader =
@@ -438,8 +426,7 @@ public void testSink() throws Exception {
     String topic = "test";

     pipeline
-        .apply(mkKafkaReadTransform(numElements, true,
-            new ValueAsTimestampFn()).withoutMetadata())
+        .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())
         .apply(
             com.google.cloud.dataflow.contrib.kafka.KafkaIO.write()
                 .withBootstrapServers("none")
@@ -471,8 +458,7 @@ public void testValuesSink() throws Exception {
     String topic = "test";

     pipeline
-        .apply(mkKafkaReadTransform(numElements, true,
-            new ValueAsTimestampFn()).withoutMetadata())
+        .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())
         .apply(Values.create()) // there are no keys
         .apply(
             com.google.cloud.dataflow.contrib.kafka.KafkaIO.write()
@@ -498,7 +484,7 @@ public void testSinkWithSendErrors() throws Throwable {

     // TODO: Ideally we want the pipeline to run to completion by retrying bundles that fail.
     // We limit the number of errors injected to 10 below. This would reflect a real streaming
-    // pipeline. But I am not sure how to achieve that. For now expect an exception:
+    // pipeline. But I am sure how to achieve that. For now expect an exception:

     thrown.expect(InjectedErrorException.class);
     thrown.expectMessage("Injected Error #1");
@@ -515,8 +501,7 @@ public void testSinkWithSendErrors() throws Throwable {

     new ProducerSendCompletionThread(10, 100).start();

     pipeline
-        .apply(mkKafkaReadTransform(numElements, true,
-            new ValueAsTimestampFn()).withoutMetadata())
+        .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())
         .apply(
             com.google.cloud.dataflow.contrib.kafka.KafkaIO.write()
                 .withBootstrapServers("none")

From 1531683b4b33a8ca962ebc6dde4e3ce68fd6e0cc Mon Sep 17 00:00:00 2001
From: Raghu Angadi
Date: Thu, 8 Dec 2016 15:13:13 -0800
Subject: [PATCH 3/6] Simplify diff by handling the single-split and
 split-per-partition cases in the existing code.
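The reason the existing round-robin code can serve both cases mentioned in this patch: with numSplits == 1 every partition lands in the single split, and with numSplits == partitions.size() each split receives exactly one partition. A self-contained illustration (strings stand in for Kafka's TopicPartition; this is not code from the patch):

import java.util.ArrayList;
import java.util.List;

public class RoundRobinDemo {
  // Round-robin assignment of partitions to numSplits buckets, mirroring the loop in KafkaIO.
  static List<List<String>> assign(List<String> partitions, int numSplits) {
    List<List<String>> assignments = new ArrayList<>();
    for (int i = 0; i < numSplits; i++) {
      assignments.add(new ArrayList<String>());
    }
    for (int i = 0; i < partitions.size(); i++) {
      assignments.get(i % numSplits).add(partitions.get(i));
    }
    return assignments;
  }

  public static void main(String[] args) {
    List<String> partitions = new ArrayList<>();
    for (int p = 0; p < 4; p++) {
      partitions.add("topic_a-" + p);
    }
    // Single split: [[topic_a-0, topic_a-1, topic_a-2, topic_a-3]]
    System.out.println(assign(partitions, 1));
    // One split per partition: [[topic_a-0], [topic_a-1], [topic_a-2], [topic_a-3]]
    System.out.println(assign(partitions, partitions.size()));
  }
}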
---
 .../cloud/dataflow/contrib/kafka/KafkaIO.java | 43 +++++++++----------
 1 file changed, 21 insertions(+), 22 deletions(-)

diff --git a/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java
index f5422d33d6..d7e1c60feb 100644
--- a/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java
+++ b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java
@@ -44,6 +44,7 @@
 import com.google.cloud.dataflow.sdk.values.PInput;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
+import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.ImmutableList;
@@ -730,6 +731,7 @@ public List> generateInitialSplits(

     // (a) fetch partitions for each topic
     // (b) sort by
+    // (c) round-robin assign the partitions to splits

     if (partitions.isEmpty()) {
       try (Consumer consumer = consumerFactoryFn.apply(consumerConfig)) {
@@ -756,35 +758,33 @@ public int compare(TopicPartition tp1, TopicPartition tp2) {
         partitions.size() > 0,
         "Could not find any partitions. Please check Kafka configuration and topic names");

-    if (desiredNumSplits < 0) {
-      // special case for DirectPipelineRunner (used in tests) in Dataflow. Return single split.
-      return ImmutableList.of(
-          new UnboundedKafkaSource<>(
-              0,
-              this.topics,
-              partitions,
-              this.keyCoder,
-              this.valueCoder,
-              this.timestampFn,
-              this.watermarkFn,
-              this.consumerFactoryFn,
-              this.consumerConfig)
-          );
+    // one split per partition, except when desiredNumSplits is -1, which is a special case
+    // for DirectDataflowRunner that requires single split.
+    int numSplits = desiredNumSplits == -1 ? 1 : partitions.size();
+    List> assignments = new ArrayList<>(numSplits);
+
+    for (int i = 0; i < numSplits; i++) {
+      assignments.add(new ArrayList());
+    }
+    for (int i = 0; i < partitions.size(); i++) {
+      assignments.get(i % numSplits).add(partitions.get(i));
     }

-    List> result = new ArrayList<>(partitions.size());
+    List> result = new ArrayList<>(numSplits);

-    // one split for each partition.
-    for (int i = 0; i < partitions.size(); i++) {
-      TopicPartition partition = partitions.get(i);
+    for (int i = 0; i < numSplits; i++) {
+      List assignedToSplit = assignments.get(i);

-      LOG.info("Partition assigned to split {} : {}", i, partition);
+      LOG.info(
+          "Partitions assigned to split {} : {}",
+          i,
+          Joiner.on(",").join(assignedToSplit));

       result.add(
-          new UnboundedKafkaSource<>(
+          new UnboundedKafkaSource(
               i,
               this.topics,
-              ImmutableList.of(partition),
+              assignedToSplit,
               this.keyCoder,
               this.valueCoder,
               this.timestampFn,
@@ -803,7 +803,6 @@ public UnboundedKafkaReader createReader(
     if (assignedPartitions.isEmpty()) {
       LOG.warn("Looks like generateSplits() is not called. Generate single split.");
       try {
-
         return new UnboundedKafkaReader(
             generateInitialSplits(-1, options).get(0), checkpointMark);
       } catch (Exception e) {
         throw new RuntimeException(e);

From 10e9a08108763446728567ba4a37b6d6516b46bf Mon Sep 17 00:00:00 2001
From: Raghu Angadi
Date: Thu, 8 Dec 2016 16:00:34 -0800
Subject: [PATCH 4/6] Address review comments.

Remove the special handling of the single-split case in
generateInitialSplits.
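The javadoc reworked below relies on a deterministic <topic, partition> ordering so that split i maps to the same partition across restarts, which is what makes resuming from a checkpoint safe. The comparator body is only partially visible in these diffs, so the following is an illustrative equivalent built on the Guava ComparisonChain that KafkaIO already imports, not necessarily the exact implementation:

import com.google.common.collect.ComparisonChain;
import java.util.Comparator;
import org.apache.kafka.common.TopicPartition;

public class TopicPartitionOrder implements Comparator<TopicPartition> {
  @Override
  public int compare(TopicPartition tp1, TopicPartition tp2) {
    // Order first by topic name, then by partition number.
    return ComparisonChain.start()
        .compare(tp1.topic(), tp2.topic())
        .compare(tp1.partition(), tp2.partition())
        .result();
  }
}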
--- .../cloud/dataflow/contrib/kafka/KafkaIO.java | 67 +++++++++---------- .../dataflow/contrib/kafka/KafkaIOTest.java | 4 +- 2 files changed, 34 insertions(+), 37 deletions(-) diff --git a/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java index d7e1c60feb..5176d6e898 100644 --- a/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java +++ b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java @@ -44,7 +44,6 @@ import com.google.cloud.dataflow.sdk.values.PInput; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; -import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.collect.ComparisonChain; import com.google.common.collect.ImmutableList; @@ -714,24 +713,12 @@ public UnboundedKafkaSource( this.consumerConfig = consumerConfig; } - /** - * The partitions are evenly distributed among the splits. The number of splits returned is - * {@code min(desiredNumSplits, totalNumPartitions)}, though better not to depend on the exact - * count. - * - *

It is important to assign the partitions deterministically so that we can support - * resuming a split from last checkpoint. The Kafka partitions are sorted by {@code } and then assigned to splits in round-robin order. - */ - @Override - public List> generateInitialSplits( - int desiredNumSplits, PipelineOptions options) throws Exception { + private List fetchKafkaPartitions() { List partitions = new ArrayList<>(assignedPartitions); // (a) fetch partitions for each topic // (b) sort by - // (c) round-robin assign the partitions to splits if (partitions.isEmpty()) { try (Consumer consumer = consumerFactoryFn.apply(consumerConfig)) { @@ -758,33 +745,35 @@ public int compare(TopicPartition tp1, TopicPartition tp2) { partitions.size() > 0, "Could not find any partitions. Please check Kafka configuration and topic names"); - // one split per partition, except when desiredNumSplits is -1, which is a special case - // for DirectDataflowRunner that requires single split. - int numSplits = desiredNumSplits == -1 ? 1 : partitions.size(); - List> assignments = new ArrayList<>(numSplits); + return partitions; + } - for (int i = 0; i < numSplits; i++) { - assignments.add(new ArrayList()); - } - for (int i = 0; i < partitions.size(); i++) { - assignments.get(i % numSplits).add(partitions.get(i)); - } + /** + * Returns one split for each of the Kafka partitions. + * + *

It is important to sort the partitions deterministically so that we can support + * resuming a split from last checkpoint. The Kafka partitions are sorted by {@code }. + */ + @Override + public List> generateInitialSplits( + int desiredNumSplits, PipelineOptions options) throws Exception { + + List partitions = fetchKafkaPartitions(); - List> result = new ArrayList<>(numSplits); + List> result = new ArrayList<>(partitions.size()); - for (int i = 0; i < numSplits; i++) { - List assignedToSplit = assignments.get(i); + // one split for each partition. + for (int i = 0; i < partitions.size(); i++) { + TopicPartition partition = partitions.get(i); - LOG.info( - "Partitions assigned to split {} : {}", - i, - Joiner.on(",").join(assignedToSplit)); + LOG.info("Partition assigned to split {} : {}", i, partition); result.add( - new UnboundedKafkaSource( + new UnboundedKafkaSource<>( i, this.topics, - assignedToSplit, + ImmutableList.of(partition), this.keyCoder, this.valueCoder, this.timestampFn, @@ -804,7 +793,17 @@ public UnboundedKafkaReader createReader( LOG.warn("Looks like generateSplits() is not called. Generate single split."); try { return new UnboundedKafkaReader( - generateInitialSplits(-1, options).get(0), checkpointMark); + new UnboundedKafkaSource<>( + 0, + this.topics, + fetchKafkaPartitions(), + this.keyCoder, + this.valueCoder, + this.timestampFn, + this.watermarkFn, + this.consumerFactoryFn, + this.consumerConfig), + checkpointMark); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java b/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java index 7cfa8a0e94..710f74ed92 100644 --- a/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java +++ b/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java @@ -365,9 +365,7 @@ public void testUnboundedSourceCheckpointMark() throws Exception { com.google.cloud.dataflow.contrib.kafka.KafkaCheckpointMark> source = mkKafkaReadTransform(numElements, new ValueAsTimestampFn()) - .makeSource() - .generateInitialSplits(-1, PipelineOptionsFactory.fromArgs(new String[0]).create()) - .get(0); + .makeSource(); UnboundedReader> reader = source.createReader(null, null); From 1485c4d47cad125f7c18c9ca05a888d1f8211f3c Mon Sep 17 00:00:00 2001 From: Raghu Angadi Date: Thu, 8 Dec 2016 16:07:31 -0800 Subject: [PATCH 5/6] fix indentation --- .../cloud/dataflow/contrib/kafka/KafkaIO.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java index 5176d6e898..47335bdbc9 100644 --- a/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java +++ b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java @@ -794,15 +794,15 @@ public UnboundedKafkaReader createReader( try { return new UnboundedKafkaReader( new UnboundedKafkaSource<>( - 0, - this.topics, - fetchKafkaPartitions(), - this.keyCoder, - this.valueCoder, - this.timestampFn, - this.watermarkFn, - this.consumerFactoryFn, - this.consumerConfig), + 0, + this.topics, + fetchKafkaPartitions(), + this.keyCoder, + this.valueCoder, + this.timestampFn, + this.watermarkFn, + this.consumerFactoryFn, + this.consumerConfig), checkpointMark); } catch (Exception e) { throw new RuntimeException(e); 
From 668a3e2f36f6722c792bc3606a27d6d23ca751c0 Mon Sep 17 00:00:00 2001 From: Raghu Angadi Date: Thu, 8 Dec 2016 17:16:30 -0800 Subject: [PATCH 6/6] fix unused import --- .../com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java b/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java index 710f74ed92..19d4336a5b 100644 --- a/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java +++ b/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java @@ -26,7 +26,6 @@ import com.google.cloud.dataflow.sdk.io.Read; import com.google.cloud.dataflow.sdk.io.UnboundedSource; import com.google.cloud.dataflow.sdk.io.UnboundedSource.UnboundedReader; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; import com.google.cloud.dataflow.sdk.testing.DataflowAssert; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Count;
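Taken together, the series means that a read pipeline along the following lines ends up with one split per Kafka partition, regardless of the desiredNumSplits hint from the runner. This is an illustrative sketch rather than code from the patches: the broker address, topic name, and coders are placeholders, and the builder chain follows the KafkaIO API used in the tests above.

import com.google.cloud.dataflow.contrib.kafka.KafkaIO;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.ByteArrayCoder;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Values;
import com.google.common.collect.ImmutableList;

public class KafkaReadExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    pipeline
        .apply(KafkaIO.read()
            .withBootstrapServers("broker-1:9092")       // placeholder broker
            .withTopics(ImmutableList.of("topic_a"))     // placeholder topic
            .withKeyCoder(ByteArrayCoder.of())
            .withValueCoder(ByteArrayCoder.of())
            .withoutMetadata())
        .apply(Values.<byte[]>create());                 // keep only the record values

    pipeline.run();
  }
}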