From 3e44fd3e396e320f0b23079615638ce4a369f91e Mon Sep 17 00:00:00 2001 From: James Riehl Date: Fri, 11 Oct 2024 13:06:17 +0100 Subject: [PATCH 01/15] feat: add batch registration to almanac api --- python/src/uagents/agent.py | 76 ++++++++++++++++++---------- python/src/uagents/registration.py | 80 +++++++++++++++++++++++++++++- python/src/uagents/types.py | 1 + 3 files changed, 129 insertions(+), 28 deletions(-) diff --git a/python/src/uagents/agent.py b/python/src/uagents/agent.py index 4bdfb464..6f59fb76 100644 --- a/python/src/uagents/agent.py +++ b/python/src/uagents/agent.py @@ -26,7 +26,6 @@ from uagents.asgi import ASGIServer from uagents.communication import Dispenser from uagents.config import ( - ALMANAC_CONTRACT_VERSION, AVERAGE_BLOCK_INTERVAL, LEDGER_PREFIX, MAINNET_PREFIX, @@ -50,6 +49,8 @@ from uagents.protocol import Protocol from uagents.registration import ( AgentRegistrationPolicy, + BatchAlmanacApiRegistrationPolicy, + BatchRegistrationPolicy, DefaultRegistrationPolicy, ) from uagents.resolver import GlobalResolver, Resolver @@ -756,21 +757,11 @@ async def register(self): if necessary. """ - # Check if the deployed contract version matches the supported version - deployed_version = self._almanac_contract.get_contract_version() - if deployed_version != ALMANAC_CONTRACT_VERSION: - self._logger.warning( - "Mismatch in almanac contract versions: supported (%s), deployed (%s). " - "Update uAgents to the latest version for compatibility.", - ALMANAC_CONTRACT_VERSION, - deployed_version, - ) - await self._registration_policy.register( self.address, list(self.protocols.keys()), self._endpoints, self._metadata ) - async def _registration_loop(self): + async def _schedule_registration(self): """ Execute the registration loop. @@ -784,12 +775,12 @@ async def _registration_loop(self): except InsufficientFundsError: time_until_next_registration = 2 * AVERAGE_BLOCK_INTERVAL except Exception as ex: - self._logger.exception(f"Failed to register on almanac contract: {ex}") + self._logger.exception(f"Failed to register: {ex}") time_until_next_registration = REGISTRATION_RETRY_INTERVAL_SECONDS # schedule the next registration update self._loop.create_task( - _delay(self._registration_loop(), time_until_next_registration) + _delay(self._schedule_registration(), time_until_next_registration) ) def on_interval( @@ -1061,18 +1052,18 @@ async def handle_rest( return await handler(*args) # type: ignore - async def _startup(self): + async def _startup(self, start_registration_loop: bool = True): """ Perform startup actions. """ - if self._endpoints: - await self._registration_loop() - - else: - self._logger.warning( - "No endpoints provided. Skipping registration: Agent won't be reachable." - ) + if start_registration_loop: + if self._endpoints: + await self._schedule_registration() + else: + self._logger.warning( + "No endpoints provided. Skipping registration: Agent won't be reachable." + ) for handler in self._on_startup: try: ctx = self._build_context() @@ -1100,13 +1091,13 @@ async def _shutdown(self): except Exception as ex: self._logger.exception(f"Exception in shutdown handler: {ex}") - async def setup(self): + async def setup(self, start_registration_loop: bool = True): """ Include the internal agent protocol, run startup tasks, and start background tasks. """ self.include(self._protocol) self.start_message_dispenser() - await self._startup() + await self._startup(start_registration_loop) self.start_message_receivers() self.start_interval_tasks() @@ -1329,6 +1320,8 @@ def __init__( agents: Optional[List[Agent]] = None, port: Optional[int] = None, endpoint: Optional[Union[str, List[str], Dict[str, dict]]] = None, + agentverse: Optional[Union[str, Dict[str, str]]] = None, + registration_policy: Optional[BatchRegistrationPolicy] = None, loop: Optional[asyncio.AbstractEventLoop] = None, log_level: Union[int, str] = logging.INFO, ): @@ -1352,7 +1345,16 @@ def __init__( queries=self._queries, logger=self._logger, ) - self._use_mailbox = False + self._agentverse = parse_agentverse_config(agentverse) + self._use_mailbox = self._agentverse["use_mailbox"] + almanac_api_url = f"{self._agentverse['http_prefix']}://{self._agentverse['base_url']}/v1/almanac" + self._registration_policy = ( + registration_policy + or BatchAlmanacApiRegistrationPolicy( + almanac_api=almanac_api_url, + logger=self._logger, + ) + ) if agents is not None: for agent in agents: @@ -1374,9 +1376,30 @@ def add(self, agent: Agent): self._use_mailbox = True else: agent.update_endpoints(self._endpoints) + + self._registration_policy.add_agent(agent) self._server._rest_handler_map.update(agent._server._rest_handler_map) self._agents.append(agent) + async def _schedule_registration(self): + """ + Start the batch registration loop. + + """ + time_to_next_registration = REGISTRATION_UPDATE_INTERVAL_SECONDS + try: + await self._registration_policy.register() + except InsufficientFundsError: + time_to_next_registration = 2 * AVERAGE_BLOCK_INTERVAL + except Exception as ex: + self._logger.exception(f"Failed to register: {ex}") + time_to_next_registration = REGISTRATION_RETRY_INTERVAL_SECONDS + + # schedule the next registration update + self._loop.create_task( + _delay(self._schedule_registration(), time_to_next_registration) + ) + async def run_async(self): """ Run the agents managed by the bureau. @@ -1384,9 +1407,10 @@ async def run_async(self): """ tasks = [self._server.serve()] for agent in self._agents: - await agent.setup() + await agent.setup(start_registration_loop=False) if agent.agentverse["use_mailbox"] and agent.mailbox_client is not None: tasks.append(agent.mailbox_client.run()) + tasks.append(self._schedule_registration()) try: await asyncio.gather(*tasks) diff --git a/python/src/uagents/registration.py b/python/src/uagents/registration.py index 5f210b31..1d3e14a7 100644 --- a/python/src/uagents/registration.py +++ b/python/src/uagents/registration.py @@ -15,6 +15,7 @@ ALMANAC_API_MAX_RETRIES, ALMANAC_API_TIMEOUT_SECONDS, ALMANAC_API_URL, + ALMANAC_CONTRACT_VERSION, REGISTRATION_FEE, REGISTRATION_UPDATE_INTERVAL_SECONDS, ) @@ -43,6 +44,13 @@ async def register( pass +class BatchRegistrationPolicy(ABC): + @abstractmethod + # pylint: disable=unnecessary-pass + async def register(self): + pass + + class AgentRegistrationAttestation(BaseModel): agent_address: str protocols: List[str] @@ -133,6 +141,56 @@ async def register( await asyncio.sleep(generate_backoff_time(retry)) +class BatchAlmanacApiRegistrationPolicy(AgentRegistrationPolicy): + def __init__( + self, almanac_api: Optional[str] = None, logger: Optional[logging.Logger] = None + ): + self._almanac_api = almanac_api or ALMANAC_API_URL + self._attestations: List[AgentRegistrationAttestation] = [] + self._logger = logger or logging.getLogger(__name__) + + def add_agent(self, agent: Any): + attestation = AgentRegistrationAttestation( + agent_address=agent.address, + protocols=list(agent.protocols.keys()), + endpoints=agent._endpoints, + metadata=agent.metadata, + ) + attestation.sign(agent._identity) + self._attestations.append(attestation) + + async def register(self): + if not self._attestations: + return + + attestations = [a.model_dump() for a in self._attestations] + + async with aiohttp.ClientSession() as session: + for retry in range(ALMANAC_API_MAX_RETRIES): + try: + async with session.post( + f"{self._almanac_api}/agents/batch", + headers={"content-type": "application/json"}, + data=json.dumps(attestations), + timeout=aiohttp.ClientTimeout( + total=ALMANAC_API_TIMEOUT_SECONDS + ), + ) as resp: + resp.raise_for_status() + self._logger.info( + "Batch registration on Almanac API successful" + ) + return + except (aiohttp.ClientError, asyncio.exceptions.TimeoutError) as e: + if retry == ALMANAC_API_MAX_RETRIES - 1: + raise e + time_to_retry = generate_backoff_time(retry) + self._logger.debug( + f"Batch registration failed. Retrying in {time_to_retry} seconds..." + ) + await asyncio.sleep(time_to_retry) + + class LedgerBasedRegistrationPolicy(AgentRegistrationPolicy): def __init__( self, @@ -151,6 +209,20 @@ def __init__( self._almanac_contract = almanac_contract self._logger = logger or logging.getLogger(__name__) + def check_contract_version(self): + """ + Check the version of the deployed Almanac contract and log a warning + if it is different from the supported version. + """ + deployed_version = self._almanac_contract.get_contract_version() + if deployed_version != ALMANAC_CONTRACT_VERSION: + self._logger.warning( + "Mismatch in almanac contract versions: supported (%s), deployed (%s). " + "Update uAgents to the latest version for compatibility.", + ALMANAC_CONTRACT_VERSION, + deployed_version, + ) + async def register( self, agent_address: str, @@ -158,8 +230,12 @@ async def register( endpoints: List[AgentEndpoint], metadata: Optional[Dict[str, Any]] = None, ): - # register if not yet registered or registration is about to expire - # or anything has changed from the last registration + """ + Register the agent on the Almanac contract if registration is about to expire or + the registration data has changed. + """ + self.check_contract_version() + if ( not self._almanac_contract.is_registered(agent_address) or self._almanac_contract.get_expiry(agent_address) diff --git a/python/src/uagents/types.py b/python/src/uagents/types.py index 4d1f8e08..04844e42 100644 --- a/python/src/uagents/types.py +++ b/python/src/uagents/types.py @@ -47,6 +47,7 @@ class AgentInfo(BaseModel): agent_address: str endpoints: List[AgentEndpoint] protocols: List[str] + metadata: Optional[Dict[str, Any]] = None class RestHandlerDetails(BaseModel): From 312f8478017ffb45ee043c173b3e9e608332eefa Mon Sep 17 00:00:00 2001 From: James Riehl Date: Fri, 11 Oct 2024 13:22:14 +0100 Subject: [PATCH 02/15] chore: add update agent function to bureau --- python/src/uagents/agent.py | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/python/src/uagents/agent.py b/python/src/uagents/agent.py index 6f59fb76..32f910b3 100644 --- a/python/src/uagents/agent.py +++ b/python/src/uagents/agent.py @@ -1360,25 +1360,37 @@ def __init__( for agent in agents: self.add(agent) - def add(self, agent: Agent): + def _update_agent(self, agent: Agent): """ - Add an agent to the bureau. + Update the agent to be taken over by the Bureau. Args: - agent (Agent): The agent to be added. + agent (Agent): The agent to be updated. """ - if agent in self._agents: - return agent.update_loop(self._loop) agent.update_queries(self._queries) if agent.agentverse["use_mailbox"]: self._use_mailbox = True else: agent.update_endpoints(self._endpoints) + self._server._rest_handler_map.update(agent._server._rest_handler_map) + + agent._agentverse = self._agentverse + agent._logger.setLevel(self._logger.level) + + def add(self, agent: Agent): + """ + Add an agent to the bureau. + + Args: + agent (Agent): The agent to be added. + """ + if agent in self._agents: + return + self._update_agent(agent) self._registration_policy.add_agent(agent) - self._server._rest_handler_map.update(agent._server._rest_handler_map) self._agents.append(agent) async def _schedule_registration(self): From fd41b955031d48163759ace5782ed96645e50286 Mon Sep 17 00:00:00 2001 From: James Riehl Date: Mon, 14 Oct 2024 16:29:50 +0100 Subject: [PATCH 03/15] feat: add ledger batch registrations --- python/examples/05-send-msg/main.py | 12 +++- python/src/uagents/network.py | 94 ++++++++++++++++++++--------- python/src/uagents/registration.py | 61 ++++++++++++++++++- 3 files changed, 136 insertions(+), 31 deletions(-) diff --git a/python/examples/05-send-msg/main.py b/python/examples/05-send-msg/main.py index 8a958931..ffe5ff67 100644 --- a/python/examples/05-send-msg/main.py +++ b/python/examples/05-send-msg/main.py @@ -5,8 +5,10 @@ class Message(Model): text: str -alice = Agent(name="alice", seed="alice recovery phrase") -bob = Agent(name="bob", seed="bob recovery phrase") +alice = Agent( + name="alice", seed="alice recovery phrase", agentverse="http://localhost:8001" +) +bob = Agent(name="bob", seed="bob recovery phrase", agentverse="http://localhost:8001") @alice.on_interval(period=2.0) @@ -20,7 +22,11 @@ async def message_handler(ctx: Context, sender: str, msg: Message): ctx.logger.info(f"Received message from {sender}: {msg.text}") -bureau = Bureau() +bureau = Bureau( + endpoint="http://localhost:8000/submit", + log_level="DEBUG", + agentverse="http://localhost:8001", +) bureau.add(alice) bureau.add(bob) diff --git a/python/src/uagents/network.py b/python/src/uagents/network.py index 451ce851..f6793129 100644 --- a/python/src/uagents/network.py +++ b/python/src/uagents/network.py @@ -2,7 +2,7 @@ import asyncio from datetime import datetime, timedelta -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, List, Optional, Tuple, Union from cosmpy.aerial.client import ( DEFAULT_QUERY_INTERVAL_SECS, @@ -199,52 +199,98 @@ def is_registered(self, address: str) -> bool: return bool(response.get("record")) - def get_expiry(self, address: str) -> int: + def registration_needs_update( + self, + address: str, + endpoints: List[AgentEndpoint], + protocols: List[str], + min_seconds_left: int, + ) -> bool: + """ + Check if an agent's registration needs to be updated. + + Args: + address (str): The agent's address. + endpoints (List[AgentEndpoint]): The agent's endpoints. + protocols (List[str]): The agent's protocols. + min_time_left (int): The minimum time left before the agent's registration expires + + Returns: + bool: True if the agent's registration needs to be updated or will expire sooner + than the specified minimum time, False otherwise. + """ + seconds_to_expiry, registered_endpoints, registered_protocols = ( + self.query_agent_record(address) + ) + return ( + not self.is_registered(address) + or seconds_to_expiry < min_seconds_left + or endpoints != registered_endpoints + or protocols != registered_protocols + ) + + def query_agent_record( + self, address: str + ) -> Tuple[int, List[AgentEndpoint], List[str]]: """ - Get the expiry height of an agent's registration. + Get the records associated with an agent's registration. Args: address (str): The agent's address. Returns: - int: The expiry height of the agent's registration. + Tuple[int, List[AgentEndpoint], List[str]]: The expiry height of the agent's + registration, the agent's endpoints, and the agent's protocols. """ query_msg = {"query_records": {"agent_address": address}} response = self.query_contract(query_msg) + if not response.get("record"): + return [] + if not response.get("record"): contract_state = self.query_contract({"query_contract_state": {}}) expiry = contract_state.get("state", {}).get("expiry_height", 0) return expiry * AVERAGE_BLOCK_INTERVAL - expiry = response["record"][0].get("expiry", 0) - height = response.get("height", 0) + expiry_block = response["record"][0].get("expiry", 0) + current_block = response.get("height", 0) - return (expiry - height) * AVERAGE_BLOCK_INTERVAL + seconds_to_expiry = (expiry_block - current_block) * AVERAGE_BLOCK_INTERVAL - def get_endpoints(self, address: str) -> List[AgentEndpoint]: + endpoints = [] + for endpoint in response["record"][0]["record"]["service"]["endpoints"]: + endpoints.append(AgentEndpoint.model_validate(endpoint)) + + protocols = response["record"][0]["record"]["service"]["protocols"] + + return seconds_to_expiry, endpoints, protocols + + def get_expiry(self, address: str) -> int: """ - Get the endpoints associated with an agent's registration. + Get the approximate seconds to expiry of an agent's registration. Args: address (str): The agent's address. Returns: - List[AgentEndpoint]: The endpoints associated with the agent's registration. + int: The approximate seconds to expiry of the agent's registration. """ - query_msg = {"query_records": {"agent_address": address}} - response = self.query_contract(query_msg) + return self.query_agent_record(address)[0] - if not response.get("record"): - return [] + def get_endpoints(self, address: str) -> List[AgentEndpoint]: + """ + Get the endpoints associated with an agent's registration. - endpoints = [] - for endpoint in response["record"][0]["record"]["service"]["endpoints"]: - endpoints.append(AgentEndpoint.model_validate(endpoint)) + Args: + address (str): The agent's address. - return endpoints + Returns: + List[AgentEndpoint]: The agent's registered endpoints. + """ + return self.query_agent_record(address)[1] - def get_protocols(self, address: str): + def get_protocols(self, address: str) -> List[str]: """ Get the protocols associated with an agent's registration. @@ -252,15 +298,9 @@ def get_protocols(self, address: str): address (str): The agent's address. Returns: - Any: The protocols associated with the agent's registration. + List[str]: The agent's registered protocols. """ - query_msg = {"query_records": {"agent_address": address}} - response = self.query_contract(query_msg) - - if not response.get("record"): - return None - - return response["record"][0]["record"]["service"]["protocols"] + return self.query_agent_record(address)[2] def get_registration_msg( self, diff --git a/python/src/uagents/registration.py b/python/src/uagents/registration.py index 1d3e14a7..2a88f2d0 100644 --- a/python/src/uagents/registration.py +++ b/python/src/uagents/registration.py @@ -3,7 +3,7 @@ import json import logging from abc import ABC, abstractmethod -from typing import Any, Dict, List, Optional +from typing import Any, Callable, Dict, List, Optional import aiohttp from cosmpy.aerial.client import LedgerClient @@ -88,6 +88,13 @@ def _build_digest(self) -> bytes: return sha256.digest() +class AgentLedgerRegistrationDetails(BaseModel): + agent_address: str + protocols: List[str] + endpoints: List[AgentEndpoint] + signer: Callable[[], str] + + class AlmanacApiRegistrationPolicy(AgentRegistrationPolicy): def __init__( self, @@ -294,6 +301,58 @@ def _sign_registration(self, agent_address: str) -> str: ) +class BatchLedgerRegistrationPolicy(BatchRegistrationPolicy): + def __init__( + self, + ledger: LedgerClient, + wallet: LocalWallet, + almanac_contract: AlmanacContract, + testnet: bool, + logger: Optional[logging.Logger] = None, + ): + self._ledger = ledger + self._wallet = wallet + self._almanac_contract = almanac_contract + self._testnet = testnet + self._logger = logger or logging.getLogger(__name__) + self._agents: List[AgentLedgerRegistrationDetails] = [] + + def add_agent(self, agent: Any): + agent_details = AgentLedgerRegistrationDetails( + agent_address=agent.address, + protocols=list(agent.protocols.keys()), + endpoints=agent._endpoints, + sign_registration=agent._identity.sign_registration, + ) + self._agents.append(agent_details) + + def _get_balance(self) -> int: + return self._ledger.query_bank_balance(Address(self._wallet.address())) + + async def register(self): + self._logger.info("Registering agents on Almanac contract...") + for agent in self._agents: + if self._almanac_contract.registration_needs_update( + agent.agent_address, + agent.protocols, + agent.endpoints, + REGISTRATION_UPDATE_INTERVAL_SECONDS, + ): + signature = agent.sign_registration(agent.agent_address) + await self._almanac_contract.register( + self._ledger, + self._wallet, + agent.agent_address, + agent.protocols, + agent.endpoints, + signature, + ) + else: + self._logger.info( + f"Agent {agent.agent_address} registration is up to date!" + ) + + class DefaultRegistrationPolicy(AgentRegistrationPolicy): def __init__( self, From 71e3477a0c0ca1ea1547fb56c81dd37dd6d40c64 Mon Sep 17 00:00:00 2001 From: James Riehl Date: Mon, 21 Oct 2024 11:52:46 +0100 Subject: [PATCH 04/15] feat: init batch tx --- python/src/uagents/network.py | 45 +++++++++++++++++++++++++++++- python/src/uagents/registration.py | 24 +++------------- 2 files changed, 48 insertions(+), 21 deletions(-) diff --git a/python/src/uagents/network.py b/python/src/uagents/network.py index f6793129..c511dd19 100644 --- a/python/src/uagents/network.py +++ b/python/src/uagents/network.py @@ -29,7 +29,7 @@ TESTNET_CONTRACT_ALMANAC, TESTNET_CONTRACT_NAME_SERVICE, ) -from uagents.types import AgentEndpoint +from uagents.types import AgentEndpoint, AgentInfo from uagents.utils import get_logger logger = get_logger("network") @@ -372,6 +372,49 @@ async def register( ) await wait_for_tx_to_complete(transaction.tx_hash, ledger) + async def register_batch( + self, + ledger: LedgerClient, + wallet: LocalWallet, + agents: List[AgentInfo], + ): + """ + Register multiple agents with the Almanac contract. + + Args: + ledger (LedgerClient): The Ledger client. + wallet (LocalWallet): The wallet of the agent. + agents (List[AgentInfo]): The list of agents to register. + """ + if not self.address: + raise ValueError("Contract address not set") + + transaction = Transaction() + + for agent in agents: + sequence = self.get_sequence(agent.agent_address) + almanac_msg = self.get_registration_msg( + protocols=agent.protocols, + endpoints=agent.endpoints, + signature=agent.metadata.get("signature"), + sequence=sequence, + address=agent.agent_address, + ) + + transaction.add_message( + create_cosmwasm_execute_msg( + wallet.address(), + self.address, + almanac_msg, + funds=f"{REGISTRATION_FEE}{REGISTRATION_DENOM}", + ) + ) + + transaction = prepare_and_broadcast_basic_transaction( + ledger, transaction, wallet + ) + await wait_for_tx_to_complete(transaction.tx_hash, ledger) + def get_sequence(self, address: str) -> int: """ Get the agent's sequence number for Almanac registration. diff --git a/python/src/uagents/registration.py b/python/src/uagents/registration.py index 2a88f2d0..56c6cf63 100644 --- a/python/src/uagents/registration.py +++ b/python/src/uagents/registration.py @@ -331,26 +331,10 @@ def _get_balance(self) -> int: async def register(self): self._logger.info("Registering agents on Almanac contract...") - for agent in self._agents: - if self._almanac_contract.registration_needs_update( - agent.agent_address, - agent.protocols, - agent.endpoints, - REGISTRATION_UPDATE_INTERVAL_SECONDS, - ): - signature = agent.sign_registration(agent.agent_address) - await self._almanac_contract.register( - self._ledger, - self._wallet, - agent.agent_address, - agent.protocols, - agent.endpoints, - signature, - ) - else: - self._logger.info( - f"Agent {agent.agent_address} registration is up to date!" - ) + + # transaction = Transaction() + + self._almanac_contract.register_batch(self._ledger, self._wallet, self._agents) class DefaultRegistrationPolicy(AgentRegistrationPolicy): From 4a954d61fcd50c92897376e673c99403a4ea40d0 Mon Sep 17 00:00:00 2001 From: James Riehl Date: Mon, 28 Oct 2024 17:09:19 +0000 Subject: [PATCH 05/15] feat: working batch ledger registration --- python/src/uagents/agent.py | 117 ++++++++++++++++++++------ python/src/uagents/network.py | 41 ++++++--- python/src/uagents/registration.py | 131 +++++++++++++++++++++++------ 3 files changed, 227 insertions(+), 62 deletions(-) diff --git a/python/src/uagents/agent.py b/python/src/uagents/agent.py index aa4e0179..fb3e892e 100644 --- a/python/src/uagents/agent.py +++ b/python/src/uagents/agent.py @@ -50,9 +50,11 @@ from uagents.registration import ( AgentRegistrationPolicy, AgentStatusUpdate, - BatchAlmanacApiRegistrationPolicy, + BatchLedgerRegistrationPolicy, BatchRegistrationPolicy, + DefaultBatchRegistrationPolicy, DefaultRegistrationPolicy, + LedgerBasedRegistrationPolicy, update_agent_status, ) from uagents.resolver import GlobalResolver, Resolver @@ -374,15 +376,18 @@ def __init__( self._on_shutdown = [] self._test = test self._version = version or "0.1.0" - self._registration_policy = registration_policy or DefaultRegistrationPolicy( - self._identity, - self._ledger, - self._wallet, - self._almanac_contract, - self._test, - logger=self._logger, - almanac_api=self._almanac_api_url, - ) + self._registration_policy = registration_policy or None + + if self._registration_policy is None: + self._registration_policy = DefaultRegistrationPolicy( + self._identity, + self._ledger, + self._wallet, + self._almanac_contract, + self._test, + logger=self._logger, + almanac_api=self._almanac_api_url, + ) self._metadata = self._initialize_metadata(metadata) self.initialize_wallet_messaging(enable_wallet_messaging) @@ -644,6 +649,21 @@ def balance(self) -> int: return self.ledger.query_bank_balance(Address(self.wallet.address())) + @property + def info(self) -> AgentInfo: + """ + Get basic information about the agent. + + Returns: + AgentInfo: The agent's address, endpoints, protocols, and metadata. + """ + return AgentInfo( + agent_address=self.address, + endpoints=self._endpoints, + protocols=list(self.protocols.keys()), + metadata=self.metadata, + ) + @property def metadata(self) -> Dict[str, Any]: """ @@ -701,7 +721,9 @@ def sign_digest(self, digest: bytes) -> str: """ return self._identity.sign_digest(digest) - def sign_registration(self, current_time: int) -> str: + def sign_registration( + self, current_time: int, sender_address: Optional[str] = None + ) -> str: """ Sign the registration data for Almanac contract. Returns: @@ -709,11 +731,13 @@ def sign_registration(self, current_time: int) -> str: Raises: AssertionError: If the Almanac contract address is None. """ + sender_address = sender_address or str(self.wallet.address()) + assert self._almanac_contract.address is not None return self._identity.sign_registration( str(self._almanac_contract.address), current_time, - str(self.wallet.address()), + sender_address, ) def update_endpoints(self, endpoints: List[AgentEndpoint]): @@ -1061,12 +1085,12 @@ async def handle_rest( return await handler(*args) # type: ignore - async def _startup(self, start_registration_loop: bool = True): + async def _startup(self): """ Perform startup actions. """ - if start_registration_loop: + if self._registration_policy: if self._endpoints: await self._schedule_registration() else: @@ -1107,13 +1131,13 @@ async def _shutdown(self): except Exception as ex: self._logger.exception(f"Exception in shutdown handler: {ex}") - async def setup(self, start_registration_loop: bool = True): + async def setup(self): """ Include the internal agent protocol, run startup tasks, and start background tasks. """ self.include(self._protocol) self.start_message_dispenser() - await self._startup(start_registration_loop) + await self._startup() self.start_message_receivers() self.start_interval_tasks() @@ -1343,6 +1367,9 @@ def __init__( endpoint: Optional[Union[str, List[str], Dict[str, dict]]] = None, agentverse: Optional[Union[str, Dict[str, str]]] = None, registration_policy: Optional[BatchRegistrationPolicy] = None, + ledger: Optional[LedgerClient] = None, + wallet: Optional[LocalWallet] = None, + test: bool = True, loop: Optional[asyncio.AbstractEventLoop] = None, log_level: Union[int, str] = logging.INFO, ): @@ -1350,9 +1377,15 @@ def __init__( Initialize a Bureau instance. Args: - port (Optional[int]): The port on which the bureau's server will run. - endpoint (Optional[Union[str, List[str], Dict[str, dict]]]): The endpoint configuration - for the bureau. + agents (Optional[List[Agent]]): The list of agents to be managed by the bureau. + port (Optional[int]): The port number for the server. + endpoint (Optional[Union[str, List[str], Dict[str, dict]]]): The endpoint configuration. + agentverse (Optional[Union[str, Dict[str, str]]]): The agentverse configuration. + registration_policy (Optional[BatchRegistrationPolicy]): The registration policy. + wallet (Optional[LocalWallet]): The wallet for the bureau. + test (Optional[bool]): True if the bureau will register and transact on the testnet. + loop (Optional[asyncio.AbstractEventLoop]): The event loop. + log_level (Union[int, str]): The logging level for the bureau. """ self._loop = loop or asyncio.get_event_loop_policy().get_event_loop() self._agents: List[Agent] = [] @@ -1369,13 +1402,27 @@ def __init__( self._agentverse = parse_agentverse_config(agentverse) self._use_mailbox = self._agentverse["use_mailbox"] almanac_api_url = f"{self._agentverse['http_prefix']}://{self._agentverse['base_url']}/v1/almanac" - self._registration_policy = ( - registration_policy - or BatchAlmanacApiRegistrationPolicy( - almanac_api=almanac_api_url, + almanac_contract = get_almanac_contract(test) + + if registration_policy is not None: + if ( + isinstance(registration_policy, BatchLedgerRegistrationPolicy) + and wallet is None + ): + raise ValueError( + "Argument 'wallet' must be provided when using " + "the batch ledger registration policy." + ) + self._registration_policy = registration_policy + else: + self._registration_policy = DefaultBatchRegistrationPolicy( + ledger or get_ledger(test), + wallet, + almanac_contract, + test, logger=self._logger, + almanac_api=almanac_api_url, ) - ) if agents is not None: for agent in agents: @@ -1397,6 +1444,24 @@ def _update_agent(self, agent: Agent): agent.update_endpoints(self._endpoints) self._server._rest_handler_map.update(agent._server._rest_handler_map) + # Run the batch Almanac API registration by default and only run the agent's + # ledger registration if the Bureau is not using a batch ledger registration + # policy because it has no wallet address. + agent._registration_policy = None + if ( + isinstance(self._registration_policy, DefaultBatchRegistrationPolicy) + and self._registration_policy._ledger_policy is None + and agent._almanac_contract is not None + ): + agent._registration_policy = LedgerBasedRegistrationPolicy( + agent._identity, + agent._ledger, + agent._wallet, + agent._almanac_contract, + agent._test, + logger=agent._logger, + ) + agent._agentverse = self._agentverse agent._logger.setLevel(self._logger.level) @@ -1410,8 +1475,8 @@ def add(self, agent: Agent): """ if agent in self._agents: return + self._registration_policy.add_agent(agent.info, agent._identity) self._update_agent(agent) - self._registration_policy.add_agent(agent) self._agents.append(agent) async def _schedule_registration(self): @@ -1440,7 +1505,7 @@ async def run_async(self): """ tasks = [self._server.serve()] for agent in self._agents: - await agent.setup(start_registration_loop=False) + await agent.setup() if agent.agentverse["use_mailbox"] and agent.mailbox_client is not None: tasks.append(agent.mailbox_client.run()) tasks.append(self._schedule_registration()) diff --git a/python/src/uagents/network.py b/python/src/uagents/network.py index a0396c26..51dd25a6 100644 --- a/python/src/uagents/network.py +++ b/python/src/uagents/network.py @@ -1,6 +1,7 @@ """Network and Contracts.""" import asyncio +import time from datetime import datetime, timedelta from typing import Any, Dict, List, Optional, Tuple, Union @@ -22,6 +23,7 @@ from uagents.config import ( ALMANAC_CONTRACT_VERSION, + ALMANAC_REGISTRATION_WAIT, AVERAGE_BLOCK_INTERVAL, MAINNET_CONTRACT_ALMANAC, MAINNET_CONTRACT_NAME_SERVICE, @@ -30,6 +32,7 @@ TESTNET_CONTRACT_ALMANAC, TESTNET_CONTRACT_NAME_SERVICE, ) +from uagents.crypto import Identity from uagents.types import AgentEndpoint, AgentInfo from uagents.utils import get_logger @@ -45,6 +48,19 @@ class InsufficientFundsError(Exception): """Raised when an agent has insufficient funds for a transaction.""" +class AlmanacContractRecord(AgentInfo): + contract_address: str + sender_address: str + timestamp: Optional[int] = None + signature: Optional[str] = None + + def sign(self, identity: Identity): + self.timestamp = int(time.time()) - ALMANAC_REGISTRATION_WAIT + self.signature = identity.sign_registration( + self.contract_address, self.timestamp, self.sender_address + ) + + def get_ledger(test: bool = True) -> LedgerClient: """ Get the Ledger client. @@ -401,29 +417,34 @@ async def register_batch( self, ledger: LedgerClient, wallet: LocalWallet, - agents: List[AgentInfo], + agent_records: List[AlmanacContractRecord], ): """ Register multiple agents with the Almanac contract. Args: ledger (LedgerClient): The Ledger client. - wallet (LocalWallet): The wallet of the agent. - agents (List[AgentInfo]): The list of agents to register. + wallet (LocalWallet): The wallet of the registration sender. + agents (List[ALmanacContractRecord]): The list of signed agent records to register. """ if not self.address: raise ValueError("Contract address not set") transaction = Transaction() - for agent in agents: - sequence = self.get_sequence(agent.agent_address) + for record in agent_records: + if record.timestamp is None: + raise ValueError("Agent record is missing timestamp") + + if record.signature is None: + raise ValueError("Agent record is not signed") + almanac_msg = self.get_registration_msg( - protocols=agent.protocols, - endpoints=agent.endpoints, - signature=agent.metadata.get("signature"), - sequence=sequence, - address=agent.agent_address, + protocols=record.protocols, + endpoints=record.endpoints, + signature=record.signature, + sequence=record.timestamp, + address=record.agent_address, ) transaction.add_message( diff --git a/python/src/uagents/registration.py b/python/src/uagents/registration.py index a6be6950..2e632895 100644 --- a/python/src/uagents/registration.py +++ b/python/src/uagents/registration.py @@ -4,7 +4,7 @@ import logging import time from abc import ABC, abstractmethod -from typing import Any, Callable, Dict, List, Optional, Union +from typing import Any, Dict, List, Optional, Union import aiohttp from cosmpy.aerial.client import LedgerClient @@ -22,8 +22,13 @@ REGISTRATION_UPDATE_INTERVAL_SECONDS, ) from uagents.crypto import Identity -from uagents.network import AlmanacContract, InsufficientFundsError, add_testnet_funds -from uagents.types import AgentEndpoint +from uagents.network import ( + AlmanacContract, + AlmanacContractRecord, + InsufficientFundsError, + add_testnet_funds, +) +from uagents.types import AgentEndpoint, AgentInfo class VerifiableModel(BaseModel): @@ -131,12 +136,9 @@ class BatchRegistrationPolicy(ABC): async def register(self): pass - -class AgentLedgerRegistrationDetails(BaseModel): - agent_address: str - protocols: List[str] - endpoints: List[AgentEndpoint] - signer: Callable[[], str] + @abstractmethod + def add_agent(self, agent_info: AgentInfo, identity: Identity): + pass class AlmanacApiRegistrationPolicy(AgentRegistrationPolicy): @@ -192,14 +194,14 @@ def __init__( self._attestations: List[AgentRegistrationAttestation] = [] self._logger = logger or logging.getLogger(__name__) - def add_agent(self, agent: Any): + def add_agent(self, agent_info: AgentInfo, identity: Identity): attestation = AgentRegistrationAttestation( - agent_address=agent.address, - protocols=list(agent.protocols.keys()), - endpoints=agent._endpoints, - metadata=agent.metadata, + agent_address=agent_info.agent_address, + protocols=list(agent_info.protocols), + endpoints=agent_info.endpoints, + metadata=agent_info.metadata, ) - attestation.sign(agent._identity) + attestation.sign(identity) self._attestations.append(attestation) async def register(self): @@ -355,24 +357,48 @@ def __init__( self._almanac_contract = almanac_contract self._testnet = testnet self._logger = logger or logging.getLogger(__name__) - self._agents: List[AgentLedgerRegistrationDetails] = [] - - def add_agent(self, agent: Any): - agent_details = AgentLedgerRegistrationDetails( - agent_address=agent.address, - protocols=list(agent.protocols.keys()), - endpoints=agent._endpoints, - sign_registration=agent._identity.sign_registration, + self._records: List[AlmanacContractRecord] = [] + self._identities: Dict[str, Identity] = {} + + def add_agent(self, agent_info: AgentInfo, identity: Identity): + agent_record = AlmanacContractRecord( + agent_address=agent_info.agent_address, + protocols=agent_info.protocols, + endpoints=agent_info.endpoints, + metadata=agent_info.metadata, + contract_address=str(self._almanac_contract.address), + sender_address=str(self._wallet.address()), ) - self._agents.append(agent_details) + self._records.append(agent_record) + self._identities[agent_info.agent_address] = identity def _get_balance(self) -> int: return self._ledger.query_bank_balance(Address(self._wallet.address())) async def register(self): self._logger.info("Registering agents on Almanac contract...") + for record in self._records: + record.sign(self._identities[record.agent_address]) + + if self._get_balance() < REGISTRATION_FEE * len(self._records): + self._logger.warning( + f"I do not have enough funds to register {len(self._records)} " + "agents on Almanac contract" + ) + if self._testnet: + add_testnet_funds(str(self._wallet.address())) + self._logger.info(f"Adding testnet funds to {self._wallet.address()}") + else: + self._logger.info( + f"Send funds to wallet address: {self._wallet.address()}" + ) + raise InsufficientFundsError() + + await self._almanac_contract.register_batch( + self._ledger, self._wallet, self._records + ) - self._almanac_contract.register_batch(self._ledger, self._wallet, self._agents) + self._logger.info("Registering agents on Almanac contract...complete") class DefaultRegistrationPolicy(AgentRegistrationPolicy): @@ -381,7 +407,7 @@ def __init__( identity: Identity, ledger: LedgerClient, wallet: LocalWallet, - almanac_contract: AlmanacContract, + almanac_contract: Optional[AlmanacContract], testnet: bool, *, logger: Optional[logging.Logger] = None, @@ -439,3 +465,56 @@ async def update_agent_status(status: AgentStatusUpdate, almanac_api: str): status, raise_from=False, ) + + +class DefaultBatchRegistrationPolicy(BatchRegistrationPolicy): + def __init__( + self, + ledger: LedgerClient, + wallet: Optional[LocalWallet] = None, + almanac_contract: Optional[AlmanacContract] = None, + testnet: bool = True, + *, + logger: Optional[logging.Logger] = None, + almanac_api: Optional[str] = None, + ): + self._logger = logger or logging.getLogger(__name__) + self._api_policy = BatchAlmanacApiRegistrationPolicy( + almanac_api=almanac_api, logger=logger + ) + + if almanac_contract is None or wallet is None: + self._ledger_policy = None + else: + self._ledger_policy = BatchLedgerRegistrationPolicy( + ledger, wallet, almanac_contract, testnet, logger=logger + ) + + def add_agent(self, agent_info: AgentInfo, identity: Identity): + self._api_policy.add_agent(agent_info, identity) + if self._ledger_policy is not None: + self._ledger_policy.add_agent(agent_info, identity) + + async def register(self): + # prefer the API registration policy as it is faster + try: + await self._api_policy.register() + except Exception as e: + self._logger.warning( + f"Failed to batch register on Almanac API: {e.__class__.__name__}" + ) + + if self._ledger_policy is None: + return + + # schedule the ledger registration + try: + await self._ledger_policy.register() + except InsufficientFundsError: + self._logger.warning( + "Failed to batch register on Almanac contract due to insufficient funds" + ) + raise + except Exception as e: + self._logger.error(f"Failed to batch register on Almanac contract: {e}") + raise From 7765c8d4f33d6cbb7d3b8764a6a4bba6f3112235 Mon Sep 17 00:00:00 2001 From: James Riehl Date: Mon, 28 Oct 2024 17:18:17 +0000 Subject: [PATCH 06/15] chore: revert example changes --- python/examples/05-send-msg/main.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/python/examples/05-send-msg/main.py b/python/examples/05-send-msg/main.py index ffe5ff67..8a958931 100644 --- a/python/examples/05-send-msg/main.py +++ b/python/examples/05-send-msg/main.py @@ -5,10 +5,8 @@ class Message(Model): text: str -alice = Agent( - name="alice", seed="alice recovery phrase", agentverse="http://localhost:8001" -) -bob = Agent(name="bob", seed="bob recovery phrase", agentverse="http://localhost:8001") +alice = Agent(name="alice", seed="alice recovery phrase") +bob = Agent(name="bob", seed="bob recovery phrase") @alice.on_interval(period=2.0) @@ -22,11 +20,7 @@ async def message_handler(ctx: Context, sender: str, msg: Message): ctx.logger.info(f"Received message from {sender}: {msg.text}") -bureau = Bureau( - endpoint="http://localhost:8000/submit", - log_level="DEBUG", - agentverse="http://localhost:8001", -) +bureau = Bureau() bureau.add(alice) bureau.add(bob) From 54f2a9e756d64833569fe087b6641cbf943097fd Mon Sep 17 00:00:00 2001 From: James Riehl Date: Tue, 29 Oct 2024 12:17:14 +0000 Subject: [PATCH 07/15] fix: do not schedule reg if no agent has endpoints --- python/src/uagents/agent.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/python/src/uagents/agent.py b/python/src/uagents/agent.py index fb3e892e..cdf98375 100644 --- a/python/src/uagents/agent.py +++ b/python/src/uagents/agent.py @@ -748,7 +748,6 @@ def update_endpoints(self, endpoints: List[AgentEndpoint]): endpoints (List[AgentEndpoint]): List of endpoint dictionaries. """ - self._endpoints = endpoints def update_loop(self, loop): @@ -1441,6 +1440,11 @@ def _update_agent(self, agent: Agent): if agent.agentverse["use_mailbox"]: self._use_mailbox = True else: + if agent._endpoints: + self._logger.warning( + f"Overwriting the agent's endpoints {agent._endpoints} " + f"with the Bureau's endpoints {self._endpoints}." + ) agent.update_endpoints(self._endpoints) self._server._rest_handler_map.update(agent._server._rest_handler_map) @@ -1484,6 +1488,10 @@ async def _schedule_registration(self): Start the batch registration loop. """ + + if not any(agent._endpoints for agent in self._agents): + return + time_to_next_registration = REGISTRATION_UPDATE_INTERVAL_SECONDS try: await self._registration_policy.register() From a5b75d7bd9b4ed250ca59db3b08211e9900f9c91 Mon Sep 17 00:00:00 2001 From: James Riehl Date: Tue, 29 Oct 2024 13:12:58 +0000 Subject: [PATCH 08/15] refactor: use almanac api post --- python/src/uagents/registration.py | 45 +++++++++++------------------- 1 file changed, 17 insertions(+), 28 deletions(-) diff --git a/python/src/uagents/registration.py b/python/src/uagents/registration.py index 2e632895..847adec1 100644 --- a/python/src/uagents/registration.py +++ b/python/src/uagents/registration.py @@ -64,6 +64,10 @@ class AgentRegistrationAttestation(VerifiableModel): metadata: Optional[Dict[str, Union[str, Dict[str, str]]]] = None +class AgentRegistrationAttestationBatch(BaseModel): + attestations: List[AgentRegistrationAttestation] + + class AgentStatusUpdate(VerifiableModel): is_active: bool @@ -184,6 +188,8 @@ async def register( ) if success: self._logger.info("Registration on Almanac API successful") + else: + self._logger.warning("Registration on Almanac API failed") class BatchAlmanacApiRegistrationPolicy(AgentRegistrationPolicy): @@ -199,7 +205,7 @@ def add_agent(self, agent_info: AgentInfo, identity: Identity): agent_address=agent_info.agent_address, protocols=list(agent_info.protocols), endpoints=agent_info.endpoints, - metadata=agent_info.metadata, + metadata=coerce_metadata_to_str(agent_info.metadata), ) attestation.sign(identity) self._attestations.append(attestation) @@ -207,33 +213,16 @@ def add_agent(self, agent_info: AgentInfo, identity: Identity): async def register(self): if not self._attestations: return - - attestations = [a.model_dump() for a in self._attestations] - - async with aiohttp.ClientSession() as session: - for retry in range(ALMANAC_API_MAX_RETRIES): - try: - async with session.post( - f"{self._almanac_api}/agents/batch", - headers={"content-type": "application/json"}, - data=json.dumps(attestations), - timeout=aiohttp.ClientTimeout( - total=ALMANAC_API_TIMEOUT_SECONDS - ), - ) as resp: - resp.raise_for_status() - self._logger.info( - "Batch registration on Almanac API successful" - ) - return - except (aiohttp.ClientError, asyncio.exceptions.TimeoutError) as e: - if retry == ALMANAC_API_MAX_RETRIES - 1: - raise e - time_to_retry = generate_backoff_time(retry) - self._logger.debug( - f"Batch registration failed. Retrying in {time_to_retry} seconds..." - ) - await asyncio.sleep(time_to_retry) + attestations = AgentRegistrationAttestationBatch( + attestations=self._attestations + ) + success = await almanac_api_post( + f"{self._almanac_api}/agents/batch", attestations + ) + if success: + self._logger.info("Batch registration on Almanac API successful") + else: + self._logger.warning("Batch registration on Almanac API failed") class LedgerBasedRegistrationPolicy(AgentRegistrationPolicy): From b380dd97b0d3e16279615de76147af5701e39845 Mon Sep 17 00:00:00 2001 From: James Riehl Date: Tue, 29 Oct 2024 13:53:34 +0000 Subject: [PATCH 09/15] chore: doc string update and typing assertions --- python/src/uagents/agent.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/python/src/uagents/agent.py b/python/src/uagents/agent.py index cdf98375..5bad9f5c 100644 --- a/python/src/uagents/agent.py +++ b/python/src/uagents/agent.py @@ -722,21 +722,28 @@ def sign_digest(self, digest: bytes) -> str: return self._identity.sign_digest(digest) def sign_registration( - self, current_time: int, sender_address: Optional[str] = None + self, timestamp: int, sender_address: Optional[str] = None ) -> str: """ Sign the registration data for Almanac contract. + + Args: + timestamp (int): The timestamp for the registration. + sender_address (Optional[str]): The address of the sender. + Returns: str: The signature of the registration data. + Raises: - AssertionError: If the Almanac contract address is None. + AssertionError: If the Almanac contract is None. """ sender_address = sender_address or str(self.wallet.address()) - assert self._almanac_contract.address is not None + assert self._almanac_contract is not None + return self._identity.sign_registration( str(self._almanac_contract.address), - current_time, + timestamp, sender_address, ) @@ -791,6 +798,8 @@ async def register(self): if necessary. """ + assert self._registration_policy is not None, "Agent has no registration policy" + await self._registration_policy.register( self.address, list(self.protocols.keys()), self._endpoints, self._metadata ) From 55247e788e78cb5ad89ef78b0ab2e80cbaf5efb9 Mon Sep 17 00:00:00 2001 From: James Riehl Date: Tue, 29 Oct 2024 13:58:52 +0000 Subject: [PATCH 10/15] chore: rename sequence and current_time to timestamp --- python/src/uagents/agent.py | 6 +++--- python/src/uagents/crypto/__init__.py | 4 ++-- python/src/uagents/registration.py | 7 +++++-- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/python/src/uagents/agent.py b/python/src/uagents/agent.py index 5bad9f5c..425fd92d 100644 --- a/python/src/uagents/agent.py +++ b/python/src/uagents/agent.py @@ -722,14 +722,14 @@ def sign_digest(self, digest: bytes) -> str: return self._identity.sign_digest(digest) def sign_registration( - self, timestamp: int, sender_address: Optional[str] = None + self, timestamp: int, sender_wallet_address: Optional[str] = None ) -> str: """ Sign the registration data for Almanac contract. Args: timestamp (int): The timestamp for the registration. - sender_address (Optional[str]): The address of the sender. + sender_wallet_address (Optional[str]): The wallet address of the transaction sender. Returns: str: The signature of the registration data. @@ -737,7 +737,7 @@ def sign_registration( Raises: AssertionError: If the Almanac contract is None. """ - sender_address = sender_address or str(self.wallet.address()) + sender_address = sender_wallet_address or str(self.wallet.address()) assert self._almanac_contract is not None diff --git a/python/src/uagents/crypto/__init__.py b/python/src/uagents/crypto/__init__.py index d19e7219..d18e2cd6 100644 --- a/python/src/uagents/crypto/__init__.py +++ b/python/src/uagents/crypto/__init__.py @@ -144,14 +144,14 @@ def sign_digest(self, digest: bytes) -> str: def sign_registration( self, contract_address: str, - sequence: int, + timestamp: int, wallet_address: str, ) -> str: """Sign the registration data for the Almanac contract.""" hasher = hashlib.sha256() hasher.update(encode_length_prefixed(contract_address)) hasher.update(encode_length_prefixed(self.address)) - hasher.update(encode_length_prefixed(sequence)) + hasher.update(encode_length_prefixed(timestamp)) hasher.update(encode_length_prefixed(wallet_address)) return self.sign_digest(hasher.digest()) diff --git a/python/src/uagents/registration.py b/python/src/uagents/registration.py index 847adec1..58a18d67 100644 --- a/python/src/uagents/registration.py +++ b/python/src/uagents/registration.py @@ -313,10 +313,13 @@ async def register( def _get_balance(self) -> int: return self._ledger.query_bank_balance(Address(self._wallet.address())) - def _sign_registration(self, current_time: int) -> str: + def _sign_registration(self, timestamp: int) -> str: """ Sign the registration data for Almanac contract. + Args: + timestamp (int): The timestamp for the registration. + Returns: str: The signature of the registration data. @@ -327,7 +330,7 @@ def _sign_registration(self, current_time: int) -> str: assert self._almanac_contract.address is not None return self._identity.sign_registration( str(self._almanac_contract.address), - current_time, + timestamp, str(self._wallet.address()), ) From 1391e4692c9d3e1aa08774d3c96321f718b538f8 Mon Sep 17 00:00:00 2001 From: James Riehl Date: Tue, 29 Oct 2024 16:34:26 +0000 Subject: [PATCH 11/15] fix: extract geo-location metadata for registration --- python/src/uagents/registration.py | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/python/src/uagents/registration.py b/python/src/uagents/registration.py index 58a18d67..1b711dd1 100644 --- a/python/src/uagents/registration.py +++ b/python/src/uagents/registration.py @@ -98,6 +98,17 @@ def coerce_metadata_to_str( return out +def extract_geo_metadata( + metadata: Optional[Dict[str, Any]], +) -> Optional[Dict[str, Any]]: + """ + Extract geo-location metadata from the metadata dictionary. + """ + if metadata is None: + return None + return {k: v for k, v in metadata.items() if k == "geolocation"} + + async def almanac_api_post( url: str, data: BaseModel, raise_from: bool = True, retries: int = 3 ) -> bool: @@ -166,18 +177,12 @@ async def register( endpoints: List[AgentEndpoint], metadata: Optional[Dict[str, Any]] = None, ): - clean_metadata = ( - {k: v for k, v in metadata.items() if k == "geolocation"} - if metadata - else None - ) # only keep geolocation metadata for registration - # create the attestation attestation = AgentRegistrationAttestation( agent_address=agent_address, protocols=protocols, endpoints=endpoints, - metadata=coerce_metadata_to_str(clean_metadata), + metadata=coerce_metadata_to_str(extract_geo_metadata(metadata)), ) # sign the attestation @@ -205,7 +210,7 @@ def add_agent(self, agent_info: AgentInfo, identity: Identity): agent_address=agent_info.agent_address, protocols=list(agent_info.protocols), endpoints=agent_info.endpoints, - metadata=coerce_metadata_to_str(agent_info.metadata), + metadata=coerce_metadata_to_str(extract_geo_metadata(agent_info.metadata)), ) attestation.sign(identity) self._attestations.append(attestation) @@ -357,7 +362,7 @@ def add_agent(self, agent_info: AgentInfo, identity: Identity): agent_address=agent_info.agent_address, protocols=agent_info.protocols, endpoints=agent_info.endpoints, - metadata=agent_info.metadata, + metadata=coerce_metadata_to_str(extract_geo_metadata(agent_info.metadata)), contract_address=str(self._almanac_contract.address), sender_address=str(self._wallet.address()), ) From 671f3c065938efecf4629ea7860e34ac586c9833 Mon Sep 17 00:00:00 2001 From: James Riehl Date: Tue, 29 Oct 2024 16:40:01 +0000 Subject: [PATCH 12/15] chore: do not add metadata to contract record --- python/src/uagents/registration.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/src/uagents/registration.py b/python/src/uagents/registration.py index 1b711dd1..62826517 100644 --- a/python/src/uagents/registration.py +++ b/python/src/uagents/registration.py @@ -362,7 +362,6 @@ def add_agent(self, agent_info: AgentInfo, identity: Identity): agent_address=agent_info.agent_address, protocols=agent_info.protocols, endpoints=agent_info.endpoints, - metadata=coerce_metadata_to_str(extract_geo_metadata(agent_info.metadata)), contract_address=str(self._almanac_contract.address), sender_address=str(self._wallet.address()), ) From d63eb87a37175a7b01d7619628e22509e250b765 Mon Sep 17 00:00:00 2001 From: James Riehl Date: Mon, 4 Nov 2024 14:39:48 +0000 Subject: [PATCH 13/15] test: add bureau registration test cases --- python/src/uagents/agent.py | 14 ++++- python/tests/test_bureau.py | 105 ++++++++++++++++++++++++++++++++++++ 2 files changed, 118 insertions(+), 1 deletion(-) create mode 100644 python/tests/test_bureau.py diff --git a/python/src/uagents/agent.py b/python/src/uagents/agent.py index 425fd92d..9c4656dc 100644 --- a/python/src/uagents/agent.py +++ b/python/src/uagents/agent.py @@ -1377,6 +1377,7 @@ def __init__( registration_policy: Optional[BatchRegistrationPolicy] = None, ledger: Optional[LedgerClient] = None, wallet: Optional[LocalWallet] = None, + seed: Optional[str] = None, test: bool = True, loop: Optional[asyncio.AbstractEventLoop] = None, log_level: Union[int, str] = logging.INFO, @@ -1390,7 +1391,8 @@ def __init__( endpoint (Optional[Union[str, List[str], Dict[str, dict]]]): The endpoint configuration. agentverse (Optional[Union[str, Dict[str, str]]]): The agentverse configuration. registration_policy (Optional[BatchRegistrationPolicy]): The registration policy. - wallet (Optional[LocalWallet]): The wallet for the bureau. + wallet (Optional[LocalWallet]): The wallet for the bureau (overrides 'seed'). + seed (Optional[str]): The seed phrase for the wallet (overridden by 'wallet'). test (Optional[bool]): True if the bureau will register and transact on the testnet. loop (Optional[asyncio.AbstractEventLoop]): The event loop. log_level (Union[int, str]): The logging level for the bureau. @@ -1412,6 +1414,16 @@ def __init__( almanac_api_url = f"{self._agentverse['http_prefix']}://{self._agentverse['base_url']}/v1/almanac" almanac_contract = get_almanac_contract(test) + if wallet and seed: + self._logger.warning( + "Ignoring 'seed' argument because 'wallet' is provided." + ) + elif seed: + wallet = LocalWallet( + PrivateKey(derive_key_from_seed(seed, LEDGER_PREFIX, 0)), + prefix=LEDGER_PREFIX, + ) + if registration_policy is not None: if ( isinstance(registration_policy, BatchLedgerRegistrationPolicy) diff --git a/python/tests/test_bureau.py b/python/tests/test_bureau.py new file mode 100644 index 00000000..84119c01 --- /dev/null +++ b/python/tests/test_bureau.py @@ -0,0 +1,105 @@ +from cosmpy.aerial.wallet import LocalWallet + +from uagents import Agent, Bureau +from uagents.registration import ( + AgentEndpoint, + BatchLedgerRegistrationPolicy, + DefaultBatchRegistrationPolicy, + DefaultRegistrationPolicy, + LedgerBasedRegistrationPolicy, +) + +ALICE_ENDPOINT = AgentEndpoint(url="http://alice:8000/submit", weight=1) +BOB_ENDPOINT = AgentEndpoint(url="http://bob:8000/submit", weight=1) +BUREAU_ENDPOINT = AgentEndpoint(url="http://bureau:8000/submit", weight=1) + + +bureau_wallet = LocalWallet.generate() + + +def test_bureau_updates_agents_no_ledger_batch(): + alice = Agent(name="alice", endpoint=ALICE_ENDPOINT.url) + bob = Agent(name="bob", endpoint=BOB_ENDPOINT.url) + + assert alice._endpoints == [ALICE_ENDPOINT] + assert bob._endpoints == [BOB_ENDPOINT] + + assert isinstance(alice._registration_policy, DefaultRegistrationPolicy) + assert isinstance(bob._registration_policy, DefaultRegistrationPolicy) + + bureau = Bureau(agents=[alice, bob], endpoint=BUREAU_ENDPOINT.url) + + assert alice._endpoints == [BUREAU_ENDPOINT] + assert bob._endpoints == [BUREAU_ENDPOINT] + + assert isinstance(bureau._registration_policy, DefaultBatchRegistrationPolicy) + assert bureau._registration_policy._ledger_policy is None + assert isinstance(alice._registration_policy, LedgerBasedRegistrationPolicy) + assert isinstance(bob._registration_policy, LedgerBasedRegistrationPolicy) + + +def test_bureau_updates_agents_with_wallet(): + alice = Agent(name="alice", endpoint=ALICE_ENDPOINT.url) + bob = Agent(name="bob", endpoint=BOB_ENDPOINT.url) + + assert isinstance(alice._registration_policy, DefaultRegistrationPolicy) + assert isinstance(bob._registration_policy, DefaultRegistrationPolicy) + + bureau = Bureau(agents=[alice, bob], wallet=bureau_wallet) + + assert alice._endpoints == [] + assert bob._endpoints == [] + + assert isinstance(bureau._registration_policy, DefaultBatchRegistrationPolicy) + assert isinstance( + bureau._registration_policy._ledger_policy, BatchLedgerRegistrationPolicy + ) + assert alice._registration_policy is None + assert bob._registration_policy is None + + +def test_bureau_updates_agents_with_seed(): + alice = Agent(name="alice", endpoint=ALICE_ENDPOINT.url) + bob = Agent(name="bob", endpoint=BOB_ENDPOINT.url) + + assert isinstance(alice._registration_policy, DefaultRegistrationPolicy) + assert isinstance(bob._registration_policy, DefaultRegistrationPolicy) + + bureau = Bureau( + agents=[alice, bob], + endpoint=BUREAU_ENDPOINT.url, + seed="bureau test seed phrase", + ) + + assert isinstance(bureau._registration_policy, DefaultBatchRegistrationPolicy) + assert isinstance( + bureau._registration_policy._ledger_policy, BatchLedgerRegistrationPolicy + ) + assert alice._registration_policy is None + assert bob._registration_policy is None + + +def test_bureau_updates_agents_wallet_overrides_seed(): + alice = Agent(name="alice", endpoint=ALICE_ENDPOINT.url) + bob = Agent(name="bob", endpoint=BOB_ENDPOINT.url) + + assert isinstance(alice._registration_policy, DefaultRegistrationPolicy) + assert isinstance(bob._registration_policy, DefaultRegistrationPolicy) + + bureau = Bureau( + agents=[alice, bob], + endpoint=BUREAU_ENDPOINT.url, + wallet=bureau_wallet, + seed="bureau test seed phrase", + ) + + assert isinstance(bureau._registration_policy, DefaultBatchRegistrationPolicy) + assert isinstance( + bureau._registration_policy._ledger_policy, BatchLedgerRegistrationPolicy + ) + assert ( + bureau._registration_policy._ledger_policy._wallet.address() + == bureau_wallet.address() + ) + assert alice._registration_policy is None + assert bob._registration_policy is None From 54a9828d2149ad8228a0e5fa89d7a4941060515f Mon Sep 17 00:00:00 2001 From: James Riehl Date: Mon, 4 Nov 2024 14:56:50 +0000 Subject: [PATCH 14/15] chore: set event loop for CI tests to pass --- python/tests/test_bureau.py | 177 ++++++++++++++++++------------------ 1 file changed, 91 insertions(+), 86 deletions(-) diff --git a/python/tests/test_bureau.py b/python/tests/test_bureau.py index 84119c01..b6c71884 100644 --- a/python/tests/test_bureau.py +++ b/python/tests/test_bureau.py @@ -1,3 +1,6 @@ +import asyncio +import unittest + from cosmpy.aerial.wallet import LocalWallet from uagents import Agent, Bureau @@ -17,89 +20,91 @@ bureau_wallet = LocalWallet.generate() -def test_bureau_updates_agents_no_ledger_batch(): - alice = Agent(name="alice", endpoint=ALICE_ENDPOINT.url) - bob = Agent(name="bob", endpoint=BOB_ENDPOINT.url) - - assert alice._endpoints == [ALICE_ENDPOINT] - assert bob._endpoints == [BOB_ENDPOINT] - - assert isinstance(alice._registration_policy, DefaultRegistrationPolicy) - assert isinstance(bob._registration_policy, DefaultRegistrationPolicy) - - bureau = Bureau(agents=[alice, bob], endpoint=BUREAU_ENDPOINT.url) - - assert alice._endpoints == [BUREAU_ENDPOINT] - assert bob._endpoints == [BUREAU_ENDPOINT] - - assert isinstance(bureau._registration_policy, DefaultBatchRegistrationPolicy) - assert bureau._registration_policy._ledger_policy is None - assert isinstance(alice._registration_policy, LedgerBasedRegistrationPolicy) - assert isinstance(bob._registration_policy, LedgerBasedRegistrationPolicy) - - -def test_bureau_updates_agents_with_wallet(): - alice = Agent(name="alice", endpoint=ALICE_ENDPOINT.url) - bob = Agent(name="bob", endpoint=BOB_ENDPOINT.url) - - assert isinstance(alice._registration_policy, DefaultRegistrationPolicy) - assert isinstance(bob._registration_policy, DefaultRegistrationPolicy) - - bureau = Bureau(agents=[alice, bob], wallet=bureau_wallet) - - assert alice._endpoints == [] - assert bob._endpoints == [] - - assert isinstance(bureau._registration_policy, DefaultBatchRegistrationPolicy) - assert isinstance( - bureau._registration_policy._ledger_policy, BatchLedgerRegistrationPolicy - ) - assert alice._registration_policy is None - assert bob._registration_policy is None - - -def test_bureau_updates_agents_with_seed(): - alice = Agent(name="alice", endpoint=ALICE_ENDPOINT.url) - bob = Agent(name="bob", endpoint=BOB_ENDPOINT.url) - - assert isinstance(alice._registration_policy, DefaultRegistrationPolicy) - assert isinstance(bob._registration_policy, DefaultRegistrationPolicy) - - bureau = Bureau( - agents=[alice, bob], - endpoint=BUREAU_ENDPOINT.url, - seed="bureau test seed phrase", - ) - - assert isinstance(bureau._registration_policy, DefaultBatchRegistrationPolicy) - assert isinstance( - bureau._registration_policy._ledger_policy, BatchLedgerRegistrationPolicy - ) - assert alice._registration_policy is None - assert bob._registration_policy is None - - -def test_bureau_updates_agents_wallet_overrides_seed(): - alice = Agent(name="alice", endpoint=ALICE_ENDPOINT.url) - bob = Agent(name="bob", endpoint=BOB_ENDPOINT.url) - - assert isinstance(alice._registration_policy, DefaultRegistrationPolicy) - assert isinstance(bob._registration_policy, DefaultRegistrationPolicy) - - bureau = Bureau( - agents=[alice, bob], - endpoint=BUREAU_ENDPOINT.url, - wallet=bureau_wallet, - seed="bureau test seed phrase", - ) - - assert isinstance(bureau._registration_policy, DefaultBatchRegistrationPolicy) - assert isinstance( - bureau._registration_policy._ledger_policy, BatchLedgerRegistrationPolicy - ) - assert ( - bureau._registration_policy._ledger_policy._wallet.address() - == bureau_wallet.address() - ) - assert alice._registration_policy is None - assert bob._registration_policy is None +class TestBureau(unittest.TestCase): + def setUp(self) -> None: + self.loop = asyncio.get_event_loop() + return super().setUp() + + def test_bureau_updates_agents_no_ledger_batch(self): + alice = Agent(name="alice", endpoint=ALICE_ENDPOINT.url, loop=self.loop) + bob = Agent(name="bob", endpoint=BOB_ENDPOINT.url) + + assert alice._endpoints == [ALICE_ENDPOINT] + assert bob._endpoints == [BOB_ENDPOINT] + + assert isinstance(alice._registration_policy, DefaultRegistrationPolicy) + assert isinstance(bob._registration_policy, DefaultRegistrationPolicy) + + bureau = Bureau(agents=[alice, bob], endpoint=BUREAU_ENDPOINT.url) + + assert alice._endpoints == [BUREAU_ENDPOINT] + assert bob._endpoints == [BUREAU_ENDPOINT] + + assert isinstance(bureau._registration_policy, DefaultBatchRegistrationPolicy) + assert bureau._registration_policy._ledger_policy is None + assert isinstance(alice._registration_policy, LedgerBasedRegistrationPolicy) + assert isinstance(bob._registration_policy, LedgerBasedRegistrationPolicy) + + def test_bureau_updates_agents_with_wallet(self): + alice = Agent(name="alice", endpoint=ALICE_ENDPOINT.url) + bob = Agent(name="bob", endpoint=BOB_ENDPOINT.url) + + assert isinstance(alice._registration_policy, DefaultRegistrationPolicy) + assert isinstance(bob._registration_policy, DefaultRegistrationPolicy) + + bureau = Bureau(agents=[alice, bob], wallet=bureau_wallet) + + assert alice._endpoints == [] + assert bob._endpoints == [] + + assert isinstance(bureau._registration_policy, DefaultBatchRegistrationPolicy) + assert isinstance( + bureau._registration_policy._ledger_policy, BatchLedgerRegistrationPolicy + ) + assert alice._registration_policy is None + assert bob._registration_policy is None + + def test_bureau_updates_agents_with_seed(self): + alice = Agent(name="alice", endpoint=ALICE_ENDPOINT.url) + bob = Agent(name="bob", endpoint=BOB_ENDPOINT.url) + + assert isinstance(alice._registration_policy, DefaultRegistrationPolicy) + assert isinstance(bob._registration_policy, DefaultRegistrationPolicy) + + bureau = Bureau( + agents=[alice, bob], + endpoint=BUREAU_ENDPOINT.url, + seed="bureau test seed phrase", + ) + + assert isinstance(bureau._registration_policy, DefaultBatchRegistrationPolicy) + assert isinstance( + bureau._registration_policy._ledger_policy, BatchLedgerRegistrationPolicy + ) + assert alice._registration_policy is None + assert bob._registration_policy is None + + def test_bureau_updates_agents_wallet_overrides_seed(self): + alice = Agent(name="alice", endpoint=ALICE_ENDPOINT.url) + bob = Agent(name="bob", endpoint=BOB_ENDPOINT.url) + + assert isinstance(alice._registration_policy, DefaultRegistrationPolicy) + assert isinstance(bob._registration_policy, DefaultRegistrationPolicy) + + bureau = Bureau( + agents=[alice, bob], + endpoint=BUREAU_ENDPOINT.url, + wallet=bureau_wallet, + seed="bureau test seed phrase", + ) + + assert isinstance(bureau._registration_policy, DefaultBatchRegistrationPolicy) + assert isinstance( + bureau._registration_policy._ledger_policy, BatchLedgerRegistrationPolicy + ) + assert ( + bureau._registration_policy._ledger_policy._wallet.address() + == bureau_wallet.address() + ) + assert alice._registration_policy is None + assert bob._registration_policy is None From ebc857811b9354cf78c3aecdd94e0655764d34f2 Mon Sep 17 00:00:00 2001 From: James Riehl Date: Mon, 4 Nov 2024 15:11:24 +0000 Subject: [PATCH 15/15] fix: try isolated asyncio test case --- python/tests/test_bureau.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/tests/test_bureau.py b/python/tests/test_bureau.py index b6c71884..d32fba68 100644 --- a/python/tests/test_bureau.py +++ b/python/tests/test_bureau.py @@ -20,7 +20,7 @@ bureau_wallet = LocalWallet.generate() -class TestBureau(unittest.TestCase): +class TestBureau(unittest.IsolatedAsyncioTestCase): def setUp(self) -> None: self.loop = asyncio.get_event_loop() return super().setUp()