From 9d68abc6d1633bbbb359a086bf7bbe6a21dc1dfc Mon Sep 17 00:00:00 2001 From: Federico Valeri Date: Fri, 6 Dec 2024 15:34:14 +0100 Subject: [PATCH] Fix shouldUpdateTopicInKafkaWhen* test flakiness Signed-off-by: Federico Valeri --- .../topic/BatchingTopicControllerIT.java | 4 - ...ControllerIT.java => TopicOperatorIT.java} | 112 +++++++++--------- 2 files changed, 53 insertions(+), 63 deletions(-) rename topic-operator/src/test/java/io/strimzi/operator/topic/{TopicControllerIT.java => TopicOperatorIT.java} (96%) diff --git a/topic-operator/src/test/java/io/strimzi/operator/topic/BatchingTopicControllerIT.java b/topic-operator/src/test/java/io/strimzi/operator/topic/BatchingTopicControllerIT.java index c9e99106ae..fdbfdb950e 100644 --- a/topic-operator/src/test/java/io/strimzi/operator/topic/BatchingTopicControllerIT.java +++ b/topic-operator/src/test/java/io/strimzi/operator/topic/BatchingTopicControllerIT.java @@ -71,10 +71,6 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.verifyNoInteractions; -/** - * This test is not intended to provide lots of coverage of the {@link BatchingTopicController}, - * rather it aims to cover some parts that a difficult to test via {@link TopicControllerIT}. - */ @SuppressWarnings("checkstyle:ClassDataAbstractionCoupling") class BatchingTopicControllerIT implements TestSeparator { private static final String NAMESPACE = TopicOperatorTestUtil.namespaceName(BatchingTopicControllerIT.class); diff --git a/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java b/topic-operator/src/test/java/io/strimzi/operator/topic/TopicOperatorIT.java similarity index 96% rename from topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java rename to topic-operator/src/test/java/io/strimzi/operator/topic/TopicOperatorIT.java index 30ce19eef3..ff315d3f15 100644 --- a/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java +++ b/topic-operator/src/test/java/io/strimzi/operator/topic/TopicOperatorIT.java @@ -97,16 +97,20 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; -/** - * This integration test suite provides coverage of the {@link BatchingTopicController}. - * If you need to test individual units of code, use the the {@link BatchingTopicController}. - */ @SuppressWarnings("checkstyle:ClassFanOutComplexity") -class TopicControllerIT implements TestSeparator { - private static final Logger LOGGER = LogManager.getLogger(TopicControllerIT.class); - - private static final String NAMESPACE = TopicOperatorTestUtil.namespaceName(TopicControllerIT.class); - public static final Map SELECTOR = Map.of("foo", "FOO", "bar", "BAR"); +class TopicOperatorIT implements TestSeparator { + private static final Logger LOGGER = LogManager.getLogger(TopicOperatorIT.class); + + private static final String NAMESPACE = TopicOperatorTestUtil.namespaceName(TopicOperatorIT.class); + private static final Map SELECTOR = Map.of("foo", "FOO", "bar", "BAR"); + private static final Map TEST_TOPIC_CONFIG = Map.of( + TopicConfig.CLEANUP_POLICY_CONFIG, List.of("compact"), + TopicConfig.COMPRESSION_TYPE_CONFIG, "producer", + TopicConfig.FLUSH_MS_CONFIG, 1234L, + TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, 1234, + TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, 0.6, + TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, true + ); static KubernetesClient kubernetesClient; TopicOperatorMain operator; @@ -437,21 +441,13 @@ static List managedKafkaTopics() { static List managedKafkaTopicsWithConfigs() { var topicName = "topic" + System.nanoTime(); - var configs = Map.of( - TopicConfig.CLEANUP_POLICY_CONFIG, List.of("compact"), // list typed - TopicConfig.COMPRESSION_TYPE_CONFIG, "producer", // string typed - TopicConfig.FLUSH_MS_CONFIG, 1234L, // long typed - TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, 1234, // int typed - TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, 0.6, // double typed - TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, true // boolean typed - ); return List.of( - kafkaTopic(NAMESPACE, topicName + "a", SELECTOR, null, true, topicName + "a", 2, 1, configs), - kafkaTopic(NAMESPACE, topicName + "b", SELECTOR, null, true, null, 2, 1, configs), - kafkaTopic(NAMESPACE, topicName + "c", SELECTOR, null, true, topicName + "c".toUpperCase(Locale.ROOT), 2, 1, configs), - kafkaTopic(NAMESPACE, topicName + "d", SELECTOR, null, null, topicName + "d", 2, 1, configs), - kafkaTopic(NAMESPACE, topicName + "e", SELECTOR, null, null, null, 2, 1, configs), - kafkaTopic(NAMESPACE, topicName + "f", SELECTOR, null, null, topicName + "f".toUpperCase(Locale.ROOT), 2, 1, configs) + kafkaTopic(NAMESPACE, topicName + "a", SELECTOR, null, true, topicName + "a", 2, 1, TEST_TOPIC_CONFIG), + kafkaTopic(NAMESPACE, topicName + "b", SELECTOR, null, true, null, 2, 1, TEST_TOPIC_CONFIG), + kafkaTopic(NAMESPACE, topicName + "c", SELECTOR, null, true, topicName + "c".toUpperCase(Locale.ROOT), 2, 1, TEST_TOPIC_CONFIG), + kafkaTopic(NAMESPACE, topicName + "d", SELECTOR, null, null, topicName + "d", 2, 1, TEST_TOPIC_CONFIG), + kafkaTopic(NAMESPACE, topicName + "e", SELECTOR, null, null, null, 2, 1, TEST_TOPIC_CONFIG), + kafkaTopic(NAMESPACE, topicName + "f", SELECTOR, null, null, topicName + "f".toUpperCase(Locale.ROOT), 2, 1, TEST_TOPIC_CONFIG) ); } @@ -851,14 +847,10 @@ private void shouldUpdateTopicInKafkaWhenConfigChangedInKube(StrimziKafkaCluster UnaryOperator> expectedChangedConfigs) throws ExecutionException, InterruptedException, TimeoutException { // given var expectedTopicName = TopicOperatorUtil.topicName(kt); - var expectedCreateConfigs = Map.of( - TopicConfig.CLEANUP_POLICY_CONFIG, "compact", // list typed - TopicConfig.COMPRESSION_TYPE_CONFIG, "producer", // string typed - TopicConfig.FLUSH_MS_CONFIG, "1234", // long typed - TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "1234", // int typed - TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.6", // double typed - TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true" // boolean typed - ); + var expectedCreateConfigs = TEST_TOPIC_CONFIG.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() instanceof List + ? String.join(",", (List) e.getValue()) : String.valueOf(e.getValue()))); + Map expectedConfigs = expectedChangedConfigs.apply(expectedCreateConfigs); assertNotEquals(expectedCreateConfigs, expectedConfigs); @@ -872,17 +864,19 @@ private void shouldUpdateTopicInKafkaWhenConfigChangedInKube(StrimziKafkaCluster assertEquals(expectedConfigs, topicConfigMap(expectedTopicName)); } + // TODO remove run CI 9 + @Test public void shouldUpdateTopicInKafkaWhenStringConfigChangedInKube() throws ExecutionException, InterruptedException, TimeoutException { startKafkaCluster(1, 1, Map.of("auto.create.topics.enable", "false")); - List topics = managedKafkaTopicsWithConfigs(); + var topics = managedKafkaTopicsWithConfigs(); for (KafkaTopic kt : topics) { shouldUpdateTopicInKafkaWhenConfigChangedInKube(kafkaCluster, kt, - TopicControllerIT::setSnappyCompression, + TopicOperatorIT::setSnappyCompression, expectedCreateConfigs -> { - Map expectedUpdatedConfigs = new HashMap<>(expectedCreateConfigs); - expectedUpdatedConfigs.put(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy"); + var expectedUpdatedConfigs = new LinkedHashMap<>(expectedCreateConfigs); + expectedUpdatedConfigs.replace(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy"); return expectedUpdatedConfigs; }); } @@ -891,7 +885,7 @@ public void shouldUpdateTopicInKafkaWhenStringConfigChangedInKube() throws Execu @Test public void shouldUpdateTopicInKafkaWhenIntConfigChangedInKube() throws ExecutionException, InterruptedException, TimeoutException { startKafkaCluster(1, 1, Map.of("auto.create.topics.enable", "false")); - List topics = managedKafkaTopicsWithConfigs(); + var topics = managedKafkaTopicsWithConfigs(); for (KafkaTopic kt : topics) { shouldUpdateTopicInKafkaWhenConfigChangedInKube(kafkaCluster, kt, @@ -900,8 +894,8 @@ public void shouldUpdateTopicInKafkaWhenIntConfigChangedInKube() throws Executio return theKt; }, expectedCreateConfigs -> { - Map expectedUpdatedConfigs = new HashMap<>(expectedCreateConfigs); - expectedUpdatedConfigs.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "5678"); + var expectedUpdatedConfigs = new LinkedHashMap<>(expectedCreateConfigs); + expectedUpdatedConfigs.replace(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "5678"); return expectedUpdatedConfigs; }); } @@ -910,7 +904,7 @@ public void shouldUpdateTopicInKafkaWhenIntConfigChangedInKube() throws Executio @Test public void shouldUpdateTopicInKafkaWhenLongConfigChangedInKube() throws ExecutionException, InterruptedException, TimeoutException { startKafkaCluster(1, 1, Map.of("auto.create.topics.enable", "false")); - List topics = managedKafkaTopicsWithConfigs(); + var topics = managedKafkaTopicsWithConfigs(); for (KafkaTopic kt : topics) { shouldUpdateTopicInKafkaWhenConfigChangedInKube(kafkaCluster, kt, @@ -919,8 +913,8 @@ public void shouldUpdateTopicInKafkaWhenLongConfigChangedInKube() throws Executi return theKt; }, expectedCreateConfigs -> { - Map expectedUpdatedConfigs = new HashMap<>(expectedCreateConfigs); - expectedUpdatedConfigs.put(TopicConfig.FLUSH_MS_CONFIG, "9876"); + var expectedUpdatedConfigs = new LinkedHashMap<>(expectedCreateConfigs); + expectedUpdatedConfigs.replace(TopicConfig.FLUSH_MS_CONFIG, "9876"); return expectedUpdatedConfigs; }); } @@ -929,7 +923,7 @@ public void shouldUpdateTopicInKafkaWhenLongConfigChangedInKube() throws Executi @Test public void shouldUpdateTopicInKafkaWhenDoubleConfigChangedInKube() throws ExecutionException, InterruptedException, TimeoutException { startKafkaCluster(1, 1, Map.of("auto.create.topics.enable", "false")); - List topics = managedKafkaTopicsWithConfigs(); + var topics = managedKafkaTopicsWithConfigs(); for (KafkaTopic kt : topics) { shouldUpdateTopicInKafkaWhenConfigChangedInKube(kafkaCluster, kt, @@ -938,8 +932,8 @@ public void shouldUpdateTopicInKafkaWhenDoubleConfigChangedInKube() throws Execu return theKt; }, expectedCreateConfigs -> { - Map expectedUpdatedConfigs = new HashMap<>(expectedCreateConfigs); - expectedUpdatedConfigs.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.1"); + var expectedUpdatedConfigs = new LinkedHashMap<>(expectedCreateConfigs); + expectedUpdatedConfigs.replace(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.1"); return expectedUpdatedConfigs; }); } @@ -948,7 +942,7 @@ public void shouldUpdateTopicInKafkaWhenDoubleConfigChangedInKube() throws Execu @Test public void shouldUpdateTopicInKafkaWhenBooleanConfigChangedInKube() throws ExecutionException, InterruptedException, TimeoutException { startKafkaCluster(1, 1, Map.of("auto.create.topics.enable", "false")); - List topics = managedKafkaTopicsWithConfigs(); + var topics = managedKafkaTopicsWithConfigs(); for (KafkaTopic kt : topics) { shouldUpdateTopicInKafkaWhenConfigChangedInKube(kafkaCluster, kt, @@ -957,8 +951,8 @@ public void shouldUpdateTopicInKafkaWhenBooleanConfigChangedInKube() throws Exec return theKt; }, expectedCreateConfigs -> { - Map expectedUpdatedConfigs = new HashMap<>(expectedCreateConfigs); - expectedUpdatedConfigs.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false"); + var expectedUpdatedConfigs = new LinkedHashMap<>(expectedCreateConfigs); + expectedUpdatedConfigs.replace(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false"); return expectedUpdatedConfigs; }); } @@ -967,7 +961,7 @@ public void shouldUpdateTopicInKafkaWhenBooleanConfigChangedInKube() throws Exec @Test public void shouldUpdateTopicInKafkaWhenListConfigChangedInKube() throws ExecutionException, InterruptedException, TimeoutException { startKafkaCluster(1, 1, Map.of("auto.create.topics.enable", "false")); - List topics = managedKafkaTopicsWithConfigs(); + var topics = managedKafkaTopicsWithConfigs(); for (KafkaTopic kt : topics) { shouldUpdateTopicInKafkaWhenConfigChangedInKube(kafkaCluster, kt, @@ -976,8 +970,8 @@ public void shouldUpdateTopicInKafkaWhenListConfigChangedInKube() throws Executi return theKt; }, expectedCreateConfigs -> { - Map expectedUpdatedConfigs = new HashMap<>(expectedCreateConfigs); - expectedUpdatedConfigs.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete"); + var expectedUpdatedConfigs = new LinkedHashMap<>(expectedCreateConfigs); + expectedUpdatedConfigs.replace(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete"); return expectedUpdatedConfigs; }); } @@ -986,7 +980,7 @@ public void shouldUpdateTopicInKafkaWhenListConfigChangedInKube() throws Executi @Test public void shouldUpdateTopicInKafkaWhenConfigRemovedInKube() throws ExecutionException, InterruptedException, TimeoutException { startKafkaCluster(1, 1, Map.of("auto.create.topics.enable", "false")); - List topics = managedKafkaTopicsWithConfigs(); + var topics = managedKafkaTopicsWithConfigs(); for (KafkaTopic kt : topics) { shouldUpdateTopicInKafkaWhenConfigChangedInKube(kafkaCluster, kt, @@ -995,7 +989,7 @@ public void shouldUpdateTopicInKafkaWhenConfigRemovedInKube() throws ExecutionEx return theKt; }, expectedCreateConfigs -> { - var expectedUpdatedConfigs = new HashMap<>(expectedCreateConfigs); + var expectedUpdatedConfigs = new LinkedHashMap<>(expectedCreateConfigs); expectedUpdatedConfigs.remove(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG); return expectedUpdatedConfigs; }); @@ -1064,7 +1058,7 @@ public void shouldUpdateTopicInKafkaWhenPartitionsIncreasedInKube() throws Execu // when: partitions is increased modifyTopicAndAwait(kt, - TopicControllerIT::incrementPartitions, + TopicOperatorIT::incrementPartitions, readyIsTrue()); // then @@ -1280,7 +1274,7 @@ private static TopicOperatorConfig topicOperatorConfig(String ns, StrimziKafkaCl return new TopicOperatorConfig(ns, Labels.fromMap(SELECTOR), kafkaCluster.getBootstrapServers(), - TopicControllerIT.class.getSimpleName(), + TopicOperatorIT.class.getSimpleName(), fullReconciliationIntervalMs, false, "", "", "", "", "", false, "", "", "", "", "", @@ -1702,7 +1696,7 @@ private void accountForReassigningPartitions( // then // trigger reconciliation by change a config var modified = modifyTopicAndAwait(created, - TopicControllerIT::setSnappyCompression, + TopicOperatorIT::setSnappyCompression, duringReassignmentPredicate); assertFalse(kafkaAdminClient.listPartitionReassignments(Set.of(tp)).reassignments().get().isEmpty(), @@ -1721,7 +1715,7 @@ private void accountForReassigningPartitions( // trigger reconciliation by changing a config again modifyTopicAndAwait(modified, - TopicControllerIT::setGzipCompression, + TopicOperatorIT::setGzipCompression, postReassignmentPredicate); } @@ -1798,7 +1792,7 @@ public void shouldFailAlterConfigIfNoTopicAuthz() throws ExecutionException, Int maybeStartOperator(config); createTopicAndAssertSuccess(kafkaCluster, kt); - var modified = modifyTopicAndAwait(kt, TopicControllerIT::setSnappyCompression, readyIsFalse()); + var modified = modifyTopicAndAwait(kt, TopicOperatorIT::setSnappyCompression, readyIsFalse()); var condition = assertExactlyOneCondition(modified); assertEquals("KafkaError", condition.getReason()); assertEquals("org.apache.kafka.common.errors.TopicAuthorizationException: not allowed", condition.getMessage()); @@ -1880,7 +1874,7 @@ public void shouldFailAddPartitionsIfNoTopicAuthz() throws ExecutionException, I createTopicAndAssertSuccess(kafkaCluster, kt); var modified = modifyTopicAndAwait(kt, - TopicControllerIT::incrementPartitions, + TopicOperatorIT::incrementPartitions, readyIsFalse()); var condition = assertExactlyOneCondition(modified); assertEquals("KafkaError", condition.getReason()); @@ -2043,7 +2037,7 @@ public void shouldTerminateIfQueueFull() throws ExecutionException, InterruptedE String ns = createNamespace(NAMESPACE); var config = new TopicOperatorConfig(ns, Labels.fromMap(SELECTOR), - kafkaCluster.getBootstrapServers(), TopicControllerIT.class.getSimpleName(), 10_000, + kafkaCluster.getBootstrapServers(), TopicOperatorIT.class.getSimpleName(), 10_000, false, "", "", "", "", "", false, "", "", "", "", "", true,