Skip to content

Commit

Permalink
Fix shouldUpdateTopicInKafkaWhen* test flakiness
Browse files Browse the repository at this point in the history
Signed-off-by: Federico Valeri <[email protected]>
  • Loading branch information
fvaleri committed Dec 7, 2024
1 parent c079854 commit 9d68abc
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,6 @@
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.verifyNoInteractions;

/**
* This test is not intended to provide lots of coverage of the {@link BatchingTopicController},
* rather it aims to cover some parts that are difficult to test via {@link TopicControllerIT}.
*/
@SuppressWarnings("checkstyle:ClassDataAbstractionCoupling")
class BatchingTopicControllerIT implements TestSeparator {
private static final String NAMESPACE = TopicOperatorTestUtil.namespaceName(BatchingTopicControllerIT.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,20 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;

/**
* This integration test suite provides coverage of the {@link BatchingTopicController}.
* If you need to test individual units of code, use the {@link BatchingTopicControllerIT}.
*/
@SuppressWarnings("checkstyle:ClassFanOutComplexity")
class TopicControllerIT implements TestSeparator {
private static final Logger LOGGER = LogManager.getLogger(TopicControllerIT.class);

private static final String NAMESPACE = TopicOperatorTestUtil.namespaceName(TopicControllerIT.class);
public static final Map<String, String> SELECTOR = Map.of("foo", "FOO", "bar", "BAR");
class TopicOperatorIT implements TestSeparator {
private static final Logger LOGGER = LogManager.getLogger(TopicOperatorIT.class);

private static final String NAMESPACE = TopicOperatorTestUtil.namespaceName(TopicOperatorIT.class);
private static final Map<String, String> SELECTOR = Map.of("foo", "FOO", "bar", "BAR");
private static final Map<String, Object> TEST_TOPIC_CONFIG = Map.of(
TopicConfig.CLEANUP_POLICY_CONFIG, List.of("compact"),
TopicConfig.COMPRESSION_TYPE_CONFIG, "producer",
TopicConfig.FLUSH_MS_CONFIG, 1234L,
TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, 1234,
TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, 0.6,
TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, true
);

static KubernetesClient kubernetesClient;
TopicOperatorMain operator;
Expand Down Expand Up @@ -437,21 +441,13 @@ static List<KafkaTopic> managedKafkaTopics() {

static List<KafkaTopic> managedKafkaTopicsWithConfigs() {
var topicName = "topic" + System.nanoTime();
var configs = Map.of(
TopicConfig.CLEANUP_POLICY_CONFIG, List.of("compact"), // list typed
TopicConfig.COMPRESSION_TYPE_CONFIG, "producer", // string typed
TopicConfig.FLUSH_MS_CONFIG, 1234L, // long typed
TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, 1234, // int typed
TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, 0.6, // double typed
TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, true // boolean typed
);
return List.of(
kafkaTopic(NAMESPACE, topicName + "a", SELECTOR, null, true, topicName + "a", 2, 1, configs),
kafkaTopic(NAMESPACE, topicName + "b", SELECTOR, null, true, null, 2, 1, configs),
kafkaTopic(NAMESPACE, topicName + "c", SELECTOR, null, true, topicName + "c".toUpperCase(Locale.ROOT), 2, 1, configs),
kafkaTopic(NAMESPACE, topicName + "d", SELECTOR, null, null, topicName + "d", 2, 1, configs),
kafkaTopic(NAMESPACE, topicName + "e", SELECTOR, null, null, null, 2, 1, configs),
kafkaTopic(NAMESPACE, topicName + "f", SELECTOR, null, null, topicName + "f".toUpperCase(Locale.ROOT), 2, 1, configs)
kafkaTopic(NAMESPACE, topicName + "a", SELECTOR, null, true, topicName + "a", 2, 1, TEST_TOPIC_CONFIG),
kafkaTopic(NAMESPACE, topicName + "b", SELECTOR, null, true, null, 2, 1, TEST_TOPIC_CONFIG),
kafkaTopic(NAMESPACE, topicName + "c", SELECTOR, null, true, topicName + "c".toUpperCase(Locale.ROOT), 2, 1, TEST_TOPIC_CONFIG),
kafkaTopic(NAMESPACE, topicName + "d", SELECTOR, null, null, topicName + "d", 2, 1, TEST_TOPIC_CONFIG),
kafkaTopic(NAMESPACE, topicName + "e", SELECTOR, null, null, null, 2, 1, TEST_TOPIC_CONFIG),
kafkaTopic(NAMESPACE, topicName + "f", SELECTOR, null, null, topicName + "f".toUpperCase(Locale.ROOT), 2, 1, TEST_TOPIC_CONFIG)
);
}

Expand Down Expand Up @@ -851,14 +847,10 @@ private void shouldUpdateTopicInKafkaWhenConfigChangedInKube(StrimziKafkaCluster
UnaryOperator<Map<String, String>> expectedChangedConfigs) throws ExecutionException, InterruptedException, TimeoutException {
// given
var expectedTopicName = TopicOperatorUtil.topicName(kt);
var expectedCreateConfigs = Map.of(
TopicConfig.CLEANUP_POLICY_CONFIG, "compact", // list typed
TopicConfig.COMPRESSION_TYPE_CONFIG, "producer", // string typed
TopicConfig.FLUSH_MS_CONFIG, "1234", // long typed
TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "1234", // int typed
TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.6", // double typed
TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true" // boolean typed
);
var expectedCreateConfigs = TEST_TOPIC_CONFIG.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() instanceof List
? String.join(",", (List) e.getValue()) : String.valueOf(e.getValue())));

Map<String, String> expectedConfigs = expectedChangedConfigs.apply(expectedCreateConfigs);
assertNotEquals(expectedCreateConfigs, expectedConfigs);

Expand All @@ -872,17 +864,19 @@ private void shouldUpdateTopicInKafkaWhenConfigChangedInKube(StrimziKafkaCluster
assertEquals(expectedConfigs, topicConfigMap(expectedTopicName));
}

// TODO remove run CI 9

@Test
public void shouldUpdateTopicInKafkaWhenStringConfigChangedInKube() throws ExecutionException, InterruptedException, TimeoutException {
startKafkaCluster(1, 1, Map.of("auto.create.topics.enable", "false"));
List<KafkaTopic> topics = managedKafkaTopicsWithConfigs();
var topics = managedKafkaTopicsWithConfigs();

for (KafkaTopic kt : topics) {
shouldUpdateTopicInKafkaWhenConfigChangedInKube(kafkaCluster, kt,
TopicControllerIT::setSnappyCompression,
TopicOperatorIT::setSnappyCompression,
expectedCreateConfigs -> {
Map<String, String> expectedUpdatedConfigs = new HashMap<>(expectedCreateConfigs);
expectedUpdatedConfigs.put(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy");
var expectedUpdatedConfigs = new LinkedHashMap<>(expectedCreateConfigs);
expectedUpdatedConfigs.replace(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy");
return expectedUpdatedConfigs;
});
}
Expand All @@ -891,7 +885,7 @@ public void shouldUpdateTopicInKafkaWhenStringConfigChangedInKube() throws Execu
@Test
public void shouldUpdateTopicInKafkaWhenIntConfigChangedInKube() throws ExecutionException, InterruptedException, TimeoutException {
startKafkaCluster(1, 1, Map.of("auto.create.topics.enable", "false"));
List<KafkaTopic> topics = managedKafkaTopicsWithConfigs();
var topics = managedKafkaTopicsWithConfigs();

for (KafkaTopic kt : topics) {
shouldUpdateTopicInKafkaWhenConfigChangedInKube(kafkaCluster, kt,
Expand All @@ -900,8 +894,8 @@ public void shouldUpdateTopicInKafkaWhenIntConfigChangedInKube() throws Executio
return theKt;
},
expectedCreateConfigs -> {
Map<String, String> expectedUpdatedConfigs = new HashMap<>(expectedCreateConfigs);
expectedUpdatedConfigs.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "5678");
var expectedUpdatedConfigs = new LinkedHashMap<>(expectedCreateConfigs);
expectedUpdatedConfigs.replace(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "5678");
return expectedUpdatedConfigs;
});
}
Expand All @@ -910,7 +904,7 @@ public void shouldUpdateTopicInKafkaWhenIntConfigChangedInKube() throws Executio
@Test
public void shouldUpdateTopicInKafkaWhenLongConfigChangedInKube() throws ExecutionException, InterruptedException, TimeoutException {
startKafkaCluster(1, 1, Map.of("auto.create.topics.enable", "false"));
List<KafkaTopic> topics = managedKafkaTopicsWithConfigs();
var topics = managedKafkaTopicsWithConfigs();

for (KafkaTopic kt : topics) {
shouldUpdateTopicInKafkaWhenConfigChangedInKube(kafkaCluster, kt,
Expand All @@ -919,8 +913,8 @@ public void shouldUpdateTopicInKafkaWhenLongConfigChangedInKube() throws Executi
return theKt;
},
expectedCreateConfigs -> {
Map<String, String> expectedUpdatedConfigs = new HashMap<>(expectedCreateConfigs);
expectedUpdatedConfigs.put(TopicConfig.FLUSH_MS_CONFIG, "9876");
var expectedUpdatedConfigs = new LinkedHashMap<>(expectedCreateConfigs);
expectedUpdatedConfigs.replace(TopicConfig.FLUSH_MS_CONFIG, "9876");
return expectedUpdatedConfigs;
});
}
Expand All @@ -929,7 +923,7 @@ public void shouldUpdateTopicInKafkaWhenLongConfigChangedInKube() throws Executi
@Test
public void shouldUpdateTopicInKafkaWhenDoubleConfigChangedInKube() throws ExecutionException, InterruptedException, TimeoutException {
startKafkaCluster(1, 1, Map.of("auto.create.topics.enable", "false"));
List<KafkaTopic> topics = managedKafkaTopicsWithConfigs();
var topics = managedKafkaTopicsWithConfigs();

for (KafkaTopic kt : topics) {
shouldUpdateTopicInKafkaWhenConfigChangedInKube(kafkaCluster, kt,
Expand All @@ -938,8 +932,8 @@ public void shouldUpdateTopicInKafkaWhenDoubleConfigChangedInKube() throws Execu
return theKt;
},
expectedCreateConfigs -> {
Map<String, String> expectedUpdatedConfigs = new HashMap<>(expectedCreateConfigs);
expectedUpdatedConfigs.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.1");
var expectedUpdatedConfigs = new LinkedHashMap<>(expectedCreateConfigs);
expectedUpdatedConfigs.replace(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.1");
return expectedUpdatedConfigs;
});
}
Expand All @@ -948,7 +942,7 @@ public void shouldUpdateTopicInKafkaWhenDoubleConfigChangedInKube() throws Execu
@Test
public void shouldUpdateTopicInKafkaWhenBooleanConfigChangedInKube() throws ExecutionException, InterruptedException, TimeoutException {
startKafkaCluster(1, 1, Map.of("auto.create.topics.enable", "false"));
List<KafkaTopic> topics = managedKafkaTopicsWithConfigs();
var topics = managedKafkaTopicsWithConfigs();

for (KafkaTopic kt : topics) {
shouldUpdateTopicInKafkaWhenConfigChangedInKube(kafkaCluster, kt,
Expand All @@ -957,8 +951,8 @@ public void shouldUpdateTopicInKafkaWhenBooleanConfigChangedInKube() throws Exec
return theKt;
},
expectedCreateConfigs -> {
Map<String, String> expectedUpdatedConfigs = new HashMap<>(expectedCreateConfigs);
expectedUpdatedConfigs.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false");
var expectedUpdatedConfigs = new LinkedHashMap<>(expectedCreateConfigs);
expectedUpdatedConfigs.replace(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false");
return expectedUpdatedConfigs;
});
}
Expand All @@ -967,7 +961,7 @@ public void shouldUpdateTopicInKafkaWhenBooleanConfigChangedInKube() throws Exec
@Test
public void shouldUpdateTopicInKafkaWhenListConfigChangedInKube() throws ExecutionException, InterruptedException, TimeoutException {
startKafkaCluster(1, 1, Map.of("auto.create.topics.enable", "false"));
List<KafkaTopic> topics = managedKafkaTopicsWithConfigs();
var topics = managedKafkaTopicsWithConfigs();

for (KafkaTopic kt : topics) {
shouldUpdateTopicInKafkaWhenConfigChangedInKube(kafkaCluster, kt,
Expand All @@ -976,8 +970,8 @@ public void shouldUpdateTopicInKafkaWhenListConfigChangedInKube() throws Executi
return theKt;
},
expectedCreateConfigs -> {
Map<String, String> expectedUpdatedConfigs = new HashMap<>(expectedCreateConfigs);
expectedUpdatedConfigs.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete");
var expectedUpdatedConfigs = new LinkedHashMap<>(expectedCreateConfigs);
expectedUpdatedConfigs.replace(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete");
return expectedUpdatedConfigs;
});
}
Expand All @@ -986,7 +980,7 @@ public void shouldUpdateTopicInKafkaWhenListConfigChangedInKube() throws Executi
@Test
public void shouldUpdateTopicInKafkaWhenConfigRemovedInKube() throws ExecutionException, InterruptedException, TimeoutException {
startKafkaCluster(1, 1, Map.of("auto.create.topics.enable", "false"));
List<KafkaTopic> topics = managedKafkaTopicsWithConfigs();
var topics = managedKafkaTopicsWithConfigs();

for (KafkaTopic kt : topics) {
shouldUpdateTopicInKafkaWhenConfigChangedInKube(kafkaCluster, kt,
Expand All @@ -995,7 +989,7 @@ public void shouldUpdateTopicInKafkaWhenConfigRemovedInKube() throws ExecutionEx
return theKt;
},
expectedCreateConfigs -> {
var expectedUpdatedConfigs = new HashMap<>(expectedCreateConfigs);
var expectedUpdatedConfigs = new LinkedHashMap<>(expectedCreateConfigs);
expectedUpdatedConfigs.remove(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG);
return expectedUpdatedConfigs;
});
Expand Down Expand Up @@ -1064,7 +1058,7 @@ public void shouldUpdateTopicInKafkaWhenPartitionsIncreasedInKube() throws Execu

// when: partitions is increased
modifyTopicAndAwait(kt,
TopicControllerIT::incrementPartitions,
TopicOperatorIT::incrementPartitions,
readyIsTrue());

// then
Expand Down Expand Up @@ -1280,7 +1274,7 @@ private static TopicOperatorConfig topicOperatorConfig(String ns, StrimziKafkaCl
return new TopicOperatorConfig(ns,
Labels.fromMap(SELECTOR),
kafkaCluster.getBootstrapServers(),
TopicControllerIT.class.getSimpleName(),
TopicOperatorIT.class.getSimpleName(),
fullReconciliationIntervalMs,
false, "", "", "", "", "",
false, "", "", "", "", "",
Expand Down Expand Up @@ -1702,7 +1696,7 @@ private void accountForReassigningPartitions(
// then
// trigger reconciliation by change a config
var modified = modifyTopicAndAwait(created,
TopicControllerIT::setSnappyCompression,
TopicOperatorIT::setSnappyCompression,
duringReassignmentPredicate);

assertFalse(kafkaAdminClient.listPartitionReassignments(Set.of(tp)).reassignments().get().isEmpty(),
Expand All @@ -1721,7 +1715,7 @@ private void accountForReassigningPartitions(

// trigger reconciliation by changing a config again
modifyTopicAndAwait(modified,
TopicControllerIT::setGzipCompression,
TopicOperatorIT::setGzipCompression,
postReassignmentPredicate);
}

Expand Down Expand Up @@ -1798,7 +1792,7 @@ public void shouldFailAlterConfigIfNoTopicAuthz() throws ExecutionException, Int
maybeStartOperator(config);
createTopicAndAssertSuccess(kafkaCluster, kt);

var modified = modifyTopicAndAwait(kt, TopicControllerIT::setSnappyCompression, readyIsFalse());
var modified = modifyTopicAndAwait(kt, TopicOperatorIT::setSnappyCompression, readyIsFalse());
var condition = assertExactlyOneCondition(modified);
assertEquals("KafkaError", condition.getReason());
assertEquals("org.apache.kafka.common.errors.TopicAuthorizationException: not allowed", condition.getMessage());
Expand Down Expand Up @@ -1880,7 +1874,7 @@ public void shouldFailAddPartitionsIfNoTopicAuthz() throws ExecutionException, I
createTopicAndAssertSuccess(kafkaCluster, kt);

var modified = modifyTopicAndAwait(kt,
TopicControllerIT::incrementPartitions,
TopicOperatorIT::incrementPartitions,
readyIsFalse());
var condition = assertExactlyOneCondition(modified);
assertEquals("KafkaError", condition.getReason());
Expand Down Expand Up @@ -2043,7 +2037,7 @@ public void shouldTerminateIfQueueFull() throws ExecutionException, InterruptedE
String ns = createNamespace(NAMESPACE);

var config = new TopicOperatorConfig(ns, Labels.fromMap(SELECTOR),
kafkaCluster.getBootstrapServers(), TopicControllerIT.class.getSimpleName(), 10_000,
kafkaCluster.getBootstrapServers(), TopicOperatorIT.class.getSimpleName(), 10_000,
false, "", "", "", "", "",
false, "", "", "", "", "",
true,
Expand Down

0 comments on commit 9d68abc

Please sign in to comment.