diff --git a/coverage/parser.py b/coverage/parser.py index a5ad2f5ce..1e2011e2d 100644 --- a/coverage/parser.py +++ b/coverage/parser.py @@ -3,20 +3,35 @@ """Code parsing for coverage.py.""" +from __future__ import annotations + import ast import collections import os import re +import sys import token import tokenize +from types import CodeType +from typing import ( + cast, TYPE_CHECKING, + Any, Callable, Dict, Iterable, List, Optional, Sequence, Set, Tuple, +) + from coverage import env from coverage.bytecode import code_objects from coverage.debug import short_stack from coverage.exceptions import NoSource, NotPython, _StopEverything -from coverage.misc import contract, join_regex, nice_pair +from coverage.misc import join_regex, nice_pair from coverage.phystokens import generate_tokens +if TYPE_CHECKING: + # Protocol is new in 3.8. PYVERSIONS + from typing import Protocol +else: + class Protocol: # pylint: disable=missing-class-docstring + pass class PythonParser: """Parse code to find executable lines, excluded lines, etc. @@ -25,8 +40,12 @@ class PythonParser: involved. """ - @contract(text='unicode|None') - def __init__(self, text=None, filename=None, exclude=None): + def __init__( + self, + text: Optional[str]=None, + filename: Optional[str]=None, + exclude: Optional[str]=None, + ) -> None: """ Source can be provided as `text`, the text itself, or `filename`, from which the text will be read. Excluded lines are those that match @@ -35,8 +54,9 @@ def __init__(self, text=None, filename=None, exclude=None): """ assert text or filename, "PythonParser needs either text or filename" self.filename = filename or "" - self.text = text - if not self.text: + if text is not None: + self.text: str = text + else: from coverage.python import get_python_source try: self.text = get_python_source(self.filename) @@ -46,45 +66,45 @@ def __init__(self, text=None, filename=None, exclude=None): self.exclude = exclude # The text lines of the parsed code. - self.lines = self.text.split('\n') + self.lines: List[str] = self.text.split('\n') # The normalized line numbers of the statements in the code. Exclusions # are taken into account, and statements are adjusted to their first # lines. - self.statements = set() + self.statements: Set[int] = set() # The normalized line numbers of the excluded lines in the code, # adjusted to their first lines. - self.excluded = set() + self.excluded: Set[int] = set() # The raw_* attributes are only used in this class, and in # lab/parser.py to show how this class is working. # The line numbers that start statements, as reported by the line # number table in the bytecode. - self.raw_statements = set() + self.raw_statements: Set[int] = set() # The raw line numbers of excluded lines of code, as marked by pragmas. - self.raw_excluded = set() + self.raw_excluded: Set[int] = set() # The line numbers of class definitions. - self.raw_classdefs = set() + self.raw_classdefs: Set[int] = set() # The line numbers of docstring lines. - self.raw_docstrings = set() + self.raw_docstrings: Set[int] = set() # Internal detail, used by lab/parser.py. self.show_tokens = False # A dict mapping line numbers to lexical statement starts for # multi-line statements. - self._multiline = {} + self._multiline: Dict[int, int] = {} # Lazily-created arc data, and missing arc descriptions. - self._all_arcs = None - self._missing_arc_fragments = None + self._all_arcs: Optional[Set[TArc]] = None + self._missing_arc_fragments: Optional[TArcFragments] = None - def lines_matching(self, *regexes): + def lines_matching(self, *regexes: str) -> Set[int]: """Find the lines matching one of a list of regexes. Returns a set of line numbers, the lines that contain a match for one @@ -100,7 +120,7 @@ def lines_matching(self, *regexes): matches.add(i) return matches - def _raw_parse(self): + def _raw_parse(self) -> None: """Parse the source to find the interesting facts about its lines. A handful of attributes are updated. @@ -122,6 +142,7 @@ def _raw_parse(self): first_on_line = True nesting = 0 + assert self.text is not None tokgen = generate_tokens(self.text) for toktype, ttext, (slineno, _), (elineno, _), ltext in tokgen: if self.show_tokens: # pragma: debugging @@ -167,11 +188,11 @@ def _raw_parse(self): # http://stackoverflow.com/questions/1769332/x/1769794#1769794 self.raw_docstrings.update(range(slineno, elineno+1)) elif toktype == token.NEWLINE: - if first_line is not None and elineno != first_line: + if first_line is not None and elineno != first_line: # type: ignore[unreachable] # We're at the end of a line, and we've ended on a # different line than the first line of the statement, # so record a multi-line range. - for l in range(first_line, elineno+1): + for l in range(first_line, elineno+1): # type: ignore[unreachable] self._multiline[l] = first_line first_line = None first_on_line = True @@ -202,32 +223,32 @@ def _raw_parse(self): if env.PYBEHAVIOR.module_firstline_1 and self._multiline: self._multiline[1] = min(self.raw_statements) - def first_line(self, line): - """Return the first line number of the statement including `line`.""" - if line < 0: - line = -self._multiline.get(-line, -line) + def first_line(self, lineno: int) -> int: + """Return the first line number of the statement including `lineno`.""" + if lineno < 0: + lineno = -self._multiline.get(-lineno, -lineno) else: - line = self._multiline.get(line, line) - return line + lineno = self._multiline.get(lineno, lineno) + return lineno - def first_lines(self, lines): - """Map the line numbers in `lines` to the correct first line of the + def first_lines(self, linenos: Iterable[int]) -> Set[int]: + """Map the line numbers in `linenos` to the correct first line of the statement. Returns a set of the first lines. """ - return {self.first_line(l) for l in lines} + return {self.first_line(l) for l in linenos} - def translate_lines(self, lines): + def translate_lines(self, lines: Iterable[int]) -> Set[int]: """Implement `FileReporter.translate_lines`.""" return self.first_lines(lines) - def translate_arcs(self, arcs): + def translate_arcs(self, arcs: Iterable[TArc]) -> List[TArc]: """Implement `FileReporter.translate_arcs`.""" return [(self.first_line(a), self.first_line(b)) for (a, b) in arcs] - def parse_source(self): + def parse_source(self) -> None: """Parse source text to find executable lines, excluded lines, etc. Sets the .excluded and .statements attributes, normalized to the first @@ -252,7 +273,7 @@ def parse_source(self): starts = self.raw_statements - ignore self.statements = self.first_lines(starts) - ignore - def arcs(self): + def arcs(self) -> Set[TArc]: """Get information about the arcs available in the code. Returns a set of line number pairs. Line numbers have been normalized @@ -261,9 +282,10 @@ def arcs(self): """ if self._all_arcs is None: self._analyze_ast() + assert self._all_arcs is not None return self._all_arcs - def _analyze_ast(self): + def _analyze_ast(self) -> None: """Run the AstArcAnalyzer and save its results. `_all_arcs` is the set of arcs in the code. @@ -281,13 +303,13 @@ def _analyze_ast(self): self._missing_arc_fragments = aaa.missing_arc_fragments - def exit_counts(self): + def exit_counts(self) -> Dict[int, int]: """Get a count of exits from that each line. Excluded lines are excluded. """ - exit_counts = collections.defaultdict(int) + exit_counts: Dict[int, int] = collections.defaultdict(int) for l1, l2 in self.arcs(): if l1 < 0: # Don't ever report -1 as a line number @@ -308,10 +330,16 @@ def exit_counts(self): return exit_counts - def missing_arc_description(self, start, end, executed_arcs=None): + def missing_arc_description( + self, + start: int, + end: int, + executed_arcs: Optional[Set[TArc]]=None, + ) -> str: """Provide an English sentence describing a missing arc.""" if self._missing_arc_fragments is None: self._analyze_ast() + assert self._missing_arc_fragments is not None actual_start = start @@ -351,18 +379,23 @@ def missing_arc_description(self, start, end, executed_arcs=None): class ByteParser: """Parse bytecode to understand the structure of code.""" - @contract(text='unicode') - def __init__(self, text, code=None, filename=None): + def __init__( + self, + text: str, + code: Optional[CodeType]=None, + filename: Optional[str]=None, + ) -> None: self.text = text - if code: + if code is not None: self.code = code else: + assert filename is not None try: self.code = compile(text, filename, "exec") except SyntaxError as synerr: raise NotPython( "Couldn't parse '%s' as Python source: '%s' at line %d" % ( - filename, synerr.msg, synerr.lineno + filename, synerr.msg, synerr.lineno or 0 ) ) from synerr @@ -375,7 +408,7 @@ def __init__(self, text, code=None, filename=None): "Run coverage.py under another Python for this command." ) - def child_parsers(self): + def child_parsers(self) -> Iterable[ByteParser]: """Iterate over all the code objects nested within this one. The iteration includes `self` as its first value. @@ -383,7 +416,7 @@ def child_parsers(self): """ return (ByteParser(self.text, code=c) for c in code_objects(self.code)) - def _line_numbers(self): + def _line_numbers(self) -> Iterable[int]: """Yield the line numbers possible in this code object. Uses co_lnotab described in Python/compile.c to find the @@ -413,7 +446,7 @@ def _line_numbers(self): if line_num != last_line_num: yield line_num - def _find_statements(self): + def _find_statements(self) -> Iterable[int]: """Find the statements in `self.code`. Produce a sequence of line numbers that start statements. Recurses @@ -429,7 +462,37 @@ def _find_statements(self): # AST analysis # -class BlockBase: +class ArcStart(collections.namedtuple("Arc", "lineno, cause")): + """The information needed to start an arc. + + `lineno` is the line number the arc starts from. + + `cause` is an English text fragment used as the `startmsg` for + AstArcAnalyzer.missing_arc_fragments. It will be used to describe why an + arc wasn't executed, so should fit well into a sentence of the form, + "Line 17 didn't run because {cause}." The fragment can include "{lineno}" + to have `lineno` interpolated into it. + + """ + def __new__(cls, lineno: int, cause: Optional[str]=None) -> ArcStart: + return super().__new__(cls, lineno, cause) + + +class TAddArcFn(Protocol): + """The type for AstArcAnalyzer.add_arc().""" + def __call__( + self, + start: int, + end: int, + smsg: Optional[str]=None, + emsg: Optional[str]=None, + ) -> None: + ... + +TArc = Tuple[int, int] +TArcFragments = Dict[TArc, List[Tuple[Optional[str], Optional[str]]]] + +class Block: """ Blocks need to handle various exiting statements in their own ways. @@ -439,56 +502,54 @@ class BlockBase: stack. """ # pylint: disable=unused-argument - def process_break_exits(self, exits, add_arc): + def process_break_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool: """Process break exits.""" # Because break can only appear in loops, and most subclasses # implement process_break_exits, this function is never reached. raise AssertionError - def process_continue_exits(self, exits, add_arc): + def process_continue_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool: """Process continue exits.""" # Because continue can only appear in loops, and most subclasses # implement process_continue_exits, this function is never reached. raise AssertionError - def process_raise_exits(self, exits, add_arc): + def process_raise_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool: """Process raise exits.""" return False - def process_return_exits(self, exits, add_arc): + def process_return_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool: """Process return exits.""" return False -class LoopBlock(BlockBase): +class LoopBlock(Block): """A block on the block stack representing a `for` or `while` loop.""" - @contract(start=int) - def __init__(self, start): + def __init__(self, start: int) -> None: # The line number where the loop starts. self.start = start # A set of ArcStarts, the arcs from break statements exiting this loop. - self.break_exits = set() + self.break_exits: Set[ArcStart] = set() - def process_break_exits(self, exits, add_arc): + def process_break_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool: self.break_exits.update(exits) return True - def process_continue_exits(self, exits, add_arc): + def process_continue_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool: for xit in exits: add_arc(xit.lineno, self.start, xit.cause) return True -class FunctionBlock(BlockBase): +class FunctionBlock(Block): """A block on the block stack representing a function definition.""" - @contract(start=int, name=str) - def __init__(self, start, name): + def __init__(self, start: int, name: str) -> None: # The line number where the function starts. self.start = start # The name of the function. self.name = name - def process_raise_exits(self, exits, add_arc): + def process_raise_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool: for xit in exits: add_arc( xit.lineno, -self.start, xit.cause, @@ -496,7 +557,7 @@ def process_raise_exits(self, exits, add_arc): ) return True - def process_return_exits(self, exits, add_arc): + def process_return_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool: for xit in exits: add_arc( xit.lineno, -self.start, xit.cause, @@ -505,10 +566,9 @@ def process_return_exits(self, exits, add_arc): return True -class TryBlock(BlockBase): +class TryBlock(Block): """A block on the block stack representing a `try` block.""" - @contract(handler_start='int|None', final_start='int|None') - def __init__(self, handler_start, final_start): + def __init__(self, handler_start: Optional[int], final_start: Optional[int]) -> None: # The line number of the first "except" handler, if any. self.handler_start = handler_start # The line number of the "finally:" clause, if any. @@ -516,24 +576,24 @@ def __init__(self, handler_start, final_start): # The ArcStarts for breaks/continues/returns/raises inside the "try:" # that need to route through the "finally:" clause. - self.break_from = set() - self.continue_from = set() - self.raise_from = set() - self.return_from = set() + self.break_from: Set[ArcStart] = set() + self.continue_from: Set[ArcStart] = set() + self.raise_from: Set[ArcStart] = set() + self.return_from: Set[ArcStart] = set() - def process_break_exits(self, exits, add_arc): + def process_break_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool: if self.final_start is not None: self.break_from.update(exits) return True return False - def process_continue_exits(self, exits, add_arc): + def process_continue_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool: if self.final_start is not None: self.continue_from.update(exits) return True return False - def process_raise_exits(self, exits, add_arc): + def process_raise_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool: if self.handler_start is not None: for xit in exits: add_arc(xit.lineno, self.handler_start, xit.cause) @@ -542,17 +602,16 @@ def process_raise_exits(self, exits, add_arc): self.raise_from.update(exits) return True - def process_return_exits(self, exits, add_arc): + def process_return_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool: if self.final_start is not None: self.return_from.update(exits) return True return False -class WithBlock(BlockBase): +class WithBlock(Block): """A block on the block stack representing a `with` block.""" - @contract(start=int) - def __init__(self, start): + def __init__(self, start: int) -> None: # We only ever use this block if it is needed, so that we don't have to # check this setting in all the methods. assert env.PYBEHAVIOR.exit_through_with @@ -562,11 +621,16 @@ def __init__(self, start): # The ArcStarts for breaks/continues/returns/raises inside the "with:" # that need to go through the with-statement while exiting. - self.break_from = set() - self.continue_from = set() - self.return_from = set() - - def _process_exits(self, exits, add_arc, from_set=None): + self.break_from: Set[ArcStart] = set() + self.continue_from: Set[ArcStart] = set() + self.return_from: Set[ArcStart] = set() + + def _process_exits( + self, + exits: Set[ArcStart], + add_arc: TAddArcFn, + from_set: Optional[Set[ArcStart]]=None, + ) -> bool: """Helper to process the four kinds of exits.""" for xit in exits: add_arc(xit.lineno, self.start, xit.cause) @@ -574,43 +638,27 @@ def _process_exits(self, exits, add_arc, from_set=None): from_set.update(exits) return True - def process_break_exits(self, exits, add_arc): + def process_break_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool: return self._process_exits(exits, add_arc, self.break_from) - def process_continue_exits(self, exits, add_arc): + def process_continue_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool: return self._process_exits(exits, add_arc, self.continue_from) - def process_raise_exits(self, exits, add_arc): + def process_raise_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool: return self._process_exits(exits, add_arc) - def process_return_exits(self, exits, add_arc): + def process_return_exits(self, exits: Set[ArcStart], add_arc: TAddArcFn) -> bool: return self._process_exits(exits, add_arc, self.return_from) -class ArcStart(collections.namedtuple("Arc", "lineno, cause")): - """The information needed to start an arc. - - `lineno` is the line number the arc starts from. - - `cause` is an English text fragment used as the `startmsg` for - AstArcAnalyzer.missing_arc_fragments. It will be used to describe why an - arc wasn't executed, so should fit well into a sentence of the form, - "Line 17 didn't run because {cause}." The fragment can include "{lineno}" - to have `lineno` interpolated into it. - - """ - def __new__(cls, lineno, cause=None): - return super().__new__(cls, lineno, cause) - - -class NodeList: +class NodeList(ast.AST): """A synthetic fictitious node, containing a sequence of nodes. This is used when collapsing optimized if-statements, to represent the unconditional execution of one of the clauses. """ - def __init__(self, body): + def __init__(self, body: Sequence[ast.AST]) -> None: self.body = body self.lineno = body[0].lineno @@ -618,12 +666,19 @@ def __init__(self, body): # TODO: the cause messages have too many commas. # TODO: Shouldn't the cause messages join with "and" instead of "or"? +def _make_expression_code_method(noun: str) -> Callable[[AstArcAnalyzer, ast.AST], None]: + """A function to make methods for expression-based callable _code_object__ methods.""" + def _code_object__expression_callable(self: AstArcAnalyzer, node: ast.AST) -> None: + start = self.line_for_node(node) + self.add_arc(-start, start, None, f"didn't run the {noun} on line {start}") + self.add_arc(start, -start, None, f"didn't finish the {noun} on line {start}") + return _code_object__expression_callable + class AstArcAnalyzer: """Analyze source text with an AST to find executable code paths.""" - @contract(text='unicode', statements=set) - def __init__(self, text, statements, multiline): + def __init__(self, text: str, statements: Set[int], multiline: Dict[int, int]) -> None: self.root_node = ast.parse(text) # TODO: I think this is happening in too many places. self.statements = {multiline.get(l, l) for l in statements} @@ -639,20 +694,20 @@ def __init__(self, text, statements, multiline): print(f"Multiline map: {self.multiline}") ast_dump(self.root_node) - self.arcs = set() + self.arcs: Set[TArc] = set() # A map from arc pairs to a list of pairs of sentence fragments: # { (start, end): [(startmsg, endmsg), ...], } # # For an arc from line 17, they should be usable like: # "Line 17 {endmsg}, because {startmsg}" - self.missing_arc_fragments = collections.defaultdict(list) - self.block_stack = [] + self.missing_arc_fragments: TArcFragments = collections.defaultdict(list) + self.block_stack: List[Block] = [] # $set_env.py: COVERAGE_TRACK_ARCS - Trace possible arcs added while parsing code. self.debug = bool(int(os.environ.get("COVERAGE_TRACK_ARCS", 0))) - def analyze(self): + def analyze(self) -> None: """Examine the AST tree from `root_node` to determine possible arcs. This sets the `arcs` attribute to be a set of (from, to) line number @@ -665,8 +720,13 @@ def analyze(self): if code_object_handler is not None: code_object_handler(node) - @contract(start=int, end=int) - def add_arc(self, start, end, smsg=None, emsg=None): + def add_arc( + self, + start: int, + end: int, + smsg: Optional[str]=None, + emsg: Optional[str]=None, + ) -> None: """Add an arc, including message fragments to use if it is missing.""" if self.debug: # pragma: debugging print(f"\nAdding possible arc: ({start}, {end}): {smsg!r}, {emsg!r}") @@ -676,25 +736,27 @@ def add_arc(self, start, end, smsg=None, emsg=None): if smsg is not None or emsg is not None: self.missing_arc_fragments[(start, end)].append((smsg, emsg)) - def nearest_blocks(self): + def nearest_blocks(self) -> Iterable[Block]: """Yield the blocks in nearest-to-farthest order.""" return reversed(self.block_stack) - @contract(returns=int) - def line_for_node(self, node): + def line_for_node(self, node: ast.AST) -> int: """What is the right line number to use for this node? This dispatches to _line__Node functions where needed. """ node_name = node.__class__.__name__ - handler = getattr(self, "_line__" + node_name, None) + handler = cast( + Optional[Callable[[ast.AST], int]], + getattr(self, "_line__" + node_name, None) + ) if handler is not None: return handler(node) else: return node.lineno - def _line_decorated(self, node): + def _line_decorated(self, node: ast.FunctionDef) -> int: """Compute first line number for things that can be decorated (classes and functions).""" lineno = node.lineno if env.PYBEHAVIOR.trace_decorated_def or env.PYBEHAVIOR.def_ast_no_decorator: @@ -702,12 +764,12 @@ def _line_decorated(self, node): lineno = node.decorator_list[0].lineno return lineno - def _line__Assign(self, node): + def _line__Assign(self, node: ast.Assign) -> int: return self.line_for_node(node.value) _line__ClassDef = _line_decorated - def _line__Dict(self, node): + def _line__Dict(self, node: ast.Dict) -> int: if node.keys: if node.keys[0] is not None: return node.keys[0].lineno @@ -721,13 +783,13 @@ def _line__Dict(self, node): _line__FunctionDef = _line_decorated _line__AsyncFunctionDef = _line_decorated - def _line__List(self, node): + def _line__List(self, node: ast.List) -> int: if node.elts: return self.line_for_node(node.elts[0]) else: return node.lineno - def _line__Module(self, node): + def _line__Module(self, node: ast.Module) -> int: if env.PYBEHAVIOR.module_firstline_1: return 1 elif node.body: @@ -742,8 +804,7 @@ def _line__Module(self, node): "Import", "ImportFrom", "Nonlocal", "Pass", } - @contract(returns='ArcStarts') - def add_arcs(self, node): + def add_arcs(self, node: ast.AST) -> Set[ArcStart]: """Add the arcs for `node`. Return a set of ArcStarts, exits from this node to the next. Because a @@ -760,7 +821,10 @@ def add_arcs(self, node): """ node_name = node.__class__.__name__ - handler = getattr(self, "_handle__" + node_name, None) + handler = cast( + Optional[Callable[[ast.AST], Set[ArcStart]]], + getattr(self, "_handle__" + node_name, None) + ) if handler is not None: return handler(node) else: @@ -773,8 +837,12 @@ def add_arcs(self, node): # Default for simple statements: one exit from this node. return {ArcStart(self.line_for_node(node))} - @contract(returns='ArcStarts') - def add_body_arcs(self, body, from_start=None, prev_starts=None): + def add_body_arcs( + self, + body: Sequence[ast.AST], + from_start: Optional[ArcStart]=None, + prev_starts: Optional[Set[ArcStart]]=None + ) -> Set[ArcStart]: """Add arcs for the body of a compound statement. `body` is the body node. `from_start` is a single `ArcStart` that can @@ -786,21 +854,23 @@ def add_body_arcs(self, body, from_start=None, prev_starts=None): """ if prev_starts is None: + assert from_start is not None prev_starts = {from_start} for body_node in body: lineno = self.line_for_node(body_node) first_line = self.multiline.get(lineno, lineno) if first_line not in self.statements: - body_node = self.find_non_missing_node(body_node) - if body_node is None: + maybe_body_node = self.find_non_missing_node(body_node) + if maybe_body_node is None: continue + body_node = maybe_body_node lineno = self.line_for_node(body_node) for prev_start in prev_starts: self.add_arc(prev_start.lineno, lineno, prev_start.cause) prev_starts = self.add_arcs(body_node) return prev_starts - def find_non_missing_node(self, node): + def find_non_missing_node(self, node: ast.AST) -> Optional[ast.AST]: """Search `node` looking for a child that has not been optimized away. This might return the node you started with, or it will work recursively @@ -817,12 +887,15 @@ def find_non_missing_node(self, node): if first_line in self.statements: return node - missing_fn = getattr(self, "_missing__" + node.__class__.__name__, None) - if missing_fn: - node = missing_fn(node) + missing_fn = cast( + Optional[Callable[[ast.AST], Optional[ast.AST]]], + getattr(self, "_missing__" + node.__class__.__name__, None) + ) + if missing_fn is not None: + ret_node = missing_fn(node) else: - node = None - return node + ret_node = None + return ret_node # Missing nodes: _missing__* # @@ -831,7 +904,7 @@ def find_non_missing_node(self, node): # find_non_missing_node) to find a node to use instead of the missing # node. They can return None if the node should truly be gone. - def _missing__If(self, node): + def _missing__If(self, node: ast.If) -> Optional[ast.AST]: # If the if-node is missing, then one of its children might still be # here, but not both. So return the first of the two that isn't missing. # Use a NodeList to hold the clauses as a single node. @@ -842,14 +915,14 @@ def _missing__If(self, node): return self.find_non_missing_node(NodeList(node.orelse)) return None - def _missing__NodeList(self, node): + def _missing__NodeList(self, node: NodeList) -> Optional[ast.AST]: # A NodeList might be a mixture of missing and present nodes. Find the # ones that are present. non_missing_children = [] for child in node.body: - child = self.find_non_missing_node(child) - if child is not None: - non_missing_children.append(child) + maybe_child = self.find_non_missing_node(child) + if maybe_child is not None: + non_missing_children.append(maybe_child) # Return the simplest representation of the present children. if not non_missing_children: @@ -858,7 +931,7 @@ def _missing__NodeList(self, node): return non_missing_children[0] return NodeList(non_missing_children) - def _missing__While(self, node): + def _missing__While(self, node: ast.While) -> Optional[ast.AST]: body_nodes = self.find_non_missing_node(NodeList(node.body)) if not body_nodes: return None @@ -868,16 +941,17 @@ def _missing__While(self, node): new_while.test = ast.Name() new_while.test.lineno = body_nodes.lineno new_while.test.id = "True" + assert hasattr(body_nodes, "body") new_while.body = body_nodes.body - new_while.orelse = None + new_while.orelse = [] return new_while - def is_constant_expr(self, node): + def is_constant_expr(self, node: ast.AST) -> Optional[str]: """Is this a compile-time constant?""" node_name = node.__class__.__name__ if node_name in ["Constant", "NameConstant", "Num"]: return "Num" - elif node_name == "Name": + elif isinstance(node, ast.Name): if node.id in ["True", "False", "None", "__debug__"]: return "Name" return None @@ -889,7 +963,6 @@ def is_constant_expr(self, node): # listcomps hidden in lists: x = [[i for i in range(10)]] # nested function definitions - # Exit processing: process_*_exits # # These functions process the four kinds of jump exits: break, continue, @@ -898,29 +971,25 @@ def is_constant_expr(self, node): # enclosing loop block, or the nearest enclosing finally block, whichever # is nearer. - @contract(exits='ArcStarts') - def process_break_exits(self, exits): + def process_break_exits(self, exits: Set[ArcStart]) -> None: """Add arcs due to jumps from `exits` being breaks.""" for block in self.nearest_blocks(): # pragma: always breaks if block.process_break_exits(exits, self.add_arc): break - @contract(exits='ArcStarts') - def process_continue_exits(self, exits): + def process_continue_exits(self, exits: Set[ArcStart]) -> None: """Add arcs due to jumps from `exits` being continues.""" for block in self.nearest_blocks(): # pragma: always breaks if block.process_continue_exits(exits, self.add_arc): break - @contract(exits='ArcStarts') - def process_raise_exits(self, exits): + def process_raise_exits(self, exits: Set[ArcStart]) -> None: """Add arcs due to jumps from `exits` being raises.""" for block in self.nearest_blocks(): if block.process_raise_exits(exits, self.add_arc): break - @contract(exits='ArcStarts') - def process_return_exits(self, exits): + def process_return_exits(self, exits: Set[ArcStart]) -> None: """Add arcs due to jumps from `exits` being returns.""" for block in self.nearest_blocks(): # pragma: always breaks if block.process_return_exits(exits, self.add_arc): @@ -937,17 +1006,16 @@ def process_return_exits(self, exits): # Every node type that represents a statement should have a handler, or it # should be listed in OK_TO_DEFAULT. - @contract(returns='ArcStarts') - def _handle__Break(self, node): + def _handle__Break(self, node: ast.Break) -> Set[ArcStart]: here = self.line_for_node(node) break_start = ArcStart(here, cause="the break on line {lineno} wasn't executed") - self.process_break_exits([break_start]) + self.process_break_exits({break_start}) return set() - @contract(returns='ArcStarts') - def _handle_decorated(self, node): + def _handle_decorated(self, node: ast.FunctionDef) -> Set[ArcStart]: """Add arcs for things that can be decorated (classes and functions).""" - main_line = last = node.lineno + main_line: int = node.lineno + last: Optional[int] = node.lineno decs = node.decorator_list if decs: if env.PYBEHAVIOR.trace_decorated_def or env.PYBEHAVIOR.def_ast_no_decorator: @@ -957,6 +1025,7 @@ def _handle_decorated(self, node): if last is not None and dec_start != last: self.add_arc(last, dec_start) last = dec_start + assert last is not None if env.PYBEHAVIOR.trace_decorated_def: self.add_arc(last, main_line) last = main_line @@ -977,19 +1046,18 @@ def _handle_decorated(self, node): self.add_arc(last, lineno) last = lineno # The body is handled in collect_arcs. + assert last is not None return {ArcStart(last)} _handle__ClassDef = _handle_decorated - @contract(returns='ArcStarts') - def _handle__Continue(self, node): + def _handle__Continue(self, node: ast.Continue) -> Set[ArcStart]: here = self.line_for_node(node) continue_start = ArcStart(here, cause="the continue on line {lineno} wasn't executed") - self.process_continue_exits([continue_start]) + self.process_continue_exits({continue_start}) return set() - @contract(returns='ArcStarts') - def _handle__For(self, node): + def _handle__For(self, node: ast.For) -> Set[ArcStart]: start = self.line_for_node(node.iter) self.block_stack.append(LoopBlock(start=start)) from_start = ArcStart(start, cause="the loop on line {lineno} never started") @@ -998,6 +1066,7 @@ def _handle__For(self, node): for xit in exits: self.add_arc(xit.lineno, start, xit.cause) my_block = self.block_stack.pop() + assert isinstance(my_block, LoopBlock) exits = my_block.break_exits from_start = ArcStart(start, cause="the loop on line {lineno} didn't complete") if node.orelse: @@ -1013,8 +1082,7 @@ def _handle__For(self, node): _handle__FunctionDef = _handle_decorated _handle__AsyncFunctionDef = _handle_decorated - @contract(returns='ArcStarts') - def _handle__If(self, node): + def _handle__If(self, node: ast.If) -> Set[ArcStart]: start = self.line_for_node(node.test) from_start = ArcStart(start, cause="the condition on line {lineno} was never true") exits = self.add_body_arcs(node.body, from_start=from_start) @@ -1022,51 +1090,50 @@ def _handle__If(self, node): exits |= self.add_body_arcs(node.orelse, from_start=from_start) return exits - @contract(returns='ArcStarts') - def _handle__Match(self, node): - start = self.line_for_node(node) - last_start = start - exits = set() - had_wildcard = False - for case in node.cases: - case_start = self.line_for_node(case.pattern) - pattern = case.pattern - while isinstance(pattern, ast.MatchOr): - pattern = pattern.patterns[-1] - if isinstance(pattern, ast.MatchAs): - had_wildcard = True - self.add_arc(last_start, case_start, "the pattern on line {lineno} always matched") - from_start = ArcStart(case_start, cause="the pattern on line {lineno} never matched") - exits |= self.add_body_arcs(case.body, from_start=from_start) - last_start = case_start - if not had_wildcard: - exits.add(from_start) - return exits + if sys.version_info >= (3, 10): + def _handle__Match(self, node: ast.Match) -> Set[ArcStart]: + start = self.line_for_node(node) + last_start = start + exits = set() + had_wildcard = False + for case in node.cases: + case_start = self.line_for_node(case.pattern) + pattern = case.pattern + while isinstance(pattern, ast.MatchOr): + pattern = pattern.patterns[-1] + if isinstance(pattern, ast.MatchAs): + had_wildcard = True + self.add_arc(last_start, case_start, "the pattern on line {lineno} always matched") + from_start = ArcStart( + case_start, + cause="the pattern on line {lineno} never matched", + ) + exits |= self.add_body_arcs(case.body, from_start=from_start) + last_start = case_start + if not had_wildcard: + exits.add(from_start) + return exits - @contract(returns='ArcStarts') - def _handle__NodeList(self, node): + def _handle__NodeList(self, node: NodeList) -> Set[ArcStart]: start = self.line_for_node(node) exits = self.add_body_arcs(node.body, from_start=ArcStart(start)) return exits - @contract(returns='ArcStarts') - def _handle__Raise(self, node): + def _handle__Raise(self, node: ast.Raise) -> Set[ArcStart]: here = self.line_for_node(node) raise_start = ArcStart(here, cause="the raise on line {lineno} wasn't executed") - self.process_raise_exits([raise_start]) + self.process_raise_exits({raise_start}) # `raise` statement jumps away, no exits from here. return set() - @contract(returns='ArcStarts') - def _handle__Return(self, node): + def _handle__Return(self, node: ast.Return) -> Set[ArcStart]: here = self.line_for_node(node) return_start = ArcStart(here, cause="the return on line {lineno} wasn't executed") - self.process_return_exits([return_start]) + self.process_return_exits({return_start}) # `return` statement jumps away, no exits from here. return set() - @contract(returns='ArcStarts') - def _handle__Try(self, node): + def _handle__Try(self, node: ast.Try) -> Set[ArcStart]: if node.handlers: handler_start = self.line_for_node(node.handlers[0]) else: @@ -1099,10 +1166,10 @@ def _handle__Try(self, node): else: self.block_stack.pop() - handler_exits = set() + handler_exits: Set[ArcStart] = set() if node.handlers: - last_handler_start = None + last_handler_start: Optional[int] = None for handler_node in node.handlers: handler_start = self.line_for_node(handler_node) if last_handler_start is not None: @@ -1177,8 +1244,7 @@ def _handle__Try(self, node): return exits - @contract(starts='ArcStarts', exits='ArcStarts', returns='ArcStarts') - def _combine_finally_starts(self, starts, exits): + def _combine_finally_starts(self, starts: Set[ArcStart], exits: Set[ArcStart]) -> Set[ArcStart]: """Helper for building the cause of `finally` branches. "finally" clauses might not execute their exits, and the causes could @@ -1193,8 +1259,7 @@ def _combine_finally_starts(self, starts, exits): exits = {ArcStart(xit.lineno, cause) for xit in exits} return exits - @contract(returns='ArcStarts') - def _handle__While(self, node): + def _handle__While(self, node: ast.While) -> Set[ArcStart]: start = to_top = self.line_for_node(node.test) constant_test = self.is_constant_expr(node.test) top_is_body0 = False @@ -1211,6 +1276,7 @@ def _handle__While(self, node): self.add_arc(xit.lineno, to_top, xit.cause) exits = set() my_block = self.block_stack.pop() + assert isinstance(my_block, LoopBlock) exits.update(my_block.break_exits) from_start = ArcStart(start, cause="the condition on line {lineno} was never false") if node.orelse: @@ -1222,14 +1288,14 @@ def _handle__While(self, node): exits.add(from_start) return exits - @contract(returns='ArcStarts') - def _handle__With(self, node): + def _handle__With(self, node: ast.With) -> Set[ArcStart]: start = self.line_for_node(node) if env.PYBEHAVIOR.exit_through_with: self.block_stack.append(WithBlock(start=start)) exits = self.add_body_arcs(node.body, from_start=ArcStart(start)) if env.PYBEHAVIOR.exit_through_with: with_block = self.block_stack.pop() + assert isinstance(with_block, WithBlock) with_exit = {ArcStart(start)} if exits: for xit in exits: @@ -1256,7 +1322,7 @@ def _handle__With(self, node): # These methods are used by analyze() as the start of the analysis. # There is one for each construct with a code object. - def _code_object__Module(self, node): + def _code_object__Module(self, node: ast.Module) -> None: start = self.line_for_node(node) if node.body: exits = self.add_body_arcs(node.body, from_start=ArcStart(-start)) @@ -1267,7 +1333,7 @@ def _code_object__Module(self, node): self.add_arc(-start, start) self.add_arc(start, -start) - def _code_object__FunctionDef(self, node): + def _code_object__FunctionDef(self, node: ast.FunctionDef) -> None: start = self.line_for_node(node) self.block_stack.append(FunctionBlock(start=start, name=node.name)) exits = self.add_body_arcs(node.body, from_start=ArcStart(-start)) @@ -1276,7 +1342,7 @@ def _code_object__FunctionDef(self, node): _code_object__AsyncFunctionDef = _code_object__FunctionDef - def _code_object__ClassDef(self, node): + def _code_object__ClassDef(self, node: ast.ClassDef) -> None: start = self.line_for_node(node) self.add_arc(-start, start) exits = self.add_body_arcs(node.body, from_start=ArcStart(start)) @@ -1286,14 +1352,6 @@ def _code_object__ClassDef(self, node): f"didn't exit the body of class {node.name!r}", ) - def _make_expression_code_method(noun): # pylint: disable=no-self-argument - """A function to make methods for expression-based callable _code_object__ methods.""" - def _code_object__expression_callable(self, node): - start = self.line_for_node(node) - self.add_arc(-start, start, None, f"didn't run the {noun} on line {start}") - self.add_arc(start, -start, None, f"didn't finish the {noun} on line {start}") - return _code_object__expression_callable - _code_object__Lambda = _make_expression_code_method("lambda") _code_object__GeneratorExp = _make_expression_code_method("generator expression") _code_object__DictComp = _make_expression_code_method("dictionary comprehension") @@ -1305,14 +1363,18 @@ def _code_object__expression_callable(self, node): SKIP_DUMP_FIELDS = ["ctx"] -def _is_simple_value(value): +def _is_simple_value(value: Any) -> bool: """Is `value` simple enough to be displayed on a single line?""" return ( - value in [None, [], (), {}, set()] or + value in [None, [], (), {}, set(), frozenset(), Ellipsis] or isinstance(value, (bytes, int, float, str)) ) -def ast_dump(node, depth=0, print=print): # pylint: disable=redefined-builtin +def ast_dump( + node: ast.AST, + depth:int = 0, + print: Callable[[str], None]=print, # pylint: disable=redefined-builtin +) -> None: """Dump the AST for `node`. This recursively walks the AST, printing a readable version. @@ -1323,6 +1385,7 @@ def ast_dump(node, depth=0, print=print): # pylint: disable=redefined-builtin if lineno is not None: linemark = f" @ {node.lineno},{node.col_offset}" if hasattr(node, "end_lineno"): + assert hasattr(node, "end_col_offset") linemark += ":" if node.end_lineno != node.lineno: linemark += f"{node.end_lineno}," @@ -1344,7 +1407,7 @@ def ast_dump(node, depth=0, print=print): # pylint: disable=redefined-builtin else: print(head) if 0: - print("{}# mro: {}".format( + print("{}# mro: {}".format( # type: ignore[unreachable] indent, ", ".join(c.__name__ for c in node.__class__.__mro__[1:]), )) next_indent = indent + " " diff --git a/tox.ini b/tox.ini index bed5b547f..8120c870c 100644 --- a/tox.ini +++ b/tox.ini @@ -98,7 +98,7 @@ deps = setenv = {[testenv]setenv} T_AN=coverage/config.py coverage/files.py coverage/numbits.py - T_OZ=coverage/phystokens.py + T_OZ=coverage/parser.py coverage/phystokens.py TYPEABLE={env:T_AN} {env:T_OZ} commands =