diff --git a/boefjes/tests/conftest.py b/boefjes/tests/conftest.py index af57a982ac3..60242989aef 100644 --- a/boefjes/tests/conftest.py +++ b/boefjes/tests/conftest.py @@ -1,6 +1,7 @@ import multiprocessing import uuid from ipaddress import ip_address +from typing import Iterator import time from datetime import datetime, timezone @@ -17,6 +18,7 @@ from octopoes.models.ooi.network import IPAddressV4, IPPort, IPAddressV6, Network from octopoes.models.ooi.service import IPService, Service from pydantic import TypeAdapter +from sqlalchemy.orm import sessionmaker from boefjes.app import SchedulerWorkerManager from boefjes.clients.bytes_client import BytesAPIClient @@ -24,7 +26,10 @@ from boefjes.config import Settings, settings from boefjes.job_handler import bytes_api_client from boefjes.job_models import BoefjeMeta, NormalizerMeta +from boefjes.models import Organisation from boefjes.runtime_interfaces import Handler, WorkerManager +from boefjes.sql.db import get_engine, SQL_BASE +from boefjes.sql.organisation_storage import SQLOrganisationStorage from tests.loading import get_dummy_data @@ -145,10 +150,30 @@ def api(tmp_path): @pytest.fixture -def octopoes_api_connector(request) -> OctopoesAPIConnector: - test_node = f"test-{request.node.originalname}" +def organisation_repository(): + engine = get_engine() + session = sessionmaker(bind=engine)() - connector = OctopoesAPIConnector(str(settings.octopoes_api), test_node) + yield SQLOrganisationStorage(session, settings) + + sessionmaker(bind=engine, autocommit=True)().execute( + ";".join([f"TRUNCATE TABLE {t} CASCADE" for t in SQL_BASE.metadata.tables]) + ) + + +@pytest.fixture +def organisation(organisation_repository) -> Organisation: + organisation = Organisation(id="test", name="Test org") + + with organisation_repository as repo: + repo.create(organisation) + + return organisation + + +@pytest.fixture +def octopoes_api_connector(organisation) -> OctopoesAPIConnector: + connector = OctopoesAPIConnector(str(settings.octopoes_api), organisation.id) connector.create_node() yield connector connector.delete_node() @@ -212,18 +237,8 @@ def seed_system( octopoes_api_connector.save_observation( Observation( - method="", - source_method="test", - source=network.reference, - task_id=uuid.uuid4(), - valid_time=valid_time, - result=oois, - ) - ) - octopoes_api_connector.save_observation( - Observation( - method="", - source_method="test", + method="test-normalizer", + source_method="test-boefje", source=hostnames[0].reference, task_id=uuid.uuid4(), valid_time=valid_time, diff --git a/boefjes/tests/integration/test_bench.py b/boefjes/tests/integration/test_bench.py index 0349f8e88a8..b80257ea7a3 100644 --- a/boefjes/tests/integration/test_bench.py +++ b/boefjes/tests/integration/test_bench.py @@ -13,7 +13,7 @@ @pytest.mark.slow def test_migration(octopoes_api_connector: OctopoesAPIConnector, bytes_client: BytesAPIClient, valid_time): - hostname_range = range(0, 20) + hostname_range = range(0, 1) for x in hostname_range: seed_system( @@ -24,16 +24,6 @@ def test_migration(octopoes_api_connector: OctopoesAPIConnector, bytes_client: B test_ipv6=f"{x % 7}e4d:64a2:cb49:bd48:a1ba:def3:d15d:{x % 5}230", ) - export = octopoes_api_connector.export_all() - - # Drop the source method field to test the migration - for tx in export: - if "txOps" in tx and len(tx["txOps"]) > 1 and len(tx["txOps"][1]) > 1 and "source_method" in tx["txOps"][1][1]: - del tx["txOps"][1][1]["source_method"] - - octopoes_api_connector.import_new(json.dumps(export)) - bytes_client.login() - raw = b"1234567890" for origin in octopoes_api_connector.list_origins(valid_time, origin_type=OriginType.OBSERVATION): @@ -46,6 +36,25 @@ def test_migration(octopoes_api_connector: OctopoesAPIConnector, bytes_client: B bytes_client.save_normalizer_meta(normalizer_meta) + export = [] + + # Drop the source method field to test the migration + for tx in octopoes_api_connector.export_all(): + if "txOps" in tx: + ops = [] + for tx_op in tx["txOps"]: + if "source_method" in tx_op[1]: + del tx_op[1]["source_method"] + + ops.append(tx_op) + + tx["txOps"] = ops + + export.append(tx) + + breakpoint() + octopoes_api_connector.import_new(json.dumps(export)) + bytes_client.login() total_processed, total_failed = upgrade(valid_time) assert total_processed == 0 diff --git a/boefjes/tools/upgrade_v1_16_0.py b/boefjes/tools/upgrade_v1_16_0.py index 959ab412c03..3c9605fb240 100755 --- a/boefjes/tools/upgrade_v1_16_0.py +++ b/boefjes/tools/upgrade_v1_16_0.py @@ -73,8 +73,10 @@ def migrate_org(bytes_client, connector, organisation_id, valid_time) -> tuple[i processed = 0 offset = 0 + page_size = 200 + while True: - origins = connector.list_origins(valid_time, offset=offset, limit=200) + origins = connector.list_origins(valid_time, offset=offset, limit=page_size) logger.info("Processing %s origins", len(origins)) for origin in origins: @@ -118,7 +120,7 @@ def migrate_org(bytes_client, connector, organisation_id, valid_time) -> tuple[i logger.info("Processed all origins [organization_id=%s]", organisation_id) break - offset += 1 + offset += page_size return failed, processed