From e77029f900acfeae56b93a9d4d33297e3fd04723 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Mon, 9 Dec 2024 23:15:12 -0500 Subject: [PATCH 01/17] Fix typehint in ReshufflePerKey on global window setting. --- sdks/python/apache_beam/transforms/util.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index a03652de2496..903e26fa202a 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -75,6 +75,7 @@ from apache_beam.typehints.decorators import get_signature from apache_beam.typehints.sharded_key_type import ShardedKeyType from apache_beam.utils import shared +from apache_beam.utils.timestamp import Timestamp from apache_beam.utils import windowed_value from apache_beam.utils.annotations import deprecated from apache_beam.utils.sharded_key import ShardedKey @@ -966,7 +967,7 @@ def restore_timestamps(element): key, windowed_values = element return [wv.with_value((key, wv.value)) for wv in windowed_values] - ungrouped = pcoll | Map(reify_timestamps).with_output_types(Any) + ungrouped = pcoll | Map(reify_timestamps).with_input_types(Tuple[K, V]).with_output_types(Tuple[K, Tuple[V, Timestamp]]) # TODO(https://github.com/apache/beam/issues/19785) Using global window as # one of the standard window. This is to mitigate the Dataflow Java Runner @@ -1005,7 +1006,6 @@ def __init__(self, num_buckets=None): generated. 
""" self.num_buckets = num_buckets if num_buckets else self._DEFAULT_NUM_BUCKETS - valid_buckets = isinstance(num_buckets, int) and num_buckets > 0 if not (num_buckets is None or valid_buckets): raise ValueError( @@ -1015,12 +1015,12 @@ def __init__(self, num_buckets=None): def expand(self, pcoll): # type: (pvalue.PValue) -> pvalue.PCollection return ( - pcoll | 'AddRandomKeys' >> - Map(lambda t: (random.randrange(0, self.num_buckets), t) - ).with_input_types(T).with_output_types(Tuple[int, T]) - | ReshufflePerKey() - | 'RemoveRandomKeys' >> Map(lambda t: t[1]).with_input_types( - Tuple[int, T]).with_output_types(T)) + pcoll | 'AddRandomKeys' >> + Map(lambda t: (random.randrange(0, self.num_buckets), t) + ).with_input_types(T).with_output_types(Tuple[int, T]) + | ReshufflePerKey().with_input_types(Tuple[int, T]).with_output_types(Tuple[int, T]) + | 'RemoveRandomKeys' >> Map(lambda t: t[1]).with_input_types( + Tuple[int, T]).with_output_types(T)) def to_runner_api_parameter(self, unused_context): # type: (PipelineContext) -> Tuple[str, None] From 6d60c65b9a4587356b2a4fbcc989616f18aaf150 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 11 Dec 2024 16:42:25 -0500 Subject: [PATCH 02/17] Only update the type hint on global window setting. Need more work in non-global windows. 
--- sdks/python/apache_beam/transforms/util.py | 6 ++- .../apache_beam/transforms/util_test.py | 41 +++++++++++++++++++ sdks/python/setup.py | 3 +- 3 files changed, 48 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 903e26fa202a..f0db6b483d18 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -954,6 +954,8 @@ def restore_timestamps(element): window.GlobalWindows.windowed_value((key, value), timestamp) for (value, timestamp) in values ] + + ungrouped = pcoll | Map(reify_timestamps).with_input_types(Tuple[K, V]).with_output_types(Tuple[K, Tuple[V, Timestamp]]) else: # typing: All conditional function variants must have identical signatures @@ -967,7 +969,9 @@ def restore_timestamps(element): key, windowed_values = element return [wv.with_value((key, wv.value)) for wv in windowed_values] - ungrouped = pcoll | Map(reify_timestamps).with_input_types(Tuple[K, V]).with_output_types(Tuple[K, Tuple[V, Timestamp]]) + # TODO(https://github.com/apache/beam/issues/33356): Support reshuffling + # unpicklable objects with a non-global window setting. + ungrouped = pcoll | Map(reify_timestamps).with_output_types(Any) # TODO(https://github.com/apache/beam/issues/19785) Using global window as # one of the standard window. 
This is to mitigate the Dataflow Java Runner diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py index d86509c7dde3..dd1a960b2d84 100644 --- a/sdks/python/apache_beam/transforms/util_test.py +++ b/sdks/python/apache_beam/transforms/util_test.py @@ -1011,6 +1011,47 @@ def format_with_timestamp(element, timestamp=beam.DoFn.TimestampParam): label="formatted_after_reshuffle") + def test_reshuffle_unpicklable_in_global_window(self): + global _Unpicklable + + class _Unpicklable(object): + def __init__(self, value): + self.value = value + + def __getstate__(self): + raise NotImplementedError() + + def __setstate__(self, state): + raise NotImplementedError() + + class _UnpicklableCoder(beam.coders.Coder): + + def encode(self, value): + return str(value.value).encode() + + def decode(self, encoded): + return _Unpicklable(int(encoded.decode())) + + def to_type_hint(self): + return _Unpicklable + + def is_deterministic(self): + return True + + beam.coders.registry.register_coder(_Unpicklable, _UnpicklableCoder) + + with TestPipeline() as pipeline: + data = [_Unpicklable(i) for i in range(5)] + expected_data = [0, 10, 20, 30, 40] + result = ( + pipeline + | beam.Create(data) + | beam.WindowInto(GlobalWindows()) + | beam.Reshuffle() + | beam.Map(lambda u: u.value * 10)) + assert_that(result, equal_to(expected_data)) + + class WithKeysTest(unittest.TestCase): def setUp(self): self.l = [1, 2, 3] diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 53c7a532e706..2bac10b3e055 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -336,7 +336,8 @@ def get_portability_package_data(): *get_portability_package_data() ] }, - ext_modules=extensions, + #ext_modules=extensions, + ext_modules=[], install_requires=[ 'crcmod>=1.7,<2.0', 'orjson>=3.9.7,<4', From 9a66c9672047377a1fe9b3740ab94c06ef6ee1fd Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 11 Dec 2024 16:44:22 -0500 Subject: [PATCH 03/17] Apply 
yapf --- sdks/python/apache_beam/transforms/util.py | 16 +++++++++------- sdks/python/apache_beam/transforms/util_test.py | 2 -- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index f0db6b483d18..a160f5a24a3c 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -955,7 +955,8 @@ def restore_timestamps(element): for (value, timestamp) in values ] - ungrouped = pcoll | Map(reify_timestamps).with_input_types(Tuple[K, V]).with_output_types(Tuple[K, Tuple[V, Timestamp]]) + ungrouped = pcoll | Map(reify_timestamps).with_input_types( + Tuple[K, V]).with_output_types(Tuple[K, Tuple[V, Timestamp]]) else: # typing: All conditional function variants must have identical signatures @@ -1019,12 +1020,13 @@ def __init__(self, num_buckets=None): def expand(self, pcoll): # type: (pvalue.PValue) -> pvalue.PCollection return ( - pcoll | 'AddRandomKeys' >> - Map(lambda t: (random.randrange(0, self.num_buckets), t) - ).with_input_types(T).with_output_types(Tuple[int, T]) - | ReshufflePerKey().with_input_types(Tuple[int, T]).with_output_types(Tuple[int, T]) - | 'RemoveRandomKeys' >> Map(lambda t: t[1]).with_input_types( - Tuple[int, T]).with_output_types(T)) + pcoll | 'AddRandomKeys' >> + Map(lambda t: (random.randrange(0, self.num_buckets), t) + ).with_input_types(T).with_output_types(Tuple[int, T]) + | ReshufflePerKey().with_input_types(Tuple[int, T]).with_output_types( + Tuple[int, T]) + | 'RemoveRandomKeys' >> Map(lambda t: t[1]).with_input_types( + Tuple[int, T]).with_output_types(T)) def to_runner_api_parameter(self, unused_context): # type: (PipelineContext) -> Tuple[str, None] diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py index dd1a960b2d84..7f166f78ef0a 100644 --- a/sdks/python/apache_beam/transforms/util_test.py +++ 
b/sdks/python/apache_beam/transforms/util_test.py @@ -1010,7 +1010,6 @@ def format_with_timestamp(element, timestamp=beam.DoFn.TimestampParam): equal_to(expected_data), label="formatted_after_reshuffle") - def test_reshuffle_unpicklable_in_global_window(self): global _Unpicklable @@ -1025,7 +1024,6 @@ def __setstate__(self, state): raise NotImplementedError() class _UnpicklableCoder(beam.coders.Coder): - def encode(self, value): return str(value.value).encode() From 96fe67491fc126c4d93376ce6fdc639ad946382d Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 11 Dec 2024 19:46:13 -0500 Subject: [PATCH 04/17] Fix some failed tests. --- sdks/python/apache_beam/transforms/util.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index a160f5a24a3c..9ea7f6e1dbdc 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -33,6 +33,7 @@ from typing import Callable from typing import Iterable from typing import List +from typing import Optional from typing import Tuple from typing import TypeVar from typing import Union @@ -75,10 +76,10 @@ from apache_beam.typehints.decorators import get_signature from apache_beam.typehints.sharded_key_type import ShardedKeyType from apache_beam.utils import shared -from apache_beam.utils.timestamp import Timestamp from apache_beam.utils import windowed_value from apache_beam.utils.annotations import deprecated from apache_beam.utils.sharded_key import ShardedKey +from apache_beam.utils.timestamp import Timestamp if TYPE_CHECKING: from apache_beam.runners.pipeline_context import PipelineContext @@ -956,7 +957,8 @@ def restore_timestamps(element): ] ungrouped = pcoll | Map(reify_timestamps).with_input_types( - Tuple[K, V]).with_output_types(Tuple[K, Tuple[V, Timestamp]]) + Tuple[K, V]).with_output_types( + Tuple[K, Tuple[V, Optional[Timestamp]]]) else: # typing: All conditional 
function variants must have identical signatures From 1be1ef10ae68e127a4dcde774c825cf9a4ca5935 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 11 Dec 2024 20:53:00 -0500 Subject: [PATCH 05/17] Revert change to setup.py --- sdks/python/apache_beam/transforms/util.py | 1 + sdks/python/setup.py | 3 +-- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 9ea7f6e1dbdc..43d4a6c20e94 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -1013,6 +1013,7 @@ def __init__(self, num_buckets=None): generated. """ self.num_buckets = num_buckets if num_buckets else self._DEFAULT_NUM_BUCKETS + valid_buckets = isinstance(num_buckets, int) and num_buckets > 0 if not (num_buckets is None or valid_buckets): raise ValueError( diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 2bac10b3e055..53c7a532e706 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -336,8 +336,7 @@ def get_portability_package_data(): *get_portability_package_data() ] }, - #ext_modules=extensions, - ext_modules=[], + ext_modules=extensions, install_requires=[ 'crcmod>=1.7,<2.0', 'orjson>=3.9.7,<4', From 37f0a7674333c00f5b80699bea1921666f1701ad Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 11 Dec 2024 23:13:05 -0500 Subject: [PATCH 06/17] Fix custom coders not being used in reshuffle in non-global windows --- sdks/python/apache_beam/coders/coders.py | 5 ++ sdks/python/apache_beam/coders/typecoders.py | 2 + sdks/python/apache_beam/transforms/util.py | 7 +-- .../apache_beam/transforms/util_test.py | 52 ++++++++++++------- .../python/apache_beam/typehints/typehints.py | 9 ++++ sdks/python/setup.py | 3 +- 6 files changed, 55 insertions(+), 23 deletions(-) diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index a0c55da81800..d53be59f3ada 100644 --- 
a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -1402,6 +1402,11 @@ def __hash__(self): return hash( (self.wrapped_value_coder, self.timestamp_coder, self.window_coder)) + @classmethod + def from_type_hint(cls, typehint, registry): + # type: (Any, CoderRegistry) -> WindowedValueCoder + return cls(registry.get_coder(typehint.inner_type)) + Coder.register_structured_urn( common_urns.coders.WINDOWED_VALUE.urn, WindowedValueCoder) diff --git a/sdks/python/apache_beam/coders/typecoders.py b/sdks/python/apache_beam/coders/typecoders.py index 1667cb7a916a..abfb9e72a047 100644 --- a/sdks/python/apache_beam/coders/typecoders.py +++ b/sdks/python/apache_beam/coders/typecoders.py @@ -94,6 +94,8 @@ def register_standard_coders(self, fallback_coder): self._register_coder_internal(str, coders.StrUtf8Coder) self._register_coder_internal(typehints.TupleConstraint, coders.TupleCoder) self._register_coder_internal(typehints.DictConstraint, coders.MapCoder) + self._register_coder_internal(typehints.WindowedTypeConstraint, + coders.WindowedValueCoder) # Default fallback coders applied in that order until the first matching # coder found. 
default_fallback_coders = [ diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 43d4a6c20e94..0787c90650c4 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -74,6 +74,7 @@ from apache_beam.transforms.window import TimestampedValue from apache_beam.typehints import trivial_inference from apache_beam.typehints.decorators import get_signature +from apache_beam.typehints.native_type_compatibility import convert_to_beam_type from apache_beam.typehints.sharded_key_type import ShardedKeyType from apache_beam.utils import shared from apache_beam.utils import windowed_value @@ -972,9 +973,9 @@ def restore_timestamps(element): key, windowed_values = element return [wv.with_value((key, wv.value)) for wv in windowed_values] - # TODO(https://github.com/apache/beam/issues/33356): Support reshuffling - # unpicklable objects with a non-global window setting. - ungrouped = pcoll | Map(reify_timestamps).with_output_types(Any) + ungrouped = pcoll | Map(reify_timestamps).with_input_types( + Tuple[K, V]).with_output_types( + Tuple[K, typehints.WindowedValue[convert_to_beam_type(V)]]) # TODO(https://github.com/apache/beam/issues/19785) Using global window as # one of the standard window. 
This is to mitigate the Dataflow Java Runner diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py index 7f166f78ef0a..55f4488beb3a 100644 --- a/sdks/python/apache_beam/transforms/util_test.py +++ b/sdks/python/apache_beam/transforms/util_test.py @@ -1010,32 +1010,32 @@ def format_with_timestamp(element, timestamp=beam.DoFn.TimestampParam): equal_to(expected_data), label="formatted_after_reshuffle") - def test_reshuffle_unpicklable_in_global_window(self): - global _Unpicklable - - class _Unpicklable(object): - def __init__(self, value): - self.value = value + global _Unpicklable + global _UnpicklableCoder + class _Unpicklable(object): + def __init__(self, value): + self.value = value - def __getstate__(self): - raise NotImplementedError() + def __getstate__(self): + raise NotImplementedError() - def __setstate__(self, state): - raise NotImplementedError() + def __setstate__(self, state): + raise NotImplementedError() - class _UnpicklableCoder(beam.coders.Coder): - def encode(self, value): - return str(value.value).encode() + class _UnpicklableCoder(beam.coders.Coder): + def encode(self, value): + return str(value.value).encode() - def decode(self, encoded): - return _Unpicklable(int(encoded.decode())) + def decode(self, encoded): + return _Unpicklable(int(encoded.decode())) - def to_type_hint(self): - return _Unpicklable + def to_type_hint(self): + return _Unpicklable - def is_deterministic(self): - return True + def is_deterministic(self): + return True + def test_reshuffle_unpicklable_in_global_window(self): beam.coders.registry.register_coder(_Unpicklable, _UnpicklableCoder) with TestPipeline() as pipeline: @@ -1049,6 +1049,20 @@ def is_deterministic(self): | beam.Map(lambda u: u.value * 10)) assert_that(result, equal_to(expected_data)) + def test_reshuffle_unpicklable_in_non_global_window(self): + beam.coders.registry.register_coder(_Unpicklable, _UnpicklableCoder) + + with TestPipeline() as pipeline: + 
data = [_Unpicklable(i) for i in range(5)] + expected_data = [0, 0, 0, 10, 10, 10, 20, 20, 20, 30, 30, 30, 40, 40, 40] + result = ( + pipeline + | beam.Create(data) + | beam.WindowInto(window.SlidingWindows(size=3, period=1)) + | beam.Reshuffle() + | beam.Map(lambda u: u.value * 10)) + assert_that(result, equal_to(expected_data)) + class WithKeysTest(unittest.TestCase): def setUp(self): diff --git a/sdks/python/apache_beam/typehints/typehints.py b/sdks/python/apache_beam/typehints/typehints.py index 0e18e887c2a0..a65a0f753826 100644 --- a/sdks/python/apache_beam/typehints/typehints.py +++ b/sdks/python/apache_beam/typehints/typehints.py @@ -1213,6 +1213,15 @@ def type_check(self, instance): repr(self.inner_type), instance.value.__class__.__name__)) + def bind_type_variables(self, bindings): + bound_inner_type = bind_type_variables(self.inner_type, bindings) + if bound_inner_type == self.inner_type: + return self + return WindowedValue[bound_inner_type] + + def __repr__(self): + return 'WindowedValue[%s]' % repr(self.inner_type) + class GeneratorHint(IteratorHint): """A Generator type hint. diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 53c7a532e706..505da7f94c16 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -336,7 +336,8 @@ def get_portability_package_data(): *get_portability_package_data() ] }, - ext_modules=extensions, + # ext_modules=extensions, + ext_modules=[], install_requires=[ 'crcmod>=1.7,<2.0', 'orjson>=3.9.7,<4', From 9300dad8098b4913d36198ed28fec7d6e5eccbf4 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Wed, 11 Dec 2024 23:20:25 -0500 Subject: [PATCH 07/17] Revert changes in setup.py. Reformat. 
--- sdks/python/apache_beam/coders/typecoders.py | 4 ++-- sdks/python/apache_beam/transforms/util_test.py | 1 + sdks/python/setup.py | 3 +-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/coders/typecoders.py b/sdks/python/apache_beam/coders/typecoders.py index abfb9e72a047..892f508d0136 100644 --- a/sdks/python/apache_beam/coders/typecoders.py +++ b/sdks/python/apache_beam/coders/typecoders.py @@ -94,8 +94,8 @@ def register_standard_coders(self, fallback_coder): self._register_coder_internal(str, coders.StrUtf8Coder) self._register_coder_internal(typehints.TupleConstraint, coders.TupleCoder) self._register_coder_internal(typehints.DictConstraint, coders.MapCoder) - self._register_coder_internal(typehints.WindowedTypeConstraint, - coders.WindowedValueCoder) + self._register_coder_internal( + typehints.WindowedTypeConstraint, coders.WindowedValueCoder) # Default fallback coders applied in that order until the first matching # coder found. default_fallback_coders = [ diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py index 55f4488beb3a..db73310dfe25 100644 --- a/sdks/python/apache_beam/transforms/util_test.py +++ b/sdks/python/apache_beam/transforms/util_test.py @@ -1012,6 +1012,7 @@ def format_with_timestamp(element, timestamp=beam.DoFn.TimestampParam): global _Unpicklable global _UnpicklableCoder + class _Unpicklable(object): def __init__(self, value): self.value = value diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 505da7f94c16..53c7a532e706 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -336,8 +336,7 @@ def get_portability_package_data(): *get_portability_package_data() ] }, - # ext_modules=extensions, - ext_modules=[], + ext_modules=extensions, install_requires=[ 'crcmod>=1.7,<2.0', 'orjson>=3.9.7,<4', From bff8b3c6557f20b4e8e0a8616001cbba90404064 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 13 Dec 2024 14:01:28 -0500 
Subject: [PATCH 08/17] Make WindowedValue a generic class. Support its conversion to the correct type constraint in Beam. --- sdks/python/apache_beam/transforms/util.py | 2 +- .../typehints/native_type_compatibility.py | 11 +++++++++++ sdks/python/apache_beam/utils/windowed_value.py | 11 ++++++++--- 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 0787c90650c4..6a2817617430 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -975,7 +975,7 @@ def restore_timestamps(element): ungrouped = pcoll | Map(reify_timestamps).with_input_types( Tuple[K, V]).with_output_types( - Tuple[K, typehints.WindowedValue[convert_to_beam_type(V)]]) + Tuple[K, windowed_value.WindowedValue[V]]) # TODO(https://github.com/apache/beam/issues/19785) Using global window as # one of the standard window. This is to mitigate the Dataflow Java Runner diff --git a/sdks/python/apache_beam/typehints/native_type_compatibility.py b/sdks/python/apache_beam/typehints/native_type_compatibility.py index 6f704b37a969..5b58d9417fc8 100644 --- a/sdks/python/apache_beam/typehints/native_type_compatibility.py +++ b/sdks/python/apache_beam/typehints/native_type_compatibility.py @@ -26,6 +26,7 @@ import typing from apache_beam.typehints import typehints +from apache_beam.utils.windowed_value import WindowedValue _LOGGER = logging.getLogger(__name__) @@ -267,6 +268,12 @@ def convert_to_beam_type(typ): # TODO(https://github.com/apache/beam/issues/20076): Currently unhandled. 
_LOGGER.info('Converting NewType type hint to Any: "%s"', typ) return typehints.Any + elif typ_module == 'apache_beam.utils.windowed_value' and \ + typ.__name__ == 'WindowedValue': + # Need to pass through WindowedValue class so that it can be converted + # to the correct type constraint in Beam + # This is needed to fix https://github.com/apache/beam/issues/33356 + pass elif (typ_module != 'typing') and (typ_module != 'collections.abc'): # Only translate types from the typing and collections.abc modules. return typ @@ -324,6 +331,10 @@ def convert_to_beam_type(typ): match=_match_is_exactly_collection, arity=1, beam_type=typehints.Collection), + _TypeMapEntry( + match=_match_issubclass(WindowedValue), + arity = 1, + beam_type=typehints.WindowedValue), ] # Find the first matching entry. diff --git a/sdks/python/apache_beam/utils/windowed_value.py b/sdks/python/apache_beam/utils/windowed_value.py index f6232ce2f6b0..1de19930ca5c 100644 --- a/sdks/python/apache_beam/utils/windowed_value.py +++ b/sdks/python/apache_beam/utils/windowed_value.py @@ -28,11 +28,13 @@ from typing import TYPE_CHECKING from typing import Any from typing import Callable +from typing import Generic from typing import Iterable from typing import List from typing import Optional from typing import Sequence from typing import Tuple +from typing import TypeVar from apache_beam.utils.timestamp import MAX_TIMESTAMP from apache_beam.utils.timestamp import MIN_TIMESTAMP @@ -43,6 +45,9 @@ from apache_beam.transforms.window import BoundedWindow +T = TypeVar('T') + + class PaneInfoTiming(object): """The timing of a PaneInfo.""" @@ -193,7 +198,7 @@ def _construct_well_known_pane_infos(): PANE_INFO_UNKNOWN = _BYTE_TO_PANE_INFO[0xF] -class WindowedValue(object): +class WindowedValue(Generic[T]): """A windowed value having a value, a timestamp and set of windows. 
Attributes: @@ -207,7 +212,7 @@ class WindowedValue(object): """ def __init__( self, - value, + value, # type: T timestamp, # type: TimestampTypes windows, # type: Tuple[BoundedWindow, ...] pane_info=PANE_INFO_UNKNOWN # type: PaneInfo @@ -259,7 +264,7 @@ def __hash__(self): (hash(self.pane_info) & 0xFFFFFFFFFFFFF)) def with_value(self, new_value): - # type: (Any) -> WindowedValue + # type: (Any) -> WindowedValue[T] """Creates a new WindowedValue with the same timestamps and windows as this. From 618d4e493cd6a623ffce95648388cc8e2bc97bc9 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 13 Dec 2024 16:11:03 -0500 Subject: [PATCH 09/17] Cython does not support Python generic class. Add a subclass as a workaround and keep it un-cythonized. --- sdks/python/apache_beam/transforms/util.py | 2 +- .../apache_beam/typehints/native_type_compatibility.py | 8 ++++---- sdks/python/apache_beam/utils/windowed_value.py | 8 ++++++-- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 6a2817617430..f301b14a74fc 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -975,7 +975,7 @@ def restore_timestamps(element): ungrouped = pcoll | Map(reify_timestamps).with_input_types( Tuple[K, V]).with_output_types( - Tuple[K, windowed_value.WindowedValue[V]]) + Tuple[K, windowed_value.TypedWindowedValue[V]]) # TODO(https://github.com/apache/beam/issues/19785) Using global window as # one of the standard window.
This is to mitigate the Dataflow Java Runner diff --git a/sdks/python/apache_beam/typehints/native_type_compatibility.py b/sdks/python/apache_beam/typehints/native_type_compatibility.py index 5b58d9417fc8..3c41f3b08f93 100644 --- a/sdks/python/apache_beam/typehints/native_type_compatibility.py +++ b/sdks/python/apache_beam/typehints/native_type_compatibility.py @@ -26,7 +26,7 @@ import typing from apache_beam.typehints import typehints -from apache_beam.utils.windowed_value import WindowedValue +from apache_beam.utils.windowed_value import TypedWindowedValue _LOGGER = logging.getLogger(__name__) @@ -269,7 +269,7 @@ def convert_to_beam_type(typ): _LOGGER.info('Converting NewType type hint to Any: "%s"', typ) return typehints.Any elif typ_module == 'apache_beam.utils.windowed_value' and \ - typ.__name__ == 'WindowedValue': + typ.__name__ == 'TypedWindowedValue': # Need to pass through WindowedValue class so that it can be converted # to the correct type constraint in Beam # This is needed to fix https://github.com/apache/beam/issues/33356 @@ -332,8 +332,8 @@ def convert_to_beam_type(typ): arity=1, beam_type=typehints.Collection), _TypeMapEntry( - match=_match_issubclass(WindowedValue), - arity = 1, + match=_match_issubclass(TypedWindowedValue), + arity=1, beam_type=typehints.WindowedValue), ] diff --git a/sdks/python/apache_beam/utils/windowed_value.py b/sdks/python/apache_beam/utils/windowed_value.py index 1de19930ca5c..c6b1266484be 100644 --- a/sdks/python/apache_beam/utils/windowed_value.py +++ b/sdks/python/apache_beam/utils/windowed_value.py @@ -44,7 +44,6 @@ if TYPE_CHECKING: from apache_beam.transforms.window import BoundedWindow - T = TypeVar('T') @@ -198,7 +197,7 @@ def _construct_well_known_pane_infos(): PANE_INFO_UNKNOWN = _BYTE_TO_PANE_INFO[0xF] -class WindowedValue(Generic[T]): +class WindowedValue(object): """A windowed value having a value, a timestamp and set of windows. 
Attributes: @@ -278,6 +277,11 @@ def __reduce__(self): self.value, self.timestamp, self.windows, self.pane_info) +class TypedWindowedValue(WindowedValue, Generic[T]): + def __init__(self, *args, **kwargs): + raise NotImplementedError("This class is solely for type inference") + + # TODO(robertwb): Move this to a static method. From 6204023cf10f4ac1385f3c78bdb316bfba72f497 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 13 Dec 2024 16:14:39 -0500 Subject: [PATCH 10/17] Add comments --- sdks/python/apache_beam/utils/windowed_value.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sdks/python/apache_beam/utils/windowed_value.py b/sdks/python/apache_beam/utils/windowed_value.py index c6b1266484be..20d96a95c2b6 100644 --- a/sdks/python/apache_beam/utils/windowed_value.py +++ b/sdks/python/apache_beam/utils/windowed_value.py @@ -277,6 +277,12 @@ def __reduce__(self): self.value, self.timestamp, self.windows, self.pane_info) +# During type inference of WindowedValue, we need to make it generic and pass +# in the inner value type. We cannot do that directly on WindowedValue class +# because it is cythonized and it seems cython could not handle generic classes. +# The workaround here is creating a subclass and keep it uncythonized. +# This class should be used solely for type inference, and should never be used +# for creating instances. class TypedWindowedValue(WindowedValue, Generic[T]): def __init__(self, *args, **kwargs): raise NotImplementedError("This class is solely for type inference") From f3bc937a00e5ced1d2b3cd060b8716560b597919 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 13 Dec 2024 16:31:34 -0500 Subject: [PATCH 11/17] Fix type error. 
--- sdks/python/apache_beam/utils/windowed_value.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/utils/windowed_value.py b/sdks/python/apache_beam/utils/windowed_value.py index 20d96a95c2b6..0c0d0bb931cf 100644 --- a/sdks/python/apache_beam/utils/windowed_value.py +++ b/sdks/python/apache_beam/utils/windowed_value.py @@ -283,7 +283,7 @@ def __reduce__(self): # The workaround here is creating a subclass and keep it uncythonized. # This class should be used solely for type inference, and should never be used # for creating instances. -class TypedWindowedValue(WindowedValue, Generic[T]): +class TypedWindowedValue(Generic[T], WindowedValue): def __init__(self, *args, **kwargs): raise NotImplementedError("This class is solely for type inference") From ed302e9c69e6b2466c09abeaf48c4aec0c7d0846 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 13 Dec 2024 16:50:21 -0500 Subject: [PATCH 12/17] Remove the base class of WindowedValue in TypedWindowedValue. --- sdks/python/apache_beam/utils/windowed_value.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/utils/windowed_value.py b/sdks/python/apache_beam/utils/windowed_value.py index 0c0d0bb931cf..84e9f3cda8dd 100644 --- a/sdks/python/apache_beam/utils/windowed_value.py +++ b/sdks/python/apache_beam/utils/windowed_value.py @@ -283,7 +283,7 @@ def __reduce__(self): # The workaround here is creating a subclass and keep it uncythonized. # This class should be used solely for type inference, and should never be used # for creating instances. 
-class TypedWindowedValue(Generic[T], WindowedValue): +class TypedWindowedValue(Generic[T]): def __init__(self, *args, **kwargs): raise NotImplementedError("This class is solely for type inference") From 86cfdd86e6bf31c1eb770a4f0051e380fe768837 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 13 Dec 2024 17:15:03 -0500 Subject: [PATCH 13/17] Move TypedWindowedValue out from windowed_value.py --- sdks/python/apache_beam/transforms/util.py | 5 ++--- .../typehints/native_type_compatibility.py | 16 +++++++++++++++- .../apache_beam/utils/windowed_value.py | 19 ++----------------- 3 files changed, 19 insertions(+), 21 deletions(-) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index f301b14a74fc..c9fd2c76b0db 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -74,7 +74,7 @@ from apache_beam.transforms.window import TimestampedValue from apache_beam.typehints import trivial_inference from apache_beam.typehints.decorators import get_signature -from apache_beam.typehints.native_type_compatibility import convert_to_beam_type +from apache_beam.typehints.native_type_compatibility import TypedWindowedValue from apache_beam.typehints.sharded_key_type import ShardedKeyType from apache_beam.utils import shared from apache_beam.utils import windowed_value @@ -974,8 +974,7 @@ def restore_timestamps(element): return [wv.with_value((key, wv.value)) for wv in windowed_values] ungrouped = pcoll | Map(reify_timestamps).with_input_types( - Tuple[K, V]).with_output_types( - Tuple[K, windowed_value.TypedWindowedValue[V]]) + Tuple[K, V]).with_output_types(Tuple[K, TypedWindowedValue[V]]) # TODO(https://github.com/apache/beam/issues/19785) Using global window as # one of the standard window. 
This is to mitigate the Dataflow Java Runner diff --git a/sdks/python/apache_beam/typehints/native_type_compatibility.py b/sdks/python/apache_beam/typehints/native_type_compatibility.py index 3c41f3b08f93..d3fd6b4cab82 100644 --- a/sdks/python/apache_beam/typehints/native_type_compatibility.py +++ b/sdks/python/apache_beam/typehints/native_type_compatibility.py @@ -24,9 +24,12 @@ import sys import types import typing +from typing import Generic +from typing import TypeVar from apache_beam.typehints import typehints -from apache_beam.utils.windowed_value import TypedWindowedValue + +T = TypeVar('T') _LOGGER = logging.getLogger(__name__) @@ -217,6 +220,17 @@ def convert_collections_to_typing(typ): return typ +# During type inference of WindowedValue, we need to make it generic and pass +# in the inner value type. We cannot do that directly on WindowedValue class +# because it is cythonized and it seems cython could not handle generic classes. +# The workaround here is creating a subclass and keep it uncythonized. +# This class should be used solely for type inference, and should never be used +# for creating instances. +class TypedWindowedValue(Generic[T]): + def __init__(self, *args, **kwargs): + raise NotImplementedError("This class is solely for type inference") + + def convert_to_beam_type(typ): """Convert a given typing type to a Beam type. 
diff --git a/sdks/python/apache_beam/utils/windowed_value.py b/sdks/python/apache_beam/utils/windowed_value.py index 84e9f3cda8dd..f6232ce2f6b0 100644 --- a/sdks/python/apache_beam/utils/windowed_value.py +++ b/sdks/python/apache_beam/utils/windowed_value.py @@ -28,13 +28,11 @@ from typing import TYPE_CHECKING from typing import Any from typing import Callable -from typing import Generic from typing import Iterable from typing import List from typing import Optional from typing import Sequence from typing import Tuple -from typing import TypeVar from apache_beam.utils.timestamp import MAX_TIMESTAMP from apache_beam.utils.timestamp import MIN_TIMESTAMP @@ -44,8 +42,6 @@ if TYPE_CHECKING: from apache_beam.transforms.window import BoundedWindow -T = TypeVar('T') - class PaneInfoTiming(object): """The timing of a PaneInfo.""" @@ -211,7 +207,7 @@ class WindowedValue(object): """ def __init__( self, - value, # type: T + value, timestamp, # type: TimestampTypes windows, # type: Tuple[BoundedWindow, ...] pane_info=PANE_INFO_UNKNOWN # type: PaneInfo @@ -263,7 +259,7 @@ def __hash__(self): (hash(self.pane_info) & 0xFFFFFFFFFFFFF)) def with_value(self, new_value): - # type: (Any) -> WindowedValue[T] + # type: (Any) -> WindowedValue """Creates a new WindowedValue with the same timestamps and windows as this. @@ -277,17 +273,6 @@ def __reduce__(self): self.value, self.timestamp, self.windows, self.pane_info) -# During type inference of WindowedValue, we need to make it generic and pass -# in the inner value type. We cannot do that directly on WindowedValue class -# because it is cythonized and it seems cython could not handle generic classes. -# The workaround here is creating a subclass and keep it uncythonized. -# This class should be used solely for type inference, and should never be used -# for creating instances. 
-class TypedWindowedValue(Generic[T]): - def __init__(self, *args, **kwargs): - raise NotImplementedError("This class is solely for type inference") - - # TODO(robertwb): Move this to a static method. From f9e4746341fc448e07ececb703f45ae64af6837c Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 13 Dec 2024 17:22:47 -0500 Subject: [PATCH 14/17] Revise the comments --- .../typehints/native_type_compatibility.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/typehints/native_type_compatibility.py b/sdks/python/apache_beam/typehints/native_type_compatibility.py index d3fd6b4cab82..5c7d2f015134 100644 --- a/sdks/python/apache_beam/typehints/native_type_compatibility.py +++ b/sdks/python/apache_beam/typehints/native_type_compatibility.py @@ -220,12 +220,13 @@ def convert_collections_to_typing(typ): return typ -# During type inference of WindowedValue, we need to make it generic and pass -# in the inner value type. We cannot do that directly on WindowedValue class -# because it is cythonized and it seems cython could not handle generic classes. -# The workaround here is creating a subclass and keep it uncythonized. -# This class should be used solely for type inference, and should never be used -# for creating instances. +# During type inference of WindowedValue, we need to pass in the inner value +# type. This cannot be achieved immediately with WindowedValue class because it +# is not parameterized. Changing it to a generic class (e.g. WindowedValue[T]) +# could work in theory. However, the class is cythonized and it seems that +# cython does not handle generic classes well. +# The workaround here is to create a separate class solely for the type +# inference purpose. This class should never be used for creating instances. 
class TypedWindowedValue(Generic[T]): def __init__(self, *args, **kwargs): raise NotImplementedError("This class is solely for type inference") From 5a67660a54f03237fc388643523d743f1c5bd2f8 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 13 Dec 2024 17:29:36 -0500 Subject: [PATCH 15/17] Fix the module location when matching. --- sdks/python/apache_beam/typehints/native_type_compatibility.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/typehints/native_type_compatibility.py b/sdks/python/apache_beam/typehints/native_type_compatibility.py index 5c7d2f015134..6f6e0e69b1a1 100644 --- a/sdks/python/apache_beam/typehints/native_type_compatibility.py +++ b/sdks/python/apache_beam/typehints/native_type_compatibility.py @@ -283,7 +283,7 @@ def convert_to_beam_type(typ): # TODO(https://github.com/apache/beam/issues/20076): Currently unhandled. _LOGGER.info('Converting NewType type hint to Any: "%s"', typ) return typehints.Any - elif typ_module == 'apache_beam.utils.windowed_value' and \ + elif typ_module == 'apache_beam.typehints.native_type_compatibility' and \ typ.__name__ == 'TypedWindowedValue': # Need to pass through WindowedValue class so that it can be converted # to the correct type constraint in Beam From d453b4275710800a188d2b7a21af6b9cef642f1a Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 13 Dec 2024 19:05:38 -0500 Subject: [PATCH 16/17] Fix test failure where __name__ of a type alias not found in python 3.9 --- sdks/python/apache_beam/typehints/native_type_compatibility.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/typehints/native_type_compatibility.py b/sdks/python/apache_beam/typehints/native_type_compatibility.py index 6f6e0e69b1a1..381d4f7aae2b 100644 --- a/sdks/python/apache_beam/typehints/native_type_compatibility.py +++ b/sdks/python/apache_beam/typehints/native_type_compatibility.py @@ -284,7 +284,7 @@ def convert_to_beam_type(typ): 
_LOGGER.info('Converting NewType type hint to Any: "%s"', typ) return typehints.Any elif typ_module == 'apache_beam.typehints.native_type_compatibility' and \ - typ.__name__ == 'TypedWindowedValue': + getattr(typ, "__name__", typ.__origin__.__name__) == 'TypedWindowedValue': # Need to pass through WindowedValue class so that it can be converted # to the correct type constraint in Beam # This is needed to fix https://github.com/apache/beam/issues/33356 From 7854f433c285c3cbbeac69085d61ef7a295b55d0 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 13 Dec 2024 17:30:15 -0800 Subject: [PATCH 17/17] Add a note about the window coder. --- sdks/python/apache_beam/coders/coders.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index d53be59f3ada..724f268a8312 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -1405,6 +1405,9 @@ def __hash__(self): @classmethod def from_type_hint(cls, typehint, registry): # type: (Any, CoderRegistry) -> WindowedValueCoder + # Ideally this'd take two parameters so that one could hint at + # the window type as well instead of falling back to the + # pickle coders. return cls(registry.get_coder(typehint.inner_type))