Skip to content

Commit

Permalink
feat(core): add batch almanac api and contract registrations for Bure…
Browse files Browse the repository at this point in the history
…au (#551)
  • Loading branch information
jrriehl authored Nov 8, 2024
1 parent 271fa6c commit 1b67a4f
Show file tree
Hide file tree
Showing 6 changed files with 626 additions and 78 deletions.
209 changes: 175 additions & 34 deletions python/src/uagents/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@
from uagents.registration import (
AgentRegistrationPolicy,
AgentStatusUpdate,
BatchLedgerRegistrationPolicy,
BatchRegistrationPolicy,
DefaultBatchRegistrationPolicy,
DefaultRegistrationPolicy,
LedgerBasedRegistrationPolicy,
update_agent_status,
)
from uagents.resolver import GlobalResolver, Resolver
Expand Down Expand Up @@ -372,15 +376,18 @@ def __init__(
self._on_shutdown = []
self._test = test
self._version = version or "0.1.0"
self._registration_policy = registration_policy or DefaultRegistrationPolicy(
self._identity,
self._ledger,
self._wallet,
self._almanac_contract,
self._test,
logger=self._logger,
almanac_api=self._almanac_api_url,
)
self._registration_policy = registration_policy or None

if self._registration_policy is None:
self._registration_policy = DefaultRegistrationPolicy(
self._identity,
self._ledger,
self._wallet,
self._almanac_contract,
self._test,
logger=self._logger,
almanac_api=self._almanac_api_url,
)
self._metadata = self._initialize_metadata(metadata)

self.initialize_wallet_messaging(enable_wallet_messaging)
Expand Down Expand Up @@ -642,6 +649,21 @@ def balance(self) -> int:

return self.ledger.query_bank_balance(Address(self.wallet.address()))

@property
def info(self) -> AgentInfo:
"""
Get basic information about the agent.
Returns:
AgentInfo: The agent's address, endpoints, protocols, and metadata.
"""
return AgentInfo(
agent_address=self.address,
endpoints=self._endpoints,
protocols=list(self.protocols.keys()),
metadata=self.metadata,
)

@property
def metadata(self) -> Dict[str, Any]:
"""
Expand Down Expand Up @@ -699,19 +721,30 @@ def sign_digest(self, digest: bytes) -> str:
"""
return self._identity.sign_digest(digest)

def sign_registration(self, current_time: int) -> str:
def sign_registration(
self, timestamp: int, sender_wallet_address: Optional[str] = None
) -> str:
"""
Sign the registration data for Almanac contract.
Args:
timestamp (int): The timestamp for the registration.
sender_wallet_address (Optional[str]): The wallet address of the transaction sender.
Returns:
str: The signature of the registration data.
Raises:
AssertionError: If the Almanac contract address is None.
AssertionError: If the Almanac contract is None.
"""
assert self._almanac_contract.address is not None
sender_address = sender_wallet_address or str(self.wallet.address())

assert self._almanac_contract is not None

return self._identity.sign_registration(
str(self._almanac_contract.address),
current_time,
str(self.wallet.address()),
timestamp,
sender_address,
)

def update_endpoints(self, endpoints: List[AgentEndpoint]):
Expand All @@ -722,7 +755,6 @@ def update_endpoints(self, endpoints: List[AgentEndpoint]):
endpoints (List[AgentEndpoint]): List of endpoint dictionaries.
"""

self._endpoints = endpoints

def update_loop(self, loop):
Expand Down Expand Up @@ -766,11 +798,13 @@ async def register(self):
if necessary.
"""
assert self._registration_policy is not None, "Agent has no registration policy"

await self._registration_policy.register(
self.address, list(self.protocols.keys()), self._endpoints, self._metadata
)

async def _registration_loop(self):
async def _schedule_registration(self):
"""
Execute the registration loop.
Expand All @@ -784,12 +818,12 @@ async def _registration_loop(self):
except InsufficientFundsError:
time_until_next_registration = 2 * AVERAGE_BLOCK_INTERVAL
except Exception as ex:
self._logger.exception(f"Failed to register on almanac contract: {ex}")
self._logger.exception(f"Failed to register: {ex}")
time_until_next_registration = REGISTRATION_RETRY_INTERVAL_SECONDS

# schedule the next registration update
self._loop.create_task(
_delay(self._registration_loop(), time_until_next_registration)
_delay(self._schedule_registration(), time_until_next_registration)
)

def on_interval(
Expand Down Expand Up @@ -1064,13 +1098,13 @@ async def _startup(self):
Perform startup actions.
"""
if self._endpoints:
await self._registration_loop()

else:
self._logger.warning(
"No endpoints provided. Skipping registration: Agent won't be reachable."
)
if self._registration_policy:
if self._endpoints:
await self._schedule_registration()
else:
self._logger.warning(
"No endpoints provided. Skipping registration: Agent won't be reachable."
)
for handler in self._on_startup:
try:
ctx = self._build_context()
Expand Down Expand Up @@ -1339,16 +1373,29 @@ def __init__(
agents: Optional[List[Agent]] = None,
port: Optional[int] = None,
endpoint: Optional[Union[str, List[str], Dict[str, dict]]] = None,
agentverse: Optional[Union[str, Dict[str, str]]] = None,
registration_policy: Optional[BatchRegistrationPolicy] = None,
ledger: Optional[LedgerClient] = None,
wallet: Optional[LocalWallet] = None,
seed: Optional[str] = None,
test: bool = True,
loop: Optional[asyncio.AbstractEventLoop] = None,
log_level: Union[int, str] = logging.INFO,
):
"""
Initialize a Bureau instance.
Args:
port (Optional[int]): The port on which the bureau's server will run.
endpoint (Optional[Union[str, List[str], Dict[str, dict]]]): The endpoint configuration
for the bureau.
agents (Optional[List[Agent]]): The list of agents to be managed by the bureau.
port (Optional[int]): The port number for the server.
endpoint (Optional[Union[str, List[str], Dict[str, dict]]]): The endpoint configuration.
agentverse (Optional[Union[str, Dict[str, str]]]): The agentverse configuration.
registration_policy (Optional[BatchRegistrationPolicy]): The registration policy.
wallet (Optional[LocalWallet]): The wallet for the bureau (overrides 'seed').
seed (Optional[str]): The seed phrase for the wallet (overridden by 'wallet').
test (Optional[bool]): True if the bureau will register and transact on the testnet.
loop (Optional[asyncio.AbstractEventLoop]): The event loop.
log_level (Union[int, str]): The logging level for the bureau.
"""
self._loop = loop or asyncio.get_event_loop_policy().get_event_loop()
self._agents: List[Agent] = []
Expand All @@ -1362,31 +1409,124 @@ def __init__(
queries=self._queries,
logger=self._logger,
)
self._use_mailbox = False
self._agentverse = parse_agentverse_config(agentverse)
self._use_mailbox = self._agentverse["use_mailbox"]
almanac_api_url = f"{self._agentverse['http_prefix']}://{self._agentverse['base_url']}/v1/almanac"
almanac_contract = get_almanac_contract(test)

if wallet and seed:
self._logger.warning(
"Ignoring 'seed' argument because 'wallet' is provided."
)
elif seed:
wallet = LocalWallet(
PrivateKey(derive_key_from_seed(seed, LEDGER_PREFIX, 0)),
prefix=LEDGER_PREFIX,
)

if registration_policy is not None:
if (
isinstance(registration_policy, BatchLedgerRegistrationPolicy)
and wallet is None
):
raise ValueError(
"Argument 'wallet' must be provided when using "
"the batch ledger registration policy."
)
self._registration_policy = registration_policy
else:
self._registration_policy = DefaultBatchRegistrationPolicy(
ledger or get_ledger(test),
wallet,
almanac_contract,
test,
logger=self._logger,
almanac_api=almanac_api_url,
)

if agents is not None:
for agent in agents:
self.add(agent)

def add(self, agent: Agent):
def _update_agent(self, agent: Agent):
"""
Add an agent to the bureau.
Update the agent to be taken over by the Bureau.
Args:
agent (Agent): The agent to be added.
agent (Agent): The agent to be updated.
"""
if agent in self._agents:
return
agent.update_loop(self._loop)
agent.update_queries(self._queries)
if agent.agentverse["use_mailbox"]:
self._use_mailbox = True
else:
if agent._endpoints:
self._logger.warning(
f"Overwriting the agent's endpoints {agent._endpoints} "
f"with the Bureau's endpoints {self._endpoints}."
)
agent.update_endpoints(self._endpoints)
self._server._rest_handler_map.update(agent._server._rest_handler_map)

# Run the batch Almanac API registration by default and only run the agent's
# ledger registration if the Bureau is not using a batch ledger registration
# policy because it has no wallet address.
agent._registration_policy = None
if (
isinstance(self._registration_policy, DefaultBatchRegistrationPolicy)
and self._registration_policy._ledger_policy is None
and agent._almanac_contract is not None
):
agent._registration_policy = LedgerBasedRegistrationPolicy(
agent._identity,
agent._ledger,
agent._wallet,
agent._almanac_contract,
agent._test,
logger=agent._logger,
)

agent._agentverse = self._agentverse
agent._logger.setLevel(self._logger.level)

def add(self, agent: Agent):
"""
Add an agent to the bureau.
Args:
agent (Agent): The agent to be added.
"""
if agent in self._agents:
return
self._registration_policy.add_agent(agent.info, agent._identity)
self._update_agent(agent)
self._agents.append(agent)

async def _schedule_registration(self):
"""
Start the batch registration loop.
"""

if not any(agent._endpoints for agent in self._agents):
return

time_to_next_registration = REGISTRATION_UPDATE_INTERVAL_SECONDS
try:
await self._registration_policy.register()
except InsufficientFundsError:
time_to_next_registration = 2 * AVERAGE_BLOCK_INTERVAL
except Exception as ex:
self._logger.exception(f"Failed to register: {ex}")
time_to_next_registration = REGISTRATION_RETRY_INTERVAL_SECONDS

# schedule the next registration update
self._loop.create_task(
_delay(self._schedule_registration(), time_to_next_registration)
)

async def run_async(self):
"""
Run the agents managed by the bureau.
Expand All @@ -1397,6 +1537,7 @@ async def run_async(self):
await agent.setup()
if agent.agentverse["use_mailbox"] and agent.mailbox_client is not None:
tasks.append(agent.mailbox_client.run())
tasks.append(self._schedule_registration())

try:
await asyncio.gather(*tasks)
Expand Down
4 changes: 2 additions & 2 deletions python/src/uagents/crypto/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,14 @@ def sign_digest(self, digest: bytes) -> str:
def sign_registration(
self,
contract_address: str,
sequence: int,
timestamp: int,
wallet_address: str,
) -> str:
"""Sign the registration data for the Almanac contract."""
hasher = hashlib.sha256()
hasher.update(encode_length_prefixed(contract_address))
hasher.update(encode_length_prefixed(self.address))
hasher.update(encode_length_prefixed(sequence))
hasher.update(encode_length_prefixed(timestamp))
hasher.update(encode_length_prefixed(wallet_address))
return self.sign_digest(hasher.digest())

Expand Down
Loading

0 comments on commit 1b67a4f

Please sign in to comment.