Skip to content

Commit

Permalink
Fix naming issues and spot a bug
Browse files Browse the repository at this point in the history
  • Loading branch information
Donnype committed Oct 16, 2024
1 parent d0c3a6b commit 73b96fe
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 62 deletions.
2 changes: 1 addition & 1 deletion rocky/reports/runner/models.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from octopoes.models.ooi.reports import ReportRecipe


class ReportJobRunner:
class ReportRunner:
def run(self, recipe: ReportRecipe) -> None:
raise NotImplementedError()

Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,20 @@
from datetime import datetime, timezone

from django.conf import settings
from katalogus.client import KATalogusClientV1, KATalogusError
from tools.models import Organization

from octopoes.connector.octopoes import OctopoesAPIConnector
from octopoes.models import Reference
from reports.report_types.definitions import report_plugins_union
from reports.report_types.helpers import get_report_by_id
from reports.runner.models import JobRuntimeError, ReportJobRunner
from reports.views.base import format_plugin_data, hydrate_plugins
from reports.runner.models import ReportRunner
from reports.views.mixins import collect_reports, save_report_data
from rocky.bytes_client import BytesClient
from rocky.scheduler import ReportTask


class LocalReportJobRunner(ReportJobRunner):
def __init__(
self, katalogus_client: KATalogusClientV1, bytes_client: BytesClient, valid_time: datetime | None = None
):
self.katalogus_client = katalogus_client
class LocalReportRunner(ReportRunner):
def __init__(self, bytes_client: BytesClient, valid_time: datetime | None = None):
self.bytes_client = bytes_client
self.valid_time = valid_time

Expand All @@ -33,18 +29,6 @@ def run(self, report_task: ReportTask) -> None:
valid_time, connector, recipe.input_recipe["input_oois"], report_types
)

self.katalogus_client.organization = report_task.organisation_id
self.katalogus_client.organization_uri = f"/v1/organisations/{report_task.organisation_id}"

try:
report_type_plugins = hydrate_plugins(report_types, self.katalogus_client)
plugins = format_plugin_data(report_type_plugins)
except KATalogusError as e:
raise JobRuntimeError("Failed to hydrate plugins from KATalogus") from e

self.katalogus_client.organization = None
self.katalogus_client.organization_uri = ""

self.bytes_client.organization = report_task.organisation_id
report_names = []
oois_count = 0
Expand All @@ -54,18 +38,26 @@ def run(self, report_task: ReportTask) -> None:
report_type = get_report_by_id(report_type_id)

for ooi in data:
report_name = recipe.subreport_name_format.format(ooi=ooi, report_type=str(report_type.name))
report_name = recipe.subreport_name_format.replace("{ooi}", ooi).replace(
"{report type}", str(report_type.name)
)
report_names.append((report_name, report_name))

save_report_data(
self.bytes_client,
valid_time,
connector,
Organization.objects.get(code=report_task.organisation_id),
plugins,
{
"input_data": {
"input_oois": recipe.input_recipe["input_oois"],
"report_types": recipe.report_types,
"plugins": report_plugins_union(report_types),
}
},
report_data,
report_names,
recipe.report_name_format.format(oois_count=oois_count),
recipe.report_name_format.replace("{oois_count}", str(oois_count)),
)

self.bytes_client.organization = None
11 changes: 5 additions & 6 deletions rocky/reports/runner/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@
import structlog
from django.conf import settings
from httpx import HTTPError
from katalogus.client import get_katalogus
from pydantic import ValidationError

from reports.runner.local import LocalReportJobRunner
from reports.runner.models import ReportJobRunner, WorkerManager
from reports.runner.models import ReportRunner, WorkerManager
from reports.runner.report_runner import LocalReportRunner
from rocky.bytes_client import get_bytes_client
from rocky.scheduler import SchedulerClient, Task, TaskStatus, scheduler_client

