Skip to content

Commit

Permalink
fix(backend): Add execution persistence for execution scheduler service
Browse files Browse the repository at this point in the history
  • Loading branch information
majdyz committed Nov 14, 2024
1 parent c707ee9 commit 172045f
Show file tree
Hide file tree
Showing 9 changed files with 358 additions and 178 deletions.
81 changes: 0 additions & 81 deletions autogpt_platform/backend/backend/data/schedule.py

This file was deleted.

190 changes: 130 additions & 60 deletions autogpt_platform/backend/backend/executor/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,88 @@
import logging
import time
from datetime import datetime
import os
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from autogpt_libs.utils.cache import thread_cached
from dotenv import load_dotenv
from pydantic import BaseModel
from sqlalchemy import MetaData, create_engine

from backend.data.block import BlockInput
from backend.data.schedule import (
ExecutionSchedule,
add_schedule,
get_active_schedules,
get_schedules,
update_schedule,
)
from backend.executor.manager import ExecutionManager
from backend.util.service import AppService, expose, get_service_client
from backend.util.settings import Config


def _extract_schema_from_url(database_url) -> tuple[str, str]:
"""
Extracts the schema from the DATABASE_URL and returns the schema and cleaned URL.
"""
parsed_url = urlparse(database_url)
query_params = parse_qs(parsed_url.query)

# Extract the 'schema' parameter
schema_list = query_params.pop("schema", None)
schema = schema_list[0] if schema_list else "public"

# Reconstruct the query string without the 'schema' parameter
new_query = urlencode(query_params, doseq=True)
new_parsed_url = parsed_url._replace(query=new_query)
database_url_clean = str(urlunparse(new_parsed_url))

return schema, database_url_clean


logger = logging.getLogger(__name__)

# Tag prepended to every message so scheduler output is easy to grep in logs.
_LOG_PREFIX = "[ExecutionScheduler] "


def log(msg, **kwargs):
    """Emit *msg* at WARNING level, prefixed with the scheduler service tag."""
    logger.warning(_LOG_PREFIX + msg, **kwargs)


class ExecutionScheduler(AppService):
def job_listener(event):
    """Log each job's outcome (success or failure) for monitoring."""
    outcome = "failed." if event.exception else "completed successfully."
    log(f"Job {event.job_id} {outcome}")


@thread_cached
def get_execution_client() -> ExecutionManager:
    """Return an RPC client for the ExecutionManager service.

    Cached via @thread_cached, so repeated calls reuse one client
    (presumably one per thread — per the decorator's name; confirm in
    autogpt_libs.utils.cache).
    """
    return get_service_client(ExecutionManager)


def execute_graph(**kwargs):
    """Scheduler callback: parse the stored job kwargs and trigger a graph run.

    Any failure is logged (with traceback) rather than raised, so one broken
    job does not take down the scheduler loop.
    """
    job = JobArgs(**kwargs)
    try:
        log(f"Executing recurring job for graph #{job.graph_id}")
        client = get_execution_client()
        client.add_execution(job.graph_id, job.input_data, job.user_id)
    except Exception as e:
        logger.exception(f"Error executing graph {job.graph_id}: {e}")


class JobArgs(BaseModel):
    """Serializable payload persisted as the APScheduler job's kwargs."""

    graph_id: str  # ID of the agent graph to execute
    input_data: BlockInput  # input values handed to the graph run
    user_id: str  # schedule owner; checked before pause/resume
    graph_version: int  # NOTE(review): stored but not forwarded by execute_graph — confirm intended
    cron: str  # crontab expression driving the schedule


class JobInfo(JobArgs):
    """JobArgs plus scheduler-derived metadata, returned to API callers."""

    id: str  # scheduler job ID (built as f"{user_id}_{graph_id}" on creation)
    name: str  # job name as reported by the scheduler
    next_run_time: str  # next scheduled run, ISO-8601 formatted

def __init__(self, refresh_interval=10):
super().__init__()
self.use_db = True
self.last_check = datetime.min
self.refresh_interval = refresh_interval

class ExecutionScheduler(AppService):
scheduler: BlockingScheduler

@classmethod
def get_port(cls) -> int:
Expand All @@ -43,43 +94,18 @@ def execution_client(self) -> ExecutionManager:
return get_service_client(ExecutionManager)

def run_service(self):
scheduler = BackgroundScheduler()
scheduler.start()
while True:
self.__refresh_jobs_from_db(scheduler)
time.sleep(self.refresh_interval)

def __refresh_jobs_from_db(self, scheduler: BackgroundScheduler):
schedules = self.run_and_wait(get_active_schedules(self.last_check))
for schedule in schedules:
if schedule.last_updated:
self.last_check = max(self.last_check, schedule.last_updated)

if not schedule.is_enabled:
log(f"Removing recurring job {schedule.id}: {schedule.schedule}")
scheduler.remove_job(schedule.id)
continue

log(f"Adding recurring job {schedule.id}: {schedule.schedule}")
scheduler.add_job(
self.__execute_graph,
CronTrigger.from_crontab(schedule.schedule),
id=schedule.id,
args=[schedule.graph_id, schedule.input_data, schedule.user_id],
replace_existing=True,
)

