From d7e45971b0de6d2fae7976236ae3a7e1f75891bd Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Mon, 22 Jan 2024 10:07:17 -0300 Subject: [PATCH 01/10] Add integration tests Signed-off-by: Marcelo Henrique Neppel --- tests/integration/ha_tests/helpers.py | 88 ++++-- .../ha_tests/test_async_replication.py | 282 ++++++++++++++++++ tests/integration/helpers.py | 39 ++- 3 files changed, 379 insertions(+), 30 deletions(-) create mode 100644 tests/integration/ha_tests/test_async_replication.py diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py index 4eb50d1883..1ab5fe4016 100644 --- a/tests/integration/ha_tests/helpers.py +++ b/tests/integration/ha_tests/helpers.py @@ -12,6 +12,7 @@ import kubernetes as kubernetes import psycopg2 import requests +from juju.model import Model from kubernetes import config from kubernetes.client.api import core_v1_api from kubernetes.stream import stream @@ -184,10 +185,10 @@ async def is_member_isolated( return True -async def check_writes(ops_test) -> int: +async def check_writes(ops_test, extra_model: Model = None) -> int: """Gets the total writes from the test charm and compares to the writes from db.""" total_expected_writes = await stop_continuous_writes(ops_test) - actual_writes, max_number_written = await count_writes(ops_test) + actual_writes, max_number_written = await count_writes(ops_test, extra_model=extra_model) for member, count in actual_writes.items(): assert ( count == max_number_written[member] @@ -263,28 +264,34 @@ def copy_file_into_pod( async def count_writes( - ops_test: OpsTest, down_unit: str = None + ops_test: OpsTest, down_unit: str = None, extra_model: Model = None ) -> Tuple[Dict[str, int], Dict[str, int]]: """Count the number of writes in the database.""" app = await app_name(ops_test) password = await get_password(ops_test, database_app_name=app, down_unit=down_unit) - status = await ops_test.model.get_status() - for unit_name, unit in status["applications"][app]["units"].items(): - if unit_name != down_unit: - cluster = get_patroni_cluster(unit["address"]) - break + members = [] + for model in [ops_test.model, extra_model]: + if model is None: + continue + status = await model.get_status() + for unit_name, unit in status["applications"][app]["units"].items(): + if unit_name != down_unit: + members_data = get_patroni_cluster(unit["address"])["members"] + for index, member_data in enumerate(members_data): + members_data[index]["model"] = model.info.name + members.extend(members_data) + break count = {} maximum = {} - for member in cluster["members"]: + for member in members: if member["role"] != "replica" and member["host"].split(".")[0] != ( down_unit or "" ).replace("/", "-"): host = member["host"] # Translate the service hostname to an IP address. - model = ops_test.model.info - client = Client(namespace=model.name) + client = Client(namespace=member["model"]) service = client.get(Pod, name=host.split(".")[0]) ip = service.status.podIP @@ -293,12 +300,23 @@ async def count_writes( f" host='{ip}' password='{password}' connect_timeout=10" ) - with psycopg2.connect(connection_string) as connection, connection.cursor() as cursor: - cursor.execute("SELECT COUNT(number), MAX(number) FROM continuous_writes;") - results = cursor.fetchone() - count[member["name"]] = results[0] - maximum[member["name"]] = results[1] - connection.close() + member_name = f'{member["model"]}.{member["name"]}' + connection = None + try: + with psycopg2.connect( + connection_string + ) as connection, connection.cursor() as cursor: + cursor.execute("SELECT COUNT(number), MAX(number) FROM continuous_writes;") + results = cursor.fetchone() + count[member_name] = results[0] + maximum[member_name] = results[1] + except psycopg2.Error: + # Error raised when the connection is not possible. + count[member_name] = -1 + maximum[member_name] = -1 + finally: + if connection is not None: + connection.close() return count, maximum @@ -415,6 +433,42 @@ async def get_postgresql_parameter(ops_test: OpsTest, parameter_name: str) -> Op return parameter_value +async def get_standby_leader(model: Model, application_name: str) -> str: + """Get the standby leader name. + + Args: + model: the model instance. + application_name: the name of the application to get the value for. + + Returns: + the name of the standby leader. + """ + status = await model.get_status() + first_unit_ip = list(status["applications"][application_name]["units"].values())[0]["address"] + cluster = get_patroni_cluster(first_unit_ip) + for member in cluster["members"]: + if member["role"] == "standby_leader": + return member["name"] + + +async def get_sync_standby(model: Model, application_name: str) -> str: + """Get the sync_standby name. + + Args: + model: the model instance. + application_name: the name of the application to get the value for. + + Returns: + the name of the sync standby. + """ + status = await model.get_status() + first_unit_ip = list(status["applications"][application_name]["units"].values())[0]["address"] + cluster = get_patroni_cluster(first_unit_ip) + for member in cluster["members"]: + if member["role"] == "sync_standby": + return member["name"] + + @retry(stop=stop_after_attempt(8), wait=wait_fixed(15), reraise=True) async def is_connection_possible(ops_test: OpsTest, unit_name: str) -> bool: """Test a connection to a PostgreSQL server.""" diff --git a/tests/integration/ha_tests/test_async_replication.py b/tests/integration/ha_tests/test_async_replication.py new file mode 100644 index 0000000000..dd68f8e048 --- /dev/null +++ b/tests/integration/ha_tests/test_async_replication.py @@ -0,0 +1,282 @@ +#!/usr/bin/env python3 +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. +import contextlib +import logging +from asyncio import gather +from typing import Optional + +import pytest as pytest +from juju.controller import Controller +from juju.model import Model +from lightkube import Client +from lightkube.resources.core_v1 import Pod +from pytest_operator.plugin import OpsTest + +from tests.integration.ha_tests.helpers import ( + are_writes_increasing, + check_writes, + get_standby_leader, + get_sync_standby, + start_continuous_writes, +) +from tests.integration.helpers import ( + APPLICATION_NAME, + DATABASE_APP_NAME, + build_and_deploy, + get_leader_unit, + wait_for_relation_removed_between, +) +from tests.integration.juju_ import juju_major_version + +logger = logging.getLogger(__name__) + + +@contextlib.asynccontextmanager +async def fast_forward( + model: Model, fast_interval: str = "10s", slow_interval: Optional[str] = None +): + """Adaptation of OpsTest.fast_forward to work with different models.""" + update_interval_key = "update-status-hook-interval" + if slow_interval: + interval_after = slow_interval + else: + interval_after = (await model.get_config())[update_interval_key] + + await model.set_config({update_interval_key: fast_interval}) + yield + await model.set_config({update_interval_key: interval_after}) + + +@pytest.fixture(scope="module") +async def controller(first_model) -> Controller: + """Return the controller.""" + return await first_model.get_controller() + + +@pytest.fixture(scope="module") +def first_model(ops_test: OpsTest) -> Model: + """Return the first model.""" + first_model = ops_test.model + return first_model + + +@pytest.fixture(scope="module") +async def second_model(controller, first_model) -> Model: + """Create and return the second model.""" + second_model_name = f"{first_model.info.name}-other" + await controller.add_model(second_model_name) + second_model = Model() + await second_model.connect(model_name=second_model_name) + return second_model + + +@pytest.mark.abort_on_fail +async def test_deploy_async_replication_setup( + ops_test: OpsTest, first_model: Model, second_model: Model +) -> None: + """Build and deploy two PostgreSQL cluster in two separate models to test async replication.""" + await build_and_deploy(ops_test, 3, wait_for_idle=False) + await build_and_deploy(ops_test, 3, wait_for_idle=False, model=second_model) + await ops_test.model.deploy(APPLICATION_NAME, num_units=1) + + async with ops_test.fast_forward(), fast_forward(second_model): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME, APPLICATION_NAME], + status="active", + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], + status="active", + ), + ) + + +@pytest.mark.abort_on_fail +async def test_async_replication( + ops_test: OpsTest, + controller: Controller, + first_model: Model, + second_model: Model, + continuous_writes, +) -> None: + """Test async replication between two PostgreSQL clusters.""" + logger.info("starting continuous writes to the database") + await start_continuous_writes(ops_test, DATABASE_APP_NAME) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + offer_endpoint = ( + f"{DATABASE_APP_NAME}:async-primary" if juju_major_version == 2 else "async-primary" + ) + await first_model.create_offer(offer_endpoint, "async-primary", DATABASE_APP_NAME) + await second_model.consume( + f"admin/{first_model.info.name}.async-primary", controller=controller + ) + + async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): + await gather( + first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + ) + + await second_model.relate(DATABASE_APP_NAME, "async-primary") + + async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): + await gather( + first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + ) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + # Run the promote action. + logger.info("Get leader unit") + leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME) + assert leader_unit is not None, "No leader unit found" + logger.info("promoting the first cluster") + run_action = await leader_unit.run_action("promote-standby-cluster") + await run_action.wait() + + async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): + await gather( + first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + ) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + # Verify that no writes to the database were missed after stopping the writes + # (check that all the units have all the writes). + logger.info("checking whether no writes were lost") + await check_writes(ops_test, extra_model=second_model) + + +async def test_break_and_reestablish_relation( + ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes +) -> None: + """Test that the relation can be broken and re-established.""" + logger.info("starting continuous writes to the database") + await start_continuous_writes(ops_test, DATABASE_APP_NAME) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + logger.info("breaking the relation") + await second_model.applications[DATABASE_APP_NAME].remove_relation( + "async-replica", "async-primary" + ) + wait_for_relation_removed_between(ops_test, "async-primary", "async-replica", second_model) + async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): + await gather( + first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + ) + + logger.info("reestablishing the relation") + await second_model.relate(DATABASE_APP_NAME, "async-primary") + async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): + await gather( + first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + ) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + # Run the promote action. + logger.info("Get leader unit") + leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME) + assert leader_unit is not None, "No leader unit found" + logger.info("promoting the first cluster") + run_action = await leader_unit.run_action("promote-standby-cluster") + await run_action.wait() + + async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): + await gather( + first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + ) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + # Verify that no writes to the database were missed after stopping the writes + # (check that all the units have all the writes). + logger.info("checking whether no writes were lost") + await check_writes(ops_test, extra_model=second_model) + + +async def test_async_replication_failover_in_main_cluster( + ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes +) -> None: + """Test that async replication fails over correctly.""" + logger.info("starting continuous writes to the database") + await start_continuous_writes(ops_test, DATABASE_APP_NAME) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + sync_standby = await get_sync_standby(first_model, DATABASE_APP_NAME) + logger.info(f"Sync-standby: {sync_standby}") + logger.info("deleting the sync-standby pod") + client = Client(namespace=first_model.info.name) + client.delete(Pod, name=sync_standby.replace("/", "-")) + + async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): + await gather( + first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + ) + + # Check that the sync-standby unit is not the same as before. + new_sync_standby = await get_sync_standby(first_model, DATABASE_APP_NAME) + logger.info(f"New sync-standby: {new_sync_standby}") + assert new_sync_standby != sync_standby, "Sync-standby is the same as before" + + logger.info("Ensure continuous_writes after the crashed unit") + await are_writes_increasing(ops_test) + + # Verify that no writes to the database were missed after stopping the writes + # (check that all the units have all the writes). + logger.info("checking whether no writes were lost") + await check_writes(ops_test, extra_model=second_model) + + +async def test_async_replication_failover_in_secondary_cluster( + ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes +) -> None: + """Test that async replication fails back correctly.""" + logger.info("starting continuous writes to the database") + await start_continuous_writes(ops_test, DATABASE_APP_NAME) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + standby_leader = await get_standby_leader(second_model, DATABASE_APP_NAME) + logger.info(f"Standby leader: {standby_leader}") + logger.info("deleting the standby leader pod") + client = Client(namespace=second_model.info.name) + client.delete(Pod, name=standby_leader.replace("/", "-")) + + async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): + await gather( + first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + ) + + # Check that the standby leader unit is not the same as before. + new_standby_leader = await get_standby_leader(second_model, DATABASE_APP_NAME) + assert new_standby_leader != standby_leader, "Standby leader is the same as before" + + logger.info("Ensure continuous_writes after the crashed unit") + await are_writes_increasing(ops_test) + + # Verify that no writes to the database were missed after stopping the writes + # (check that all the units have all the writes). + logger.info("checking whether no writes were lost") + await check_writes(ops_test, extra_model=second_model) diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index bfb81ba55f..874c7ecc4e 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -11,6 +11,7 @@ import psycopg2 import requests import yaml +from juju.model import Model from juju.unit import Unit from lightkube.core.client import Client from lightkube.core.exceptions import ApiError @@ -38,7 +39,9 @@ charm = None -async def app_name(ops_test: OpsTest, application_name: str = "postgresql-k8s") -> Optional[str]: +async def app_name( + ops_test: OpsTest, application_name: str = "postgresql-k8s", model: Model = None +) -> Optional[str]: """Returns the name of the cluster running PostgreSQL. This is important since not all deployments of the PostgreSQL charm have the application name @@ -46,8 +49,10 @@ async def app_name(ops_test: OpsTest, application_name: str = "postgresql-k8s") Note: if multiple clusters are running PostgreSQL this will return the one first found. """ - status = await ops_test.model.get_status() - for app in ops_test.model.applications: + if model is None: + model = ops_test.model + status = await model.get_status() + for app in model.applications: if application_name in status["applications"][app]["charm"]: return app @@ -60,11 +65,15 @@ async def build_and_deploy( database_app_name: str = DATABASE_APP_NAME, wait_for_idle: bool = True, status: str = "active", + model: Model = None, ) -> None: """Builds the charm and deploys a specified number of units.""" + if model is None: + model = ops_test.model + # It is possible for users to provide their own cluster for testing. Hence, check if there # is a pre-existing cluster. - if await app_name(ops_test, database_app_name): + if await app_name(ops_test, database_app_name, model): return global charm @@ -73,7 +82,7 @@ async def build_and_deploy( resources = { "postgresql-image": METADATA["resources"]["postgresql-image"]["upstream-source"], } - await ops_test.model.deploy( + await model.deploy( charm, resources=resources, application_name=database_app_name, @@ -84,7 +93,7 @@ async def build_and_deploy( ), if wait_for_idle: # Wait until the PostgreSQL charm is successfully deployed. - await ops_test.model.wait_for_idle( + await model.wait_for_idle( apps=[database_app_name], status=status, raise_on_blocked=True, @@ -555,13 +564,16 @@ async def check_tls_patroni_api(ops_test: OpsTest, unit_name: str, enabled: bool return False -def has_relation_exited(ops_test: OpsTest, endpoint_one: str, endpoint_two: str) -> bool: +def has_relation_exited( + ops_test: OpsTest, endpoint_one: str, endpoint_two: str, model: Model = None +) -> bool: """Returns true if the relation between endpoint_one and endpoint_two has been removed.""" - for rel in ops_test.model.relations: + relations = model.relations if model is not None else ops_test.model.relations + for rel in relations: endpoints = [endpoint.name for endpoint in rel.endpoints] - if endpoint_one not in endpoints and endpoint_two not in endpoints: - return True - return False + if endpoint_one in endpoints and endpoint_two in endpoints: + return False + return True @retry( @@ -684,7 +696,7 @@ async def wait_for_idle_on_blocked( def wait_for_relation_removed_between( - ops_test: OpsTest, endpoint_one: str, endpoint_two: str + ops_test: OpsTest, endpoint_one: str, endpoint_two: str, model: Model = None ) -> None: """Wait for relation to be removed before checking if it's waiting or idle. @@ -692,11 +704,12 @@ def wait_for_relation_removed_between( ops_test: running OpsTest instance endpoint_one: one endpoint of the relation. Doesn't matter if it's provider or requirer. endpoint_two: the other endpoint of the relation. + model: optional model to check for the relation. """ try: for attempt in Retrying(stop=stop_after_delay(3 * 60), wait=wait_fixed(3)): with attempt: - if has_relation_exited(ops_test, endpoint_one, endpoint_two): + if has_relation_exited(ops_test, endpoint_one, endpoint_two, model): break except RetryError: assert False, "Relation failed to exit after 3 minutes." From b28dc2db92436669e8eef76fd1b9fc5f92e31ebf Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Mon, 22 Jan 2024 10:09:17 -0300 Subject: [PATCH 02/10] Fix year in copyright notice Signed-off-by: Marcelo Henrique Neppel --- tests/integration/ha_tests/test_async_replication.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/ha_tests/test_async_replication.py b/tests/integration/ha_tests/test_async_replication.py index dd68f8e048..4384fc3d2c 100644 --- a/tests/integration/ha_tests/test_async_replication.py +++ b/tests/integration/ha_tests/test_async_replication.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -# Copyright 2023 Canonical Ltd. +# Copyright 2024 Canonical Ltd. # See LICENSE file for licensing details. import contextlib import logging From 9ab52f703a4a750dded49ff061128c072dd146e9 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Mon, 22 Jan 2024 15:05:04 -0300 Subject: [PATCH 03/10] Add group mark to tests Signed-off-by: Marcelo Henrique Neppel --- tests/integration/ha_tests/test_async_replication.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/integration/ha_tests/test_async_replication.py b/tests/integration/ha_tests/test_async_replication.py index 4384fc3d2c..e7d34410a5 100644 --- a/tests/integration/ha_tests/test_async_replication.py +++ b/tests/integration/ha_tests/test_async_replication.py @@ -93,6 +93,7 @@ async def test_deploy_async_replication_setup( ) +@pytest.mark.group(1) @pytest.mark.abort_on_fail async def test_async_replication( ops_test: OpsTest, @@ -156,6 +157,7 @@ async def test_async_replication( await check_writes(ops_test, extra_model=second_model) +@pytest.mark.group(1) async def test_break_and_reestablish_relation( ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes ) -> None: @@ -211,6 +213,7 @@ async def test_break_and_reestablish_relation( await check_writes(ops_test, extra_model=second_model) +@pytest.mark.group(1) async def test_async_replication_failover_in_main_cluster( ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes ) -> None: @@ -247,6 +250,7 @@ async def test_async_replication_failover_in_main_cluster( await check_writes(ops_test, extra_model=second_model) +@pytest.mark.group(1) async def test_async_replication_failover_in_secondary_cluster( ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes ) -> None: From db580dae7a188bd9ee304325ce7cb2c6ec257d9a Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Mon, 22 Jan 2024 15:22:56 -0300 Subject: [PATCH 04/10] Add group mark to test Signed-off-by: Marcelo Henrique Neppel --- tests/integration/ha_tests/test_async_replication.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration/ha_tests/test_async_replication.py b/tests/integration/ha_tests/test_async_replication.py index e7d34410a5..63c9d86ef4 100644 --- a/tests/integration/ha_tests/test_async_replication.py +++ b/tests/integration/ha_tests/test_async_replication.py @@ -71,6 +71,7 @@ async def second_model(controller, first_model) -> Model: return second_model +@pytest.mark.group(1) @pytest.mark.abort_on_fail async def test_deploy_async_replication_setup( ops_test: OpsTest, first_model: Model, second_model: Model From 99329d58bef2396dd3d02fdbd658bdf43ce95298 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Tue, 19 Mar 2024 18:55:30 -0300 Subject: [PATCH 05/10] Add remaining tests Signed-off-by: Marcelo Henrique Neppel --- tests/integration/ha_tests/helpers.py | 24 +- .../ha_tests/test_async_replication.py | 264 ++++++++++++++++-- tests/integration/helpers.py | 6 +- 3 files changed, 253 insertions(+), 41 deletions(-) diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py index ef02d125b5..26371b986a 100644 --- a/tests/integration/ha_tests/helpers.py +++ b/tests/integration/ha_tests/helpers.py @@ -197,13 +197,17 @@ async def check_writes(ops_test, extra_model: Model = None) -> int: return total_expected_writes -async def are_writes_increasing(ops_test, down_unit: str = None) -> None: +async def are_writes_increasing( + ops_test, down_unit: str = None, extra_model: Model = None +) -> None: """Verify new writes are continuing by counting the number of writes.""" - writes, _ = await count_writes(ops_test, down_unit=down_unit) + writes, _ = await count_writes(ops_test, down_unit=down_unit, extra_model=extra_model) for member, count in writes.items(): for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3), reraise=True): with attempt: - more_writes, _ = await count_writes(ops_test, down_unit=down_unit) + more_writes, _ = await count_writes( + ops_test, down_unit=down_unit, extra_model=extra_model + ) assert ( more_writes[member] > count ), f"{member}: writes not continuing to DB (current writes: {more_writes[member]} - previous writes: {count})" @@ -776,23 +780,25 @@ async def send_signal_to_process( ) -async def start_continuous_writes(ops_test: OpsTest, app: str) -> None: +async def start_continuous_writes(ops_test: OpsTest, app: str, model: Model = None) -> None: """Start continuous writes to PostgreSQL.""" # Start the process by relating the application to the database or # by calling the action if the relation already exists. + if model is None: + model = ops_test.model relations = [ relation - for relation in ops_test.model.applications[app].relations + for relation in model.applications[app].relations if not relation.is_peer and f"{relation.requires.application_name}:{relation.requires.name}" == f"{APPLICATION_NAME}:first-database" ] if not relations: - await ops_test.model.relate(app, f"{APPLICATION_NAME}:first-database") - await ops_test.model.wait_for_idle(status="active", timeout=1000) + await model.relate(app, f"{APPLICATION_NAME}:first-database") + await model.wait_for_idle(status="active", timeout=1000) else: action = ( - await ops_test.model.applications[APPLICATION_NAME] + await model.applications[APPLICATION_NAME] .units[0] .run_action("start-continuous-writes") ) @@ -800,7 +806,7 @@ async def start_continuous_writes(ops_test: OpsTest, app: str) -> None: for attempt in Retrying(stop=stop_after_delay(60 * 5), wait=wait_fixed(3), reraise=True): with attempt: action = ( - await ops_test.model.applications[APPLICATION_NAME] + await model.applications[APPLICATION_NAME] .units[0] .run_action("start-continuous-writes") ) diff --git a/tests/integration/ha_tests/test_async_replication.py b/tests/integration/ha_tests/test_async_replication.py index 63c9d86ef4..11935e79a9 100644 --- a/tests/integration/ha_tests/test_async_replication.py +++ b/tests/integration/ha_tests/test_async_replication.py @@ -6,12 +6,14 @@ from asyncio import gather from typing import Optional +import psycopg2 import pytest as pytest from juju.controller import Controller from juju.model import Model from lightkube import Client from lightkube.resources.core_v1 import Pod from pytest_operator.plugin import OpsTest +from tenacity import Retrying, stop_after_delay, wait_fixed from tests.integration.ha_tests.helpers import ( are_writes_increasing, @@ -25,13 +27,18 @@ DATABASE_APP_NAME, build_and_deploy, get_leader_unit, + get_password, + get_primary, + get_unit_address, wait_for_relation_removed_between, ) -from tests.integration.juju_ import juju_major_version logger = logging.getLogger(__name__) +TIMEOUT = 1500 + + @contextlib.asynccontextmanager async def fast_forward( model: Model, fast_interval: str = "10s", slow_interval: Optional[str] = None @@ -71,6 +78,22 @@ async def second_model(controller, first_model) -> Model: return second_model +@pytest.fixture(scope="module") +async def second_model_continuous_writes(second_model) -> None: + """Cleans up continuous writes on the second model after a test run.""" + yield + # Clear the written data at the end. + for attempt in Retrying(stop=stop_after_delay(10), wait=wait_fixed(3), reraise=True): + with attempt: + action = ( + await second_model.applications[APPLICATION_NAME] + .units[0] + .run_action("clear-continuous-writes") + ) + await action.wait() + assert action.results["result"] == "True", "Unable to clear up continuous_writes table" + + @pytest.mark.group(1) @pytest.mark.abort_on_fail async def test_deploy_async_replication_setup( @@ -80,16 +103,19 @@ async def test_deploy_async_replication_setup( await build_and_deploy(ops_test, 3, wait_for_idle=False) await build_and_deploy(ops_test, 3, wait_for_idle=False, model=second_model) await ops_test.model.deploy(APPLICATION_NAME, num_units=1) + await second_model.deploy(APPLICATION_NAME, num_units=1) async with ops_test.fast_forward(), fast_forward(second_model): await gather( first_model.wait_for_idle( apps=[DATABASE_APP_NAME, APPLICATION_NAME], status="active", + timeout=TIMEOUT, ), second_model.wait_for_idle( - apps=[DATABASE_APP_NAME], + apps=[DATABASE_APP_NAME, APPLICATION_NAME], status="active", + timeout=TIMEOUT, ), ) @@ -110,26 +136,31 @@ async def test_async_replication( logger.info("checking whether writes are increasing") await are_writes_increasing(ops_test) - offer_endpoint = ( - f"{DATABASE_APP_NAME}:async-primary" if juju_major_version == 2 else "async-primary" - ) - await first_model.create_offer(offer_endpoint, "async-primary", DATABASE_APP_NAME) + await first_model.create_offer("async-primary", "async-primary", DATABASE_APP_NAME) await second_model.consume( f"admin/{first_model.info.name}.async-primary", controller=controller ) async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): await gather( - first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), - second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + ), ) await second_model.relate(DATABASE_APP_NAME, "async-primary") async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): await gather( - first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), - second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + ), ) logger.info("checking whether writes are increasing") @@ -145,8 +176,12 @@ async def test_async_replication( async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): await gather( - first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), - second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + ), ) logger.info("checking whether writes are increasing") @@ -159,33 +194,188 @@ async def test_async_replication( @pytest.mark.group(1) -async def test_break_and_reestablish_relation( - ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes -) -> None: - """Test that the relation can be broken and re-established.""" +@pytest.mark.abort_on_fail +async def test_switchover( + ops_test: OpsTest, + controller: Controller, + first_model: Model, + second_model: Model, + second_model_continuous_writes, +): + """Test switching over to the second cluster.""" + logger.info("breaking the relation") + await second_model.applications[DATABASE_APP_NAME].remove_relation( + "async-replica", "async-primary" + ) + wait_for_relation_removed_between(ops_test, "async-primary", "async-replica", second_model) + async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + ), + ) + + second_offer_command = f"offer {DATABASE_APP_NAME}:async-replica async-replica" + await ops_test.juju(*second_offer_command.split()) + await second_model.consume( + f"admin/{first_model.info.name}.async-replica", controller=controller + ) + + async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + ), + ) + + await second_model.relate(DATABASE_APP_NAME, "async-replica") + + async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + ), + ) + + # Run the promote action. + logger.info("Get leader unit") + leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME, model=second_model) + assert leader_unit is not None, "No leader unit found" + logger.info("promoting the second cluster") + run_action = await leader_unit.run_action("promote-standby-cluster") + await run_action.wait() + + async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + ), + ) + logger.info("starting continuous writes to the database") - await start_continuous_writes(ops_test, DATABASE_APP_NAME) + await start_continuous_writes(ops_test, DATABASE_APP_NAME, model=second_model) logger.info("checking whether writes are increasing") - await are_writes_increasing(ops_test) + await are_writes_increasing(ops_test, extra_model=second_model) + +@pytest.mark.group(1) +@pytest.mark.abort_on_fail +async def test_promote_standby( + ops_test: OpsTest, + controller: Controller, + first_model: Model, + second_model: Model, + second_model_continuous_writes, +) -> None: + """Test promoting the standby cluster.""" logger.info("breaking the relation") await second_model.applications[DATABASE_APP_NAME].remove_relation( - "async-replica", "async-primary" + "async-primary", "async-replica" ) - wait_for_relation_removed_between(ops_test, "async-primary", "async-replica", second_model) + wait_for_relation_removed_between(ops_test, "async-replica", "async-primary", first_model) + async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + ), + ) + # Run the promote action. + logger.info("Get leader unit") + leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME) + assert leader_unit is not None, "No leader unit found" + logger.info("promoting the first cluster") + run_action = await leader_unit.run_action("promote-standby-cluster") + await run_action.wait() async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): await gather( - first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), - second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + ), + ) + + logger.info("removing the previous data") + # action = await second_model.applications[APPLICATION_NAME].units[0].run_action("clear-continuous-writes") + # await action.wait() + # assert action.results["result"] == "True", "Unable to clear up continuous_writes table" + primary = await get_primary(ops_test) + address = await get_unit_address(ops_test, primary) + print(f"address: {address}") + password = await get_password(ops_test) + print(f"password: {password}") + database_name = f'{APPLICATION_NAME.replace("-", "_")}_first_database' + connection = None + try: + connection = psycopg2.connect( + f"dbname={database_name} user=operator password={password} host={address}" ) + connection.autocommit = True + cursor = connection.cursor() + # cursor.execute(f"DROP DATABASE {database_name};") + cursor.execute("DROP TABLE IF EXISTS continuous_writes;") + except psycopg2.Error as e: + assert False, f"Failed to drop continuous writes table: {e}" + finally: + if connection is not None: + connection.close() + # action = await first_model.applications[APPLICATION_NAME].units[0].run_action("clear-continuous-writes") + # await action.wait() + # assert action.results["result"] == "True", "Unable to clear up continuous_writes table" + + # logger.info("remove the relation between the database and the application") + # await first_model.applications[DATABASE_APP_NAME].remove_relation( + # f"{DATABASE_APP_NAME}", f"{APPLICATION_NAME}:first_database" + # ) + # wait_for_relation_removed_between(ops_test, DATABASE_APP_NAME, APPLICATION_NAME) + # await first_model.wait_for_idle(apps=[DATABASE_APP_NAME, APPLICATION_NAME], status="active", raise_on_blocked=True) + + logger.info("starting continuous writes to the database") + await start_continuous_writes(ops_test, DATABASE_APP_NAME) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + +@pytest.mark.group(1) +@pytest.mark.abort_on_fail +async def test_reestablish_relation( + ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes +) -> None: + """Test that the relation can be broken and re-established.""" + logger.info("starting continuous writes to the database") + await start_continuous_writes(ops_test, DATABASE_APP_NAME) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) logger.info("reestablishing the relation") await second_model.relate(DATABASE_APP_NAME, "async-primary") async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): await gather( - first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), - second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + ), ) logger.info("checking whether writes are increasing") @@ -201,8 +391,12 @@ async def test_break_and_reestablish_relation( async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): await gather( - first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), - second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + ), ) logger.info("checking whether writes are increasing") @@ -215,6 +409,7 @@ async def test_break_and_reestablish_relation( @pytest.mark.group(1) +@pytest.mark.abort_on_fail async def test_async_replication_failover_in_main_cluster( ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes ) -> None: @@ -233,8 +428,12 @@ async def test_async_replication_failover_in_main_cluster( async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): await gather( - first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), - second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + ), ) # Check that the sync-standby unit is not the same as before. @@ -252,6 +451,7 @@ async def test_async_replication_failover_in_main_cluster( @pytest.mark.group(1) +@pytest.mark.abort_on_fail async def test_async_replication_failover_in_secondary_cluster( ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes ) -> None: @@ -270,8 +470,12 @@ async def test_async_replication_failover_in_secondary_cluster( async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): await gather( - first_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), - second_model.wait_for_idle(apps=[DATABASE_APP_NAME], status="active", idle_period=30), + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + ), ) # Check that the standby leader unit is not the same as before. diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 5634ff857e..950a557bec 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -415,9 +415,11 @@ def get_expected_k8s_resources(application: str) -> set: } -async def get_leader_unit(ops_test: OpsTest, app: str) -> Optional[Unit]: +async def get_leader_unit(ops_test: OpsTest, app: str, model: Model = None) -> Optional[Unit]: leader_unit = None - for unit in ops_test.model.applications[app].units: + if model is None: + model = ops_test.model + for unit in model.applications[app].units: if await unit.is_leader_from_status(): leader_unit = unit break From 1c0df2336630f9742db351fc70cdac9173e2ac4d Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Wed, 20 Mar 2024 15:10:16 -0300 Subject: [PATCH 06/10] Fix deployment and leadership change Signed-off-by: Marcelo Henrique Neppel --- tests/integration/ha_tests/test_async_replication.py | 6 +++++- tests/integration/helpers.py | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/integration/ha_tests/test_async_replication.py b/tests/integration/ha_tests/test_async_replication.py index 11935e79a9..43279fb829 100644 --- a/tests/integration/ha_tests/test_async_replication.py +++ b/tests/integration/ha_tests/test_async_replication.py @@ -453,7 +453,11 @@ async def test_async_replication_failover_in_main_cluster( @pytest.mark.group(1) @pytest.mark.abort_on_fail async def test_async_replication_failover_in_secondary_cluster( - ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes + ops_test: OpsTest, + first_model: Model, + second_model: Model, + continuous_writes, + primary_start_timeout, ) -> None: """Test that async replication fails back correctly.""" logger.info("starting continuous writes to the database") diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 9dd2a0133d..c9be6bfd67 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -83,7 +83,7 @@ async def build_and_deploy( "postgresql-image": METADATA["resources"]["postgresql-image"]["upstream-source"], } ( - await ops_test.model.deploy( + await model.deploy( charm, resources=resources, application_name=database_app_name, From 247be724705f07af5f9798c8badb4efae273b843 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Sat, 23 Mar 2024 18:24:43 -0300 Subject: [PATCH 07/10] Restrict async replication tests to Juju 3 Signed-off-by: Marcelo Henrique Neppel --- .../ha_tests/test_async_replication.py | 114 ++++++++---------- 1 file changed, 50 insertions(+), 64 deletions(-) diff --git a/tests/integration/ha_tests/test_async_replication.py b/tests/integration/ha_tests/test_async_replication.py index 43279fb829..10788d3224 100644 --- a/tests/integration/ha_tests/test_async_replication.py +++ b/tests/integration/ha_tests/test_async_replication.py @@ -36,7 +36,9 @@ logger = logging.getLogger(__name__) -TIMEOUT = 1500 +FAST_INTERVAL = "60s" +IDLE_PERIOD = 30 +TIMEOUT = 2000 @contextlib.asynccontextmanager @@ -95,6 +97,7 @@ async def second_model_continuous_writes(second_model) -> None: @pytest.mark.group(1) +@pytest.mark.juju3 @pytest.mark.abort_on_fail async def test_deploy_async_replication_setup( ops_test: OpsTest, first_model: Model, second_model: Model @@ -121,6 +124,7 @@ async def test_deploy_async_replication_setup( @pytest.mark.group(1) +@pytest.mark.juju3 @pytest.mark.abort_on_fail async def test_async_replication( ops_test: OpsTest, @@ -141,25 +145,25 @@ async def test_async_replication( f"admin/{first_model.info.name}.async-primary", controller=controller ) - async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): await gather( first_model.wait_for_idle( - apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT ), second_model.wait_for_idle( - apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT ), ) await second_model.relate(DATABASE_APP_NAME, "async-primary") - async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): await gather( first_model.wait_for_idle( - apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT ), second_model.wait_for_idle( - apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT ), ) @@ -174,13 +178,13 @@ async def test_async_replication( run_action = await leader_unit.run_action("promote-standby-cluster") await run_action.wait() - async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): await gather( first_model.wait_for_idle( - apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT ), second_model.wait_for_idle( - apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT ), ) @@ -194,6 +198,7 @@ async def test_async_replication( @pytest.mark.group(1) +@pytest.mark.juju3 @pytest.mark.abort_on_fail async def test_switchover( ops_test: OpsTest, @@ -208,13 +213,13 @@ async def test_switchover( "async-replica", "async-primary" ) wait_for_relation_removed_between(ops_test, "async-primary", "async-replica", second_model) - async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): await gather( first_model.wait_for_idle( - apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT ), second_model.wait_for_idle( - apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT ), ) @@ -224,25 +229,25 @@ async def test_switchover( f"admin/{first_model.info.name}.async-replica", controller=controller ) - async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): await gather( first_model.wait_for_idle( - apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT ), second_model.wait_for_idle( - apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT ), ) await second_model.relate(DATABASE_APP_NAME, "async-replica") - async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): await gather( first_model.wait_for_idle( - apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT ), second_model.wait_for_idle( - apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT ), ) @@ -254,13 +259,13 @@ async def test_switchover( run_action = await leader_unit.run_action("promote-standby-cluster") await run_action.wait() - async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): await gather( first_model.wait_for_idle( - apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT ), second_model.wait_for_idle( - apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT ), ) @@ -272,6 +277,7 @@ async def test_switchover( @pytest.mark.group(1) +@pytest.mark.juju3 @pytest.mark.abort_on_fail async def test_promote_standby( ops_test: OpsTest, @@ -286,13 +292,13 @@ async def test_promote_standby( "async-primary", "async-replica" ) wait_for_relation_removed_between(ops_test, "async-replica", "async-primary", first_model) - async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): await gather( first_model.wait_for_idle( - apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT ), second_model.wait_for_idle( - apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT ), ) # Run the promote action. @@ -302,25 +308,20 @@ async def test_promote_standby( logger.info("promoting the first cluster") run_action = await leader_unit.run_action("promote-standby-cluster") await run_action.wait() - async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): await gather( first_model.wait_for_idle( - apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT ), second_model.wait_for_idle( - apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT ), ) logger.info("removing the previous data") - # action = await second_model.applications[APPLICATION_NAME].units[0].run_action("clear-continuous-writes") - # await action.wait() - # assert action.results["result"] == "True", "Unable to clear up continuous_writes table" primary = await get_primary(ops_test) address = await get_unit_address(ops_test, primary) - print(f"address: {address}") password = await get_password(ops_test) - print(f"password: {password}") database_name = f'{APPLICATION_NAME.replace("-", "_")}_first_database' connection = None try: @@ -336,16 +337,6 @@ async def test_promote_standby( finally: if connection is not None: connection.close() - # action = await first_model.applications[APPLICATION_NAME].units[0].run_action("clear-continuous-writes") - # await action.wait() - # assert action.results["result"] == "True", "Unable to clear up continuous_writes table" - - # logger.info("remove the relation between the database and the application") - # await first_model.applications[DATABASE_APP_NAME].remove_relation( - # f"{DATABASE_APP_NAME}", f"{APPLICATION_NAME}:first_database" - # ) - # wait_for_relation_removed_between(ops_test, DATABASE_APP_NAME, APPLICATION_NAME) - # await first_model.wait_for_idle(apps=[DATABASE_APP_NAME, APPLICATION_NAME], status="active", raise_on_blocked=True) logger.info("starting continuous writes to the database") await start_continuous_writes(ops_test, DATABASE_APP_NAME) @@ -355,6 +346,7 @@ async def test_promote_standby( @pytest.mark.group(1) +@pytest.mark.juju3 @pytest.mark.abort_on_fail async def test_reestablish_relation( ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes @@ -368,13 +360,13 @@ async def test_reestablish_relation( logger.info("reestablishing the relation") await second_model.relate(DATABASE_APP_NAME, "async-primary") - async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): await gather( first_model.wait_for_idle( - apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT ), second_model.wait_for_idle( - apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT ), ) @@ -389,13 +381,13 @@ async def test_reestablish_relation( run_action = await leader_unit.run_action("promote-standby-cluster") await run_action.wait() - async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): await gather( first_model.wait_for_idle( - apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT ), second_model.wait_for_idle( - apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT ), ) @@ -409,6 +401,7 @@ async def test_reestablish_relation( @pytest.mark.group(1) +@pytest.mark.juju3 @pytest.mark.abort_on_fail async def test_async_replication_failover_in_main_cluster( ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes @@ -426,13 +419,13 @@ async def test_async_replication_failover_in_main_cluster( client = Client(namespace=first_model.info.name) client.delete(Pod, name=sync_standby.replace("/", "-")) - async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): await gather( first_model.wait_for_idle( - apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT ), second_model.wait_for_idle( - apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT ), ) @@ -451,13 +444,10 @@ async def test_async_replication_failover_in_main_cluster( @pytest.mark.group(1) +@pytest.mark.juju3 @pytest.mark.abort_on_fail async def test_async_replication_failover_in_secondary_cluster( - ops_test: OpsTest, - first_model: Model, - second_model: Model, - continuous_writes, - primary_start_timeout, + ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes ) -> None: """Test that async replication fails back correctly.""" logger.info("starting continuous writes to the database") @@ -472,20 +462,16 @@ async def test_async_replication_failover_in_secondary_cluster( client = Client(namespace=second_model.info.name) client.delete(Pod, name=standby_leader.replace("/", "-")) - async with ops_test.fast_forward("60s"), fast_forward(second_model, "60s"): + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): await gather( first_model.wait_for_idle( - apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT ), second_model.wait_for_idle( - apps=[DATABASE_APP_NAME], status="active", idle_period=30, timeout=TIMEOUT + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT ), ) - # Check that the standby leader unit is not the same as before. - new_standby_leader = await get_standby_leader(second_model, DATABASE_APP_NAME) - assert new_standby_leader != standby_leader, "Standby leader is the same as before" - logger.info("Ensure continuous_writes after the crashed unit") await are_writes_increasing(ops_test) From 373319739e97a1a11a1f17479a0fd39a51f58702 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Thu, 28 Mar 2024 15:43:22 -0300 Subject: [PATCH 08/10] Fix fixture scope Signed-off-by: Marcelo Henrique Neppel --- tests/integration/ha_tests/test_async_replication.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/ha_tests/test_async_replication.py b/tests/integration/ha_tests/test_async_replication.py index 10788d3224..6c9dbb13e9 100644 --- a/tests/integration/ha_tests/test_async_replication.py +++ b/tests/integration/ha_tests/test_async_replication.py @@ -80,7 +80,7 @@ async def second_model(controller, first_model) -> Model: return second_model -@pytest.fixture(scope="module") +@pytest.fixture async def second_model_continuous_writes(second_model) -> None: """Cleans up continuous writes on the second model after a test run.""" yield From 50529bcd71409b6f72fb72579c837a1cb2f145b4 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Mon, 1 Apr 2024 14:47:25 -0300 Subject: [PATCH 09/10] Fix controller retrieval Signed-off-by: Marcelo Henrique Neppel --- .../ha_tests/test_async_replication.py | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/tests/integration/ha_tests/test_async_replication.py b/tests/integration/ha_tests/test_async_replication.py index 6c9dbb13e9..f592e046a3 100644 --- a/tests/integration/ha_tests/test_async_replication.py +++ b/tests/integration/ha_tests/test_async_replication.py @@ -8,7 +8,6 @@ import psycopg2 import pytest as pytest -from juju.controller import Controller from juju.model import Model from lightkube import Client from lightkube.resources.core_v1 import Pod @@ -57,12 +56,6 @@ async def fast_forward( await model.set_config({update_interval_key: interval_after}) -@pytest.fixture(scope="module") -async def controller(first_model) -> Controller: - """Return the controller.""" - return await first_model.get_controller() - - @pytest.fixture(scope="module") def first_model(ops_test: OpsTest) -> Model: """Return the first model.""" @@ -71,10 +64,10 @@ def first_model(ops_test: OpsTest) -> Model: @pytest.fixture(scope="module") -async def second_model(controller, first_model) -> Model: +async def second_model(ops_test: OpsTest, first_model) -> Model: """Create and return the second model.""" second_model_name = f"{first_model.info.name}-other" - await controller.add_model(second_model_name) + await ops_test._controller.add_model(second_model_name) second_model = Model() await second_model.connect(model_name=second_model_name) return second_model @@ -128,7 +121,6 @@ async def test_deploy_async_replication_setup( @pytest.mark.abort_on_fail async def test_async_replication( ops_test: OpsTest, - controller: Controller, first_model: Model, second_model: Model, continuous_writes, @@ -142,7 +134,7 @@ async def test_async_replication( await first_model.create_offer("async-primary", "async-primary", DATABASE_APP_NAME) await second_model.consume( - f"admin/{first_model.info.name}.async-primary", controller=controller + f"admin/{first_model.info.name}.async-primary", controller=ops_test._controller ) async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): @@ -202,7 +194,6 @@ async def test_async_replication( @pytest.mark.abort_on_fail async def test_switchover( ops_test: OpsTest, - controller: Controller, first_model: Model, second_model: Model, second_model_continuous_writes, @@ -226,7 +217,7 @@ async def test_switchover( second_offer_command = f"offer {DATABASE_APP_NAME}:async-replica async-replica" await ops_test.juju(*second_offer_command.split()) await second_model.consume( - f"admin/{first_model.info.name}.async-replica", controller=controller + f"admin/{first_model.info.name}.async-replica", controller=ops_test._controller ) async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): @@ -281,7 +272,6 @@ async def test_switchover( @pytest.mark.abort_on_fail async def test_promote_standby( ops_test: OpsTest, - controller: Controller, first_model: Model, second_model: Model, second_model_continuous_writes, From 76f6dd1c24825123dcc7a489fc9b1672ff9cb6e8 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Tue, 2 Apr 2024 10:37:14 -0300 Subject: [PATCH 10/10] Fix offer and consume commands for existing model Signed-off-by: Marcelo Henrique Neppel --- .../integration/ha_tests/test_async_replication.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/tests/integration/ha_tests/test_async_replication.py b/tests/integration/ha_tests/test_async_replication.py index f592e046a3..7d5941b1d4 100644 --- a/tests/integration/ha_tests/test_async_replication.py +++ b/tests/integration/ha_tests/test_async_replication.py @@ -132,10 +132,12 @@ async def test_async_replication( logger.info("checking whether writes are increasing") await are_writes_increasing(ops_test) - await first_model.create_offer("async-primary", "async-primary", DATABASE_APP_NAME) - await second_model.consume( - f"admin/{first_model.info.name}.async-primary", controller=ops_test._controller + first_offer_command = f"offer {DATABASE_APP_NAME}:async-primary async-primary" + await ops_test.juju(*first_offer_command.split()) + first_consume_command = ( + f"consume -m {second_model.info.name} admin/{first_model.info.name}.async-primary" ) + await ops_test.juju(*first_consume_command.split()) async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): await gather( @@ -216,9 +218,10 @@ async def test_switchover( second_offer_command = f"offer {DATABASE_APP_NAME}:async-replica async-replica" await ops_test.juju(*second_offer_command.split()) - await second_model.consume( - f"admin/{first_model.info.name}.async-replica", controller=ops_test._controller + second_consume_command = ( + f"consume -m {second_model.info.name} admin/{first_model.info.name}.async-replica" ) + await ops_test.juju(*second_consume_command.split()) async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): await gather(