Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): agent shutdown routine #578

Merged
merged 2 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 48 additions & 44 deletions python/src/uagents/agent.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
"""Agent"""

import asyncio
import contextlib
import functools
import logging
import uuid
from typing import (
Any,
Callable,
Coroutine,
Dict,
List,
Optional,
Expand Down Expand Up @@ -106,18 +106,6 @@ async def _run_interval(
await asyncio.sleep(period)


async def _delay(coroutine: Coroutine, delay_seconds: float):
"""
Delay the execution of the provided coroutine by the specified number of seconds.

Args:
coroutine (Coroutine): The coroutine to delay.
delay_seconds (float): The delay time in seconds.
"""
await asyncio.sleep(delay_seconds)
await coroutine


async def _send_error_message(ctx: Context, destination: str, msg: ErrorMessage):
"""
Send an error message to the specified destination.
Expand Down Expand Up @@ -812,19 +800,17 @@ async def _schedule_registration(self):
registration.

"""
time_until_next_registration = REGISTRATION_UPDATE_INTERVAL_SECONDS
try:
await self.register()
except InsufficientFundsError:
time_until_next_registration = 2 * AVERAGE_BLOCK_INTERVAL
except Exception as ex:
self._logger.exception(f"Failed to register: {ex}")
time_until_next_registration = REGISTRATION_RETRY_INTERVAL_SECONDS
while True:
time_until_next_registration = REGISTRATION_UPDATE_INTERVAL_SECONDS
try:
await self.register()
except InsufficientFundsError:
time_until_next_registration = 2 * AVERAGE_BLOCK_INTERVAL
except Exception as ex:
self._logger.exception(f"Failed to register: {ex}")
time_until_next_registration = REGISTRATION_RETRY_INTERVAL_SECONDS

# schedule the next registration update
self._loop.create_task(
_delay(self._schedule_registration(), time_until_next_registration)
)
await asyncio.sleep(time_until_next_registration)

def on_interval(
self,
Expand Down Expand Up @@ -1100,7 +1086,7 @@ async def _startup(self):
"""
if self._registration_policy:
if self._endpoints:
await self._schedule_registration()
self.start_registration_loop()
else:
self._logger.warning(
"No endpoints provided. Skipping registration: Agent won't be reachable."
Expand Down Expand Up @@ -1149,6 +1135,13 @@ async def setup(self):
self.start_message_receivers()
self.start_interval_tasks()

def start_registration_loop(self):
"""
Start the registration loop.

"""
self._loop.create_task(self._schedule_registration())

def start_message_dispenser(self):
"""
Start the message dispenser.
Expand Down Expand Up @@ -1217,16 +1210,25 @@ async def run_async(self):
tasks.append(self._mailbox_client.run())

try:
await asyncio.gather(*tasks)
await asyncio.gather(*tasks, return_exceptions=True)
except (asyncio.CancelledError, KeyboardInterrupt):
pass
finally:
await self._shutdown()
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
_ = [task.cancel() for task in tasks]
await asyncio.gather(*tasks)

def run(self):
"""
Run the agent.
Run the agent by itself.
A fresh event loop is created for the agent and it is closed after the agent stops.

"""
self._loop.run_until_complete(self.run_async())
with contextlib.suppress(asyncio.CancelledError, KeyboardInterrupt):
self._loop.run_until_complete(self.run_async())
self._loop.stop()
self._loop.close()

def get_message_protocol(
self, message_schema_digest
Expand Down Expand Up @@ -1509,40 +1511,42 @@ async def _schedule_registration(self):
Start the batch registration loop.

"""

if not any(agent._endpoints for agent in self._agents):
return

time_to_next_registration = REGISTRATION_UPDATE_INTERVAL_SECONDS
try:
await self._registration_policy.register()
except InsufficientFundsError:
time_to_next_registration = 2 * AVERAGE_BLOCK_INTERVAL
except Exception as ex:
self._logger.exception(f"Failed to register: {ex}")
time_to_next_registration = REGISTRATION_RETRY_INTERVAL_SECONDS
while True:
time_to_next_registration = REGISTRATION_UPDATE_INTERVAL_SECONDS
try:
await self._registration_policy.register()
except InsufficientFundsError:
time_to_next_registration = 2 * AVERAGE_BLOCK_INTERVAL
except Exception as ex:
self._logger.exception(f"Failed to register: {ex}")
time_to_next_registration = REGISTRATION_RETRY_INTERVAL_SECONDS

# schedule the next registration update
self._loop.create_task(
_delay(self._schedule_registration(), time_to_next_registration)
)
await asyncio.sleep(time_to_next_registration)

async def run_async(self):
"""
Run the agents managed by the bureau.

"""
tasks = [self._server.serve()]
if not self._agents:
self._logger.warning("No agents to run.")
return
for agent in self._agents:
await agent.setup()
if agent.agentverse["use_mailbox"] and agent.mailbox_client is not None:
tasks.append(agent.mailbox_client.run())
tasks.append(self._schedule_registration())

try:
await asyncio.gather(*tasks)
await asyncio.gather(*tasks, return_exceptions=True)
finally:
await asyncio.gather(*[agent._shutdown() for agent in self._agents])
await asyncio.gather(
*[agent._shutdown() for agent in self._agents], return_exceptions=True
)

def run(self):
"""
Expand Down
4 changes: 2 additions & 2 deletions python/src/uagents/asgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ async def serve(self):
)
try:
await self._server.serve()
except KeyboardInterrupt:
self._logger.info("Shutting down server")
except (asyncio.CancelledError, KeyboardInterrupt):
self._logger.info("Shutting down server...")

async def _handle_rest(
self,
Expand Down
4 changes: 3 additions & 1 deletion python/src/uagents/mailbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ async def run(self):
"""
Runs the mailbox client.
"""
await asyncio.gather(self.start_polling(), self.process_deletion_queue())
loop = asyncio.get_event_loop()
loop.create_task(self.start_polling())
loop.create_task(self.process_deletion_queue())

async def start_polling(self):
"""
Expand Down
Loading