Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combined schedulers #3839

Open
wants to merge 40 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
2783e5b
Migrate schedulers to postgres
jpbruinsslot Oct 31, 2024
f6030fc
Implement scheduler storage
jpbruinsslot Oct 31, 2024
5dbf526
Restructure app
jpbruinsslot Nov 4, 2024
ef00d7d
Update
jpbruinsslot Nov 6, 2024
0f26d7f
Taking out the trash
jpbruinsslot Nov 6, 2024
6838bf3
Clean up on aisle six
jpbruinsslot Nov 6, 2024
5279369
Kondofy
jpbruinsslot Nov 6, 2024
718df53
Sweeping the floor
jpbruinsslot Nov 7, 2024
605eb78
Dust-off
jpbruinsslot Nov 7, 2024
47085d1
Brush off
jpbruinsslot Nov 11, 2024
d5bbf55
Squeaky clean
jpbruinsslot Nov 12, 2024
43bd6b0
Wash cycle
jpbruinsslot Nov 12, 2024
cc0bec4
Rinse
jpbruinsslot Nov 12, 2024
6d7bd0e
Shiny
jpbruinsslot Nov 18, 2024
33a72d7
Polish
jpbruinsslot Nov 18, 2024
da3f337
Combining organisation schedulers
jpbruinsslot Nov 13, 2024
05f36d4
Revert to in-memory
jpbruinsslot Nov 18, 2024
32f06ed
Update
jpbruinsslot Nov 20, 2024
9ccecba
Refactor organisations
jpbruinsslot Nov 21, 2024
d602c08
Merge branch 'main' into poc/mula/combined-schedulers
jpbruinsslot Dec 2, 2024
9c17a32
Update tests
jpbruinsslot Dec 3, 2024
38d436f
Update
jpbruinsslot Dec 4, 2024
0896efe
Made schedulers work
jpbruinsslot Dec 5, 2024
79b2ffc
Fixing tests and code
jpbruinsslot Dec 5, 2024
de570cd
Fix api tests
jpbruinsslot Dec 9, 2024
c43546c
Fixing tests
jpbruinsslot Dec 10, 2024
abf8709
Pre-commit
jpbruinsslot Dec 10, 2024
0a3a86f
Fix api
jpbruinsslot Dec 10, 2024
850c595
Rename method
jpbruinsslot Dec 11, 2024
bf396e2
Rename method names
jpbruinsslot Dec 11, 2024
c40193f
Merge branch 'main' into poc/mula/combined-schedulers
jpbruinsslot Dec 11, 2024
5cc20ee
Fix tests
jpbruinsslot Dec 11, 2024
400684a
Rename methods
jpbruinsslot Dec 11, 2024
f55d62a
Rename tracing names
jpbruinsslot Dec 11, 2024
54311e5
Rename methods
jpbruinsslot Dec 11, 2024
2baee5f
I long for errors as values
jpbruinsslot Dec 11, 2024
f44b441
Fix exceptions
jpbruinsslot Dec 12, 2024
f692b83
Use Literal
jpbruinsslot Dec 12, 2024
924805a
Tidy up
jpbruinsslot Dec 12, 2024
4e1d0a3
Merge branch 'main' into poc/mula/combined-schedulers
stephanie0x00 Dec 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 43 additions & 178 deletions mula/scheduler/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import structlog
from opentelemetry import trace

from scheduler import clients, context, schedulers, server
from scheduler import context, schedulers, server
from scheduler.utils import thread

tracer = trace.get_tracer(__name__)
Expand All @@ -26,34 +26,21 @@ class App:
through a REST API.

* Metrics: The collection of application specific metrics.

Attributes:
logger:
The logger for the class.
ctx:
Application context of shared data (e.g. configuration, external
services connections).
stop_event: A threading.Event object used for communicating a stop
event across threads.
schedulers:
A dict of schedulers, keyed by scheduler id.
server:
The http rest api server instance.
"""

def __init__(self, ctx: context.AppContext) -> None:
"""Initialize the application.

