Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tasks contain property that tells the boefjerunner what network it is supposed to run on. #3299

Draft
wants to merge 44 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
8d90c00
Broken version, added network_scope to tasks
Souf149 Jul 16, 2024
a353bd2
Tasks now properly get `network_scope` assigned
Souf149 Jul 17, 2024
80f04b0
Added new scopes to runtime manager
Souf149 Jul 18, 2024
d2e38ec
Added `NETWORK_SCOPES` env variable
Souf149 Jul 19, 2024
80f12ae
Made tasks get run with only the scopes they are requested from
Souf149 Jul 22, 2024
99c2439
Reformatted code
Souf149 Jul 24, 2024
d08ed4e
Moved the `network_scopes` attribute to `SchedulerAPIClient`
Souf149 Jul 24, 2024
48209f5
Fixed bug that would make rescheduled tasks not have the correct scope
Souf149 Jul 26, 2024
34391ae
Changed `NETWORK_SCOPES` to `TASK_CAPABILITIES` inside the boefje runner
Souf149 Sep 13, 2024
819feda
Made the scheduler client of boefje-runners able to be used without
Souf149 Sep 13, 2024
571036c
[Broken version] Added `network` to the base ooi class and look for
Souf149 Sep 16, 2024
f7690ea
Merge branch 'main' into feature/tasks-with-traits
Souf149 Oct 3, 2024
32317fa
Fix jsonb contained by query
jpbruinsslot Oct 8, 2024
ffabf35
Add api tests for jsonb filtering
jpbruinsslot Oct 9, 2024
db18569
Rename test
jpbruinsslot Oct 9, 2024
bc0ad2e
Rename test
jpbruinsslot Oct 9, 2024
7316e5a
Merge branch 'main' into fix/mula/jsonb-filtering
jpbruinsslot Oct 9, 2024
c2550fe
Precommit
jpbruinsslot Oct 9, 2024
d90a5ee
Merge branch 'main' into fix/mula/jsonb-filtering
jpbruinsslot Oct 9, 2024
668e2c5
Fix casting of strings
jpbruinsslot Oct 9, 2024
56f83e7
Merge branch 'main' into fix/mula/jsonb-filtering
jpbruinsslot Oct 9, 2024
8875105
Trying to find OOI in new structure
Souf149 Oct 10, 2024
0a5cbdc
Merge branch 'fix/mula/jsonb-filtering' into feature/tasks-with-traits
Souf149 Oct 10, 2024
8f49bb7
Made boefjerunner use env now for task_capabilities
Souf149 Oct 10, 2024
c638b30
rocky now creates requirements on tasks
Souf149 Oct 16, 2024
0393858
Renamed network names to OOI standard
Souf149 Oct 16, 2024
53c44b8
All boefjetasks get the correct requirements
Souf149 Oct 24, 2024
95f3db7
removed default inside .env-dist
Souf149 Oct 24, 2024
386d2a1
Removed unneeded pieces which were used for debugging
Souf149 Oct 24, 2024
c066627
Removed responsibility of rocky to make requirements.
Souf149 Oct 24, 2024
aa91919
Merge remote-tracking branch 'origin/main' into feature/tasks-with-tr…
Souf149 Oct 24, 2024
b3be4f2
Made normalizers call with their own requirements
Souf149 Oct 24, 2024
74efb48
Made `_hydrate_task_for_queue` not mutate its parameter
Souf149 Oct 24, 2024
a04e55f
Split up the attribute that describes where an OOI lives
Souf149 Oct 25, 2024
aabfeeb
removed unneeded logs
Souf149 Oct 25, 2024
5899ed6
Removed unneeded attributes of normalizers' tasks
Souf149 Oct 31, 2024
c65854e
Clarified in OOI list what network an IPAddress lives on
Souf149 Oct 31, 2024
3ab3456
Added example of how jobs can be checked for if they need internet
Souf149 Oct 31, 2024
f77d7a8
Merge branch 'main' into feature/tasks-with-traits
Souf149 Dec 6, 2024
9512733
Merge branch 'main' into feature/tasks-with-traits
Souf149 Dec 6, 2024
591c91a
make use of runner_type env to determine what jobs to take
Souf149 Dec 6, 2024
2ef1726
Since schedulerclient has access to settings anyways. Moved logic aro…
Souf149 Dec 6, 2024
faab1fa
Merge branch 'main' into feature/tasks-with-traits
Souf149 Dec 12, 2024
8231a1b
Merge branch 'main' into feature/tasks-with-traits
Souf149 Dec 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions boefjes/boefjes/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

