[DPE-2900] Cross-region async replication integration tests #369

Closed

Commits (21)

d7e4597  Add integration tests (marceloneppel, Jan 22, 2024)
b28dc2d  Fix year in copyright notice (marceloneppel, Jan 22, 2024)
e085460  Merge branch 'dpe-2897-cross-region-async-replication' into dpe-2900-… (marceloneppel, Jan 22, 2024)
9ab52f7  Add group mark to tests (marceloneppel, Jan 22, 2024)
db580da  Add group mark to test (marceloneppel, Jan 22, 2024)
632efe6  Merge branch 'dpe-2897-cross-region-async-replication' into dpe-2900-… (marceloneppel, Jan 23, 2024)
4c3cf42  Merge branch 'dpe-2897-cross-region-async-replication' into dpe-2900-… (marceloneppel, Feb 12, 2024)
fad4e51  Merge branch 'dpe-2897-cross-region-async-replication' into dpe-2900-… (marceloneppel, Mar 13, 2024)
5f2270a  Merge branch 'dpe-2897-cross-region-async-replication' into dpe-2900-… (marceloneppel, Mar 18, 2024)
c3bdef5  Merge branch 'dpe-2897-cross-region-async-replication' into dpe-2900-… (marceloneppel, Mar 18, 2024)
d4df4bd  Merge branch 'dpe-2897-cross-region-async-replication' into dpe-2900-… (marceloneppel, Mar 19, 2024)
99329d5  Add remaining tests (marceloneppel, Mar 19, 2024)
5e25645  Merge branch 'dpe-2897-cross-region-async-replication' into dpe-2900-… (marceloneppel, Mar 19, 2024)
1c0df23  Fix deployment and leadership change (marceloneppel, Mar 20, 2024)
247be72  Restrict async replication tests to Juju 3 (marceloneppel, Mar 23, 2024)
81a4528  Merge branch 'dpe-2897-cross-region-async-replication' into dpe-2900-… (marceloneppel, Mar 23, 2024)
3733197  Fix fixture scope (marceloneppel, Mar 28, 2024)
52625c2  Merge branch 'dpe-2897-cross-region-async-replication' into dpe-2900-… (marceloneppel, Mar 28, 2024)
fd95850  Merge branch 'dpe-2897-cross-region-async-replication' into dpe-2900-… (marceloneppel, Apr 1, 2024)
50529bc  Fix controller retrieval (marceloneppel, Apr 1, 2024)
76f6dd1  Fix offer and consume commands for existing model (marceloneppel, Apr 2, 2024)

112 changes: 86 additions & 26 deletions tests/integration/ha_tests/helpers.py
@@ -12,6 +12,7 @@
import kubernetes as kubernetes
import psycopg2
import requests
from juju.model import Model
from kubernetes import config
from kubernetes.client.api import core_v1_api
from kubernetes.stream import stream
@@ -184,10 +185,10 @@ async def is_member_isolated(
return True


async def check_writes(ops_test) -> int:
async def check_writes(ops_test, extra_model: Model = None) -> int:
"""Gets the total writes from the test charm and compares to the writes from db."""
total_expected_writes = await stop_continuous_writes(ops_test)
actual_writes, max_number_written = await count_writes(ops_test)
actual_writes, max_number_written = await count_writes(ops_test, extra_model=extra_model)
for member, count in actual_writes.items():
assert (
count == max_number_written[member]
@@ -196,13 +197,17 @@ async def check_writes(ops_test) -> int:
return total_expected_writes


async def are_writes_increasing(ops_test, down_unit: str = None) -> None:
async def are_writes_increasing(
ops_test, down_unit: str = None, extra_model: Model = None
) -> None:
"""Verify new writes are continuing by counting the number of writes."""
writes, _ = await count_writes(ops_test, down_unit=down_unit)
writes, _ = await count_writes(ops_test, down_unit=down_unit, extra_model=extra_model)
for member, count in writes.items():
for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3), reraise=True):
with attempt:
more_writes, _ = await count_writes(ops_test, down_unit=down_unit)
more_writes, _ = await count_writes(
ops_test, down_unit=down_unit, extra_model=extra_model
)
assert (
more_writes[member] > count
), f"{member}: writes not continuing to DB (current writes: {more_writes[member]} - previous writes: {count})"
@@ -265,28 +270,34 @@ def copy_file_into_pod(


async def count_writes(
ops_test: OpsTest, down_unit: str = None
ops_test: OpsTest, down_unit: str = None, extra_model: Model = None
) -> Tuple[Dict[str, int], Dict[str, int]]:
"""Count the number of writes in the database."""
app = await app_name(ops_test)
password = await get_password(ops_test, database_app_name=app, down_unit=down_unit)
status = await ops_test.model.get_status()
for unit_name, unit in status["applications"][app]["units"].items():
if unit_name != down_unit:
cluster = get_patroni_cluster(unit["address"])
break
members = []
for model in [ops_test.model, extra_model]:
if model is None:
continue
status = await model.get_status()
for unit_name, unit in status["applications"][app]["units"].items():
if unit_name != down_unit:
members_data = get_patroni_cluster(unit["address"])["members"]
for index, member_data in enumerate(members_data):
members_data[index]["model"] = model.info.name
members.extend(members_data)
break

count = {}
maximum = {}
for member in cluster["members"]:
for member in members:
if member["role"] != "replica" and member["host"].split(".")[0] != (
down_unit or ""
).replace("/", "-"):
host = member["host"]

# Translate the service hostname to an IP address.
model = ops_test.model.info
client = Client(namespace=model.name)
client = Client(namespace=member["model"])
service = client.get(Pod, name=host.split(".")[0])
ip = service.status.podIP

@@ -295,12 +306,23 @@ async def count_writes(
f" host='{ip}' password='{password}' connect_timeout=10"
)

with psycopg2.connect(connection_string) as connection, connection.cursor() as cursor:
cursor.execute("SELECT COUNT(number), MAX(number) FROM continuous_writes;")
results = cursor.fetchone()
count[member["name"]] = results[0]
maximum[member["name"]] = results[1]
connection.close()
member_name = f'{member["model"]}.{member["name"]}'
connection = None
try:
with psycopg2.connect(
connection_string
) as connection, connection.cursor() as cursor:
cursor.execute("SELECT COUNT(number), MAX(number) FROM continuous_writes;")
results = cursor.fetchone()
count[member_name] = results[0]
maximum[member_name] = results[1]
except psycopg2.Error:
# Error raised when the connection is not possible.
count[member_name] = -1
maximum[member_name] = -1
finally:
if connection is not None:
connection.close()
return count, maximum


@@ -415,6 +437,42 @@ async def get_postgresql_parameter(ops_test: OpsTest, parameter_name: str) -> Op
return parameter_value


async def get_standby_leader(model: Model, application_name: str) -> str:
"""Get the standby leader name.

Args:
model: the model instance.
application_name: the name of the application to get the value for.

Returns:
the name of the standby leader.
"""
status = await model.get_status()
first_unit_ip = list(status["applications"][application_name]["units"].values())[0]["address"]
cluster = get_patroni_cluster(first_unit_ip)
for member in cluster["members"]:
if member["role"] == "standby_leader":
return member["name"]


async def get_sync_standby(model: Model, application_name: str) -> str:
"""Get the sync_standby name.

Args:
model: the model instance.
application_name: the name of the application to get the value for.

Returns:
the name of the sync standby.
"""
status = await model.get_status()
first_unit_ip = list(status["applications"][application_name]["units"].values())[0]["address"]
cluster = get_patroni_cluster(first_unit_ip)
for member in cluster["members"]:
if member["role"] == "sync_standby":
return member["name"]


@retry(stop=stop_after_attempt(8), wait=wait_fixed(15), reraise=True)
async def is_connection_possible(ops_test: OpsTest, unit_name: str) -> bool:
"""Test a connection to a PostgreSQL server."""
@@ -720,31 +778,33 @@ async def send_signal_to_process(
)


async def start_continuous_writes(ops_test: OpsTest, app: str) -> None:
async def start_continuous_writes(ops_test: OpsTest, app: str, model: Model = None) -> None:
"""Start continuous writes to PostgreSQL."""
# Start the process by relating the application to the database or
# by calling the action if the relation already exists.
if model is None:
model = ops_test.model
relations = [
relation
for relation in ops_test.model.applications[app].relations
for relation in model.applications[app].relations
if not relation.is_peer
and f"{relation.requires.application_name}:{relation.requires.name}"
== f"{APPLICATION_NAME}:first-database"
]
if not relations:
await ops_test.model.relate(app, f"{APPLICATION_NAME}:first-database")
await ops_test.model.wait_for_idle(status="active", timeout=1000)
await model.relate(app, f"{APPLICATION_NAME}:first-database")
await model.wait_for_idle(status="active", timeout=1000)
else:
action = (
await ops_test.model.applications[APPLICATION_NAME]
await model.applications[APPLICATION_NAME]
.units[0]
.run_action("start-continuous-writes")
)
await action.wait()
for attempt in Retrying(stop=stop_after_delay(60 * 5), wait=wait_fixed(3), reraise=True):
with attempt:
action = (
await ops_test.model.applications[APPLICATION_NAME]
await model.applications[APPLICATION_NAME]
.units[0]
.run_action("start-continuous-writes")
)
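
The helper changes above all thread an optional second Juju model through the write-verification flow, so the same assertions can run against both the primary cluster and the standby cluster of an async replication pair. Below is a minimal sketch of how a test could combine them; the second_model fixture, the postgresql-k8s application name, and the test body itself are illustrative assumptions and are not part of this diff.

# Hypothetical usage sketch (assumption, not part of this PR): combining the
# updated helpers in a cross-region async replication test.
import pytest
from pytest_operator.plugin import OpsTest

from .helpers import (
    are_writes_increasing,
    check_writes,
    get_standby_leader,
    get_sync_standby,
    start_continuous_writes,
)

DATABASE_APP_NAME = "postgresql-k8s"  # assumed application name


@pytest.mark.abort_on_fail
async def test_writes_replicate_to_standby_cluster(ops_test: OpsTest, second_model) -> None:
    """Assumed test: writes made against the primary model reach the standby model."""
    # Start continuous writes against the primary cluster in the default model.
    await start_continuous_writes(ops_test, DATABASE_APP_NAME)

    # Writes must keep increasing on members of both models.
    await are_writes_increasing(ops_test, extra_model=second_model)

    # The new helpers expose Patroni roles per model.
    sync_standby = await get_sync_standby(ops_test.model, DATABASE_APP_NAME)
    standby_leader = await get_standby_leader(second_model, DATABASE_APP_NAME)
    assert sync_standby is not None and standby_leader is not None

    # Stop the writes and check that every member reports the same totals.
    await check_writes(ops_test, extra_model=second_model)

Since count_writes now records members as "model-name.member-name" and resolves each pod through its own model's namespace, the standby application only needs to be reachable as a juju.model.Model handle, wherever that model lives.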