Args:
ctx:
Application context of shared data (e.g. configuration,
external services connections).
ctx (context.AppContext): Application context of shared data (e.g.
configuration, external services connections).
"""

self.logger: structlog.BoundLogger = structlog.getLogger(__name__)
self.ctx: context.AppContext = ctx
self.server: server.Server | None = None

threading.excepthook = self.unhandled_exception
threading.excepthook = self._unhandled_exception
self.stop_event: threading.Event = threading.Event()
self.lock: threading.Lock = threading.Lock()

Expand All @@ -64,147 +51,6 @@ def __init__(self, ctx: context.AppContext) -> None:
| schedulers.NormalizerScheduler
| schedulers.ReportScheduler,
] = {}
self.server: server.Server | None = None

@tracer.start_as_current_span("monitor_organisations")
def monitor_organisations(self) -> None:
    """Reconcile the running schedulers with the organisations known to
    the Katalogus service.

    Runs periodically on a monitor thread: schedulers whose organisation
    disappeared from the Katalogus are stopped, and a boefje, normalizer
    and report scheduler are created and started for every newly seen
    organisation.
    """
    # Work on a snapshot so concurrent add/remove does not disturb iteration.
    current_schedulers = self.schedulers.copy()

    # Compare the organisation ids currently served by schedulers against
    # the organisation ids the Katalogus reports; the two set differences
    # drive which schedulers are added and which are removed.
    scheduler_orgs: set[str] = {
        s.organisation.id for s in current_schedulers.values() if hasattr(s, "organisation")
    }
    try:
        orgs = self.ctx.services.katalogus.get_organisations()
    except clients.errors.ExternalServiceError:
        # Katalogus unavailable: skip this cycle, the monitor retries later.
        self.logger.exception("Failed to get organisations from Katalogus")
        return

    katalogus_orgs = {org.id for org in orgs}

    additions = katalogus_orgs.difference(scheduler_orgs)
    self.logger.debug("Organisations to add: %s", len(additions), additions=sorted(additions))

    removals = scheduler_orgs.difference(katalogus_orgs)
    self.logger.debug("Organisations to remove: %s", len(removals), removals=sorted(removals))

    # Collect the ids of all schedulers belonging to removed organisations;
    # one organisation can own several schedulers (boefje/normalizer/report).
    removal_scheduler_ids: set[str] = {
        s.scheduler_id
        for s in current_schedulers.values()
        if hasattr(s, "organisation") and s.organisation.id in removals
    }

    # Stop the schedulers of removed organisations. NOTE(review): stopping
    # presumably triggers the scheduler's callback which deregisters it
    # from self.schedulers -- confirm against the scheduler implementation.
    for scheduler_id in removal_scheduler_ids:
        if scheduler_id not in self.schedulers:
            continue

        self.schedulers[scheduler_id].stop()

    if removals:
        self.logger.debug("Removed %s organisations from scheduler", len(removals), removals=sorted(removals))

    # Create and start a boefje, normalizer and report scheduler for every
    # newly discovered organisation.
    for org_id in additions:
        try:
            org = self.ctx.services.katalogus.get_organisation(org_id)
        except clients.errors.ExternalServiceError as e:
            # Skip this organisation for now; it will show up as an
            # addition again on the next monitor cycle.
            self.logger.error("Failed to get organisation from Katalogus", error=e, org_id=org_id)
            continue

        scheduler_boefje = schedulers.BoefjeScheduler(
            ctx=self.ctx, scheduler_id=f"boefje-{org.id}", organisation=org, callback=self.remove_scheduler
        )

        scheduler_normalizer = schedulers.NormalizerScheduler(
            ctx=self.ctx, scheduler_id=f"normalizer-{org.id}", organisation=org, callback=self.remove_scheduler
        )

        scheduler_report = schedulers.ReportScheduler(
            ctx=self.ctx, scheduler_id=f"report-{org.id}", organisation=org, callback=self.remove_scheduler
        )

        # Register the new schedulers under the lock so concurrent readers
        # never observe a partially updated mapping.
        with self.lock:
            self.schedulers[scheduler_boefje.scheduler_id] = scheduler_boefje
            self.schedulers[scheduler_normalizer.scheduler_id] = scheduler_normalizer
            self.schedulers[scheduler_report.scheduler_id] = scheduler_report

        scheduler_normalizer.run()
        scheduler_boefje.run()
        scheduler_report.run()

    if additions:
        # Flush katalogus caches when new organisations are added
        self.ctx.services.katalogus.flush_caches()

        self.logger.debug("Added %s organisations to scheduler", len(additions), additions=sorted(additions))

@tracer.start_as_current_span("collect_metrics")
def collect_metrics(self) -> None:
    """Export per-scheduler metrics.

    For every registered scheduler the current queue size and the task
    counts per status are pushed to the application's metric gauges.
    """
    with self.lock:
        for scheduler in self.schedulers.copy().values():
            sid = scheduler.scheduler_id
            self.ctx.metrics_qsize.labels(scheduler_id=sid).set(scheduler.queue.qsize())

            counts = self.ctx.datastores.task_store.get_status_counts(sid)
            for status_name, total in counts.items():
                self.ctx.metrics_task_status_counts.labels(scheduler_id=sid, status=status_name).set(total)

def start_schedulers(self) -> None:
    """Create and launch a boefje, normalizer and report scheduler for
    every organisation known to the Katalogus service.

    When the Katalogus cannot be reached no schedulers are started; the
    organisation monitor can pick the organisations up later.
    """
    try:
        orgs = self.ctx.services.katalogus.get_organisations()
    except clients.errors.ExternalServiceError as e:
        self.logger.error("Failed to get organisations from Katalogus", error=e)
        return

    # (id-prefix, class) pairs; registration order matches the original
    # boefje -> normalizer -> report order.
    scheduler_types = (
        ("boefje", schedulers.BoefjeScheduler),
        ("normalizer", schedulers.NormalizerScheduler),
        ("report", schedulers.ReportScheduler),
    )

    for org in orgs:
        for prefix, scheduler_cls in scheduler_types:
            instance = scheduler_cls(
                ctx=self.ctx,
                scheduler_id=f"{prefix}-{org.id}",
                organisation=org,
                callback=self.remove_scheduler,
            )
            self.schedulers[instance.scheduler_id] = instance

    # Start every registered scheduler.
    for instance in self.schedulers.values():
        instance.run()

def start_monitors(self) -> None:
    """Spawn the background thread that keeps the organisation
    schedulers in sync with the Katalogus service."""
    monitor = thread.ThreadRunner(
        name="App-monitor_organisations",
        target=self.monitor_organisations,
        stop_event=self.stop_event,
        interval=self.ctx.config.monitor_organisations_interval,
    )
    monitor.start()

def start_collectors(self) -> None:
    """Spawn the background thread that collects application metrics
    every 10 seconds."""
    collector = thread.ThreadRunner(
        name="App-metrics_collector",
        target=self.collect_metrics,
        stop_event=self.stop_event,
        interval=10,
    )
    collector.start()

def start_server(self) -> None:
    """Create the REST API server and run it on a dedicated thread
    (loop=False: the server's run() blocks until shutdown)."""
    self.server = server.Server(self.ctx, self.schedulers)
    runner = thread.ThreadRunner(
        name="App-server",
        target=self.server.run,
        stop_event=self.stop_event,
        loop=False,
    )
    runner.start()

def run(self) -> None:
"""Start the main scheduler application, and run in threads the
Expand All @@ -218,9 +64,6 @@ def run(self) -> None:
# Start schedulers
self.start_schedulers()

# Start monitors
self.start_monitors()

# Start metrics collecting
if self.ctx.config.collect_metrics:
self.start_collectors()
Expand All @@ -241,24 +84,46 @@ def run(self) -> None:
# Source: https://stackoverflow.com/a/1489838/1346257
os._exit(1)

def start_schedulers(self) -> None:
    """Instantiate one scheduler of each type, register it under its own
    scheduler id, and start all of them."""
    # Registration order matches the original: boefje, normalizer, report.
    for scheduler_cls in (
        schedulers.BoefjeScheduler,
        schedulers.NormalizerScheduler,
        schedulers.ReportScheduler,
    ):
        instance = scheduler_cls(ctx=self.ctx)
        self.schedulers[instance.scheduler_id] = instance

    for instance in self.schedulers.values():
        instance.run()

def start_collectors(self) -> None:
    """Launch the metrics-collection thread, which invokes
    :meth:`_collect_metrics` every 10 seconds until the stop event fires."""
    collector = thread.ThreadRunner(
        name="App-metrics_collector",
        target=self._collect_metrics,
        stop_event=self.stop_event,
        interval=10,
    )
    collector.start()

def start_server(self) -> None:
    """Construct the REST API server and serve it from its own thread;
    loop=False because the server's run() blocks until it is stopped."""
    self.server = server.Server(self.ctx, self.schedulers)
    server_thread = thread.ThreadRunner(
        name="App-server",
        target=self.server.run,
        stop_event=self.stop_event,
        loop=False,
    )
    server_thread.start()

def shutdown(self) -> None:
"""Shutdown the scheduler application, and all threads."""
self.logger.info("Shutdown initiated")

self.stop_event.set()

# First stop schedulers
for s in self.schedulers.copy().values():
# Stop all schedulers
for s in self.schedulers.values():
s.stop()

# Stop all threads that are still running, except the main thread.
# These threads likely have a blocking call and as such are not able
# to leverage a stop event.
self.stop_threads()
self._stop_threads()

self.logger.info("Shutdown complete")

def stop_threads(self) -> None:
def _stop_threads(self) -> None:
"""Stop all threads, except the main thread."""
for t in threading.enumerate():
if t is threading.current_thread():
Expand All @@ -272,23 +137,23 @@ def stop_threads(self) -> None:

t.join(5)

def unhandled_exception(self, args: threading.ExceptHookArgs) -> None:
def _unhandled_exception(self, args: threading.ExceptHookArgs) -> None:
    """Installed as ``threading.excepthook``: when an unhandled exception
    escapes any worker thread, log it and set the stop event so the whole
    application shuts down gracefully."""
    exc_value = args.exc_value
    self.logger.error("Unhandled exception occurred: %s", exc_value)
    self.stop_event.set()

def remove_scheduler(self, scheduler_id: str) -> None:
"""Remove a scheduler from the application. This method is passed
as a callback to the scheduler, so that the scheduler can remove
itself from the application.
def _collect_metrics(self) -> None:
"""Collect application metrics throughout the application."""

Args:
scheduler_id: The id of the scheduler to remove.
"""
with self.lock:
if scheduler_id not in self.schedulers:
return
# FIXME: this could be done with aggregate queries instead of a per-scheduler loop
# Collect the queue size of the schedulers, and the status counts of
# the tasks for each scheduler.
for s in self.schedulers.values():
qsize = self.ctx.datastores.pq_store.qsize(s.scheduler_id)
self.ctx.metrics_qsize.labels(scheduler_id=s.scheduler_id).set(qsize)

self.schedulers.pop(scheduler_id)
status_counts = self.ctx.datastores.task_store.get_status_counts(s.scheduler_id)
for status, count in status_counts.items():
self.ctx.metrics_task_status_counts.labels(scheduler_id=s.scheduler_id, status=status).set(count)
2 changes: 1 addition & 1 deletion mula/scheduler/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@
from .plugin import Plugin
from .queue import Queue
from .schedule import Schedule, ScheduleDB
from .scheduler import Scheduler
from .scheduler import Scheduler, SchedulerType
from .task import BoefjeTask, NormalizerTask, ReportTask, Task, TaskDB, TaskStatus
1 change: 1 addition & 0 deletions mula/scheduler/models/ooi.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ class ScanProfileMutation(BaseModel):
operation: MutationOperationType
primary_key: str
value: OOI | None
organisation: str
2 changes: 1 addition & 1 deletion mula/scheduler/models/organisation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@

class Organisation(BaseModel):
id: str
name: str
name: str | None = None
16 changes: 2 additions & 14 deletions mula/scheduler/models/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,12 @@ class Schedule(BaseModel):
model_config = ConfigDict(from_attributes=True, validate_assignment=True)

id: uuid.UUID = Field(default_factory=uuid.uuid4)

scheduler_id: str

organisation: str
hash: str | None = Field(None, max_length=32)

data: dict | None = None

enabled: bool = True

schedule: str | None = None

tasks: list[Task] = []

deadline_at: datetime | None = None
Expand Down Expand Up @@ -57,21 +52,14 @@ class ScheduleDB(Base):
__tablename__ = "schedules"

id = Column(GUID, primary_key=True)

scheduler_id = Column(String, nullable=False)

organisation = Column(String, nullable=False)
hash = Column(String(32), nullable=True, unique=True)

data = Column(JSONB, nullable=False)

enabled = Column(Boolean, nullable=False, default=True)

schedule = Column(String, nullable=True)

tasks = relationship("TaskDB", back_populates="schedule")

deadline_at = Column(DateTime(timezone=True), nullable=True)

created_at = Column(DateTime(timezone=True), nullable=False, server_default=func.now())

modified_at = Column(DateTime(timezone=True), nullable=False, server_default=func.now(), onupdate=func.now())
30 changes: 22 additions & 8 deletions mula/scheduler/models/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,28 @@
from datetime import datetime
from typing import Any
import enum
from datetime import datetime, timezone

from pydantic import BaseModel
from pydantic import BaseModel, ConfigDict, Field
from sqlalchemy import Boolean, Column, DateTime
from sqlalchemy import Enum as SQLAlchemyEnum
from sqlalchemy import Integer, String
from sqlalchemy.sql import func

from .base import Base


class SchedulerType(str, enum.Enum):
    """Closed set of scheduler kinds known to the application.

    Mixes in ``str`` so that members compare equal to, and serialise as,
    their plain string values.
    """

    BOEFJE = "boefje"
    NORMALIZER = "normalizer"
    REPORT = "report"


class Scheduler(BaseModel):
"""Representation of a schedulers.Scheduler instance. Used for
unmarshalling of schedulers to a JSON representation."""
model_config = ConfigDict(from_attributes=True, use_enum_values=True)

id: str | None = None
enabled: bool | None = None
priority_queue: dict[str, Any] | None = None
id: str
type: SchedulerType
last_activity: datetime | None = None
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
modified_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
Loading
Loading