From 9ef19a90048332b56ed09434e2c9ea4049f97166 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 13 Dec 2024 15:42:12 -0800 Subject: [PATCH 01/17] Migrate lineage counters to bounded tries. --- .../python/apache_beam/io/aws/s3filesystem.py | 2 +- .../apache_beam/io/aws/s3filesystem_test.py | 3 +- .../io/azure/blobstoragefilesystem.py | 2 +- .../io/azure/blobstoragefilesystem_test.py | 3 +- sdks/python/apache_beam/io/gcp/bigquery.py | 2 +- .../apache_beam/io/gcp/gcsfilesystem.py | 2 +- .../apache_beam/io/gcp/gcsfilesystem_test.py | 3 +- sdks/python/apache_beam/metrics/metric.py | 51 ++++++++++++++++--- .../python/apache_beam/metrics/metric_test.py | 11 ++-- 9 files changed, 61 insertions(+), 18 deletions(-) diff --git a/sdks/python/apache_beam/io/aws/s3filesystem.py b/sdks/python/apache_beam/io/aws/s3filesystem.py index ffbce5893a96..494de14c83a8 100644 --- a/sdks/python/apache_beam/io/aws/s3filesystem.py +++ b/sdks/python/apache_beam/io/aws/s3filesystem.py @@ -325,4 +325,4 @@ def report_lineage(self, path, lineage, level=None): (len(components) > 1 and components[-1] == ''): # bucket only components = components[:-1] - lineage.add('s3', *components) + lineage.add('s3', *components, last_segment_sep='/') diff --git a/sdks/python/apache_beam/io/aws/s3filesystem_test.py b/sdks/python/apache_beam/io/aws/s3filesystem_test.py index 87403f482bd2..036727cd7a70 100644 --- a/sdks/python/apache_beam/io/aws/s3filesystem_test.py +++ b/sdks/python/apache_beam/io/aws/s3filesystem_test.py @@ -272,7 +272,8 @@ def test_lineage(self): def _verify_lineage(self, uri, expected_segments): lineage_mock = mock.MagicMock() self.fs.report_lineage(uri, lineage_mock) - lineage_mock.add.assert_called_once_with("s3", *expected_segments) + lineage_mock.add.assert_called_once_with( + "s3", *expected_segments, last_segment_sep='/') if __name__ == '__main__': diff --git a/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py b/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py index 4495245dc54a..ff908451b1b7 100644 --- a/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py +++ b/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py @@ -328,4 +328,4 @@ def report_lineage(self, path, lineage, level=None): or(len(components) > 1 and components[-1] == ''): # bucket only components = components[:-1] - lineage.add('abs', *components) + lineage.add('abs', *components, last_segment_sep='/') diff --git a/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py b/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py index 138fe5f78b20..c3418e137e87 100644 --- a/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py +++ b/sdks/python/apache_beam/io/azure/blobstoragefilesystem_test.py @@ -330,7 +330,8 @@ def test_lineage(self): def _verify_lineage(self, uri, expected_segments): lineage_mock = mock.MagicMock() self.fs.report_lineage(uri, lineage_mock) - lineage_mock.add.assert_called_once_with("abs", *expected_segments) + lineage_mock.add.assert_called_once_with( + "abs", *expected_segments, last_segment_sep='/') if __name__ == '__main__': diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 11e0d098b2f3..9f60b5af6726 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -1163,7 +1163,7 @@ def split(self, desired_bundle_size, start_position=None, stop_position=None): self.table_reference.datasetId, self.table_reference.tableId) Lineage.sources().add( - "bigquery", + 'bigquery', 
self.table_reference.projectId, self.table_reference.datasetId, self.table_reference.tableId) diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py index 325f70ddfd96..7e293ccd9d9f 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py @@ -376,4 +376,4 @@ def report_lineage(self, path, lineage, level=None): or(len(components) > 1 and components[-1] == ''): # bucket only components = components[:-1] - lineage.add('gcs', *components) + lineage.add('gcs', *components, last_segment_sep='/') diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py index ec7fa94b05fd..ade8529dcac8 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py @@ -382,7 +382,8 @@ def test_lineage(self): def _verify_lineage(self, uri, expected_segments): lineage_mock = mock.MagicMock() self.fs.report_lineage(uri, lineage_mock) - lineage_mock.add.assert_called_once_with("gcs", *expected_segments) + lineage_mock.add.assert_called_once_with( + "gcs", *expected_segments, last_segment_sep='/') if __name__ == '__main__': diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py index 33af25e20ca4..15f6b53c78e0 100644 --- a/sdks/python/apache_beam/metrics/metric.py +++ b/sdks/python/apache_beam/metrics/metric.py @@ -33,6 +33,7 @@ from typing import Dict from typing import FrozenSet from typing import Iterable +from typing import Iterator from typing import List from typing import Optional from typing import Set @@ -342,8 +343,8 @@ class Lineage: SINK = "sinks" _METRICS = { - SOURCE: Metrics.string_set(LINEAGE_NAMESPACE, SOURCE), - SINK: Metrics.string_set(LINEAGE_NAMESPACE, SINK) + SOURCE: Metrics.bounded_trie(LINEAGE_NAMESPACE, SOURCE), + SINK: Metrics.bounded_trie(LINEAGE_NAMESPACE, SINK) } def __init__(self, label: str) -> None: @@ -392,8 +393,32 @@ def get_fq_name( return ':'.join((system, subtype, segs)) return ':'.join((system, segs)) + @staticmethod + def _get_fqn_parts( + system: str, + *segments: str, + subtype: Optional[str] = None, + last_segment_sep: Optional[str] = None) -> Iterator[str]: + yield system + ':' + if subtype: + yield subtype + ':' + if segments: + for segment in segments[:-1]: + yield segment + '.' + if last_segment_sep: + sub_segments = segments[-1].split(last_segment_sep) + for sub_segment in sub_segments[:-1]: + yield sub_segment + last_segment_sep + yield sub_segments[-1] + else: + yield segments[-1] + def add( - self, system: str, *segments: str, subtype: Optional[str] = None) -> None: + self, + system: str, + *segments: str, + subtype: Optional[str] = None, + last_segment_sep: Optional[str] = None) -> None: """ Adds the given details as Lineage. @@ -414,11 +439,21 @@ def add( The first positional argument serves as system, if full segments are provided, or the full FQN if it is provided as a single argument. """ - system_or_details = system - if len(segments) == 0 and subtype is None: - self.metric.add(system_or_details) - else: - self.metric.add(self.get_fq_name(system, *segments, subtype=subtype)) + self.add_raw( + *self._get_fqn_parts( + system, + *segments, + subtype=subtype, + last_segment_sep=last_segment_sep)) + + def add_raw(self, *rollup_segments: str) -> None: + """Adds the given fqn as lineage. + + `rollup_segments` should be an iterable of strings whose concatenation + is a valid Dataplex FQN. 
In particular, this means they will often have + trailing delimiters. + """ + self.metric.add(rollup_segments) @staticmethod def query(results: MetricResults, label: str) -> Set[str]: diff --git a/sdks/python/apache_beam/metrics/metric_test.py b/sdks/python/apache_beam/metrics/metric_test.py index 524a2143172d..2e2e51b267a7 100644 --- a/sdks/python/apache_beam/metrics/metric_test.py +++ b/sdks/python/apache_beam/metrics/metric_test.py @@ -271,14 +271,19 @@ def test_fq_name(self): def test_add(self): lineage = Lineage(Lineage.SOURCE) - stringset = set() + added = set() # override - lineage.metric = stringset + lineage.metric = added lineage.add("s", "1", "2") lineage.add("s:3.4") lineage.add("s", "5", "6.7") lineage.add("s", "1", "2", subtype="t") - self.assertSetEqual(stringset, {"s:1.2", "s:3.4", "s:t:1.2", "s:5.`6.7`"}) + lineage.add("sys", "seg1", "seg2", "seg3/part2/part3", last_segment_sep='/') + self.assertSetEqual( + added, + {('s:', '1.', '2'), ('s:3.4:', ), ('s:', '5.', '6.7'), + ('s:', 't:', '1.', '2'), + ('sys:', 'seg1.', 'seg2.', 'seg3/', 'part2/', 'part3')}) if __name__ == '__main__': From 76db105bbef9a0af4421bdc7764598d3d2867afc Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 13 Dec 2024 15:44:49 -0800 Subject: [PATCH 02/17] Add lineage support for local files. --- sdks/python/apache_beam/io/localfilesystem.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sdks/python/apache_beam/io/localfilesystem.py b/sdks/python/apache_beam/io/localfilesystem.py index e9fe7dd4b1c2..5525f3b96f1d 100644 --- a/sdks/python/apache_beam/io/localfilesystem.py +++ b/sdks/python/apache_beam/io/localfilesystem.py @@ -364,3 +364,9 @@ def try_delete(path): if exceptions: raise BeamIOError("Delete operation failed", exceptions) + + def report_lineage(self, path, lineage, level=None): + if level == FileSystem.LineageLevel.TOP_LEVEL: + lineage.add('filesystem', 'localhost') + else: + lineage.add('filesystem', 'localhost', path, last_segment_sep='/') From 30288ae2cf1761405e7ce09e0501350b22a3ec45 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 13 Dec 2024 16:03:00 -0800 Subject: [PATCH 03/17] Remove file-specific lineage bounding. 
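
Now that lineage is tracked in bounded tries (introduced earlier in this
series), rollup happens inside the metric itself: when the trie grows past
its bound, deep paths are collapsed into a shared parent prefix that is
marked as truncated. The per-filesystem heuristics removed here (report at
most ~100 files, otherwise fall back to the parent directory or TOP_LEVEL)
duplicated that behavior, so sources and sinks can now simply report every
path.

A minimal sketch of the rollup idea (a toy model only, not Beam's actual
BoundedTrieData implementation):

    def rollup(paths, bound):
      # Maps each path tuple to True once it stands for a truncated
      # subtree rather than a single concrete path.
      paths = {tuple(p): False for p in paths}
      while len(paths) > bound:
        groups = {}
        for p in paths:
          if len(p) > 1:
            groups.setdefault(p[:-1], []).append(p)
        if not groups:
          break  # Nothing left to collapse.
        # Collapse the largest sibling group into its parent prefix.
        parent, children = max(groups.items(), key=lambda kv: len(kv[1]))
        for child in children:
          del paths[child]
        paths[parent] = True  # Truncated: stands for everything below.
      return paths

    # E.g. 10,000 paths of the form ('gcs', 'bucket', 'file-<n>') with
    # bound=100 collapse to {('gcs', 'bucket'): True}, later rendered as
    # "gcs:bucket.*".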
--- .../python/apache_beam/io/aws/s3filesystem.py | 7 ++- .../io/azure/blobstoragefilesystem.py | 7 ++- sdks/python/apache_beam/io/filebasedsink.py | 20 +++---- sdks/python/apache_beam/io/filebasedsource.py | 53 +------------------ sdks/python/apache_beam/io/filesystem.py | 6 +-- sdks/python/apache_beam/io/filesystems.py | 16 ++---- .../apache_beam/io/gcp/gcsfilesystem.py | 7 ++- sdks/python/apache_beam/io/localfilesystem.py | 7 +-- 8 files changed, 25 insertions(+), 98 deletions(-) diff --git a/sdks/python/apache_beam/io/aws/s3filesystem.py b/sdks/python/apache_beam/io/aws/s3filesystem.py index 494de14c83a8..229f69b039a0 100644 --- a/sdks/python/apache_beam/io/aws/s3filesystem.py +++ b/sdks/python/apache_beam/io/aws/s3filesystem.py @@ -315,14 +315,13 @@ def delete(self, paths): if exceptions: raise BeamIOError("Delete operation failed", exceptions) - def report_lineage(self, path, lineage, level=None): + def report_lineage(self, path, lineage): try: components = s3io.parse_s3_path(path, object_optional=True) except ValueError: # report lineage is fail-safe + traceback.print_exc() return - if level == FileSystem.LineageLevel.TOP_LEVEL or \ - (len(components) > 1 and components[-1] == ''): - # bucket only + if components and not components[-1]: components = components[:-1] lineage.add('s3', *components, last_segment_sep='/') diff --git a/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py b/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py index ff908451b1b7..e547eecc9b97 100644 --- a/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py +++ b/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py @@ -317,15 +317,14 @@ def delete(self, paths): if exceptions: raise BeamIOError("Delete operation failed", exceptions) - def report_lineage(self, path, lineage, level=None): + def report_lineage(self, path, lineage): try: components = blobstorageio.parse_azfs_path( path, blob_optional=True, get_account=True) except ValueError: # report lineage is fail-safe + traceback.print_exc() return - if level == FileSystem.LineageLevel.TOP_LEVEL \ - or(len(components) > 1 and components[-1] == ''): - # bucket only + if components and not components[-1]: components = components[:-1] lineage.add('abs', *components, last_segment_sep='/') diff --git a/sdks/python/apache_beam/io/filebasedsink.py b/sdks/python/apache_beam/io/filebasedsink.py index f9d4303c8c78..eb433bd60583 100644 --- a/sdks/python/apache_beam/io/filebasedsink.py +++ b/sdks/python/apache_beam/io/filebasedsink.py @@ -286,24 +286,16 @@ def _check_state_for_finalize_write(self, writer_results, num_shards): def _report_sink_lineage(self, dst_glob, dst_files): """ - Report sink Lineage. Report every file if number of files no more than 100, - otherwise only report at directory level. + Report sink Lineage. Report every file if number of files no more than 10, + otherwise only report glob. """ - if len(dst_files) <= 100: + # There is rollup at the higher level, but this loses glob information. + # Better to report multiple globs than just the parent directory. 
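+    # (E.g. the glob "gs://bucket/out-*-of-00010" is more informative than
+    # its rolled-up parent "gs://bucket/"; these paths are illustrative.)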
+ if len(dst_files) <= 10: for dst in dst_files: FileSystems.report_sink_lineage(dst) else: - dst = dst_glob - # dst_glob has a wildcard for shard number (see _shard_name_template) - sep = dst_glob.find('*') - if sep > 0: - dst = dst[:sep] - try: - dst, _ = FileSystems.split(dst) - except ValueError: - return # lineage report is fail-safe - - FileSystems.report_sink_lineage(dst) + FileSystems.report_sink_lineage(dst_glob) @check_accessible(['file_path_prefix']) def finalize_write( diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py index a02bc6de32c7..49b1b1d125f1 100644 --- a/sdks/python/apache_beam/io/filebasedsource.py +++ b/sdks/python/apache_beam/io/filebasedsource.py @@ -39,7 +39,6 @@ from apache_beam.io import range_trackers from apache_beam.io.filesystem import CompressionTypes from apache_beam.io.filesystem import FileMetadata -from apache_beam.io.filesystem import FileSystem from apache_beam.io.filesystems import FileSystems from apache_beam.io.restriction_trackers import OffsetRange from apache_beam.options.value_provider import StaticValueProvider @@ -170,37 +169,11 @@ def _get_concat_source(self) -> concat_source.ConcatSource: splittable=splittable) single_file_sources.append(single_file_source) - self._report_source_lineage(files_metadata) + FileSystems.report_source_lineage(pattern) self._concat_source = concat_source.ConcatSource(single_file_sources) return self._concat_source - def _report_source_lineage(self, files_metadata): - """ - Report source Lineage. depend on the number of files, report full file - name, only dir, or only top level - """ - if len(files_metadata) <= 100: - for file_metadata in files_metadata: - FileSystems.report_source_lineage(file_metadata.path) - else: - size_track = set() - for file_metadata in files_metadata: - if len(size_track) >= 100: - FileSystems.report_source_lineage( - file_metadata.path, level=FileSystem.LineageLevel.TOP_LEVEL) - return - - try: - base, _ = FileSystems.split(file_metadata.path) - except ValueError: - pass - else: - size_track.add(base) - - for base in size_track: - FileSystems.report_source_lineage(base) - def open_file(self, file_name): return FileSystems.open( file_name, @@ -382,7 +355,7 @@ def process(self, element: Union[str, FileMetadata], *args, match_results = FileSystems.match([element]) metadata_list = match_results[0].metadata_list for metadata in metadata_list: - self._report_source_lineage(metadata.path) + FileSystems.report_source_lineage(metadata.path) splittable = ( self._splittable and _determine_splittability_from_compression_type( @@ -397,28 +370,6 @@ def process(self, element: Union[str, FileMetadata], *args, metadata, OffsetRange(0, range_trackers.OffsetRangeTracker.OFFSET_INFINITY)) - def _report_source_lineage(self, path): - """ - Report source Lineage. Due to the size limit of Beam metrics, report full - file name or only top level depend on the number of files. - - * Number of files<=100, report full file paths; - - * Otherwise, report top level only. 
- """ - if self._size_track is None: - self._size_track = set() - elif len(self._size_track) == 0: - FileSystems.report_source_lineage( - path, level=FileSystem.LineageLevel.TOP_LEVEL) - return - - self._size_track.add(path) - FileSystems.report_source_lineage(path) - - if len(self._size_track) >= 100: - self._size_track.clear() - class _ReadRange(DoFn): def __init__( diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py index 840fdf3309e7..bdc25dcf0fe5 100644 --- a/sdks/python/apache_beam/io/filesystem.py +++ b/sdks/python/apache_beam/io/filesystem.py @@ -934,11 +934,7 @@ def delete(self, paths): """ raise NotImplementedError - class LineageLevel: - FILE = 'FILE' - TOP_LEVEL = 'TOP_LEVEL' - - def report_lineage(self, path, unused_lineage, level=None): + def report_lineage(self, path, unused_lineage): """ Report Lineage metrics for path. diff --git a/sdks/python/apache_beam/io/filesystems.py b/sdks/python/apache_beam/io/filesystems.py index 87f45f3308ee..1d64f88684b8 100644 --- a/sdks/python/apache_beam/io/filesystems.py +++ b/sdks/python/apache_beam/io/filesystems.py @@ -391,27 +391,21 @@ def get_chunk_size(path): return filesystem.CHUNK_SIZE @staticmethod - def report_source_lineage(path, level=None): + def report_source_lineage(path): """ - Report source :class:`~apache_beam.metrics.metric.LineageLevel`. + Report source :class:`~apache_beam.metrics.metric.Lineage`. Args: path: string path to be reported. - level: the level of file path. default to - :class:`~apache_beam.io.filesystem.FileSystem.LineageLevel`.FILE. """ - filesystem = FileSystems.get_filesystem(path) - filesystem.report_lineage(path, Lineage.sources(), level=level) + FileSystems.get_filesystem(path).report_lineage(path, Lineage.sources()) @staticmethod - def report_sink_lineage(path, level=None): + def report_sink_lineage(path): """ Report sink :class:`~apache_beam.metrics.metric.Lineage`. Args: path: string path to be reported. - level: the level of file path. default to - :class:`~apache_beam.io.filesystem.FileSystem.Lineage`.FILE. 
""" - filesystem = FileSystems.get_filesystem(path) - filesystem.report_lineage(path, Lineage.sinks(), level=level) + FileSystems.get_filesystem(path).report_lineage(path, Lineage.sinks()) diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py index 7e293ccd9d9f..a933f783cc0b 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py @@ -366,14 +366,13 @@ def delete(self, paths): if exceptions: raise BeamIOError("Delete operation failed", exceptions) - def report_lineage(self, path, lineage, level=None): + def report_lineage(self, path, lineage): try: components = gcsio.parse_gcs_path(path, object_optional=True) except ValueError: # report lineage is fail-safe + traceback.print_exc() return - if level == FileSystem.LineageLevel.TOP_LEVEL \ - or(len(components) > 1 and components[-1] == ''): - # bucket only + if components and not components[-1]: components = components[:-1] lineage.add('gcs', *components, last_segment_sep='/') diff --git a/sdks/python/apache_beam/io/localfilesystem.py b/sdks/python/apache_beam/io/localfilesystem.py index 5525f3b96f1d..daf69b8d030c 100644 --- a/sdks/python/apache_beam/io/localfilesystem.py +++ b/sdks/python/apache_beam/io/localfilesystem.py @@ -365,8 +365,5 @@ def try_delete(path): if exceptions: raise BeamIOError("Delete operation failed", exceptions) - def report_lineage(self, path, lineage, level=None): - if level == FileSystem.LineageLevel.TOP_LEVEL: - lineage.add('filesystem', 'localhost') - else: - lineage.add('filesystem', 'localhost', path, last_segment_sep='/') + def report_lineage(self, path, lineage): + lineage.add('filesystem', 'localhost', path, last_segment_sep='/') From d704422042d353ae27f742736959dd32ffec17bf Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Tue, 17 Dec 2024 15:14:35 -0800 Subject: [PATCH 04/17] Add missing imports. --- sdks/python/apache_beam/io/aws/s3filesystem.py | 1 + sdks/python/apache_beam/io/azure/blobstoragefilesystem.py | 1 + sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 1 + 3 files changed, 3 insertions(+) diff --git a/sdks/python/apache_beam/io/aws/s3filesystem.py b/sdks/python/apache_beam/io/aws/s3filesystem.py index 229f69b039a0..584263ec241e 100644 --- a/sdks/python/apache_beam/io/aws/s3filesystem.py +++ b/sdks/python/apache_beam/io/aws/s3filesystem.py @@ -18,6 +18,7 @@ """S3 file system implementation for accessing files on AWS S3.""" # pytype: skip-file +import traceback from apache_beam.io.aws import s3io from apache_beam.io.filesystem import BeamIOError diff --git a/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py b/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py index e547eecc9b97..4b7462cae03c 100644 --- a/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py +++ b/sdks/python/apache_beam/io/azure/blobstoragefilesystem.py @@ -18,6 +18,7 @@ """Azure Blob Storage Implementation for accesing files on Azure Blob Storage. 
""" +import traceback from apache_beam.io.azure import blobstorageio from apache_beam.io.filesystem import BeamIOError diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py index a933f783cc0b..96aca2c410d8 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py @@ -26,6 +26,7 @@ # pytype: skip-file +import traceback from typing import BinaryIO # pylint: disable=unused-import from apache_beam.io.filesystem import BeamIOError From 71a5cedb46ec6b1d5f14d3dc5718111e46e366da Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Tue, 17 Dec 2024 17:06:26 -0800 Subject: [PATCH 05/17] Deserialization fix. --- sdks/python/apache_beam/metrics/cells.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/metrics/cells.py b/sdks/python/apache_beam/metrics/cells.py index 92f5c7cbe2f6..8f3d7bba22ad 100644 --- a/sdks/python/apache_beam/metrics/cells.py +++ b/sdks/python/apache_beam/metrics/cells.py @@ -822,7 +822,9 @@ def from_proto(proto: metrics_pb2.BoundedTrie) -> 'BoundedTrieData': return BoundedTrieData( bound=proto.bound, singleton=tuple(proto.singleton) if proto.singleton else None, - root=_BoundedTrieNode.from_proto(proto.root) if proto.root else None) + root=( + _BoundedTrieNode.from_proto(proto.root) + if proto.HasField('root') else None)) def as_trie(self): if self._root is not None: From 4cbf2577199bc916867a34e8e043fb61d306123b Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 18 Dec 2024 10:46:33 -0500 Subject: [PATCH 06/17] Revert three commits related to supporting custom coder in reshuffle - Fix custom coder not being used in Reshuffle (global window) (#33339) - Fix custom coders not being used in Reshuffle (non global window) #33363 - Add missing to_type_hint to WindowedValueCoder #33403 --- sdks/python/apache_beam/coders/coders.py | 11 ---- sdks/python/apache_beam/coders/coders_test.py | 6 --- sdks/python/apache_beam/coders/typecoders.py | 2 - sdks/python/apache_beam/transforms/util.py | 13 +---- .../apache_beam/transforms/util_test.py | 54 ------------------- .../typehints/native_type_compatibility.py | 26 --------- .../python/apache_beam/typehints/typehints.py | 9 ---- 7 files changed, 2 insertions(+), 119 deletions(-) diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index 22d041f34f8b..57d8197a3a00 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -1438,17 +1438,6 @@ def __hash__(self): return hash( (self.wrapped_value_coder, self.timestamp_coder, self.window_coder)) - @classmethod - def from_type_hint(cls, typehint, registry): - # type: (Any, CoderRegistry) -> WindowedValueCoder - # Ideally this'd take two parameters so that one could hint at - # the window type as well instead of falling back to the - # pickle coders. 
- return cls(registry.get_coder(typehint.inner_type)) - - def to_type_hint(self): - return typehints.WindowedValue[self.wrapped_value_coder.to_type_hint()] - Coder.register_structured_urn( common_urns.coders.WINDOWED_VALUE.urn, WindowedValueCoder) diff --git a/sdks/python/apache_beam/coders/coders_test.py b/sdks/python/apache_beam/coders/coders_test.py index bddd2cb57e06..dc9780e36be3 100644 --- a/sdks/python/apache_beam/coders/coders_test.py +++ b/sdks/python/apache_beam/coders/coders_test.py @@ -258,12 +258,6 @@ def test_numpy_int(self): _ = indata | "CombinePerKey" >> beam.CombinePerKey(sum) -class WindowedValueCoderTest(unittest.TestCase): - def test_to_type_hint(self): - coder = coders.WindowedValueCoder(coders.VarIntCoder()) - self.assertEqual(coder.to_type_hint(), typehints.WindowedValue[int]) # type: ignore[misc] - - if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main() diff --git a/sdks/python/apache_beam/coders/typecoders.py b/sdks/python/apache_beam/coders/typecoders.py index 892f508d0136..1667cb7a916a 100644 --- a/sdks/python/apache_beam/coders/typecoders.py +++ b/sdks/python/apache_beam/coders/typecoders.py @@ -94,8 +94,6 @@ def register_standard_coders(self, fallback_coder): self._register_coder_internal(str, coders.StrUtf8Coder) self._register_coder_internal(typehints.TupleConstraint, coders.TupleCoder) self._register_coder_internal(typehints.DictConstraint, coders.MapCoder) - self._register_coder_internal( - typehints.WindowedTypeConstraint, coders.WindowedValueCoder) # Default fallback coders applied in that order until the first matching # coder found. default_fallback_coders = [ diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index c9fd2c76b0db..a03652de2496 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -33,7 +33,6 @@ from typing import Callable from typing import Iterable from typing import List -from typing import Optional from typing import Tuple from typing import TypeVar from typing import Union @@ -74,13 +73,11 @@ from apache_beam.transforms.window import TimestampedValue from apache_beam.typehints import trivial_inference from apache_beam.typehints.decorators import get_signature -from apache_beam.typehints.native_type_compatibility import TypedWindowedValue from apache_beam.typehints.sharded_key_type import ShardedKeyType from apache_beam.utils import shared from apache_beam.utils import windowed_value from apache_beam.utils.annotations import deprecated from apache_beam.utils.sharded_key import ShardedKey -from apache_beam.utils.timestamp import Timestamp if TYPE_CHECKING: from apache_beam.runners.pipeline_context import PipelineContext @@ -956,10 +953,6 @@ def restore_timestamps(element): window.GlobalWindows.windowed_value((key, value), timestamp) for (value, timestamp) in values ] - - ungrouped = pcoll | Map(reify_timestamps).with_input_types( - Tuple[K, V]).with_output_types( - Tuple[K, Tuple[V, Optional[Timestamp]]]) else: # typing: All conditional function variants must have identical signatures @@ -973,8 +966,7 @@ def restore_timestamps(element): key, windowed_values = element return [wv.with_value((key, wv.value)) for wv in windowed_values] - ungrouped = pcoll | Map(reify_timestamps).with_input_types( - Tuple[K, V]).with_output_types(Tuple[K, TypedWindowedValue[V]]) + ungrouped = pcoll | Map(reify_timestamps).with_output_types(Any) # TODO(https://github.com/apache/beam/issues/19785) Using global window as 
# one of the standard window. This is to mitigate the Dataflow Java Runner @@ -1026,8 +1018,7 @@ def expand(self, pcoll): pcoll | 'AddRandomKeys' >> Map(lambda t: (random.randrange(0, self.num_buckets), t) ).with_input_types(T).with_output_types(Tuple[int, T]) - | ReshufflePerKey().with_input_types(Tuple[int, T]).with_output_types( - Tuple[int, T]) + | ReshufflePerKey() | 'RemoveRandomKeys' >> Map(lambda t: t[1]).with_input_types( Tuple[int, T]).with_output_types(T)) diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py index db73310dfe25..d86509c7dde3 100644 --- a/sdks/python/apache_beam/transforms/util_test.py +++ b/sdks/python/apache_beam/transforms/util_test.py @@ -1010,60 +1010,6 @@ def format_with_timestamp(element, timestamp=beam.DoFn.TimestampParam): equal_to(expected_data), label="formatted_after_reshuffle") - global _Unpicklable - global _UnpicklableCoder - - class _Unpicklable(object): - def __init__(self, value): - self.value = value - - def __getstate__(self): - raise NotImplementedError() - - def __setstate__(self, state): - raise NotImplementedError() - - class _UnpicklableCoder(beam.coders.Coder): - def encode(self, value): - return str(value.value).encode() - - def decode(self, encoded): - return _Unpicklable(int(encoded.decode())) - - def to_type_hint(self): - return _Unpicklable - - def is_deterministic(self): - return True - - def test_reshuffle_unpicklable_in_global_window(self): - beam.coders.registry.register_coder(_Unpicklable, _UnpicklableCoder) - - with TestPipeline() as pipeline: - data = [_Unpicklable(i) for i in range(5)] - expected_data = [0, 10, 20, 30, 40] - result = ( - pipeline - | beam.Create(data) - | beam.WindowInto(GlobalWindows()) - | beam.Reshuffle() - | beam.Map(lambda u: u.value * 10)) - assert_that(result, equal_to(expected_data)) - - def test_reshuffle_unpicklable_in_non_global_window(self): - beam.coders.registry.register_coder(_Unpicklable, _UnpicklableCoder) - - with TestPipeline() as pipeline: - data = [_Unpicklable(i) for i in range(5)] - expected_data = [0, 0, 0, 10, 10, 10, 20, 20, 20, 30, 30, 30, 40, 40, 40] - result = ( - pipeline - | beam.Create(data) - | beam.WindowInto(window.SlidingWindows(size=3, period=1)) - | beam.Reshuffle() - | beam.Map(lambda u: u.value * 10)) - assert_that(result, equal_to(expected_data)) - class WithKeysTest(unittest.TestCase): def setUp(self): diff --git a/sdks/python/apache_beam/typehints/native_type_compatibility.py b/sdks/python/apache_beam/typehints/native_type_compatibility.py index 381d4f7aae2b..6f704b37a969 100644 --- a/sdks/python/apache_beam/typehints/native_type_compatibility.py +++ b/sdks/python/apache_beam/typehints/native_type_compatibility.py @@ -24,13 +24,9 @@ import sys import types import typing -from typing import Generic -from typing import TypeVar from apache_beam.typehints import typehints -T = TypeVar('T') - _LOGGER = logging.getLogger(__name__) # Describes an entry in the type map in convert_to_beam_type. @@ -220,18 +216,6 @@ def convert_collections_to_typing(typ): return typ -# During type inference of WindowedValue, we need to pass in the inner value -# type. This cannot be achieved immediately with WindowedValue class because it -# is not parameterized. Changing it to a generic class (e.g. WindowedValue[T]) -# could work in theory. However, the class is cythonized and it seems that -# cython does not handle generic classes well. -# The workaround here is to create a separate class solely for the type -# inference purpose. 
This class should never be used for creating instances. -class TypedWindowedValue(Generic[T]): - def __init__(self, *args, **kwargs): - raise NotImplementedError("This class is solely for type inference") - - def convert_to_beam_type(typ): """Convert a given typing type to a Beam type. @@ -283,12 +267,6 @@ def convert_to_beam_type(typ): # TODO(https://github.com/apache/beam/issues/20076): Currently unhandled. _LOGGER.info('Converting NewType type hint to Any: "%s"', typ) return typehints.Any - elif typ_module == 'apache_beam.typehints.native_type_compatibility' and \ - getattr(typ, "__name__", typ.__origin__.__name__) == 'TypedWindowedValue': - # Need to pass through WindowedValue class so that it can be converted - # to the correct type constraint in Beam - # This is needed to fix https://github.com/apache/beam/issues/33356 - pass elif (typ_module != 'typing') and (typ_module != 'collections.abc'): # Only translate types from the typing and collections.abc modules. return typ @@ -346,10 +324,6 @@ def convert_to_beam_type(typ): match=_match_is_exactly_collection, arity=1, beam_type=typehints.Collection), - _TypeMapEntry( - match=_match_issubclass(TypedWindowedValue), - arity=1, - beam_type=typehints.WindowedValue), ] # Find the first matching entry. diff --git a/sdks/python/apache_beam/typehints/typehints.py b/sdks/python/apache_beam/typehints/typehints.py index a65a0f753826..0e18e887c2a0 100644 --- a/sdks/python/apache_beam/typehints/typehints.py +++ b/sdks/python/apache_beam/typehints/typehints.py @@ -1213,15 +1213,6 @@ def type_check(self, instance): repr(self.inner_type), instance.value.__class__.__name__)) - def bind_type_variables(self, bindings): - bound_inner_type = bind_type_variables(self.inner_type, bindings) - if bound_inner_type == self.inner_type: - return self - return WindowedValue[bound_inner_type] - - def __repr__(self): - return 'WindowedValue[%s]' % repr(self.inner_type) - class GeneratorHint(IteratorHint): """A Generator type hint. From 14f7cafb5e0d749101b3a64046892d64b8e93d4b Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Tue, 17 Dec 2024 17:18:00 -0800 Subject: [PATCH 07/17] Update lineage query function. 
--- sdks/python/apache_beam/metrics/cells.py | 3 +++ sdks/python/apache_beam/metrics/metric.py | 12 ++++++++---- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/metrics/cells.py b/sdks/python/apache_beam/metrics/cells.py index 8f3d7bba22ad..8fd8457489ae 100644 --- a/sdks/python/apache_beam/metrics/cells.py +++ b/sdks/python/apache_beam/metrics/cells.py @@ -811,6 +811,9 @@ def contains(self, value): else: return False + def flattened(self): + return self.as_trie().flattened() + def to_proto(self) -> metrics_pb2.BoundedTrie: return metrics_pb2.BoundedTrie( bound=self._bound, diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py index 15f6b53c78e0..9cf42370f4b1 100644 --- a/sdks/python/apache_beam/metrics/metric.py +++ b/sdks/python/apache_beam/metrics/metric.py @@ -456,14 +456,18 @@ def add_raw(self, *rollup_segments: str) -> None: self.metric.add(rollup_segments) @staticmethod - def query(results: MetricResults, label: str) -> Set[str]: + def query(results: MetricResults, + label: str, + truncated_marker: str = '*') -> Set[str]: if not label in Lineage._METRICS: raise ValueError("Label {} does not exist for Lineage", label) response = results.query( MetricsFilter().with_namespace(Lineage.LINEAGE_NAMESPACE).with_name( - label))[MetricResults.STRINGSETS] + label))[MetricResults.BOUNDED_TRIES] result = set() for metric in response: - result.update(metric.committed) - result.update(metric.attempted) + for fqn in metric.committed.flattened(): + result.add(''.join(fqn[:-1]) + (truncated_marker if fqn[-1] else '')) + for fqn in metric.attempted.flattened(): + result.add(''.join(fqn[:-1]) + (truncated_marker if fqn[-1] else '')) return result From 330f57b3716b0f602385cdbfa8befdec04bb556a Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Wed, 18 Dec 2024 09:12:44 -0800 Subject: [PATCH 08/17] [YAML] Better docs for Filter and MapToFields. (#33274) * [YAML] Better docs for Filter and MapToFields. * Remove redundant optional indicators. * Update sdks/python/apache_beam/yaml/yaml_mapping.py Co-authored-by: Jeff Kinard --------- Co-authored-by: Jeff Kinard --- .../apache_beam/yaml/generate_yaml_docs.py | 2 +- sdks/python/apache_beam/yaml/yaml_mapping.py | 30 +++++++++++++++++-- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/yaml/generate_yaml_docs.py b/sdks/python/apache_beam/yaml/generate_yaml_docs.py index 27e17029f387..fe5727f3ef92 100644 --- a/sdks/python/apache_beam/yaml/generate_yaml_docs.py +++ b/sdks/python/apache_beam/yaml/generate_yaml_docs.py @@ -250,7 +250,7 @@ def main(): if options.markdown_file or options.html_file: if '-' in transforms[0]: extra_docs = 'Supported languages: ' + ', '.join( - t.split('-')[-1] for t in sorted(transforms)) + t.split('-')[-1] for t in sorted(transforms)) + '.' else: extra_docs = '' markdown_out.write( diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py index 8f4a2118c236..7f7da7aca6a9 100644 --- a/sdks/python/apache_beam/yaml/yaml_mapping.py +++ b/sdks/python/apache_beam/yaml/yaml_mapping.py @@ -23,6 +23,7 @@ from typing import Callable from typing import Collection from typing import Dict +from typing import Iterable from typing import List from typing import Mapping from typing import Optional @@ -619,6 +620,13 @@ def _PyJsFilter( See more complete documentation on [YAML Filtering](https://beam.apache.org/documentation/sdks/yaml-udf/#filtering). 
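+
+  For example, to keep only records whose `col1` field is positive (the
+  field name here is illustrative):
+
+      type: Filter
+      config:
+        language: python
+        keep: "col1 > 0"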
+ + Args: + keep: An expression evaluating to true for those records that should be kept. + language: The language of the above expression. + Defaults to generic. + error_handling: Whether and where to output records that throw errors when + the above expressions are evaluated. """ # pylint: disable=line-too-long keep_fn = _as_callable_for_pcoll(pcoll, keep, "keep", language or 'generic') return pcoll | beam.Filter(keep_fn) @@ -664,14 +672,32 @@ def normalize_fields(pcoll, fields, drop=(), append=False, language='generic'): @beam.ptransform.ptransform_fn @maybe_with_exception_handling_transform_fn -def _PyJsMapToFields(pcoll, language='generic', **mapping_args): +def _PyJsMapToFields( + pcoll, + fields: Mapping[str, Union[str, Mapping[str, str]]], + append: Optional[bool] = False, + drop: Optional[Iterable[str]] = None, + language: Optional[str] = None): """Creates records with new fields defined in terms of the input fields. See more complete documentation on [YAML Mapping Functions](https://beam.apache.org/documentation/sdks/yaml-udf/#mapping-functions). + + Args: + fields: The output fields to compute, each mapping to the expression or + callable that creates them. + append: Whether to append the created fields to the set of + fields already present, outputting a union of both the new fields and + the original fields for each record. Defaults to False. + drop: If `append` is true, enumerates a subset of fields from the + original record that should not be kept + language: The language used to define (and execute) the + expressions and/or callables in `fields`. Defaults to generic. + error_handling: Whether and where to output records that throw errors when + the above expressions are evaluated. """ # pylint: disable=line-too-long input_schema, fields = normalize_fields( - pcoll, language=language, **mapping_args) + pcoll, fields, drop or (), append, language=language or 'generic') if language == 'javascript': options.YamlOptions.check_enabled(pcoll.pipeline, 'javascript') From 116df9fed1cec0b89b132e351b2f48576e343913 Mon Sep 17 00:00:00 2001 From: Jack McCluskey <34928439+jrmccluskey@users.noreply.github.com> Date: Wed, 18 Dec 2024 13:17:00 -0500 Subject: [PATCH 09/17] Fix env variable loading in Cost Benchmark workflow (#33404) * Fix env variable loading in Cost Benchmark workflow * fix output file for tf mnist * add load test requirements file arg * update mnist args * revert how args are passed * assign result correctly --- .github/workflows/beam_Python_CostBenchmarks_Dataflow.yml | 5 +++-- .../tensorflow_mnist_classification_cost_benchmark.py | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/.github/workflows/beam_Python_CostBenchmarks_Dataflow.yml b/.github/workflows/beam_Python_CostBenchmarks_Dataflow.yml index 18fe37e142ac..209325c429a1 100644 --- a/.github/workflows/beam_Python_CostBenchmarks_Dataflow.yml +++ b/.github/workflows/beam_Python_CostBenchmarks_Dataflow.yml @@ -92,7 +92,7 @@ jobs: -PloadTest.mainClass=apache_beam.testing.benchmarks.wordcount.wordcount \ -Prunner=DataflowRunner \ -PpythonVersion=3.10 \ - '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_1 }} --job_name=benchmark-tests-wordcount-python-${{env.NOW_UTC}} --output_file=gs://temp-storage-for-end-to-end-tests/wordcount/result_wordcount-${{env.NOW_UTC}}.txt' \ + '-PloadTest.args=${{ env.beam_Python_Cost_Benchmarks_Dataflow_test_arguments_1 }} --job_name=benchmark-tests-wordcount-python-${{env.NOW_UTC}} 
--output_file=gs://temp-storage-for-end-to-end-tests/wordcount/result_wordcount-${{env.NOW_UTC}}.txt' \ - name: Run Tensorflow MNIST Image Classification on Dataflow uses: ./.github/actions/gradle-command-self-hosted-action timeout-minutes: 30 @@ -102,4 +102,5 @@ jobs: -PloadTest.mainClass=apache_beam.testing.benchmarks.inference.tensorflow_mnist_classification_cost_benchmark \ -Prunner=DataflowRunner \ -PpythonVersion=3.10 \ - '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_2 }} --job_name=benchmark-tests-tf-mnist-classification-python-${{env.NOW_UTC}} --input_file=gs://apache-beam-ml/testing/inputs/it_mnist_data.csv --output_file=gs://temp-storage-for-end-to-end-tests/wordcount/result_tf_mnist-${{env.NOW_UTC}}.txt --model=gs://apache-beam-ml/models/tensorflow/mnist/' \ \ No newline at end of file + -PloadTest.requirementsTxtFile=apache_beam/ml/inference/tensorflow_tests_requirements.txt \ + '-PloadTest.args=${{ env.beam_Python_Cost_Benchmarks_Dataflow_test_arguments_2 }} --job_name=benchmark-tests-tf-mnist-classification-python-${{env.NOW_UTC}} --input_file=gs://apache-beam-ml/testing/inputs/it_mnist_data.csv --output_file=gs://temp-storage-for-end-to-end-tests/inference/result_tf_mnist-${{env.NOW_UTC}}.txt --model=gs://apache-beam-ml/models/tensorflow/mnist/' \ \ No newline at end of file diff --git a/sdks/python/apache_beam/testing/benchmarks/inference/tensorflow_mnist_classification_cost_benchmark.py b/sdks/python/apache_beam/testing/benchmarks/inference/tensorflow_mnist_classification_cost_benchmark.py index f7e12dcead03..223b973e5fbe 100644 --- a/sdks/python/apache_beam/testing/benchmarks/inference/tensorflow_mnist_classification_cost_benchmark.py +++ b/sdks/python/apache_beam/testing/benchmarks/inference/tensorflow_mnist_classification_cost_benchmark.py @@ -31,7 +31,7 @@ def test(self): extra_opts['input'] = self.pipeline.get_option('input_file') extra_opts['output'] = self.pipeline.get_option('output_file') extra_opts['model_path'] = self.pipeline.get_option('model') - tensorflow_mnist_classification.run( + self.result = tensorflow_mnist_classification.run( self.pipeline.get_full_options_as_args(**extra_opts), save_main_session=False) From 5eed396caf9e0065d8ed82edcc236bad5b71ba22 Mon Sep 17 00:00:00 2001 From: Shingo Furuyama Date: Thu, 19 Dec 2024 03:56:19 +0900 Subject: [PATCH 10/17] fix error on start-build-env.sh (#33401) --- start-build-env.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/start-build-env.sh b/start-build-env.sh index b788146eb988..0f23f32a269c 100755 --- a/start-build-env.sh +++ b/start-build-env.sh @@ -91,7 +91,7 @@ RUN echo "${USER_NAME} ALL=NOPASSWD: ALL" > "/etc/sudoers.d/beam-build-${USER_ID ENV HOME "${DOCKER_HOME_DIR}" ENV GOPATH ${DOCKER_HOME_DIR}/beam/sdks/go/examples/.gogradle/project_gopath # This next command still runs as root causing the ~/.cache/go-build to be owned by root -RUN go get github.com/linkedin/goavro/v2 +RUN go mod init beam-build-${USER_ID} && go get github.com/linkedin/goavro/v2 RUN chown -R ${USER_NAME}:${GROUP_ID} ${DOCKER_HOME_DIR}/.cache UserSpecificDocker From ec2e150a0950787b65db5cd22473c28cde820c7e Mon Sep 17 00:00:00 2001 From: kennknowles Date: Thu, 19 Dec 2024 13:06:03 +0000 Subject: [PATCH 11/17] Moving to 2.63.0-SNAPSHOT on master branch. 
--- .asf.yaml | 1 + gradle.properties | 4 ++-- sdks/go/pkg/beam/core/core.go | 2 +- sdks/python/apache_beam/version.py | 2 +- sdks/typescript/package.json | 2 +- 5 files changed, 6 insertions(+), 5 deletions(-) diff --git a/.asf.yaml b/.asf.yaml index 50886f2cea5a..a6449ffb8b5f 100644 --- a/.asf.yaml +++ b/.asf.yaml @@ -49,6 +49,7 @@ github: protected_branches: master: {} + release-2.62.0: {} release-2.61.0: {} release-2.60.0: {} release-2.59.0: {} diff --git a/gradle.properties b/gradle.properties index 3923dc204272..dea5966f825d 100644 --- a/gradle.properties +++ b/gradle.properties @@ -30,8 +30,8 @@ signing.gnupg.useLegacyGpg=true # buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy. # To build a custom Beam version make sure you change it in both places, see # https://github.com/apache/beam/issues/21302. -version=2.62.0-SNAPSHOT -sdk_version=2.62.0.dev +version=2.63.0-SNAPSHOT +sdk_version=2.63.0.dev javaVersion=1.8 diff --git a/sdks/go/pkg/beam/core/core.go b/sdks/go/pkg/beam/core/core.go index 1b478f483077..a183ddf384ed 100644 --- a/sdks/go/pkg/beam/core/core.go +++ b/sdks/go/pkg/beam/core/core.go @@ -27,7 +27,7 @@ const ( // SdkName is the human readable name of the SDK for UserAgents. SdkName = "Apache Beam SDK for Go" // SdkVersion is the current version of the SDK. - SdkVersion = "2.62.0.dev" + SdkVersion = "2.63.0.dev" // DefaultDockerImage represents the associated image for this release. DefaultDockerImage = "apache/beam_go_sdk:" + SdkVersion diff --git a/sdks/python/apache_beam/version.py b/sdks/python/apache_beam/version.py index 9974bb68bccf..39185712b141 100644 --- a/sdks/python/apache_beam/version.py +++ b/sdks/python/apache_beam/version.py @@ -17,4 +17,4 @@ """Apache Beam SDK version information and utilities.""" -__version__ = '2.62.0.dev' +__version__ = '2.63.0.dev' diff --git a/sdks/typescript/package.json b/sdks/typescript/package.json index 9ccfcaa663d1..3ed0a0e427f4 100644 --- a/sdks/typescript/package.json +++ b/sdks/typescript/package.json @@ -1,6 +1,6 @@ { "name": "apache-beam", - "version": "2.62.0-SNAPSHOT", + "version": "2.63.0-SNAPSHOT", "devDependencies": { "@google-cloud/bigquery": "^5.12.0", "@types/mocha": "^9.0.0", From ca8a35a790d15dbea67384d253c8fa6d7c6cf92a Mon Sep 17 00:00:00 2001 From: Filipe Regadas Date: Thu, 19 Dec 2024 08:08:04 -0500 Subject: [PATCH 12/17] Bump Iceberg to v1.6.1 (#33294) --- .github/trigger_files/IO_Iceberg_Integration_Tests.json | 2 +- sdks/java/io/iceberg/build.gradle | 2 +- sdks/java/io/iceberg/hive/build.gradle | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index 2160d3c68005..a84f69a97721 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 5 + "modification": 6 } diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle index 0cfa8da4eb7d..319848b7626b 100644 --- a/sdks/java/io/iceberg/build.gradle +++ b/sdks/java/io/iceberg/build.gradle @@ -37,7 +37,7 @@ def hadoopVersions = [ hadoopVersions.each {kv -> configurations.create("hadoopVersion$kv.key")} -def iceberg_version = "1.4.2" +def iceberg_version = "1.6.1" def parquet_version = "1.12.0" def orc_version = "1.9.2" diff --git a/sdks/java/io/iceberg/hive/build.gradle 
b/sdks/java/io/iceberg/hive/build.gradle index 2d0d2bcc5cde..9884b45af7a1 100644 --- a/sdks/java/io/iceberg/hive/build.gradle +++ b/sdks/java/io/iceberg/hive/build.gradle @@ -30,7 +30,7 @@ ext.summary = "Runtime dependencies needed for Hive catalog integration." def hive_version = "3.1.3" def hbase_version = "2.6.1-hadoop3" def hadoop_version = "3.4.1" -def iceberg_version = "1.4.2" +def iceberg_version = "1.6.1" def avatica_version = "1.25.0" dependencies { From 55b0ce929b70af58b0c467446a38b1a950ec6d4e Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 19 Dec 2024 08:48:30 -0500 Subject: [PATCH 13/17] Add 2.63.0 section to CHANGES.md --- CHANGES.md | 45 +++++++++++++++++++++++++++++++-------------- 1 file changed, 31 insertions(+), 14 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 06b92953c662..edca7a196127 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -31,7 +31,6 @@ ## New Features / Improvements -* The datetime module is now available for use in jinja templatization for yaml. * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). ## Breaking Changes @@ -54,7 +53,7 @@ * ([#X](https://github.com/apache/beam/issues/X)). --> -# [2.62.0] - Unreleased +# [2.63.0] - Unreleased ## Highlights @@ -63,24 +62,14 @@ ## I/Os -* gcs-connector config options can be set via GcsOptions (Java) ([#32769](https://github.com/apache/beam/pull/32769)). * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). -* [Managed Iceberg] Support partitioning by time (year, month, day, hour) for types `date`, `time`, `timestamp`, and `timestamp(tz)` ([#32939](https://github.com/apache/beam/pull/32939)) -* Upgraded the default version of Hadoop dependencies to 3.4.1. Hadoop 2.10.2 is still supported (Java) ([#33011](https://github.com/apache/beam/issues/33011)). -* [BigQueryIO] Create managed BigLake tables dynamically ([#33125](https://github.com/apache/beam/pull/33125)) ## New Features / Improvements -* Added support for stateful processing in Spark Runner for streaming pipelines. Timer functionality is not yet supported and will be implemented in a future release ([#33237](https://github.com/apache/beam/issues/33237)). -* Improved batch performance of SparkRunner's GroupByKey ([#20943](https://github.com/apache/beam/pull/20943)). -* Support OnWindowExpiration in Prism ([#32211](https://github.com/apache/beam/issues/32211)). - * This enables initial Java GroupIntoBatches support. -* Support OrderedListState in Prism ([#32929](https://github.com/apache/beam/issues/32929)). * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). ## Breaking Changes -* Upgraded ZetaSQL to 2024.11.1 ([#32902](https://github.com/apache/beam/pull/32902)). Java11+ is now needed if Beam's ZetaSQL component is used. * X behavior was changed ([#X](https://github.com/apache/beam/issues/X)). ## Deprecations @@ -90,16 +79,44 @@ ## Bugfixes * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). -* Fixed EventTimeTimer ordering in Prism. ([#32222](https://github.com/apache/beam/issues/32222)). ## Security Fixes * Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)). -* Fixed (CVE-2024-47561)[https://www.cve.org/CVERecord?id=CVE-2024-47561] (Java) by upgrading Avro version to 1.11.4 ## Known Issues * ([#X](https://github.com/apache/beam/issues/X)). 
+# [2.62.0] - Unreleased + +## I/Os + +* gcs-connector config options can be set via GcsOptions (Java) ([#32769](https://github.com/apache/beam/pull/32769)). +* [Managed Iceberg] Support partitioning by time (year, month, day, hour) for types `date`, `time`, `timestamp`, and `timestamp(tz)` ([#32939](https://github.com/apache/beam/pull/32939)) +* Upgraded the default version of Hadoop dependencies to 3.4.1. Hadoop 2.10.2 is still supported (Java) ([#33011](https://github.com/apache/beam/issues/33011)). +* [BigQueryIO] Create managed BigLake tables dynamically ([#33125](https://github.com/apache/beam/pull/33125)) + +## New Features / Improvements + +* Added support for stateful processing in Spark Runner for streaming pipelines. Timer functionality is not yet supported and will be implemented in a future release ([#33237](https://github.com/apache/beam/issues/33237)). +* The datetime module is now available for use in jinja templatization for yaml. +* Improved batch performance of SparkRunner's GroupByKey ([#20943](https://github.com/apache/beam/pull/20943)). +* Support OnWindowExpiration in Prism ([#32211](https://github.com/apache/beam/issues/32211)). + * This enables initial Java GroupIntoBatches support. +* Support OrderedListState in Prism ([#32929](https://github.com/apache/beam/issues/32929)). + +## Breaking Changes + +* Upgraded ZetaSQL to 2024.11.1 ([#32902](https://github.com/apache/beam/pull/32902)). Java11+ is now needed if Beam's ZetaSQL component is used. + +## Bugfixes + +* Fixed EventTimeTimer ordering in Prism. ([#32222](https://github.com/apache/beam/issues/32222)). + +## Security Fixes + +* Fixed (CVE-2024-47561)[https://www.cve.org/CVERecord?id=CVE-2024-47561] (Java) by upgrading Avro version to 1.11.4 + # [2.61.0] - 2024-11-25 ## Highlights From 8fee3ca5753ed2ca7563e46ce17cef4d9d95e3cc Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com> Date: Thu, 19 Dec 2024 18:10:13 +0000 Subject: [PATCH 14/17] Improve existing Python multi-lang SchemaTransform examples (#33361) * improve python multi-lang examples * minor adjustments --- .../python/wordcount_external.py | 52 +++++++------- .../ExtractWordsProvider.java | 72 +++++++++++++------ .../schematransforms/JavaCountProvider.java | 52 +++++++------- .../schematransforms/WriteWordsProvider.java | 34 +++++---- .../python/apache_beam/transforms/external.py | 3 +- 5 files changed, 126 insertions(+), 87 deletions(-) diff --git a/examples/multi-language/python/wordcount_external.py b/examples/multi-language/python/wordcount_external.py index 580c0269d361..7298d81c1b44 100644 --- a/examples/multi-language/python/wordcount_external.py +++ b/examples/multi-language/python/wordcount_external.py @@ -18,8 +18,8 @@ import logging import apache_beam as beam -from apache_beam.io import ReadFromText from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.transforms.external import BeamJarExpansionService from apache_beam.transforms.external_transform_provider import ExternalTransformProvider from apache_beam.typehints.row_type import RowTypeConstraint """A Python multi-language pipeline that counts words using multiple Java SchemaTransforms. 
@@ -60,39 +60,35 @@ --expansion_service_port """ -# Original Java transform is in ExtractWordsProvider.java EXTRACT_IDENTIFIER = "beam:schematransform:org.apache.beam:extract_words:v1" -# Original Java transform is in JavaCountProvider.java COUNT_IDENTIFIER = "beam:schematransform:org.apache.beam:count:v1" -# Original Java transform is in WriteWordsProvider.java WRITE_IDENTIFIER = "beam:schematransform:org.apache.beam:write_words:v1" def run(input_path, output_path, expansion_service_port, pipeline_args): pipeline_options = PipelineOptions(pipeline_args) - # Discover and get external transforms from this expansion service - provider = ExternalTransformProvider("localhost:" + expansion_service_port) - # Get transforms with identifiers, then use them as you would a regular - # native PTransform - Extract = provider.get_urn(EXTRACT_IDENTIFIER) - Count = provider.get_urn(COUNT_IDENTIFIER) - Write = provider.get_urn(WRITE_IDENTIFIER) - with beam.Pipeline(options=pipeline_options) as p: - lines = p | 'Read' >> ReadFromText(input_path) - - words = (lines - | 'Prepare Rows' >> beam.Map(lambda line: beam.Row(line=line)) - | 'Extract Words' >> Extract()) - word_counts = words | 'Count Words' >> Count() - formatted_words = ( - word_counts - | 'Format Text' >> beam.Map(lambda row: beam.Row(line="%s: %s" % ( - row.word, row.count))).with_output_types( - RowTypeConstraint.from_fields([('line', str)]))) - - formatted_words | 'Write' >> Write(file_path_prefix=output_path) + expansion_service = BeamJarExpansionService( + "examples:multi-language:shadowJar") + if expansion_service_port: + expansion_service = "localhost:" + expansion_service_port + + provider = ExternalTransformProvider(expansion_service) + # Retrieve portable transforms + Extract = provider.get_urn(EXTRACT_IDENTIFIER) + Count = provider.get_urn(COUNT_IDENTIFIER) + Write = provider.get_urn(WRITE_IDENTIFIER) + + _ = (p + | 'Read' >> beam.io.ReadFromText(input_path) + | 'Prepare Rows' >> beam.Map(lambda line: beam.Row(line=line)) + | 'Extract Words' >> Extract(drop=["king", "palace"]) + | 'Count Words' >> Count() + | 'Format Text' >> beam.Map(lambda row: beam.Row(line="%s: %s" % ( + row.word, row.count))).with_output_types( + RowTypeConstraint.from_fields([('line', str)])) + | 'Write' >> Write(file_path_prefix=output_path)) if __name__ == '__main__': @@ -110,8 +106,10 @@ def run(input_path, output_path, expansion_service_port, pipeline_args): help='Output file') parser.add_argument('--expansion_service_port', dest='expansion_service_port', - required=True, - help='Expansion service port') + required=False, + help='Expansion service port. 
If left empty, the ' + 'existing multi-language examples service will ' + 'be used by default.') known_args, pipeline_args = parser.parse_known_args() run(known_args.input, known_args.output, known_args.expansion_service_port, diff --git a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java index 724dbce276fb..b7224ecec6b4 100644 --- a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java +++ b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/ExtractWordsProvider.java @@ -21,9 +21,12 @@ import com.google.auto.service.AutoService; import com.google.auto.value.AutoValue; +import java.util.Arrays; +import java.util.List; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; @@ -36,7 +39,6 @@ /** Splits a line into separate words and returns each word. */ @AutoService(SchemaTransformProvider.class) public class ExtractWordsProvider extends TypedSchemaTransformProvider { - public static final Schema OUTPUT_SCHEMA = Schema.builder().addStringField("word").build(); @Override public String identifier() { @@ -45,32 +47,60 @@ public String identifier() { @Override protected SchemaTransform from(Configuration configuration) { - return new SchemaTransform() { - @Override - public PCollectionRowTuple expand(PCollectionRowTuple input) { - return PCollectionRowTuple.of( - "output", - input.get("input").apply(ParDo.of(new ExtractWordsFn())).setRowSchema(OUTPUT_SCHEMA)); - } - }; + return new ExtractWordsTransform(configuration); } - static class ExtractWordsFn extends DoFn { - @ProcessElement - public void processElement(@Element Row element, OutputReceiver receiver) { - // Split the line into words. - String line = Preconditions.checkStateNotNull(element.getString("line")); - String[] words = line.split("[^\\p{L}]+", -1); + static class ExtractWordsTransform extends SchemaTransform { + private static final Schema OUTPUT_SCHEMA = Schema.builder().addStringField("word").build(); + private final List drop; - for (String word : words) { - if (!word.isEmpty()) { - receiver.output(Row.withSchema(OUTPUT_SCHEMA).withFieldValue("word", word).build()); - } - } + ExtractWordsTransform(Configuration configuration) { + this.drop = configuration.getDrop(); + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + return PCollectionRowTuple.of( + "output", + input + .getSinglePCollection() + .apply( + ParDo.of( + new DoFn() { + @ProcessElement + public void process(@Element Row element, OutputReceiver receiver) { + // Split the line into words. 
diff --git a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java
index cabea594ae18..90d02d92c3cb 100644
--- a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java
+++ b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/JavaCountProvider.java
@@ -44,35 +44,37 @@ public String identifier() {
 
   @Override
   protected SchemaTransform from(Configuration configuration) {
-    return new SchemaTransform() {
-      @Override
-      public PCollectionRowTuple expand(PCollectionRowTuple input) {
-        Schema outputSchema =
-            Schema.builder().addStringField("word").addInt64Field("count").build();
+    return new JavaCountTransform();
+  }
+
+  static class JavaCountTransform extends SchemaTransform {
+    static final Schema OUTPUT_SCHEMA =
+        Schema.builder().addStringField("word").addInt64Field("count").build();
 
-        PCollection<Row> wordCounts =
-            input
-                .get("input")
-                .apply(Count.perElement())
-                .apply(
-                    MapElements.into(TypeDescriptors.rows())
-                        .via(
-                            kv ->
-                                Row.withSchema(outputSchema)
-                                    .withFieldValue(
-                                        "word",
-                                        Preconditions.checkStateNotNull(
-                                            kv.getKey().getString("word")))
-                                    .withFieldValue("count", kv.getValue())
-                                    .build()))
-                .setRowSchema(outputSchema);
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      PCollection<Row> wordCounts =
+          input
+              .get("input")
+              .apply(Count.perElement())
+              .apply(
+                  MapElements.into(TypeDescriptors.rows())
+                      .via(
+                          kv ->
+                              Row.withSchema(OUTPUT_SCHEMA)
+                                  .withFieldValue(
+                                      "word",
+                                      Preconditions.checkStateNotNull(
+                                          kv.getKey().getString("word")))
+                                  .withFieldValue("count", kv.getValue())
+                                  .build()))
+              .setRowSchema(OUTPUT_SCHEMA);
 
-        return PCollectionRowTuple.of("output", wordCounts);
-      }
-    };
+      return PCollectionRowTuple.of("output", wordCounts);
+    }
   }
 
   @DefaultSchema(AutoValueSchema.class)
   @AutoValue
-  protected abstract static class Configuration {}
+  public abstract static class Configuration {}
 }
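
For readers more at home in the Python SDK, the counting stage above does
roughly the following, sketched natively (illustrative only; the example
itself runs the Java transform through the expansion service, and `words`
is the hypothetical PCollection from the sketch earlier):

    import apache_beam as beam

    word_counts = (
        words
        | 'Get Word' >> beam.Map(lambda row: row.word)
        | 'Count' >> beam.combiners.Count.PerElement()
        | 'To Rows' >> beam.Map(
            lambda kv: beam.Row(word=kv[0], count=kv[1])))
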
diff --git a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java
index 0b2017c5587a..faf9590a7f16 100644
--- a/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java
+++ b/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/schematransforms/WriteWordsProvider.java
@@ -42,24 +42,32 @@ public String identifier() {
 
   @Override
   protected SchemaTransform from(Configuration configuration) {
-    return new SchemaTransform() {
-      @Override
-      public PCollectionRowTuple expand(PCollectionRowTuple input) {
-        input
-            .get("input")
-            .apply(
-                MapElements.into(TypeDescriptors.strings())
-                    .via(row -> Preconditions.checkStateNotNull(row.getString("line"))))
-            .apply(TextIO.write().to(configuration.getFilePathPrefix()));
+    return new WriteWordsTransform(configuration);
+  }
+
+  static class WriteWordsTransform extends SchemaTransform {
+    private final String filePathPrefix;
+
+    WriteWordsTransform(Configuration configuration) {
+      this.filePathPrefix = configuration.getFilePathPrefix();
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      input
+          .get("input")
+          .apply(
+              MapElements.into(TypeDescriptors.strings())
+                  .via(row -> Preconditions.checkStateNotNull(row.getString("line"))))
+          .apply(TextIO.write().to(filePathPrefix));
 
-        return PCollectionRowTuple.empty(input.getPipeline());
-      }
-    };
+      return PCollectionRowTuple.empty(input.getPipeline());
+    }
   }
 
   @DefaultSchema(AutoValueSchema.class)
   @AutoValue
-  protected abstract static class Configuration {
+  public abstract static class Configuration {
     public static Builder builder() {
       return new AutoValue_WriteWordsProvider_Configuration.Builder();
     }
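
All three providers now follow the same shape: the anonymous SchemaTransform
is replaced by a named transform class that copies the configuration values
it needs in its constructor, rather than capturing the whole Configuration
object. The same idea expressed in Python terms, as a rough sketch that is
not part of the patch:

    import apache_beam as beam

    class WriteWords(beam.PTransform):
        def __init__(self, file_path_prefix):
            # Snapshot the one field we need; this keeps the serialized
            # transform small and its dependencies explicit.
            self._file_path_prefix = file_path_prefix

        def expand(self, pcoll):
            return (
                pcoll
                | beam.Map(lambda row: row.line)
                | beam.io.WriteToText(self._file_path_prefix))
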
diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py
index fb37a8fd974d..9ca5886f4cc2 100644
--- a/sdks/python/apache_beam/transforms/external.py
+++ b/sdks/python/apache_beam/transforms/external.py
@@ -239,7 +239,8 @@ def dict_to_row(schema_proto, py_value):
         extra = set(py_value.keys()) - set(row_type._fields)
         if extra:
           raise ValueError(
-              f"Unknown fields: {extra}. Valid fields: {row_type._fields}")
+              f"Transform '{self.identifier()}' was configured with unknown "
+              f"fields: {extra}. Valid fields: {set(row_type._fields)}")
         return row_type(
             *[
                 dict_to_row_recursive(
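
The reworded error now names the transform being configured and prints the
valid fields as a set. Misspelling the drop option from the earlier sketch
would fail along these lines (the output is reconstructed from the message
format above, not captured from a run; Extract and rows are the hypothetical
names used earlier):

    words = rows | Extract(dropp=["king"])  # typo: no such config field
    # ValueError: Transform
    # 'beam:schematransform:org.apache.beam:extract_words:v1' was configured
    # with unknown fields: {'dropp'}. Valid fields: {'drop'}
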
From a51a0e1688978cd5c90e1de4fc7616ff068eb7b4 Mon Sep 17 00:00:00 2001
From: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com>
Date: Thu, 19 Dec 2024 19:54:06 +0000
Subject: [PATCH 15/17] Move Dataflow Python Managed tests to prod (#33426)

---
 .../trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json | 2 +-
 sdks/python/apache_beam/transforms/managed_iceberg_it_test.py   | 1 -
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json
index b26833333238..c537844dc84a 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_IO_Dataflow.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 2
+  "modification": 3
 }
diff --git a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py
index a09203f313eb..20cb52335c76 100644
--- a/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py
+++ b/sdks/python/apache_beam/transforms/managed_iceberg_it_test.py
@@ -40,7 +40,6 @@ def setUp(self):
     self.args = self.test_pipeline.get_full_options_as_args()
     self.args.extend([
         '--experiments=enable_managed_transforms',
-        '--dataflow_endpoint=https://dataflow-staging.sandbox.googleapis.com',
     ])
 
   def _create_row(self, num: int):

From 30f128b93f52de652194aad19c520c33939b489d Mon Sep 17 00:00:00 2001
From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com>
Date: Fri, 20 Dec 2024 09:46:30 -0800
Subject: [PATCH 16/17] Bump github.com/docker/docker in /sdks (#33420)

Bumps [github.com/docker/docker](https://github.com/docker/docker) from 27.3.1+incompatible to 27.4.1+incompatible.
- [Release notes](https://github.com/docker/docker/releases)
- [Commits](https://github.com/docker/docker/compare/v27.3.1...v27.4.1)

---
updated-dependencies:
- dependency-name: github.com/docker/docker
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
---
 sdks/go.mod | 2 +-
 sdks/go.sum | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/sdks/go.mod b/sdks/go.mod
index ffe9d942f799..80edcf942fae 100644
--- a/sdks/go.mod
+++ b/sdks/go.mod
@@ -149,7 +149,7 @@ require (
 	github.com/cespare/xxhash/v2 v2.3.0 // indirect
 	github.com/cncf/xds/go v0.0.0-20240905190251-b4127c9b8d78 // indirect
 	github.com/cpuguy83/dockercfg v0.3.1 // indirect
-	github.com/docker/docker v27.3.1+incompatible // but required to resolve issue docker has with go1.20
+	github.com/docker/docker v27.4.1+incompatible // but required to resolve issue docker has with go1.20
 	github.com/docker/go-units v0.5.0 // indirect
 	github.com/envoyproxy/go-control-plane v0.13.0 // indirect
 	github.com/envoyproxy/protoc-gen-validate v1.1.0 // indirect
diff --git a/sdks/go.sum b/sdks/go.sum
index a32b86613ba7..36d6823457db 100644
--- a/sdks/go.sum
+++ b/sdks/go.sum
@@ -787,8 +787,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk=
 github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
-github.com/docker/docker v27.3.1+incompatible h1:KttF0XoteNTicmUtBO0L2tP+J7FGRFTjaEF4k6WdhfI=
-github.com/docker/docker v27.3.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
+github.com/docker/docker v27.4.1+incompatible h1:ZJvcY7gfwHn1JF48PfbyXg7Jyt9ZCWDW+GGXOIxEwp4=
+github.com/docker/docker v27.4.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
 github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c=
 github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc=
 github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
From 7c86bf3103fec1d3a6a3a3cbe6ace8a14a85b723 Mon Sep 17 00:00:00 2001
From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com>
Date: Fri, 20 Dec 2024 09:48:09 -0800
Subject: [PATCH 17/17] Bump google.golang.org/api from 0.212.0 to 0.214.0 in
 /sdks (#33429)

Bumps [google.golang.org/api](https://github.com/googleapis/google-api-go-client) from 0.212.0 to 0.214.0.
- [Release notes](https://github.com/googleapis/google-api-go-client/releases)
- [Changelog](https://github.com/googleapis/google-api-go-client/blob/main/CHANGES.md)
- [Commits](https://github.com/googleapis/google-api-go-client/compare/v0.212.0...v0.214.0)

---
updated-dependencies:
- dependency-name: google.golang.org/api
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
---
 sdks/go.mod |  6 +++---
 sdks/go.sum | 12 ++++++------
 2 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/sdks/go.mod b/sdks/go.mod
index 80edcf942fae..31dcd9f53456 100644
--- a/sdks/go.mod
+++ b/sdks/go.mod
@@ -53,12 +53,12 @@ require (
 	github.com/xitongsys/parquet-go v1.6.2
 	github.com/xitongsys/parquet-go-source v0.0.0-20220315005136-aec0fe3e777c
 	go.mongodb.org/mongo-driver v1.17.1
-	golang.org/x/net v0.32.0
+	golang.org/x/net v0.33.0
 	golang.org/x/oauth2 v0.24.0
 	golang.org/x/sync v0.10.0
 	golang.org/x/sys v0.28.0
 	golang.org/x/text v0.21.0
-	google.golang.org/api v0.212.0
+	google.golang.org/api v0.214.0
 	google.golang.org/genproto v0.0.0-20241118233622-e639e219e697
 	google.golang.org/grpc v1.67.2
 	google.golang.org/protobuf v1.36.0
@@ -195,5 +195,5 @@ require (
 	golang.org/x/tools v0.24.0 // indirect
 	golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
 	google.golang.org/genproto/googleapis/api v0.0.0-20241118233622-e639e219e697 // indirect
-	google.golang.org/genproto/googleapis/rpc v0.0.0-20241206012308-a4fef0638583 // indirect
+	google.golang.org/genproto/googleapis/rpc v0.0.0-20241209162323-e6fa225c2576 // indirect
 )
diff --git a/sdks/go.sum b/sdks/go.sum
index 36d6823457db..fde28c9e6be5 100644
--- a/sdks/go.sum
+++ b/sdks/go.sum
@@ -1388,8 +1388,8 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
 golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
 golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
 golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns=
-golang.org/x/net v0.32.0 h1:ZqPmj8Kzc+Y6e0+skZsuACbx+wzMgo5MQsJh9Qd6aYI=
-golang.org/x/net v0.32.0/go.mod h1:CwU0IoeOlnQQWJ6ioyFrfRuomB8GKF6KbYXZVyeXNfs=
+golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
+golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
 golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
 golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -1707,8 +1707,8 @@ google.golang.org/api v0.108.0/go.mod h1:2Ts0XTHNVWxypznxWOYUeI4g3WdP9Pk2Qk58+a/
 google.golang.org/api v0.110.0/go.mod h1:7FC4Vvx1Mooxh8C5HWjzZHcavuS2f6pmJpZx60ca7iI=
 google.golang.org/api v0.111.0/go.mod h1:qtFHvU9mhgTJegR31csQ+rwxyUTHOKFqCKWp1J0fdw0=
 google.golang.org/api v0.114.0/go.mod h1:ifYI2ZsFK6/uGddGfAD5BMxlnkBqCmqHSDUVi45N5Yg=
-google.golang.org/api v0.212.0 h1:BcRj3MJfHF3FYD29rk7u9kuu1SyfGqfHcA0hSwKqkHg=
-google.golang.org/api v0.212.0/go.mod h1:gICpLlpp12/E8mycRMzgy3SQ9cFh2XnVJ6vJi/kQbvI=
+google.golang.org/api v0.214.0 h1:h2Gkq07OYi6kusGOaT/9rnNljuXmqPnaig7WGPmKbwA=
+google.golang.org/api v0.214.0/go.mod h1:bYPpLG8AyeMWwDU6NXoB00xC0DFkikVvd5MfwoxjLqE=
 google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
 google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
 google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
@@ -1852,8 +1852,8 @@ google.golang.org/genproto v0.0.0-20241118233622-e639e219e697 h1:ToEetK57OidYuqD
 google.golang.org/genproto v0.0.0-20241118233622-e639e219e697/go.mod h1:JJrvXBWRZaFMxBufik1a4RpFw4HhgVtBBWQeQgUj2cc=
 google.golang.org/genproto/googleapis/api v0.0.0-20241118233622-e639e219e697 h1:pgr/4QbFyktUv9CtQ/Fq4gzEE6/Xs7iCXbktaGzLHbQ=
 google.golang.org/genproto/googleapis/api v0.0.0-20241118233622-e639e219e697/go.mod h1:+D9ySVjN8nY8YCVjc5O7PZDIdZporIDY3KaGfJunh88=
-google.golang.org/genproto/googleapis/rpc v0.0.0-20241206012308-a4fef0638583 h1:IfdSdTcLFy4lqUQrQJLkLt1PB+AsqVz6lwkWPzWEz10=
-google.golang.org/genproto/googleapis/rpc v0.0.0-20241206012308-a4fef0638583/go.mod h1:5uTbfoYQed2U9p3KIj2/Zzm02PYhndfdmML0qC3q3FU=
+google.golang.org/genproto/googleapis/rpc v0.0.0-20241209162323-e6fa225c2576 h1:8ZmaLZE4XWrtU3MyClkYqqtl6Oegr3235h7jxsDyqCY=
+google.golang.org/genproto/googleapis/rpc v0.0.0-20241209162323-e6fa225c2576/go.mod h1:5uTbfoYQed2U9p3KIj2/Zzm02PYhndfdmML0qC3q3FU=
 google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
 google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
 google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=