diff --git a/sdks/python/apache_beam/io/aws/s3filesystem.py b/sdks/python/apache_beam/io/aws/s3filesystem.py index ffbce5893a96..584263ec241e 100644 --- a/sdks/python/apache_beam/io/aws/s3filesystem.py +++ b/sdks/python/apache_beam/io/aws/s3filesystem.py @@ -18,6 +18,7 @@ """S3 file system implementation for accessing files on AWS S3.""" # pytype: skip-file +import traceback from apache_beam.io.aws import s3io from apache_beam.io.filesystem import BeamIOError @@ -315,14 +316,13 @@ def delete(self, paths): if exceptions: raise BeamIOError("Delete operation failed", exceptions) - def report_lineage(self, path, lineage, level=None): + def report_lineage(self, path, lineage): try: components = s3io.parse_s3_path(path, object_optional=True) except ValueError: # report lineage is fail-safe + traceback.print_exc() return - if level == FileSystem.LineageLevel.TOP_LEVEL or \ - (len(components) > 1 and components[-1] == ''): - # bucket only + if components and not components[-1]: components = components[:-1] - lineage.add('s3', *components) + lineage.add('s3', *components, last_segment_sep='/') diff --git a/sdks/python/apache_beam/io/aws/s3filesystem_test.py b/sdks/python/apache_beam/io/aws/s3filesystem_test.py index 87403f482bd2..036727cd7a70 100644 --- a/sdks/python/apache_beam/io/aws/s3filesystem_test.py +++ b/sdks/python/apache_beam/io/aws/s3filesystem_test.py @@ -272,7 +272,8 @@ def test_lineage(self): def _verify_lineage(self, uri, expected_segments): lineage_mock = mock.MagicMock() self.fs.report_lineage(uri, lineage_mock) - lineage_mock.add.assert_called_once_with("s3", *expected_segments) + lineage_mock.add.assert_called_once_with( + "s3", *expected_segments, last_segment_sep='/') if __name__ == '__main__': diff --git a/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py b/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py index 4495245dc54a..4b7462cae03c 100644 --- a/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py +++ b/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py @@ -18,6 +18,7 @@ """Azure Blob Storage Implementation for accesing files on Azure Blob Storage. """ +import traceback from apache_beam.io.azure import blobstorageio from apache_beam.io.filesystem import BeamIOError @@ -317,15 +318,14 @@ def delete(self, paths): if exceptions: raise BeamIOError("Delete operation failed", exceptions) - def report_lineage(self, path, lineage, level=None): + def report_lineage(self, path, lineage): try: components = blobstorageio.parse_azfs_path( path, blob_optional=True, get_account=True) except ValueError: # report lineage is fail-safe + traceback.print_exc() return - if level == FileSystem.LineageLevel.TOP_LEVEL \ - or(len(components) > 1 and components[-1] == ''): - # bucket only + if components and not components[-1]: components = components[:-1] - lineage.add('abs', *components) + lineage.add('abs', *components, last_segment_sep='/') diff --git a/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py b/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py index 138fe5f78b20..c3418e137e87 100644 --- a/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py +++ b/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py @@ -330,7 +330,8 @@ def test_lineage(self): def _verify_lineage(self, uri, expected_segments): lineage_mock = mock.MagicMock() self.fs.report_lineage(uri, lineage_mock) - lineage_mock.add.assert_called_once_with("abs", *expected_segments) + lineage_mock.add.assert_called_once_with( + "abs", *expected_segments, last_segment_sep='/') if __name__ == '__main__': diff --git a/sdks/python/apache_beam/io/filebasedsink.py b/sdks/python/apache_beam/io/filebasedsink.py index f9d4303c8c78..eb433bd60583 100644 --- a/sdks/python/apache_beam/io/filebasedsink.py +++ b/sdks/python/apache_beam/io/filebasedsink.py @@ -286,24 +286,16 @@ def _check_state_for_finalize_write(self, writer_results, num_shards): def _report_sink_lineage(self, dst_glob, dst_files): """ - Report sink Lineage. Report every file if number of files no more than 100, - otherwise only report at directory level. + Report sink Lineage. Report every file if number of files no more than 10, + otherwise only report glob. """ - if len(dst_files) <= 100: + # There is rollup at the higher level, but this loses glob information. + # Better to report multiple globs than just the parent directory. + if len(dst_files) <= 10: for dst in dst_files: FileSystems.report_sink_lineage(dst) else: - dst = dst_glob - # dst_glob has a wildcard for shard number (see _shard_name_template) - sep = dst_glob.find('*') - if sep > 0: - dst = dst[:sep] - try: - dst, _ = FileSystems.split(dst) - except ValueError: - return # lineage report is fail-safe - - FileSystems.report_sink_lineage(dst) + FileSystems.report_sink_lineage(dst_glob) @check_accessible(['file_path_prefix']) def finalize_write( diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py index a02bc6de32c7..49b1b1d125f1 100644 --- a/sdks/python/apache_beam/io/filebasedsource.py +++ b/sdks/python/apache_beam/io/filebasedsource.py @@ -39,7 +39,6 @@ from apache_beam.io import range_trackers from apache_beam.io.filesystem import CompressionTypes from apache_beam.io.filesystem import FileMetadata -from apache_beam.io.filesystem import FileSystem from apache_beam.io.filesystems import FileSystems from apache_beam.io.restriction_trackers import OffsetRange from apache_beam.options.value_provider import StaticValueProvider @@ -170,37 +169,11 @@ def _get_concat_source(self) -> concat_source.ConcatSource: splittable=splittable) single_file_sources.append(single_file_source) - self._report_source_lineage(files_metadata) + FileSystems.report_source_lineage(pattern) self._concat_source = concat_source.ConcatSource(single_file_sources) return self._concat_source - def _report_source_lineage(self, files_metadata): - """ - Report source Lineage. depend on the number of files, report full file - name, only dir, or only top level - """ - if len(files_metadata) <= 100: - for file_metadata in files_metadata: - FileSystems.report_source_lineage(file_metadata.path) - else: - size_track = set() - for file_metadata in files_metadata: - if len(size_track) >= 100: - FileSystems.report_source_lineage( - file_metadata.path, level=FileSystem.LineageLevel.TOP_LEVEL) - return - - try: - base, _ = FileSystems.split(file_metadata.path) - except ValueError: - pass - else: - size_track.add(base) - - for base in size_track: - FileSystems.report_source_lineage(base) - def open_file(self, file_name): return FileSystems.open( file_name, @@ -382,7 +355,7 @@ def process(self, element: Union[str, FileMetadata], *args, match_results = FileSystems.match([element]) metadata_list = match_results[0].metadata_list for metadata in metadata_list: - self._report_source_lineage(metadata.path) + FileSystems.report_source_lineage(metadata.path) splittable = ( self._splittable and _determine_splittability_from_compression_type( @@ -397,28 +370,6 @@ def process(self, element: Union[str, FileMetadata], *args, metadata, OffsetRange(0, range_trackers.OffsetRangeTracker.OFFSET_INFINITY)) - def _report_source_lineage(self, path): - """ - Report source Lineage. Due to the size limit of Beam metrics, report full - file name or only top level depend on the number of files. - - * Number of files<=100, report full file paths; - - * Otherwise, report top level only. - """ - if self._size_track is None: - self._size_track = set() - elif len(self._size_track) == 0: - FileSystems.report_source_lineage( - path, level=FileSystem.LineageLevel.TOP_LEVEL) - return - - self._size_track.add(path) - FileSystems.report_source_lineage(path) - - if len(self._size_track) >= 100: - self._size_track.clear() - class _ReadRange(DoFn): def __init__( diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py index 840fdf3309e7..bdc25dcf0fe5 100644 --- a/sdks/python/apache_beam/io/filesystem.py +++ b/sdks/python/apache_beam/io/filesystem.py @@ -934,11 +934,7 @@ def delete(self, paths): """ raise NotImplementedError - class LineageLevel: - FILE = 'FILE' - TOP_LEVEL = 'TOP_LEVEL' - - def report_lineage(self, path, unused_lineage, level=None): + def report_lineage(self, path, unused_lineage): """ Report Lineage metrics for path. diff --git a/sdks/python/apache_beam/io/filesystems.py b/sdks/python/apache_beam/io/filesystems.py index 87f45f3308ee..1d64f88684b8 100644 --- a/sdks/python/apache_beam/io/filesystems.py +++ b/sdks/python/apache_beam/io/filesystems.py @@ -391,27 +391,21 @@ def get_chunk_size(path): return filesystem.CHUNK_SIZE @staticmethod - def report_source_lineage(path, level=None): + def report_source_lineage(path): """ - Report source :class:`~apache_beam.metrics.metric.LineageLevel`. + Report source :class:`~apache_beam.metrics.metric.Lineage`. Args: path: string path to be reported. - level: the level of file path. default to - :class:`~apache_beam.io.filesystem.FileSystem.LineageLevel`.FILE. """ - filesystem = FileSystems.get_filesystem(path) - filesystem.report_lineage(path, Lineage.sources(), level=level) + FileSystems.get_filesystem(path).report_lineage(path, Lineage.sources()) @staticmethod - def report_sink_lineage(path, level=None): + def report_sink_lineage(path): """ Report sink :class:`~apache_beam.metrics.metric.Lineage`. Args: path: string path to be reported. - level: the level of file path. default to - :class:`~apache_beam.io.filesystem.FileSystem.Lineage`.FILE. """ - filesystem = FileSystems.get_filesystem(path) - filesystem.report_lineage(path, Lineage.sinks(), level=level) + FileSystems.get_filesystem(path).report_lineage(path, Lineage.sinks()) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 11e0d098b2f3..9f60b5af6726 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -1163,7 +1163,7 @@ def split(self, desired_bundle_size, start_position=None, stop_position=None): self.table_reference.datasetId, self.table_reference.tableId) Lineage.sources().add( - "bigquery", + 'bigquery', self.table_reference.projectId, self.table_reference.datasetId, self.table_reference.tableId) diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py index 325f70ddfd96..96aca2c410d8 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py @@ -26,6 +26,7 @@ # pytype: skip-file +import traceback from typing import BinaryIO # pylint: disable=unused-import from apache_beam.io.filesystem import BeamIOError @@ -366,14 +367,13 @@ def delete(self, paths): if exceptions: raise BeamIOError("Delete operation failed", exceptions) - def report_lineage(self, path, lineage, level=None): + def report_lineage(self, path, lineage): try: components = gcsio.parse_gcs_path(path, object_optional=True) except ValueError: # report lineage is fail-safe + traceback.print_exc() return - if level == FileSystem.LineageLevel.TOP_LEVEL \ - or(len(components) > 1 and components[-1] == ''): - # bucket only + if components and not components[-1]: components = components[:-1] - lineage.add('gcs', *components) + lineage.add('gcs', *components, last_segment_sep='/') diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py index ec7fa94b05fd..ade8529dcac8 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py @@ -382,7 +382,8 @@ def test_lineage(self): def _verify_lineage(self, uri, expected_segments): lineage_mock = mock.MagicMock() self.fs.report_lineage(uri, lineage_mock) - lineage_mock.add.assert_called_once_with("gcs", *expected_segments) + lineage_mock.add.assert_called_once_with( + "gcs", *expected_segments, last_segment_sep='/') if __name__ == '__main__': diff --git a/sdks/python/apache_beam/io/localfilesystem.py b/sdks/python/apache_beam/io/localfilesystem.py index e9fe7dd4b1c2..daf69b8d030c 100644 --- a/sdks/python/apache_beam/io/localfilesystem.py +++ b/sdks/python/apache_beam/io/localfilesystem.py @@ -364,3 +364,6 @@ def try_delete(path): if exceptions: raise BeamIOError("Delete operation failed", exceptions) + + def report_lineage(self, path, lineage): + lineage.add('filesystem', 'localhost', path, last_segment_sep='/') diff --git a/sdks/python/apache_beam/metrics/cells.py b/sdks/python/apache_beam/metrics/cells.py index 92f5c7cbe2f6..8fd8457489ae 100644 --- a/sdks/python/apache_beam/metrics/cells.py +++ b/sdks/python/apache_beam/metrics/cells.py @@ -811,6 +811,9 @@ def contains(self, value): else: return False + def flattened(self): + return self.as_trie().flattened() + def to_proto(self) -> metrics_pb2.BoundedTrie: return metrics_pb2.BoundedTrie( bound=self._bound, @@ -822,7 +825,9 @@ def from_proto(proto: metrics_pb2.BoundedTrie) -> 'BoundedTrieData': return BoundedTrieData( bound=proto.bound, singleton=tuple(proto.singleton) if proto.singleton else None, - root=_BoundedTrieNode.from_proto(proto.root) if proto.root else None) + root=( + _BoundedTrieNode.from_proto(proto.root) + if proto.HasField('root') else None)) def as_trie(self): if self._root is not None: diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py index 33af25e20ca4..9cf42370f4b1 100644 --- a/sdks/python/apache_beam/metrics/metric.py +++ b/sdks/python/apache_beam/metrics/metric.py @@ -33,6 +33,7 @@ from typing import Dict from typing import FrozenSet from typing import Iterable +from typing import Iterator from typing import List from typing import Optional from typing import Set @@ -342,8 +343,8 @@ class Lineage: SINK = "sinks" _METRICS = { - SOURCE: Metrics.string_set(LINEAGE_NAMESPACE, SOURCE), - SINK: Metrics.string_set(LINEAGE_NAMESPACE, SINK) + SOURCE: Metrics.bounded_trie(LINEAGE_NAMESPACE, SOURCE), + SINK: Metrics.bounded_trie(LINEAGE_NAMESPACE, SINK) } def __init__(self, label: str) -> None: @@ -392,8 +393,32 @@ def get_fq_name( return ':'.join((system, subtype, segs)) return ':'.join((system, segs)) + @staticmethod + def _get_fqn_parts( + system: str, + *segments: str, + subtype: Optional[str] = None, + last_segment_sep: Optional[str] = None) -> Iterator[str]: + yield system + ':' + if subtype: + yield subtype + ':' + if segments: + for segment in segments[:-1]: + yield segment + '.' + if last_segment_sep: + sub_segments = segments[-1].split(last_segment_sep) + for sub_segment in sub_segments[:-1]: + yield sub_segment + last_segment_sep + yield sub_segments[-1] + else: + yield segments[-1] + def add( - self, system: str, *segments: str, subtype: Optional[str] = None) -> None: + self, + system: str, + *segments: str, + subtype: Optional[str] = None, + last_segment_sep: Optional[str] = None) -> None: """ Adds the given details as Lineage. @@ -414,21 +439,35 @@ def add( The first positional argument serves as system, if full segments are provided, or the full FQN if it is provided as a single argument. """ - system_or_details = system - if len(segments) == 0 and subtype is None: - self.metric.add(system_or_details) - else: - self.metric.add(self.get_fq_name(system, *segments, subtype=subtype)) + self.add_raw( + *self._get_fqn_parts( + system, + *segments, + subtype=subtype, + last_segment_sep=last_segment_sep)) + + def add_raw(self, *rollup_segments: str) -> None: + """Adds the given fqn as lineage. + + `rollup_segments` should be an iterable of strings whose concatenation + is a valid Dataplex FQN. In particular, this means they will often have + trailing delimiters. + """ + self.metric.add(rollup_segments) @staticmethod - def query(results: MetricResults, label: str) -> Set[str]: + def query(results: MetricResults, + label: str, + truncated_marker: str = '*') -> Set[str]: if not label in Lineage._METRICS: raise ValueError("Label {} does not exist for Lineage", label) response = results.query( MetricsFilter().with_namespace(Lineage.LINEAGE_NAMESPACE).with_name( - label))[MetricResults.STRINGSETS] + label))[MetricResults.BOUNDED_TRIES] result = set() for metric in response: - result.update(metric.committed) - result.update(metric.attempted) + for fqn in metric.committed.flattened(): + result.add(''.join(fqn[:-1]) + (truncated_marker if fqn[-1] else '')) + for fqn in metric.attempted.flattened(): + result.add(''.join(fqn[:-1]) + (truncated_marker if fqn[-1] else '')) return result diff --git a/sdks/python/apache_beam/metrics/metric_test.py b/sdks/python/apache_beam/metrics/metric_test.py index 524a2143172d..2e2e51b267a7 100644 --- a/sdks/python/apache_beam/metrics/metric_test.py +++ b/sdks/python/apache_beam/metrics/metric_test.py @@ -271,14 +271,19 @@ def test_fq_name(self): def test_add(self): lineage = Lineage(Lineage.SOURCE) - stringset = set() + added = set() # override - lineage.metric = stringset + lineage.metric = added lineage.add("s", "1", "2") lineage.add("s:3.4") lineage.add("s", "5", "6.7") lineage.add("s", "1", "2", subtype="t") - self.assertSetEqual(stringset, {"s:1.2", "s:3.4", "s:t:1.2", "s:5.`6.7`"}) + lineage.add("sys", "seg1", "seg2", "seg3/part2/part3", last_segment_sep='/') + self.assertSetEqual( + added, + {('s:', '1.', '2'), ('s:3.4:', ), ('s:', '5.', '6.7'), + ('s:', 't:', '1.', '2'), + ('sys:', 'seg1.', 'seg2.', 'seg3/', 'part2/', 'part3')}) if __name__ == '__main__':