from boefjes.app import get_runtime_manager
from boefjes.config import settings
from boefjes.runtime_interfaces import WorkerManager

with settings.log_cfg.open() as f:
logging.config.dictConfig(json.load(f))
Expand Down Expand Up @@ -35,21 +34,20 @@


@click.command()
@click.argument("worker_type", type=click.Choice([q.value for q in WorkerManager.Queue]))
@click.argument("worker_type", type=click.Choice(["boefje", "normalizer"]))
@click.option("--log-level", type=click.Choice(["DEBUG", "INFO", "WARNING", "ERROR"]), help="Log level", default="INFO")
def cli(worker_type: str, log_level: str) -> None:
logger.setLevel(log_level)
logger.info("Starting runtime for %s", worker_type)

queue = WorkerManager.Queue(worker_type)
runtime = get_runtime_manager(settings, queue, log_level)
runtime = get_runtime_manager(settings, worker_type, log_level)

if worker_type == "boefje":
import boefjes.api

boefjes.api.run()

runtime.run(queue)
runtime.run(worker_type)


if __name__ == "__main__":
Expand Down
15 changes: 8 additions & 7 deletions boefjes/boefjes/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ def __init__(

self.exited = False

def run(self, queue_type: WorkerManager.Queue) -> None:
logger.info("Created worker pool for queue '%s'", queue_type.value)
def run(self, queue_type: str) -> None:
logger.info("Created worker pool for queue '%s'", queue_type)

self.workers = [
ctx.Process(target=_start_working, args=self._worker_args()) for _ in range(self.settings.pool_size)
Expand Down Expand Up @@ -80,7 +80,7 @@ def run(self, queue_type: WorkerManager.Queue) -> None:

raise

def _fill_queue(self, task_queue: Queue, queue_type: WorkerManager.Queue) -> None:
def _fill_queue(self, task_queue: Queue, queue_type: str) -> None:
if task_queue.qsize() > self.settings.pool_size:
time.sleep(self.settings.worker_heartbeat)
return
Expand All @@ -95,7 +95,7 @@ def _fill_queue(self, task_queue: Queue, queue_type: WorkerManager.Queue) -> Non

# We do not target a specific queue since we start one runtime for all organisations
# and queue ids contain the organisation_id
queues = [q for q in queues if q.id.startswith(queue_type.value) and q.size > 0]
queues = [q for q in queues if q.id.startswith(queue_type) and q.size > 0]

logger.debug("Found queues: %s", [queue.id for queue in queues])

Expand Down Expand Up @@ -265,14 +265,15 @@ def _start_working(
logger.exception("Could not patch scheduler task to %s", status.value)


def get_runtime_manager(settings: Settings, queue: WorkerManager.Queue, log_level: str) -> WorkerManager:
def get_runtime_manager(settings: Settings, queue: str, log_level: str) -> WorkerManager:
local_repository = get_local_repository()

session = sessionmaker(bind=get_engine())()
plugin_service = PluginService(create_plugin_storage(session), create_config_storage(session), local_repository)

item_handler: Handler
if queue is WorkerManager.Queue.BOEFJES:

if queue == "boefje":
item_handler = BoefjeHandler(LocalBoefjeJobRunner(local_repository), plugin_service, bytes_api_client)
else:
item_handler = NormalizerHandler(
Expand All @@ -281,7 +282,7 @@ def get_runtime_manager(settings: Settings, queue: WorkerManager.Queue, log_leve

return SchedulerWorkerManager(
item_handler,
SchedulerAPIClient(str(settings.scheduler_api)), # Do not share a session between workers
SchedulerAPIClient(base_url=str(settings.scheduler_api)), # Do not share a session between workers
settings,
log_level,
)
39 changes: 38 additions & 1 deletion boefjes/boefjes/clients/scheduler_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import datetime
import json
import uuid
from enum import Enum
from typing import Any

from httpx import Client, HTTPTransport, Response
from pydantic import BaseModel, TypeAdapter
Expand Down Expand Up @@ -39,6 +41,17 @@ class Task(BaseModel):
modified_at: datetime.datetime


class Filter(BaseModel):
    """One filter condition included in a ``QueuePopRequest``.

    ``pop_item`` builds these to restrict which tasks a boefje runner pops.
    How the scheduler evaluates the condition is not visible here.
    """

    # Column the condition applies to; pop_item always passes "data".
    column: str
    # Field inside that column; pop_item passes "network" or "requirements".
    field: str
    # Comparison operator as text; pop_item passes "<@".
    operator: str
    # Operand for the comparison. pop_item passes a JSON-encoded string
    # (json.dumps of a settings value), but any type is accepted.
    value: Any


class QueuePopRequest(BaseModel):
    """Body posted to ``/queues/{queue_id}/pop`` by ``pop_item``."""

    # Filter conditions sent along with the pop; pop_item sends an empty list
    # when the runner type is not "boefje".
    # NOTE(review): presumably the scheduler requires all filters to match - confirm.
    filters: list[Filter]


class SchedulerClientInterface:
def get_queues(self) -> list[Queue]:
raise NotImplementedError()
Expand Down Expand Up @@ -73,7 +86,31 @@ def get_queues(self) -> list[Queue]:
return TypeAdapter(list[Queue]).validate_json(response.content)

def pop_item(self, queue_id: str) -> Task | None:
    """Pop the next task from the scheduler queue ``queue_id``.

    When ``settings.runner_type`` is ``"boefje"`` the request carries two
    filters on the task's ``data`` column, both using the ``<@`` operator:

    * ``network`` against ``settings.boefje_reachable_networks``
    * ``requirements`` against ``settings.boefje_task_capabilities``

    For any other runner type an empty filter list is sent.

    Args:
        queue_id: Identifier of the queue to pop from; interpolated into the
            request path.

    Returns:
        The popped ``Task``, or ``None`` when the response body is JSON ``null``.

    Raises:
        Any error raised by ``self._verify_response`` for the response, or by
        pydantic when the response body does not validate.
    """
    filters: list[Filter] = []

    if settings.runner_type == "boefje":
        # NOTE(review): both settings default to None, and json.dumps(None)
        # produces the string "null". Confirm how the scheduler treats a
        # "<@ null" filter; an unconfigured runner may end up popping nothing.

        # Client should only pop tasks that lie on a network that the
        # runner is capable of reaching (e.g. the internet)
        filters.append(
            Filter(
                column="data",
                field="network",
                operator="<@",
                value=json.dumps(settings.boefje_reachable_networks),
            )
        )

        # Client should only pop tasks whose requirements this runner is
        # capable of fulfilling (e.g. being able to handle ipv6 requests)
        filters.append(
            Filter(
                column="data",
                field="requirements",
                operator="<@",
                value=json.dumps(settings.boefje_task_capabilities),
            )
        )

    # The body is an already-serialised JSON string, so it is sent as raw
    # content (httpx reserves ``data=`` for form fields) and labelled as JSON.
    response = self._session.post(
        f"/queues/{queue_id}/pop",
        content=QueuePopRequest(filters=filters).model_dump_json(),
        headers={"Content-Type": "application/json"},
    )
    self._verify_response(response)

    return TypeAdapter(Task | None).validate_json(response.content)
Expand Down
15 changes: 15 additions & 0 deletions boefjes/boefjes/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from pydantic_settings.sources import EnvSettingsSource

from boefjes.models import EncryptionMiddleware
from boefjes.runtime_interfaces import WorkerManager

BASE_DIR: Path = Path(__file__).parent.resolve()

Expand Down Expand Up @@ -120,6 +121,20 @@ class Settings(BaseSettings):
None, description="OpenTelemetry endpoint", validation_alias="SPAN_EXPORT_GRPC_ENDPOINT"
)

boefje_reachable_networks: list[str] | None = Field(
None,
description="List of networks the boefje-runner can reach",
examples=[["Network|internet", "Network|dentist"], []],
)

boefje_task_capabilities: list[str] | None = Field(
None,
description="List of technical requirements the boefje-runner is capable of running",
examples=[[], ["ipv4", "wifi-pineapple"]],
)

runner_type: WorkerManager.Queue | None = Field(None, examples=["boefje", "normalizer"])

logging_format: Literal["text", "json"] = Field("text", description="Logging format")

outgoing_request_timeout: int = Field(30, description="Timeout for outgoing HTTP requests")
Expand Down
8 changes: 3 additions & 5 deletions boefjes/boefjes/runtime_interfaces.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from enum import Enum
from typing import Literal

from boefjes.job_models import BoefjeMeta, NormalizerMeta, NormalizerResults

Expand All @@ -19,11 +19,9 @@ def run(self, normalizer_meta: NormalizerMeta, raw: bytes) -> NormalizerResults:


class WorkerManager:
class Queue(Enum):
BOEFJES = "boefje"
NORMALIZERS = "normalizer"
Queue = Literal["boefje", "normalizer"]

def run(self, queue: Queue) -> None:
def run(self, queue: str) -> None:
raise NotImplementedError()


Expand Down
6 changes: 3 additions & 3 deletions boefjes/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ set -e
shopt -s nocasematch

if [ "$1" = "boefje" ]; then
exec python -m boefjes boefje
exec env BOEFJES_RUNNER_TYPE="$1" python -m boefjes boefje
elif [ "$1" = "normalizer" ]; then
exec python -m boefjes normalizer
exec env BOEFJES_RUNNER_TYPE="$1" python -m boefjes normalizer
fi

# The migrations and seed are for the KATalogus. They are not inside the if because this way
Expand All @@ -17,7 +17,7 @@ if [ "$DATABASE_MIGRATION" = "1" ] || [[ $DATABASE_MIGRATION == "true" ]]; then
fi

if [ "$1" = "katalogus" ]; then
exec python -m uvicorn --host 0.0.0.0 boefjes.katalogus.root:app
exec env BOEFJES_RUNNER_TYPE="$1" python -m uvicorn --host 0.0.0.0 boefjes.katalogus.root:app
fi

exec "$@"
6 changes: 3 additions & 3 deletions boefjes/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
get_normalizer_resource,
)
from boefjes.models import Organisation
from boefjes.runtime_interfaces import Handler, WorkerManager
from boefjes.runtime_interfaces import Handler
from boefjes.sql.config_storage import SQLConfigStorage, create_encrypter
from boefjes.sql.db import SQL_BASE, get_engine
from boefjes.sql.organisation_storage import SQLOrganisationStorage, get_organisations_store
Expand Down Expand Up @@ -81,13 +81,13 @@ def pop_item(self, queue: str) -> Task | None:
time.sleep(self.sleep_time)

try:
if WorkerManager.Queue.BOEFJES.value in queue:
if "boefje" in queue:
p_item = TypeAdapter(Task).validate_json(self.boefje_responses.pop(0))
self._popped_items[str(p_item.id)] = p_item
self._tasks[str(p_item.id)] = self._task_from_id(p_item.id)
return p_item

if WorkerManager.Queue.NORMALIZERS.value in queue:
if "normalizer" in queue:
p_item = TypeAdapter(Task).validate_json(self.normalizer_responses.pop(0))
self._popped_items[str(p_item.id)] = p_item
self._tasks[str(p_item.id)] = self._task_from_id(p_item.id)
Expand Down
19 changes: 9 additions & 10 deletions boefjes/tests/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@

from boefjes.app import SchedulerWorkerManager, get_runtime_manager
from boefjes.config import Settings
from boefjes.runtime_interfaces import WorkerManager
from tests.conftest import MockHandler, MockSchedulerClient
from tests.loading import get_dummy_data


def test_one_process(manager: SchedulerWorkerManager, item_handler: MockHandler) -> None:
with pytest.raises(KeyboardInterrupt):
manager.run(WorkerManager.Queue.BOEFJES)
manager.run("boefje")

items = item_handler.get_all()
assert len(items) == 2
Expand All @@ -38,7 +37,7 @@ def test_two_processes(manager: SchedulerWorkerManager, item_handler: MockHandle
manager.task_queue = Manager().Queue()

with pytest.raises(KeyboardInterrupt):
manager.run(WorkerManager.Queue.BOEFJES)
manager.run("boefje")

items = item_handler.get_all()
assert len(items) == 2
Expand All @@ -64,7 +63,7 @@ def test_two_processes_exception(manager: SchedulerWorkerManager, item_handler:

manager.settings.pool_size = 2
with pytest.raises(KeyboardInterrupt):
manager.run(WorkerManager.Queue.BOEFJES)
manager.run("boefje")

assert item_handler.queue.empty()
assert manager.scheduler_client.log_path.exists()
Expand All @@ -81,7 +80,7 @@ def test_two_processes_handler_exception(manager: SchedulerWorkerManager, item_h
manager.settings.pool_size = 2
manager.task_queue = Manager().Queue()
with pytest.raises(KeyboardInterrupt):
manager.run(WorkerManager.Queue.BOEFJES)
manager.run("boefje")

items = item_handler.get_all()
assert len(items) == 1
Expand Down Expand Up @@ -137,7 +136,7 @@ def test_two_processes_cleanup_unfinished_tasks(
item_handler.sleep_time = 200

with pytest.raises(KeyboardInterrupt):
manager.run(WorkerManager.Queue.BOEFJES)
manager.run("boefje")

items = item_handler.get_all()
assert len(items) == 0
Expand All @@ -161,7 +160,7 @@ def test_two_processes_cleanup_unfinished_tasks(

def test_normalizer_queue(manager: SchedulerWorkerManager, item_handler: MockHandler) -> None:
with pytest.raises(KeyboardInterrupt):
manager.run(WorkerManager.Queue.NORMALIZERS)
manager.run("normalizer")

items = item_handler.get_all()
assert len(items) == 1
Expand All @@ -179,7 +178,7 @@ def test_null(manager: SchedulerWorkerManager, tmp_path: Path, item_handler: Moc
)

with pytest.raises(KeyboardInterrupt):
manager.run(WorkerManager.Queue.BOEFJES)
manager.run("boefje")

items = item_handler.get_all()
patched_tasks = manager.scheduler_client.get_all_patched_tasks()
Expand All @@ -197,5 +196,5 @@ def test_null(manager: SchedulerWorkerManager, tmp_path: Path, item_handler: Moc


def test_create_manager() -> None:
    """Smoke test: a runtime manager can be built for each worker type."""
    # Worker types are passed as plain strings; the former
    # WorkerManager.Queue enum members are no longer imported in this module.
    get_runtime_manager(Settings(), "boefje", "INFO")
    get_runtime_manager(Settings(), "normalizer", "INFO")
1 change: 1 addition & 0 deletions mula/scheduler/models/ooi.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class OOI(BaseModel):
primary_key: str
object_type: str
scan_profile: ScanProfile
network: str | None = None


class ScanProfileMutation(BaseModel):
Expand Down
3 changes: 3 additions & 0 deletions mula/scheduler/models/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ class BoefjeTask(BaseModel):

dispatches: list[Normalizer] = Field(default_factory=list)

requirements: list[str] = []
network: str = "Network|internet"

@property
def hash(self) -> str:
"""Make BoefjeTask hashable, so that we can de-duplicate it when used
Expand Down
Loading
Loading