Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DPE-2897] Cross-region async replication #447

Merged
merged 9 commits into from
May 3, 2024
Merged
6 changes: 6 additions & 0 deletions actions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ list-backups:
description: Lists backups in s3 storage in AWS.
pre-upgrade-check:
description: Run necessary pre-upgrade checks and preparations before executing a charm refresh.
promote-cluster:
description: Promotes the cluster of choice to a primary cluster. Must be ran against the leader unit.
params:
force-promotion:
type: boolean
description: Force the promotion of a cluster when there is already a primary cluster.
restore:
description: Restore a database backup using pgBackRest.
S3 credentials are retrieved from a relation with the S3 integrator charm.
Expand Down
14 changes: 9 additions & 5 deletions lib/charms/postgresql_k8s/v0/postgresql.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 25
LIBPATCH = 26

INVALID_EXTRA_USER_ROLE_BLOCKING_MESSAGE = "invalid role(s) for extra user roles"

Expand Down Expand Up @@ -476,11 +476,11 @@ def set_up_database(self) -> None:
"""Set up postgres database with the right permissions."""
connection = None
try:
self.create_user(
"admin",
extra_user_roles="pg_read_all_data,pg_write_all_data",
)
with self._connect_to_database() as connection, connection.cursor() as cursor:
cursor.execute("SELECT TRUE FROM pg_roles WHERE rolname='admin';")
if cursor.fetchone() is not None:
return

# Allow access to the postgres database only to the system users.
cursor.execute("REVOKE ALL PRIVILEGES ON DATABASE postgres FROM PUBLIC;")
cursor.execute("REVOKE CREATE ON SCHEMA public FROM PUBLIC;")
Expand All @@ -490,6 +490,10 @@ def set_up_database(self) -> None:
sql.Identifier(user)
)
)
self.create_user(
"admin",
extra_user_roles="pg_read_all_data,pg_write_all_data",
)
cursor.execute("GRANT CONNECT ON DATABASE postgres TO admin;")
except psycopg2.Error as e:
logger.error(f"Failed to set up databases: {e}")
Expand Down
8 changes: 8 additions & 0 deletions metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ peers:
interface: upgrade

provides:
async-primary:
interface: async_replication
limit: 1
optional: true
database:
interface: postgresql_client
db:
Expand All @@ -51,6 +55,10 @@ provides:
interface: grafana_dashboard

requires:
async-replica:
interface: async_replication
limit: 1
optional: true
certificates:
interface: tls-certificates
limit: 1
Expand Down
3 changes: 1 addition & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions src/backups.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from tenacity import RetryError, Retrying, stop_after_attempt, wait_fixed

from constants import BACKUP_USER, WORKLOAD_OS_GROUP, WORKLOAD_OS_USER
from relations.async_replication import ASYNC_PRIMARY_RELATION, ASYNC_REPLICA_RELATION

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -717,6 +718,18 @@ def _pre_restore_checks(self, event: ActionEvent) -> bool:
event.fail(error_message)
return False

logger.info("Checking that the cluster is not replicating data to a standby cluster")
for relation in [
self.model.get_relation(ASYNC_REPLICA_RELATION),
self.model.get_relation(ASYNC_PRIMARY_RELATION),
]:
if not relation:
continue
error_message = "Unit cannot restore backup as the cluster is replicating data to a standby cluster"
logger.error(f"Restore failed: {error_message}")
event.fail(error_message)
return False

