diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/BalanceSubtopologyGraphConstructor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/BalanceSubtopologyGraphConstructor.java index f1cd011ccff7e..6b9353200f25a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/BalanceSubtopologyGraphConstructor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/BalanceSubtopologyGraphConstructor.java @@ -35,7 +35,7 @@ import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; import org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor.CostFunction; -public class BalanceSubtopologyGraphConstructor implements RackAwareGraphConstructor { +public class BalanceSubtopologyGraphConstructor implements RackAwareGraphConstructor { private final Map> tasksForTopicGroup; @@ -71,10 +71,10 @@ private static int getSecondStageClientNodeId(final List taskIdList, fin public Graph constructTaskGraph( final List clientList, final List taskIdList, - final Map clientStates, + final Map clientStates, final Map taskClientMap, final Map originalAssignedTaskNumber, - final BiPredicate hasAssignedTask, + final BiPredicate hasAssignedTask, final CostFunction costFunction, final int trafficCost, final int nonOverlapCost, @@ -86,7 +86,7 @@ public Graph constructTaskGraph( final Graph graph = new Graph<>(); for (final TaskId taskId : taskIdList) { - for (final Entry clientState : clientStates.entrySet()) { + for (final Entry clientState : clientStates.entrySet()) { if (hasAssignedTask.test(clientState.getValue(), taskId)) { originalAssignedTaskNumber.merge(clientState.getKey(), 1, Integer::sum); } @@ -122,12 +122,12 @@ public boolean assignTaskFromMinCostFlow( final Graph graph, final List clientList, final List taskIdList, - final Map clientStates, + final Map clientStates, final Map originalAssignedTaskNumber, final Map taskClientMap, - final BiConsumer assignTask, - final BiConsumer unAssignTask, - final BiPredicate hasAssignedTask + final BiConsumer assignTask, + final BiConsumer unAssignTask, + final BiPredicate hasAssignedTask ) { final SortedMap> sortedTasksForTopicGroup = new TreeMap<>(tasksForTopicGroup); final Set taskIdSet = new HashSet<>(taskIdList); @@ -170,10 +170,10 @@ private void constructEdges( final Graph graph, final List taskIdList, final List clientList, - final Map clientStates, + final Map clientStates, final Map taskClientMap, final Map originalAssignedTaskNumber, - final BiPredicate hasAssignedTask, + final BiPredicate hasAssignedTask, final CostFunction costFunction, final int trafficCost, final int nonOverlapCost, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/MinTrafficGraphConstructor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/MinTrafficGraphConstructor.java index ec877c66a4217..82be22155d3c2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/MinTrafficGraphConstructor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/MinTrafficGraphConstructor.java @@ -28,7 +28,7 @@ import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; import org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor.CostFunction; -public class MinTrafficGraphConstructor implements RackAwareGraphConstructor { +public class MinTrafficGraphConstructor implements RackAwareGraphConstructor { @Override public int getSinkNodeID( @@ -53,10 +53,10 @@ public int getClientIndex(final int clientNodeId, final List taskIdList, public Graph constructTaskGraph( final List clientList, final List taskIdList, - final Map clientStates, + final Map clientStates, final Map taskClientMap, final Map originalAssignedTaskNumber, - final BiPredicate hasAssignedTask, + final BiPredicate hasAssignedTask, final CostFunction costFunction, final int trafficCost, final int nonOverlapCost, @@ -66,7 +66,7 @@ public Graph constructTaskGraph( final Graph graph = new Graph<>(); for (final TaskId taskId : taskIdList) { - for (final Entry clientState : clientStates.entrySet()) { + for (final Entry clientState : clientStates.entrySet()) { if (hasAssignedTask.test(clientState.getValue(), taskId)) { originalAssignedTaskNumber.merge(clientState.getKey(), 1, Integer::sum); } @@ -122,12 +122,12 @@ public boolean assignTaskFromMinCostFlow( final Graph graph, final List clientList, final List taskIdList, - final Map clientStates, + final Map clientStates, final Map originalAssignedTaskNumber, final Map taskClientMap, - final BiConsumer assignTask, - final BiConsumer unAssignTask, - final BiPredicate hasAssignedTask + final BiConsumer assignTask, + final BiConsumer unAssignTask, + final BiPredicate hasAssignedTask ) { int tasksAssigned = 0; boolean taskMoved = false; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructor.java index 4b46c147e1756..8dbb7838182ea 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructor.java @@ -33,7 +33,7 @@ /** * Construct graph for rack aware task assignor */ -public interface RackAwareGraphConstructor { +public interface RackAwareGraphConstructor { int SOURCE_ID = -1; int getSinkNodeID(final List taskIdList, final List clientList, final Map> tasksForTopicGroup); @@ -45,10 +45,10 @@ public interface RackAwareGraphConstructor { Graph constructTaskGraph( final List clientList, final List taskIdList, - final Map clientStates, + final Map clientStates, final Map taskClientMap, final Map originalAssignedTaskNumber, - final BiPredicate hasAssignedTask, + final BiPredicate hasAssignedTask, final CostFunction costFunction, final int trafficCost, final int nonOverlapCost, @@ -59,24 +59,24 @@ boolean assignTaskFromMinCostFlow( final Graph graph, final List clientList, final List taskIdList, - final Map clientStates, + final Map clientStates, final Map originalAssignedTaskNumber, final Map taskClientMap, - final BiConsumer assignTask, - final BiConsumer unAssignTask, - final BiPredicate hasAssignedTask); + final BiConsumer assignTask, + final BiConsumer unAssignTask, + final BiPredicate hasAssignedTask); default KeyValue assignTaskToClient( final Graph graph, final TaskId taskId, final int taskNodeId, final int topicGroupIndex, - final Map clientStates, + final Map clientStates, final List clientList, final List taskIdList, final Map taskClientMap, - final BiConsumer assignTask, - final BiConsumer unAssignTask + final BiConsumer assignTask, + final BiConsumer unAssignTask ) { int tasksAssigned = 0; boolean taskMoved = false; @@ -104,9 +104,9 @@ default KeyValue assignTaskToClient( default void validateAssignedTask( final List taskIdList, final int tasksAssigned, - final Map clientStates, + final Map clientStates, final Map originalAssignedTaskNumber, - final BiPredicate hasAssignedTask + final BiPredicate hasAssignedTask ) { // Validate task assigned if (tasksAssigned != taskIdList.size()) { @@ -117,7 +117,7 @@ default void validateAssignedTask( // Validate original assigned task number matches final Map assignedTaskNumber = new HashMap<>(); for (final TaskId taskId : taskIdList) { - for (final Entry clientState : clientStates.entrySet()) { + for (final Entry clientState : clientStates.entrySet()) { if (hasAssignedTask.test(clientState.getValue(), taskId)) { assignedTaskNumber.merge(clientState.getKey(), 1, Integer::sum); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructorFactory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructorFactory.java index c23dde358b33f..93afea586f03a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructorFactory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructorFactory.java @@ -25,12 +25,12 @@ public class RackAwareGraphConstructorFactory { - static RackAwareGraphConstructor create(final AssignmentConfigs assignmentConfigs, final Map> tasksForTopicGroup) { + static RackAwareGraphConstructor create(final AssignmentConfigs assignmentConfigs, final Map> tasksForTopicGroup) { switch (assignmentConfigs.rackAwareAssignmentStrategy) { case StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC: - return new MinTrafficGraphConstructor(); + return new MinTrafficGraphConstructor(); case StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY: - return new BalanceSubtopologyGraphConstructor(tasksForTopicGroup); + return new BalanceSubtopologyGraphConstructor(tasksForTopicGroup); default: throw new IllegalArgumentException("Rack aware assignment is disabled"); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java index b124d07424ae8..0ccc4bb6a9ded 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java @@ -325,7 +325,7 @@ private long tasksCost(final SortedSet tasks, } final List clientList = new ArrayList<>(clientStates.keySet()); final List taskIdList = new ArrayList<>(tasks); - final Graph graph = new MinTrafficGraphConstructor() + final Graph graph = new MinTrafficGraphConstructor() .constructTaskGraph( clientList, taskIdList, @@ -373,7 +373,7 @@ public long optimizeActiveTasks(final SortedSet activeTasks, final List taskIdList = new ArrayList<>(activeTasks); final Map taskClientMap = new HashMap<>(); final Map originalAssignedTaskNumber = new HashMap<>(); - final RackAwareGraphConstructor graphConstructor = RackAwareGraphConstructorFactory.create(assignmentConfigs, tasksForTopicGroup); + final RackAwareGraphConstructor graphConstructor = RackAwareGraphConstructorFactory.create(assignmentConfigs, tasksForTopicGroup); final Graph graph = graphConstructor.constructTaskGraph( clientList, taskIdList, @@ -419,7 +419,7 @@ public long optimizeStandbyTasks(final SortedMap clientStates boolean taskMoved = true; int round = 0; - final RackAwareGraphConstructor graphConstructor = new MinTrafficGraphConstructor(); + final RackAwareGraphConstructor graphConstructor = new MinTrafficGraphConstructor<>(); while (taskMoved && round < STANDBY_OPTIMIZER_MAX_ITERATION) { taskMoved = false; round++; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructorTest.java index 607dff9382557..4f0f62cf69927 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructorTest.java @@ -68,7 +68,7 @@ public class RackAwareGraphConstructorTest { private final Map originalAssignedTaskNumber = new HashMap<>(); private final Map> tasksForTopicGroup = getTasksForTopicGroup(TP_SIZE, PARTITION_SIZE); - private RackAwareGraphConstructor constructor; + private RackAwareGraphConstructor constructor; @Parameter public String constructorType; @@ -86,9 +86,9 @@ public void setUp() { randomAssignTasksToClient(taskIdList, clientStateMap); if (constructorType.equals(MIN_COST)) { - constructor = new MinTrafficGraphConstructor(); + constructor = new MinTrafficGraphConstructor<>(); } else if (constructorType.equals(BALANCE_SUBTOPOLOGY)) { - constructor = new BalanceSubtopologyGraphConstructor(tasksForTopicGroup); + constructor = new BalanceSubtopologyGraphConstructor<>(tasksForTopicGroup); } graph = constructor.constructTaskGraph( clientList, taskIdList, clientStateMap, taskClientMap, originalAssignedTaskNumber, ClientState::hasAssignedTask, this::getCost, 10, 1, false, false);