Skip to content

Commit

Permalink
fix(backend): Fix .env file read contention on pyro connection setup (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
majdyz authored Nov 25, 2024
1 parent bc8ae1f commit f00654c
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 10 deletions.
3 changes: 2 additions & 1 deletion autogpt_platform/backend/backend/data/credit.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from backend.data.cost import BlockCost, BlockCostType
from backend.util.settings import Config

config = Config()


class UserCreditBase(ABC):
def __init__(self, num_user_credits_refill: int):
Expand Down Expand Up @@ -202,7 +204,6 @@ async def top_up_credits(self, *args, **kwargs):


def get_user_credit_model() -> UserCreditBase:
config = Config()
if config.enable_credit.lower() == "true":
return UserCredit(config.num_user_credits_refill)
else:
Expand Down
3 changes: 2 additions & 1 deletion autogpt_platform/backend/backend/executor/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

P = ParamSpec("P")
R = TypeVar("R")
config = Config()


class DatabaseManager(AppService):
Expand All @@ -38,7 +39,7 @@ def __init__(self):

@classmethod
def get_port(cls) -> int:
return Config().database_api_port
return config.database_api_port

@expose
def send_execution_update(self, execution_result: ExecutionResult):
Expand Down
3 changes: 2 additions & 1 deletion autogpt_platform/backend/backend/executor/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def _extract_schema_from_url(database_url) -> tuple[str, str]:


logger = logging.getLogger(__name__)
config = Config()


def log(msg, **kwargs):
Expand Down Expand Up @@ -96,7 +97,7 @@ class ExecutionScheduler(AppService):

@classmethod
def get_port(cls) -> int:
return Config().execution_scheduler_port
return config.execution_scheduler_port

@property
@thread_cached
Expand Down
5 changes: 3 additions & 2 deletions autogpt_platform/backend/backend/util/retry.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import os
import threading
from functools import wraps
from uuid import uuid4

Expand All @@ -16,7 +17,7 @@ def _log_prefix(resource_name: str, conn_id: str):
This needs to be called on the fly to get the current process ID & service name,
not the parent process ID & service name.
"""
return f"[PID-{os.getpid()}|{get_service_name()}|{resource_name}-{conn_id}]"
return f"[PID-{os.getpid()}|THREAD-{threading.get_native_id()}|{get_service_name()}|{resource_name}-{conn_id}]"


def conn_retry(resource_name: str, action_name: str, max_retry: int = 5):
Expand All @@ -25,7 +26,7 @@ def conn_retry(resource_name: str, action_name: str, max_retry: int = 5):
def on_retry(retry_state):
prefix = _log_prefix(resource_name, conn_id)
exception = retry_state.outcome.exception()
logger.info(f"{prefix} {action_name} failed: {exception}. Retrying now...")
logger.error(f"{prefix} {action_name} failed: {exception}. Retrying now...")

def decorator(func):
@wraps(func)
Expand Down
9 changes: 4 additions & 5 deletions autogpt_platform/backend/backend/util/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def get_port(cls) -> int:

@classmethod
def get_host(cls) -> str:
return os.environ.get(f"{cls.service_name.upper()}_HOST", Config().pyro_host)
return os.environ.get(f"{cls.service_name.upper()}_HOST", config.pyro_host)

def run_service(self) -> None:
while True:
Expand Down Expand Up @@ -170,14 +170,13 @@ def cleanup(self):

@conn_retry("Pyro", "Starting Pyro Service")
def __start_pyro(self):
conf = Config()
maximum_connection_thread_count = max(
Pyro5.config.THREADPOOL_SIZE,
conf.num_node_workers * conf.num_graph_workers,
config.num_node_workers * config.num_graph_workers,
)

Pyro5.config.THREADPOOL_SIZE = maximum_connection_thread_count # type: ignore
daemon = Pyro5.api.Daemon(host=conf.pyro_host, port=self.get_port())
daemon = Pyro5.api.Daemon(host=config.pyro_host, port=self.get_port())
self.uri = daemon.register(self, objectId=self.service_name)
logger.info(f"[{self.service_name}] Connected to Pyro; URI = {self.uri}")
daemon.requestLoop()
Expand Down Expand Up @@ -209,7 +208,7 @@ def get_service_client(service_type: Type[AS]) -> AS:
class DynamicClient(PyroClient):
@conn_retry("Pyro", f"Connecting to [{service_name}]")
def __init__(self):
host = os.environ.get(f"{service_name.upper()}_HOST", "localhost")
host = os.environ.get(f"{service_name.upper()}_HOST", pyro_host)
uri = f"PYRO:{service_type.service_name}@{host}:{service_type.get_port()}"
logger.debug(f"Connecting to service [{service_name}]. URI = {uri}")
self.proxy = Pyro5.api.Proxy(uri)
Expand Down

0 comments on commit f00654c

Please sign in to comment.