logger.info("Checking that this unit was already elected the leader unit")
if not self.charm.unit.is_leader():
error_message = "Unit cannot restore backup as it was not elected the leader unit yet"
Expand Down
96 changes: 76 additions & 20 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
WORKLOAD_OS_USER,
)
from patroni import NotReadyError, Patroni
from relations.async_replication import PostgreSQLAsyncReplication
from relations.db import EXTENSIONS_BLOCKING_MESSAGE, DbProvides
from relations.postgresql_provider import PostgreSQLProvider
from upgrade import PostgreSQLUpgrade, get_postgresql_k8s_dependencies_model
Expand Down Expand Up @@ -176,6 +177,7 @@ def __init__(self, *args):
self.legacy_db_admin_relation = DbProvides(self, admin=True)
self.backup = PostgreSQLBackups(self, "s3-parameters")
self.tls = PostgreSQLTLS(self, PEER, [self.primary_endpoint, self.replicas_endpoint])
self.async_replication = PostgreSQLAsyncReplication(self)
self.restart_manager = RollingOpsManager(
charm=self, relation="restart", callback=self._restart
)
Expand Down Expand Up @@ -350,6 +352,18 @@ def _get_endpoints_to_remove(self) -> List[str]:
endpoints_to_remove = list(set(old) - set(current))
return endpoints_to_remove

def get_unit_ip(self, unit: Unit) -> Optional[str]:
"""Get the IP address of a specific unit."""
# Check if host is current host.
if unit == self.unit:
return str(self.model.get_binding(PEER).network.bind_address)
# Check if host is a peer.
elif unit in self._peers.data:
return str(self._peers.data[unit].get("private-address"))
# Return None if the unit is not a peer neither the current unit.
else:
return None

def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None:
"""The leader removes the departing units from the list of cluster members."""
# Allow leader to update endpoints if it isn't leaving.
Expand All @@ -367,6 +381,9 @@ def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None:
self.postgresql_client_relation.update_read_only_endpoint()
self._remove_from_endpoints(endpoints_to_remove)

# Update the sync-standby endpoint in the async replication data.
self.async_replication.update_async_replication_data()

def _on_peer_relation_changed(self, event: HookEvent) -> None:
"""Reconfigure cluster members."""
# The cluster must be initialized first in the leader unit
Expand Down Expand Up @@ -411,9 +428,13 @@ def _on_peer_relation_changed(self, event: HookEvent) -> None:

