Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DPE-2897] Cross-region async replication #368

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
b6251b4
Add first draft of standby_cluster support
phvalguima Oct 23, 2023
c4c0adb
First support finished:
phvalguima Oct 24, 2023
0a47ced
Add support for clusters with >1 units
phvalguima Oct 26, 2023
2918489
Add lint fix
phvalguima Oct 26, 2023
73af835
Add a coordinator logic to make sure all units of the standby cluster…
phvalguima Oct 27, 2023
fabf4c3
Merge remote-tracking branch 'origin/main' into dpe-2897-cross-region…
marceloneppel Jan 22, 2024
f3cfd31
Add extra needed details
marceloneppel Jan 22, 2024
6973451
Merge remote-tracking branch 'origin/main' into dpe-2897-cross-region…
marceloneppel Jan 22, 2024
789217c
Add all secrets and add minor fixes to code
marceloneppel Jan 23, 2024
b0d8ed3
Merge remote-tracking branch 'origin/main' into dpe-2897-cross-region…
marceloneppel Feb 12, 2024
ae1dcc0
Enable switchover to another cluster
marceloneppel Feb 20, 2024
02a19f1
Merge remote-tracking branch 'origin/main' into dpe-2897-cross-region…
marceloneppel Mar 18, 2024
f9d878d
Fix standby cluster trying to write to the database
marceloneppel Mar 18, 2024
1aa144d
Add missing parameter
marceloneppel Mar 18, 2024
de6c552
Remove previous cluster information to avoid that the secondary clust…
marceloneppel Mar 19, 2024
f52dfdc
Merge remote-tracking branch 'origin/main' into dpe-2897-cross-region…
marceloneppel Mar 19, 2024
e5a41c5
Format code
marceloneppel Mar 19, 2024
fc5a613
Fix check for standby cluster
marceloneppel Mar 23, 2024
c05200b
Remove unnecessary pgdata backup creation
marceloneppel Mar 28, 2024
91ce71a
Merge remote-tracking branch 'origin/main' into dpe-2897-cross-region…
marceloneppel Mar 28, 2024
2850ec7
Handle replicas issues
marceloneppel Apr 1, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions actions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,6 @@ set-tls-private-key:
private-key:
type: string
description: The content of private key for communications with clients. Content will be auto-generated if this option is not specified.

promote-standby-cluster:
description: Promotes the standby cluster of choice to a leader. Must be run against the charm unit leader of the standby cluster.
11 changes: 7 additions & 4 deletions lib/charms/postgresql_k8s/v0/postgresql.py
Original file line number Diff line number Diff line change
Expand Up @@ -476,11 +476,10 @@ def set_up_database(self) -> None:
"""Set up postgres database with the right permissions."""
connection = None
try:
self.create_user(
"admin",
extra_user_roles="pg_read_all_data,pg_write_all_data",
)
with self._connect_to_database() as connection, connection.cursor() as cursor:
cursor.execute("SELECT TRUE FROM pg_roles WHERE rolname='admin';")
if cursor.fetchone() is not None:
return
# Allow access to the postgres database only to the system users.
cursor.execute("REVOKE ALL PRIVILEGES ON DATABASE postgres FROM PUBLIC;")
cursor.execute("REVOKE CREATE ON SCHEMA public FROM PUBLIC;")
Expand All @@ -490,6 +489,10 @@ def set_up_database(self) -> None:
sql.Identifier(user)
)
)
self.create_user(
"admin",
extra_user_roles="pg_read_all_data,pg_write_all_data",
)
cursor.execute("GRANT CONNECT ON DATABASE postgres TO admin;")
except psycopg2.Error as e:
logger.error(f"Failed to set up databases: {e}")
Expand Down
5 changes: 5 additions & 0 deletions metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ peers:
interface: upgrade

provides:
async-primary:
interface: async_replication
database:
interface: postgresql_client
db:
Expand All @@ -51,6 +53,9 @@ provides:
interface: grafana_dashboard

requires:
async-replica:
interface: async_replication
limit: 1
certificates:
interface: tls-certificates
limit: 1
Expand Down
3 changes: 1 addition & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions src/backups.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from tenacity import RetryError, Retrying, stop_after_attempt, wait_fixed

from constants import BACKUP_USER, WORKLOAD_OS_GROUP, WORKLOAD_OS_USER
from relations.async_replication import ASYNC_PRIMARY_RELATION, ASYNC_REPLICA_RELATION

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -717,6 +718,18 @@ def _pre_restore_checks(self, event: ActionEvent) -> bool:
event.fail(error_message)
return False

logger.info("Checking that the cluster is not replicating data to a standby cluster")
for rel in [
self.model.get_relation(ASYNC_REPLICA_RELATION),
self.model.get_relation(ASYNC_PRIMARY_RELATION),
]:
if not rel: # if no relation exists, then rel is None
continue
error_message = "Unit cannot restore backup as the cluster is replicating data to a standby cluster"
logger.error(f"Restore failed: {error_message}")
event.fail(error_message)
return False

logger.info("Checking that this unit was already elected the leader unit")
if not self.charm.unit.is_leader():
error_message = "Unit cannot restore backup as it was not elected the leader unit yet"
Expand Down
56 changes: 52 additions & 4 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
WORKLOAD_OS_USER,
)
from patroni import NotReadyError, Patroni
from relations.async_replication import PostgreSQLAsyncReplication
from relations.db import EXTENSIONS_BLOCKING_MESSAGE, DbProvides
from relations.postgresql_provider import PostgreSQLProvider
from upgrade import PostgreSQLUpgrade, get_postgresql_k8s_dependencies_model
Expand Down Expand Up @@ -182,6 +183,7 @@ def __init__(self, *args):
postgresql_db_port = ServicePort(5432, name="database")
patroni_api_port = ServicePort(8008, name="api")
self.service_patcher = KubernetesServicePatch(self, [postgresql_db_port, patroni_api_port])
self.async_manager = PostgreSQLAsyncReplication(self)