Expand All @@ -22,7 +21,7 @@
class SchedulerWorkerManager(WorkerManager):
def __init__(
self,
runner: ReportJobRunner,
runner: ReportRunner,
scheduler: SchedulerClient,
pool_size: int,
poll_interval: int,
Expand Down Expand Up @@ -223,7 +222,7 @@ def _format_exit_code(exitcode: int | None) -> str:


def _start_working(
task_queue: mp.Queue, runner: ReportJobRunner, scheduler: SchedulerClient, handling_tasks: dict[int, str]
task_queue: mp.Queue, runner: ReportRunner, scheduler: SchedulerClient, handling_tasks: dict[int, str]
):
logger.info("Started listening for tasks from worker[pid=%s]", os.getpid())

Expand Down Expand Up @@ -253,7 +252,7 @@ def _start_working(

def get_runtime_manager() -> WorkerManager:
return SchedulerWorkerManager(
LocalReportJobRunner(get_katalogus(""), get_bytes_client("")), # These are set dynamically. Needs a refactor.
LocalReportRunner(get_bytes_client("")), # These are set dynamically. Needs a refactor.
scheduler_client(None),
settings.POOL_SIZE,
settings.POLL_INTERVAL,
Expand Down
16 changes: 2 additions & 14 deletions rocky/reports/views/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,9 @@ def get(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
return redirect(reverse("report_history", kwargs=self.get_kwargs()))


def get_plugin_ids(report_types: list[type[BaseReport]]):
return report_plugins_union(report_types)


def hydrate_plugins(report_types: list[type["BaseReport"]], katalogus: KATalogusClientV1) -> dict[str, list[Plugin]]:
plugins: dict[str, list[Plugin]] = {"required": [], "optional": []}
merged_plugins = get_plugin_ids(report_types)
merged_plugins = report_plugins_union(report_types)

required_plugins_ids = list(merged_plugins["required"])
optional_plugins_ids = list(merged_plugins["optional"])
Expand Down Expand Up @@ -255,14 +251,6 @@ def get_available_report_types(self) -> tuple[list[dict[str, str]] | dict[str, l
report_types = self.get_report_types_for_generate_report()
return report_types, len(report_types)

def get_plugin_data_for_saving(self) -> list[dict]:
try:
report_type_plugins = hydrate_plugins(self.get_report_types(), get_katalogus(self.organization.code))
except KATalogusError as error:
return messages.error(self.request, error.message)

return format_plugin_data(report_type_plugins)

def get_observed_at(self):
return self.observed_at if self.observed_at < datetime.now(timezone.utc) else datetime.now(timezone.utc)

Expand Down Expand Up @@ -296,7 +284,7 @@ def get_input_data(self) -> dict[str, Any]:
"input_data": {
"input_oois": self.get_ooi_pks(),
"report_types": self.get_report_type_ids(),
"plugins": get_plugin_ids(self.get_report_types()),
"plugins": report_plugins_union(self.get_report_types()),
}
}

Expand Down
9 changes: 5 additions & 4 deletions rocky/reports/views/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def save_report_data(
observed_at,
octopoes_api_connector,
organization,
input_data,
input_data: dict,
report_data,
report_names,
parent_report_name,
Expand All @@ -75,7 +75,8 @@ def save_report_data(
raw_id = bytes_client.upload_raw(
raw=ReportDataDict(input_data).model_dump_json().encode(), manual_mime_types={"openkat/report"}
)
name = now.strftime(parent_report_name)
name = now.strftime(parent_report_name.replace("{report type}", str(ConcatenatedReport.name)))

if not name or name.isspace():
name = ConcatenatedReport.name

Expand Down Expand Up @@ -123,7 +124,7 @@ def save_report_data(
]

child_input_data = {
"input_data": {"input_oois": [ooi], "report_types": [report_type_id], "plugins": child_plugins}
"input_data": {"input_oois": [ooi], "report_types": [report_type_id], "plugins": [child_plugins]}
}

raw_id = bytes_client.upload_raw(
Expand Down Expand Up @@ -162,7 +163,7 @@ def save_report_data(
manual_mime_types={"openkat/report"},
)
report_type = get_report_by_id(report_type_id)
name = now.strftime(report_names[0][1])
name = now.strftime(parent_report_name.replace("{report type}", str(report_type.name)))

if not name or name.isspace():
name = ConcatenatedReport.name
Expand Down
6 changes: 3 additions & 3 deletions rocky/tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import pytest
from django.conf import settings
from reports.runner.local import LocalReportJobRunner
from reports.runner.report_runner import LocalReportRunner
from tools.models import Organization

from octopoes.api.models import Declaration, Observation
Expand Down Expand Up @@ -66,8 +66,8 @@ def octopoes_api_connector_2(integration_organization_2) -> OctopoesAPIConnector


@pytest.fixture
def report_runner(valid_time, katalogus_mock, mocker) -> LocalReportJobRunner:
return LocalReportJobRunner(katalogus_mock, mocker.MagicMock(), valid_time)
def report_runner(valid_time, mocker) -> LocalReportRunner:
return LocalReportRunner(mocker.MagicMock(), valid_time)


def seed_system(
Expand Down
53 changes: 42 additions & 11 deletions rocky/tests/integration/test_report_runner.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json

from reports.runner.local import LocalReportJobRunner
from reports.runner.report_runner import LocalReportRunner

from octopoes.api.models import Declaration
from octopoes.connector.octopoes import OctopoesAPIConnector
Expand All @@ -10,15 +10,15 @@
from tests.integration.conftest import seed_system


def test_run_report_task(octopoes_api_connector: OctopoesAPIConnector, report_runner: LocalReportJobRunner, valid_time):
def test_run_report_task(octopoes_api_connector: OctopoesAPIConnector, report_runner: LocalReportRunner, valid_time):
oois = seed_system(octopoes_api_connector, valid_time)
report_runner.bytes_client.health.return_value = ServiceHealth(service="bytes", healthy=True)
report_runner.bytes_client.upload_raw.return_value = "abcdabcd-f8ab-4bdf-9b1b-58cd98ef6342"

recipe = ReportRecipe(
recipe_id="abc4e52b-812c-4cc2-8196-35fb8efc63ca",
report_name_format="Concatenated report for {oois_count} objects",
subreport_name_format="{report_type} for {ooi} in %Y",
subreport_name_format="{report type} for {ooi} in %Y",
input_recipe={"input_oois": [oois["hostnames"][0].reference, oois["hostnames"][1].reference]},
report_types=["dns-report"],
cron_expression="* * * * *",
Expand All @@ -34,7 +34,15 @@ def test_run_report_task(octopoes_api_connector: OctopoesAPIConnector, report_ru
assert report_runner.bytes_client.upload_raw.mock_calls[1].kwargs["manual_mime_types"] == {"openkat/report"}
assert report_runner.bytes_client.upload_raw.mock_calls[2].kwargs["manual_mime_types"] == {"openkat/report"}

assert report_runner.bytes_client.upload_raw.mock_calls[0].kwargs["raw"] == b'{"plugins":[]}'
data = json.loads(report_runner.bytes_client.upload_raw.mock_calls[0].kwargs["raw"])
data["input_data"]["plugins"]["required"] = set(data["input_data"]["plugins"]["required"]) # ordering issues
assert data == {
"input_data": {
"input_oois": ["Hostname|test|example.com", "Hostname|test|a.example.com"],
"report_types": ["dns-report"],
"plugins": {"required": {"dns-sec", "dns-records"}, "optional": ["dns-zone"]},
}
}

# The order of the OOIs being processed is not guaranteed, so this is a simple workaround
both_calls = [
Expand All @@ -44,29 +52,52 @@ def test_run_report_task(octopoes_api_connector: OctopoesAPIConnector, report_ru
"records": [],
"security": {"spf": True, "dkim": True, "dmarc": True, "dnssec": True, "caa": True},
"finding_types": [],
}
},
"input_data": {
"input_oois": ["Hostname|test|example.com"],
"report_types": ["dns-report"],
"plugins": [{"required": {"dns-sec", "dns-records"}, "optional": ["dns-zone"]}],
},
},
{
"report_data": {
"input_ooi": "Hostname|test|a.example.com",
"records": [],
"security": {"spf": True, "dkim": True, "dmarc": True, "dnssec": True, "caa": True},
"finding_types": [],
}
},
"input_data": {
"input_oois": ["Hostname|test|a.example.com"],
"report_types": ["dns-report"],
"plugins": [{"required": {"dns-sec", "dns-records"}, "optional": ["dns-zone"]}],
},
},
]

assert json.loads(report_runner.bytes_client.upload_raw.mock_calls[1].kwargs["raw"]) in both_calls
assert json.loads(report_runner.bytes_client.upload_raw.mock_calls[2].kwargs["raw"]) in both_calls
data_1 = json.loads(report_runner.bytes_client.upload_raw.mock_calls[1].kwargs["raw"])
data_1["input_data"]["plugins"][0]["required"] = set(
data_1["input_data"]["plugins"][0]["required"]
) # ordering issues
data_2 = json.loads(report_runner.bytes_client.upload_raw.mock_calls[2].kwargs["raw"])
data_2["input_data"]["plugins"][0]["required"] = set(
data_2["input_data"]["plugins"][0]["required"]
) # ordering issues

assert data_1 in both_calls
assert data_2 in both_calls

reports = octopoes_api_connector.list_reports(valid_time)
assert reports.count == 1

report, subreports = reports.items[0]
assert len(subreports) == 2

assert report.name == "Concatenated report for 2 objects"
assert {x.name for x in subreports} == {
"DNS Report for Hostname|test|a.example.com in 2024",
"DNS Report for Hostname|test|example.com in 2024",
"DNS Report for Hostname|test|a.example.com in 2024"
# FIXME: the naming logic in reports/views/mixins.py 107-112 is not right. We expect to find example.com in this
# set, but instead only find a.example.com because when ooi_name is 'example.com', so the check:
# `ooi_name in default_name` also passes for 'DNS Report for Hostname|test|a.example.com in %Y'.
# We shouldn't have to guess the match in the report_names argument. The name should be overridden on an object
# in the report_data list probably.
# "DNS Report for Hostname|test|example.com in 2024",
}

0 comments on commit 73b96fe

Please sign in to comment.