Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): add batch registration to Almanac API and Almanac Contract #571

Closed
wants to merge 9 commits into from
182 changes: 151 additions & 31 deletions python/src/uagents/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@
from uagents.registration import (
AgentRegistrationPolicy,
AgentStatusUpdate,
BatchLedgerRegistrationPolicy,
BatchRegistrationPolicy,
DefaultBatchRegistrationPolicy,
DefaultRegistrationPolicy,
LedgerBasedRegistrationPolicy,
update_agent_status,
)
from uagents.resolver import GlobalResolver, Resolver
Expand Down Expand Up @@ -372,15 +376,18 @@ def __init__(
self._on_shutdown = []
self._test = test
self._version = version or "0.1.0"
self._registration_policy = registration_policy or DefaultRegistrationPolicy(
self._identity,
self._ledger,
self._wallet,
self._almanac_contract,
self._test,
logger=self._logger,
almanac_api=self._almanac_api_url,
)
self._registration_policy = registration_policy or None

if self._registration_policy is None:
self._registration_policy = DefaultRegistrationPolicy(
self._identity,
self._ledger,
self._wallet,
self._almanac_contract,
self._test,
logger=self._logger,
almanac_api=self._almanac_api_url,
)
self._metadata = self._initialize_metadata(metadata)

self.initialize_wallet_messaging(enable_wallet_messaging)
Expand Down Expand Up @@ -642,6 +649,21 @@ def balance(self) -> int:

return self.ledger.query_bank_balance(Address(self.wallet.address()))

@property
def info(self) -> AgentInfo:
"""
Get basic information about the agent.

Returns:
AgentInfo: The agent's address, endpoints, protocols, and metadata.
"""
return AgentInfo(
agent_address=self.address,
endpoints=self._endpoints,
protocols=list(self.protocols.keys()),
metadata=self.metadata,
)

@property
def metadata(self) -> Dict[str, Any]:
"""
Expand Down Expand Up @@ -699,19 +721,23 @@ def sign_digest(self, digest: bytes) -> str:
"""
return self._identity.sign_digest(digest)

def sign_registration(self, current_time: int) -> str:
def sign_registration(
self, current_time: int, sender_address: Optional[str] = None
) -> str:
"""
Sign the registration data for Almanac contract.
Returns:
str: The signature of the registration data.
Raises:
AssertionError: If the Almanac contract address is None.
"""
sender_address = sender_address or str(self.wallet.address())

assert self._almanac_contract.address is not None
return self._identity.sign_registration(
str(self._almanac_contract.address),
current_time,
str(self.wallet.address()),
sender_address,
)

def update_endpoints(self, endpoints: List[AgentEndpoint]):
Expand All @@ -722,7 +748,6 @@ def update_endpoints(self, endpoints: List[AgentEndpoint]):
endpoints (List[AgentEndpoint]): List of endpoint dictionaries.

"""

self._endpoints = endpoints

def update_loop(self, loop):
Expand Down Expand Up @@ -770,7 +795,7 @@ async def register(self):
self.address, list(self.protocols.keys()), self._endpoints, self._metadata
)

async def _registration_loop(self):
async def _schedule_registration(self):
"""
Execute the registration loop.

Expand All @@ -784,12 +809,12 @@ async def _registration_loop(self):
except InsufficientFundsError:
time_until_next_registration = 2 * AVERAGE_BLOCK_INTERVAL
except Exception as ex:
self._logger.exception(f"Failed to register on almanac contract: {ex}")
self._logger.exception(f"Failed to register: {ex}")
time_until_next_registration = REGISTRATION_RETRY_INTERVAL_SECONDS

# schedule the next registration update
self._loop.create_task(
_delay(self._registration_loop(), time_until_next_registration)
_delay(self._schedule_registration(), time_until_next_registration)
)

def on_interval(
Expand Down Expand Up @@ -1064,13 +1089,13 @@ async def _startup(self):
Perform startup actions.

"""
if self._endpoints:
await self._registration_loop()

else:
self._logger.warning(
"No endpoints provided. Skipping registration: Agent won't be reachable."
)
if self._registration_policy:
if self._endpoints:
await self._schedule_registration()
else:
self._logger.warning(
"No endpoints provided. Skipping registration: Agent won't be reachable."
)
for handler in self._on_startup:
try:
ctx = self._build_context()
Expand Down Expand Up @@ -1339,16 +1364,27 @@ def __init__(
agents: Optional[List[Agent]] = None,
port: Optional[int] = None,
endpoint: Optional[Union[str, List[str], Dict[str, dict]]] = None,
agentverse: Optional[Union[str, Dict[str, str]]] = None,
registration_policy: Optional[BatchRegistrationPolicy] = None,
ledger: Optional[LedgerClient] = None,
wallet: Optional[LocalWallet] = None,
test: bool = True,
loop: Optional[asyncio.AbstractEventLoop] = None,
log_level: Union[int, str] = logging.INFO,
):
"""
Initialize a Bureau instance.

Args:
port (Optional[int]): The port on which the bureau's server will run.
endpoint (Optional[Union[str, List[str], Dict[str, dict]]]): The endpoint configuration
for the bureau.
agents (Optional[List[Agent]]): The list of agents to be managed by the bureau.
port (Optional[int]): The port number for the server.
endpoint (Optional[Union[str, List[str], Dict[str, dict]]]): The endpoint configuration.
agentverse (Optional[Union[str, Dict[str, str]]]): The agentverse configuration.
registration_policy (Optional[BatchRegistrationPolicy]): The registration policy.
wallet (Optional[LocalWallet]): The wallet for the bureau.
test (Optional[bool]): True if the bureau will register and transact on the testnet.
loop (Optional[asyncio.AbstractEventLoop]): The event loop.
log_level (Union[int, str]): The logging level for the bureau.
"""
self._loop = loop or asyncio.get_event_loop_policy().get_event_loop()
self._agents: List[Agent] = []
Expand All @@ -1362,31 +1398,114 @@ def __init__(
queries=self._queries,
logger=self._logger,
)
self._use_mailbox = False
self._agentverse = parse_agentverse_config(agentverse)
self._use_mailbox = self._agentverse["use_mailbox"]
almanac_api_url = f"{self._agentverse['http_prefix']}://{self._agentverse['base_url']}/v1/almanac"
almanac_contract = get_almanac_contract(test)

if registration_policy is not None:
if (
isinstance(registration_policy, BatchLedgerRegistrationPolicy)
and wallet is None
):
raise ValueError(
"Argument 'wallet' must be provided when using "
"the batch ledger registration policy."
)
self._registration_policy = registration_policy
else:
self._registration_policy = DefaultBatchRegistrationPolicy(
ledger or get_ledger(test),
wallet,
almanac_contract,
test,
logger=self._logger,
almanac_api=almanac_api_url,
)

if agents is not None:
for agent in agents:
self.add(agent)

def add(self, agent: Agent):
def _update_agent(self, agent: Agent):
"""
Add an agent to the bureau.
Update the agent to be taken over by the Bureau.

Args:
agent (Agent): The agent to be added.
agent (Agent): The agent to be updated.

"""
if agent in self._agents:
return
agent.update_loop(self._loop)
agent.update_queries(self._queries)
if agent.agentverse["use_mailbox"]:
self._use_mailbox = True
else:
if agent._endpoints:
self._logger.warning(
f"Overwriting the agent's endpoints {agent._endpoints} "
f"with the Bureau's endpoints {self._endpoints}."
)
agent.update_endpoints(self._endpoints)
self._server._rest_handler_map.update(agent._server._rest_handler_map)

# Run the batch Almanac API registration by default and only run the agent's
# ledger registration if the Bureau is not using a batch ledger registration
# policy because it has no wallet address.
agent._registration_policy = None
if (
isinstance(self._registration_policy, DefaultBatchRegistrationPolicy)
and self._registration_policy._ledger_policy is None
and agent._almanac_contract is not None
):
agent._registration_policy = LedgerBasedRegistrationPolicy(
agent._identity,
agent._ledger,
agent._wallet,
agent._almanac_contract,
agent._test,
logger=agent._logger,
)

agent._agentverse = self._agentverse
agent._logger.setLevel(self._logger.level)

def add(self, agent: Agent):
"""
Add an agent to the bureau.

Args:
agent (Agent): The agent to be added.

"""
if agent in self._agents:
return
self._registration_policy.add_agent(agent.info, agent._identity)
self._update_agent(agent)
self._agents.append(agent)

async def _schedule_registration(self):
"""
Start the batch registration loop.

"""

if not any(agent._endpoints for agent in self._agents):
return

time_to_next_registration = REGISTRATION_UPDATE_INTERVAL_SECONDS
try:
await self._registration_policy.register()
except InsufficientFundsError:
time_to_next_registration = 2 * AVERAGE_BLOCK_INTERVAL
except Exception as ex:
self._logger.exception(f"Failed to register: {ex}")
time_to_next_registration = REGISTRATION_RETRY_INTERVAL_SECONDS

# schedule the next registration update
self._loop.create_task(
_delay(self._schedule_registration(), time_to_next_registration)
)

async def run_async(self):
"""
Run the agents managed by the bureau.
Expand All @@ -1397,6 +1516,7 @@ async def run_async(self):
await agent.setup()
if agent.agentverse["use_mailbox"] and agent.mailbox_client is not None:
tasks.append(agent.mailbox_client.run())
tasks.append(self._schedule_registration())

try:
await asyncio.gather(*tasks)
Expand Down
Loading
Loading