def _generate_metrics_jobs(self, enable_tls: bool) -> Dict:
"""Generate spec for Prometheus scraping."""
Expand Down Expand Up @@ -337,6 +339,18 @@ def _get_endpoints_to_remove(self) -> List[str]:
endpoints_to_remove = list(set(old) - set(current))
return endpoints_to_remove

def get_unit_ip(self, unit: Unit) -> Optional[str]:
    """Get the IP address of a specific unit.

    Args:
        unit: the unit whose address should be resolved.

    Returns:
        The bind address of the peer binding when ``unit`` is the current
        unit, the ``private-address`` found in the peer relation data when
        ``unit`` is a peer, or None when the unit is neither the current unit
        nor a peer, or when no address is available for it.
    """
    if unit == self.unit:
        # The current unit reads its own address from the peer network binding.
        address = self.model.get_binding(PEER).network.bind_address
    elif unit in self._peers.data:
        # Peers are resolved through the address stored in the relation data.
        address = self._peers.data[unit].get("private-address")
    else:
        # The unit is neither the current unit nor a peer.
        return None
    # Never stringify a missing address: str(None) would yield the literal
    # "None", which callers could not tell apart from a real address.
    return None if address is None else str(address)

def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None:
"""The leader removes the departing units from the list of cluster members."""
# Allow leader to update endpoints if it isn't leaving.
Expand All @@ -354,6 +368,9 @@ def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None:
self.postgresql_client_relation.update_read_only_endpoint()
self._remove_from_endpoints(endpoints_to_remove)

# Update the endpoint in the async replication data.
self.async_manager.update_async_replication_data()

def _on_peer_relation_changed(self, event: HookEvent) -> None:
"""Reconfigure cluster members."""
# The cluster must be initialized first in the leader unit
Expand Down Expand Up @@ -398,9 +415,13 @@ def _on_peer_relation_changed(self, event: HookEvent) -> None:

# Restart the workload if it's stuck on the starting state after a timeline divergence
# due to a backup that was restored.
if not self.is_primary and (
self._patroni.member_replication_lag == "unknown"
or int(self._patroni.member_replication_lag) > 1000
if (
not self.is_primary
and not self.is_standby_leader
and (
self._patroni.member_replication_lag == "unknown"
or int(self._patroni.member_replication_lag) > 1000
)
):
self._patroni.reinitialize_postgresql()
logger.debug("Deferring on_peer_relation_changed: reinitialising replica")
Expand Down Expand Up @@ -457,6 +478,9 @@ def _on_config_changed(self, event) -> None:
if self.is_blocked and "Configuration Error" in self.unit.status.message:
self.unit.status = ActiveStatus()

# Update the endpoint in the async replication data.
self.async_manager.update_async_replication_data()

if not self.unit.is_leader():
return

Expand Down Expand Up @@ -484,6 +508,9 @@ def enable_disable_extensions(self, database: str = None) -> None:
Args:
database: optional database where to enable/disable the extension.
"""
if self._patroni.get_primary() is None:
logger.debug("Early exit enable_disable_extensions: standby cluster")
return
spi_module = ["refint", "autoinc", "insert_username", "moddatetime"]
plugins_exception = {"uuid_ossp": '"uuid-ossp"'}
original_status = self.unit.status
Expand Down Expand Up @@ -793,6 +820,9 @@ def _on_upgrade_charm(self, _) -> None:
self.unit.status = BlockedStatus(f"failed to patch pod with error {e}")
return

# Update the endpoint in the async replication data.
self.async_manager.update_async_replication_data()

def _patch_pod_labels(self, member: str) -> None:
"""Add labels required for replication to the current pod.

Expand Down Expand Up @@ -959,6 +989,9 @@ def _on_set_password(self, event: ActionEvent) -> None:
# Other units Patroni configuration will be reloaded in the peer relation changed event.
self.update_config()

# Update the password in the async replication data.
self.async_manager.update_async_replication_data()

event.set_results({"password": password})

def _on_get_primary(self, event: ActionEvent) -> None:
Expand Down Expand Up @@ -1080,6 +1113,9 @@ def _on_update_status(self, _) -> None:
if self._handle_processes_failures():
return

# Update the endpoint in the async replication data.
self.async_manager.update_async_replication_data()

self._set_primary_status_message()

def _handle_processes_failures(self) -> bool:
Expand All @@ -1102,8 +1138,15 @@ def _handle_processes_failures(self) -> bool:
return False
return True

try:
is_primary = self.is_primary
is_standby_leader = self.is_standby_leader
except RetryError:
return False

if (
not self.is_primary
not is_primary
and not is_standby_leader
and self._patroni.member_started
and not self._patroni.member_streaming
):
Expand Down Expand Up @@ -1151,6 +1194,11 @@ def is_primary(self) -> bool:
"""Return whether this unit is the primary instance."""
return self._unit == self._patroni.get_primary(unit_name_pattern=True)

@property
def is_standby_leader(self) -> bool:
    """Tell whether this unit is the standby leader reported by Patroni."""
    # Patroni is queried with unit_name_pattern=True so that the result can be
    # compared directly against this unit's name.
    standby_leader = self._patroni.get_standby_leader(unit_name_pattern=True)
    return self._unit == standby_leader

@property
def is_tls_enabled(self) -> bool:
"""Return whether TLS is enabled."""
Expand Down
Loading
Loading