From b6251b464a287c518fa5d151bc714d6744741fbc Mon Sep 17 00:00:00 2001 From: Pedro Guimaraes Date: Tue, 24 Oct 2023 01:18:35 +0200 Subject: [PATCH 01/15] Add first draft of standby_cluster support --- actions.yaml | 6 + metadata.yaml | 5 + src/charm.py | 2 + src/patroni.py | 15 +- src/relations/async_replication.py | 219 +++++++++++++++++++++++++++++ templates/patroni.yml.j2 | 11 ++ 6 files changed, 256 insertions(+), 2 deletions(-) create mode 100644 src/relations/async_replication.py diff --git a/actions.yaml b/actions.yaml index 03a90556cc..1932b38646 100644 --- a/actions.yaml +++ b/actions.yaml @@ -43,3 +43,9 @@ set-tls-private-key: private-key: type: string description: The content of private key for communications with clients. Content will be auto-generated if this option is not specified. + +promote-standby-cluster: + description: Promotes the standby cluster of choice to a leader. Must be ran against the charm unit leader of the standby cluster. + +demote-primary-cluster: + description: Demotes the primary cluster of choice to a standby. Must be ran against the charm unit leader of the standby cluster. diff --git a/metadata.yaml b/metadata.yaml index 74485cbe02..e1cfa75799 100644 --- a/metadata.yaml +++ b/metadata.yaml @@ -39,6 +39,8 @@ peers: interface: upgrade provides: + async-primary: + interface: async_replication database: interface: postgresql_client db: @@ -51,6 +53,9 @@ provides: interface: grafana_dashboard requires: + async-replica: + interface: async_replication + limit: 1 certificates: interface: tls-certificates limit: 1 diff --git a/src/charm.py b/src/charm.py index 6797c86c17..f4b3ecd80b 100755 --- a/src/charm.py +++ b/src/charm.py @@ -76,6 +76,7 @@ WORKLOAD_OS_USER, ) from patroni import NotReadyError, Patroni +from relations.async_replication import PostgreSQLAsyncReplication from relations.db import EXTENSIONS_BLOCKING_MESSAGE, DbProvides from relations.postgresql_provider import PostgreSQLProvider from upgrade import PostgreSQLUpgrade, get_postgresql_k8s_dependencies_model @@ -152,6 +153,7 @@ def __init__(self, *args): postgresql_db_port = ServicePort(5432, name="database") patroni_api_port = ServicePort(8008, name="api") self.service_patcher = KubernetesServicePatch(self, [postgresql_db_port, patroni_api_port]) + self.async_manager = PostgreSQLAsyncReplication(self) def _generate_metrics_jobs(self, enable_tls: bool) -> Dict: """Generate spec for Prometheus scraping.""" diff --git a/src/patroni.py b/src/patroni.py index 0187633b4f..f15d2ebfaf 100644 --- a/src/patroni.py +++ b/src/patroni.py @@ -333,6 +333,9 @@ def render_patroni_yml_file( # Open the template patroni.yml file. with open("templates/patroni.yml.j2", "r") as file: template = Template(file.read()) + + primary = self._charm.async_manager.get_primary_data() + # Render the template file with the correct values. 
rendered = template.render( connectivity=connectivity, @@ -343,8 +346,12 @@ def render_patroni_yml_file( is_no_sync_member=is_no_sync_member, namespace=self._namespace, storage_path=self._storage_path, - superuser_password=self._superuser_password, - replication_password=self._replication_password, + superuser_password=primary["superuser-password"] + if primary + else self._superuser_password, + replication_password=primary["replication-password"] + if primary + else self._replication_password, rewind_user=REWIND_USER, rewind_password=self._rewind_password, enable_pgbackrest=stanza is not None, @@ -355,6 +362,10 @@ def render_patroni_yml_file( minority_count=self._members_count // 2, version=self.rock_postgresql_version.split(".")[0], pg_parameters=parameters, + standby_cluster_endpoint=primary["endpoint"] if primary else None, + extra_replication_endpoints={"{}/32".format(primary["endpoint"])} + if primary + else self._charm.async_manager.standby_endpoints(), ) self._render_file(f"{self._storage_path}/patroni.yml", rendered, 0o644) diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py new file mode 100644 index 0000000000..68e1ae4d45 --- /dev/null +++ b/src/relations/async_replication.py @@ -0,0 +1,219 @@ +# Copyright 2022 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Implements the state-machine. + +1) First async replication relation is made: both units get blocked waiting for a leader +2) User runs the promote action against one of the clusters +3) The cluster moves leader and sets the async-replication data, marking itself as leader +4) The other units receive that new information and update themselves to become standby-leaders. +""" + +import json +import logging +from typing import Dict, Set + +from lightkube import Client +from lightkube.resources.core_v1 import Endpoints +from ops.charm import ( + ActionEvent, + CharmBase, +) +from ops.framework import Object +from ops.model import ( + Unit, +) + +logger = logging.getLogger(__name__) + + +ASYNC_PRIMARY_RELATION = "async-primary" +ASYNC_REPLICA_RELATION = "async-replica" + + +class MoreThanOnePrimarySelectedError(Exception): + """Represents more than one primary has been selected.""" + + +class PostgreSQLAsyncReplication(Object): + """Defines the async-replication management logic.""" + + def __init__(self, charm: CharmBase, relation_name: str = ASYNC_PRIMARY_RELATION) -> None: + super().__init__(charm, relation_name) + self.relation_name = relation_name + self.charm = charm + self.framework.observe( + self.charm.on[ASYNC_PRIMARY_RELATION].relation_changed, self._on_primary_changed + ) + self.framework.observe( + self.charm.on[ASYNC_REPLICA_RELATION].relation_changed, self._on_primary_changed + ) + self.framework.observe( + self.charm.on.promote_standby_cluster_action, self._on_promote_standby_cluster + ) + self.framework.observe( + self.charm.on.demote_primary_cluster_action, self._on_demote_primary_cluster + ) + + # We treat both relations above as actually the same. 
+ # The big difference appears only at promote/demote actions + self.relation_set = { + *set(self.charm.model.relations[ASYNC_PRIMARY_RELATION]), + *set(self.charm.model.relations[ASYNC_REPLICA_RELATION]), + } + self.container = self.charm.unit.get_container("postgresql") + + @property + def endpoint(self) -> str: + """Assumes the endpoint is the same, disregard if we are a primary or standby cluster.""" + for rel in self.relation_set: + return str(self.charm.model.get_binding(rel).network.ingress_address) + return None + + def standby_endpoints(self) -> Set[str]: + """Returns the set of IPs used by each standby unit with a /32 mask.""" + standby_endpoints = set() + for rel in self.relation_set: + for unit in rel.units: + if not rel.data[unit].get("elected", None): + standby_endpoints.add("{}/32".format(str(rel.data[unit]["ingress-address"]))) + return standby_endpoints + + def get_primary_data(self) -> Dict[str, str]: + """Returns the primary info, if available.""" + for rel in self.relation_set: + for unit in rel.units: + if unit.name == self.charm.unit.name: + # If this unit is the leader, then return None + return None + if rel.data[unit].get("elected", None): + elected_data = json.loads(rel.data[unit]["elected"]) + return { + "endpoint": str(elected_data["endpoint"]), + "replication-password": elected_data["replication-password"], + "superuser-password": elected_data["superuser-password"], + } + return None + + def _on_primary_changed(self, _): + """Triggers a configuration change.""" + primary = self._check_if_primary_already_selected() + if not primary: + return + + if primary.name == self.charm.unit.name: + # This unit is the leader, generate a new configuration and leave. + # There is nothing to do for the leader. + self.charm.update_config() + self.container.start(self.charm._postgresql_service) + return + + self.container.stop(self.charm._postgresql_service) + + # Standby units must delete their data folder + # Delete the K8S endpoints that tracks the cluster information, including its id. + # This is the same as "patronictl remove patroni-postgresql-k8s", but the latter doesn't + # work after the database service is stopped on Pebble. 
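The cleanup described in the comment above amounts to deleting the two Endpoints objects Patroni uses as its Kubernetes DCS ("<scope>" and "<scope>-config"); the hunk below performs exactly that from inside the relation handler. Isolated from the charm, a minimal sketch of the same operation with lightkube (the scope name here is an assumption taken from the commit message, not read from the charm) would be:

    from lightkube import Client
    from lightkube.resources.core_v1 import Endpoints

    def wipe_patroni_dcs(namespace: str, scope: str = "patroni-postgresql-k8s") -> None:
        # Removing both Endpoints drops the cluster state (including the system
        # identifier) that Patroni keeps in Kubernetes -- the equivalent of
        # "patronictl remove <scope>" while the database service is stopped.
        client = Client()
        for name in (scope, f"{scope}-config"):
            client.delete(Endpoints, name=name, namespace=namespace)
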
+ try: + client = Client() + client.delete( + Endpoints, + name=f"patroni-{self.charm._name}", + namespace=self.charm._namespace, + ) + client.delete( + Endpoints, + name=f"patroni-{self.charm._name}-config", + namespace=self.charm._namespace, + ) + + self.container.exec("rm -r /var/lib/postgresql/data/pgdata".split()).wait_output() + self.charm._create_pgdata(self.container) + + self.charm.update_config() + except Exception: + pass + self.container.start(self.charm._postgresql_service) + + def _get_primary_candidates(self): + rel = self.model.get_relation(ASYNC_PRIMARY_RELATION) + return rel.units if rel else [] + + def _check_if_primary_already_selected(self) -> Unit: + """Returns the unit if a primary is present.""" + result = None + if not self.relation_set: + return None + for rel in self.relation_set: + for unit in rel.units: + if "elected" in rel.data[unit] and not result: + result = unit + elif result: + raise MoreThanOnePrimarySelectedError + return result + + def _on_promote_standby_cluster(self, event: ActionEvent) -> None: + """Moves a standby cluster to a primary, if none is present.""" + if ( + "cluster_initialised" not in self.charm._peers.data[self.charm.app] + or not self.charm._patroni.member_started + ): + event.fail("Cluster not initialized yet.") + return + + if not self.charm.unit.is_leader(): + event.fail("Not the charm leader unit.") + return + + # Let the exception error the unit + unit = self._check_if_primary_already_selected() + if unit: + event.fail(f"Cannot promote - {unit.name} is already primary: demote it first") + return + + # If this is a standby-leader, then execute switchover logic + # TODO + + # Now, publish that this unit is the leader + if not self.endpoint: + event.fail("No relation found.") + return + primary_relation = self.model.get_relation(ASYNC_PRIMARY_RELATION) + if not primary_relation: + event.fail("No primary relation") + return + + primary_relation.data[self.charm.unit]["elected"] = json.dumps( + { + "endpoint": self.endpoint, + "replication-password": self.charm._patroni._replication_password, + "superuser-password": self.charm._patroni._superuser_password, + } + ) + # event.set_result() + + def _on_demote_primary_cluster(self, event: ActionEvent) -> None: + """Moves a primary cluster to standby.""" + if ( + "cluster_initialised" not in self.charm._peers.data[self.charm.app] + or not self.charm._patroni.member_started + ): + event.fail("Cluster not initialized yet.") + return + + if not self.charm.unit.is_leader(): + event.fail("Not the charm leader unit.") + return + + # Let the exception error the unit + unit = self._check_if_primary_already_selected() + if not unit or unit.name != self.charm.unit.name: + event.fail(f"Cannot promote - {unit.name} is primary") + return + + # If this is a standby-leader, then execute switchover logic + # TODO + + # Now, publish that this unit is the leader + del self._get_primary_candidates()[self.charm.unit].data["elected"] + # event.set_result() diff --git a/templates/patroni.yml.j2 b/templates/patroni.yml.j2 index bdf0515b76..9236e64d2c 100644 --- a/templates/patroni.yml.j2 +++ b/templates/patroni.yml.j2 @@ -48,6 +48,11 @@ bootstrap: command: pgbackrest --stanza={{ restore_stanza }} --pg1-path={{ storage_path }}/pgdata --set={{ backup_id }} --type=immediate --target-action=promote restore no_params: True keep_existing_recovery_conf: True + {% elif standby_cluster_endpoint %} + standby_cluster: + host: {{ standby_cluster_endpoint }} + port: 5432 + create_replica_methods: ["backup_restore", 
"basebackup"] {% else %} initdb: - auth-host: md5 @@ -58,6 +63,9 @@ bootstrap: pg_hba: - {{ 'hostssl' if enable_tls else 'host' }} all all 0.0.0.0/0 md5 - {{ 'hostssl' if enable_tls else 'host' }} replication replication 127.0.0.1/32 md5 + {%- for endpoint in extra_replication_endpoints %} + - {{ 'hostssl' if enable_tls else 'host' }} replication replication {{ endpoint }} md5 + {%- endfor %} bypass_api_service: true log: dir: /var/log/postgresql @@ -113,6 +121,9 @@ postgresql: - {{ 'hostssl' if enable_tls else 'host' }} all all 0.0.0.0/0 md5 {%- endif %} - {{ 'hostssl' if enable_tls else 'host' }} replication replication 127.0.0.1/32 md5 + {%- for e in extra_replication_endpoints %} + - {{ 'hostssl' if enable_tls else 'host' }} replication replication {{ e }} md5 + {%- endfor -%} {%- for endpoint in endpoints %} - {{ 'hostssl' if enable_tls else 'host' }} replication replication {{ endpoint }}.{{ namespace }}.svc.cluster.local md5 {%- endfor %} From c4c0adb34ac06f06bade5fe37fced5bebb015f2d Mon Sep 17 00:00:00 2001 From: Pedro Guimaraes Date: Tue, 24 Oct 2023 12:16:06 +0200 Subject: [PATCH 02/15] First support finished: Deploy two models, each with 1x postgresql Then, configure async replication as follows: $ juju switch psql-1 $ juju offer postgresql-k8s:async-primary async-primary # async-primary is the relation provided by the leader $ juju switch psql-2 $ juju consume admin/psql-1.async-primary # consume the primary relation $ juju relate postgresql-k8s:async-replica async-primary # Both units are now related, where postgresql-k8s in model psql-2 is the standby-leader Now, run the action: $ juju run -m psql-1 postgresql-k8s/0 promote-standby-cluster # move postgresql-k8s in model psql-1 to be the leader cluster Run the following command to check status: $ PATRONI_KUBERNETES_LABELS='{application: patroni, cluster-name: patroni-postgresql-k8s}' \ PATRONI_KUBERNETES_NAMESPACE=psql-2 \ # update to model number PATRONI_KUBERNETES_USE_ENDPOINTS=true \ PATRONI_NAME=postgresql-k8s-0 \ PATRONI_REPLICATION_USERNAME=replication \ PATRONI_SCOPE=patroni-postgresql-k8s \ PATRONI_SUPERUSER_USERNAME=operator \ patronictl -c /var/lib/postgresql/data/patroni.yml list Role should be "Standby leader" and State should be "Running". --- src/relations/async_replication.py | 98 ++++++++++++++++++++++++++---- templates/patroni.yml.j2 | 2 +- 2 files changed, 86 insertions(+), 14 deletions(-) diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index 68e1ae4d45..5c57b2967d 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -35,6 +35,19 @@ class MoreThanOnePrimarySelectedError(Exception): """Represents more than one primary has been selected.""" +def _get_pod_ip(): + """Reads some files to quickly figure out its own pod IP. 
+ + It should work for any Ubuntu-based image + """ + with open("/etc/hosts") as f: + hosts = f.read() + with open("/etc/hostname") as f: + hostname = f.read().replace("\n", "") + line = [ln for ln in hosts.split("\n") if ln.find(hostname) >= 0][0] + return line.split("\t")[0] + + class PostgreSQLAsyncReplication(Object): """Defines the async-replication management logic.""" @@ -46,7 +59,7 @@ def __init__(self, charm: CharmBase, relation_name: str = ASYNC_PRIMARY_RELATION self.charm.on[ASYNC_PRIMARY_RELATION].relation_changed, self._on_primary_changed ) self.framework.observe( - self.charm.on[ASYNC_REPLICA_RELATION].relation_changed, self._on_primary_changed + self.charm.on[ASYNC_REPLICA_RELATION].relation_changed, self._on_standby_changed ) self.framework.observe( self.charm.on.promote_standby_cluster_action, self._on_promote_standby_cluster @@ -74,16 +87,18 @@ def standby_endpoints(self) -> Set[str]: """Returns the set of IPs used by each standby unit with a /32 mask.""" standby_endpoints = set() for rel in self.relation_set: - for unit in rel.units: + for unit in self._all_units(rel): if not rel.data[unit].get("elected", None): standby_endpoints.add("{}/32".format(str(rel.data[unit]["ingress-address"]))) + if "pod-address" in rel.data[unit]: + standby_endpoints.add("{}/32".format(str(rel.data[unit]["pod-address"]))) return standby_endpoints def get_primary_data(self) -> Dict[str, str]: """Returns the primary info, if available.""" for rel in self.relation_set: - for unit in rel.units: - if unit.name == self.charm.unit.name: + for unit in self._all_units(rel): + if "elected" in rel.data[unit] and unit.name == self.charm.unit.name: # If this unit is the leader, then return None return None if rel.data[unit].get("elected", None): @@ -95,21 +110,65 @@ def get_primary_data(self) -> Dict[str, str]: } return None + def _all_units(self, relation): + return {*relation.units, self.charm.unit} + + def _all_replica_published_pod_ips(self) -> bool: + for rel in self.relation_set: + for unit in self._all_units(rel): + if "elected" in rel.data[unit]: + # This is the leader unit, it will not publish its own pod address + continue + if "pod-address" not in rel.data[unit]: + return False + return True + def _on_primary_changed(self, _): - """Triggers a configuration change.""" + """Triggers a configuration change in the primary units.""" + primary_relation = self.model.get_relation(ASYNC_PRIMARY_RELATION) + if not primary_relation: + return + primary = self._check_if_primary_already_selected() if not primary: return - if primary.name == self.charm.unit.name: - # This unit is the leader, generate a new configuration and leave. - # There is nothing to do for the leader. - self.charm.update_config() - self.container.start(self.charm._postgresql_service) + if primary.name != self.charm.unit.name: + # no primary available, once it has been configured, it will trigger + # a new event changed return + if not self._all_replica_published_pod_ips(): + # We will have more events happening, no need for retrigger + return + + # This unit is the leader, generate a new configuration and leave. + # There is nothing to do for the leader. 
self.container.stop(self.charm._postgresql_service) + self.charm.update_config() + self.container.start(self.charm._postgresql_service) + + # Retrigger the other units' async-replica-changed + primary_relation.data[self.charm.unit]["primary-cluster-ready"] = "true" + def _on_standby_changed(self, _): + """Triggers a configuration change.""" + primary = self._check_if_primary_already_selected() + if not primary: + return + + replica_relation = self.model.get_relation(ASYNC_REPLICA_RELATION) + if not replica_relation: + return + + # Check if we have already published pod-address. If not, then we are waiting + # for the leader to catch all the pod ips and restart itself + if "pod-address" not in replica_relation.data[self.charm.unit]: + replica_relation.data[self.charm.unit]["pod-address"] = _get_pod_ip() + # Finish here and wait for the retrigger from the primary cluster + return + + self.container.stop(self.charm._postgresql_service) # Standby units must delete their data folder # Delete the K8S endpoints that tracks the cluster information, including its id. # This is the same as "patronictl remove patroni-postgresql-k8s", but the latter doesn't @@ -145,10 +204,10 @@ def _check_if_primary_already_selected(self) -> Unit: if not self.relation_set: return None for rel in self.relation_set: - for unit in rel.units: + for unit in self._all_units(rel): if "elected" in rel.data[unit] and not result: result = unit - elif result: + elif "elected" in rel.data[unit] and result: raise MoreThanOnePrimarySelectedError return result @@ -190,6 +249,13 @@ def _on_promote_standby_cluster(self, event: ActionEvent) -> None: "superuser-password": self.charm._patroni._superuser_password, } ) + + # Now, check if postgresql it had originally published its pod IP in the + # replica relation databag. Delete it, if yes. 
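For reference, the "elected" value published a few lines above is a small JSON document that the other cluster later parses in get_primary_data(); a sketch of its shape, with purely illustrative placeholder values (real credentials come from the charm's secrets), is:

    import json

    # Placeholder values only -- the keys mirror what _on_promote_standby_cluster publishes.
    elected = json.dumps({
        "endpoint": "10.1.45.12",                        # hypothetical promoted-cluster endpoint
        "replication-password": "<replication password>",
        "superuser-password": "<operator password>",
    })
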
+ replica_relation = self.model.get_relation(ASYNC_PRIMARY_RELATION) + if not replica_relation or "pod-address" not in replica_relation.data[self.charm.unit]: + return + del replica_relation.data[self.charm.unit]["pod-address"] # event.set_result() def _on_demote_primary_cluster(self, event: ActionEvent) -> None: @@ -213,7 +279,13 @@ def _on_demote_primary_cluster(self, event: ActionEvent) -> None: # If this is a standby-leader, then execute switchover logic # TODO + primary_relation = self.model.get_relation(ASYNC_PRIMARY_RELATION) + if not primary_relation or "elected" not in primary_relation.data[self.charm.unit]: + event.fail("No primary relation") + return # Now, publish that this unit is the leader - del self._get_primary_candidates()[self.charm.unit].data["elected"] + del primary_relation.data[self.charm.unit].data["elected"] + if "primary-cluster-ready" in primary_relation.data[self.charm.unit]: + del primary_relation.data[self.charm.unit]["primary-cluster-ready"] # event.set_result() diff --git a/templates/patroni.yml.j2 b/templates/patroni.yml.j2 index 9236e64d2c..c43e12fa3d 100644 --- a/templates/patroni.yml.j2 +++ b/templates/patroni.yml.j2 @@ -52,7 +52,7 @@ bootstrap: standby_cluster: host: {{ standby_cluster_endpoint }} port: 5432 - create_replica_methods: ["backup_restore", "basebackup"] + create_replica_methods: ["basebackup"] {% else %} initdb: - auth-host: md5 From 0a47ced6ea1214b7ebc0a507380943c8eb1b4a1a Mon Sep 17 00:00:00 2001 From: Pedro Guimaraes Date: Thu, 26 Oct 2023 22:17:11 +0200 Subject: [PATCH 03/15] Add support for clusters with >1 untis --- src/relations/async_replication.py | 122 ++++++++++++++++++++++++++--- 1 file changed, 112 insertions(+), 10 deletions(-) diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index 5c57b2967d..199bfc0561 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -14,7 +14,7 @@ from typing import Dict, Set from lightkube import Client -from lightkube.resources.core_v1 import Endpoints +from lightkube.resources.core_v1 import Service from ops.charm import ( ActionEvent, CharmBase, @@ -24,6 +24,10 @@ Unit, ) +from constants import ( + PEER, +) + logger = logging.getLogger(__name__) @@ -61,6 +65,20 @@ def __init__(self, charm: CharmBase, relation_name: str = ASYNC_PRIMARY_RELATION self.framework.observe( self.charm.on[ASYNC_REPLICA_RELATION].relation_changed, self._on_standby_changed ) + + self.framework.observe( + self.charm.on[ASYNC_PRIMARY_RELATION].relation_departed, self._on_departure + ) + self.framework.observe( + self.charm.on[ASYNC_REPLICA_RELATION].relation_departed, self._on_departure + ) + self.framework.observe( + self.charm.on[ASYNC_PRIMARY_RELATION].relation_broken, self._on_departure + ) + self.framework.observe( + self.charm.on[ASYNC_REPLICA_RELATION].relation_broken, self._on_departure + ) + self.framework.observe( self.charm.on.promote_standby_cluster_action, self._on_promote_standby_cluster ) @@ -95,13 +113,15 @@ def standby_endpoints(self) -> Set[str]: return standby_endpoints def get_primary_data(self) -> Dict[str, str]: - """Returns the primary info, if available.""" + """Returns the primary info, if available and if the primary cluster is ready.""" for rel in self.relation_set: for unit in self._all_units(rel): if "elected" in rel.data[unit] and unit.name == self.charm.unit.name: # If this unit is the leader, then return None return None - if rel.data[unit].get("elected", None): + if rel.data[unit].get("elected", None) and 
rel.data[unit].get( + "primary-cluster-ready", None + ): elected_data = json.loads(rel.data[unit]["elected"]) return { "endpoint": str(elected_data["endpoint"]), @@ -111,7 +131,9 @@ def get_primary_data(self) -> Dict[str, str]: return None def _all_units(self, relation): - return {*relation.units, self.charm.unit} + foundUnits = {*relation.units, self.charm.unit} + logger.info(f"Units found: {foundUnits}") + return foundUnits def _all_replica_published_pod_ips(self) -> bool: for rel in self.relation_set: @@ -123,24 +145,51 @@ def _all_replica_published_pod_ips(self) -> bool: return False return True - def _on_primary_changed(self, _): + def _on_departure(self, _): + for rel in [ + self.model.get_relation(ASYNC_REPLICA_RELATION), + self.model.get_relation(ASYNC_PRIMARY_RELATION), + self.model.get_relation(PEER), # for the "replica_ready" + ]: + if not rel: + continue + if "pod-address" in rel.data[self.charm.unit]: + del rel.data[self.charm.unit]["pod-address"] + if "elected" in rel.data[self.charm.unit]: + del rel.data[self.charm.unit]["elected"] + if "primary-cluster-ready" in rel.data[self.charm.unit]: + del rel.data[self.charm.unit]["primary-cluster-ready"] + if "replica_ready" in rel.data[self.charm.unit]: + del rel.data[self.charm.unit]["replica_ready"] + + self.container.stop(self.charm._postgresql_service) + self.charm.update_config() + self.container.start(self.charm._postgresql_service) + + def _on_primary_changed(self, event): """Triggers a configuration change in the primary units.""" primary_relation = self.model.get_relation(ASYNC_PRIMARY_RELATION) if not primary_relation: return + logger.info("_on_primary_changed: primary_relation exists") primary = self._check_if_primary_already_selected() if not primary: return + logger.info("_on_primary_changed: primary cluster exists") if primary.name != self.charm.unit.name: # no primary available, once it has been configured, it will trigger # a new event changed + event.defer() return + logger.info("_on_primary_changed: unit is the primary's leader") if not self._all_replica_published_pod_ips(): # We will have more events happening, no need for retrigger + event.defer() return + logger.info("_on_primary_changed: all replicas published pod details") # This unit is the leader, generate a new configuration and leave. # There is nothing to do for the leader. @@ -149,26 +198,44 @@ def _on_primary_changed(self, _): self.container.start(self.charm._postgresql_service) # Retrigger the other units' async-replica-changed - primary_relation.data[self.charm.unit]["primary-cluster-ready"] = "true" + primary_relation.data[self.charm.unit]["primary-cluster-ready"] = "True" - def _on_standby_changed(self, _): + def _on_standby_changed(self, event): # noqa C901 """Triggers a configuration change.""" primary = self._check_if_primary_already_selected() if not primary: return + logger.info("_on_standby_changed: primary is present") replica_relation = self.model.get_relation(ASYNC_REPLICA_RELATION) if not replica_relation: return + logger.info("_on_standby_changed: replica relation available") # Check if we have already published pod-address. If not, then we are waiting # for the leader to catch all the pod ips and restart itself if "pod-address" not in replica_relation.data[self.charm.unit]: replica_relation.data[self.charm.unit]["pod-address"] = _get_pod_ip() + + # Stop the container. + # We need all replicas to be stopped, so we can remove the patroni-postgresql-k8s + # service from Kubernetes and not getting it recreated! 
+ # We will restart the it once the cluster is ready. + self.container.stop(self.charm._postgresql_service) + # Finish here and wait for the retrigger from the primary cluster + event.defer() return + logger.info("_on_standby_changed: pod-address published in own replica databag") + + if not self.get_primary_data(): + # We've made thus far. + # However, the get_primary_data will return != None ONLY if the primary cluster + # is ready and configured. Until then, we wait. + event.defer() + return + logger.info("_on_standby_changed: primary cluster is ready") - self.container.stop(self.charm._postgresql_service) # Standby units must delete their data folder # Delete the K8S endpoints that tracks the cluster information, including its id. # This is the same as "patronictl remove patroni-postgresql-k8s", but the latter doesn't @@ -176,22 +243,57 @@ def _on_standby_changed(self, _): try: client = Client() client.delete( - Endpoints, + Service, name=f"patroni-{self.charm._name}", namespace=self.charm._namespace, ) + except Exception: + logger.info("_on_standby_changed: k8s error while trying to delete patroni service") + pass + try: client.delete( - Endpoints, + Service, name=f"patroni-{self.charm._name}-config", namespace=self.charm._namespace, ) + except Exception: + logger.info( + "_on_standby_changed: k8s error while trying to delete patroni-config service" + ) + pass + # Clean folder and generate configuration. + try: self.container.exec("rm -r /var/lib/postgresql/data/pgdata".split()).wait_output() self.charm._create_pgdata(self.container) self.charm.update_config() except Exception: + logger.info("_on_standby_changed: error while deleting pgdata or reconfiguring") pass + + logger.info("_on_standby_changed: configuration done, waiting for restart of the service") + + #### RESTARTING LOGIC #### + # We can only restart the database IF all the replicas already configured themselves. + # So, check if peers have published "replica_ready=True" in their databag. + peers_relation = self.model.get_relation(PEER) + if peers_relation.data[self.charm.unit].get("replica_ready", "False") != "True": + peers_relation.data[self.charm.unit]["replica_ready"] = "True" + event.defer() + return + logger.info("_on_standby_changed: this unit has replica_ready=True") + + # Now, check the peers until we have all of them ready: + for p in peers_relation.units: + if peers_relation.data[p].get("replica_ready", "False") != "True": + # Found a peer not ready yet + logger.info(f"_on_standby_changed: replica {p.name} is not ready yet") + event.defer() + return + logger.info("_on_standby_changed: all peers have replica_ready=True") + + # We are ready to restart the service now: all peers have configured themselves. 
self.container.start(self.charm._postgresql_service) def _get_primary_candidates(self): From 29184896f571bd66f77748c4a7143ef284c5d88a Mon Sep 17 00:00:00 2001 From: Pedro Guimaraes Date: Thu, 26 Oct 2023 22:22:37 +0200 Subject: [PATCH 04/15] Add lint fix --- src/relations/async_replication.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index 199bfc0561..cc548c9690 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -131,9 +131,9 @@ def get_primary_data(self) -> Dict[str, str]: return None def _all_units(self, relation): - foundUnits = {*relation.units, self.charm.unit} - logger.info(f"Units found: {foundUnits}") - return foundUnits + found_units = {*relation.units, self.charm.unit} + logger.debug(f"Units found: {found_units}") + return found_units def _all_replica_published_pod_ips(self) -> bool: for rel in self.relation_set: From 73af8350d9341364c40628f4fde1b072fe4c72e5 Mon Sep 17 00:00:00 2001 From: Pedro Guimaraes Date: Fri, 27 Oct 2023 20:07:56 +0200 Subject: [PATCH 05/15] Add a coordinator logic to make sure all units of the standby cluster will stop their services before moving on and reconfiguring --- actions.yaml | 18 ++- src/coordinator_ops.py | 217 +++++++++++++++++++++++++++++ src/relations/async_replication.py | 184 ++++++++++-------------- 3 files changed, 303 insertions(+), 116 deletions(-) create mode 100644 src/coordinator_ops.py diff --git a/actions.yaml b/actions.yaml index 1932b38646..1edc4666dc 100644 --- a/actions.yaml +++ b/actions.yaml @@ -46,6 +46,18 @@ set-tls-private-key: promote-standby-cluster: description: Promotes the standby cluster of choice to a leader. Must be ran against the charm unit leader of the standby cluster. - -demote-primary-cluster: - description: Demotes the primary cluster of choice to a standby. Must be ran against the charm unit leader of the standby cluster. + params: + force: + type: boolean + default: False + description: | + WARNING: this option set to True WILL WIPE OUT your current primary cluster! + If this option and "force-really-really-mean-it" are set both to true, then this unit will take over the primary role. + It only works in the case of cross-cluster replication, where both clusters are connected to each other in the async-primary. + force-really-really-mean-it: + type: boolean + default: False + description: | + WARNING: this option set to True WILL WIPE OUT your current primary cluster! + If this option and "force" are set both to true, then this unit will take over the primary role. + It only works in the case of cross-cluster replication, where both clusters are connected to each other in the async-primary. diff --git a/src/coordinator_ops.py b/src/coordinator_ops.py new file mode 100644 index 0000000000..08384a2fa9 --- /dev/null +++ b/src/coordinator_ops.py @@ -0,0 +1,217 @@ +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. + +"""The coordinated ops is a class that ensures a certain activity is ran together. + +The concept is similar to the "cohort" in snaps, where all units wait until they can +proceed to execute a certain activity, for example, restarting your service. + +The process starts with the leader issuing a new coordination request. Effectively, +that is implemented as the __coord_counter is increased +1 in the app level. +__coord_approved is set to "False". 
+ +Each unit receives a relation-changed, which is then re-issued as a _coordinator_requested +event. Once the unit done its task, it should ack the request. +Each unit should ack the request by equaling its own __coord_counter +to the app's value. + +Once all units ack'ed the __coord_counter, then the leader switches the +__coord_approved to "True". All units then will process that new change as a +"coordinator-approved" event and execute the activity they have been waiting. + +If there is a need to coordinate several activities in sequence, e.g. coordinated stop and then +coordinated start, it is recommended that the leader unit publishes twice a _requested, as follows: + + + class MyCharm: + + def __init__(self, *args): + self.stop_coordinator = CoordinatedOpsManager(relation, tag="_stop_my_charm") + self.start_coordinator = CoordinatedOpsManager(relation, tag="_start_my_charm") + + self.framework.observe( + self.stop_coordinator.on.coordinator_requested, + self._on_coordinator_requested + ) + self.framework.observe( + self.stop_coordinator.on.coordinator_approved, + self._on_coordinator_approved + ) + self.framework.observe( + self.start_coordinator.on.coordinator_requested, + self._on_coordinator_requested + ) + self.framework.observe( + self.start_coordinator.on.coordinator_approved, + self._on_coordinator_approved + ) + + def _a_method(): + # A method that kick starts the restarting coordination + ...... + if self.charm.unit.is_leader(): + self.stop_coordinator.coordinate() + + def _on_coordinator_requested(self, event): + if self.service_is_running and event.tag == "_stop_my_charm": + # We are in the stop-phase + self.service.stop() + self.stop_coordinator.acknowledge(event) + elif event.tag == "_start_my_charm": + # we are in the starting-phase + self.service.start() + self.start_coordinator.acknowledge(event) + + def _on_coordinator_approved(self, event): + # All units have ack'ed the activity, which means we have stopped. 
+ if self.charm.unit.is_leader() and event.tag == "_stop_my_charm": + # Now kickstart the restarting process + self.start_coordinator.coordinate() +""" + + +import logging +from typing import AnyStr + +from ops.charm import ( + CharmBase, + CharmEvents, + EventSource, + RelationChangedEvent, +) +from ops.framework import EventBase, Object + +logger = logging.getLogger(__name__) + + +class CoordinatorEventBase(EventBase): + """Base event for the coordination activities.""" + + def __init__(self, handle: 'Handle', tag: str): + super().__init__(handle) + self._tag = tag + + @property + def tag(self): + """Returns the tag representing this coordinator's controllers.""" + return self._tag + + +class CoordinatorRequestedEvent(CoordinatorEventBase): + """Event to signal that the leader requested the units to coordinate a new activity.""" + def __init__(self, handle: 'Handle', tag: str): + super().__init__(handle, tag) + + +class CoordinatorApprovedEvent(CoordinatorEventBase): + """Event to signal that all units ack'ed the coordination request and can proceed.""" + def __init__(self, handle: 'Handle', tag: str): + super().__init__(handle, tag) + + +class CoordinatorCharmEvents(CharmEvents): + """List of events that the TLS Certificates requirer charm can leverage.""" + + coordinator_approved = EventSource(CoordinatorApprovedEvent) + coordinator_requested = EventSource(CoordinatorRequestedEvent) + + +class CoordinatedOpsManager(Object): + """Coordinates activities that demand the entire peer group to act at once.""" + + on = CoordinatorCharmEvents() + + def __init__(self, charm: CharmBase, relation: AnyStr, tag: AnyStr = ""): + super().__init__(charm, relation) + self.tag = tag + self.relation = relation + self.app = charm.app + self.name = relation + tag # use the tag to separate multiple coordinator objects + # in the same charm class. + self.charm = charm # Maintain a reference to charm, so we can emit events. + self.framework.observe(charm.on[self.relation].relation_changed, self._on_relation_changed) + + @property + def under_coordination(self): + """Returns True if the _coord_approved == False.""" + return ( + self.model.get_relation(self.relation) + .data[self.app] + .get(f"_{self.name}_coord_approved", "True") + == "False" + ) + + def coordinate(self): + """Process a request to ask a new coordination activity. + + If we are the leader, fire off a coordinator requested event in the self.name. + """ + logger.info("coordinate: starting") + if self.charm.unit.is_leader(): + counter = ( + self.model.get_relation(self.relation) + .data[self.app] + .get(f"_{self.name}_coord_counter", 0) + ) + self.model.get_relation(self.relation).data[self.app][ + f"_{self.name}_coord_counter" + ] = str(counter + 1 if counter < 10000000 else 0) + self.model.get_relation(self.relation).data[self.app][ + f"_{self.name}_coord_approved" + ] = "False" + logger.info("coordinate: tasks executed") + + def acknowledge(self, event): + """Runs the ack of the latest requested coordination. + + Each unit will set their own _counter to the same value as app's. + """ + coord_counter = f"_{self.name}_coord_counter" + self.model.get_relation(self.relation).data[self.charm.unit][coord_counter] = str( + self.model.get_relation(self.relation).data[self.app].get(coord_counter, 0) + ) + logger.info("acknowledge: updated internal counter") + + if not self.charm.unit.is_leader(): + # Nothing to do anymore. 
+ logger.info("acknowledge: this unit is not a leader") + return + + relation = self.model.get_relation(self.relation) + # Now, the leader must check if everyone has ack'ed + for unit in relation.units: + if relation.data[unit].get(coord_counter, "0") != relation.data[self.app].get( + coord_counter, "0" + ): + logger.info(f"acknowledge: {unit.name} still has a different coord_counter") + # We defer the event until _coord_approved == True. + # If we have _coord_counter differing, then we are not yet there. + event.defer() + return + logger.info("acknowledge: all units are set, set coord_approved == True") + # Just confirmed we have all units ack'ed. Now, set the approval. + relation.data[self.app][f"_{self.name}_coord_approved"] = "True" + + def _on_relation_changed(self: CharmBase, _: RelationChangedEvent): + """Process relation changed. + + First, determine whether this unit has received a new request for coordination. + + Then, if we are the leader, fire off a coordinator requested event. + """ + logger.info("coordinator: starting _on_relation_changed") + relation_data = self.model.get_relation(self.relation).data[self.app] + unit_data = self.model.get_relation(self.relation).data[self.charm.unit] + + if relation_data.get(f"_{self.name}_coord_approved", "False") == "True": + logger.info("coordinator: _on_relation_changed -- coordinator approved") + # We are approved to move on, issue the coordinator_approved event. + self.on.coordinator_approved.emit(self.tag) + return + coord_counter = f"_{self.name}_coord_counter" + if coord_counter in relation_data and relation_data.get( + coord_counter, "0" + ) != unit_data.get(coord_counter, "0"): + logger.info("coordinator: _on_relation_changed -- coordinator requested") + self.on.coordinator_requested.emit(self.tag) + return diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index cc548c9690..85ec2abece 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -1,4 +1,4 @@ -# Copyright 2022 Canonical Ltd. +# Copyright 2023 Canonical Ltd. # See LICENSE file for licensing details. """Implements the state-machine. 
@@ -24,9 +24,7 @@ Unit, ) -from constants import ( - PEER, -) +from coordinator_ops import CoordinatedOpsManager logger = logging.getLogger(__name__) @@ -59,13 +57,21 @@ def __init__(self, charm: CharmBase, relation_name: str = ASYNC_PRIMARY_RELATION super().__init__(charm, relation_name) self.relation_name = relation_name self.charm = charm + self.restart_coordinator = CoordinatedOpsManager(charm, "restart", tag="_asyncreplica") self.framework.observe( self.charm.on[ASYNC_PRIMARY_RELATION].relation_changed, self._on_primary_changed ) self.framework.observe( self.charm.on[ASYNC_REPLICA_RELATION].relation_changed, self._on_standby_changed ) + self.framework.observe( + self.restart_coordinator.on.coordinator_requested, self._on_coordination_request + ) + self.framework.observe( + self.restart_coordinator.on.coordinator_approved, self._on_coordination_approval + ) + # Departure events self.framework.observe( self.charm.on[ASYNC_PRIMARY_RELATION].relation_departed, self._on_departure ) @@ -79,12 +85,10 @@ def __init__(self, charm: CharmBase, relation_name: str = ASYNC_PRIMARY_RELATION self.charm.on[ASYNC_REPLICA_RELATION].relation_broken, self._on_departure ) + # Actions self.framework.observe( self.charm.on.promote_standby_cluster_action, self._on_promote_standby_cluster ) - self.framework.observe( - self.charm.on.demote_primary_cluster_action, self._on_demote_primary_cluster - ) # We treat both relations above as actually the same. # The big difference appears only at promote/demote actions @@ -149,9 +153,8 @@ def _on_departure(self, _): for rel in [ self.model.get_relation(ASYNC_REPLICA_RELATION), self.model.get_relation(ASYNC_PRIMARY_RELATION), - self.model.get_relation(PEER), # for the "replica_ready" ]: - if not rel: + if not rel: # if no relation exits, then it rel == None continue if "pod-address" in rel.data[self.charm.unit]: del rel.data[self.charm.unit]["pod-address"] @@ -159,8 +162,6 @@ def _on_departure(self, _): del rel.data[self.charm.unit]["elected"] if "primary-cluster-ready" in rel.data[self.charm.unit]: del rel.data[self.charm.unit]["primary-cluster-ready"] - if "replica_ready" in rel.data[self.charm.unit]: - del rel.data[self.charm.unit]["replica_ready"] self.container.stop(self.charm._postgresql_service) self.charm.update_config() @@ -175,13 +176,15 @@ def _on_primary_changed(self, event): primary = self._check_if_primary_already_selected() if not primary: + # primary may not be available because the action of promoting a cluster was + # executed way after the relation changes. + # Defer it until + event.defer() return logger.info("_on_primary_changed: primary cluster exists") if primary.name != self.charm.unit.name: - # no primary available, once it has been configured, it will trigger - # a new event changed - event.defer() + # this unit is not the system leader return logger.info("_on_primary_changed: unit is the primary's leader") @@ -202,27 +205,20 @@ def _on_primary_changed(self, event): def _on_standby_changed(self, event): # noqa C901 """Triggers a configuration change.""" - primary = self._check_if_primary_already_selected() - if not primary: - return - logger.info("_on_standby_changed: primary is present") - replica_relation = self.model.get_relation(ASYNC_REPLICA_RELATION) if not replica_relation: return logger.info("_on_standby_changed: replica relation available") + primary = self._check_if_primary_already_selected() + if not primary: + return + logger.info("_on_standby_changed: primary is present") + # Check if we have already published pod-address. 
If not, then we are waiting # for the leader to catch all the pod ips and restart itself if "pod-address" not in replica_relation.data[self.charm.unit]: replica_relation.data[self.charm.unit]["pod-address"] = _get_pod_ip() - - # Stop the container. - # We need all replicas to be stopped, so we can remove the patroni-postgresql-k8s - # service from Kubernetes and not getting it recreated! - # We will restart the it once the cluster is ready. - self.container.stop(self.charm._postgresql_service) - # Finish here and wait for the retrigger from the primary cluster event.defer() return @@ -236,63 +232,52 @@ def _on_standby_changed(self, event): # noqa C901 return logger.info("_on_standby_changed: primary cluster is ready") - # Standby units must delete their data folder - # Delete the K8S endpoints that tracks the cluster information, including its id. - # This is the same as "patronictl remove patroni-postgresql-k8s", but the latter doesn't - # work after the database service is stopped on Pebble. - try: + ################ + # Initiate restart logic + ################ + + # We need to: + # 1) Stop all standby units + # 2) Delete the k8s service + # 3) Remove the pgdata folder + # 4) Start all standby units + # For that, the peer leader must first stop its own service and then, issue a + # coordination request to all units. All units ack that request once they all have + # their service stopped. + # Then, we get an approved coordination from the leader, which triggers the + # steps 2-4. + if self.charm.unit.is_leader() and not self.restart_coordinator.under_coordination: + # The leader now requests a ack from each unit that they have stopped. + self.restart_coordinator.coordinate() + + def _on_coordination_request(self, event): + # Stop the container. + # We need all replicas to be stopped, so we can remove the patroni-postgresql-k8s + # service from Kubernetes and not getting it recreated! + # We will restart the it once the cluster is ready. + self.container.stop(self.charm._postgresql_service) + self.restart_coordinator.acknowledge(event) + + def _on_coordination_approval(self, event): + """Runs when the coordinator guaranteed all units have stopped.""" + if self.charm.unit.is_leader(): + # Delete the K8S endpoints that tracks the cluster information, including its id. + # This is the same as "patronictl remove patroni-postgresql-k8s", but the latter doesn't + # work after the database service is stopped on Pebble. client = Client() - client.delete( - Service, - name=f"patroni-{self.charm._name}", - namespace=self.charm._namespace, - ) - except Exception: - logger.info("_on_standby_changed: k8s error while trying to delete patroni service") - pass - try: client.delete( Service, name=f"patroni-{self.charm._name}-config", namespace=self.charm._namespace, ) - except Exception: - logger.info( - "_on_standby_changed: k8s error while trying to delete patroni-config service" - ) - pass # Clean folder and generate configuration. 
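When update_config() runs in the hunk below, the standby cluster's rendered patroni.yml is expected to pick up the standby_cluster bootstrap section introduced by the template change in patch 01/15. A rough sketch of that structure (the host comes from the published "elected" endpoint; 5432 is the PostgreSQL default port):

    # Approximate shape of the section Patroni bootstraps a standby cluster from.
    standby_bootstrap = {
        "standby_cluster": {
            "host": "<primary endpoint>",
            "port": 5432,
            "create_replica_methods": ["basebackup"],
        }
    }
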
- try: - self.container.exec("rm -r /var/lib/postgresql/data/pgdata".split()).wait_output() - self.charm._create_pgdata(self.container) - - self.charm.update_config() - except Exception: - logger.info("_on_standby_changed: error while deleting pgdata or reconfiguring") - pass + self.container.exec("rm -r /var/lib/postgresql/data/pgdata".split()).wait_output() + self.charm._create_pgdata(self.container) + self.charm.update_config() logger.info("_on_standby_changed: configuration done, waiting for restart of the service") - #### RESTARTING LOGIC #### - # We can only restart the database IF all the replicas already configured themselves. - # So, check if peers have published "replica_ready=True" in their databag. - peers_relation = self.model.get_relation(PEER) - if peers_relation.data[self.charm.unit].get("replica_ready", "False") != "True": - peers_relation.data[self.charm.unit]["replica_ready"] = "True" - event.defer() - return - logger.info("_on_standby_changed: this unit has replica_ready=True") - - # Now, check the peers until we have all of them ready: - for p in peers_relation.units: - if peers_relation.data[p].get("replica_ready", "False") != "True": - # Found a peer not ready yet - logger.info(f"_on_standby_changed: replica {p.name} is not ready yet") - event.defer() - return - logger.info("_on_standby_changed: all peers have replica_ready=True") - # We are ready to restart the service now: all peers have configured themselves. self.container.start(self.charm._postgresql_service) @@ -326,15 +311,6 @@ def _on_promote_standby_cluster(self, event: ActionEvent) -> None: event.fail("Not the charm leader unit.") return - # Let the exception error the unit - unit = self._check_if_primary_already_selected() - if unit: - event.fail(f"Cannot promote - {unit.name} is already primary: demote it first") - return - - # If this is a standby-leader, then execute switchover logic - # TODO - # Now, publish that this unit is the leader if not self.endpoint: event.fail("No relation found.") @@ -344,6 +320,20 @@ def _on_promote_standby_cluster(self, event: ActionEvent) -> None: event.fail("No primary relation") return + # Check if this is a take over from a standby cluster + if event.params.get("force", False) and event.params.get( + "force-really-really-mean-it", False + ): + pass + + # Let the exception error the unit + unit = self._check_if_primary_already_selected() + if unit: + event.fail(f"Cannot promote - {unit.name} is already primary: demote it first") + return + + # If this is a standby-leader, then execute switchover logic + # TODO primary_relation.data[self.charm.unit]["elected"] = json.dumps( { "endpoint": self.endpoint, @@ -359,35 +349,3 @@ def _on_promote_standby_cluster(self, event: ActionEvent) -> None: return del replica_relation.data[self.charm.unit]["pod-address"] # event.set_result() - - def _on_demote_primary_cluster(self, event: ActionEvent) -> None: - """Moves a primary cluster to standby.""" - if ( - "cluster_initialised" not in self.charm._peers.data[self.charm.app] - or not self.charm._patroni.member_started - ): - event.fail("Cluster not initialized yet.") - return - - if not self.charm.unit.is_leader(): - event.fail("Not the charm leader unit.") - return - - # Let the exception error the unit - unit = self._check_if_primary_already_selected() - if not unit or unit.name != self.charm.unit.name: - event.fail(f"Cannot promote - {unit.name} is primary") - return - - # If this is a standby-leader, then execute switchover logic - # TODO - primary_relation = 
self.model.get_relation(ASYNC_PRIMARY_RELATION) - if not primary_relation or "elected" not in primary_relation.data[self.charm.unit]: - event.fail("No primary relation") - return - - # Now, publish that this unit is the leader - del primary_relation.data[self.charm.unit].data["elected"] - if "primary-cluster-ready" in primary_relation.data[self.charm.unit]: - del primary_relation.data[self.charm.unit]["primary-cluster-ready"] - # event.set_result() From f3cfd31c5df99e5e7b794719fa56ac4422851dc3 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Mon, 22 Jan 2024 09:48:58 -0300 Subject: [PATCH 06/15] Add extra needed details Signed-off-by: Marcelo Henrique Neppel --- .../data_platform_libs/v0/data_secrets.py | 2 +- poetry.lock | 1 - src/backups.py | 13 ++ src/charm.py | 32 ++++- src/coordinator_ops.py | 14 +- src/patroni.py | 24 ++++ src/relations/async_replication.py | 122 +++++++++++++++--- templates/patroni.yml.j2 | 1 + 8 files changed, 178 insertions(+), 31 deletions(-) diff --git a/lib/charms/data_platform_libs/v0/data_secrets.py b/lib/charms/data_platform_libs/v0/data_secrets.py index 254b9af3df..7adc915498 100644 --- a/lib/charms/data_platform_libs/v0/data_secrets.py +++ b/lib/charms/data_platform_libs/v0/data_secrets.py @@ -97,7 +97,7 @@ def get_content(self) -> Dict[str, str]: """Getting cached secret content.""" if not self._secret_content: if self.meta: - self._secret_content = self.meta.get_content() + self._secret_content = self.meta.get_content(refresh=True) return self._secret_content def set_content(self, content: Dict[str, str]) -> None: diff --git a/poetry.lock b/poetry.lock index d5e621c63c..3ca45b3fd3 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1688,7 +1688,6 @@ files = [ {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"}, - {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef"}, {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"}, {file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"}, {file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"}, diff --git a/src/backups.py b/src/backups.py index 60053db2b4..1d0f58bd50 100644 --- a/src/backups.py +++ b/src/backups.py @@ -25,6 +25,7 @@ from tenacity import RetryError, Retrying, stop_after_attempt, wait_fixed from constants import BACKUP_USER, WORKLOAD_OS_GROUP, WORKLOAD_OS_USER +from relations.async_replication import ASYNC_PRIMARY_RELATION, ASYNC_REPLICA_RELATION logger = logging.getLogger(__name__) @@ -668,6 +669,18 @@ def _pre_restore_checks(self, event: ActionEvent) -> bool: event.fail(error_message) return False + logger.info("Checking that the cluster is not replicating data to a standby cluster") + for rel in [ + self.model.get_relation(ASYNC_REPLICA_RELATION), + self.model.get_relation(ASYNC_PRIMARY_RELATION), + ]: + if not rel: # if no relation exits, then it rel == None 
+ continue + error_message = "Unit cannot restore backup as the cluster is replicating data to a standby cluster" + logger.error(f"Restore failed: {error_message}") + event.fail(error_message) + return False + logger.info("Checking that this unit was already elected the leader unit") if not self.charm.unit.is_leader(): error_message = "Unit cannot restore backup as it was not elected the leader unit yet" diff --git a/src/charm.py b/src/charm.py index 037c86bd94..33a51cb6f2 100755 --- a/src/charm.py +++ b/src/charm.py @@ -360,6 +360,18 @@ def _get_endpoints_to_remove(self) -> List[str]: endpoints_to_remove = list(set(old) - set(current)) return endpoints_to_remove + def get_unit_ip(self, unit: Unit) -> Optional[str]: + """Get the IP address of a specific unit.""" + # Check if host is current host. + if unit == self.unit: + return str(self.model.get_binding(PEER).network.bind_address) + # Check if host is a peer. + elif unit in self._peers.data: + return str(self._peers.data[unit].get("private-address")) + # Return None if the unit is not a peer neither the current unit. + else: + return None + def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None: """The leader removes the departing units from the list of cluster members.""" # Allow leader to update endpoints if it isn't leaving. @@ -416,9 +428,13 @@ def _on_peer_relation_changed(self, event: HookEvent) -> None: # Restart the workload if it's stuck on the starting state after a timeline divergence # due to a backup that was restored. - if not self.is_primary and ( - self._patroni.member_replication_lag == "unknown" - or int(self._patroni.member_replication_lag) > 1000 + if ( + not self.is_primary + and not self.is_standby_leader + and ( + self._patroni.member_replication_lag == "unknown" + or int(self._patroni.member_replication_lag) > 1000 + ) ): self._patroni.reinitialize_postgresql() logger.debug("Deferring on_peer_relation_changed: reinitialising replica") @@ -1118,6 +1134,11 @@ def is_primary(self) -> bool: """Return whether this unit is the primary instance.""" return self._unit == self._patroni.get_primary(unit_name_pattern=True) + @property + def is_standby_leader(self) -> bool: + """Return whether this unit is the standby leader instance.""" + return self._unit == self._patroni.get_standby_leader(unit_name_pattern=True) + @property def is_tls_enabled(self) -> bool: """Return whether TLS is enabled.""" @@ -1524,7 +1545,10 @@ def get_node_cpu_cores(self) -> int: """Return the number of CPU cores for the current K8S node.""" client = Client() node = client.get(Node, name=self._get_node_name_for_pod(), namespace=self._namespace) - return int(node.status.allocatable["cpu"]) + cpu = node.status.allocatable["cpu"] + if cpu.endswith("m"): + return int(cpu[:-1]) // 1000 + return int(cpu) def get_available_resources(self) -> Tuple[int, int]: """Get available CPU cores and memory (in bytes) for the container.""" diff --git a/src/coordinator_ops.py b/src/coordinator_ops.py index 08384a2fa9..3b4304539a 100644 --- a/src/coordinator_ops.py +++ b/src/coordinator_ops.py @@ -79,7 +79,7 @@ def _on_coordinator_approved(self, event): EventSource, RelationChangedEvent, ) -from ops.framework import EventBase, Object +from ops.framework import EventBase, Handle, Object logger = logging.getLogger(__name__) @@ -87,7 +87,7 @@ def _on_coordinator_approved(self, event): class CoordinatorEventBase(EventBase): """Base event for the coordination activities.""" - def __init__(self, handle: 'Handle', tag: str): + def __init__(self, handle: 
"Handle", tag: str): super().__init__(handle) self._tag = tag @@ -99,13 +99,15 @@ def tag(self): class CoordinatorRequestedEvent(CoordinatorEventBase): """Event to signal that the leader requested the units to coordinate a new activity.""" - def __init__(self, handle: 'Handle', tag: str): + + def __init__(self, handle: "Handle", tag: str): super().__init__(handle, tag) class CoordinatorApprovedEvent(CoordinatorEventBase): """Event to signal that all units ack'ed the coordination request and can proceed.""" - def __init__(self, handle: 'Handle', tag: str): + + def __init__(self, handle: "Handle", tag: str): super().__init__(handle, tag) @@ -148,10 +150,10 @@ def coordinate(self): """ logger.info("coordinate: starting") if self.charm.unit.is_leader(): - counter = ( + counter = int( self.model.get_relation(self.relation) .data[self.app] - .get(f"_{self.name}_coord_counter", 0) + .get(f"_{self.name}_coord_counter", "0") ) self.model.get_relation(self.relation).data[self.app][ f"_{self.name}_coord_counter" diff --git a/src/patroni.py b/src/patroni.py index 3053728a1a..2f70f204ef 100644 --- a/src/patroni.py +++ b/src/patroni.py @@ -127,6 +127,30 @@ def get_primary(self, unit_name_pattern=False) -> str: break return primary + def get_standby_leader(self, unit_name_pattern=False) -> str: + """Get standby leader instance. + + Args: + unit_name_pattern: whether to convert pod name to unit name + + Returns: + standby leader pod or unit name. + """ + primary = None + # Request info from cluster endpoint (which returns all members of the cluster). + for attempt in Retrying(stop=stop_after_attempt(len(self._endpoints) + 1)): + with attempt: + url = self._get_alternative_patroni_url(attempt) + r = requests.get(f"{url}/cluster", verify=self._verify) + for member in r.json()["members"]: + if member["role"] == "standby_leader": + primary = member["name"] + if unit_name_pattern: + # Change the last dash to / in order to match unit name pattern. 
+ primary = "/".join(primary.rsplit("-", 1)) + break + return primary + def get_sync_standby_names(self) -> List[str]: """Get the list of sync standby unit names.""" sync_standbys = [] diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index 85ec2abece..8f9f23f5b8 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -11,7 +11,8 @@ import json import logging -from typing import Dict, Set +from datetime import datetime +from typing import Dict, Optional, Set, Tuple from lightkube import Client from lightkube.resources.core_v1 import Service @@ -22,8 +23,18 @@ from ops.framework import Object from ops.model import ( Unit, + WaitingStatus, +) +from ops.pebble import ChangeError +from tenacity import Retrying, stop_after_attempt, wait_fixed + +from constants import ( + APP_SCOPE, + REPLICATION_PASSWORD_KEY, + USER_PASSWORD_KEY, + WORKLOAD_OS_GROUP, + WORKLOAD_OS_USER, ) - from coordinator_ops import CoordinatedOpsManager logger = logging.getLogger(__name__) @@ -101,9 +112,12 @@ def __init__(self, charm: CharmBase, relation_name: str = ASYNC_PRIMARY_RELATION @property def endpoint(self) -> str: """Assumes the endpoint is the same, disregard if we are a primary or standby cluster.""" - for rel in self.relation_set: - return str(self.charm.model.get_binding(rel).network.ingress_address) - return None + sync_standby_names = self.charm._patroni.get_sync_standby_names() + if len(sync_standby_names) > 0: + unit = self.model.get_unit(sync_standby_names[0]) + return self.charm.get_unit_ip(unit) + else: + return self.charm.get_unit_ip(self.charm.unit) def standby_endpoints(self) -> Set[str]: """Returns the set of IPs used by each standby unit with a /32 mask.""" @@ -116,13 +130,14 @@ def standby_endpoints(self) -> Set[str]: standby_endpoints.add("{}/32".format(str(rel.data[unit]["pod-address"]))) return standby_endpoints - def get_primary_data(self) -> Dict[str, str]: + def get_primary_data(self) -> Optional[Dict[str, str]]: """Returns the primary info, if available and if the primary cluster is ready.""" for rel in self.relation_set: for unit in self._all_units(rel): if "elected" in rel.data[unit] and unit.name == self.charm.unit.name: # If this unit is the leader, then return None return None + if rel.data[unit].get("elected", None) and rel.data[unit].get( "primary-cluster-ready", None ): @@ -162,8 +177,12 @@ def _on_departure(self, _): del rel.data[self.charm.unit]["elected"] if "primary-cluster-ready" in rel.data[self.charm.unit]: del rel.data[self.charm.unit]["primary-cluster-ready"] + if self.charm.unit.is_leader() and "promoted" in self.charm.app_peer_data: + del self.charm.app_peer_data["promoted"] - self.container.stop(self.charm._postgresql_service) + for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(3)): + with attempt: + self.container.stop(self.charm._postgresql_service) self.charm.update_config() self.container.start(self.charm._postgresql_service) @@ -196,7 +215,9 @@ def _on_primary_changed(self, event): # This unit is the leader, generate a new configuration and leave. # There is nothing to do for the leader. 
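Note: the get_standby_leader helper above scans Patroni's /cluster endpoint for a member reported with the standby_leader role and optionally converts the pod name back to a Juju unit name. A minimal standalone sketch of that role scan and name conversion follows; the sample member list is illustrative, not taken from a real cluster.

from typing import List, Optional


def pod_to_unit_name(pod_name: str) -> str:
    # "postgresql-k8s-1" -> "postgresql-k8s/1": only the last dash separates
    # the ordinal from the application name, so split once from the right.
    return "/".join(pod_name.rsplit("-", 1))


def find_standby_leader(members: List[dict]) -> Optional[str]:
    # Patroni's /cluster endpoint returns a "members" list; the leader of a
    # standby cluster is reported with the role "standby_leader".
    for member in members:
        if member.get("role") == "standby_leader":
            return pod_to_unit_name(member["name"])
    return None


sample = [
    {"name": "postgresql-k8s-0", "role": "replica"},
    {"name": "postgresql-k8s-1", "role": "standby_leader"},
]
assert find_standby_leader(sample) == "postgresql-k8s/1"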
- self.container.stop(self.charm._postgresql_service) + for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(3)): + with attempt: + self.container.stop(self.charm._postgresql_service) self.charm.update_config() self.container.start(self.charm._postgresql_service) @@ -224,7 +245,8 @@ def _on_standby_changed(self, event): # noqa C901 return logger.info("_on_standby_changed: pod-address published in own replica databag") - if not self.get_primary_data(): + primary_data = self.get_primary_data() + if not primary_data: # We've made thus far. # However, the get_primary_data will return != None ONLY if the primary cluster # is ready and configured. Until then, we wait. @@ -232,6 +254,21 @@ def _on_standby_changed(self, event): # noqa C901 return logger.info("_on_standby_changed: primary cluster is ready") + if "system-id" not in replica_relation.data[self.charm.unit]: + system_identifier, error = self.get_system_identifier() + if error is not None: + raise Exception(f"Failed to get system identifier: {error}") + replica_relation.data[self.charm.unit]["system-id"] = system_identifier + + if self.charm.unit.is_leader(): + self.charm.set_secret( + APP_SCOPE, USER_PASSWORD_KEY, primary_data["superuser-password"] + ) + self.charm.set_secret( + APP_SCOPE, REPLICATION_PASSWORD_KEY, primary_data["replication-password"] + ) + del self.charm._peers.data[self.charm.app]["cluster_initialised"] + ################ # Initiate restart logic ################ @@ -255,7 +292,28 @@ def _on_coordination_request(self, event): # We need all replicas to be stopped, so we can remove the patroni-postgresql-k8s # service from Kubernetes and not getting it recreated! # We will restart the it once the cluster is ready. - self.container.stop(self.charm._postgresql_service) + for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(3)): + with attempt: + self.container.stop(self.charm._postgresql_service) + + replica_relation = self.model.get_relation(ASYNC_REPLICA_RELATION) + for unit in replica_relation.units: + if "elected" not in replica_relation.data[unit]: + continue + elected_data = json.loads(replica_relation.data[unit]["elected"]) + if "system-id" not in elected_data: + continue + if replica_relation.data[self.charm.unit]["system-id"] != elected_data["system-id"]: + if self.charm.unit.is_leader(): + # Store current data in a ZIP file, clean folder and generate configuration. + logger.info("Creating backup of pgdata folder") + self.container.exec( + f"tar -zcf /var/lib/postgresql/data/pgdata-{str(datetime.now()).replace(' ', '-').replace(':', '-')}.zip /var/lib/postgresql/data/pgdata".split() + ).wait_output() + logger.info("Removing and recreating pgdata folder") + self.container.exec("rm -r /var/lib/postgresql/data/pgdata".split()).wait_output() + self.charm._create_pgdata(self.container) + break self.restart_coordinator.acknowledge(event) def _on_coordination_approval(self, event): @@ -270,22 +328,20 @@ def _on_coordination_approval(self, event): name=f"patroni-{self.charm._name}-config", namespace=self.charm._namespace, ) - - # Clean folder and generate configuration. 
- self.container.exec("rm -r /var/lib/postgresql/data/pgdata".split()).wait_output() - self.charm._create_pgdata(self.container) + elif not self.charm._patroni.primary_endpoint_ready: + self.charm.unit.status = WaitingStatus("waiting for primary to be ready") + event.defer() + return self.charm.update_config() logger.info("_on_standby_changed: configuration done, waiting for restart of the service") # We are ready to restart the service now: all peers have configured themselves. self.container.start(self.charm._postgresql_service) + if self.charm.unit.is_leader(): + self.charm._peers.data[self.charm.app]["cluster_initialised"] = "True" - def _get_primary_candidates(self): - rel = self.model.get_relation(ASYNC_PRIMARY_RELATION) - return rel.units if rel else [] - - def _check_if_primary_already_selected(self) -> Unit: + def _check_if_primary_already_selected(self) -> Optional[Unit]: """Returns the unit if a primary is present.""" result = None if not self.relation_set: @@ -332,15 +388,23 @@ def _on_promote_standby_cluster(self, event: ActionEvent) -> None: event.fail(f"Cannot promote - {unit.name} is already primary: demote it first") return + system_identifier, error = self.get_system_identifier() + if error is not None: + event.fail(f"Failed to get system identifier: {error}") + return + # If this is a standby-leader, then execute switchover logic # TODO primary_relation.data[self.charm.unit]["elected"] = json.dumps( { + # "endpoint": self.charm.async_replication_endpoint, "endpoint": self.endpoint, "replication-password": self.charm._patroni._replication_password, "superuser-password": self.charm._patroni._superuser_password, + "system-id": system_identifier, } ) + self.charm.app_peer_data["promoted"] = "True" # Now, check if postgresql it had originally published its pod IP in the # replica relation databag. Delete it, if yes. 
@@ -349,3 +413,23 @@ def _on_promote_standby_cluster(self, event: ActionEvent) -> None: return del replica_relation.data[self.charm.unit]["pod-address"] # event.set_result() + + def get_system_identifier(self) -> Tuple[Optional[str], Optional[str]]: + """Returns the PostgreSQL system identifier from this instance.""" + try: + system_identifier, error = self.container.exec( + [ + f'/usr/lib/postgresql/{self.charm._patroni.rock_postgresql_version.split(".")[0]}/bin/pg_controldata', + "/var/lib/postgresql/data/pgdata", + ], + user=WORKLOAD_OS_USER, + group=WORKLOAD_OS_GROUP, + ).wait_output() + except ChangeError as e: + return None, str(e) + if error != "": + return None, error + system_identifier = [ + line for line in system_identifier.splitlines() if "Database system identifier" in line + ][0].split(" ")[-1] + return system_identifier, None diff --git a/templates/patroni.yml.j2 b/templates/patroni.yml.j2 index 57ee2f5a65..eb12e3cf82 100644 --- a/templates/patroni.yml.j2 +++ b/templates/patroni.yml.j2 @@ -124,6 +124,7 @@ postgresql: - {{ 'hostssl' if enable_tls else 'host' }} all all 0.0.0.0/0 md5 {%- endif %} - {{ 'hostssl' if enable_tls else 'host' }} replication replication 127.0.0.1/32 md5 + - {{ 'hostssl' if enable_tls else 'host' }} replication replication 127.0.0.6/32 md5 {%- for e in extra_replication_endpoints %} - {{ 'hostssl' if enable_tls else 'host' }} replication replication {{ e }} md5 {%- endfor -%} From 789217ca227e4d7c4e0c3d51007f43eb1d8a65b1 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Tue, 23 Jan 2024 09:35:27 -0300 Subject: [PATCH 07/15] Add all secrets and add minor fixes to code Signed-off-by: Marcelo Henrique Neppel --- actions.yaml | 15 ------- src/charm.py | 12 ++++++ src/coordinator_ops.py | 2 +- src/relations/async_replication.py | 65 +++++++++++++++++++++--------- 4 files changed, 58 insertions(+), 36 deletions(-) diff --git a/actions.yaml b/actions.yaml index 1edc4666dc..37346ce8a8 100644 --- a/actions.yaml +++ b/actions.yaml @@ -46,18 +46,3 @@ set-tls-private-key: promote-standby-cluster: description: Promotes the standby cluster of choice to a leader. Must be ran against the charm unit leader of the standby cluster. - params: - force: - type: boolean - default: False - description: | - WARNING: this option set to True WILL WIPE OUT your current primary cluster! - If this option and "force-really-really-mean-it" are set both to true, then this unit will take over the primary role. - It only works in the case of cross-cluster replication, where both clusters are connected to each other in the async-primary. - force-really-really-mean-it: - type: boolean - default: False - description: | - WARNING: this option set to True WILL WIPE OUT your current primary cluster! - If this option and "force" are set both to true, then this unit will take over the primary role. - It only works in the case of cross-cluster replication, where both clusters are connected to each other in the async-primary. diff --git a/src/charm.py b/src/charm.py index 19f23bb14f..0aa761b8ee 100755 --- a/src/charm.py +++ b/src/charm.py @@ -364,6 +364,9 @@ def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None: self.postgresql_client_relation.update_read_only_endpoint() self._remove_from_endpoints(endpoints_to_remove) + # Update the endpoint in the async replication data. 
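Note: get_system_identifier above shells out to pg_controldata and keeps only the "Database system identifier" line, taking the last token as the identifier. A small sketch of that parsing step on sample pg_controldata output (the sample text is illustrative only):

from typing import Optional


def parse_system_identifier(pg_controldata_output: str) -> Optional[str]:
    # pg_controldata prints one "key: value" pair per line; the identifier is
    # the last whitespace-separated token of the matching line.
    for line in pg_controldata_output.splitlines():
        if "Database system identifier" in line:
            return line.split(" ")[-1]
    return None


sample_output = (
    "pg_control version number:            1300\n"
    "Database system identifier:           7341234567890123456\n"
)
assert parse_system_identifier(sample_output) == "7341234567890123456"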
+ self.async_manager.update_async_replication_data() + def _on_peer_relation_changed(self, event: HookEvent) -> None: """Reconfigure cluster members.""" # The cluster must be initialized first in the leader unit @@ -449,6 +452,9 @@ def _on_config_changed(self, _) -> None: # update config on every run self.update_config() + # Update the endpoint in the async replication data. + self.async_manager.update_async_replication_data() + if not self.unit.is_leader(): return @@ -933,6 +939,9 @@ def _on_set_password(self, event: ActionEvent) -> None: # Other units Patroni configuration will be reloaded in the peer relation changed event. self.update_config() + # Update the password in the async replication data. + self.async_manager.update_async_replication_data() + event.set_results({"password": password}) def _on_get_primary(self, event: ActionEvent) -> None: @@ -1054,6 +1063,9 @@ def _on_update_status(self, _) -> None: if self._handle_processes_failures(): return + # Update the endpoint in the async replication data. + self.async_manager.update_async_replication_data() + self._set_primary_status_message() def _handle_processes_failures(self) -> bool: diff --git a/src/coordinator_ops.py b/src/coordinator_ops.py index 3b4304539a..2bd1c2becc 100644 --- a/src/coordinator_ops.py +++ b/src/coordinator_ops.py @@ -1,4 +1,4 @@ -# Copyright 2023 Canonical Ltd. +# Copyright 2024 Canonical Ltd. # See LICENSE file for licensing details. """The coordinated ops is a class that ensures a certain activity is ran together. diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index 8f9f23f5b8..7de7d88cbb 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -1,13 +1,7 @@ -# Copyright 2023 Canonical Ltd. +# Copyright 2024 Canonical Ltd. # See LICENSE file for licensing details. -"""Implements the state-machine. - -1) First async replication relation is made: both units get blocked waiting for a leader -2) User runs the promote action against one of the clusters -3) The cluster moves leader and sets the async-replication data, marking itself as leader -4) The other units receive that new information and update themselves to become standby-leaders. 
-""" +"""Implements the state-machine.""" import json import logging @@ -30,7 +24,9 @@ from constants import ( APP_SCOPE, + MONITORING_PASSWORD_KEY, REPLICATION_PASSWORD_KEY, + REWIND_PASSWORD_KEY, USER_PASSWORD_KEY, WORKLOAD_OS_GROUP, WORKLOAD_OS_USER, @@ -144,7 +140,9 @@ def get_primary_data(self) -> Optional[Dict[str, str]]: elected_data = json.loads(rel.data[unit]["elected"]) return { "endpoint": str(elected_data["endpoint"]), + "monitoring-password": elected_data["monitoring-password"], "replication-password": elected_data["replication-password"], + "rewind-password": elected_data["rewind-password"], "superuser-password": elected_data["superuser-password"], } return None @@ -261,12 +259,18 @@ def _on_standby_changed(self, event): # noqa C901 replica_relation.data[self.charm.unit]["system-id"] = system_identifier if self.charm.unit.is_leader(): + self.charm.set_secret( + APP_SCOPE, MONITORING_PASSWORD_KEY, primary_data["monitoring-password"] + ) self.charm.set_secret( APP_SCOPE, USER_PASSWORD_KEY, primary_data["superuser-password"] ) self.charm.set_secret( APP_SCOPE, REPLICATION_PASSWORD_KEY, primary_data["replication-password"] ) + self.charm.set_secret( + APP_SCOPE, REWIND_PASSWORD_KEY, primary_data["rewind-password"] + ) del self.charm._peers.data[self.charm.app]["cluster_initialised"] ################ @@ -276,7 +280,7 @@ def _on_standby_changed(self, event): # noqa C901 # We need to: # 1) Stop all standby units # 2) Delete the k8s service - # 3) Remove the pgdata folder + # 3) Remove the pgdata folder (if the clusters are different) # 4) Start all standby units # For that, the peer leader must first stop its own service and then, issue a # coordination request to all units. All units ack that request once they all have @@ -291,7 +295,7 @@ def _on_coordination_request(self, event): # Stop the container. # We need all replicas to be stopped, so we can remove the patroni-postgresql-k8s # service from Kubernetes and not getting it recreated! - # We will restart the it once the cluster is ready. + # We will restart it once the cluster is ready. 
for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(3)): with attempt: self.container.stop(self.charm._postgresql_service) @@ -376,12 +380,6 @@ def _on_promote_standby_cluster(self, event: ActionEvent) -> None: event.fail("No primary relation") return - # Check if this is a take over from a standby cluster - if event.params.get("force", False) and event.params.get( - "force-really-really-mean-it", False - ): - pass - # Let the exception error the unit unit = self._check_if_primary_already_selected() if unit: @@ -393,13 +391,12 @@ def _on_promote_standby_cluster(self, event: ActionEvent) -> None: event.fail(f"Failed to get system identifier: {error}") return - # If this is a standby-leader, then execute switchover logic - # TODO primary_relation.data[self.charm.unit]["elected"] = json.dumps( { - # "endpoint": self.charm.async_replication_endpoint, "endpoint": self.endpoint, + "monitoring-password": self.charm.get_secret(APP_SCOPE, MONITORING_PASSWORD_KEY), "replication-password": self.charm._patroni._replication_password, + "rewind-password": self.charm.get_secret(APP_SCOPE, REWIND_PASSWORD_KEY), "superuser-password": self.charm._patroni._superuser_password, "system-id": system_identifier, } @@ -412,7 +409,6 @@ def _on_promote_standby_cluster(self, event: ActionEvent) -> None: if not replica_relation or "pod-address" not in replica_relation.data[self.charm.unit]: return del replica_relation.data[self.charm.unit]["pod-address"] - # event.set_result() def get_system_identifier(self) -> Tuple[Optional[str], Optional[str]]: """Returns the PostgreSQL system identifier from this instance.""" @@ -433,3 +429,32 @@ def get_system_identifier(self) -> Tuple[Optional[str], Optional[str]]: line for line in system_identifier.splitlines() if "Database system identifier" in line ][0].split(" ")[-1] return system_identifier, None + + def update_async_replication_data(self) -> None: + """Updates the async-replication data, if the unit is the leader. + + This is used to update the standby units with the new primary information. + If the unit is not the leader, then the data is removed from its databag. 
+ """ + if "promoted" not in self.charm.app_peer_data: + return + + primary_relation = self.model.get_relation(ASYNC_PRIMARY_RELATION) + if self.charm.unit.is_leader(): + system_identifier, error = self.get_system_identifier() + if error is not None: + raise Exception(f"Failed to get system identifier: {error}") + primary_relation.data[self.charm.unit]["elected"] = json.dumps( + { + "endpoint": self.endpoint, + "monitoring-password": self.charm.get_secret( + APP_SCOPE, MONITORING_PASSWORD_KEY + ), + "replication-password": self.charm._patroni._replication_password, + "rewind-password": self.charm.get_secret(APP_SCOPE, REWIND_PASSWORD_KEY), + "superuser-password": self.charm._patroni._superuser_password, + "system-id": system_identifier, + } + ) + else: + primary_relation.data[self.charm.unit]["elected"] = "" From ae1dcc0d42831035409f497f9c2608aca0701910 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Tue, 20 Feb 2024 20:34:23 -0300 Subject: [PATCH 08/15] Enable switchover to another cluster Signed-off-by: Marcelo Henrique Neppel --- src/charm.py | 9 +- src/patroni.py | 26 +++++ src/relations/async_replication.py | 173 ++++++++++++++++++++++------- 3 files changed, 165 insertions(+), 43 deletions(-) diff --git a/src/charm.py b/src/charm.py index 79932d5e33..653227ab79 100755 --- a/src/charm.py +++ b/src/charm.py @@ -1111,8 +1111,15 @@ def _handle_processes_failures(self) -> bool: return False return True + try: + is_primary = self.is_primary + is_standby_leader = self.is_standby_leader + except RetryError: + return False + if ( - not self.is_primary + not is_primary + and not is_standby_leader and self._patroni.member_started and not self._patroni.member_streaming ): diff --git a/src/patroni.py b/src/patroni.py index bc7f03abc2..b9342065ef 100644 --- a/src/patroni.py +++ b/src/patroni.py @@ -30,6 +30,10 @@ logger = logging.getLogger(__name__) +class ClusterNotPromotedError(Exception): + """Raised when a cluster is not promoted.""" + + class NotReadyError(Exception): """Raised when not all cluster members healthy or finished initial sync.""" @@ -38,6 +42,10 @@ class EndpointNotReadyError(Exception): """Raised when an endpoint is not ready.""" +class StandbyClusterAlreadyPromotedError(Exception): + """Raised when a standby cluster is already promoted.""" + + class SwitchoverFailedError(Exception): """Raised when a switchover failed for some reason.""" @@ -334,6 +342,24 @@ def bulk_update_parameters_controller_by_patroni(self, parameters: Dict[str, Any json={"postgresql": {"parameters": parameters}}, ) + def promote_standby_cluster(self) -> None: + """Promote a standby cluster to be a regular cluster.""" + config_response = requests.get( + f"{self._patroni_url}/config", + verify=self._verify + ) + if "standby_cluster" not in config_response.json(): + raise StandbyClusterAlreadyPromotedError("standby cluster is already promoted") + requests.patch( + f"{self._patroni_url}/config", + verify=self._verify, + json={"standby_cluster": None} + ) + for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): + with attempt: + if self.get_primary() is None: + raise ClusterNotPromotedError("cluster not promoted") + @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10)) def reinitialize_postgresql(self) -> None: """Reinitialize PostgreSQL.""" diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index 7de7d88cbb..baf4712a0c 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -8,7 
+8,7 @@ from datetime import datetime from typing import Dict, Optional, Set, Tuple -from lightkube import Client +from lightkube import ApiError, Client from lightkube.resources.core_v1 import Service from ops.charm import ( ActionEvent, @@ -16,11 +16,13 @@ ) from ops.framework import Object from ops.model import ( + ActiveStatus, + MaintenanceStatus, Unit, WaitingStatus, ) from ops.pebble import ChangeError -from tenacity import Retrying, stop_after_attempt, wait_fixed +from tenacity import RetryError, Retrying, stop_after_attempt, wait_fixed from constants import ( APP_SCOPE, @@ -32,6 +34,7 @@ WORKLOAD_OS_USER, ) from coordinator_ops import CoordinatedOpsManager +from patroni import ClusterNotPromotedError, StandbyClusterAlreadyPromotedError logger = logging.getLogger(__name__) @@ -183,12 +186,14 @@ def _on_departure(self, _): self.container.stop(self.charm._postgresql_service) self.charm.update_config() self.container.start(self.charm._postgresql_service) + self.charm.unit.status = ActiveStatus() def _on_primary_changed(self, event): """Triggers a configuration change in the primary units.""" primary_relation = self.model.get_relation(ASYNC_PRIMARY_RELATION) if not primary_relation: return + self.charm.unit.status = MaintenanceStatus("configuring main cluster") logger.info("_on_primary_changed: primary_relation exists") primary = self._check_if_primary_already_selected() @@ -196,31 +201,41 @@ def _on_primary_changed(self, event): # primary may not be available because the action of promoting a cluster was # executed way after the relation changes. # Defer it until + logger.debug("defer _on_primary_changed: primary not present") event.defer() return logger.info("_on_primary_changed: primary cluster exists") if primary.name != self.charm.unit.name: # this unit is not the system leader + logger.debug("early exit _on_primary_changed: unit is not the primary's leader") + self.charm.unit.status = ActiveStatus() return logger.info("_on_primary_changed: unit is the primary's leader") if not self._all_replica_published_pod_ips(): # We will have more events happening, no need for retrigger + logger.debug("defer _on_primary_changed: not all replicas published pod details") event.defer() return logger.info("_on_primary_changed: all replicas published pod details") # This unit is the leader, generate a new configuration and leave. # There is nothing to do for the leader. 
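Note: promote_standby_cluster, added to src/patroni.py in this patch, drives promotion through Patroni's REST API: it reads /config and, if a standby_cluster section is present, patches it to null so Patroni turns the standby leader into a regular leader. A hedged sketch of those two calls; the endpoint URL and TLS settings are assumptions for illustration.

import requests

PATRONI_URL = "http://localhost:8008"  # assumed endpoint for illustration


def promote(verify: bool = True) -> None:
    # A standby cluster carries a "standby_cluster" section in its config.
    config = requests.get(f"{PATRONI_URL}/config", verify=verify).json()
    if "standby_cluster" not in config:
        raise RuntimeError("cluster is already a regular (promoted) cluster")
    # Patching the section to null removes it and promotes the standby leader.
    requests.patch(
        f"{PATRONI_URL}/config", verify=verify, json={"standby_cluster": None}
    )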
- for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(3)): - with attempt: - self.container.stop(self.charm._postgresql_service) + try: + for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(3)): + with attempt: + self.container.stop(self.charm._postgresql_service) + except RetryError: + logger.debug("defer _on_primary_changed: failed to stop the container") + event.defer() + return self.charm.update_config() self.container.start(self.charm._postgresql_service) # Retrigger the other units' async-replica-changed primary_relation.data[self.charm.unit]["primary-cluster-ready"] = "True" + self.charm.unit.status = ActiveStatus() def _on_standby_changed(self, event): # noqa C901 """Triggers a configuration change.""" @@ -229,6 +244,7 @@ def _on_standby_changed(self, event): # noqa C901 return logger.info("_on_standby_changed: replica relation available") + self.charm.unit.status = MaintenanceStatus("configuring standby cluster") primary = self._check_if_primary_already_selected() if not primary: return @@ -273,6 +289,9 @@ def _on_standby_changed(self, event): # noqa C901 ) del self.charm._peers.data[self.charm.app]["cluster_initialised"] + if "cluster_initialised" in self.charm._peers.data[self.charm.app]: + return + ################ # Initiate restart logic ################ @@ -290,12 +309,14 @@ def _on_standby_changed(self, event): # noqa C901 if self.charm.unit.is_leader() and not self.restart_coordinator.under_coordination: # The leader now requests a ack from each unit that they have stopped. self.restart_coordinator.coordinate() + self.charm.unit.status = WaitingStatus("waiting for promotion of the main cluster") def _on_coordination_request(self, event): # Stop the container. # We need all replicas to be stopped, so we can remove the patroni-postgresql-k8s # service from Kubernetes and not getting it recreated! # We will restart it once the cluster is ready. + self.charm.unit.status = MaintenanceStatus("stopping database to enable async replication") for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(3)): with attempt: self.container.stop(self.charm._postgresql_service) @@ -327,23 +348,79 @@ def _on_coordination_approval(self, event): # This is the same as "patronictl remove patroni-postgresql-k8s", but the latter doesn't # work after the database service is stopped on Pebble. client = Client() - client.delete( - Service, - name=f"patroni-{self.charm._name}-config", - namespace=self.charm._namespace, - ) + try: + client.delete( + Service, + name=f"patroni-{self.charm._name}-config", + namespace=self.charm._namespace, + ) + except ApiError as e: + # Ignore the error only when the resource doesn't exist. + if e.status.code != 404: + raise e elif not self.charm._patroni.primary_endpoint_ready: - self.charm.unit.status = WaitingStatus("waiting for primary to be ready") + self.charm.unit.status = WaitingStatus("waiting for standby leader to be ready") event.defer() return + self.charm.unit.status = MaintenanceStatus("starting database to enable async replication") + self.charm.update_config() logger.info("_on_standby_changed: configuration done, waiting for restart of the service") # We are ready to restart the service now: all peers have configured themselves. 
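Note: deleting the patroni-<app>-config Service stands in for "patronictl remove", and the ApiError handling above ignores only the not-found case. A small sketch of that tolerant delete with lightkube; the resource name and namespace are placeholders.

from lightkube import ApiError, Client
from lightkube.resources.core_v1 import Service


def delete_patroni_config_service(app_name: str, namespace: str) -> None:
    client = Client()
    try:
        client.delete(Service, name=f"patroni-{app_name}-config", namespace=namespace)
    except ApiError as e:
        # A 404 means the resource is already gone; anything else is a real error.
        if e.status.code != 404:
            raise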
self.container.start(self.charm._postgresql_service) + + try: + for attempt in Retrying(stop=stop_after_attempt(10), wait=wait_fixed(5)): + with attempt: + if not self.charm._patroni.member_started: + raise Exception + except RetryError: + logger.debug("defer _on_coordination_approval: database hasn't started yet") + event.defer() + return + if self.charm.unit.is_leader(): + diverging_databases = False + try: + for attempt in Retrying(stop=stop_after_attempt(10), wait=wait_fixed(5)): + with attempt: + if self.charm._patroni.get_standby_leader(unit_name_pattern=True) != self.charm.unit.name: + raise Exception + except RetryError: + diverging_databases = True + if diverging_databases: + for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(3)): + with attempt: + self.container.stop(self.charm._postgresql_service) + + # Store current data in a ZIP file, clean folder and generate configuration. + logger.info("Creating backup of pgdata folder") + self.container.exec( + f"tar -zcf /var/lib/postgresql/data/pgdata-{str(datetime.now()).replace(' ', '-').replace(':', '-')}.zip /var/lib/postgresql/data/pgdata".split() + ).wait_output() + logger.info("Removing and recreating pgdata folder") + self.container.exec("rm -r /var/lib/postgresql/data/pgdata".split()).wait_output() + self.container.start(self.charm._postgresql_service) + self.charm._create_pgdata(self.container) + try: + for attempt in Retrying(stop=stop_after_attempt(10), wait=wait_fixed(5)): + with attempt: + if not self.charm._patroni.member_started: + raise Exception + except RetryError: + logger.debug("defer _on_coordination_approval: database hasn't started yet") + event.defer() + return + + if not self.charm._patroni.are_all_members_ready(): + self.charm.unit.status = WaitingStatus("waiting for all members to be ready") + event.defer() + return + self.charm._peers.data[self.charm.app]["cluster_initialised"] = "True" + self.charm.unit.status = ActiveStatus() def _check_if_primary_already_selected(self) -> Optional[Unit]: """Returns the unit if a primary is present.""" @@ -376,39 +453,51 @@ def _on_promote_standby_cluster(self, event: ActionEvent) -> None: event.fail("No relation found.") return primary_relation = self.model.get_relation(ASYNC_PRIMARY_RELATION) - if not primary_relation: - event.fail("No primary relation") - return - - # Let the exception error the unit - unit = self._check_if_primary_already_selected() - if unit: - event.fail(f"Cannot promote - {unit.name} is already primary: demote it first") - return - - system_identifier, error = self.get_system_identifier() - if error is not None: - event.fail(f"Failed to get system identifier: {error}") - return + if primary_relation: + # Let the exception error the unit + unit = self._check_if_primary_already_selected() + if unit: + event.fail(f"Cannot promote - {self.charm.app.name} is already the main cluster") + return + + try: + self.charm._patroni.promote_standby_cluster() + except StandbyClusterAlreadyPromotedError: + # Ignore this error for non-standby clusters. 
+ pass + except ClusterNotPromotedError as e: + event.fail(str(e)) + return - primary_relation.data[self.charm.unit]["elected"] = json.dumps( - { - "endpoint": self.endpoint, - "monitoring-password": self.charm.get_secret(APP_SCOPE, MONITORING_PASSWORD_KEY), - "replication-password": self.charm._patroni._replication_password, - "rewind-password": self.charm.get_secret(APP_SCOPE, REWIND_PASSWORD_KEY), - "superuser-password": self.charm._patroni._superuser_password, - "system-id": system_identifier, - } - ) - self.charm.app_peer_data["promoted"] = "True" + system_identifier, error = self.get_system_identifier() + if error is not None: + event.fail(f"Failed to get system identifier: {error}") + return - # Now, check if postgresql it had originally published its pod IP in the - # replica relation databag. Delete it, if yes. - replica_relation = self.model.get_relation(ASYNC_PRIMARY_RELATION) - if not replica_relation or "pod-address" not in replica_relation.data[self.charm.unit]: - return - del replica_relation.data[self.charm.unit]["pod-address"] + primary_relation.data[self.charm.unit]["elected"] = json.dumps( + { + "endpoint": self.endpoint, + "monitoring-password": self.charm.get_secret(APP_SCOPE, MONITORING_PASSWORD_KEY), + "replication-password": self.charm._patroni._replication_password, + "rewind-password": self.charm.get_secret(APP_SCOPE, REWIND_PASSWORD_KEY), + "superuser-password": self.charm._patroni._superuser_password, + "system-id": system_identifier, + } + ) + self.charm.app_peer_data["promoted"] = "True" + + # Now, check if postgresql it had originally published its pod IP in the + # replica relation databag. Delete it, if yes. + replica_relation = self.model.get_relation(ASYNC_PRIMARY_RELATION) + if not replica_relation or "pod-address" not in replica_relation.data[self.charm.unit]: + return + del replica_relation.data[self.charm.unit]["pod-address"] + else: + # Remove the standby cluster information from the Patroni configuration. 
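Note: the promoted cluster serialises its endpoint, credentials, and system identifier into the "elected" key of the async-primary relation databag, and the standby side parses the same JSON in get_primary_data. A sketch of that round trip; every value below is a placeholder, not a real secret.

import json

# What the promoted cluster publishes (placeholder values).
elected = json.dumps({
    "endpoint": "10.1.2.3",
    "monitoring-password": "monitoring-placeholder",
    "replication-password": "replication-placeholder",
    "rewind-password": "rewind-placeholder",
    "superuser-password": "superuser-placeholder",
    "system-id": "7341234567890123456",
})

# What the standby cluster reads back before rendering its Patroni config.
elected_data = json.loads(elected)
primary_data = {
    "endpoint": str(elected_data["endpoint"]),
    "monitoring-password": elected_data["monitoring-password"],
    "replication-password": elected_data["replication-password"],
    "rewind-password": elected_data["rewind-password"],
    "superuser-password": elected_data["superuser-password"],
}
assert primary_data["endpoint"] == "10.1.2.3"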
+ try: + self.charm._patroni.promote_standby_cluster() + except Exception as e: + event.fail(str(e)) def get_system_identifier(self) -> Tuple[Optional[str], Optional[str]]: """Returns the PostgreSQL system identifier from this instance.""" From f9d878d9c0d7d1b037723b8fc15f49aaf15e0e95 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Mon, 18 Mar 2024 14:28:20 -0300 Subject: [PATCH 09/15] Fix standby cluster trying to write to the database Signed-off-by: Marcelo Henrique Neppel --- lib/charms/postgresql_k8s/v0/postgresql.py | 23 ++- src/charm.py | 3 + src/patroni.py | 29 ++-- src/relations/async_replication.py | 159 ++++++++++++--------- tests/unit/test_charm.py | 3 + 5 files changed, 125 insertions(+), 92 deletions(-) diff --git a/lib/charms/postgresql_k8s/v0/postgresql.py b/lib/charms/postgresql_k8s/v0/postgresql.py index 574e157780..645f663518 100644 --- a/lib/charms/postgresql_k8s/v0/postgresql.py +++ b/lib/charms/postgresql_k8s/v0/postgresql.py @@ -35,7 +35,7 @@ # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 24 +LIBPATCH = 25 INVALID_EXTRA_USER_ROLE_BLOCKING_MESSAGE = "invalid role(s) for extra user roles" @@ -423,6 +423,16 @@ def get_postgresql_version(self) -> str: logger.error(f"Failed to get PostgreSQL version: {e}") raise PostgreSQLGetPostgreSQLVersionError() + def is_standby_cluster(self) -> bool: + """Returns whether the PostgreSQL cluster is a standby cluster.""" + try: + with self._connect_to_database() as connection, connection.cursor() as cursor: + cursor.execute("SELECT pg_is_in_recovery();") + return cursor.fetchone()[0] + except psycopg2.Error as e: + logger.error(f"Failed to check if PostgreSQL cluster is a standby cluster: {e}") + return False + def is_tls_enabled(self, check_current_host: bool = False) -> bool: """Returns whether TLS is enabled. @@ -477,11 +487,10 @@ def set_up_database(self) -> None: """Set up postgres database with the right permissions.""" connection = None try: - self.create_user( - "admin", - extra_user_roles="pg_read_all_data,pg_write_all_data", - ) with self._connect_to_database() as connection, connection.cursor() as cursor: + cursor.execute("SELECT TRUE FROM pg_roles WHERE rolname='admin';") + if cursor.fetchone() is not None: + return # Allow access to the postgres database only to the system users. cursor.execute("REVOKE ALL PRIVILEGES ON DATABASE postgres FROM PUBLIC;") cursor.execute("REVOKE CREATE ON SCHEMA public FROM PUBLIC;") @@ -491,6 +500,10 @@ def set_up_database(self) -> None: sql.Identifier(user) ) ) + self.create_user( + "admin", + extra_user_roles="pg_read_all_data,pg_write_all_data", + ) cursor.execute("GRANT CONNECT ON DATABASE postgres TO admin;") except psycopg2.Error as e: logger.error(f"Failed to set up databases: {e}") diff --git a/src/charm.py b/src/charm.py index f814b7b224..0fda2853d2 100755 --- a/src/charm.py +++ b/src/charm.py @@ -507,6 +507,9 @@ def enable_disable_extensions(self, database: str = None) -> None: Args: database: optional database where to enable/disable the extension. 
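Note: is_standby_cluster above asks PostgreSQL directly whether it is in recovery, which is true for every member of a standby cluster. A minimal psycopg2 sketch of that probe; the connection string is a placeholder.

import psycopg2


def is_in_recovery(dsn: str) -> bool:
    # pg_is_in_recovery() returns true on standby instances, so the leader of
    # a standby cluster (itself a standby) also reports true here.
    try:
        with psycopg2.connect(dsn) as connection, connection.cursor() as cursor:
            cursor.execute("SELECT pg_is_in_recovery();")
            return cursor.fetchone()[0]
    except psycopg2.Error:
        return False


# Placeholder DSN for illustration only; connection failures simply return False.
print(is_in_recovery("dbname=postgres user=operator host=127.0.0.1 password=..."))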
""" + if self.postgresql.is_standby_cluster: + logger.debug("Early exit enable_disable_extensions: standby cluster") + return spi_module = ["refint", "autoinc", "insert_username", "moddatetime"] plugins_exception = {"uuid_ossp": '"uuid-ossp"'} original_status = self.unit.status diff --git a/src/patroni.py b/src/patroni.py index b9342065ef..ddadeffe79 100644 --- a/src/patroni.py +++ b/src/patroni.py @@ -344,16 +344,11 @@ def bulk_update_parameters_controller_by_patroni(self, parameters: Dict[str, Any def promote_standby_cluster(self) -> None: """Promote a standby cluster to be a regular cluster.""" - config_response = requests.get( - f"{self._patroni_url}/config", - verify=self._verify - ) + config_response = requests.get(f"{self._patroni_url}/config", verify=self._verify) if "standby_cluster" not in config_response.json(): raise StandbyClusterAlreadyPromotedError("standby cluster is already promoted") requests.patch( - f"{self._patroni_url}/config", - verify=self._verify, - json={"standby_cluster": None} + f"{self._patroni_url}/config", verify=self._verify, json={"standby_cluster": None} ) for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): with attempt: @@ -427,12 +422,12 @@ def render_patroni_yml_file( is_no_sync_member=is_no_sync_member, namespace=self._namespace, storage_path=self._storage_path, - superuser_password=primary["superuser-password"] - if primary - else self._superuser_password, - replication_password=primary["replication-password"] - if primary - else self._replication_password, + superuser_password=( + primary["superuser-password"] if primary else self._superuser_password + ), + replication_password=( + primary["replication-password"] if primary else self._replication_password + ), rewind_user=REWIND_USER, rewind_password=self._rewind_password, enable_pgbackrest=stanza is not None, @@ -444,9 +439,11 @@ def render_patroni_yml_file( version=self.rock_postgresql_version.split(".")[0], pg_parameters=parameters, standby_cluster_endpoint=primary["endpoint"] if primary else None, - extra_replication_endpoints={"{}/32".format(primary["endpoint"])} - if primary - else self._charm.async_manager.standby_endpoints(), + extra_replication_endpoints=( + {"{}/32".format(primary["endpoint"])} + if primary + else self._charm.async_manager.standby_endpoints() + ), ) self._render_file(f"{self._storage_path}/patroni.yml", rendered, 0o644) diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index baf4712a0c..b2eeec4eed 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -18,6 +18,7 @@ from ops.model import ( ActiveStatus, MaintenanceStatus, + Relation, Unit, WaitingStatus, ) @@ -358,7 +359,10 @@ def _on_coordination_approval(self, event): # Ignore the error only when the resource doesn't exist. 
if e.status.code != 404: raise e - elif not self.charm._patroni.primary_endpoint_ready: + elif ( + not self.charm._patroni.primary_endpoint_ready + or not self.charm._patroni.get_standby_leader(unit_name_pattern=True) + ): self.charm.unit.status = WaitingStatus("waiting for standby leader to be ready") event.defer() return @@ -366,7 +370,9 @@ def _on_coordination_approval(self, event): self.charm.unit.status = MaintenanceStatus("starting database to enable async replication") self.charm.update_config() - logger.info("_on_standby_changed: configuration done, waiting for restart of the service") + logger.info( + "_on_coordination_approval: configuration done, waiting for restart of the service" + ) # We are ready to restart the service now: all peers have configured themselves. self.container.start(self.charm._postgresql_service) @@ -382,45 +388,51 @@ def _on_coordination_approval(self, event): return if self.charm.unit.is_leader(): - diverging_databases = False + self._handle_leader_startup() + self.charm.unit.status = ActiveStatus() + + def _handle_leader_startup(self, event) -> None: + diverging_databases = False + try: + for attempt in Retrying(stop=stop_after_attempt(10), wait=wait_fixed(5)): + with attempt: + if ( + self.charm._patroni.get_standby_leader(unit_name_pattern=True) + != self.charm.unit.name + ): + raise Exception + except RetryError: + diverging_databases = True + if diverging_databases: + for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(3)): + with attempt: + self.container.stop(self.charm._postgresql_service) + + # Store current data in a ZIP file, clean folder and generate configuration. + logger.info("Creating backup of pgdata folder") + self.container.exec( + f"tar -zcf /var/lib/postgresql/data/pgdata-{str(datetime.now()).replace(' ', '-').replace(':', '-')}.zip /var/lib/postgresql/data/pgdata".split() + ).wait_output() + logger.info("Removing and recreating pgdata folder") + self.container.exec("rm -r /var/lib/postgresql/data/pgdata".split()).wait_output() + self.container.start(self.charm._postgresql_service) + self.charm._create_pgdata(self.container) try: for attempt in Retrying(stop=stop_after_attempt(10), wait=wait_fixed(5)): with attempt: - if self.charm._patroni.get_standby_leader(unit_name_pattern=True) != self.charm.unit.name: + if not self.charm._patroni.member_started: raise Exception except RetryError: - diverging_databases = True - if diverging_databases: - for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(3)): - with attempt: - self.container.stop(self.charm._postgresql_service) - - # Store current data in a ZIP file, clean folder and generate configuration. 
- logger.info("Creating backup of pgdata folder") - self.container.exec( - f"tar -zcf /var/lib/postgresql/data/pgdata-{str(datetime.now()).replace(' ', '-').replace(':', '-')}.zip /var/lib/postgresql/data/pgdata".split() - ).wait_output() - logger.info("Removing and recreating pgdata folder") - self.container.exec("rm -r /var/lib/postgresql/data/pgdata".split()).wait_output() - self.container.start(self.charm._postgresql_service) - self.charm._create_pgdata(self.container) - try: - for attempt in Retrying(stop=stop_after_attempt(10), wait=wait_fixed(5)): - with attempt: - if not self.charm._patroni.member_started: - raise Exception - except RetryError: - logger.debug("defer _on_coordination_approval: database hasn't started yet") - event.defer() - return - - if not self.charm._patroni.are_all_members_ready(): - self.charm.unit.status = WaitingStatus("waiting for all members to be ready") + logger.debug("defer _on_coordination_approval: database hasn't started yet") event.defer() return - self.charm._peers.data[self.charm.app]["cluster_initialised"] = "True" - self.charm.unit.status = ActiveStatus() + if not self.charm._patroni.are_all_members_ready(): + self.charm.unit.status = WaitingStatus("waiting for all members to be ready") + event.defer() + return + + self.charm._peers.data[self.charm.app]["cluster_initialised"] = "True" def _check_if_primary_already_selected(self) -> Optional[Unit]: """Returns the unit if a primary is present.""" @@ -454,44 +466,7 @@ def _on_promote_standby_cluster(self, event: ActionEvent) -> None: return primary_relation = self.model.get_relation(ASYNC_PRIMARY_RELATION) if primary_relation: - # Let the exception error the unit - unit = self._check_if_primary_already_selected() - if unit: - event.fail(f"Cannot promote - {self.charm.app.name} is already the main cluster") - return - - try: - self.charm._patroni.promote_standby_cluster() - except StandbyClusterAlreadyPromotedError: - # Ignore this error for non-standby clusters. - pass - except ClusterNotPromotedError as e: - event.fail(str(e)) - return - - system_identifier, error = self.get_system_identifier() - if error is not None: - event.fail(f"Failed to get system identifier: {error}") - return - - primary_relation.data[self.charm.unit]["elected"] = json.dumps( - { - "endpoint": self.endpoint, - "monitoring-password": self.charm.get_secret(APP_SCOPE, MONITORING_PASSWORD_KEY), - "replication-password": self.charm._patroni._replication_password, - "rewind-password": self.charm.get_secret(APP_SCOPE, REWIND_PASSWORD_KEY), - "superuser-password": self.charm._patroni._superuser_password, - "system-id": system_identifier, - } - ) - self.charm.app_peer_data["promoted"] = "True" - - # Now, check if postgresql it had originally published its pod IP in the - # replica relation databag. Delete it, if yes. - replica_relation = self.model.get_relation(ASYNC_PRIMARY_RELATION) - if not replica_relation or "pod-address" not in replica_relation.data[self.charm.unit]: - return - del replica_relation.data[self.charm.unit]["pod-address"] + self._promote_standby_cluster_from_two_clusters(event, primary_relation) else: # Remove the standby cluster information from the Patroni configuration. 
try: @@ -499,6 +474,48 @@ def _on_promote_standby_cluster(self, event: ActionEvent) -> None: except Exception as e: event.fail(str(e)) + def _promote_standby_cluster_from_two_clusters( + self, event: ActionEvent, primary_relation: Relation + ) -> None: + # Let the exception error the unit + unit = self._check_if_primary_already_selected() + if unit: + event.fail(f"Cannot promote - {self.charm.app.name} is already the main cluster") + return + + try: + self.charm._patroni.promote_standby_cluster() + except StandbyClusterAlreadyPromotedError: + # Ignore this error for non-standby clusters. + pass + except ClusterNotPromotedError as e: + event.fail(str(e)) + return + + system_identifier, error = self.get_system_identifier() + if error is not None: + event.fail(f"Failed to get system identifier: {error}") + return + + primary_relation.data[self.charm.unit]["elected"] = json.dumps( + { + "endpoint": self.endpoint, + "monitoring-password": self.charm.get_secret(APP_SCOPE, MONITORING_PASSWORD_KEY), + "replication-password": self.charm._patroni._replication_password, + "rewind-password": self.charm.get_secret(APP_SCOPE, REWIND_PASSWORD_KEY), + "superuser-password": self.charm._patroni._superuser_password, + "system-id": system_identifier, + } + ) + self.charm.app_peer_data["promoted"] = "True" + + # Now, check if postgresql it had originally published its pod IP in the + # replica relation databag. Delete it, if yes. + replica_relation = self.model.get_relation(ASYNC_PRIMARY_RELATION) + if not replica_relation or "pod-address" not in replica_relation.data[self.charm.unit]: + return + del replica_relation.data[self.charm.unit]["pod-address"] + def get_system_identifier(self) -> Tuple[Optional[str], Optional[str]]: """Returns the PostgreSQL system identifier from this instance.""" try: diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index eb8221dc6d..816f9c84e1 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -1189,6 +1189,7 @@ def test_on_peer_relation_changed( @patch("charm.Patroni.reinitialize_postgresql") @patch("charm.Patroni.member_streaming", new_callable=PropertyMock) + @patch("charm.PostgresqlOperatorCharm.is_standby_leader", new_callable=PropertyMock) @patch("charm.PostgresqlOperatorCharm.is_primary", new_callable=PropertyMock) @patch("charm.Patroni.is_database_running", new_callable=PropertyMock) @patch("charm.Patroni.member_started", new_callable=PropertyMock) @@ -1199,6 +1200,7 @@ def test_handle_processes_failures( _member_started, _is_database_running, _is_primary, + _is_standby_leader, _member_streaming, _reinitialize_postgresql, ): @@ -1258,6 +1260,7 @@ def test_handle_processes_failures( # Test when the unit is a replica and it's not streaming from primary. 
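Note: the unit tests patch the new is_standby_leader property with PropertyMock so its value can be switched per scenario. A minimal sketch of that pattern against a hypothetical stand-in class:

from unittest.mock import PropertyMock, patch


class Example:
    @property
    def is_standby_leader(self) -> bool:
        raise NotImplementedError("the real lookup asks Patroni")


with patch.object(Example, "is_standby_leader", new_callable=PropertyMock) as mock_prop:
    # The PropertyMock intercepts attribute access, so tests can flip the value.
    mock_prop.return_value = False
    assert Example().is_standby_leader is False
    mock_prop.return_value = True
    assert Example().is_standby_leader is True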
_restart.reset_mock() _is_primary.return_value = False + _is_standby_leader.return_value = False _member_streaming.return_value = False for values in itertools.product( [None, RetryError(last_attempt=1)], [True, False], [True, False] From 1aa144de51a475a8f23c9bc2c3cf9d4d332e8c0f Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Mon, 18 Mar 2024 15:15:19 -0300 Subject: [PATCH 10/15] Add missing parameter Signed-off-by: Marcelo Henrique Neppel --- src/relations/async_replication.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index b2eeec4eed..c1918d5b67 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -388,7 +388,7 @@ def _on_coordination_approval(self, event): return if self.charm.unit.is_leader(): - self._handle_leader_startup() + self._handle_leader_startup(event) self.charm.unit.status = ActiveStatus() def _handle_leader_startup(self, event) -> None: From de6c55285d940fee7306bd26349c72e7887f8e3d Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Tue, 19 Mar 2024 18:49:56 -0300 Subject: [PATCH 11/15] Remove previous cluster information to avoid that the secondary cluster gets stuck Signed-off-by: Marcelo Henrique Neppel --- src/relations/async_replication.py | 35 +++++++++++++++++++----------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index c1918d5b67..18c1bf3587 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -9,7 +9,7 @@ from typing import Dict, Optional, Set, Tuple from lightkube import ApiError, Client -from lightkube.resources.core_v1 import Service +from lightkube.resources.core_v1 import Endpoints, Service from ops.charm import ( ActionEvent, CharmBase, @@ -348,17 +348,7 @@ def _on_coordination_approval(self, event): # Delete the K8S endpoints that tracks the cluster information, including its id. # This is the same as "patronictl remove patroni-postgresql-k8s", but the latter doesn't # work after the database service is stopped on Pebble. - client = Client() - try: - client.delete( - Service, - name=f"patroni-{self.charm._name}-config", - namespace=self.charm._namespace, - ) - except ApiError as e: - # Ignore the error only when the resource doesn't exist. - if e.status.code != 404: - raise e + self._remove_previous_cluster_information() elif ( not self.charm._patroni.primary_endpoint_ready or not self.charm._patroni.get_standby_leader(unit_name_pattern=True) @@ -391,6 +381,24 @@ def _on_coordination_approval(self, event): self._handle_leader_startup(event) self.charm.unit.status = ActiveStatus() + def _remove_previous_cluster_information(self) -> None: + client = Client() + try: + client.delete( + Service, + name=f"patroni-{self.charm._name}-config", + namespace=self.charm._namespace, + ) + client.delete( + Endpoints, + name=f"patroni-{self.charm._name}", + namespace=self.charm._namespace, + ) + except ApiError as e: + # Ignore the error only when the resource doesn't exist. 
+ if e.status.code != 404: + raise e + def _handle_leader_startup(self, event) -> None: diverging_databases = False try: @@ -415,8 +423,9 @@ def _handle_leader_startup(self, event) -> None: ).wait_output() logger.info("Removing and recreating pgdata folder") self.container.exec("rm -r /var/lib/postgresql/data/pgdata".split()).wait_output() - self.container.start(self.charm._postgresql_service) self.charm._create_pgdata(self.container) + self._remove_previous_cluster_information() + self.container.start(self.charm._postgresql_service) try: for attempt in Retrying(stop=stop_after_attempt(10), wait=wait_fixed(5)): with attempt: From e5a41c51d87ed79dcc9802320933020a3b2cfe53 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Tue, 19 Mar 2024 19:22:04 -0300 Subject: [PATCH 12/15] Format code Signed-off-by: Marcelo Henrique Neppel --- src/coordinator_ops.py | 1 - src/relations/async_replication.py | 38 +++++++++++++----------------- 2 files changed, 16 insertions(+), 23 deletions(-) diff --git a/src/coordinator_ops.py b/src/coordinator_ops.py index 2bd1c2becc..0914b1bb9c 100644 --- a/src/coordinator_ops.py +++ b/src/coordinator_ops.py @@ -69,7 +69,6 @@ def _on_coordinator_approved(self, event): self.start_coordinator.coordinate() """ - import logging from typing import AnyStr diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index 18c1bf3587..27384b8610 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -506,16 +506,14 @@ def _promote_standby_cluster_from_two_clusters( event.fail(f"Failed to get system identifier: {error}") return - primary_relation.data[self.charm.unit]["elected"] = json.dumps( - { - "endpoint": self.endpoint, - "monitoring-password": self.charm.get_secret(APP_SCOPE, MONITORING_PASSWORD_KEY), - "replication-password": self.charm._patroni._replication_password, - "rewind-password": self.charm.get_secret(APP_SCOPE, REWIND_PASSWORD_KEY), - "superuser-password": self.charm._patroni._superuser_password, - "system-id": system_identifier, - } - ) + primary_relation.data[self.charm.unit]["elected"] = json.dumps({ + "endpoint": self.endpoint, + "monitoring-password": self.charm.get_secret(APP_SCOPE, MONITORING_PASSWORD_KEY), + "replication-password": self.charm._patroni._replication_password, + "rewind-password": self.charm.get_secret(APP_SCOPE, REWIND_PASSWORD_KEY), + "superuser-password": self.charm._patroni._superuser_password, + "system-id": system_identifier, + }) self.charm.app_peer_data["promoted"] = "True" # Now, check if postgresql it had originally published its pod IP in the @@ -559,17 +557,13 @@ def update_async_replication_data(self) -> None: system_identifier, error = self.get_system_identifier() if error is not None: raise Exception(f"Failed to get system identifier: {error}") - primary_relation.data[self.charm.unit]["elected"] = json.dumps( - { - "endpoint": self.endpoint, - "monitoring-password": self.charm.get_secret( - APP_SCOPE, MONITORING_PASSWORD_KEY - ), - "replication-password": self.charm._patroni._replication_password, - "rewind-password": self.charm.get_secret(APP_SCOPE, REWIND_PASSWORD_KEY), - "superuser-password": self.charm._patroni._superuser_password, - "system-id": system_identifier, - } - ) + primary_relation.data[self.charm.unit]["elected"] = json.dumps({ + "endpoint": self.endpoint, + "monitoring-password": self.charm.get_secret(APP_SCOPE, MONITORING_PASSWORD_KEY), + "replication-password": self.charm._patroni._replication_password, + "rewind-password": 
self.charm.get_secret(APP_SCOPE, REWIND_PASSWORD_KEY), + "superuser-password": self.charm._patroni._superuser_password, + "system-id": system_identifier, + }) else: primary_relation.data[self.charm.unit]["elected"] = "" From fc5a61378747e91a41fb76cd6a7d856ebc8b0f6c Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Sat, 23 Mar 2024 18:33:15 -0300 Subject: [PATCH 13/15] Fix check for standby cluster Signed-off-by: Marcelo Henrique Neppel --- lib/charms/postgresql_k8s/v0/postgresql.py | 10 ---------- src/charm.py | 2 +- tests/unit/test_charm.py | 2 ++ 3 files changed, 3 insertions(+), 11 deletions(-) diff --git a/lib/charms/postgresql_k8s/v0/postgresql.py b/lib/charms/postgresql_k8s/v0/postgresql.py index 47fc900fdd..34fc9ebedf 100644 --- a/lib/charms/postgresql_k8s/v0/postgresql.py +++ b/lib/charms/postgresql_k8s/v0/postgresql.py @@ -422,16 +422,6 @@ def get_postgresql_version(self) -> str: logger.error(f"Failed to get PostgreSQL version: {e}") raise PostgreSQLGetPostgreSQLVersionError() - def is_standby_cluster(self) -> bool: - """Returns whether the PostgreSQL cluster is a standby cluster.""" - try: - with self._connect_to_database() as connection, connection.cursor() as cursor: - cursor.execute("SELECT pg_is_in_recovery();") - return cursor.fetchone()[0] - except psycopg2.Error as e: - logger.error(f"Failed to check if PostgreSQL cluster is a standby cluster: {e}") - return False - def is_tls_enabled(self, check_current_host: bool = False) -> bool: """Returns whether TLS is enabled. diff --git a/src/charm.py b/src/charm.py index d41199f5f4..2fe482c654 100755 --- a/src/charm.py +++ b/src/charm.py @@ -508,7 +508,7 @@ def enable_disable_extensions(self, database: str = None) -> None: Args: database: optional database where to enable/disable the extension. """ - if self.postgresql.is_standby_cluster: + if self._patroni.get_primary() is None: logger.debug("Early exit enable_disable_extensions: standby cluster") return spi_module = ["refint", "autoinc", "insert_username", "moddatetime"] diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index eaebc9114d..ea906d3837 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -126,6 +126,7 @@ def test_on_leader_elected(self, _, __, ___, _set_secret, _get_secret, _____, _c @patch("charm.Patroni.rock_postgresql_version", new_callable=PropertyMock) @patch("charm.Patroni.primary_endpoint_ready", new_callable=PropertyMock) + @patch("charm.PostgresqlOperatorCharm.enable_disable_extensions") @patch("charm.PostgresqlOperatorCharm.update_config") @patch("charm.PostgresqlOperatorCharm.postgresql") @patch( @@ -147,6 +148,7 @@ def test_on_postgresql_pebble_ready( _create_services, _postgresql, ___, + ____, _primary_endpoint_ready, _rock_postgresql_version, ): From c05200ba2c6d7dfbfe73ce3da5666f9eed95b1b8 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Thu, 28 Mar 2024 15:46:07 -0300 Subject: [PATCH 14/15] Remove unnecessary pgdata backup creation Signed-off-by: Marcelo Henrique Neppel --- src/relations/async_replication.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index 27384b8610..a2a19d9955 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -416,12 +416,9 @@ def _handle_leader_startup(self, event) -> None: with attempt: self.container.stop(self.charm._postgresql_service) - # Store current data in a ZIP file, clean folder and generate configuration. 
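Note: patch 13 replaces the SQL probe with a Patroni-level check: a standby cluster has a standby_leader but no member with the leader role, so get_primary() returns None. A small sketch of that role-based distinction on an illustrative /cluster payload:

from typing import List, Optional


def get_primary(members: List[dict]) -> Optional[str]:
    # Patroni reports the writable primary with role "leader"; a standby
    # cluster only has a "standby_leader", so this returns None there.
    for member in members:
        if member.get("role") == "leader":
            return member["name"]
    return None


standby_cluster_members = [
    {"name": "postgresql-k8s-0", "role": "standby_leader"},
    {"name": "postgresql-k8s-1", "role": "replica"},
]
assert get_primary(standby_cluster_members) is None  # treated as a standby cluster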
- logger.info("Creating backup of pgdata folder") - self.container.exec( - f"tar -zcf /var/lib/postgresql/data/pgdata-{str(datetime.now()).replace(' ', '-').replace(':', '-')}.zip /var/lib/postgresql/data/pgdata".split() - ).wait_output() - logger.info("Removing and recreating pgdata folder") + logger.info( + "Removing and recreating pgdata folder due to diverging databases between this and the other cluster" + ) self.container.exec("rm -r /var/lib/postgresql/data/pgdata".split()).wait_output() self.charm._create_pgdata(self.container) self._remove_previous_cluster_information() From 2850ec789aa22c131f52666881c7d663c32a9e1e Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Mon, 1 Apr 2024 10:09:54 -0300 Subject: [PATCH 15/15] Handle replicas issues Signed-off-by: Marcelo Henrique Neppel --- src/charm.py | 3 ++ src/relations/async_replication.py | 48 ++++++++++++++++++++---------- 2 files changed, 35 insertions(+), 16 deletions(-) diff --git a/src/charm.py b/src/charm.py index 2fe482c654..1ec0330cb5 100755 --- a/src/charm.py +++ b/src/charm.py @@ -820,6 +820,9 @@ def _on_upgrade_charm(self, _) -> None: self.unit.status = BlockedStatus(f"failed to patch pod with error {e}") return + # Update the endpoint in the async replication data. + self.async_manager.update_async_replication_data() + def _patch_pod_labels(self, member: str) -> None: """Add labels required for replication to the current pod. diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index a2a19d9955..85dc4d7768 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -373,6 +373,15 @@ def _on_coordination_approval(self, event): if not self.charm._patroni.member_started: raise Exception except RetryError: + if ( + not self.charm.is_primary + and not self.charm.is_standby_leader + and ( + self.charm._patroni.member_replication_lag == "unknown" + or int(self.charm._patroni.member_replication_lag) > 1000 + ) + ): + self.charm._patroni.reinitialize_postgresql() logger.debug("defer _on_coordination_approval: database hasn't started yet") event.defer() return @@ -546,21 +555,28 @@ def update_async_replication_data(self) -> None: This is used to update the standby units with the new primary information. If the unit is not the leader, then the data is removed from its databag. 
""" - if "promoted" not in self.charm.app_peer_data: - return - primary_relation = self.model.get_relation(ASYNC_PRIMARY_RELATION) - if self.charm.unit.is_leader(): - system_identifier, error = self.get_system_identifier() - if error is not None: - raise Exception(f"Failed to get system identifier: {error}") - primary_relation.data[self.charm.unit]["elected"] = json.dumps({ - "endpoint": self.endpoint, - "monitoring-password": self.charm.get_secret(APP_SCOPE, MONITORING_PASSWORD_KEY), - "replication-password": self.charm._patroni._replication_password, - "rewind-password": self.charm.get_secret(APP_SCOPE, REWIND_PASSWORD_KEY), - "superuser-password": self.charm._patroni._superuser_password, - "system-id": system_identifier, - }) + if primary_relation: + if "promoted" not in self.charm.app_peer_data: + return + if self.charm.unit.is_leader(): + system_identifier, error = self.get_system_identifier() + if error is not None: + raise Exception(f"Failed to get system identifier: {error}") + primary_relation.data[self.charm.unit]["elected"] = json.dumps({ + "endpoint": self.endpoint, + "monitoring-password": self.charm.get_secret( + APP_SCOPE, MONITORING_PASSWORD_KEY + ), + "replication-password": self.charm._patroni._replication_password, + "rewind-password": self.charm.get_secret(APP_SCOPE, REWIND_PASSWORD_KEY), + "superuser-password": self.charm._patroni._superuser_password, + "system-id": system_identifier, + }) + else: + primary_relation.data[self.charm.unit]["elected"] = "" else: - primary_relation.data[self.charm.unit]["elected"] = "" + replica_relation = self.model.get_relation(ASYNC_REPLICA_RELATION) + primary = self._check_if_primary_already_selected() + if replica_relation and primary: + replica_relation.data[self.charm.unit]["pod-address"] = _get_pod_ip()