feat(backend): Introduce executors shared DB connection #8340

Merged: 15 commits, Oct 16, 2024
3 changes: 2 additions & 1 deletion autogpt_platform/backend/backend/app.py
@@ -24,11 +24,12 @@ def main(**kwargs):
Run all the processes required for the AutoGPT-server (REST and WebSocket APIs).
"""

-from backend.executor import ExecutionManager, ExecutionScheduler
+from backend.executor import DatabaseManager, ExecutionManager, ExecutionScheduler
from backend.server.rest_api import AgentServer
from backend.server.ws_api import WebsocketServer

run_processes(
+DatabaseManager(),
ExecutionManager(),
ExecutionScheduler(),
WebsocketServer(),
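With this change the server entry point also launches a dedicated DatabaseManager service, so executors share one database connection instead of each opening their own. The launcher's body is not part of this diff; below is a minimal sketch of what a process runner like run_processes could look like, assuming each argument is a service object with a start() entry point (the implementation details here are assumptions, not the real backend.app.run_processes):

import multiprocessing

def run_processes(*services):
    # Hypothetical sketch: start each service in its own OS process
    # and block until all of them exit.
    processes = [
        multiprocessing.Process(target=service.start) for service in services
    ]
    for process in processes:
        process.start()
    for process in processes:
        process.join()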
12 changes: 8 additions & 4 deletions autogpt_platform/backend/backend/data/credit.py
@@ -18,7 +18,7 @@
LlmModel,
)
from backend.blocks.talking_head import CreateTalkingAvatarVideoBlock
-from backend.data.block import Block, BlockInput
+from backend.data.block import Block, BlockInput, get_block
from backend.util.settings import Config


@@ -96,7 +96,7 @@ async def spend_credits(
self,
user_id: str,
user_credit: int,
-block: Block,
+block_id: str,
input_data: BlockInput,
data_size: float,
run_time: float,
@@ -107,7 +107,7 @@
Args:
user_id (str): The user ID.
user_credit (int): The current credit for the user.
-block (Block): The block that is being used.
+block_id (str): The block ID.
input_data (BlockInput): The input data for the block.
data_size (float): The size of the data being processed.
run_time (float): The time taken to run the block.
@@ -208,12 +208,16 @@ async def spend_credits(
self,
user_id: str,
user_credit: int,
-block: Block,
+block_id: str,
input_data: BlockInput,
data_size: float,
run_time: float,
validate_balance: bool = True,
) -> int:
+block = get_block(block_id)
+if not block:
+    raise ValueError(f"Block not found: {block_id}")

cost, matching_filter = self._block_usage_cost(
block=block, input_data=input_data, data_size=data_size, run_time=run_time
)
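Passing block_id: str instead of a live Block keeps every spend_credits argument JSON-serializable, which matters now that credit spending can be invoked through the DatabaseManager service rather than in-process; the block is resolved back into an object inside the method via get_block. A hypothetical call mirroring the new signature (all IDs and values below are illustrative only):

import asyncio

from backend.data.credit import get_user_credit_model

async def charge_example() -> int:
    # Only the string ID has to cross the process boundary; spend_credits
    # looks the Block up itself via get_block(block_id).
    model = get_user_credit_model()
    return await model.spend_credits(
        user_id="user-123",           # illustrative value
        user_credit=500,              # illustrative value
        block_id="example-block-id",  # illustrative value
        input_data={"prompt": "hello"},
        data_size=1.0,
        run_time=0.5,
    )

# asyncio.run(charge_example())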
19 changes: 5 additions & 14 deletions autogpt_platform/backend/backend/data/graph.py
@@ -7,7 +7,7 @@
import prisma.types
from prisma.models import AgentGraph, AgentGraphExecution, AgentNode, AgentNodeLink
from prisma.types import AgentGraphInclude
-from pydantic import BaseModel, PrivateAttr
+from pydantic import BaseModel
from pydantic_core import PydanticUndefinedType

from backend.blocks.basic import AgentInputBlock, AgentOutputBlock
@@ -51,17 +51,8 @@ class Node(BaseDbModel):
block_id: str
input_default: BlockInput = {} # dict[input_name, default_value]
metadata: dict[str, Any] = {}

-_input_links: list[Link] = PrivateAttr(default=[])
-_output_links: list[Link] = PrivateAttr(default=[])
-
-@property
-def input_links(self) -> list[Link]:
-    return self._input_links
-
-@property
-def output_links(self) -> list[Link]:
-    return self._output_links
+input_links: list[Link] = []
+output_links: list[Link] = []

@staticmethod
def from_db(node: AgentNode):
@@ -73,8 +64,8 @@ def from_db(node: AgentNode):
input_default=json.loads(node.constantInput),
metadata=json.loads(node.metadata),
)
-obj._input_links = [Link.from_db(link) for link in node.Input or []]
-obj._output_links = [Link.from_db(link) for link in node.Output or []]
+obj.input_links = [Link.from_db(link) for link in node.Input or []]
+obj.output_links = [Link.from_db(link) for link in node.Output or []]
return obj


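Replacing the PrivateAttr pair with ordinary fields is what lets a Node's links survive serialization: pydantic excludes private attributes from model_dump(), so the link data would otherwise be silently dropped whenever a Node is shipped across the new service boundary. A small self-contained illustration of the difference:

from pydantic import BaseModel, PrivateAttr

class PrivateLinks(BaseModel):
    _links: list[str] = PrivateAttr(default=[])

class PublicLinks(BaseModel):
    links: list[str] = []

private_node = PrivateLinks()
private_node._links = ["a -> b"]
public_node = PublicLinks(links=["a -> b"])

print(private_node.model_dump())  # {}: private attrs are dropped
print(public_node.model_dump())   # {'links': ['a -> b']}: round-trips intact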
37 changes: 13 additions & 24 deletions autogpt_platform/backend/backend/data/queue.py
@@ -17,14 +17,6 @@ def default(self, o):


class AbstractEventQueue(ABC):
-@abstractmethod
-def connect(self):
-    pass
-
-@abstractmethod
-def close(self):
-    pass

@abstractmethod
def put(self, execution_result: ExecutionResult):
pass
@@ -36,26 +28,23 @@ def get(self) -> ExecutionResult | None:

class RedisEventQueue(AbstractEventQueue):
def __init__(self):
-self.connection = None
self.queue_name = redis.QUEUE_NAME

-def connect(self):
-    self.connection = redis.connect()
+@property
+def connection(self):
+    return redis.get_redis()

def put(self, execution_result: ExecutionResult):
-if self.connection:
-    message = json.dumps(execution_result.model_dump(), cls=DateTimeEncoder)
-    logger.info(f"Putting execution result to Redis {message}")
-    self.connection.lpush(self.queue_name, message)
+message = json.dumps(execution_result.model_dump(), cls=DateTimeEncoder)
+logger.info(f"Putting execution result to Redis {message}")
+self.connection.lpush(self.queue_name, message)

def get(self) -> ExecutionResult | None:
-if self.connection:
-    message = self.connection.rpop(self.queue_name)
-    if message is not None and isinstance(message, (str, bytes, bytearray)):
-        data = json.loads(message)
-        logger.info(f"Getting execution result from Redis {data}")
-        return ExecutionResult(**data)
+message = self.connection.rpop(self.queue_name)
+if message is not None and isinstance(message, (str, bytes, bytearray)):
+    data = json.loads(message)
+    logger.info(f"Getting execution result from Redis {data}")
+    return ExecutionResult(**data)
+elif message is not None:
+    logger.error(f"Failed to get execution result from Redis {message}")
return None

-def close(self):
-    redis.disconnect()
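The queue no longer owns a connection lifecycle at all: connect() and close() are gone, and the connection property defers to one client shared across the process. The diff references redis.get_redis() but not its implementation; here is a sketch of what backend.data.redis plausibly provides, assuming a module-level singleton managed by connect()/disconnect() (the module body below is an assumption, only the names appear in the diff):

from redis import Redis

QUEUE_NAME = "execution_events"  # assumed value; only the name appears above
_connection: Redis | None = None

def connect(host: str = "localhost", port: int = 6379) -> None:
    # Hypothetical: open the single process-wide client.
    global _connection
    _connection = Redis(host=host, port=port, decode_responses=True)

def get_redis() -> Redis:
    # Hypothetical: hand out the shared client. Raising here surfaces a
    # missing connect() call, where the removed `if self.connection:`
    # guards used to drop events silently.
    if _connection is None:
        raise RuntimeError("Redis is not connected; call connect() first")
    return _connection

def disconnect() -> None:
    global _connection
    if _connection is not None:
        _connection.close()
        _connection = None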
3 changes: 2 additions & 1 deletion autogpt_platform/backend/backend/exec.py
@@ -1,12 +1,13 @@
from backend.app import run_processes
-from backend.executor import ExecutionManager
+from backend.executor import DatabaseManager, ExecutionManager


def main():
"""
Run all the processes required for the AutoGPT-server REST API.
"""
run_processes(
+DatabaseManager(),
ExecutionManager(),
)

2 changes: 2 additions & 0 deletions autogpt_platform/backend/backend/executor/__init__.py
@@ -1,7 +1,9 @@
+from .database import DatabaseManager
from .manager import ExecutionManager
from .scheduler import ExecutionScheduler

__all__ = [
"DatabaseManager",
"ExecutionManager",
"ExecutionScheduler",
]
75 changes: 75 additions & 0 deletions autogpt_platform/backend/backend/executor/database.py
@@ -0,0 +1,75 @@
from functools import wraps
from typing import Any, Callable, Concatenate, Coroutine, ParamSpec, TypeVar, cast

from backend.data.credit import get_user_credit_model
from backend.data.execution import (
ExecutionResult,
create_graph_execution,
get_execution_results,
get_incomplete_executions,
get_latest_execution,
update_execution_status,
update_graph_execution_stats,
update_node_execution_stats,
upsert_execution_input,
upsert_execution_output,
)
from backend.data.graph import get_graph, get_node
from backend.data.queue import RedisEventQueue
from backend.util.service import AppService, expose
from backend.util.settings import Config

P = ParamSpec("P")
R = TypeVar("R")


class DatabaseManager(AppService):

def __init__(self):
super().__init__(port=Config().database_api_port)
self.use_db = True
self.use_redis = True
self.event_queue = RedisEventQueue()

@expose
def send_execution_update(self, execution_result_dict: dict[Any, Any]):
self.event_queue.put(ExecutionResult(**execution_result_dict))

@staticmethod
def exposed_run_and_wait(
f: Callable[P, Coroutine[None, None, R]]
) -> Callable[Concatenate[object, P], R]:
@expose
@wraps(f)
def wrapper(self, *args: P.args, **kwargs: P.kwargs) -> R:
coroutine = f(*args, **kwargs)
res = self.run_and_wait(coroutine)
return res

return wrapper

# Executions
create_graph_execution = exposed_run_and_wait(create_graph_execution)
get_execution_results = exposed_run_and_wait(get_execution_results)
get_incomplete_executions = exposed_run_and_wait(get_incomplete_executions)
get_latest_execution = exposed_run_and_wait(get_latest_execution)
update_execution_status = exposed_run_and_wait(update_execution_status)
update_graph_execution_stats = exposed_run_and_wait(update_graph_execution_stats)
update_node_execution_stats = exposed_run_and_wait(update_node_execution_stats)
upsert_execution_input = exposed_run_and_wait(upsert_execution_input)
upsert_execution_output = exposed_run_and_wait(upsert_execution_output)

# Graphs
get_node = exposed_run_and_wait(get_node)
get_graph = exposed_run_and_wait(get_graph)

# Credits
user_credit_model = get_user_credit_model()
get_or_refill_credit = cast(
Callable[[Any, str], int],
exposed_run_and_wait(user_credit_model.get_or_refill_credit),
)
spend_credits = cast(
Callable[[Any, str, int, str, dict[str, str], float, float], int],
exposed_run_and_wait(user_credit_model.spend_credits),
)
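The heart of this new file is exposed_run_and_wait: it takes a free async database function and republishes it as a synchronous method on the DatabaseManager, presumably registered on the service's RPC port by the @expose decorator from backend.util.service, so executor processes invoke database work remotely without holding their own Prisma connection. A standalone sketch of the same wrapping pattern, with the RPC layer stripped out and AppService reduced to a hypothetical stand-in so the typing trick is visible in isolation:

import asyncio
from functools import wraps
from typing import Any, Callable, Concatenate, Coroutine, ParamSpec, TypeVar

P = ParamSpec("P")
R = TypeVar("R")

class MiniService:
    # Stand-in for AppService.run_and_wait: block until the coroutine
    # finishes and hand back its result.
    def run_and_wait(self, coroutine: Coroutine[Any, Any, R]) -> R:
        return asyncio.run(coroutine)

    @staticmethod
    def run_and_wait_method(
        f: Callable[P, Coroutine[None, None, R]]
    ) -> Callable[Concatenate[object, P], R]:
        # Same shape as exposed_run_and_wait above, minus @expose: an async
        # function becomes a sync method that blocks on the result.
        @wraps(f)
        def wrapper(self, *args: P.args, **kwargs: P.kwargs) -> R:
            return self.run_and_wait(f(*args, **kwargs))

        return wrapper

async def double(x: int) -> int:
    return x * 2

class Demo(MiniService):
    double = MiniService.run_and_wait_method(double)

print(Demo().double(21))  # prints 42

The cast(...) calls at the bottom of the file pin concrete signatures onto the wrapped user_credit_model methods, since those bound methods do not carry their parameters through the decorator as cleanly as the free functions do.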