Skip to content

Commit

Permalink
KAFKA-15045: (KIP-924 pt. 25) Rename old internal StickyTaskAssignor …
Browse files Browse the repository at this point in the history
…to LegacyStickyTaskAssignor (apache#16322)

To avoid confusion in 3.8/until we fully remove all the old task assignors and internal config, we should rename the old internal assignor classes like the StickyTaskAssignor so that they won't be mixed up with the new version of the assignor (which is also named StickyTaskAssignor)

Reviewers: Bruno Cadonna <[email protected]>, Josep Prat <[email protected]>
  • Loading branch information
ableegoldman authored Jun 13, 2024
1 parent f380cd1 commit 4333af5
Show file tree
Hide file tree
Showing 10 changed files with 37 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@
import org.apache.kafka.streams.processor.internals.assignment.CopartitionedTopicsEnforcer;
import org.apache.kafka.streams.processor.internals.assignment.DefaultTaskTopicPartition;
import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.RackUtils;
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.processor.internals.assignment.LegacyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.DefaultTaskInfo;
Expand Down Expand Up @@ -861,8 +861,8 @@ private UserTaskAssignmentListener assignTasksToClients(final Cluster fullMetada

private LegacyTaskAssignor createTaskAssignor(final boolean lagComputationSuccessful) {
final LegacyTaskAssignor taskAssignor = legacyTaskAssignorSupplier.get();
if (taskAssignor instanceof StickyTaskAssignor) {
// special case: to preserve pre-existing behavior, we invoke the StickyTaskAssignor
if (taskAssignor instanceof LegacyStickyTaskAssignor) {
// special case: to preserve pre-existing behavior, we invoke the LegacyStickyTaskAssignor
// whether or not lag computation failed.
return taskAssignor;
} else if (lagComputationSuccessful) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@
* 2. always return true, indicating that a follow-up rebalance is needed
*/
public class FallbackPriorTaskAssignor implements LegacyTaskAssignor {
private final StickyTaskAssignor delegate;
private final LegacyStickyTaskAssignor delegate;

public FallbackPriorTaskAssignor() {
delegate = new StickyTaskAssignor(true);
delegate = new LegacyStickyTaskAssignor(true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,14 @@
import java.util.Objects;
import java.util.Set;

public class StickyTaskAssignor implements LegacyTaskAssignor {
// Note: as of 3.8, this class has been renamed from StickyTaskAssignor to LegacyStickyTaskAssignor,
// and a new StickyTaskAssignor implementation was added that implements the new TaskAssignor interface.
// If you were previously plugging in the old StickyTaskAssignor via the internal.task.assignor.class config,
// you should migrate to the new TaskAssignor interface by removing the internal config and instead
// passing in the new StickyTaskAssignor class to the new public task.assignor.class config
public class LegacyStickyTaskAssignor implements LegacyTaskAssignor {

private static final Logger log = LoggerFactory.getLogger(StickyTaskAssignor.class);
private static final Logger log = LoggerFactory.getLogger(LegacyStickyTaskAssignor.class);

// For stateful tasks, by default we want to maintain stickiness. So we have higher non_overlap_cost
private static final int DEFAULT_STATEFUL_TRAFFIC_COST = 1;
Expand All @@ -59,11 +64,11 @@ public class StickyTaskAssignor implements LegacyTaskAssignor {

private final boolean mustPreserveActiveTaskAssignment;

public StickyTaskAssignor() {
public LegacyStickyTaskAssignor() {
this(false);
}

StickyTaskAssignor(final boolean mustPreserveActiveTaskAssignment) {
LegacyStickyTaskAssignor(final boolean mustPreserveActiveTaskAssignment) {
this.mustPreserveActiveTaskAssignment = mustPreserveActiveTaskAssignment;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1460,8 +1460,8 @@ public void shouldReturnRackAwareAssignmentNonOverlapCost() {

@Test
public void shouldReturnTaskAssignorClass() {
props.put(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG, "StickyTaskAssignor");
assertEquals("StickyTaskAssignor", new StreamsConfig(props).getString(TASK_ASSIGNOR_CLASS_CONFIG));
props.put(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG, "LegacyStickyTaskAssignor");
assertEquals("LegacyStickyTaskAssignor", new StreamsConfig(props).getString(TASK_ASSIGNOR_CLASS_CONFIG));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.LegacyTaskAssignor;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockApiProcessorSupplier;
Expand Down Expand Up @@ -102,26 +102,26 @@ public void testHighAvailabilityTaskAssignorManyThreadsPerClient() {
completeLargeAssignment(1_000, 10, 1000, 1, HighAvailabilityTaskAssignor.class);
}

/* StickyTaskAssignor tests */
/* LegacyStickyTaskAssignor tests */

@Test(timeout = 300 * 1000)
public void testStickyTaskAssignorLargePartitionCount() {
completeLargeAssignment(2_000, 2, 1, 1, StickyTaskAssignor.class);
completeLargeAssignment(2_000, 2, 1, 1, LegacyStickyTaskAssignor.class);
}

@Test(timeout = 300 * 1000)
public void testStickyTaskAssignorLargeNumConsumers() {
completeLargeAssignment(1_000, 1_000, 1, 1, StickyTaskAssignor.class);
completeLargeAssignment(1_000, 1_000, 1, 1, LegacyStickyTaskAssignor.class);
}

@Test(timeout = 300 * 1000)
public void testStickyTaskAssignorManyStandbys() {
completeLargeAssignment(1_000, 100, 1, 20, StickyTaskAssignor.class);
completeLargeAssignment(1_000, 100, 1, 20, LegacyStickyTaskAssignor.class);
}

@Test(timeout = 300 * 1000)
public void testStickyTaskAssignorManyThreadsPerClient() {
completeLargeAssignment(1_000, 10, 1000, 1, StickyTaskAssignor.class);
completeLargeAssignment(1_000, 10, 1000, 1, LegacyStickyTaskAssignor.class);
}

/* FallbackPriorTaskAssignor tests */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.processor.internals.assignment.LegacyTaskAssignor;
import org.apache.kafka.streams.state.HostInfo;
Expand Down Expand Up @@ -344,8 +344,8 @@ public static Collection<Object[]> parameters() {
return asList(
new Object[]{HighAvailabilityTaskAssignor.class, true, null},
new Object[]{HighAvailabilityTaskAssignor.class, false, null},
new Object[]{StickyTaskAssignor.class, true, null},
new Object[]{StickyTaskAssignor.class, false, null},
new Object[]{LegacyStickyTaskAssignor.class, true, null},
new Object[]{LegacyStickyTaskAssignor.class, false, null},
new Object[]{FallbackPriorTaskAssignor.class, true, null},
new Object[]{FallbackPriorTaskAssignor.class, false, null},
new Object[]{null, false, org.apache.kafka.streams.processor.assignment.assignors.StickyTaskAssignor.class},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.streams.integration.StandbyTaskCreationIntegrationTest;
import org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignorTest;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetricsTest;
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignorTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;

Expand All @@ -35,7 +35,7 @@
GlobalStateTaskTest.class,
TaskManagerTest.class,
TaskMetricsTest.class,
StickyTaskAssignorTest.class,
LegacyStickyTaskAssignorTest.class,
StreamsPartitionAssignorTest.class,
StandbyTaskCreationIntegrationTest.class,
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@
import static org.mockito.Mockito.spy;

@RunWith(Parameterized.class)
public class StickyTaskAssignorTest {
public class LegacyStickyTaskAssignorTest {

private final List<Integer> expectedTopicGroupIds = asList(1, 2);
private final Time time = new MockTime();
Expand Down Expand Up @@ -807,7 +807,7 @@ public void shouldViolateBalanceToPreserveActiveTaskStickiness() {
time
);

final boolean probingRebalanceNeeded = new StickyTaskAssignor(true).assign(
final boolean probingRebalanceNeeded = new LegacyStickyTaskAssignor(true).assign(
clients,
new HashSet<>(taskIds),
new HashSet<>(taskIds),
Expand Down Expand Up @@ -857,7 +857,7 @@ public void shouldOptimizeStatefulAndStatelessTaskTraffic() {
time
);

final boolean probingRebalanceNeeded = new StickyTaskAssignor().assign(
final boolean probingRebalanceNeeded = new LegacyStickyTaskAssignor().assign(
clients,
new HashSet<>(taskIds),
new HashSet<>(statefulTaskIds),
Expand Down Expand Up @@ -937,7 +937,7 @@ public void shouldAssignRandomInput() {
tpSize, partitionSize, maxCapacity, false, statefulTasks);


final boolean probing = new StickyTaskAssignor().assign(
final boolean probing = new LegacyStickyTaskAssignor().assign(
clientStateMap,
taskIds,
statefulTasks,
Expand Down Expand Up @@ -1005,7 +1005,7 @@ public void shouldRemainOriginalAssignmentWithoutTrafficCostForMinCostStrategy()
final SortedMap<ProcessId, ClientState> clientStateMap = getRandomClientState(clientSize,
tpSize, partitionSize, maxCapacity, false, statefulTasks);

new StickyTaskAssignor().assign(
new LegacyStickyTaskAssignor().assign(
clientStateMap,
taskIds,
statefulTasks,
Expand Down Expand Up @@ -1047,7 +1047,7 @@ public void shouldRemainOriginalAssignmentWithoutTrafficCostForMinCostStrategy()
time
));

new StickyTaskAssignor().assign(
new LegacyStickyTaskAssignor().assign(
clientStateMapCopy,
taskIds,
statefulTasks,
Expand Down Expand Up @@ -1085,7 +1085,7 @@ private boolean assign(final int numStandbys, final TaskId... tasks) {
private boolean assign(final AssignmentConfigs configs, final RackAwareTaskAssignor rackAwareTaskAssignor, final TaskId... tasks) {
final List<TaskId> taskIds = asList(tasks);
Collections.shuffle(taskIds);
return new StickyTaskAssignor().assign(
return new LegacyStickyTaskAssignor().assign(
clients,
new HashSet<>(taskIds),
new HashSet<>(taskIds),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def test_streams_should_scale_in_while_brokers_down(self, metadata_quorum, use_n
# TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor
configs = self.get_configs(
extra_configs=",application.id=shutdown_with_broker_down" +
",internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor"
",internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor"
)

processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)
Expand Down Expand Up @@ -236,7 +236,7 @@ def test_streams_should_failover_while_brokers_down(self, metadata_quorum, use_n
# TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor
configs = self.get_configs(
extra_configs=",application.id=failover_with_broker_down" +
",internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor"
",internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor"
)

processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def __init__(self, test_context):
def test_standby_tasks_rebalance(self, metadata_quorum, use_new_coordinator=False):
# TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor
configs = self.get_configs(
",sourceTopic=%s,sinkTopic1=%s,sinkTopic2=%s,internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor" % (
",sourceTopic=%s,sinkTopic1=%s,sinkTopic2=%s,internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor" % (
self.streams_source_topic,
self.streams_sink_topic_1,
self.streams_sink_topic_2
Expand Down

0 comments on commit 4333af5

Please sign in to comment.