From fafa3c76dc93f3258b2cea49dfd1dc7a724a213c Mon Sep 17 00:00:00 2001 From: Antoine Pourchet Date: Thu, 16 May 2024 12:37:59 -0600 Subject: [PATCH] KAFKA-15045: (KIP-924 pt. 4) Generify rack graph solving utilities (#15956) The graph solving utilities are currently hardcoded to work with ClientState, but don't actually depend on anything in those state classes. This change allows the MinTrafficGraphConstructor and BalanceSubtopologyGraphConstructor to be reused with KafkaStreamsStates instead. Reviewers: Anna Sophie Blee-Goldman , Almog Gavra --- .../BalanceSubtopologyGraphConstructor.java | 20 +++++++------- .../MinTrafficGraphConstructor.java | 16 ++++++------ .../assignment/RackAwareGraphConstructor.java | 26 +++++++++---------- .../RackAwareGraphConstructorFactory.java | 6 ++--- .../assignment/RackAwareTaskAssignor.java | 6 ++--- .../RackAwareGraphConstructorTest.java | 6 ++--- 6 files changed, 40 insertions(+), 40 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/BalanceSubtopologyGraphConstructor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/BalanceSubtopologyGraphConstructor.java index f1cd011ccff7e..6b9353200f25a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/BalanceSubtopologyGraphConstructor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/BalanceSubtopologyGraphConstructor.java @@ -35,7 +35,7 @@ import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; import org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor.CostFunction; -public class BalanceSubtopologyGraphConstructor implements RackAwareGraphConstructor { +public class BalanceSubtopologyGraphConstructor implements RackAwareGraphConstructor { private final Map> tasksForTopicGroup; @@ -71,10 +71,10 @@ private static int getSecondStageClientNodeId(final List taskIdList, fin public Graph constructTaskGraph( final List clientList, final List taskIdList, - final Map clientStates, + final Map clientStates, final Map taskClientMap, final Map originalAssignedTaskNumber, - final BiPredicate hasAssignedTask, + final BiPredicate hasAssignedTask, final CostFunction costFunction, final int trafficCost, final int nonOverlapCost, @@ -86,7 +86,7 @@ public Graph constructTaskGraph( final Graph graph = new Graph<>(); for (final TaskId taskId : taskIdList) { - for (final Entry clientState : clientStates.entrySet()) { + for (final Entry clientState : clientStates.entrySet()) { if (hasAssignedTask.test(clientState.getValue(), taskId)) { originalAssignedTaskNumber.merge(clientState.getKey(), 1, Integer::sum); } @@ -122,12 +122,12 @@ public boolean assignTaskFromMinCostFlow( final Graph graph, final List clientList, final List taskIdList, - final Map clientStates, + final Map clientStates, final Map originalAssignedTaskNumber, final Map taskClientMap, - final BiConsumer assignTask, - final BiConsumer unAssignTask, - final BiPredicate hasAssignedTask + final BiConsumer assignTask, + final BiConsumer unAssignTask, + final BiPredicate hasAssignedTask ) { final SortedMap> sortedTasksForTopicGroup = new TreeMap<>(tasksForTopicGroup); final Set taskIdSet = new HashSet<>(taskIdList); @@ -170,10 +170,10 @@ private void constructEdges( final Graph graph, final List taskIdList, final List clientList, - final Map clientStates, + final Map clientStates, final Map taskClientMap, final Map originalAssignedTaskNumber, - final BiPredicate hasAssignedTask, + final BiPredicate hasAssignedTask, final CostFunction costFunction, final int trafficCost, final int nonOverlapCost, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/MinTrafficGraphConstructor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/MinTrafficGraphConstructor.java index ec877c66a4217..82be22155d3c2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/MinTrafficGraphConstructor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/MinTrafficGraphConstructor.java @@ -28,7 +28,7 @@ import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; import org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor.CostFunction; -public class MinTrafficGraphConstructor implements RackAwareGraphConstructor { +public class MinTrafficGraphConstructor implements RackAwareGraphConstructor { @Override public int getSinkNodeID( @@ -53,10 +53,10 @@ public int getClientIndex(final int clientNodeId, final List taskIdList, public Graph constructTaskGraph( final List clientList, final List taskIdList, - final Map clientStates, + final Map clientStates, final Map taskClientMap, final Map originalAssignedTaskNumber, - final BiPredicate hasAssignedTask, + final BiPredicate hasAssignedTask, final CostFunction costFunction, final int trafficCost, final int nonOverlapCost, @@ -66,7 +66,7 @@ public Graph constructTaskGraph( final Graph graph = new Graph<>(); for (final TaskId taskId : taskIdList) { - for (final Entry clientState : clientStates.entrySet()) { + for (final Entry clientState : clientStates.entrySet()) { if (hasAssignedTask.test(clientState.getValue(), taskId)) { originalAssignedTaskNumber.merge(clientState.getKey(), 1, Integer::sum); } @@ -122,12 +122,12 @@ public boolean assignTaskFromMinCostFlow( final Graph graph, final List clientList, final List taskIdList, - final Map clientStates, + final Map clientStates, final Map originalAssignedTaskNumber, final Map taskClientMap, - final BiConsumer assignTask, - final BiConsumer unAssignTask, - final BiPredicate hasAssignedTask + final BiConsumer assignTask, + final BiConsumer unAssignTask, + final BiPredicate hasAssignedTask ) { int tasksAssigned = 0; boolean taskMoved = false; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructor.java index 4b46c147e1756..8dbb7838182ea 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructor.java @@ -33,7 +33,7 @@ /** * Construct graph for rack aware task assignor */ -public interface RackAwareGraphConstructor { +public interface RackAwareGraphConstructor { int SOURCE_ID = -1; int getSinkNodeID(final List taskIdList, final List clientList, final Map> tasksForTopicGroup); @@ -45,10 +45,10 @@ public interface RackAwareGraphConstructor { Graph constructTaskGraph( final List clientList, final List taskIdList, - final Map clientStates, + final Map clientStates, final Map taskClientMap, final Map originalAssignedTaskNumber, - final BiPredicate hasAssignedTask, + final BiPredicate hasAssignedTask, final CostFunction costFunction, final int trafficCost, final int nonOverlapCost, @@ -59,24 +59,24 @@ boolean assignTaskFromMinCostFlow( final Graph graph, final List clientList, final List taskIdList, - final Map clientStates, + final Map clientStates, final Map originalAssignedTaskNumber, final Map taskClientMap, - final BiConsumer assignTask, - final BiConsumer unAssignTask, - final BiPredicate hasAssignedTask); + final BiConsumer assignTask, + final BiConsumer unAssignTask, + final BiPredicate hasAssignedTask); default KeyValue assignTaskToClient( final Graph graph, final TaskId taskId, final int taskNodeId, final int topicGroupIndex, - final Map clientStates, + final Map clientStates, final List clientList, final List taskIdList, final Map taskClientMap, - final BiConsumer assignTask, - final BiConsumer unAssignTask + final BiConsumer assignTask, + final BiConsumer unAssignTask ) { int tasksAssigned = 0; boolean taskMoved = false; @@ -104,9 +104,9 @@ default KeyValue assignTaskToClient( default void validateAssignedTask( final List taskIdList, final int tasksAssigned, - final Map clientStates, + final Map clientStates, final Map originalAssignedTaskNumber, - final BiPredicate hasAssignedTask + final BiPredicate hasAssignedTask ) { // Validate task assigned if (tasksAssigned != taskIdList.size()) { @@ -117,7 +117,7 @@ default void validateAssignedTask( // Validate original assigned task number matches final Map assignedTaskNumber = new HashMap<>(); for (final TaskId taskId : taskIdList) { - for (final Entry clientState : clientStates.entrySet()) { + for (final Entry clientState : clientStates.entrySet()) { if (hasAssignedTask.test(clientState.getValue(), taskId)) { assignedTaskNumber.merge(clientState.getKey(), 1, Integer::sum); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructorFactory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructorFactory.java index c23dde358b33f..93afea586f03a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructorFactory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructorFactory.java @@ -25,12 +25,12 @@ public class RackAwareGraphConstructorFactory { - static RackAwareGraphConstructor create(final AssignmentConfigs assignmentConfigs, final Map> tasksForTopicGroup) { + static RackAwareGraphConstructor create(final AssignmentConfigs assignmentConfigs, final Map> tasksForTopicGroup) { switch (assignmentConfigs.rackAwareAssignmentStrategy) { case StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC: - return new MinTrafficGraphConstructor(); + return new MinTrafficGraphConstructor(); case StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY: - return new BalanceSubtopologyGraphConstructor(tasksForTopicGroup); + return new BalanceSubtopologyGraphConstructor(tasksForTopicGroup); default: throw new IllegalArgumentException("Rack aware assignment is disabled"); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java index b124d07424ae8..0ccc4bb6a9ded 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java @@ -325,7 +325,7 @@ private long tasksCost(final SortedSet tasks, } final List clientList = new ArrayList<>(clientStates.keySet()); final List taskIdList = new ArrayList<>(tasks); - final Graph graph = new MinTrafficGraphConstructor() + final Graph graph = new MinTrafficGraphConstructor() .constructTaskGraph( clientList, taskIdList, @@ -373,7 +373,7 @@ public long optimizeActiveTasks(final SortedSet activeTasks, final List taskIdList = new ArrayList<>(activeTasks); final Map taskClientMap = new HashMap<>(); final Map originalAssignedTaskNumber = new HashMap<>(); - final RackAwareGraphConstructor graphConstructor = RackAwareGraphConstructorFactory.create(assignmentConfigs, tasksForTopicGroup); + final RackAwareGraphConstructor graphConstructor = RackAwareGraphConstructorFactory.create(assignmentConfigs, tasksForTopicGroup); final Graph graph = graphConstructor.constructTaskGraph( clientList, taskIdList, @@ -419,7 +419,7 @@ public long optimizeStandbyTasks(final SortedMap clientStates boolean taskMoved = true; int round = 0; - final RackAwareGraphConstructor graphConstructor = new MinTrafficGraphConstructor(); + final RackAwareGraphConstructor graphConstructor = new MinTrafficGraphConstructor<>(); while (taskMoved && round < STANDBY_OPTIMIZER_MAX_ITERATION) { taskMoved = false; round++; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructorTest.java index 607dff9382557..4f0f62cf69927 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructorTest.java @@ -68,7 +68,7 @@ public class RackAwareGraphConstructorTest { private final Map originalAssignedTaskNumber = new HashMap<>(); private final Map> tasksForTopicGroup = getTasksForTopicGroup(TP_SIZE, PARTITION_SIZE); - private RackAwareGraphConstructor constructor; + private RackAwareGraphConstructor constructor; @Parameter public String constructorType; @@ -86,9 +86,9 @@ public void setUp() { randomAssignTasksToClient(taskIdList, clientStateMap); if (constructorType.equals(MIN_COST)) { - constructor = new MinTrafficGraphConstructor(); + constructor = new MinTrafficGraphConstructor<>(); } else if (constructorType.equals(BALANCE_SUBTOPOLOGY)) { - constructor = new BalanceSubtopologyGraphConstructor(tasksForTopicGroup); + constructor = new BalanceSubtopologyGraphConstructor<>(tasksForTopicGroup); } graph = constructor.constructTaskGraph( clientList, taskIdList, clientStateMap, taskClientMap, originalAssignedTaskNumber, ClientState::hasAssignedTask, this::getCost, 10, 1, false, false);