def __execute_graph(self, graph_id: str, input_data: dict, user_id: str):
try:
log(f"Executing recurring job for graph #{graph_id}")
self.execution_client.add_execution(graph_id, input_data, user_id)
except Exception as e:
logger.exception(f"Error executing graph {graph_id}: {e}")

@expose
def update_schedule(self, schedule_id: str, is_enabled: bool, user_id: str) -> str:
self.run_and_wait(update_schedule(schedule_id, is_enabled, user_id))
return schedule_id
load_dotenv()
db_schema, db_url = _extract_schema_from_url(os.getenv("DATABASE_URL"))
self.scheduler = BlockingScheduler(
jobstores={
"default": SQLAlchemyJobStore(
engine=create_engine(db_url),
metadata=MetaData(schema=db_schema),
)
}
)
self.scheduler.add_listener(job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
self.scheduler.start()

@expose
def add_execution_schedule(
Expand All @@ -90,16 +116,60 @@ def add_execution_schedule(
input_data: BlockInput,
user_id: str,
) -> str:
schedule = ExecutionSchedule(
job_id = f"{user_id}_{graph_id}"
job_args = JobArgs(
graph_id=graph_id,
input_data=input_data,
user_id=user_id,
graph_version=graph_version,
schedule=cron,
input_data=input_data,
cron=cron,
)
self.scheduler.add_job(
execute_graph,
CronTrigger.from_crontab(cron),
id=job_id,
kwargs=job_args.model_dump(),
replace_existing=True,
)
return self.run_and_wait(add_schedule(schedule)).id
log(f"Added job {job_id} with cron schedule '{cron}'")
return job_id

@expose
def update_schedule(self, schedule_id: str, is_enabled: bool, user_id: str) -> str:
    """Pause or resume the schedule `schedule_id` on behalf of `user_id`.

    Returns the schedule ID unchanged; silently no-ops (with a log line)
    when the job does not exist. Raises ValueError when `user_id` does not
    own the job.
    """
    job = self.scheduler.get_job(schedule_id)
    if job is None:
        log(f"Job {schedule_id} not found.")
        return schedule_id

    # Authorization: only the owning user may toggle the schedule.
    if JobArgs(**job.kwargs).user_id != user_id:
        raise ValueError("User ID does not match the job's user ID.")

    if is_enabled:
        log(f"Resuming job {schedule_id}")
        job.resume()
    else:
        log(f"Pausing job {schedule_id}")
        job.pause()

    return schedule_id

@expose
def get_execution_schedules(self, graph_id: str, user_id: str) -> dict[str, str]:
schedules = self.run_and_wait(get_schedules(graph_id, user_id=user_id))
return {v.id: v.schedule for v in schedules}
def get_execution_schedules(self, graph_id: str, user_id: str) -> list[JobInfo]:
    """List the active schedules for `graph_id` that belong to `user_id`."""
    matches: list[JobInfo] = []
    for job in self.scheduler.get_jobs():
        args = JobArgs(**job.kwargs)
        if args.graph_id != graph_id or args.user_id != user_id:
            continue
        # A paused job reports no next_run_time; exclude it from the listing.
        if job.next_run_time is None:
            continue
        matches.append(
            JobInfo(
                id=job.id,
                name=job.name,
                next_run_time=job.next_run_time.isoformat(),
                **args.model_dump(),
            )
        )
    return matches
4 changes: 2 additions & 2 deletions autogpt_platform/backend/backend/server/routers/v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from backend.data.block import BlockInput, CompletedBlockOutput
from backend.data.credit import get_block_costs, get_user_credit_model
from backend.data.user import get_or_create_user
from backend.executor import ExecutionManager, ExecutionScheduler
from backend.executor import ExecutionManager, ExecutionScheduler, scheduler
from backend.server.model import CreateGraph, SetGraphActiveVersion
from backend.server.utils import get_user_id
from backend.util.service import get_service_client
Expand Down Expand Up @@ -476,7 +476,7 @@ async def update_schedule(
)
async def get_execution_schedules(
graph_id: str, user_id: Annotated[str, Depends(get_user_id)]
) -> dict[str, str]:
) -> list[scheduler.JobInfo]:
return execution_scheduler_client().get_execution_schedules(graph_id, user_id)


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
Warnings:
- You are about to drop the `AgentGraphExecutionSchedule` table. If the table is not empty, all the data it contains will be lost.
*/
-- Schedules now live in APScheduler's SQLAlchemy job store, so the
-- Prisma-managed table is removed. FK constraints must go first so the
-- table itself can be dropped.

-- DropForeignKey
ALTER TABLE "AgentGraphExecutionSchedule" DROP CONSTRAINT "AgentGraphExecutionSchedule_agentGraphId_agentGraphVersion_fkey";

-- DropForeignKey
ALTER TABLE "AgentGraphExecutionSchedule" DROP CONSTRAINT "AgentGraphExecutionSchedule_userId_fkey";

-- DropTable
DROP TABLE "AgentGraphExecutionSchedule";
Loading

0 comments on commit 172045f

Please sign in to comment.