
KafkaIO should return one split for each partition. #491

Merged 6 commits on Dec 10, 2016
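In short: generateInitialSplits previously round-robined the sorted partitions into min(desiredNumSplits, numPartitions) splits; after this change it ignores desiredNumSplits and emits exactly one split per Kafka partition. A minimal standalone sketch of the two assignment strategies follows (the String partition names and helper methods are illustrative only, not taken from the connector code in the diff below):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SplitAssignmentSketch {

  // Old behavior: sorted partitions are round-robined into
  // min(desiredNumSplits, partitions.size()) splits.
  static List<List<String>> roundRobin(List<String> partitions, int desiredNumSplits) {
    int numSplits = Math.min(desiredNumSplits, partitions.size());
    List<List<String>> splits = new ArrayList<>(numSplits);
    for (int i = 0; i < numSplits; i++) {
      splits.add(new ArrayList<String>());
    }
    for (int i = 0; i < partitions.size(); i++) {
      splits.get(i % numSplits).add(partitions.get(i));
    }
    return splits;
  }

  // New behavior: exactly one split per partition; desiredNumSplits is ignored.
  static List<List<String>> onePerPartition(List<String> partitions) {
    List<List<String>> splits = new ArrayList<>(partitions.size());
    for (String partition : partitions) {
      splits.add(Collections.singletonList(partition));
    }
    return splits;
  }

  public static void main(String[] args) {
    List<String> partitions = Arrays.asList("topic-0", "topic-1", "topic-2", "topic-3");
    // Old: [[topic-0, topic-2], [topic-1, topic-3]]
    System.out.println(roundRobin(partitions, 2));
    // New: [[topic-0], [topic-1], [topic-2], [topic-3]]
    System.out.println(onePerPartition(partitions));
  }
}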
@@ -44,7 +44,6 @@
import com.google.cloud.dataflow.sdk.values.PInput;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableList;
@@ -714,24 +713,12 @@ public UnboundedKafkaSource(
this.consumerConfig = consumerConfig;
}

/**
* The partitions are evenly distributed among the splits. The number of splits returned is
* {@code min(desiredNumSplits, totalNumPartitions)}, though better not to depend on the exact
* count.
*
* <p> It is important to assign the partitions deterministically so that we can support
* resuming a split from last checkpoint. The Kafka partitions are sorted by {@code <topic,
* partition>} and then assigned to splits in round-robin order.
*/
@Override
public List<UnboundedKafkaSource<K, V>> generateInitialSplits(
int desiredNumSplits, PipelineOptions options) throws Exception {
private List<TopicPartition> fetchKafkaPartitions() {

List<TopicPartition> partitions = new ArrayList<>(assignedPartitions);

// (a) fetch partitions for each topic
// (b) sort by <topic, partition>
// (c) round-robin assign the partitions to splits

if (partitions.isEmpty()) {
try (Consumer<?, ?> consumer = consumerFactoryFn.apply(consumerConfig)) {
@@ -754,37 +741,39 @@ public int compare(TopicPartition tp1, TopicPartition tp2) {
}
});

checkArgument(desiredNumSplits > 0);
checkState(
partitions.size() > 0,
"Could not find any partitions. Please check Kafka configuration and topic names");

int numSplits = Math.min(desiredNumSplits, partitions.size());
List<List<TopicPartition>> assignments = new ArrayList<>(numSplits);
return partitions;
}

for (int i = 0; i < numSplits; i++) {
assignments.add(new ArrayList<TopicPartition>());
}
for (int i = 0; i < partitions.size(); i++) {
assignments.get(i % numSplits).add(partitions.get(i));
}
/**
* Returns one split for each of the Kafka partitions.
*
* <p> It is important to sort the partitions deterministically so that we can support
* resuming a split from last checkpoint. The Kafka partitions are sorted by {@code <topic,
* partition>}.
*/
@Override
public List<UnboundedKafkaSource<K, V>> generateInitialSplits(
int desiredNumSplits, PipelineOptions options) throws Exception {

List<UnboundedKafkaSource<K, V>> result = new ArrayList<>(numSplits);
List<TopicPartition> partitions = fetchKafkaPartitions();

for (int i = 0; i < numSplits; i++) {
List<TopicPartition> assignedToSplit = assignments.get(i);
List<UnboundedKafkaSource<K, V>> result = new ArrayList<>(partitions.size());

// one split for each partition.
for (int i = 0; i < partitions.size(); i++) {
TopicPartition partition = partitions.get(i);

LOG.info(
"Partitions assigned to split {} (total {}): {}",
i,
assignedToSplit.size(),
Joiner.on(",").join(assignedToSplit));
LOG.info("Partition assigned to split {} : {}", i, partition);

result.add(
new UnboundedKafkaSource<K, V>(
new UnboundedKafkaSource<>(
i,
this.topics,
assignedToSplit,
ImmutableList.of(partition),
this.keyCoder,
this.valueCoder,
this.timestampFn,
@@ -804,7 +793,17 @@ public UnboundedKafkaReader<K, V> createReader(
LOG.warn("Looks like generateSplits() is not called. Generate single split.");
try {
return new UnboundedKafkaReader<K, V>(
generateInitialSplits(1, options).get(0), checkpointMark);
new UnboundedKafkaSource<>(
0,
this.topics,
fetchKafkaPartitions(),
this.keyCoder,
this.valueCoder,
this.timestampFn,
this.watermarkFn,
this.consumerFactoryFn,
this.consumerConfig),
checkpointMark);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -26,7 +26,6 @@
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import com.google.cloud.dataflow.sdk.io.UnboundedSource.UnboundedReader;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
import com.google.cloud.dataflow.sdk.transforms.Count;
@@ -172,8 +171,8 @@ public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
}

/**
* Creates a consumer with two topics, with 5 partitions each. numElements are (round-robin)
* assigned all the 10 partitions.
* Creates a consumer with two topics, with 10 partitions each. numElements are (round-robin)
* assigned all the 20 partitions.
*/
private static KafkaIO.TypedRead<Integer, Long> mkKafkaReadTransform(
int numElements, @Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn) {
@@ -309,16 +308,13 @@ public void processElement(ProcessContext ctx) throws Exception {
public void testUnboundedSourceSplits() throws Exception {
Pipeline p = TestPipeline.create();
int numElements = 1000;
int numSplits = 10;
int numSplits = 20;

UnboundedSource<KafkaRecord<Integer, Long>, ?> initial =
mkKafkaReadTransform(numElements, null).makeSource();
List<
? extends
UnboundedSource<
com.google.cloud.dataflow.contrib.kafka.KafkaRecord<Integer, Long>, ?>>
splits = initial.generateInitialSplits(numSplits, p.getOptions());
assertEquals("Expected exact splitting", numSplits, splits.size());
List<? extends UnboundedSource<KafkaRecord<Integer, Long>, ?>> splits =
initial.generateInitialSplits(1, p.getOptions());
assertEquals("KafkaIO should ignore desiredNumSplits", numSplits, splits.size());

long elementsPerSplit = numElements / numSplits;
assertEquals("Expected even splits", numElements, elementsPerSplit * numSplits);
@@ -368,9 +364,7 @@ public void testUnboundedSourceCheckpointMark() throws Exception {
com.google.cloud.dataflow.contrib.kafka.KafkaCheckpointMark>
source =
mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
.makeSource()
.generateInitialSplits(1, PipelineOptionsFactory.fromArgs(new String[0]).create())
.get(0);
.makeSource();

UnboundedReader<com.google.cloud.dataflow.contrib.kafka.KafkaRecord<Integer, Long>> reader =
source.createReader(null, null);