Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Annotate StrimziPodSets before rolling during CA renewal #10711

Merged
merged 4 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* after a scaling up, the operator triggers an auto-rebalancing for moving some of the existing partitions to the newly added brokers.
* before scaling down, and if the brokers to remove are hosting partitions, the operator triggers an auto-rebalancing to these partitions off the brokers to make them free to be removed.
* Strimzi Access Operator 0.1.0 added to the installation files and examples
* Allow rolling update for new cluster CA trust (during Cluster CA key replacement) to continue where it left off before interruption without rolling all pods again.

### Changes, deprecations and removals

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,36 @@ public static StrimziPodSet createPodSet(
.build();
}

/**
* Patch a Strimzi PodSet to merge the provided annotations with the annotations on the Pod resources defined
* in the PodSet
*
* @param strimziPodSet Strimzi PodSet to patch
* @param annotationsToBeUpdated Annotations to merge with the existing annotations
*
* @return Patched PodSet
*/
public static StrimziPodSet patchAnnotations(StrimziPodSet strimziPodSet, Map<String, String> annotationsToBeUpdated) {
List<Map<String, Object>> newPods = PodSetUtils.podSetToPods(strimziPodSet)
.stream()
.map(pod -> {
Map<String, String> updatedAnnotations = pod.getMetadata().getAnnotations();
updatedAnnotations.putAll(annotationsToBeUpdated);
return pod.edit()
.editMetadata()
.withAnnotations(updatedAnnotations)
.endMetadata()
.build();
})
.map(PodSetUtils::podToMap)
.toList();
return new StrimziPodSetBuilder(strimziPodSet)
.editSpec()
.withPods(newPods)
.endSpec()
.build();
}

