[Bug]: Test runner(Python SDK) fails when generic type is specified as a type for pardo element #33189

Closed
1 of 17 tasks
m-morozov-kw opened this issue Nov 22, 2024 · 7 comments

@m-morozov-kw

m-morozov-kw commented Nov 22, 2024

What happened?

Test env details:
Python 3.11.3
Beam SDK version: 2.60.0

Hello! We want to use generic types as type annotations for ParDo elements in the Python SDK to improve code readability, but the test runner doesn't seem to support that. Is there a way to work around this?
Reproduction snippet:
Create a file such as /tmp/test_pardo.py

import apache_beam as beam
from typing import TypeVar, Generic

from apache_beam.testing.util import assert_that, equal_to
from apache_beam.testing.test_pipeline import TestPipeline

T = TypeVar("T")


class ResultContainer(Generic[T]):
    def __init__(self, payload: T) -> None:
        self.payload = payload


class SomeDoFn(beam.DoFn):
    def process(self, data: ResultContainer[int]):
        yield data.payload + 1


def test_pardo():
    with TestPipeline() as p:
        output = p | beam.Create([ResultContainer(1)]) | beam.ParDo(SomeDoFn())
        assert_that(
            label="check result",
            actual=output,
            matcher=equal_to([2]),
        )

Run it as a test

pytest /tmp/test_pardo.py

Expected: test passes
Actual behavior:

    def __subclasscheck__(self, cls):
>       raise TypeError("Subscripted generics cannot be used with"
                        " class and instance checks")
E       TypeError: Subscripted generics cannot be used with class and instance checks

Thank you

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@m-morozov-kw
Author

m-morozov-kw commented Nov 22, 2024

.add-labels python

Contributor

Label generic type annotations cannot be managed because it does not exist in the repo. Please check your spelling.

@m-morozov-kw m-morozov-kw changed the title [Bug]: Test runner(Oython SDK) fails when generic type is specified as a type for pardo element [Bug]: Test runner(Python SDK) fails when generic type is specified as a type for pardo element Nov 22, 2024
@liferoad
Collaborator

@jrmccluskey do we support this?

@jrmccluskey
Contributor

jrmccluskey commented Nov 25, 2024

I dug into this a little bit, and we can get to a functional pipeline with a few small tweaks:

import apache_beam as beam
from typing import Generic
from typing import TypeVar

from apache_beam.testing.util import assert_that, equal_to
from apache_beam.testing.test_pipeline import TestPipeline

T = TypeVar("T")


class ResultContainer(Generic[T]):
    def __init__(self, payload: T) -> None:
        self.payload = payload

class DifferentDoFn(beam.DoFn):
    def process(self, data: int):
        yield ResultContainer[int](data)

class SomeDoFn(beam.DoFn):
    def process(self, data: ResultContainer[int]):
        yield data.payload + 1

def test_pardo():
    with TestPipeline() as p:
        output = (p | beam.Create([1])
                    | beam.ParDo(DifferentDoFn()) 
                    | beam.ParDo(SomeDoFn()))
        assert_that(
            label="check result",
            actual=output,
            matcher=equal_to([2]),
        )

if __name__ == '__main__':
  test_pardo()

The type hinting infrastructure trips up because creating the generic instance directly in the Create transform leads to some awkward comparisons in our type checking code. We end up comparing an unparameterized type (ResultContainer(1) carries no type argument at all) against the subscripted type ResultContainer[int]. Even setting aside the missing type argument, Python's built-in isinstance() and issubclass() reject subscripted generics, hence the error message.
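
The isinstance() behaviour described above can be reproduced without Beam. The following is a minimal standard-library sketch; the check helper is hypothetical and exists only to capture the error as a string, it is not part of Beam's type checking code.

```python
from typing import Generic, TypeVar

T = TypeVar("T")


class ResultContainer(Generic[T]):
    def __init__(self, payload: T) -> None:
        self.payload = payload


def check(value, type_hint) -> str:
    """Hypothetical helper: run isinstance() and report the outcome instead of raising."""
    try:
        return str(isinstance(value, type_hint))
    except TypeError as exc:
        return f"TypeError: {exc}"


container = ResultContainer(1)

# The plain class works with isinstance().
plain_result = check(container, ResultContainer)

# The subscripted alias ResultContainer[int] is rejected by isinstance().
subscripted_result = check(container, ResultContainer[int])

print(plain_result)
print(subscripted_result)
```

The instance itself never records the int parameter, so there is nothing at runtime for a check against ResultContainer[int] to inspect.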

In the short term, a pattern like this, where an extra DoFn constructs the typed values, should be a valid workaround. In the longer term, the problem is a little tricky but seems fixable with the help of some third-party type introspection code that I've started tinkering with.
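
The thread does not name the third-party introspection code. Purely as an illustration of the general idea, and assuming the standard library's typing.get_origin and typing.get_args as a stand-in, a check can unwrap the subscripted alias before calling isinstance(). The is_instance_of_hint function is a hypothetical name, and this is not how Beam implements or plans to implement the fix.

```python
from typing import Any, Generic, TypeVar, get_args, get_origin

T = TypeVar("T")


class ResultContainer(Generic[T]):
    def __init__(self, payload: T) -> None:
        self.payload = payload


def is_instance_of_hint(value: Any, type_hint: Any) -> bool:
    """Hypothetical check that tolerates subscripted generics.

    Only the unsubscripted origin class is checked; the type arguments
    (for example, int in ResultContainer[int]) are NOT verified.
    """
    origin = get_origin(type_hint)
    if origin is None:
        # Not a subscripted generic, so a normal isinstance() check is fine.
        return isinstance(value, type_hint)
    return isinstance(value, origin)


hint = ResultContainer[int]
print(get_origin(hint) is ResultContainer)
print(get_args(hint))
print(is_instance_of_hint(ResultContainer(1), hint))
print(is_instance_of_hint(1, hint))
```

Checking only the origin class sidesteps the TypeError, at the cost of accepting a ResultContainer whose payload is not an int.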

@m-morozov-kw
Author

@jrmccluskey, thank you for the feedback and for providing a workaround example. I'm unsure if we can utilize this workaround since it requires changing many places in the code where we instantiate ResultContainer. But it was good to know.

@jrmccluskey
Contributor

I actually came back to this while looking at another type checking issue and realized that there's a better way to do this!

import apache_beam as beam
from typing import TypeVar, Generic

from apache_beam.testing.util import assert_that, equal_to
from apache_beam.testing.test_pipeline import TestPipeline

T = TypeVar("T")


class ResultContainer(Generic[T]):
  def __init__(self, payload: T) -> None:
    self.payload = payload


class SomeDoFn(beam.DoFn):
  def process(self, data: ResultContainer[int]):
    yield data.payload + 1


def test_pardo():
  with TestPipeline() as p:
    output = (
        p | beam.Create([ResultContainer(1)]).with_output_types(
            ResultContainer[int])
        | beam.ParDo(SomeDoFn()))
    assert_that(
        label="check result",
        actual=output,
        matcher=equal_to([2]),
    )


if __name__ == '__main__':
  test_pardo()

The with_output_types annotation addresses the same problem but avoids forcing an extra DoFn to wrap the output. Hopefully this is a little more helpful!

@m-morozov-kw
Author

Oh, that is cool. That code looks more idiomatic in terms of beam code. Thank you

@github-actions github-actions bot added this to the 2.62.0 Release milestone Dec 12, 2024