Skip to content

Commit

Permalink
Introduce tests for config nibbles
Browse files Browse the repository at this point in the history
  • Loading branch information
originalsouth committed Dec 17, 2024
1 parent c51dca9 commit 0ecceec
Show file tree
Hide file tree
Showing 2 changed files with 203 additions and 4 deletions.
6 changes: 2 additions & 4 deletions octopoes/octopoes/models/origin.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,8 @@ def id(self) -> str:
self.__class__.__name__,
self.origin_type.value,
self.method,
self.source,
f"[{','.join([str(param) for param in self.result])}]" if self.result else "Null",
f"[{','.join([str(param) for param in self.parameters_references])}]"
if self.parameters_references
f"[{','.join(sorted([str(param) for param in self.parameters_references]))}]"
if self.parameters_references is not None
else "Null",
],
)
Expand Down
201 changes: 201 additions & 0 deletions octopoes/tests/integration/test_config_nibbles.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
import os
import sys
from collections.abc import Iterator
from datetime import datetime
from unittest.mock import Mock

import pytest
from nibbles.definitions import NibbleDefinition, NibbleParameter

from octopoes.core.service import OctopoesService
from octopoes.models import OOI, Reference
from octopoes.models.ooi.config import Config
from octopoes.models.ooi.findings import Finding, KATFindingType
from octopoes.models.ooi.network import Network
from octopoes.models.ooi.web import URL
from octopoes.models.origin import OriginType

if os.environ.get("CI") != "1":
pytest.skip("Needs XTDB multinode container.", allow_module_level=True)


def config_nibble_payload(url: URL, config: Config | None) -> Iterator[OOI]:
if config is not None and str(url.raw) in config.config:
kft = KATFindingType(id="URL in config")
yield kft
yield Finding(finding_type=kft.reference, ooi=url.reference, proof=f"{url.reference} in {config.config}")


def config_nibble_query(targets: list[Reference | None]) -> str:
sgn = "".join(str(int(isinstance(target, Reference))) for target in targets)
if sgn == "10":
return f"""
{{
:query {{
:find [(pull ?url [*]) (pull ?config [*])] :where [
[?header :object_type "URL"]
[?header :URL/primary_key "{str(targets[0])}"]
(or
(and
[?url :URL/network ?network]
[?config :Config/ooi ?network]
[?config :Config/bit_id "config_nibble_test"]
)
(and
[(identity nil) ?url]
[(identity nil) ?network]
[(identity nil) ?config]
)
)
]
}}
}}
"""
elif sgn == "01":
return f"""
{{
:query {{
:find [(pull ?url [*]) (pull ?config [*])] :where [
[?config :object_type "Config"]
[?config :Config/primary_key "{str(targets[1])}"]
[?config :Config/bit_id "config_nibble_test"]
(or
(and
[?url :URL/network ?network]
[?config :Config/ooi ?network]
[?config :Config/bit_id "config_nibble_test"]
)
(and
[(identity nil) ?url]
[(identity nil) ?network]
[(identity nil) ?config]
)
)
]
}}
}}
"""
else:
return f"""
{{
:query {{
:find [(pull ?url [*]) (pull ?config [*])] :where [
[?url :object_type "URL"]
[?url :URL/primary_key "{str(targets[0])}"]
[?config :object_type "Config"]
[?config :Config/primary_key "{str(targets[1])}"]
[?config :Config/bit_id "config_nibble_test"]
]
}}
}}
"""


config_nibble = NibbleDefinition(
id="config_nibble",
signature=[
NibbleParameter(object_type=URL, parser="[*][?object_type == 'URL'][]"),
NibbleParameter(object_type=Config, parser="[*][?object_type == 'Config'][]", optional=True),
],
query=config_nibble_query,
)
config_nibble._payload = getattr(sys.modules[__name__], "config_nibble_payload")


def test_inference_without_config(xtdb_octopoes_service: OctopoesService, event_manager: Mock, valid_time: datetime):
xtdb_octopoes_service.nibbler.nibbles = {"config_nibble_test": config_nibble}

network = Network(name="internet")
url = URL(network=network.reference, raw="https://mispo.es/")

xtdb_octopoes_service.ooi_repository.save(network, valid_time)
xtdb_octopoes_service.ooi_repository.save(url, valid_time)
event_manager.complete_process_events(xtdb_octopoes_service)

assert xtdb_octopoes_service.ooi_repository.list_oois({Finding}, valid_time).count == 0
assert xtdb_octopoes_service.ooi_repository.list_oois({KATFindingType}, valid_time).count == 0