/**
* Creates a stateful Pod for use with StrimziPodSets. Stateful in this context means that it has a stable name and
* typically uses storage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.strimzi.operator.cluster.model.NodeRef;
import io.strimzi.operator.cluster.model.RestartReason;
import io.strimzi.operator.cluster.model.RestartReasons;
import io.strimzi.operator.cluster.model.WorkloadUtils;
import io.strimzi.operator.cluster.operator.resource.KafkaAgentClientProvider;
import io.strimzi.operator.cluster.operator.resource.KafkaRoller;
import io.strimzi.operator.cluster.operator.resource.ResourceOperatorSupplier;
Expand Down Expand Up @@ -55,11 +56,11 @@

import java.time.Clock;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* Class used for reconciliation of Cluster and Client CAs. This class contains both the steps of the CA reconciliation
Expand Down Expand Up @@ -395,8 +396,8 @@ Future<Void> rollingUpdateForNewCaKey() {
TlsPemIdentity coTlsPemIdentity = new TlsPemIdentity(new PemTrustSet(clusterCa.caCertSecret()), PemAuthIdentity.clusterOperator(coSecret));
return getZooKeeperReplicas()
.compose(replicas -> maybeRollZookeeper(replicas, podRollReasons, coTlsPemIdentity))
.compose(i -> getKafkaReplicas())
.compose(nodes -> rollKafkaBrokers(nodes, podRollReasons, coTlsPemIdentity))
.compose(i -> patchCaGenerationAndReturnNodes())
.compose(nodes -> rollKafka(nodes, podRollReasons, coTlsPemIdentity))
.compose(i -> maybeRollDeploymentIfExists(KafkaResources.entityOperatorDeploymentName(reconciliation.name()), podRollReasons))
.compose(i -> maybeRollDeploymentIfExists(KafkaExporterResources.componentName(reconciliation.name()), podRollReasons))
.compose(i -> maybeRollDeploymentIfExists(CruiseControlResources.componentName(reconciliation.name()), podRollReasons));
Expand Down Expand Up @@ -527,27 +528,40 @@ Future<Void> rollingUpdateForNewCaKey() {
}
}

/* test */ Future<Set<NodeRef>> getKafkaReplicas() {
/* test */ Future<Set<NodeRef>> patchCaGenerationAndReturnNodes() {
Labels selectorLabels = Labels.EMPTY
.withStrimziKind(reconciliation.kind())
.withStrimziCluster(reconciliation.name())
.withStrimziName(KafkaResources.kafkaComponentName(reconciliation.name()));

return strimziPodSetOperator.listAsync(reconciliation.namespace(), selectorLabels)
.compose(podSets -> {
Set<NodeRef> nodes = new LinkedHashSet<>();

if (podSets != null) {
for (StrimziPodSet podSet : podSets) {
nodes.addAll(ReconcilerUtils.nodesFromPodSet(podSet));
}
List<StrimziPodSet> updatedPodSets = podSets
.stream()
.map(podSet -> WorkloadUtils.patchAnnotations(
podSet,
Map.of(
Ca.ANNO_STRIMZI_IO_CLUSTER_CA_CERT_GENERATION, String.valueOf(clusterCa.caCertGeneration()),
Ca.ANNO_STRIMZI_IO_CLUSTER_CA_KEY_GENERATION, String.valueOf(clusterCa.caKeyGeneration()),
Ca.ANNO_STRIMZI_IO_CLIENTS_CA_CERT_GENERATION, String.valueOf(clientsCa.caCertGeneration())
)))
.toList();
return strimziPodSetOperator.batchReconcile(
reconciliation,
reconciliation.namespace(),
updatedPodSets,
selectorLabels
).map(i -> updatedPodSets.stream()
.flatMap(podSet -> ReconcilerUtils.nodesFromPodSet(podSet).stream())
.collect(Collectors.toSet()));
} else {
return Future.succeededFuture(Set.of());
}

return Future.succeededFuture(nodes);
});

}

/* test */ Future<Void> rollKafkaBrokers(Set<NodeRef> nodes, RestartReasons podRollReasons, TlsPemIdentity coTlsPemIdentity) {
/* test */ Future<Void> rollKafka(Set<NodeRef> nodes, RestartReasons podRollReasons, TlsPemIdentity coTlsPemIdentity) {
return new KafkaRoller(
reconciliation,
vertx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.strimzi.api.kafka.model.kafka.PersistentClaimStorageBuilder;
import io.strimzi.api.kafka.model.kafka.Storage;
import io.strimzi.api.kafka.model.podset.StrimziPodSet;
import io.strimzi.api.kafka.model.podset.StrimziPodSetBuilder;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.model.Labels;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -363,6 +364,44 @@ public void testCreateStrimziPodSetFromNodeReferencesWithTemplate() {
assertThat(sps.getSpec().getPods().stream().map(pod -> PodSetUtils.mapToPod(pod).getMetadata().getName()).toList(), hasItems("my-cluster-nodes-10", "my-cluster-nodes-11", "my-cluster-nodes-12"));
}

@Test
public void testPatchPodAnnotations() {
Map<String, String> annotations = Map.of("anno-1", "value-1", "anno-2", "value-2", "anno-3", "value-3");
List<Pod> pods = new ArrayList<>();
pods.add(new PodBuilder()
.withNewMetadata()
.withName("pod-0")
.withNamespace(NAMESPACE)
.withAnnotations(annotations)
.endMetadata()
.build()
);
pods.add(new PodBuilder()
.withNewMetadata()
.withName("pod-1")
.withNamespace(NAMESPACE)
.withAnnotations(annotations)
.endMetadata()
.build()
);

StrimziPodSet sps = new StrimziPodSetBuilder()
.withNewMetadata()
.withName("my-sps")
.withNamespace(NAMESPACE)
.endMetadata()
.withNewSpec()
.withPods(PodSetUtils.podsToMaps(pods))
.endSpec()
.build();

List<Pod> resultPods = PodSetUtils.podSetToPods(WorkloadUtils.patchAnnotations(sps, Map.of("anno-2", "value-2a", "anno-4", "value-4")));
assertThat(resultPods.size(), is(2));
Map<String, String> expectedAnnotations = Map.of("anno-1", "value-1", "anno-2", "value-2a", "anno-3", "value-3", "anno-4", "value-4");
assertThat(resultPods.get(0).getMetadata().getAnnotations(), is(expectedAnnotations));
assertThat(resultPods.get(1).getMetadata().getAnnotations(), is(expectedAnnotations));
}

//////////////////////////////////////////////////
// Stateful Pod tests
//////////////////////////////////////////////////
Expand Down
Loading
Loading