Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): add batch almanac api and contract registrations for Bureau #551

Merged
merged 18 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 175 additions & 34 deletions python/src/uagents/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@
from uagents.registration import (
AgentRegistrationPolicy,
AgentStatusUpdate,
BatchLedgerRegistrationPolicy,
BatchRegistrationPolicy,
DefaultBatchRegistrationPolicy,
DefaultRegistrationPolicy,
LedgerBasedRegistrationPolicy,
update_agent_status,
)
from uagents.resolver import GlobalResolver, Resolver
Expand Down Expand Up @@ -372,15 +376,18 @@ def __init__(
self._on_shutdown = []
self._test = test
self._version = version or "0.1.0"
self._registration_policy = registration_policy or DefaultRegistrationPolicy(
self._identity,
self._ledger,
self._wallet,
self._almanac_contract,
self._test,
logger=self._logger,
almanac_api=self._almanac_api_url,
)
self._registration_policy = registration_policy or None

if self._registration_policy is None:
self._registration_policy = DefaultRegistrationPolicy(
self._identity,
self._ledger,
self._wallet,
self._almanac_contract,
self._test,
logger=self._logger,
almanac_api=self._almanac_api_url,
)
self._metadata = self._initialize_metadata(metadata)

self.initialize_wallet_messaging(enable_wallet_messaging)
Expand Down Expand Up @@ -642,6 +649,21 @@ def balance(self) -> int:

return self.ledger.query_bank_balance(Address(self.wallet.address()))

@property
def info(self) -> AgentInfo:
"""
Get basic information about the agent.

Returns:
AgentInfo: The agent's address, endpoints, protocols, and metadata.
"""
return AgentInfo(
agent_address=self.address,
endpoints=self._endpoints,
protocols=list(self.protocols.keys()),
metadata=self.metadata,
)

@property
def metadata(self) -> Dict[str, Any]:
"""
Expand Down Expand Up @@ -699,19 +721,30 @@ def sign_digest(self, digest: bytes) -> str:
"""
return self._identity.sign_digest(digest)

def sign_registration(self, current_time: int) -> str:
def sign_registration(
self, timestamp: int, sender_wallet_address: Optional[str] = None
) -> str:
"""
Sign the registration data for Almanac contract.

Args:
timestamp (int): The timestamp for the registration.
sender_wallet_address (Optional[str]): The wallet address of the transaction sender.

Returns:
str: The signature of the registration data.

Raises:
AssertionError: If the Almanac contract address is None.
AssertionError: If the Almanac contract is None.
"""
assert self._almanac_contract.address is not None
sender_address = sender_wallet_address or str(self.wallet.address())

assert self._almanac_contract is not None

return self._identity.sign_registration(
str(self._almanac_contract.address),
current_time,
str(self.wallet.address()),
timestamp,
sender_address,
)

def update_endpoints(self, endpoints: List[AgentEndpoint]):
Expand All @@ -722,7 +755,6 @@ def update_endpoints(self, endpoints: List[AgentEndpoint]):
endpoints (List[AgentEndpoint]): List of endpoint dictionaries.

"""

self._endpoints = endpoints

def update_loop(self, loop):
Expand Down Expand Up @@ -766,11 +798,13 @@ async def register(self):
if necessary.

"""
assert self._registration_policy is not None, "Agent has no registration policy"

await self._registration_policy.register(
self.address, list(self.protocols.keys()), self._endpoints, self._metadata
)

async def _registration_loop(self):
async def _schedule_registration(self):
"""
Execute the registration loop.

Expand All @@ -784,12 +818,12 @@ async def _registration_loop(self):
except InsufficientFundsError:
time_until_next_registration = 2 * AVERAGE_BLOCK_INTERVAL
except Exception as ex:
self._logger.exception(f"Failed to register on almanac contract: {ex}")
self._logger.exception(f"Failed to register: {ex}")
time_until_next_registration = REGISTRATION_RETRY_INTERVAL_SECONDS

# schedule the next registration update
self._loop.create_task(
_delay(self._registration_loop(), time_until_next_registration)
_delay(self._schedule_registration(), time_until_next_registration)
)

def on_interval(
Expand Down Expand Up @@ -1064,13 +1098,13 @@ async def _startup(self):
Perform startup actions.

"""
if self._endpoints:
await self._registration_loop()

else:
self._logger.warning(
"No endpoints provided. Skipping registration: Agent won't be reachable."
)
if self._registration_policy:
if self._endpoints:
await self._schedule_registration()
else:
self._logger.warning(
"No endpoints provided. Skipping registration: Agent won't be reachable."
)
for handler in self._on_startup:
try:
ctx = self._build_context()
Expand Down Expand Up @@ -1339,16 +1373,29 @@ def __init__(
agents: Optional[List[Agent]] = None,
port: Optional[int] = None,
endpoint: Optional[Union[str, List[str], Dict[str, dict]]] = None,
agentverse: Optional[Union[str, Dict[str, str]]] = None,
registration_policy: Optional[BatchRegistrationPolicy] = None,
ledger: Optional[LedgerClient] = None,
wallet: Optional[LocalWallet] = None,
seed: Optional[str] = None,
test: bool = True,
loop: Optional[asyncio.AbstractEventLoop] = None,
log_level: Union[int, str] = logging.INFO,
):
"""
Initialize a Bureau instance.

Args:
port (Optional[int]): The port on which the bureau's server will run.
endpoint (Optional[Union[str, List[str], Dict[str, dict]]]): The endpoint configuration
for the bureau.
agents (Optional[List[Agent]]): The list of agents to be managed by the bureau.
port (Optional[int]): The port number for the server.
endpoint (Optional[Union[str, List[str], Dict[str, dict]]]): The endpoint configuration.
agentverse (Optional[Union[str, Dict[str, str]]]): The agentverse configuration.
registration_policy (Optional[BatchRegistrationPolicy]): The registration policy.
wallet (Optional[LocalWallet]): The wallet for the bureau (overrides 'seed').
seed (Optional[str]): The seed phrase for the wallet (overridden by 'wallet').
test (Optional[bool]): True if the bureau will register and transact on the testnet.
loop (Optional[asyncio.AbstractEventLoop]): The event loop.
log_level (Union[int, str]): The logging level for the bureau.
"""
self._loop = loop or asyncio.get_event_loop_policy().get_event_loop()
self._agents: List[Agent] = []
Expand All @@ -1362,31 +1409,124 @@ def __init__(
queries=self._queries,
logger=self._logger,
)
self._use_mailbox = False
self._agentverse = parse_agentverse_config(agentverse)
self._use_mailbox = self._agentverse["use_mailbox"]
almanac_api_url = f"{self._agentverse['http_prefix']}://{self._agentverse['base_url']}/v1/almanac"
almanac_contract = get_almanac_contract(test)

if wallet and seed:
self._logger.warning(
"Ignoring 'seed' argument because 'wallet' is provided."
)
elif seed:
wallet = LocalWallet(
PrivateKey(derive_key_from_seed(seed, LEDGER_PREFIX, 0)),
prefix=LEDGER_PREFIX,
)

if registration_policy is not None:
if (
isinstance(registration_policy, BatchLedgerRegistrationPolicy)
and wallet is None
):
raise ValueError(
"Argument 'wallet' must be provided when using "
"the batch ledger registration policy."
)
self._registration_policy = registration_policy
else:
self._registration_policy = DefaultBatchRegistrationPolicy(
ledger or get_ledger(test),
wallet,
jrriehl marked this conversation as resolved.
Show resolved Hide resolved
almanac_contract,
test,
logger=self._logger,
almanac_api=almanac_api_url,
)

if agents is not None:
for agent in agents:
self.add(agent)

def add(self, agent: Agent):
def _update_agent(self, agent: Agent):
"""
Add an agent to the bureau.
Update the agent to be taken over by the Bureau.

Args:
agent (Agent): The agent to be added.
agent (Agent): The agent to be updated.

"""
if agent in self._agents:
return
agent.update_loop(self._loop)
agent.update_queries(self._queries)
if agent.agentverse["use_mailbox"]:
self._use_mailbox = True
else:
if agent._endpoints:
self._logger.warning(
f"Overwriting the agent's endpoints {agent._endpoints} "
f"with the Bureau's endpoints {self._endpoints}."
)
agent.update_endpoints(self._endpoints)
self._server._rest_handler_map.update(agent._server._rest_handler_map)

# Run the batch Almanac API registration by default and only run the agent's
# ledger registration if the Bureau is not using a batch ledger registration
# policy because it has no wallet address.
agent._registration_policy = None
if (
isinstance(self._registration_policy, DefaultBatchRegistrationPolicy)
and self._registration_policy._ledger_policy is None
and agent._almanac_contract is not None
):
agent._registration_policy = LedgerBasedRegistrationPolicy(
agent._identity,
agent._ledger,
agent._wallet,
agent._almanac_contract,
agent._test,
logger=agent._logger,
)

agent._agentverse = self._agentverse
agent._logger.setLevel(self._logger.level)

def add(self, agent: Agent):
"""
Add an agent to the bureau.

Args:
agent (Agent): The agent to be added.

"""
if agent in self._agents:
return
self._registration_policy.add_agent(agent.info, agent._identity)
self._update_agent(agent)
self._agents.append(agent)

async def _schedule_registration(self):
"""
Start the batch registration loop.

"""

if not any(agent._endpoints for agent in self._agents):
return

time_to_next_registration = REGISTRATION_UPDATE_INTERVAL_SECONDS
try:
await self._registration_policy.register()
except InsufficientFundsError:
time_to_next_registration = 2 * AVERAGE_BLOCK_INTERVAL
except Exception as ex:
self._logger.exception(f"Failed to register: {ex}")
time_to_next_registration = REGISTRATION_RETRY_INTERVAL_SECONDS

# schedule the next registration update
self._loop.create_task(
_delay(self._schedule_registration(), time_to_next_registration)
)

async def run_async(self):
"""
Run the agents managed by the bureau.
Expand All @@ -1397,6 +1537,7 @@ async def run_async(self):
await agent.setup()
if agent.agentverse["use_mailbox"] and agent.mailbox_client is not None:
tasks.append(agent.mailbox_client.run())
tasks.append(self._schedule_registration())

try:
await asyncio.gather(*tasks)
Expand Down
4 changes: 2 additions & 2 deletions python/src/uagents/crypto/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,14 @@ def sign_digest(self, digest: bytes) -> str:
def sign_registration(
self,
contract_address: str,
sequence: int,
timestamp: int,
wallet_address: str,
) -> str:
"""Sign the registration data for the Almanac contract."""
hasher = hashlib.sha256()
hasher.update(encode_length_prefixed(contract_address))
hasher.update(encode_length_prefixed(self.address))
hasher.update(encode_length_prefixed(sequence))
hasher.update(encode_length_prefixed(timestamp))
hasher.update(encode_length_prefixed(wallet_address))
return self.sign_digest(hasher.digest())

Expand Down
Loading
Loading