From d12313ecb7fa578763076c5390d8fb029c0393e7 Mon Sep 17 00:00:00 2001 From: Archento Date: Tue, 19 Nov 2024 16:46:50 +0100 Subject: [PATCH] fix(core): bureau shutdown routine (#579) --- python/docs/api/uagents/agent.md | 71 +++++-- python/docs/api/uagents/crypto/__init__.md | 3 +- .../uagents/experimental/quota/__init__.md | 179 ++++++++++++++++++ python/docs/api/uagents/network.md | 95 +++++++++- python/docs/api/uagents/registration.md | 44 +++++ python/src/uagents/agent.py | 27 +-- 6 files changed, 383 insertions(+), 36 deletions(-) create mode 100644 python/docs/api/uagents/experimental/quota/__init__.md diff --git a/python/docs/api/uagents/agent.md b/python/docs/api/uagents/agent.md index 642a2a69..9fe67330 100644 --- a/python/docs/api/uagents/agent.md +++ b/python/docs/api/uagents/agent.md @@ -389,6 +389,21 @@ Get the balance of the agent. - `int` - Bank balance. + + +#### info + +```python +@property +def info() -> AgentInfo +``` + +Get basic information about the agent. + +**Returns**: + +- `AgentInfo` - The agent's address, endpoints, protocols, and metadata. + #### metadata @@ -478,18 +493,26 @@ Sign the provided digest. #### sign`_`registration ```python -def sign_registration() -> str +def sign_registration(timestamp: int, + sender_wallet_address: Optional[str] = None) -> str ``` Sign the registration data for Almanac contract. +**Arguments**: + +- `timestamp` _int_ - The timestamp for the registration. +- `sender_wallet_address` _Optional[str]_ - The wallet address of the transaction sender. + + **Returns**: - `str` - The signature of the registration data. + **Raises**: -- `AssertionError` - If the Almanac contract address is None. +- `AssertionError` - If the Almanac contract is None. @@ -736,6 +759,16 @@ async def setup() Include the internal agent protocol, run startup tasks, and start background tasks. + + +#### start`_`registration`_`loop + +```python +def start_registration_loop() +``` + +Start the registration loop. + #### start`_`message`_`dispenser @@ -794,7 +827,8 @@ Create all tasks for the agent. def run() ``` -Run the agent. +Run the agent by itself. +A fresh event loop is created for the agent and it is closed after the agent stops. @@ -819,27 +853,20 @@ A class representing a Bureau of agents. This class manages a collection of agents and orchestrates their execution. -**Arguments**: - -- `agents` _Optional[List[Agent]]_ - The list of agents to be managed by the bureau. -- `port` _Optional[int]_ - The port number for the server. -- `endpoint` _Optional[Union[str, List[str], Dict[str, dict]]]_ - Configuration - for agent endpoints. - - **Attributes**: - `_loop` _asyncio.AbstractEventLoop_ - The event loop. - `_agents` _List[Agent]_ - The list of agents to be managed by the bureau. -- `_registered_agents` _List[Agent]_ - The list of agents contained in the bureau. - `_endpoints` _List[Dict[str, Any]]_ - The endpoint configuration for the bureau. - `_port` _int_ - The port on which the bureau's server runs. - `_queries` _Dict[str, asyncio.Future]_ - Dictionary mapping query senders to their response Futures. - `_logger` _Logger_ - The logger instance. - `_server` _ASGIServer_ - The ASGI server instance for handling requests. +- `_agentverse` _Dict[str, str]_ - The agentverse configuration for the bureau. - `_use_mailbox` _bool_ - A flag indicating whether mailbox functionality is enabled for any of the agents. +- `_registration_policy` _AgentRegistrationPolicy_ - The registration policy for the bureau. @@ -849,6 +876,12 @@ This class manages a collection of agents and orchestrates their execution. def __init__(agents: Optional[List[Agent]] = None, port: Optional[int] = None, endpoint: Optional[Union[str, List[str], Dict[str, dict]]] = None, + agentverse: Optional[Union[str, Dict[str, str]]] = None, + registration_policy: Optional[BatchRegistrationPolicy] = None, + ledger: Optional[LedgerClient] = None, + wallet: Optional[LocalWallet] = None, + seed: Optional[str] = None, + test: bool = True, loop: Optional[asyncio.AbstractEventLoop] = None, log_level: Union[int, str] = logging.INFO) ``` @@ -857,9 +890,17 @@ Initialize a Bureau instance. **Arguments**: -- `port` _Optional[int]_ - The port on which the bureau's server will run. -- `endpoint` _Optional[Union[str, List[str], Dict[str, dict]]]_ - The endpoint configuration - for the bureau. +- `agents` _Optional[List[Agent]]_ - The list of agents to be managed by the bureau. +- `port` _Optional[int]_ - The port number for the server. +- `endpoint` _Optional[Union[str, List[str], Dict[str, dict]]]_ - The endpoint configuration. +- `agentverse` _Optional[Union[str, Dict[str, str]]]_ - The agentverse configuration. +- `registration_policy` _Optional[BatchRegistrationPolicy]_ - The registration policy. +- `ledger` _Optional[LedgerClient]_ - The ledger for the bureau. +- `wallet` _Optional[LocalWallet]_ - The wallet for the bureau (overrides 'seed'). +- `seed` _Optional[str]_ - The seed phrase for the wallet (overridden by 'wallet'). +- `test` _Optional[bool]_ - True if the bureau will register and transact on the testnet. +- `loop` _Optional[asyncio.AbstractEventLoop]_ - The event loop. +- `log_level` _Union[int, str]_ - The logging level for the bureau. diff --git a/python/docs/api/uagents/crypto/__init__.md b/python/docs/api/uagents/crypto/__init__.md index 5814a581..c2ffee57 100644 --- a/python/docs/api/uagents/crypto/__init__.md +++ b/python/docs/api/uagents/crypto/__init__.md @@ -102,7 +102,8 @@ Sign the provided digest. #### sign`_`registration ```python -def sign_registration(contract_address: str, sequence: int) -> str +def sign_registration(contract_address: str, timestamp: int, + wallet_address: str) -> str ``` Sign the registration data for the Almanac contract. diff --git a/python/docs/api/uagents/experimental/quota/__init__.md b/python/docs/api/uagents/experimental/quota/__init__.md new file mode 100644 index 00000000..793abf0e --- /dev/null +++ b/python/docs/api/uagents/experimental/quota/__init__.md @@ -0,0 +1,179 @@ + + +# src.uagents.experimental.quota.`__`init`__` + +This Protocol class can be used to rate limit `on_message` message handlers. + +The rate limiter uses the agents storage to keep track of the number of requests +made by another agent within a given time window. If the number of requests exceeds +a specified limit, the rate limiter will block further requests until the time +window resets. + +> Default: Not rate limited, but you can set a default during initialization. + +Additionally, the protocol can be used to set access control rules for handlers +allowing or blocking specific agents from accessing the handler. +The default access control rule can be set to allow or block all agents. + +Both rules can work together to provide a secure and rate-limited environment for +message handlers. + + +Usage examples: + +```python +from uagents.experimental.quota import AccessControlList, QuotaProtocol, RateLimit + +# Initialize the QuotaProtocol instance +quota_protocol = QuotaProtocol( + storage_reference=agent.storage, + name="quota_proto", + version=agent._version, + # default_rate_limit=RateLimit(window_size_minutes=1, max_requests=3), # Optional +) + +# This message handler is not rate limited +@quota_protocol.on_message(ExampleMessage1) +async def handle(ctx: Context, sender: str, msg: ExampleMessage1): + ... + +# This message handler is rate limited with custom window size and request limit +@quota_protocol.on_message( + ExampleMessage2, + rate_limit=RateLimit(window_size_minutes=1, max_requests=3), +) +async def handle(ctx: Context, sender: str, msg: ExampleMessage2): + ... + +# This message handler has access control rules set +@quota_protocol.on_message( + ExampleMessage3, + acl=AccessControlList(default=False, allowed={""}), +) +async def handle(ctx: Context, sender: str, msg: ExampleMessage3): + ... + +agent.include(quota_protocol) +``` + +Tip: The `AccessControlList` object can be used to set access control rules during +runtime. This can be useful for dynamic access control rules based on the state of the +agent or the network. +```python +acl = AccessControlList(default=True) + +@proto.on_message(model=Message, access_control_list=acl) +async def message_handler(ctx: Context, sender: str, msg: Message): + if REASON_TO_BLOCK: + acl.blocked.add(sender) + ctx.logger.info(f"Received message from {sender}: {msg.text}") +``` + + + +## QuotaProtocol Objects + +```python +class QuotaProtocol(Protocol) +``` + + + +#### `__`init`__` + +```python +def __init__(storage_reference: StorageAPI, + name: Optional[str] = None, + version: Optional[str] = None, + default_rate_limit: Optional[RateLimit] = None, + default_acl: Optional[AccessControlList] = None) +``` + +Initialize a QuotaProtocol instance. + +**Arguments**: + +- `storage_reference` _StorageAPI_ - The storage reference to use for rate limiting. +- `name` _Optional[str], optional_ - The name of the protocol. Defaults to None. +- `version` _Optional[str], optional_ - The version of the protocol. Defaults to None. +- `default_rate_limit` _Optional[RateLimit], optional_ - The default rate limit. + Defaults to None. +- `default_acl` _Optional[AccessControlList], optional_ - The access control list. + Defaults to None. + + + +#### on`_`message + +```python +def on_message(model: Type[Model], + replies: Optional[Union[Type[Model], Set[Type[Model]]]] = None, + allow_unverified: Optional[bool] = False, + rate_limit: Optional[RateLimit] = None, + access_control_list: Optional[AccessControlList] = None) +``` + +Overwritten decorator to register a message handler for the protocol +including rate limiting. + +**Arguments**: + +- `model` _Type[Model]_ - The message model type. +- `replies` _Optional[Union[Type[Model], Set[Type[Model]]]], optional_ - The associated + reply types. Defaults to None. +- `allow_unverified` _Optional[bool], optional_ - Whether to allow unverified messages. + Defaults to False. +- `rate_limit` _Optional[RateLimit], optional_ - The rate limit to apply. Defaults to None. +- `access_control_list` _Optional[AccessControlList], optional_ - The access control list to + apply. + + +**Returns**: + +- `Callable` - The decorator to register the message handler. + + + +#### wrap + +```python +def wrap(func: MessageCallback, + rate_limit: Optional[RateLimit] = None, + acl: Optional[AccessControlList] = None) -> MessageCallback +``` + +Decorator to wrap a function with rate limiting. + +**Arguments**: + +- `func` - The function to wrap with rate limiting +- `rate_limit` - The rate limit to apply +- `acl` - The access control list to apply + + +**Returns**: + +- `Callable` - The decorated + + + +#### add`_`request + +```python +def add_request(agent_address: str, function_name: str, + window_size_minutes: int, max_requests: int) -> bool +``` + +Add a request to the rate limiter if the current time is still within the +time window since the beginning of the most recent time window. Otherwise, +reset the time window and add the request. + +**Arguments**: + +- `agent_address` - The address of the agent making the request + + +**Returns**: + + False if the maximum number of requests has been exceeded, True otherwise + diff --git a/python/docs/api/uagents/network.md b/python/docs/api/uagents/network.md index 0cd04585..e16e88a2 100644 --- a/python/docs/api/uagents/network.md +++ b/python/docs/api/uagents/network.md @@ -118,6 +118,21 @@ This class provides methods to interact with the Almanac contract, including checking if an agent is registered, retrieving the expiry height of an agent's registration, and getting the endpoints associated with an agent's registration. + + +#### check`_`version + +```python +def check_version() -> bool +``` + +Check if the contract version supported by this version of uAgents matches the +deployed version. + +**Returns**: + +- `bool` - True if the contract version is supported, False otherwise. + #### query`_`contract @@ -175,6 +190,52 @@ Check if an agent is registered in the Almanac contract. - `bool` - True if the agent is registered, False otherwise. + + +#### registration`_`needs`_`update + +```python +def registration_needs_update(address: str, endpoints: List[AgentEndpoint], + protocols: List[str], + min_seconds_left: int) -> bool +``` + +Check if an agent's registration needs to be updated. + +**Arguments**: + +- `address` _str_ - The agent's address. +- `endpoints` _List[AgentEndpoint]_ - The agent's endpoints. +- `protocols` _List[str]_ - The agent's protocols. +- `min_time_left` _int_ - The minimum time left before the agent's registration expires + + +**Returns**: + +- `bool` - True if the agent's registration needs to be updated or will expire sooner + than the specified minimum time, False otherwise. + + + +#### query`_`agent`_`record + +```python +def query_agent_record( + address: str) -> Tuple[int, List[AgentEndpoint], List[str]] +``` + +Get the records associated with an agent's registration. + +**Arguments**: + +- `address` _str_ - The agent's address. + + +**Returns**: + + Tuple[int, List[AgentEndpoint], List[str]]: The expiry height of the agent's + registration, the agent's endpoints, and the agent's protocols. + #### get`_`expiry @@ -183,7 +244,7 @@ Check if an agent is registered in the Almanac contract. def get_expiry(address: str) -> int ``` -Get the expiry height of an agent's registration. +Get the approximate seconds to expiry of an agent's registration. **Arguments**: @@ -192,7 +253,7 @@ Get the expiry height of an agent's registration. **Returns**: -- `int` - The expiry height of the agent's registration. +- `int` - The approximate seconds to expiry of the agent's registration. @@ -211,14 +272,14 @@ Get the endpoints associated with an agent's registration. **Returns**: -- `List[AgentEndpoint]` - The endpoints associated with the agent's registration. +- `List[AgentEndpoint]` - The agent's registered endpoints. #### get`_`protocols ```python -def get_protocols(address: str) +def get_protocols(address: str) -> List[str] ``` Get the protocols associated with an agent's registration. @@ -230,7 +291,7 @@ Get the protocols associated with an agent's registration. **Returns**: -- `Any` - The protocols associated with the agent's registration. +- `List[str]` - The agent's registered protocols. @@ -239,7 +300,8 @@ Get the protocols associated with an agent's registration. ```python async def register(ledger: LedgerClient, wallet: LocalWallet, agent_address: str, protocols: List[str], - endpoints: List[AgentEndpoint], signature: str) + endpoints: List[AgentEndpoint], signature: str, + current_time: int) ``` Register an agent with the Almanac contract. @@ -253,6 +315,23 @@ Register an agent with the Almanac contract. - `endpoints` _List[Dict[str, Any]]_ - List of endpoint dictionaries. - `signature` _str_ - The agent's signature. + + +#### register`_`batch + +```python +async def register_batch(ledger: LedgerClient, wallet: LocalWallet, + agent_records: List[AlmanacContractRecord]) +``` + +Register multiple agents with the Almanac contract. + +**Arguments**: + +- `ledger` _LedgerClient_ - The Ledger client. +- `wallet` _LocalWallet_ - The wallet of the registration sender. +- `agents` _List[ALmanacContractRecord]_ - The list of signed agent records to register. + #### get`_`sequence @@ -277,7 +356,7 @@ Get the agent's sequence number for Almanac registration. #### get`_`almanac`_`contract ```python -def get_almanac_contract(test: bool = True) -> AlmanacContract +def get_almanac_contract(test: bool = True) -> Optional[AlmanacContract] ``` Get the AlmanacContract instance. @@ -289,7 +368,7 @@ Get the AlmanacContract instance. **Returns**: -- `AlmanacContract` - The AlmanacContract instance. +- `AlmanacContract` - The AlmanacContract instance if version is supported. diff --git a/python/docs/api/uagents/registration.md b/python/docs/api/uagents/registration.md index 1ab829a8..5cc7354c 100644 --- a/python/docs/api/uagents/registration.md +++ b/python/docs/api/uagents/registration.md @@ -24,3 +24,47 @@ def coerce_metadata_to_str( Step through the metadata and convert any non-string values to strings. + + +#### extract`_`geo`_`metadata + +```python +def extract_geo_metadata( + metadata: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]] +``` + +Extract geo-location metadata from the metadata dictionary. + + + +## LedgerBasedRegistrationPolicy Objects + +```python +class LedgerBasedRegistrationPolicy(AgentRegistrationPolicy) +``` + + + +#### check`_`contract`_`version + +```python +def check_contract_version() +``` + +Check the version of the deployed Almanac contract and log a warning +if it is different from the supported version. + + + +#### register + +```python +async def register(agent_address: str, + protocols: List[str], + endpoints: List[AgentEndpoint], + metadata: Optional[Dict[str, Any]] = None) +``` + +Register the agent on the Almanac contract if registration is about to expire or +the registration data has changed. + diff --git a/python/src/uagents/agent.py b/python/src/uagents/agent.py index 5e86c1c0..b0eae87a 100644 --- a/python/src/uagents/agent.py +++ b/python/src/uagents/agent.py @@ -1349,24 +1349,19 @@ class Bureau: This class manages a collection of agents and orchestrates their execution. - Args: - agents (Optional[List[Agent]]): The list of agents to be managed by the bureau. - port (Optional[int]): The port number for the server. - endpoint (Optional[Union[str, List[str], Dict[str, dict]]]): Configuration - for agent endpoints. - Attributes: _loop (asyncio.AbstractEventLoop): The event loop. _agents (List[Agent]): The list of agents to be managed by the bureau. - _registered_agents (List[Agent]): The list of agents contained in the bureau. _endpoints (List[Dict[str, Any]]): The endpoint configuration for the bureau. _port (int): The port on which the bureau's server runs. _queries (Dict[str, asyncio.Future]): Dictionary mapping query senders to their response Futures. _logger (Logger): The logger instance. _server (ASGIServer): The ASGI server instance for handling requests. + _agentverse (Dict[str, str]): The agentverse configuration for the bureau. _use_mailbox (bool): A flag indicating whether mailbox functionality is enabled for any of the agents. + _registration_policy (AgentRegistrationPolicy): The registration policy for the bureau. """ @@ -1393,6 +1388,7 @@ def __init__( endpoint (Optional[Union[str, List[str], Dict[str, dict]]]): The endpoint configuration. agentverse (Optional[Union[str, Dict[str, str]]]): The agentverse configuration. registration_policy (Optional[BatchRegistrationPolicy]): The registration policy. + ledger (Optional[LedgerClient]): The ledger for the bureau. wallet (Optional[LocalWallet]): The wallet for the bureau (overrides 'seed'). seed (Optional[str]): The seed phrase for the wallet (overridden by 'wallet'). test (Optional[bool]): True if the bureau will register and transact on the testnet. @@ -1539,18 +1535,25 @@ async def run_async(self): await agent.setup() if agent.agentverse["use_mailbox"] and agent.mailbox_client is not None: tasks.append(agent.mailbox_client.run()) - tasks.append(self._schedule_registration()) + self._loop.create_task(self._schedule_registration()) try: await asyncio.gather(*tasks, return_exceptions=True) + except (asyncio.CancelledError, KeyboardInterrupt): + pass finally: - await asyncio.gather( - *[agent._shutdown() for agent in self._agents], return_exceptions=True - ) + for agent in self._agents: + await agent._shutdown() + tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] + _ = [task.cancel() for task in tasks] + await asyncio.gather(*tasks) def run(self): """ Run the bureau. """ - self._loop.run_until_complete(self.run_async()) + with contextlib.suppress(asyncio.CancelledError, KeyboardInterrupt): + self._loop.run_until_complete(self.run_async()) + self._loop.stop() + self._loop.close()