diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2Connectors.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2Connectors.java index fcd7ffe7f89..abc8b8ecb8e 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2Connectors.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2Connectors.java @@ -116,8 +116,8 @@ public static KafkaMirrorMaker2Connectors fromCrd(Reconciliation reconciliation, errorMessages.add("Target cluster alias " + mirror.getTargetCluster() + " is used in a mirror definition, but cluster with this alias does not exist in cluster definitions"); } - if (!mirror.getTargetCluster().equals(connectCluster)) { - errorMessages.add("Connect cluster alias (currently set to " + connectCluster + ") has to be the same as the target cluster alias " + mirror.getTargetCluster()); + if (!mirror.getTargetCluster().equals(connectCluster) && !hasMatchingBootstrapServers(kafkaMirrorMaker2.getSpec().getClusters(), connectCluster, mirror.getTargetCluster())) { + errorMessages.add("Connect cluster alias (currently set to " + connectCluster + ") must match the target cluster alias " + mirror.getTargetCluster() + " or both clusters must have the same bootstrap servers."); } } @@ -326,4 +326,23 @@ private static String addTLSConfigToMirrorMaker2ConnectorConfig(Map clusterList, String connectClusterAlias, String targetClusterAlias) { + // Find the cluster for the connectClusterAlias + String connectClusterBootstrap = clusterList.stream() + .filter(cluster -> connectClusterAlias.equals(cluster.getAlias())) + .map(KafkaMirrorMaker2ClusterSpec::getBootstrapServers) + .findFirst() + .orElse(null); + + // Find the cluster for the targetClusterAlias + String targetClusterBootstrap = clusterList.stream() + .filter(cluster -> targetClusterAlias.equals(cluster.getAlias())) + .map(KafkaMirrorMaker2ClusterSpec::getBootstrapServers) + .findFirst() + .orElse(null); + + // Return true if both are found and have matching bootstrap servers + return connectClusterBootstrap != null && connectClusterBootstrap.equals(targetClusterBootstrap); + } } diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2ConnectorsTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2ConnectorsTest.java index efdcab5ca8e..a1c16fc811d 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2ConnectorsTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2ConnectorsTest.java @@ -118,7 +118,7 @@ public void testFailingValidation() { assertThat(ex.getMessage(), is("KafkaMirrorMaker2 resource validation failed: " + "[Each MirrorMaker 2 mirror definition has to specify the source cluster alias, " + "Target cluster alias wrong-target is used in a mirror definition, but cluster with this alias does not exist in cluster definitions, " + - "Connect cluster alias (currently set to target) has to be the same as the target cluster alias wrong-target]")); + "Connect cluster alias (currently set to target) must match the target cluster alias wrong-target or both clusters must have the same bootstrap servers.]")); } @Test @@ -131,7 +131,7 @@ public void testMirrorTargetClusterNotSameAsConnectCluster() { .build(); InvalidResourceException ex = assertThrows(InvalidResourceException.class, () -> KafkaMirrorMaker2Connectors.validateConnectors(kmm2)); assertThat(ex.getMessage(), is("KafkaMirrorMaker2 resource validation failed: " + - "[Connect cluster alias (currently set to source) has to be the same as the target cluster alias target]")); + "[Connect cluster alias (currently set to source) must match the target cluster alias target or both clusters must have the same bootstrap servers.]")); // A case where one mirror has the correct target cluster, but the other does not KafkaMirrorMaker2 kmm2CorrectAndIncorrectMirror = new KafkaMirrorMaker2Builder(KMM2) @@ -147,7 +147,62 @@ public void testMirrorTargetClusterNotSameAsConnectCluster() { .build(); ex = assertThrows(InvalidResourceException.class, () -> KafkaMirrorMaker2Connectors.validateConnectors(kmm2CorrectAndIncorrectMirror)); assertThat(ex.getMessage(), is("KafkaMirrorMaker2 resource validation failed: " + - "[Connect cluster alias (currently set to target) has to be the same as the target cluster alias third]")); + "[Connect cluster alias (currently set to target) must match the target cluster alias third or both clusters must have the same bootstrap servers.]")); + } + + @Test + public void testClusterNotSameButBootstrapUrlSame() { + KafkaMirrorMaker2 kmm2 = new KafkaMirrorMaker2Builder(KMM2) + .editSpec() + .withConnectCluster("source") + .addToClusters(new KafkaMirrorMaker2ClusterSpecBuilder() + .withAlias("third") + .withBootstrapServers("source:9092") + .build()) + .editMirror(0) + .withTargetCluster("third") + .endMirror() + .endSpec() + .build(); + + assertDoesNotThrow(() -> KafkaMirrorMaker2Connectors.validateConnectors(kmm2)); + } + + @Test + public void testSourceClusterNotConnectCluster() { + KafkaMirrorMaker2 kmm2 = new KafkaMirrorMaker2Builder(KMM2) + .editSpec() + .withConnectCluster("target") + .addToClusters(new KafkaMirrorMaker2ClusterSpecBuilder() + .withAlias("third") + .withBootstrapServers("source:9092") + .build()) + .editMirror(0) + .withTargetCluster("third") + .endMirror() + .endSpec() + .build(); + + InvalidResourceException ex = assertThrows(InvalidResourceException.class, () -> KafkaMirrorMaker2Connectors.validateConnectors(kmm2)); + assertThat(ex.getMessage(), is("KafkaMirrorMaker2 resource validation failed: " + + "[Connect cluster alias (currently set to target) must match the target cluster alias third or both clusters must have the same bootstrap servers.]")); + } + + @Test + public void testMultipleMirrors() { + KafkaMirrorMaker2 kmm2CorrectAndIncorrectMirror = new KafkaMirrorMaker2Builder(KMM2) + .editSpec() + .addToClusters(new KafkaMirrorMaker2ClusterSpecBuilder() + .withAlias("fourth") + .withBootstrapServers("target:9092") + .build()) + .addToMirrors(new KafkaMirrorMaker2MirrorSpecBuilder() + .withSourceCluster("source") + .withTargetCluster("fourth").build()) + .endSpec() + .build(); + + assertDoesNotThrow(() -> KafkaMirrorMaker2Connectors.validateConnectors(kmm2CorrectAndIncorrectMirror)); } @Test