[Flaky]: apache_beam/runners/portability/portable_runner_test.py::PortableRunnerTestWithSubprocesses::test_pardo_state_with_custom_key_coder #24313

Closed
BjornPrime opened this issue Nov 22, 2022 · 2 comments

Comments

@BjornPrime
Contributor

What happened?

A transient failure was encountered while running post-commit tests in GitHub Actions. See the output below.

self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_pardo_state_with_custom_key_coder>

    def test_pardo_state_with_custom_key_coder(self):
      """Tests that state requests work correctly when the key coder is an
      SDK-specific coder, i.e. non standard coder. This is additionally enforced
      by Java's ProcessBundleDescriptorsTest and by Flink's
      ExecutableStageDoFnOperator which detects invalid encoding by checking for
      the correct key group of the encoded key."""
      index_state_spec = userstate.CombiningValueStateSpec('index', sum)
    
      # Test params
      # Ensure decent amount of elements to serve all partitions
      n = 200
      duplicates = 1
    
      split = n // (duplicates + 1)
      inputs = [(i % split, str(i % split)) for i in range(0, n)]
    
      # Use a DoFn which has to use FastPrimitivesCoder because the type cannot
      # be inferred
      class Input(beam.DoFn):
        def process(self, impulse):
          for i in inputs:
            yield i
    
      class AddIndex(beam.DoFn):
        def process(self, kv, index=beam.DoFn.StateParam(index_state_spec)):
          k, v = kv
          index.add(1)
          yield k, v, index.read()
    
      expected = [(i % split, str(i % split), i // split + 1)
                  for i in range(0, n)]
    
      with self.create_pipeline() as p:
        assert_that(
            p
            | beam.Impulse()
            | beam.ParDo(Input())
            | beam.ParDo(AddIndex()),
>           equal_to(expected))
INFO:apache_beam.runners.portability.local_job_service:Worker: severity: INFO timestamp {   seconds: 1667837893   nanos: 328462600 } message: "Pipeline_options: {\'job_name\': \'test_pardo_state_with_custom_key_coder_1667837883.477778\', \'gcp_oauth_scopes\': [\'https://www.googleapis.com/auth/bigquery\', \'https://www.googleapis.com/auth/cloud-platform\', \'https://www.googleapis.com/auth/devstorage.full_control\', \'https://www.googleapis.com/auth/userinfo.email\', \'https://www.googleapis.com/auth/datastore\', \'https://www.googleapis.com/auth/spanner.admin\', \'https://www.googleapis.com/auth/spanner.data\', \'https://www.googleapis.com/auth/bigquery\', \'https://www.googleapis.com/auth/cloud-platform\', \'https://www.googleapis.com/auth/devstorage.full_control\', \'https://www.googleapis.com/auth/userinfo.email\', \'https://www.googleapis.com/auth/datastore\', \'https://www.googleapis.com/auth/spanner.admin\', \'https://www.googleapis.com/auth/spanner.data\'], \'experiments\': [\'state_cache_size=100\', \'data_buffer_time_limit_ms=1000\', \'beam_fn_api\'], \'sdk_location\': \'container\', \'job_endpoint\': \'localhost:63832\', \'environment_type\': \'beam:env:harness_subprocess_python:v1\', \'environment_config\': \'D:\\\\a\\\\beam\\\\beam\\\\sdks\\\\python\\\\target\\\\.tox\\\\py37-win\\\\Scripts\\\\python.exe -m apache_beam.runners.worker.sdk_worker_main\', \'sdk_worker_parallelism\': \'1\', \'environment_cache_millis\': \'0\'}" log_location: "D:\\a\\beam\\beam\\sdks\\python\\apache_beam\\runners\\worker\\sdk_worker_main.py:128" thread: "MainThread"
FAILED apache_beam/runners/portability/portable_runner_test.py::PortableRunnerTestWithSubprocesses::test_pardo_state_with_custom_key_coder
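For local triage, the stateful-DoFn logic under test can be exercised on the default DirectRunner instead of the subprocess-based portable runner. The following is a minimal sketch, not part of the failing test: it assumes the default runner and replaces the Impulse/Input pair with beam.Create, but keeps the same AddIndex DoFn, inputs, and expectations.

import apache_beam as beam
from apache_beam.transforms import userstate
from apache_beam.testing.util import assert_that, equal_to

# Same per-key counter state spec as in the test above.
index_state_spec = userstate.CombiningValueStateSpec('index', sum)

class AddIndex(beam.DoFn):
  def process(self, kv, index=beam.DoFn.StateParam(index_state_spec)):
    k, v = kv
    index.add(1)              # increment the per-key counter held in user state
    yield k, v, index.read()

n, duplicates = 200, 1
split = n // (duplicates + 1)
inputs = [(i % split, str(i % split)) for i in range(n)]
expected = [(i % split, str(i % split), i // split + 1) for i in range(n)]

# beam.Pipeline() defaults to the DirectRunner (an assumption for this sketch;
# the failing test runs on the PortableRunner with a subprocess SDK worker).
with beam.Pipeline() as p:
  assert_that(
      p | beam.Create(inputs) | beam.ParDo(AddIndex()),
      equal_to(expected))

If this passes locally while the subprocess-based portable runner times out, that would suggest the flake lies in runner/worker communication rather than in the state logic itself.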

Issue Failure

Failure: Test is flaky

Issue Priority

Priority: 1

Issue Component

Component: sdk-py-core

@BjornPrime
Contributor Author

Full job output can be found here: https://github.com/apache/beam/actions/runs/3411967870/jobs/5676856627

Also see the fuller output from the test failure, which was truncated in the initial post:

================================== FAILURES ===================================
__ PortableRunnerTestWithSubprocesses.test_pardo_state_with_custom_key_coder __
[gw3] win32 -- Python 3.7.9 D:\a\beam\beam\sdks\python\target\.tox\py37-win\Scripts\python.exe

self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_pardo_state_with_custom_key_coder>

    def test_pardo_state_with_custom_key_coder(self):
      """Tests that state requests work correctly when the key coder is an
      SDK-specific coder, i.e. non standard coder. This is additionally enforced
      by Java's ProcessBundleDescriptorsTest and by Flink's
      ExecutableStageDoFnOperator which detects invalid encoding by checking for
      the correct key group of the encoded key."""
      index_state_spec = userstate.CombiningValueStateSpec('index', sum)
    
      # Test params
      # Ensure decent amount of elements to serve all partitions
      n = 200
      duplicates = 1
    
      split = n // (duplicates + 1)
      inputs = [(i % split, str(i % split)) for i in range(0, n)]
    
      # Use a DoFn which has to use FastPrimitivesCoder because the type cannot
      # be inferred
      class Input(beam.DoFn):
        def process(self, impulse):
          for i in inputs:
            yield i
    
      class AddIndex(beam.DoFn):
        def process(self, kv, index=beam.DoFn.StateParam(index_state_spec)):
          k, v = kv
          index.add(1)
          yield k, v, index.read()
    
      expected = [(i % split, str(i % split), i // split + 1)
                  for i in range(0, n)]
    
      with self.create_pipeline() as p:
        assert_that(
            p
            | beam.Impulse()
            | beam.ParDo(Input())
            | beam.ParDo(AddIndex()),
>           equal_to(expected))

apache_beam\runners\portability\portable_runner_test.py:207: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam\pipeline.py:598: in __exit__
    self.result.wait_until_finish()
apache_beam\runners\portability\portable_runner.py:607: in wait_until_finish
    raise self._runtime_exception
apache_beam\runners\portability\portable_runner.py:613: in _observe_state
    for state_response in self._state_stream:
target\.tox\py37-win\lib\site-packages\grpc\_channel.py:426: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...%5B::1%5D:63832 {created_time:"2022-11-07T16:19:05.79894806+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>

    def _next(self):
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(self._state,
                                               self._response_deserializer)
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler)
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return (self._state.response is not None or
                        (cygrpc.OperationType.receive_message
                         not in self._state.due and
                         self._state.code is not None))
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer ipv6:%5B::1%5D:63832 {created_time:"2022-11-07T16:19:05.79894806+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
E                   >

target\.tox\py37-win\lib\site-packages\grpc\_channel.py:826: _MultiThreadedRendezvous
---------------------------- Captured stderr call -----------------------------
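
The terminal error is not a failure inside the DoFn but a gRPC DEADLINE_EXCEEDED raised while wait_until_finish iterates the job-state stream. As a standalone illustration (not Beam code; the service and method names below are made up), the same status can be reproduced with a server-streaming call whose peer never answers before the deadline:

import grpc

# Any endpoint with nothing listening works for this demo; the port here just
# mirrors the job endpoint from the log.
channel = grpc.insecure_channel('localhost:63832')

# Hypothetical server-streaming method; raw bytes in and out so no protos are needed.
get_state_stream = channel.unary_stream(
    '/example.JobService/GetStateStream',
    request_serializer=lambda msg: msg,
    response_deserializer=lambda msg: msg)

# wait_for_ready=True keeps the call from failing fast with UNAVAILABLE, so the
# two-second deadline expires first.
responses = get_state_stream(b'', timeout=2.0, wait_for_ready=True)
try:
    for response in responses:   # same iteration pattern as _observe_state above
        print(response)
except grpc.RpcError as err:
    print(err.code())            # StatusCode.DEADLINE_EXCEEDED

On that reading, giving the state stream (or the test) more time to wait for the subprocess worker would be the usual mitigation, though that is only a guess from the traceback.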

@damccorm
Contributor

I think this is fixed. If not, it should get auto-flagged by our tooling anyway, so this should be safe to close.

@github-actions github-actions bot added this to the 2.61.0 Release milestone Oct 31, 2024