def test_inference_with_config(xtdb_octopoes_service: OctopoesService, event_manager: Mock, valid_time: datetime):
xtdb_octopoes_service.nibbler.nibbles = {"config_nibble_test": config_nibble}

network = Network(name="internet")
url = URL(network=network.reference, raw="https://mispo.es/")
config = Config(ooi=network.reference, bit_id="config_nibble_test", config={str(url.raw): None})

xtdb_octopoes_service.ooi_repository.save(network, valid_time)
xtdb_octopoes_service.ooi_repository.save(url, valid_time)
xtdb_octopoes_service.ooi_repository.save(config, valid_time)
event_manager.complete_process_events(xtdb_octopoes_service)

assert xtdb_octopoes_service.ooi_repository.list_oois({Finding}, valid_time).count == 1
assert xtdb_octopoes_service.ooi_repository.list_oois({KATFindingType}, valid_time).count == 1


def test_inference_with_other_config(xtdb_octopoes_service: OctopoesService, event_manager: Mock, valid_time: datetime):
xtdb_octopoes_service.nibbler.nibbles = {"config_nibble_test": config_nibble}

network = Network(name="internet")
network_potato = Network(name="potato")
url = URL(network=network.reference, raw="https://mispo.es/")
config = Config(ooi=network_potato.reference, bit_id="config_nibble_test", config={str(url.raw): None})

xtdb_octopoes_service.ooi_repository.save(network, valid_time)
xtdb_octopoes_service.ooi_repository.save(network_potato, valid_time)
xtdb_octopoes_service.ooi_repository.save(url, valid_time)
xtdb_octopoes_service.ooi_repository.save(config, valid_time)
event_manager.complete_process_events(xtdb_octopoes_service)

assert xtdb_octopoes_service.ooi_repository.list_oois({Finding}, valid_time).count == 0
assert xtdb_octopoes_service.ooi_repository.list_oois({KATFindingType}, valid_time).count == 0


def test_inference_with_fake_id_config(
xtdb_octopoes_service: OctopoesService, event_manager: Mock, valid_time: datetime
):
xtdb_octopoes_service.nibbler.nibbles = {"config_nibble_test": config_nibble}

network = Network(name="internet")
url = URL(network=network.reference, raw="https://mispo.es/")
config = Config(ooi=network.reference, bit_id="fake_id", config={str(url.raw): None})

xtdb_octopoes_service.ooi_repository.save(network, valid_time)
xtdb_octopoes_service.ooi_repository.save(url, valid_time)
xtdb_octopoes_service.ooi_repository.save(config, valid_time)
event_manager.complete_process_events(xtdb_octopoes_service)

assert xtdb_octopoes_service.ooi_repository.list_oois({Finding}, valid_time).count == 0
assert xtdb_octopoes_service.ooi_repository.list_oois({KATFindingType}, valid_time).count == 0


def test_inference_with_changed_config(
xtdb_octopoes_service: OctopoesService, event_manager: Mock, valid_time: datetime
):
xtdb_octopoes_service.nibbler.nibbles = {"config_nibble_test": config_nibble}

network = Network(name="internet")
url = URL(network=network.reference, raw="https://mispo.es/")
config = Config(ooi=network.reference, bit_id="config_nibble_test", config={str(url.raw): None})

xtdb_octopoes_service.ooi_repository.save(network, valid_time)
xtdb_octopoes_service.ooi_repository.save(url, valid_time)
xtdb_octopoes_service.ooi_repository.save(config, valid_time)
event_manager.complete_process_events(xtdb_octopoes_service)

assert xtdb_octopoes_service.ooi_repository.list_oois({Finding}, valid_time).count == 1
assert xtdb_octopoes_service.ooi_repository.list_oois({KATFindingType}, valid_time).count == 1

config = Config(ooi=network.reference, bit_id="config_nibble_test", config={})
xtdb_octopoes_service.ooi_repository.save(config, valid_time)
event_manager.complete_process_events(xtdb_octopoes_service)

assert xtdb_octopoes_service.ooi_repository.list_oois({Finding}, valid_time).count == 0
assert xtdb_octopoes_service.ooi_repository.list_oois({KATFindingType}, valid_time).count == 0

assert len(xtdb_octopoes_service.origin_repository.list_origins(valid_time, origin_type=OriginType.NIBBLET)) == 1

0 comments on commit 0ecceec

Please sign in to comment.