diff --git a/algosdk/abi/__init__.py b/algosdk/abi/__init__.py index 02e78ad5..c0cdd484 100644 --- a/algosdk/abi/__init__.py +++ b/algosdk/abi/__init__.py @@ -1,14 +1,13 @@ -from .util import type_from_string -from .uint_type import UintType -from .ufixed_type import UfixedType -from .base_type import ABIType -from .bool_type import BoolType -from .byte_type import ByteType -from .address_type import AddressType -from .string_type import StringType -from .array_dynamic_type import ArrayDynamicType -from .array_static_type import ArrayStaticType -from .tuple_type import TupleType +from algosdk.abi.uint_type import UintType +from algosdk.abi.ufixed_type import UfixedType +from algosdk.abi.base_type import ABIType +from algosdk.abi.bool_type import BoolType +from algosdk.abi.byte_type import ByteType +from algosdk.abi.address_type import AddressType +from algosdk.abi.string_type import StringType +from algosdk.abi.array_dynamic_type import ArrayDynamicType +from algosdk.abi.array_static_type import ArrayStaticType +from algosdk.abi.tuple_type import TupleType from .method import Method, Argument, Returns from .interface import Interface from .contract import Contract diff --git a/algosdk/abi/address_type.py b/algosdk/abi/address_type.py index a7c3d616..57050c71 100644 --- a/algosdk/abi/address_type.py +++ b/algosdk/abi/address_type.py @@ -1,7 +1,9 @@ -from .base_type import ABIType -from .byte_type import ByteType -from .tuple_type import TupleType -from .. import error +from typing import Union + +from algosdk.abi.base_type import ABIType +from algosdk.abi.byte_type import ByteType +from algosdk.abi.tuple_type import TupleType +from algosdk import error from algosdk import encoding @@ -14,18 +16,18 @@ class AddressType(ABIType): def __init__(self) -> None: super().__init__() - def __eq__(self, other) -> bool: + def __eq__(self, other: object) -> bool: if not isinstance(other, AddressType): return False return True - def __str__(self): + def __str__(self) -> str: return "address" - def byte_len(self): + def byte_len(self) -> int: return 32 - def is_dynamic(self): + def is_dynamic(self) -> bool: return False def _to_tuple_type(self): @@ -34,7 +36,7 @@ def _to_tuple_type(self): child_type_array.append(ByteType()) return TupleType(child_type_array) - def encode(self, value): + def encode(self, value: Union[str, bytes]) -> bytes: """ Encode an address string or a 32-byte public key into a Address ABI bytestring. @@ -62,7 +64,7 @@ def encode(self, value): ) return bytes(value) - def decode(self, bytestring): + def decode(self, bytestring: Union[bytearray, bytes]) -> str: """ Decodes a bytestring to a base32 encoded address string. diff --git a/algosdk/abi/array_dynamic_type.py b/algosdk/abi/array_dynamic_type.py index b92a837e..97be5b15 100644 --- a/algosdk/abi/array_dynamic_type.py +++ b/algosdk/abi/array_dynamic_type.py @@ -1,7 +1,9 @@ -from .base_type import ABI_LENGTH_SIZE, ABIType -from .byte_type import ByteType -from .tuple_type import TupleType -from .. import error +from typing import Any, List, NoReturn, Union + +from algosdk.abi.base_type import ABI_LENGTH_SIZE, ABIType +from algosdk.abi.byte_type import ByteType +from algosdk.abi.tuple_type import TupleType +from algosdk import error class ArrayDynamicType(ABIType): @@ -9,37 +11,37 @@ class ArrayDynamicType(ABIType): Represents a ArrayDynamic ABI Type for encoding. Args: - child_type (Type): the type of the dynamic array. + child_type (ABIType): the type of the dynamic array. Attributes: - child_type (Type) + child_type (ABIType) """ - def __init__(self, arg_type) -> None: + def __init__(self, arg_type: ABIType) -> None: super().__init__() self.child_type = arg_type - def __eq__(self, other) -> bool: + def __eq__(self, other: object) -> bool: if not isinstance(other, ArrayDynamicType): return False return self.child_type == other.child_type - def __str__(self): + def __str__(self) -> str: return "{}[]".format(self.child_type) - def byte_len(self): + def byte_len(self) -> NoReturn: raise error.ABITypeError( "cannot get length of a dynamic type: {}".format(self) ) - def is_dynamic(self): + def is_dynamic(self) -> bool: return True - def _to_tuple_type(self, length): + def _to_tuple_type(self, length: int) -> TupleType: child_type_array = [self.child_type] * length return TupleType(child_type_array) - def encode(self, value_array): + def encode(self, value_array: Union[List[Any], bytes, bytearray]) -> bytes: """ Encodes a list of values into a ArrayDynamic ABI bytestring. @@ -67,7 +69,7 @@ def encode(self, value_array): encoded = converted_tuple.encode(value_array) return bytes(length_to_encode) + encoded - def decode(self, array_bytes): + def decode(self, array_bytes: Union[bytes, bytearray]) -> list: """ Decodes a bytestring to a dynamic list. diff --git a/algosdk/abi/array_static_type.py b/algosdk/abi/array_static_type.py index 4d935d2a..690917df 100644 --- a/algosdk/abi/array_static_type.py +++ b/algosdk/abi/array_static_type.py @@ -1,10 +1,11 @@ import math +from typing import Any, List, Union -from .base_type import ABIType -from .bool_type import BoolType -from .byte_type import ByteType -from .tuple_type import TupleType -from .. import error +from algosdk.abi.base_type import ABIType +from algosdk.abi.bool_type import BoolType +from algosdk.abi.byte_type import ByteType +from algosdk.abi.tuple_type import TupleType +from algosdk import error class ArrayStaticType(ABIType): @@ -12,26 +13,26 @@ class ArrayStaticType(ABIType): Represents a ArrayStatic ABI Type for encoding. Args: - child_type (Type): the type of the child_types array. - static_length (int): length of the static array. + child_type (ABIType): the type of the child_types array. + array_len (int): length of the static array. Attributes: - child_type (Type) + child_type (ABIType) static_length (int) """ - def __init__(self, arg_type, array_len) -> None: + def __init__(self, arg_type: ABIType, array_len: int) -> None: if array_len < 1: raise error.ABITypeError( "static array length must be a positive integer: {}".format( - len(array_len) + array_len ) ) super().__init__() self.child_type = arg_type self.static_length = array_len - def __eq__(self, other) -> bool: + def __eq__(self, other: object) -> bool: if not isinstance(other, ArrayStaticType): return False return ( @@ -39,24 +40,24 @@ def __eq__(self, other) -> bool: and self.static_length == other.static_length ) - def __str__(self): + def __str__(self) -> str: return "{}[{}]".format(self.child_type, self.static_length) - def byte_len(self): + def byte_len(self) -> int: if isinstance(self.child_type, BoolType): # 8 Boolean values can be encoded into 1 byte return math.ceil(self.static_length / 8) element_byte_length = self.child_type.byte_len() return self.static_length * element_byte_length - def is_dynamic(self): + def is_dynamic(self) -> bool: return self.child_type.is_dynamic() - def _to_tuple_type(self): + def _to_tuple_type(self) -> TupleType: child_type_array = [self.child_type] * self.static_length return TupleType(child_type_array) - def encode(self, value_array): + def encode(self, value_array: Union[List[Any], bytes, bytearray]) -> bytes: """ Encodes a list of values into a ArrayStatic ABI bytestring. @@ -87,7 +88,7 @@ def encode(self, value_array): converted_tuple = self._to_tuple_type() return converted_tuple.encode(value_array) - def decode(self, array_bytes): + def decode(self, array_bytes: Union[bytes, bytearray]) -> list: """ Decodes a bytestring to a static list. diff --git a/algosdk/abi/base_type.py b/algosdk/abi/base_type.py index 0fdcedee..d0878942 100644 --- a/algosdk/abi/base_type.py +++ b/algosdk/abi/base_type.py @@ -1,7 +1,14 @@ from abc import ABC, abstractmethod +import re +from typing import Any, Union + +from algosdk import error + # Globals ABI_LENGTH_SIZE = 2 # We use 2 bytes to encode the length of a dynamic element +UFIXED_REGEX = r"^ufixed([1-9][\d]*)x([1-9][\d]*)$" +STATIC_ARRAY_REGEX = r"^([a-z\d\[\](),]+)\[([1-9][\d]*)]$" class ABIType(ABC): @@ -9,43 +16,124 @@ class ABIType(ABC): Represents an ABI Type for encoding. """ - def __init__( - self, - ) -> None: + def __init__(self) -> None: pass @abstractmethod - def __str__(self): + def __str__(self) -> str: pass @abstractmethod - def __eq__(self, other) -> bool: + def __eq__(self, other: object) -> bool: pass @abstractmethod - def is_dynamic(self): + def is_dynamic(self) -> bool: """ Return whether the ABI type is dynamic. """ pass @abstractmethod - def byte_len(self): + def byte_len(self) -> int: """ Return the length in bytes of the ABI type. """ pass @abstractmethod - def encode(self, value): + def encode(self, value: Any) -> bytes: """ Serialize the ABI value into a byte string using ABI encoding rules. """ pass @abstractmethod - def decode(self, bytestring): + def decode(self, bytestring: bytes) -> Any: """ Deserialize the ABI type and value from a byte string using ABI encoding rules. """ pass + + @staticmethod + def from_string(s: str) -> "ABIType": + """ + Convert a valid ABI string to a corresponding ABI type. + """ + # We define the imports here to avoid circular imports + from algosdk.abi.uint_type import UintType + from algosdk.abi.ufixed_type import UfixedType + from algosdk.abi.byte_type import ByteType + from algosdk.abi.bool_type import BoolType + from algosdk.abi.address_type import AddressType + from algosdk.abi.string_type import StringType + from algosdk.abi.array_dynamic_type import ArrayDynamicType + from algosdk.abi.array_static_type import ArrayStaticType + from algosdk.abi.tuple_type import TupleType + + if s.endswith("[]"): + array_arg_type = ABIType.from_string(s[:-2]) + return ArrayDynamicType(array_arg_type) + elif s.endswith("]"): + matches = re.search(STATIC_ARRAY_REGEX, s) + try: + static_length = int(matches.group(2)) + array_type = ABIType.from_string(matches.group(1)) + return ArrayStaticType(array_type, static_length) + except Exception as e: + raise error.ABITypeError( + "malformed static array string: {}".format(s) + ) from e + if s.startswith("uint"): + try: + if not s[4:].isdecimal(): + raise error.ABITypeError( + "uint string does not contain a valid size: {}".format( + s + ) + ) + type_size = int(s[4:]) + return UintType(type_size) + except Exception as e: + raise error.ABITypeError( + "malformed uint string: {}".format(s) + ) from e + elif s == "byte": + return ByteType() + elif s.startswith("ufixed"): + matches = re.search(UFIXED_REGEX, s) + try: + bit_size = int(matches.group(1)) + precision = int(matches.group(2)) + return UfixedType(bit_size, precision) + except Exception as e: + raise error.ABITypeError( + "malformed ufixed string: {}".format(s) + ) from e + elif s == "bool": + return BoolType() + elif s == "address": + return AddressType() + elif s == "string": + return StringType() + elif len(s) >= 2 and s[0] == "(" and s[-1] == ")": + # Recursively parse parentheses from a tuple string + tuples = TupleType._parse_tuple(s[1:-1]) + tuple_list = [] + for tup in tuples: + if isinstance(tup, str): + tt = ABIType.from_string(tup) + tuple_list.append(tt) + elif isinstance(tup, list): + tts = [ABIType.from_string(t_) for t_ in tup] + tuple_list.append(tts) + else: + raise error.ABITypeError( + "cannot convert {} to an ABI type".format(tup) + ) + + return TupleType(tuple_list) + else: + raise error.ABITypeError( + "cannot convert {} to an ABI type".format(s) + ) diff --git a/algosdk/abi/bool_type.py b/algosdk/abi/bool_type.py index a7b0a913..ee73ae5f 100644 --- a/algosdk/abi/bool_type.py +++ b/algosdk/abi/bool_type.py @@ -1,5 +1,7 @@ -from .base_type import ABIType -from .. import error +from typing import Union + +from algosdk.abi.base_type import ABIType +from algosdk import error class BoolType(ABIType): @@ -10,21 +12,21 @@ class BoolType(ABIType): def __init__(self) -> None: super().__init__() - def __eq__(self, other) -> bool: + def __eq__(self, other: object) -> bool: if not isinstance(other, BoolType): return False return True - def __str__(self): + def __str__(self) -> str: return "bool" - def byte_len(self): + def byte_len(self) -> int: return 1 - def is_dynamic(self): + def is_dynamic(self) -> bool: return False - def encode(self, value): + def encode(self, value: bool) -> bytes: """ Encode a boolean value @@ -40,7 +42,7 @@ def encode(self, value): return b"\x80" return b"\x00" - def decode(self, bytestring): + def decode(self, bytestring: Union[bytes, bytearray]) -> bool: """ Decodes a bytestring to a single boolean. diff --git a/algosdk/abi/byte_type.py b/algosdk/abi/byte_type.py index ccca8087..e719e80e 100644 --- a/algosdk/abi/byte_type.py +++ b/algosdk/abi/byte_type.py @@ -1,5 +1,7 @@ -from .base_type import ABIType -from .. import error +from typing import Union + +from algosdk.abi.base_type import ABIType +from algosdk import error class ByteType(ABIType): @@ -10,21 +12,21 @@ class ByteType(ABIType): def __init__(self) -> None: super().__init__() - def __eq__(self, other) -> bool: + def __eq__(self, other: object) -> bool: if not isinstance(other, ByteType): return False return True - def __str__(self): + def __str__(self) -> str: return "byte" - def byte_len(self): + def byte_len(self) -> int: return 1 - def is_dynamic(self): + def is_dynamic(self) -> bool: return False - def encode(self, value): + def encode(self, value: int) -> bytes: """ Encode a single byte or a uint8 @@ -40,7 +42,7 @@ def encode(self, value): ) return bytes([value]) - def decode(self, bytestring): + def decode(self, bytestring: Union[bytes, bytearray]) -> bytes: """ Decodes a bytestring to a single byte. diff --git a/algosdk/abi/contract.py b/algosdk/abi/contract.py index 054e3ee1..d607a8d8 100644 --- a/algosdk/abi/contract.py +++ b/algosdk/abi/contract.py @@ -1,4 +1,5 @@ import json +from typing import List, Union from algosdk.abi.method import Method @@ -13,12 +14,12 @@ class Contract: methods (list): list of Method objects """ - def __init__(self, name, app_id, methods) -> None: + def __init__(self, name: str, app_id: int, methods: List[Method]) -> None: self.name = name self.app_id = int(app_id) self.methods = methods - def __eq__(self, o) -> bool: + def __eq__(self, o: object) -> bool: if not isinstance(o, Contract): return False return ( @@ -28,11 +29,11 @@ def __eq__(self, o) -> bool: ) @staticmethod - def from_json(resp): + def from_json(resp: Union[str, bytes, bytearray]) -> "Contract": d = json.loads(resp) return Contract.undictify(d) - def dictify(self): + def dictify(self) -> dict: d = {} d["name"] = self.name d["appId"] = self.app_id @@ -40,7 +41,7 @@ def dictify(self): return d @staticmethod - def undictify(d): + def undictify(d: dict) -> "Contract": name = d["name"] app_id = d["appId"] method_list = [Method.undictify(method) for method in d["methods"]] diff --git a/algosdk/abi/interface.py b/algosdk/abi/interface.py index 64bccdfe..ff42d809 100644 --- a/algosdk/abi/interface.py +++ b/algosdk/abi/interface.py @@ -1,4 +1,5 @@ import json +from typing import List, Union from algosdk.abi.method import Method @@ -12,28 +13,28 @@ class Interface: methods (list): list of Method objects """ - def __init__(self, name, methods): + def __init__(self, name: str, methods: List[Method]) -> None: self.name = name self.methods = methods - def __eq__(self, o): + def __eq__(self, o: object) -> bool: if not isinstance(o, Interface): return False return self.name == o.name and self.methods == o.methods @staticmethod - def from_json(resp): + def from_json(resp: Union[str, bytes, bytearray]) -> "Interface": d = json.loads(resp) return Interface.undictify(d) - def dictify(self): + def dictify(self) -> dict: d = {} d["name"] = self.name d["methods"] = [m.dictify() for m in self.methods] return d @staticmethod - def undictify(d): + def undictify(d: dict) -> "Interface": name = d["name"] method_list = [Method.undictify(method) for method in d["methods"]] return Interface(name=name, methods=method_list) diff --git a/algosdk/abi/method.py b/algosdk/abi/method.py index 1254a60c..011310ba 100644 --- a/algosdk/abi/method.py +++ b/algosdk/abi/method.py @@ -1,4 +1,5 @@ import json +from typing import List, Union from Cryptodome.Hash import SHA512 @@ -28,7 +29,13 @@ class Method: desc (string, optional): optional description of the method """ - def __init__(self, name, args, returns, desc=None) -> None: + def __init__( + self, + name: str, + args: List["Argument"], + returns: "Returns", + desc: str = None, + ) -> None: self.name = name self.args = args self.desc = desc @@ -41,7 +48,7 @@ def __init__(self, name, args, returns, desc=None) -> None: txn_count += 1 self.txn_calls = txn_count - def __eq__(self, o) -> bool: + def __eq__(self, o: object) -> bool: if not isinstance(o, Method): return False return ( @@ -52,12 +59,12 @@ def __eq__(self, o) -> bool: and self.txn_calls == o.txn_calls ) - def get_signature(self): + def get_signature(self) -> str: arg_string = ",".join(str(arg.type) for arg in self.args) - ret_string = self.returns.type if self.returns else "void" + ret_string = self.returns.type return "{}({}){}".format(self.name, arg_string, ret_string) - def get_selector(self): + def get_selector(self) -> bytes: """ Returns the ABI method signature, which is the first four bytes of the SHA-512/256 hash of the method signature. @@ -69,14 +76,14 @@ def get_selector(self): hash.update(self.get_signature().encode("utf-8")) return hash.digest()[:4] - def get_txn_calls(self): + def get_txn_calls(self) -> int: """ Returns the number of transactions needed to invoke this ABI method. """ return self.txn_calls @staticmethod - def _parse_string(s): + def _parse_string(s: str) -> list: # Parses a method signature into three tokens, returned as a list: # e.g. 'a(b,c)d' -> ['a', 'b,c', 'd'] stack = [] @@ -88,19 +95,19 @@ def _parse_string(s): break left_index = stack.pop() if not stack: - return (s[:left_index], s[left_index + 1 : i], s[i + 1 :]) + return [s[:left_index], s[left_index + 1 : i], s[i + 1 :]] raise error.ABIEncodingError( "ABI method string has mismatched parentheses: {}".format(s) ) @staticmethod - def from_json(resp): + def from_json(resp: Union[str, bytes, bytearray]) -> "Method": method_dict = json.loads(resp) return Method.undictify(method_dict) @staticmethod - def from_signature(s): + def from_signature(s: str) -> "Method": # Split string into tokens around outer parentheses. # The first token should always be the name of the method, # the second token should be the arguments as a tuple, @@ -112,23 +119,20 @@ def from_signature(s): return_type = Returns(tokens[-1]) return Method(name=tokens[0], args=argument_list, returns=return_type) - def dictify(self): + def dictify(self) -> dict: d = {} d["name"] = self.name d["args"] = [arg.dictify() for arg in self.args] - if self.returns: - d["returns"] = self.returns.dictify() + d["returns"] = self.returns.dictify() if self.desc: d["desc"] = self.desc return d @staticmethod - def undictify(d): + def undictify(d: dict) -> "Method": name = d["name"] arg_list = [Argument.undictify(arg) for arg in d["args"]] - return_obj = ( - Returns.undictify(d["returns"]) if "returns" in d else None - ) + return_obj = Returns.undictify(d["returns"]) desc = d["desc"] if "desc" in d else None return Method(name=name, args=arg_list, returns=return_obj, desc=desc) @@ -143,26 +147,28 @@ class Argument: desc (string, optional): description of this method argument """ - def __init__(self, arg_type, name=None, desc=None) -> None: + def __init__( + self, arg_type: str, name: str = None, desc: str = None + ) -> None: if arg_type in TRANSACTION_ARGS: self.type = arg_type else: # If the type cannot be parsed into an ABI type, it will error - self.type = abi.util.type_from_string(arg_type) + self.type = abi.ABIType.from_string(arg_type) self.name = name self.desc = desc - def __eq__(self, o) -> bool: + def __eq__(self, o: object) -> bool: if not isinstance(o, Argument): return False return ( self.name == o.name and self.type == o.type and self.desc == o.desc ) - def __str__(self): + def __str__(self) -> str: return str(self.type) - def dictify(self): + def dictify(self) -> dict: d = {} d["type"] = str(self.type) if self.name: @@ -172,7 +178,7 @@ def dictify(self): return d @staticmethod - def undictify(d): + def undictify(d: dict) -> "Argument": return Argument( arg_type=d["type"], name=d["name"] if "name" in d else None, @@ -192,23 +198,23 @@ class Returns: # Represents a void return. VOID = "void" - def __init__(self, arg_type, desc=None) -> None: + def __init__(self, arg_type: str, desc: str = None) -> None: if arg_type == "void": self.type = self.VOID else: # If the type cannot be parsed into an ABI type, it will error. - self.type = abi.util.type_from_string(arg_type) + self.type = abi.ABIType.from_string(arg_type) self.desc = desc - def __eq__(self, o) -> bool: + def __eq__(self, o: object) -> bool: if not isinstance(o, Returns): return False return self.type == o.type and self.desc == o.desc - def __str__(self): + def __str__(self) -> str: return str(self.type) - def dictify(self): + def dictify(self) -> dict: d = {} d["type"] = str(self.type) if self.desc: @@ -216,7 +222,7 @@ def dictify(self): return d @staticmethod - def undictify(d): + def undictify(d: dict) -> "Returns": return Returns( arg_type=d["type"], desc=d["desc"] if "desc" in d else None ) diff --git a/algosdk/abi/string_type.py b/algosdk/abi/string_type.py index b2c207df..c5182e6e 100644 --- a/algosdk/abi/string_type.py +++ b/algosdk/abi/string_type.py @@ -1,5 +1,7 @@ -from .base_type import ABI_LENGTH_SIZE, ABIType -from .. import error +from typing import NoReturn, Union + +from algosdk.abi.base_type import ABI_LENGTH_SIZE, ABIType +from algosdk import error class StringType(ABIType): @@ -10,23 +12,23 @@ class StringType(ABIType): def __init__(self) -> None: super().__init__() - def __eq__(self, other) -> bool: + def __eq__(self, other: object) -> bool: if not isinstance(other, StringType): return False return True - def __str__(self): + def __str__(self) -> str: return "string" - def byte_len(self): + def byte_len(self) -> NoReturn: raise error.ABITypeError( "cannot get length of a dynamic type: {}".format(self) ) - def is_dynamic(self): + def is_dynamic(self) -> bool: return True - def encode(self, string_val): + def encode(self, string_val: str) -> bytes: """ Encode a value into a String ABI bytestring. @@ -40,7 +42,7 @@ def encode(self, string_val): encoded = string_val.encode("utf-8") return length_to_encode + encoded - def decode(self, bytestring): + def decode(self, bytestring: Union[bytes, bytearray]) -> str: """ Decodes a bytestring to a string. diff --git a/algosdk/abi/tuple_type.py b/algosdk/abi/tuple_type.py index 7ec5095e..b384381e 100644 --- a/algosdk/abi/tuple_type.py +++ b/algosdk/abi/tuple_type.py @@ -1,6 +1,8 @@ -from .base_type import ABI_LENGTH_SIZE, ABIType -from .bool_type import BoolType -from .. import error +from typing import Any, List, Union + +from algosdk.abi.base_type import ABI_LENGTH_SIZE, ABIType +from algosdk.abi.bool_type import BoolType +from algosdk import error class TupleType(ABIType): @@ -8,13 +10,13 @@ class TupleType(ABIType): Represents a Tuple ABI Type for encoding. Args: - child_types (list): list of types in the tuple. + arg_types (list): list of types in the tuple. Attributes: child_types (list) """ - def __init__(self, arg_types) -> None: + def __init__(self, arg_types: List[Any]) -> None: if len(arg_types) >= 2 ** 16: raise error.ABITypeError( "tuple args cannot exceed a uint16: {}".format(len(arg_types)) @@ -22,15 +24,15 @@ def __init__(self, arg_types) -> None: super().__init__() self.child_types = arg_types - def __eq__(self, other) -> bool: + def __eq__(self, other: object) -> bool: if not isinstance(other, TupleType): return False return self.child_types == other.child_types - def __str__(self): + def __str__(self) -> str: return "({})".format(",".join(str(t) for t in self.child_types)) - def byte_len(self): + def byte_len(self) -> int: size = 0 i = 0 while i < len(self.child_types): @@ -47,11 +49,11 @@ def byte_len(self): i += 1 return size - def is_dynamic(self): + def is_dynamic(self) -> bool: return any(child.is_dynamic() for child in self.child_types) @staticmethod - def _find_bool(type_list, index, delta): + def _find_bool(type_list: List[ABIType], index: int, delta: int) -> int: """ Helper function to find consecutive booleans from current index in a tuple. """ @@ -71,7 +73,7 @@ def _find_bool(type_list, index, delta): return until @staticmethod - def _parse_tuple(s): + def _parse_tuple(s: str) -> list: """ Given a tuple string, parses one layer of the tuple and returns tokens as a list. i.e. 'x,(y,(z))' -> ['x', '(y,(z))'] @@ -112,7 +114,7 @@ def _parse_tuple(s): return tuple_strs @staticmethod - def _compress_multiple_bool(value_list): + def _compress_multiple_bool(value_list: List[bool]) -> int: """ Compress consecutive boolean values into a byte for a Tuple/Array. """ @@ -128,13 +130,15 @@ def _compress_multiple_bool(value_list): result |= 1 << (7 - i) return result - def encode(self, values): + def encode(self, values: Union[List[Any], bytes, bytearray]) -> bytes: """ Encodes a list of values into a TupleType ABI bytestring. Args: - values (list): list of values to be encoded. + values (list | bytes | bytearray): list of values to be encoded. The length of the list cannot exceed a uint16. + If the child types are ByteType, then bytes or bytearray can be + passed in to be encoded as well. Returns: bytes: encoded bytes of the tuple @@ -208,7 +212,7 @@ def encode(self, values): # Concatenate bytes return b"".join(heads) + b"".join(tails) - def decode(self, bytestring): + def decode(self, bytestring: Union[bytes, bytearray]) -> list: """ Decodes a bytestring to a tuple list. diff --git a/algosdk/abi/ufixed_type.py b/algosdk/abi/ufixed_type.py index d86cfe3c..9827cab1 100644 --- a/algosdk/abi/ufixed_type.py +++ b/algosdk/abi/ufixed_type.py @@ -1,5 +1,7 @@ -from .base_type import ABIType -from .. import error +from typing import Union + +from algosdk.abi.base_type import ABIType +from algosdk import error class UfixedType(ABIType): @@ -7,15 +9,15 @@ class UfixedType(ABIType): Represents an Ufixed ABI Type for encoding. Args: - bit_size (int, optional): size of a ufixed type. - precision (int, optional): number of precision for a ufixed type. + type_size (int): size of a ufixed type. + type_precision (int): number of precision for a ufixed type. Attributes: bit_size (int) precision (int) """ - def __init__(self, type_size, type_precision) -> None: + def __init__(self, type_size: int, type_precision: int) -> None: if ( not isinstance(type_size, int) or type_size % 8 != 0 @@ -37,7 +39,7 @@ def __init__(self, type_size, type_precision) -> None: self.bit_size = type_size self.precision = type_precision - def __eq__(self, other) -> bool: + def __eq__(self, other: object) -> bool: if not isinstance(other, UfixedType): return False return ( @@ -45,16 +47,16 @@ def __eq__(self, other) -> bool: and self.precision == other.precision ) - def __str__(self): + def __str__(self) -> str: return "ufixed{}x{}".format(self.bit_size, self.precision) - def byte_len(self): + def byte_len(self) -> int: return self.bit_size // 8 - def is_dynamic(self): + def is_dynamic(self) -> bool: return False - def encode(self, value): + def encode(self, value: int) -> bytes: """ Encodes a value into a Ufixed ABI type bytestring. The precision denotes the denominator and the value denotes the numerator. @@ -77,7 +79,7 @@ def encode(self, value): ) return value.to_bytes(self.bit_size // 8, byteorder="big") - def decode(self, bytestring): + def decode(self, bytestring: Union[bytes, bytearray]) -> int: """ Decodes a bytestring to a ufixed numerator. diff --git a/algosdk/abi/uint_type.py b/algosdk/abi/uint_type.py index b22c26ce..44dd2648 100644 --- a/algosdk/abi/uint_type.py +++ b/algosdk/abi/uint_type.py @@ -1,5 +1,7 @@ -from .base_type import ABIType -from .. import error +from typing import Union + +from algosdk.abi.base_type import ABIType +from algosdk import error class UintType(ABIType): @@ -7,13 +9,13 @@ class UintType(ABIType): Represents an Uint ABI Type for encoding. Args: - bit_size (int, optional): size of a uint type, e.g. for a uint8, the bit_size is 8. + bit_size (int): size of a uint type, e.g. for a uint8, the bit_size is 8. Attributes: bit_size (int) """ - def __init__(self, type_size) -> None: + def __init__(self, type_size: int) -> None: if ( not isinstance(type_size, int) or type_size % 8 != 0 @@ -26,21 +28,21 @@ def __init__(self, type_size) -> None: super().__init__() self.bit_size = type_size - def __eq__(self, other) -> bool: + def __eq__(self, other: object) -> bool: if not isinstance(other, UintType): return False return self.bit_size == other.bit_size - def __str__(self): + def __str__(self) -> str: return "uint{}".format(self.bit_size) - def byte_len(self): + def byte_len(self) -> int: return self.bit_size // 8 - def is_dynamic(self): + def is_dynamic(self) -> bool: return False - def encode(self, value): + def encode(self, value: int) -> bytes: """ Encodes a value into a Uint ABI type bytestring. @@ -62,7 +64,7 @@ def encode(self, value): ) return value.to_bytes(self.bit_size // 8, byteorder="big") - def decode(self, bytestring): + def decode(self, bytestring: Union[bytes, bytearray]) -> int: """ Decodes a bytestring to a uint. diff --git a/algosdk/abi/util.py b/algosdk/abi/util.py deleted file mode 100644 index 5deca770..00000000 --- a/algosdk/abi/util.py +++ /dev/null @@ -1,85 +0,0 @@ -import re - -from .uint_type import UintType -from .ufixed_type import UfixedType -from .byte_type import ByteType -from .bool_type import BoolType -from .address_type import AddressType -from .string_type import StringType -from .array_dynamic_type import ArrayDynamicType -from .array_static_type import ArrayStaticType -from .tuple_type import TupleType -from .. import error - - -# Globals -UFIXED_REGEX = r"^ufixed([1-9][\d]*)x([1-9][\d]*)$" -STATIC_ARRAY_REGEX = r"^([a-z\d\[\](),]+)\[([1-9][\d]*)]$" - - -def type_from_string(s): - """ - Convert a valid ABI string to a corresponding ABI type. - """ - if s.endswith("[]"): - array_arg_type = type_from_string(s[:-2]) - return ArrayDynamicType(array_arg_type) - elif s.endswith("]"): - matches = re.search(STATIC_ARRAY_REGEX, s) - try: - static_length = int(matches.group(2)) - array_type = type_from_string(matches.group(1)) - return ArrayStaticType(array_type, static_length) - except Exception as e: - raise error.ABITypeError( - "malformed static array string: {}".format(s) - ) from e - if s.startswith("uint"): - try: - if not s[4:].isdecimal(): - raise error.ABITypeError( - "uint string does not contain a valid size: {}".format(s) - ) - type_size = int(s[4:]) - return UintType(type_size) - except Exception as e: - raise error.ABITypeError( - "malformed uint string: {}".format(s) - ) from e - elif s == "byte": - return ByteType() - elif s.startswith("ufixed"): - matches = re.search(UFIXED_REGEX, s) - try: - bit_size = int(matches.group(1)) - precision = int(matches.group(2)) - return UfixedType(bit_size, precision) - except Exception as e: - raise error.ABITypeError( - "malformed ufixed string: {}".format(s) - ) from e - elif s == "bool": - return BoolType() - elif s == "address": - return AddressType() - elif s == "string": - return StringType() - elif len(s) >= 2 and s[0] == "(" and s[-1] == ")": - # Recursively parse parentheses from a tuple string - tuples = TupleType._parse_tuple(s[1:-1]) - tuple_list = [] - for tup in tuples: - if isinstance(tup, str): - tt = type_from_string(tup) - tuple_list.append(tt) - elif isinstance(tup, list): - tts = [type_from_string(t_) for t_ in tup] - tuple_list.append(tts) - else: - raise error.ABITypeError( - "cannot convert {} to an ABI type".format(tup) - ) - - return TupleType(tuple_list) - else: - raise error.ABITypeError("cannot convert {} to an ABI type".format(s)) diff --git a/algosdk/atomic_transaction_composer.py b/algosdk/atomic_transaction_composer.py index 156dee20..9496231a 100644 --- a/algosdk/atomic_transaction_composer.py +++ b/algosdk/atomic_transaction_composer.py @@ -2,9 +2,12 @@ import base64 import copy from enum import IntEnum +from typing import Any, List, Union from algosdk import abi, error +from algosdk.abi.base_type import ABIType from algosdk.future import transaction +from algosdk.v2client import algod # The first four bytes of an ABI method call return must have this hash ABI_RETURN_HASH = b"\x15\x1f\x7c\x75" @@ -56,19 +59,19 @@ def __init__(self) -> None: self.signed_txns = [] self.tx_ids = [] - def get_status(self): + def get_status(self) -> AtomicTransactionComposerStatus: """ Returns the status of this composer's transaction group. """ return self.status - def get_tx_count(self): + def get_tx_count(self) -> int: """ Returns the number of transactions currently in this atomic group. """ return len(self.txn_list) - def clone(self): + def clone(self) -> "AtomicTransactionComposer": """ Creates a new composer with the same underlying transactions. The new composer's status will be BUILDING, so additional transactions @@ -82,7 +85,9 @@ def clone(self): cloned.status = AtomicTransactionComposerStatus.BUILDING return cloned - def add_transaction(self, txn_and_signer): + def add_transaction( + self, txn_and_signer: "TransactionWithSigner" + ) -> "AtomicTransactionComposer": """ Adds a transaction to this atomic group. @@ -116,17 +121,17 @@ def add_transaction(self, txn_and_signer): def add_method_call( self, - app_id, - method, - sender, - sp, - signer, - method_args=None, - on_complete=transaction.OnComplete.NoOpOC, - note=None, - lease=None, - rekey_to=None, - ): + app_id: int, + method: abi.method.Method, + sender: str, + sp: transaction.SuggestedParams, + signer: "TransactionSigner", + method_args: List[Union[Any]] = None, + on_complete: transaction.OnComplete = transaction.OnComplete.NoOpOC, + note: bytes = None, + lease: bytes = None, + rekey_to: str = None, + ) -> "AtomicTransactionComposer": """ Add a smart contract method call to this atomic group. @@ -229,7 +234,7 @@ def add_method_call( self.method_dict[len(self.txn_list) - 1] = method return self - def build_group(self): + def build_group(self) -> list: """ Finalize the transaction group and returns the finalized transactions with signers. The composer's status will be at least BUILT after executing this method. @@ -254,7 +259,7 @@ def build_group(self): self.status = AtomicTransactionComposerStatus.BUILT return self.txn_list - def gather_signatures(self): + def gather_signatures(self) -> list: """ Obtain signatures for each transaction in this group. If signatures have already been obtained, this method will return cached versions of the signatures. @@ -293,7 +298,7 @@ def gather_signatures(self): self.signed_txns = stxn_list return self.signed_txns - def submit(self, client): + def submit(self, client: algod.AlgodClient) -> list: """ Send the transaction group to the network, but don't wait for it to be committed to a block. An error will be thrown if submission fails. @@ -319,7 +324,9 @@ def submit(self, client): self.status = AtomicTransactionComposerStatus.SUBMITTED return self.tx_ids - def execute(self, client, wait_rounds): + def execute( + self, client: algod.AlgodClient, wait_rounds: int + ) -> "AtomicTransactionResponse": """ Send the transaction group to the network and wait until it's committed to a block. An error will be thrown if submission or execution fails. @@ -419,7 +426,9 @@ def __init__(self) -> None: pass @abstractmethod - def sign_transactions(self, txn_group, indexes): + def sign_transactions( + self, txn_group: List[transaction.Transaction], indexes: List[int] + ) -> list: pass @@ -432,11 +441,13 @@ class AccountTransactionSigner(TransactionSigner): private_key (str): private key of signing account """ - def __init__(self, private_key) -> None: + def __init__(self, private_key: str) -> None: super().__init__() self.private_key = private_key - def sign_transactions(self, txn_group, indexes): + def sign_transactions( + self, txn_group: List[transaction.Transaction], indexes: List[int] + ) -> list: """ Sign transactions in a transaction group given the indexes. @@ -464,11 +475,13 @@ class LogicSigTransactionSigner(TransactionSigner): lsig (LogicSigAccount): LogicSig account """ - def __init__(self, lsig) -> None: + def __init__(self, lsig: transaction.LogicSigAccount) -> None: super().__init__() self.lsig = lsig - def sign_transactions(self, txn_group, indexes): + def sign_transactions( + self, txn_group: List[transaction.Transaction], indexes: List[int] + ) -> list: """ Sign transactions in a transaction group given the indexes. @@ -497,12 +510,14 @@ class MultisigTransactionSigner(TransactionSigner): sks (str): private keys of multisig """ - def __init__(self, msig, sks) -> None: + def __init__(self, msig: transaction.Multisig, sks: str) -> None: super().__init__() self.msig = msig self.sks = sks - def sign_transactions(self, txn_group, indexes): + def sign_transactions( + self, txn_group: List[transaction.Transaction], indexes: List[int] + ) -> list: """ Sign transactions in a transaction group given the indexes. @@ -524,13 +539,21 @@ def sign_transactions(self, txn_group, indexes): class TransactionWithSigner: - def __init__(self, txn, signer) -> None: + def __init__( + self, txn: transaction.Transaction, signer: TransactionSigner + ) -> None: self.txn = txn self.signer = signer class ABIResult: - def __init__(self, tx_id, raw_value, return_value, decode_error) -> None: + def __init__( + self, + tx_id: int, + raw_value: bytes, + return_value: Any, + decode_error: error, + ) -> None: self.tx_id = tx_id self.raw_value = raw_value self.return_value = return_value @@ -538,7 +561,9 @@ def __init__(self, tx_id, raw_value, return_value, decode_error) -> None: class AtomicTransactionResponse: - def __init__(self, confirmed_round, tx_ids, results) -> None: + def __init__( + self, confirmed_round: int, tx_ids: List[str], results: ABIResult + ) -> None: self.confirmed_round = confirmed_round self.tx_ids = tx_ids self.abi_results = results diff --git a/test_unit.py b/test_unit.py index 24596f45..f0fe5ed5 100644 --- a/test_unit.py +++ b/test_unit.py @@ -20,7 +20,7 @@ wordlist, ) from algosdk.abi import ( - type_from_string, + ABIType, UintType, UfixedType, BoolType, @@ -3622,7 +3622,7 @@ def test_make_type_valid(self): for uint_index in range(8, 513, 8): uint_type = UintType(uint_index) self.assertEqual(str(uint_type), f"uint{uint_index}") - actual = type_from_string(str(uint_type)) + actual = ABIType.from_string(str(uint_type)) self.assertEqual(uint_type, actual) # Test for ufixed @@ -3632,7 +3632,7 @@ def test_make_type_valid(self): self.assertEqual( str(ufixed_type), f"ufixed{size_index}x{precision_index}" ) - actual = type_from_string(str(ufixed_type)) + actual = ABIType.from_string(str(ufixed_type)) self.assertEqual(ufixed_type, actual) test_cases = [ @@ -3725,7 +3725,7 @@ def test_make_type_valid(self): ] for test_case in test_cases: self.assertEqual(str(test_case[0]), test_case[1]) - self.assertEqual(test_case[0], type_from_string(test_case[1])) + self.assertEqual(test_case[0], ABIType.from_string(test_case[1])) def test_make_type_invalid(self): # Test for invalid uint @@ -3789,7 +3789,7 @@ def test_type_from_string_invalid(self): ) for test_case in test_cases: with self.assertRaises(error.ABITypeError) as e: - type_from_string(test_case) + ABIType.from_string(test_case) def test_is_dynamic(self): test_cases = [ @@ -3804,16 +3804,16 @@ def test_is_dynamic(self): True, ), # Test tuple child types - (type_from_string("(string[100])"), True), - (type_from_string("(address,bool,uint256)"), False), - (type_from_string("(uint8,(byte[10]))"), False), - (type_from_string("(string,uint256)"), True), + (ABIType.from_string("(string[100])"), True), + (ABIType.from_string("(address,bool,uint256)"), False), + (ABIType.from_string("(uint8,(byte[10]))"), False), + (ABIType.from_string("(string,uint256)"), True), ( - type_from_string("(bool,(ufixed16x10[],(byte,address)))"), + ABIType.from_string("(bool,(ufixed16x10[],(byte,address)))"), True, ), ( - type_from_string("(bool,(uint256,(byte,address,string)))"), + ABIType.from_string("(bool,(uint256,(byte,address,string)))"), True, ), ] @@ -3827,23 +3827,23 @@ def test_byte_len(self): (BoolType(), 1), (UintType(64), 8), (UfixedType(256, 50), 32), - (type_from_string("bool[81]"), 11), - (type_from_string("bool[80]"), 10), - (type_from_string("bool[88]"), 11), - (type_from_string("address[5]"), 160), - (type_from_string("uint16[20]"), 40), - (type_from_string("ufixed64x20[10]"), 80), - (type_from_string(f"(address,byte,ufixed16x20)"), 35), + (ABIType.from_string("bool[81]"), 11), + (ABIType.from_string("bool[80]"), 10), + (ABIType.from_string("bool[88]"), 11), + (ABIType.from_string("address[5]"), 160), + (ABIType.from_string("uint16[20]"), 40), + (ABIType.from_string("ufixed64x20[10]"), 80), + (ABIType.from_string(f"(address,byte,ufixed16x20)"), 35), ( - type_from_string( + ABIType.from_string( f"((bool,address[10]),(bool,bool,bool),uint8[20])" ), 342, ), - (type_from_string(f"(bool,bool)"), 1), - (type_from_string(f"({'bool,'*6}uint8)"), 2), + (ABIType.from_string(f"(bool,bool)"), 1), + (ABIType.from_string(f"({'bool,'*6}uint8)"), 2), ( - type_from_string(f"({'bool,'*10}uint8,{'bool,'*10}byte)"), + ABIType.from_string(f"({'bool,'*10}uint8,{'bool,'*10}byte)"), 6, ), ] @@ -4139,32 +4139,32 @@ def test_array_dynamic_encoding(self): def test_tuple_encoding(self): test_cases = [ ( - type_from_string("()"), + ABIType.from_string("()"), [], b"", ), ( - type_from_string("(bool[3])"), + ABIType.from_string("(bool[3])"), [[True, True, False]], bytes([0b11000000]), ), ( - type_from_string("(bool[])"), + ABIType.from_string("(bool[])"), [[True, True, False]], bytes.fromhex("00 02 00 03 C0"), ), ( - type_from_string("(bool[2],bool[])"), + ABIType.from_string("(bool[2],bool[])"), [[True, True], [True, True]], bytes.fromhex("C0 00 03 00 02 C0"), ), ( - type_from_string("(bool[],bool[])"), + ABIType.from_string("(bool[],bool[])"), [[], []], bytes.fromhex("00 04 00 06 00 00 00 00"), ), ( - type_from_string("(string,bool,bool,bool,bool,string)"), + ABIType.from_string("(string,bool,bool,bool,bool,string)"), ["AB", True, False, True, False, "DE"], bytes.fromhex("00 05 A0 00 09 00 02 41 42 00 02 44 45"), ), @@ -4190,7 +4190,7 @@ def test_method(self): self.assertEqual(m.get_selector(), b"\x8a\xa3\xb6\x1f") self.assertEqual( [(a.type) for a in m.args], - [type_from_string("uint64"), type_from_string("uint64")], + [ABIType.from_string("uint64"), ABIType.from_string("uint64")], ) self.assertEqual(m.get_txn_calls(), 1) @@ -4199,16 +4199,16 @@ def test_method(self): ( "add(uint64,uint64)uint128", b"\x8a\xa3\xb6\x1f", - [type_from_string("uint64"), type_from_string("uint64")], - type_from_string("uint128"), + [ABIType.from_string("uint64"), ABIType.from_string("uint64")], + ABIType.from_string("uint128"), 1, ), ( "tupler((string,uint16),bool)void", b"\x3d\x98\xe4\x5d", [ - type_from_string("(string,uint16)"), - type_from_string("bool"), + ABIType.from_string("(string,uint16)"), + ABIType.from_string("bool"), ], "void", 1, @@ -4216,15 +4216,15 @@ def test_method(self): ( "txcalls(pay,pay,axfer,byte)bool", b"\x05\x6d\x2e\xc0", - ["pay", "pay", "axfer", type_from_string("byte")], - type_from_string("bool"), + ["pay", "pay", "axfer", ABIType.from_string("byte")], + ABIType.from_string("bool"), 4, ), ( "getter()string", b"\xa2\x59\x11\x1d", [], - type_from_string("string"), + ABIType.from_string("string"), 1, ), ] @@ -4244,7 +4244,7 @@ def test_method(self): self.assertEqual(m.get_txn_calls(), test_case[4]) def test_interface(self): - test_json = '{"name": "Calculator","methods": [{ "name": "add", "args": [ { "name": "a", "type": "uint64", "desc": "..." },{ "name": "b", "type": "uint64", "desc": "..." } ] },{ "name": "multiply", "args": [ { "name": "a", "type": "uint64", "desc": "..." },{ "name": "b", "type": "uint64", "desc": "..." } ] }]}' + test_json = '{"name": "Calculator","methods": [{ "name": "add", "returns": {"type": "void"}, "args": [ { "name": "a", "type": "uint64", "desc": "..." },{ "name": "b", "type": "uint64", "desc": "..." } ] },{ "name": "multiply", "returns": {"type": "void"}, "args": [ { "name": "a", "type": "uint64", "desc": "..." },{ "name": "b", "type": "uint64", "desc": "..." } ] }]}' i = Interface.from_json(test_json) self.assertEqual(i.name, "Calculator") self.assertEqual( @@ -4253,7 +4253,7 @@ def test_interface(self): ) def test_contract(self): - test_json = '{"name": "Calculator","appId": 3, "methods": [{ "name": "add", "args": [ { "name": "a", "type": "uint64", "desc": "..." },{ "name": "b", "type": "uint64", "desc": "..." } ] },{ "name": "multiply", "args": [ { "name": "a", "type": "uint64", "desc": "..." },{ "name": "b", "type": "uint64", "desc": "..." } ] }]}' + test_json = '{"name": "Calculator","appId": 3, "methods": [{ "name": "add", "returns": {"type": "void"}, "args": [ { "name": "a", "type": "uint64", "desc": "..." },{ "name": "b", "type": "uint64", "desc": "..." } ] },{ "name": "multiply", "returns": {"type": "void"}, "args": [ { "name": "a", "type": "uint64", "desc": "..." },{ "name": "b", "type": "uint64", "desc": "..." } ] }]}' c = Contract.from_json(test_json) self.assertEqual(c.name, "Calculator") self.assertEqual(c.app_id, 3)