KafkaIO should return one split for each partition. #491
Conversation
This is the actual unit of parallelism for a Kafka topic. The desiredNumSplits that Dataflow passes to a custom source is very low when maxNumWorkers is set: it asks for just one split for each of the workers. This limits use of CPU cores on the workers, essentially making autoscaling use more resources without improving performance. This includes a hack to force a single split in many unit tests, since DirectPipelineRunner and InProcessPipelineRunner don't seem to read from more than one split.
+R: @davorbonaci, @dhalperi, @tgroh
DirectPipelineRunner does not call generateInitialSplits(). Rather than forcing a single split through a special config, force it when the source is invoked from within KafkaIO itself.
Updated based on Thomas's comment in chat.
}
for (int i = 0; i < partitions.size(); i++) {
  assignments.get(i % numSplits).add(partitions.get(i));
if (desiredNumSplits < 0) {
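For context, the round-robin assignment in the diff above can be exercised standalone. A minimal sketch (the class and helper names here are mine, not KafkaIO's):

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionAssignment {
  // Round-robin assignment of topic partitions to splits, as in the diff
  // above: partition i goes to split (i % numSplits). When numSplits
  // equals the partition count, each split gets exactly one partition.
  static List<List<Integer>> assign(List<Integer> partitions, int numSplits) {
    List<List<Integer>> assignments = new ArrayList<>();
    for (int i = 0; i < numSplits; i++) {
      assignments.add(new ArrayList<>());
    }
    for (int i = 0; i < partitions.size(); i++) {
      assignments.get(i % numSplits).add(partitions.get(i));
    }
    return assignments;
  }

  public static void main(String[] args) {
    // 4 partitions into 4 splits: one partition per split (this PR's default).
    System.out.println(assign(List.of(0, 1, 2, 3), 4)); // [[0], [1], [2], [3]]
    // 4 partitions into 2 splits: the generic case the old code handled.
    System.out.println(assign(List.of(0, 1, 2, 3), 2)); // [[0, 2], [1, 3]]
  }
}
```

The same loop covers both the new default (one split per partition) and the generic case where the counts differ, which is why the old code could be reused.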
Can this be inlined? More specifically, you could factor out the partitions preprocessing, and then just call the constructor in generateInitialSplits.
I was not sure exactly what you meant. I minimized the diff by reusing the old code that handles the generic case where the number of partitions and splits might not match. PTAL.
Updated after a clarification from Thomas. It makes sense. There is no special case for a single split in generateInitialSplits(). createReader() creates a single reader if there aren't any partitions assigned (as happens with the direct runner). Updated a couple of javadoc comments as well.
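Putting those pieces together, here is a hypothetical, SDK-free sketch of the flow described above (class and method names are illustrative, not the real KafkaIO/Dataflow API):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in for an unbounded Kafka source; not the real API.
class KafkaSourceSketch {
  final List<Integer> allPartitions;
  final List<Integer> assignedPartitions; // empty for the unsplit source

  KafkaSourceSketch(List<Integer> all, List<Integer> assigned) {
    this.allPartitions = all;
    this.assignedPartitions = assigned;
  }

  // One split per partition, with no single-split special case: the
  // partition is the real unit of parallelism, so the runner's (often
  // too-low) desiredNumSplits hint is not used to shrink the count.
  List<KafkaSourceSketch> generateInitialSplits() {
    List<KafkaSourceSketch> splits = new ArrayList<>();
    for (int p : allPartitions) {
      splits.add(new KafkaSourceSketch(allPartitions, List.of(p)));
    }
    return splits;
  }

  // If no partitions were assigned (e.g. a runner like
  // DirectPipelineRunner that never calls generateInitialSplits()),
  // a single reader covers all of them.
  List<Integer> partitionsToRead() {
    return assignedPartitions.isEmpty() ? allPartitions : assignedPartitions;
  }

  public static void main(String[] args) {
    KafkaSourceSketch source =
        new KafkaSourceSketch(List.of(0, 1, 2), List.of());
    System.out.println(source.generateInitialSplits().size()); // 3
    System.out.println(source.partitionsToRead()); // [0, 1, 2]
  }
}
```

This keeps the split logic in one place: splitting runners get one split per partition, and an unsplit source degrades gracefully to a single reader over every partition.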
[ERROR] src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java:[29,8] (imports) UnusedImports: Unused import: com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory. Otherwise LGTM.
Thanks. Just pushed the fix for the unused import. I will ping once travis-ci is happy.
@tgroh all the checks passed. Thanks for the review. |
Recent PR GoogleCloudPlatform#491 changes how KafkaIO splits. This makes it incompatible with Dataflow update across these two versions.