Skip to content

Commit

Permalink
Merge branch 'dev' into aarushikansal/add-new-health-endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
aarushik93 authored Oct 16, 2024
2 parents ca76093 + 6f07d24 commit 05a354a
Show file tree
Hide file tree
Showing 41 changed files with 625 additions and 300 deletions.
3 changes: 2 additions & 1 deletion autogpt_platform/backend/backend/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ def main(**kwargs):
Run all the processes required for the AutoGPT-server (REST and WebSocket APIs).
"""

from backend.executor import ExecutionManager, ExecutionScheduler
from backend.executor import DatabaseManager, ExecutionManager, ExecutionScheduler
from backend.server.rest_api import AgentServer
from backend.server.ws_api import WebsocketServer

run_processes(
DatabaseManager(),
ExecutionManager(),
ExecutionScheduler(),
WebsocketServer(),
Expand Down
12 changes: 8 additions & 4 deletions autogpt_platform/backend/backend/data/credit.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
LlmModel,
)
from backend.blocks.talking_head import CreateTalkingAvatarVideoBlock
from backend.data.block import Block, BlockInput
from backend.data.block import Block, BlockInput, get_block
from backend.util.settings import Config


Expand Down Expand Up @@ -96,7 +96,7 @@ async def spend_credits(
self,
user_id: str,
user_credit: int,
block: Block,
block_id: str,
input_data: BlockInput,
data_size: float,
run_time: float,
Expand All @@ -107,7 +107,7 @@ async def spend_credits(
Args:
user_id (str): The user ID.
user_credit (int): The current credit for the user.
block (Block): The block that is being used.
block_id (str): The block ID.
input_data (BlockInput): The input data for the block.
data_size (float): The size of the data being processed.
run_time (float): The time taken to run the block.
Expand Down Expand Up @@ -208,12 +208,16 @@ async def spend_credits(
self,
user_id: str,
user_credit: int,
block: Block,
block_id: str,
input_data: BlockInput,
data_size: float,
run_time: float,
validate_balance: bool = True,
) -> int:
block = get_block(block_id)
if not block:
raise ValueError(f"Block not found: {block_id}")

cost, matching_filter = self._block_usage_cost(
block=block, input_data=input_data, data_size=data_size, run_time=run_time
)
Expand Down
19 changes: 5 additions & 14 deletions autogpt_platform/backend/backend/data/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import prisma.types
from prisma.models import AgentGraph, AgentGraphExecution, AgentNode, AgentNodeLink
from prisma.types import AgentGraphInclude
from pydantic import BaseModel, PrivateAttr
from pydantic import BaseModel
from pydantic_core import PydanticUndefinedType

from backend.blocks.basic import AgentInputBlock, AgentOutputBlock
Expand Down Expand Up @@ -51,17 +51,8 @@ class Node(BaseDbModel):
block_id: str
input_default: BlockInput = {} # dict[input_name, default_value]
metadata: dict[str, Any] = {}

_input_links: list[Link] = PrivateAttr(default=[])
_output_links: list[Link] = PrivateAttr(default=[])

@property
def input_links(self) -> list[Link]:
return self._input_links

@property
def output_links(self) -> list[Link]:
return self._output_links
input_links: list[Link] = []
output_links: list[Link] = []

@staticmethod
def from_db(node: AgentNode):
Expand All @@ -73,8 +64,8 @@ def from_db(node: AgentNode):
input_default=json.loads(node.constantInput),
metadata=json.loads(node.metadata),
)
obj._input_links = [Link.from_db(link) for link in node.Input or []]
obj._output_links = [Link.from_db(link) for link in node.Output or []]
obj.input_links = [Link.from_db(link) for link in node.Input or []]
obj.output_links = [Link.from_db(link) for link in node.Output or []]
return obj


Expand Down
37 changes: 13 additions & 24 deletions autogpt_platform/backend/backend/data/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,6 @@ def default(self, o):


class AbstractEventQueue(ABC):
@abstractmethod
def connect(self):
pass

@abstractmethod
def close(self):
pass

@abstractmethod
def put(self, execution_result: ExecutionResult):
pass
Expand All @@ -36,26 +28,23 @@ def get(self) -> ExecutionResult | None:

class RedisEventQueue(AbstractEventQueue):
def __init__(self):
self.connection = None
self.queue_name = redis.QUEUE_NAME

def connect(self):
self.connection = redis.connect()
@property
def connection(self):
return redis.get_redis()

def put(self, execution_result: ExecutionResult):
if self.connection:
message = json.dumps(execution_result.model_dump(), cls=DateTimeEncoder)
logger.info(f"Putting execution result to Redis {message}")
self.connection.lpush(self.queue_name, message)
message = json.dumps(execution_result.model_dump(), cls=DateTimeEncoder)
logger.info(f"Putting execution result to Redis {message}")
self.connection.lpush(self.queue_name, message)

def get(self) -> ExecutionResult | None:
if self.connection:
message = self.connection.rpop(self.queue_name)
if message is not None and isinstance(message, (str, bytes, bytearray)):
data = json.loads(message)
logger.info(f"Getting execution result from Redis {data}")
return ExecutionResult(**data)
message = self.connection.rpop(self.queue_name)
if message is not None and isinstance(message, (str, bytes, bytearray)):
data = json.loads(message)
logger.info(f"Getting execution result from Redis {data}")
return ExecutionResult(**data)
elif message is not None:
logger.error(f"Failed to get execution result from Redis {message}")
return None

def close(self):
redis.disconnect()
3 changes: 2 additions & 1 deletion autogpt_platform/backend/backend/exec.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from backend.app import run_processes
from backend.executor import ExecutionManager
from backend.executor import DatabaseManager, ExecutionManager


def main():
"""
Run all the processes required for the AutoGPT-server REST API.
"""
run_processes(
DatabaseManager(),
ExecutionManager(),
)

Expand Down
2 changes: 2 additions & 0 deletions autogpt_platform/backend/backend/executor/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from .database import DatabaseManager
from .manager import ExecutionManager
from .scheduler import ExecutionScheduler

__all__ = [
"DatabaseManager",
"ExecutionManager",
"ExecutionScheduler",
]
75 changes: 75 additions & 0 deletions autogpt_platform/backend/backend/executor/database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from functools import wraps
from typing import Any, Callable, Concatenate, Coroutine, ParamSpec, TypeVar, cast

from backend.data.credit import get_user_credit_model
from backend.data.execution import (
ExecutionResult,
create_graph_execution,
get_execution_results,
get_incomplete_executions,
get_latest_execution,
update_execution_status,
update_graph_execution_stats,
update_node_execution_stats,
upsert_execution_input,
upsert_execution_output,
)
from backend.data.graph import get_graph, get_node
from backend.data.queue import RedisEventQueue
from backend.util.service import AppService, expose
from backend.util.settings import Config

P = ParamSpec("P")
R = TypeVar("R")


class DatabaseManager(AppService):

def __init__(self):
super().__init__(port=Config().database_api_port)
self.use_db = True
self.use_redis = True
self.event_queue = RedisEventQueue()

@expose
def send_execution_update(self, execution_result_dict: dict[Any, Any]):
self.event_queue.put(ExecutionResult(**execution_result_dict))

@staticmethod
def exposed_run_and_wait(
f: Callable[P, Coroutine[None, None, R]]
) -> Callable[Concatenate[object, P], R]:
@expose
@wraps(f)
def wrapper(self, *args: P.args, **kwargs: P.kwargs) -> R:
coroutine = f(*args, **kwargs)
res = self.run_and_wait(coroutine)
return res

return wrapper

# Executions
create_graph_execution = exposed_run_and_wait(create_graph_execution)
get_execution_results = exposed_run_and_wait(get_execution_results)
get_incomplete_executions = exposed_run_and_wait(get_incomplete_executions)
get_latest_execution = exposed_run_and_wait(get_latest_execution)
update_execution_status = exposed_run_and_wait(update_execution_status)
update_graph_execution_stats = exposed_run_and_wait(update_graph_execution_stats)
update_node_execution_stats = exposed_run_and_wait(update_node_execution_stats)
upsert_execution_input = exposed_run_and_wait(upsert_execution_input)
upsert_execution_output = exposed_run_and_wait(upsert_execution_output)

# Graphs
get_node = exposed_run_and_wait(get_node)
get_graph = exposed_run_and_wait(get_graph)

# Credits
user_credit_model = get_user_credit_model()
get_or_refill_credit = cast(
Callable[[Any, str], int],
exposed_run_and_wait(user_credit_model.get_or_refill_credit),
)
spend_credits = cast(
Callable[[Any, str, int, str, dict[str, str], float, float], int],
exposed_run_and_wait(user_credit_model.spend_credits),
)
Loading

0 comments on commit 05a354a

Please sign in to comment.