Commit
Merge branch 'strimzi:main' into mm2-bootstrapserver-fix
varada-sunanda-ibm authored Oct 20, 2024
2 parents f4405c5 + 2149b43 commit 70aef0b
Showing 120 changed files with 4,073 additions and 1,466 deletions.
2 changes: 1 addition & 1 deletion .azure/scripts/setup-helm.sh
@@ -2,7 +2,7 @@
set -x

TEST_HELM3_VERSION=${TEST_HELM3_VERSION:-'v3.15.4'}
TEST_HELM_UNITTEST_VERSION=${TEST_HELM_UNITTEST_VERSION:-'v0.5.2'}
TEST_HELM_UNITTEST_VERSION=${TEST_HELM_UNITTEST_VERSION:-'v0.6.3'}

function install_helm3 {
export HELM_INSTALL_DIR=/usr/bin
2 changes: 1 addition & 1 deletion .github/workflows/pre-commit.yml
@@ -10,5 +10,5 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
- uses: actions/setup-python@39cd14951b08e74b54015e9e001cdefcf80e669f # v5.1.1
- uses: actions/setup-python@3542bca2639a428e1796aaa6a2ffef0c0f575566 # v3.1.4
- uses: pre-commit/action@2c7b3805fd2a0fd8c1884dcaebf91fc102a13ecd # v3.0.1
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -13,6 +13,7 @@
* after a scaling up, the operator triggers an auto-rebalancing for moving some of the existing partitions to the newly added brokers.
* before scaling down, and if the brokers to remove are hosting partitions, the operator triggers an auto-rebalancing to move these partitions off the brokers to make them free to be removed.
* Strimzi Access Operator 0.1.0 added to the installation files and examples
* Allow rolling update for new cluster CA trust (during Cluster CA key replacement) to continue where it left off before interruption without rolling all pods again.

### Changes, deprecations and removals

@@ -22,6 +23,7 @@
This change matches user expectations.
* The External Configuration (`.spec.externalConfiguration`) in `KafkaConnect` and `KafkaMirrorMaker2` resources is deprecated and will be removed in the future.
Please use the environment variables, additional volumes and volume mounts in Pod and container templates instead.
* The Strimzi Canary installation files were removed based on the [_Strimzi proposal 086_](https://github.com/strimzi/proposals/blob/main/086-archive-canary.md) as the project was discontinued and archived.

## 0.43.0

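The auto-rebalancing entries in the CHANGELOG above (and the `KafkaAutoRebalanceConfiguration` changes later in this diff) describe rebalancing that the operator triggers on scale-up and scale-down. A minimal sketch of how such a configuration might look in a `Kafka` resource; the `mode` and `template` fields mirror `KafkaAutoRebalanceConfiguration`, while the placement under `spec.cruiseControl.autoRebalance` and all names are assumptions, not taken from this commit:

```yaml
# Sketch only. `mode` and `template` mirror KafkaAutoRebalanceConfiguration in this diff;
# the spec.cruiseControl.autoRebalance location, cluster name, and template name are assumptions.
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  # ... kafka, node pools, and other sections omitted ...
  cruiseControl:
    autoRebalance:
      - mode: add-brokers          # rebalance after brokers are added
        template:
          name: my-rebalance-template   # hypothetical KafkaRebalance template resource
      - mode: remove-brokers       # rebalance before brokers are removed
        template:
          name: my-rebalance-template
```
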
5 changes: 5 additions & 0 deletions api/src/main/java/io/strimzi/api/ResourceAnnotations.java
@@ -44,6 +44,11 @@ public class ResourceAnnotations {
*/
public static final String ANNO_STRIMZI_IO_REBALANCE_AUTOAPPROVAL = STRIMZI_DOMAIN + "rebalance-auto-approval";

/**
* Use this boolean annotation to set the KafkaRebalance as a template to be used for auto-rebalancing operations
*/
public static final String ANNO_STRIMZI_IO_REBALANCE_TEMPLATE = STRIMZI_DOMAIN + "rebalance-template";

/**
* Annotation which enables the use of the connector operator
*/
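Per the Javadoc above, the new boolean `strimzi.io/rebalance-template` annotation marks a `KafkaRebalance` resource as a configuration template for auto-rebalancing operations rather than a one-off rebalance request. A minimal sketch of such a resource; the resource name and goals are illustrative and not part of this commit:

```yaml
# Sketch of a KafkaRebalance used as an auto-rebalancing template.
# The annotation key comes from ANNO_STRIMZI_IO_REBALANCE_TEMPLATE above;
# the resource name and goals are illustrative assumptions.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaRebalance
metadata:
  name: my-rebalance-template
  labels:
    strimzi.io/cluster: my-cluster
  annotations:
    strimzi.io/rebalance-template: "true"   # treat this resource as a template; it does not trigger a rebalance itself
spec:
  goals:
    - CpuCapacityGoal
    - DiskCapacityGoal
```
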
@@ -36,8 +36,8 @@ public class KafkaAutoRebalanceConfiguration implements UnknownPropertyPreservin
private LocalObjectReference template;
private Map<String, Object> additionalProperties;

@Description("Mode for which running the auto-rebalancing on scaling, when brokers are added or removed. " +
"The supported modes are `add-brokers` and `remove-brokers`.\n")
@Description("Specifies the mode for automatically rebalancing when brokers are added or removed. " +
"Supported modes are `add-brokers` and `remove-brokers`. \n")
@JsonProperty(required = true)
public KafkaRebalanceMode getMode() {
return mode;
@@ -35,10 +35,10 @@ public class KafkaAutoRebalanceStatus implements UnknownPropertyPreserving {
private List<KafkaAutoRebalanceStatusBrokers> modes;
private Map<String, Object> additionalProperties;

@Description("The current state of the auto-rebalancing operation. Possible values are: \n\n" +
"* Idle as the initial state when an auto-rebalancing is requested or as final state when it completes or fails.\n" +
"* RebalanceOnScaleDown if a rebalancing related to a scale down operation is running.\n" +
"* RebalanceOnScaleUp if a rebalancing related to a scale up operation is running.\n")
@Description("The current state of an auto-rebalancing operation. Possible values are: \n\n" +
"* `Idle` as the initial state when an auto-rebalancing is requested or as final state when it completes or fails.\n" +
"* `RebalanceOnScaleDown` if an auto-rebalance related to a scale-down operation is running.\n" +
"* `RebalanceOnScaleUp` if an auto-rebalance related to a scale-up operation is running.")
public KafkaAutoRebalanceState getState() {
return state;
}
@@ -56,10 +56,10 @@ public void setLastTransitionTime(String lastTransitionTime) {
this.lastTransitionTime = lastTransitionTime;
}

@Description("List of the modes for which there is an auto-rebalancing operation already running or queued. " +
"For each mode entry, which could be for add-brokers or remove-brokers, it contains either:" +
"- the brokers' IDs relevant to the current ongoing auto-rebalance, or" +
"- the brokers' IDs relevant to a queued auto-rebalance (if a previous auto-rebalance is still in progress)")
@Description("List of modes where an auto-rebalancing operation is either running or queued. \n" +
"Each mode entry (`add-brokers` or `remove-brokers`) includes one of the following: \n\n" +
"* Broker IDs for a current auto-rebalance. \n" +
"* Broker IDs for a queued auto-rebalance (if a previous rebalance is still in progress).")
public List<KafkaAutoRebalanceStatusBrokers> getModes() {
return modes;
}
@@ -46,10 +46,10 @@ public void setMode(KafkaRebalanceMode mode) {
this.mode = mode;
}

@Description("List of brokers' IDs involved in an auto-rebalancing operation related to current mode. " +
"it contains either: " +
"- the brokers' IDs relevant to the current ongoing auto-rebalance, or " +
"- the brokers' IDs relevant to a queued auto-rebalance (if a previous auto-rebalance is still in progress)")
@Description("List of broker IDs involved in an auto-rebalancing operation related to the current mode. \n" +
"The list contains one of the following: \n\n" +
"* Broker IDs for a current auto-rebalance. \n" +
"* Broker IDs for a queued auto-rebalance (if a previous auto-rebalance is still in progress). \n")
public List<Integer> getBrokers() {
return brokers;
}
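The two status classes above describe the `state`, `lastTransitionTime`, and per-mode broker lists reported while an auto-rebalance runs or is queued. A sketch of how that structure might surface in a status section; the field names come from the classes above, but the values and the exact nesting within the `Kafka` resource status are assumptions:

```yaml
# Sketch of the auto-rebalance status structure; values are illustrative and the
# placement under the Kafka resource's status is an assumption.
status:
  autoRebalance:
    state: RebalanceOnScaleUp          # Idle | RebalanceOnScaleDown | RebalanceOnScaleUp
    lastTransitionTime: "2024-10-20T10:15:30Z"
    modes:
      - mode: add-brokers
        brokers: [3, 4]                # brokers involved in the running or queued rebalance
```
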
@@ -51,35 +51,15 @@
jsonPath = ".metadata.labels.strimzi\\.io/cluster",
type = "string"),
@Crd.Spec.AdditionalPrinterColumn(
name = "PendingProposal",
description = "A proposal has been requested from Cruise Control",
jsonPath = ".status.conditions[?(@.type==\"PendingProposal\")].status",
name = "Template",
description = "If this rebalance resource is a template",
jsonPath = ".metadata.annotations.strimzi\\.io/rebalance-template",
type = "string"),
@Crd.Spec.AdditionalPrinterColumn(
name = "ProposalReady",
description = "A proposal is ready and waiting for approval",
jsonPath = ".status.conditions[?(@.type==\"ProposalReady\")].status",
type = "string"),
@Crd.Spec.AdditionalPrinterColumn(
name = "Rebalancing",
description = "Cruise Control is doing the rebalance",
jsonPath = ".status.conditions[?(@.type==\"Rebalancing\")].status",
type = "string"),
@Crd.Spec.AdditionalPrinterColumn(
name = "Ready",
description = "The rebalance is complete",
jsonPath = ".status.conditions[?(@.type==\"Ready\")].status",
type = "string"),
@Crd.Spec.AdditionalPrinterColumn(
name = "NotReady",
description = "There is an error on the custom resource",
jsonPath = ".status.conditions[?(@.type==\"NotReady\")].status",
type = "string"),
@Crd.Spec.AdditionalPrinterColumn(
name = "Stopped",
description = "Processing the proposal or running rebalancing was stopped",
jsonPath = ".status.conditions[?(@.type==\"Stopped\")].status",
type = "string")
name = "Status",
description = "Status of the current rebalancing operation",
jsonPath = ".status.conditions[*].type",
type = "string")
}
)
)
@@ -24,14 +24,6 @@ public enum KafkaRebalanceAnnotation {
* This value should only be used when in the {@code ProposalReady} or {@code Stopped} states.
*/
refresh,
/**
* Used to represent a KafkaRebalance custom resource that represents a configuration template.
* This is used for auto-rebalancing on scaling to define the configuration template to be used for a specific
* rebalancing mode, as add-brokers or remove-brokers.
* When this annotation is applied to a KafkaRebalance custom resource, it doesn't trigger any actual auto-rebalance,
* instead the resource is just ignored.
*/
template,
/**
* Any other unsupported/unknown annotation value.
*/
@@ -183,6 +183,36 @@ public static StrimziPodSet createPodSet(
.build();
}

/**
* Patch a Strimzi PodSet to merge the provided annotations with the annotations on the Pod resources defined
* in the PodSet
*
* @param strimziPodSet Strimzi PodSet to patch
* @param annotationsToBeUpdated Annotations to merge with the existing annotations
*
* @return Patched PodSet
*/
public static StrimziPodSet patchAnnotations(StrimziPodSet strimziPodSet, Map<String, String> annotationsToBeUpdated) {
List<Map<String, Object>> newPods = PodSetUtils.podSetToPods(strimziPodSet)
.stream()
.map(pod -> {
Map<String, String> updatedAnnotations = pod.getMetadata().getAnnotations();
updatedAnnotations.putAll(annotationsToBeUpdated);
return pod.edit()
.editMetadata()
.withAnnotations(updatedAnnotations)
.endMetadata()
.build();
})
.map(PodSetUtils::podToMap)
.toList();
return new StrimziPodSetBuilder(strimziPodSet)
.editSpec()
.withPods(newPods)
.endSpec()
.build();
}

/**
* Creates a stateful Pod for use with StrimziPodSets. Stateful in this context means that it has a stable name and
* typically uses storage.
@@ -24,6 +24,7 @@
import io.strimzi.operator.cluster.model.NodeRef;
import io.strimzi.operator.cluster.model.RestartReason;
import io.strimzi.operator.cluster.model.RestartReasons;
import io.strimzi.operator.cluster.model.WorkloadUtils;
import io.strimzi.operator.cluster.operator.resource.KafkaAgentClientProvider;
import io.strimzi.operator.cluster.operator.resource.KafkaRoller;
import io.strimzi.operator.cluster.operator.resource.ResourceOperatorSupplier;
@@ -55,11 +56,11 @@

import java.time.Clock;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* Class used for reconciliation of Cluster and Client CAs. This class contains both the steps of the CA reconciliation
@@ -395,8 +396,8 @@ Future<Void> rollingUpdateForNewCaKey() {
TlsPemIdentity coTlsPemIdentity = new TlsPemIdentity(new PemTrustSet(clusterCa.caCertSecret()), PemAuthIdentity.clusterOperator(coSecret));
return getZooKeeperReplicas()
.compose(replicas -> maybeRollZookeeper(replicas, podRollReasons, coTlsPemIdentity))
.compose(i -> getKafkaReplicas())
.compose(nodes -> rollKafkaBrokers(nodes, podRollReasons, coTlsPemIdentity))
.compose(i -> patchCaGenerationAndReturnNodes())
.compose(nodes -> rollKafka(nodes, podRollReasons, coTlsPemIdentity))
.compose(i -> maybeRollDeploymentIfExists(KafkaResources.entityOperatorDeploymentName(reconciliation.name()), podRollReasons))
.compose(i -> maybeRollDeploymentIfExists(KafkaExporterResources.componentName(reconciliation.name()), podRollReasons))
.compose(i -> maybeRollDeploymentIfExists(CruiseControlResources.componentName(reconciliation.name()), podRollReasons));
@@ -527,27 +528,40 @@ Future<Void> rollingUpdateForNewCaKey() {
}
}

/* test */ Future<Set<NodeRef>> getKafkaReplicas() {
/* test */ Future<Set<NodeRef>> patchCaGenerationAndReturnNodes() {
Labels selectorLabels = Labels.EMPTY
.withStrimziKind(reconciliation.kind())
.withStrimziCluster(reconciliation.name())
.withStrimziName(KafkaResources.kafkaComponentName(reconciliation.name()));

return strimziPodSetOperator.listAsync(reconciliation.namespace(), selectorLabels)
.compose(podSets -> {
Set<NodeRef> nodes = new LinkedHashSet<>();

if (podSets != null) {
for (StrimziPodSet podSet : podSets) {
nodes.addAll(ReconcilerUtils.nodesFromPodSet(podSet));
}
List<StrimziPodSet> updatedPodSets = podSets
.stream()
.map(podSet -> WorkloadUtils.patchAnnotations(
podSet,
Map.of(
Ca.ANNO_STRIMZI_IO_CLUSTER_CA_CERT_GENERATION, String.valueOf(clusterCa.caCertGeneration()),
Ca.ANNO_STRIMZI_IO_CLUSTER_CA_KEY_GENERATION, String.valueOf(clusterCa.caKeyGeneration()),
Ca.ANNO_STRIMZI_IO_CLIENTS_CA_CERT_GENERATION, String.valueOf(clientsCa.caCertGeneration())
)))
.toList();
return strimziPodSetOperator.batchReconcile(
reconciliation,
reconciliation.namespace(),
updatedPodSets,
selectorLabels
).map(i -> updatedPodSets.stream()
.flatMap(podSet -> ReconcilerUtils.nodesFromPodSet(podSet).stream())
.collect(Collectors.toSet()));
} else {
return Future.succeededFuture(Set.of());
}

return Future.succeededFuture(nodes);
});

}

/* test */ Future<Void> rollKafkaBrokers(Set<NodeRef> nodes, RestartReasons podRollReasons, TlsPemIdentity coTlsPemIdentity) {
/* test */ Future<Void> rollKafka(Set<NodeRef> nodes, RestartReasons podRollReasons, TlsPemIdentity coTlsPemIdentity) {
return new KafkaRoller(
reconciliation,
vertx,
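The `CaReconciler` change above patches the CA generation annotations onto the Pod entries of each `StrimziPodSet` via the new `WorkloadUtils.patchAnnotations` before rolling Kafka, in line with the CHANGELOG entry about letting an interrupted rolling update for new cluster CA trust continue where it left off. A sketch of the merged Pod metadata, assuming the `Ca.ANNO_STRIMZI_IO_*_GENERATION` constants resolve to the usual `strimzi.io/*-ca-*-generation` annotation keys (values illustrative):

```yaml
# Sketch of the annotations merged into each Pod entry of the StrimziPodSet.
# The keys assume the Ca.ANNO_STRIMZI_IO_* constants map to these strimzi.io annotations;
# the generation values are illustrative.
metadata:
  annotations:
    strimzi.io/cluster-ca-cert-generation: "1"
    strimzi.io/cluster-ca-key-generation: "1"
    strimzi.io/clients-ca-cert-generation: "0"
```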