Skip to content

Commit

Permalink
Fixes for the ci test
Browse files Browse the repository at this point in the history
Signed-off-by: Donny Peeters <[email protected]>
  • Loading branch information
Donnype committed Jul 25, 2024
1 parent 6e165db commit 8f5832a
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 28 deletions.
45 changes: 30 additions & 15 deletions boefjes/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import multiprocessing
import uuid
from ipaddress import ip_address
from typing import Iterator

import time
from datetime import datetime, timezone
Expand All @@ -17,14 +18,18 @@
from octopoes.models.ooi.network import IPAddressV4, IPPort, IPAddressV6, Network
from octopoes.models.ooi.service import IPService, Service
from pydantic import TypeAdapter
from sqlalchemy.orm import sessionmaker

from boefjes.app import SchedulerWorkerManager
from boefjes.clients.bytes_client import BytesAPIClient
from boefjes.clients.scheduler_client import Queue, QueuePrioritizedItem, SchedulerClientInterface, Task, TaskStatus
from boefjes.config import Settings, settings
from boefjes.job_handler import bytes_api_client
from boefjes.job_models import BoefjeMeta, NormalizerMeta
from boefjes.models import Organisation
from boefjes.runtime_interfaces import Handler, WorkerManager
from boefjes.sql.db import get_engine, SQL_BASE
from boefjes.sql.organisation_storage import SQLOrganisationStorage
from tests.loading import get_dummy_data


Expand Down Expand Up @@ -145,10 +150,30 @@ def api(tmp_path):


@pytest.fixture
def octopoes_api_connector(request) -> OctopoesAPIConnector:
test_node = f"test-{request.node.originalname}"
def organisation_repository():
engine = get_engine()
session = sessionmaker(bind=engine)()

connector = OctopoesAPIConnector(str(settings.octopoes_api), test_node)
yield SQLOrganisationStorage(session, settings)

sessionmaker(bind=engine, autocommit=True)().execute(
";".join([f"TRUNCATE TABLE {t} CASCADE" for t in SQL_BASE.metadata.tables])
)


@pytest.fixture
def organisation(organisation_repository) -> Organisation:
organisation = Organisation(id="test", name="Test org")

with organisation_repository as repo:
repo.create(organisation)

return organisation


@pytest.fixture
def octopoes_api_connector(organisation) -> OctopoesAPIConnector:
connector = OctopoesAPIConnector(str(settings.octopoes_api), organisation.id)
connector.create_node()
yield connector
connector.delete_node()
Expand Down Expand Up @@ -212,18 +237,8 @@ def seed_system(

octopoes_api_connector.save_observation(
Observation(
method="",
source_method="test",
source=network.reference,
task_id=uuid.uuid4(),
valid_time=valid_time,
result=oois,
)
)
octopoes_api_connector.save_observation(
Observation(
method="",
source_method="test",
method="test-normalizer",
source_method="test-boefje",
source=hostnames[0].reference,
task_id=uuid.uuid4(),
valid_time=valid_time,
Expand Down
31 changes: 20 additions & 11 deletions boefjes/tests/integration/test_bench.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

@pytest.mark.slow
def test_migration(octopoes_api_connector: OctopoesAPIConnector, bytes_client: BytesAPIClient, valid_time):
hostname_range = range(0, 20)
hostname_range = range(0, 1)

for x in hostname_range:
seed_system(
Expand All @@ -24,16 +24,6 @@ def test_migration(octopoes_api_connector: OctopoesAPIConnector, bytes_client: B
test_ipv6=f"{x % 7}e4d:64a2:cb49:bd48:a1ba:def3:d15d:{x % 5}230",
)

export = octopoes_api_connector.export_all()

# Drop the source method field to test the migration
for tx in export:
if "txOps" in tx and len(tx["txOps"]) > 1 and len(tx["txOps"][1]) > 1 and "source_method" in tx["txOps"][1][1]:
del tx["txOps"][1][1]["source_method"]

octopoes_api_connector.import_new(json.dumps(export))
bytes_client.login()

raw = b"1234567890"

for origin in octopoes_api_connector.list_origins(valid_time, origin_type=OriginType.OBSERVATION):
Expand All @@ -46,6 +36,25 @@ def test_migration(octopoes_api_connector: OctopoesAPIConnector, bytes_client: B

bytes_client.save_normalizer_meta(normalizer_meta)

export = []

# Drop the source method field to test the migration
for tx in octopoes_api_connector.export_all():
if "txOps" in tx:
ops = []
for tx_op in tx["txOps"]:
if "source_method" in tx_op[1]:
del tx_op[1]["source_method"]

ops.append(tx_op)

tx["txOps"] = ops

export.append(tx)

breakpoint()
octopoes_api_connector.import_new(json.dumps(export))
bytes_client.login()
total_processed, total_failed = upgrade(valid_time)

assert total_processed == 0
Expand Down
6 changes: 4 additions & 2 deletions boefjes/tools/upgrade_v1_16_0.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,10 @@ def migrate_org(bytes_client, connector, organisation_id, valid_time) -> tuple[i
processed = 0

offset = 0
page_size = 200

while True:
origins = connector.list_origins(valid_time, offset=offset, limit=200)
origins = connector.list_origins(valid_time, offset=offset, limit=page_size)
logger.info("Processing %s origins", len(origins))

for origin in origins:
Expand Down Expand Up @@ -118,7 +120,7 @@ def migrate_org(bytes_client, connector, organisation_id, valid_time) -> tuple[i
logger.info("Processed all origins [organization_id=%s]", organisation_id)
break

offset += 1
offset += page_size

return failed, processed

Expand Down

0 comments on commit 8f5832a

Please sign in to comment.