# Restart the workload if it's stuck on the starting state after a timeline divergence
# due to a backup that was restored.
if not self.is_primary and (
self._patroni.member_replication_lag == "unknown"
or int(self._patroni.member_replication_lag) > 1000
if (
not self.is_primary
and not self.is_standby_leader
and (
self._patroni.member_replication_lag == "unknown"
or int(self._patroni.member_replication_lag) > 1000
)
):
self._patroni.reinitialize_postgresql()
logger.debug("Deferring on_peer_relation_changed: reinitialising replica")
Expand All @@ -439,8 +460,7 @@ def _on_peer_relation_changed(self, event: HookEvent) -> None:
else:
self.unit_peer_data.pop("start-tls-server", None)

if not self.is_blocked:
self._set_active_status()
self.async_replication.handle_read_only_mode()

def _on_config_changed(self, event) -> None:
"""Handle configuration changes, like enabling plugins."""
Expand Down Expand Up @@ -470,6 +490,9 @@ def _on_config_changed(self, event) -> None:
if self.is_blocked and "Configuration Error" in self.unit.status.message:
self._set_active_status()

# Update the sync-standby endpoint in the async replication data.
self.async_replication.update_async_replication_data()

if not self.unit.is_leader():
return

Expand Down Expand Up @@ -497,6 +520,9 @@ def enable_disable_extensions(self, database: str = None) -> None:
Args:
database: optional database where to enable/disable the extension.
"""
if self._patroni.get_primary() is None:
logger.debug("Early exit enable_disable_extensions: standby cluster")
return
spi_module = ["refint", "autoinc", "insert_username", "moddatetime"]
plugins_exception = {"uuid_ossp": '"uuid-ossp"'}
original_status = self.unit.status
Expand Down Expand Up @@ -640,6 +666,25 @@ def _on_leader_elected(self, event: LeaderElectedEvent) -> None:
self._add_to_endpoints(self._endpoint)

self._cleanup_old_cluster_resources()

if not self.fix_leader_annotation():
return

# Create resources and add labels needed for replication.
try:
self._create_services()
except ApiError:
logger.exception("failed to create k8s services")
self.unit.status = BlockedStatus("failed to create k8s services")
return

# Remove departing units when the leader changes.
self._remove_from_endpoints(self._get_endpoints_to_remove())

self._add_members(event)

def fix_leader_annotation(self) -> bool:
"""Fix the leader annotation if it's missing."""
client = Client()
try:
endpoint = client.get(Endpoints, name=self.cluster_name, namespace=self._namespace)
Expand All @@ -656,23 +701,11 @@ def _on_leader_elected(self, event: LeaderElectedEvent) -> None:
except ApiError as e:
if e.status.code == 403:
self.on_deployed_without_trust()
return
return False
# Ignore the error only when the resource doesn't exist.
if e.status.code != 404:
raise e

# Create resources and add labels needed for replication.
try:
self._create_services()
except ApiError:
logger.exception("failed to create k8s services")
self.unit.status = BlockedStatus("failed to create k8s services")
return

# Remove departing units when the leader changes.
self._remove_from_endpoints(self._get_endpoints_to_remove())

self._add_members(event)
return True

def _create_pgdata(self, container: Container):
"""Create the PostgreSQL data directory."""
Expand Down Expand Up @@ -746,6 +779,8 @@ def _set_active_status(self):
try:
if self._patroni.get_primary(unit_name_pattern=True) == self.unit.name:
self.unit.status = ActiveStatus("Primary")
elif self.is_standby_leader:
self.unit.status = ActiveStatus("Standby Leader")
elif self._patroni.member_started:
self.unit.status = ActiveStatus()
except (RetryError, ConnectionError) as e:
Expand Down Expand Up @@ -818,6 +853,9 @@ def _on_upgrade_charm(self, _) -> None:
self.unit.status = BlockedStatus(f"failed to patch pod with error {e}")
return

# Update the sync-standby endpoint in the async replication data.
self.async_replication.update_async_replication_data()

def _patch_pod_labels(self, member: str) -> None:
"""Add labels required for replication to the current pod.

Expand Down Expand Up @@ -987,6 +1025,9 @@ def _on_set_password(self, event: ActionEvent) -> None:
# Other units Patroni configuration will be reloaded in the peer relation changed event.
self.update_config()

# Update the password in the async replication data.
self.async_replication.update_async_replication_data()

event.set_results({"password": password})

def _on_get_primary(self, event: ActionEvent) -> None:
Expand Down Expand Up @@ -1108,6 +1149,9 @@ def _on_update_status(self, _) -> None:
if self._handle_processes_failures():
return

# Update the sync-standby endpoint in the async replication data.
self.async_replication.update_async_replication_data()

self._set_active_status()

def _handle_processes_failures(self) -> bool:
Expand All @@ -1130,8 +1174,15 @@ def _handle_processes_failures(self) -> bool:
return False
return True

try:
is_primary = self.is_primary
is_standby_leader = self.is_standby_leader
except RetryError:
return False

if (
not self.is_primary
not is_primary
and not is_standby_leader
and self._patroni.member_started
and not self._patroni.member_streaming
):
Expand Down Expand Up @@ -1169,6 +1220,11 @@ def is_primary(self) -> bool:
"""Return whether this unit is the primary instance."""
return self._unit == self._patroni.get_primary(unit_name_pattern=True)

@property
def is_standby_leader(self) -> bool:
"""Return whether this unit is the standby leader instance."""
return self._unit == self._patroni.get_standby_leader(unit_name_pattern=True)

@property
def is_tls_enabled(self) -> bool:
"""Return whether TLS is enabled."""
Expand Down
1 change: 1 addition & 0 deletions src/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
WORKLOAD_OS_GROUP = "postgres"
WORKLOAD_OS_USER = "postgres"
METRICS_PORT = "9187"
POSTGRESQL_DATA_PATH = "/var/lib/postgresql/data/pgdata"
POSTGRES_LOG_FILES = [
"/var/log/pgbackrest/*",
"/var/log/postgresql/patroni.log",
Expand Down
Loading
Loading