[Bug]: apache_beam.io.gcp.bigquery_read_it_test.ReadAllBQTests.test_read_queries is flaky #26343

Closed
tvalentyn opened this issue Apr 19, 2023 · 5 comments

@tvalentyn (Contributor)

What happened?

Sample error from: https://ci-beam.apache.org/job/beam_PostCommit_Python39/1718/testReport/junit/apache_beam.io.gcp.bigquery_read_it_test/ReadAllBQTests/test_read_queries/

Error Message
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1418, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 838, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 984, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python39/src/sdks/python/apache_beam/transforms/core.py", line 1960, in <lambda>
    wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
  File "/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python39/src/sdks/python/apache_beam/testing/util.py", line 191, in _equal
    raise BeamAssertException(msg)
apache_beam.testing.util.BeamAssertException: Failed assert: [{'number': 1, 'str': 'abc'}, {'number': 2, 'str': 'def'}, {'number': 3, 'str': '你好'}, {'number': 4, 'str': 'привет'}, {'number': 10, 'str': 'abcd'}, {'number': 20, 'str': 'defg'}, {'number': 30, 'str': '你好'}, {'number': 40, 'str': 'привет'}, {'number': 10, 'str': 'abcde', 'extra': 3}] == [{'number': 10, 'str': 'abcde', 'extra': 3}, {'number': 10, 'str': 'abcde', 'extra': 3}, {'number': 2, 'str': 'def'}, {'number': 1, 'str': 'abc'}, {'number': 3, 'str': '你好'}, {'number': 4, 'str': 'привет'}], unexpected elements [{'number': 10, 'str': 'abcde', 'extra': 3}], missing elements [{'number': 10, 'str': 'abcd'}, {'number': 20, 'str': 'defg'}, {'number': 30, 'str': '你好'}, {'number': 40, 'str': 'привет'}]

...

self = <apache_beam.io.gcp.bigquery_read_it_test.ReadAllBQTests testMethod=test_read_queries>

    @skip(['PortableRunner', 'FlinkRunner'])
    @pytest.mark.it_postcommit
    def test_read_queries(self):
      # TODO(https://github.com/apache/beam/issues/20610): Remove experiment when
      # tests run on r_v2.
      args = self.args + ["--experiments=use_runner_v2"]
      with beam.Pipeline(argv=args) as p:
        result = (
            p
            | beam.Create([
                beam.io.ReadFromBigQueryRequest(query=self.query1),
                beam.io.ReadFromBigQueryRequest(
                    query=self.query2, use_standard_sql=False),
                beam.io.ReadFromBigQueryRequest(
                    table='%s.%s' % (self.dataset_id, self.table_name3))
            ])
            | beam.io.ReadAllFromBigQuery())
>       assert_that(
            result,
            equal_to(self.TABLE_DATA_1 + self.TABLE_DATA_2 + self.TABLE_DATA_3))

apache_beam/io/gcp/bigquery_read_it_test.py:809: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/pipeline.py:600: in __exit__
    self.result = self.run()
apache_beam/pipeline.py:550: in run
    return Pipeline.from_runner_api(
apache_beam/pipeline.py:577: in run
    return self.runner.run_pipeline(self, self._options)
apache_beam/runners/dataflow/test_dataflow_runner.py:66: in run_pipeline
    self.result.wait_until_finish(duration=wait_duration)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <DataflowPipelineResult <Job
 clientRequestId: '20230419080645509292-9631'
 createTime: '2023-04-19T08:06:55.403621Z'
...023-04-19T08:06:55.403621Z'
 steps: []
 tempFiles: []
 type: TypeValueValuesEnum(JOB_TYPE_BATCH, 1)> at 0x7f6d540733a0>
duration = None

    def wait_until_finish(self, duration=None):
      if not self.is_in_terminal_state():
        if not self.has_job:
          raise IOError('Failed to get the Dataflow job id.')
        consoleUrl = (
            "Console URL: [https://console.cloud.google.com/"](https://console.cloud.google.com/)
            f"dataflow/jobs/<RegionId>/{self.job_id()}"
            "?project=<ProjectId>")
        thread = threading.Thread(
            target=DataflowRunner.poll_for_job_completion,
            args=(self._runner, self, duration))
    
        # Mark the thread as a daemon thread so a keyboard interrupt on the main
        # thread will terminate everything. This is also the reason we will not
        # use thread.join() to wait for the polling thread.
        thread.daemon = True
        thread.start()
        while thread.is_alive():
          time.sleep(5.0)
    
        # TODO: Merge the termination code in poll_for_job_completion and
        # is_in_terminal_state.
        terminated = self.is_in_terminal_state()
        assert duration or terminated, (
            'Job did not reach to a terminal state after waiting indefinitely. '
            '{}'.format(consoleUrl))
    
        # TODO(https://github.com/apache/beam/issues/21695): Also run this check
        # if wait_until_finish was called after the pipeline completed.
        if terminated and self.state != PipelineState.DONE:
          # TODO(BEAM-1290): Consider converting this to an error log based on
          # the resolution of the issue.
          _LOGGER.error(consoleUrl)
>         raise DataflowRuntimeException(
              'Dataflow pipeline failed. State: %s, Error:\n%s' %
              (self.state, getattr(self._runner, 'last_error_msg', None)),
              self)
E         apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
E         Traceback (most recent call last):
E           File "apache_beam/runners/common.py", line 1418, in apache_beam.runners.common.DoFnRunner.process
E           File "apache_beam/runners/common.py", line 838, in apache_beam.runners.common.PerWindowInvoker.invoke_process
E           File "apache_beam/runners/common.py", line 984, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
E           File "/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python39/src/sdks/python/apache_beam/transforms/core.py", line 1960, in <lambda>
E             wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
E           File "/home/jenkins/jenkins-slave/workspace/beam_PostCommit_Python39/src/sdks/python/apache_beam/testing/util.py", line 191, in _equal
E             raise BeamAssertException(msg)
E         apache_beam.testing.util.BeamAssertException: Failed assert: [{'number': 1, 'str': 'abc'}, {'number': 2, 'str': 'def'}, {'number': 3, 'str': '你好'}, {'number': 4, 'str': 'привет'}, {'number': 10, 'str': 'abcd'}, {'number': 20, 'str': 'defg'}, {'number': 30, 'str': '你好'}, {'number': 40, 'str': 'привет'}, {'number': 10, 'str': 'abcde', 'extra': 3}] == [{'number': 10, 'str': 'abcde', 'extra': 3}, {'number': 10, 'str': 'abcde', 'extra': 3}, {'number': 2, 'str': 'def'}, {'number': 1, 'str': 'abc'}, {'number': 3, 'str': '你好'}, {'number': 4, 'str': 'привет'}], unexpected elements [{'number': 10, 'str': 'abcde', 'extra': 3}], missing elements [{'number': 10, 'str': 'abcd'}, {'number': 20, 'str': 'defg'}, {'number': 30, 'str': '你好'}, {'number': 40, 'str': 'привет'}]
...

Issue Priority

Priority: 1 (data loss / total loss of function)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@Abacn (Contributor) commented Apr 19, 2023

This may be due to a quota issue. It was failing on different Python PostCommit suites (3.9, 3.10) in the same time range yesterday, then came back.

also see: https://ci-beam.apache.org/job/beam_PostCommit_Python310/728/

@tvalentyn (Contributor, Author)

Thanks. That's a weird manifestation for a quota issue. Do you know which quota is missing so we can request more?
cc: @ahmedabu98 who was taking a look at this.

@ahmedabu98 (Contributor) commented Apr 20, 2023

This is a weird one. The test essentially creates three read requests against three different BQ tables, then checks the results. These failures are seen in 3.9 (#1718) and 3.10 (#728, #729).

The expected data is the aggregate of the data in the three tables. In the failing runs mentioned above the pipeline still performs three reads; the hiccup looks to be that it executes one of the read requests twice and ignores another.

P.S. The evidence for this is in the returned results of the read operation: one set of data is repeated, and another is missing.
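
For context on how the assertion reports this: equal_to compares the actual and expected elements as an unordered multiset, so a result set that is read twice shows up under "unexpected elements" and the skipped one under "missing elements", exactly as in the failure output above. A minimal standalone sketch of that behavior (made-up rows, not the actual test tables):

import apache_beam as beam
from apache_beam.testing.util import assert_that, equal_to

# Made-up rows standing in for the three tables.
table_1 = [{'number': 1, 'str': 'abc'}]
table_2 = [{'number': 10, 'str': 'abcd'}]
table_3 = [{'number': 100, 'str': 'abcde'}]

with beam.Pipeline() as p:
  # Simulate the suspected bug: table_3 is read twice, table_2 is never read.
  actual = p | beam.Create(table_1 + table_3 + table_3)
  # equal_to is order-independent but count-sensitive, so the pipeline fails
  # with a BeamAssertException listing the extra table_3 row as "unexpected"
  # and the table_2 row as "missing".
  assert_that(actual, equal_to(table_1 + table_2 + table_3))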

@tvalentyn (Contributor, Author)

That is strange. I suppose we could add logging of the BQ calls being made and/or of the requests themselves, or otherwise see where the collision of two requests is coming from.

Could it be that two pipelines running at the same time have side effects because some timestamp-based IDs collide?
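
If we go the logging route, a rough sketch of what that could look like (the LogRequest DoFn and its placement are hypothetical, not existing Beam code):

import logging
import apache_beam as beam

class LogRequest(beam.DoFn):
  """Pass-through DoFn that logs each BigQuery read request it sees."""
  def process(self, request):
    logging.info(
        'ReadAllFromBigQuery request: query=%s, table=%s',
        getattr(request, 'query', None), getattr(request, 'table', None))
    yield request

# In test_read_queries, the step could sit between Create and the read:
#   ... | beam.Create(read_requests)
#       | 'LogRequests' >> beam.ParDo(LogRequest())
#       | beam.io.ReadAllFromBigQuery()

That would at least tell us whether the duplicate request already exists before the read transform or only appears inside it.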

@damccorm (Contributor)

This looks like it is fixed; if I'm wrong, it should get picked up by the dependency tooling.

github-actions bot added this to the 2.61.0 Release milestone on Oct 30, 2024