
apache_beam.examples.streaming_wordcount_it_test.StreamingWordCountIT.test_streaming_wordcount_it flaky #21121

Closed
damccorm opened this issue Jun 4, 2022 · 8 comments

Comments

@damccorm
Contributor

damccorm commented Jun 4, 2022

The test failed twice because another Dataflow job already existed with the same name.

(Are we running multiple instances of the test at the same time and using the timestamp to generate the job name? If so, the same timestamp will occasionally be picked twice.)

The workflow could not be created. Causes: (a8f47b4f3654731d): There is already an active job named beamapp-jenkins-0728180349-027689. If you want to submit a second job, try again by setting a different name."

https://ci-beam.apache.org/job/beam_PreCommit_Python_Commit/19388/

Both runs fail with this error:

def __ProcessHttpResponse(self, method_config, http_response, request):
    """Process the given http response."""
    if http_response.status_code not in (http_client.OK,
                                         http_client.CREATED,
                                         http_client.NO_CONTENT):
      raise exceptions.HttpError.FromResponse(
>         http_response, method_config=method_config, request=request)
E apitools.base.py.exceptions.HttpConflictError: HttpError accessing <https://dataflow.googleapis.com/v1b3/projects/apache-beam-testing/locations/us-central1/jobs?alt=json>: response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Wed, 28 Jul 2021 18:05:03 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'x-content-type-options': 'nosniff', 'transfer-encoding': 'chunked', 'status': '409', 'content-length': '318', '-content-encoding': 'gzip'}>, content <{
E   "error": {
E     "code": 409,
E     "message": "(1ddd3f68174d9490): The workflow could not be created. Causes: (a8f47b4f3654731d): There is already an active job named beamapp-jenkins-0728180349-027689. If you want to submit a second job, try again by setting a different name.",
E     "status": "ALREADY_EXISTS"
E   }
E }
E >
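For context, a minimal sketch of the suspected collision mechanism, assuming the job name is derived from the submission timestamp (beamapp-jenkins-0728180349-027689 looks like MMDDHHMMSS plus microseconds; this is an illustration, not Beam's actual naming code):

import datetime
import random
import string

def default_job_name(user: str) -> str:
    # Illustrative approximation of a timestamp-based name such as
    # beamapp-jenkins-0728180349-027689; two jobs submitted in the same
    # instant produce identical names and trigger a 409 ALREADY_EXISTS.
    ts = datetime.datetime.utcnow().strftime('%m%d%H%M%S-%f')
    return 'beamapp-{}-{}'.format(user, ts)

def unique_job_name(user: str) -> str:
    # One hedged mitigation: append a random suffix so concurrent runs
    # cannot collide even when they pick the same timestamp.
    suffix = ''.join(random.choices(string.ascii_lowercase + string.digits, k=6))
    return '{}-{}'.format(default_job_name(user), suffix)

print(unique_job_name('jenkins'))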

Run 1:

https://ci-beam.apache.org/job/beam_PreCommit_Python_Commit/19388/testReport/junit/apache_beam.examples.streaming_wordcount_it_test/StreamingWordCountIT/test_streaming_wordcount_it/

Run 2:

https://ci-beam.apache.org/job/beam_PreCommit_Python_Commit/19388/testReport/junit/apache_beam.examples.streaming_wordcount_it_test/StreamingWordCountIT/test_streaming_wordcount_it_2/

Imported from Jira BEAM-12673. Original Jira may contain additional context.
Reported by: [email protected].

@ryanthompson591
Contributor

.take-issue

@ryanthompson591
Contributor

ryanthompson591 commented Oct 17, 2022

This was reported in July 2021. I looked through about 40 builds.

Example:
https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/6210/testReport/apache_beam.examples.streaming_wordcount_it_test/StreamingWordCountIT/

I couldn't find any flakes. Perhaps Jenkins had some issues that are now resolved.

@ryanthompson591
Contributor

.close-issue

@Abacn
Contributor

Abacn commented Oct 18, 2022

I actually encountered it a couple of times, most recently yesterday: https://ci-beam.apache.org/job/beam_PreCommit_Python_Commit/25312/console

Note that this test somehow is not reported in "Test Result", but the logs show it failing:

=================================== FAILURES ===================================
01:47:54 _______________ StreamingWordCountIT.test_streaming_wordcount_it _______________
01:47:54 [gw0] linux -- Python 3.7.12 /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/build/gradleenv/-1734967052/bin/python3.7
01:47:54 
01:47:54 self = <apache_beam.examples.streaming_wordcount_it_test.StreamingWordCountIT testMethod=test_streaming_wordcount_it>
01:47:54 
01:47:54     @pytest.mark.it_postcommit
01:47:54     def test_streaming_wordcount_it(self):
01:47:54       # Build expected dataset.
01:47:54       expected_msg = [('%d: 1' % num).encode('utf-8')
01:47:54                       for num in range(DEFAULT_INPUT_NUMBERS)]
01:47:54     
01:47:54       # Set extra options to the pipeline for test purpose
01:47:54       state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
01:47:54       pubsub_msg_verifier = PubSubMessageMatcher(
01:47:54           self.project, self.output_sub.name, expected_msg, timeout=400)
01:47:54       extra_opts = {
01:47:54           'input_subscription': self.input_sub.name,
01:47:54           'output_topic': self.output_topic.name,
01:47:54           'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION,
01:47:54           'on_success_matcher': all_of(state_verifier, pubsub_msg_verifier)
01:47:54       }
01:47:54     
01:47:54       # Generate input data and inject to PubSub.
01:47:54       self._inject_numbers(self.input_topic, DEFAULT_INPUT_NUMBERS)
01:47:54     
01:47:54       # Get pipeline options from command argument: --test-pipeline-options,
01:47:54       # and start pipeline job by calling pipeline main function.
01:47:54       streaming_wordcount.run(
01:47:54           self.test_pipeline.get_full_options_as_args(**extra_opts),
01:47:54 >         save_main_session=False)
01:47:54 
01:47:54 apache_beam/examples/streaming_wordcount_it_test.py:120: 
01:47:54 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
01:47:54 apache_beam/examples/streaming_wordcount.py:103: in run
01:47:54     output | beam.io.WriteToPubSub(known_args.output_topic)
01:47:54 apache_beam/pipeline.py:597: in __exit__
01:47:54     self.result = self.run()
01:47:54 apache_beam/pipeline.py:574: in run
01:47:54     return self.runner.run_pipeline(self, self._options)
01:47:54 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
01:47:54 
01:47:54 self = <apache_beam.runners.dataflow.test_dataflow_runner.TestDataflowRunner object at 0x7f636ce38210>
01:47:54 pipeline = <apache_beam.pipeline.Pipeline object at 0x7f6368208f10>
01:47:54 options = <apache_beam.options.pipeline_options.PipelineOptions object at 0x7f63682084d0>
01:47:54 
01:47:54     def run_pipeline(self, pipeline, options):
01:47:54       """Execute test pipeline and verify test matcher"""
01:47:54       test_options = options.view_as(TestOptions)
01:47:54       on_success_matcher = test_options.on_success_matcher
01:47:54       wait_duration = test_options.wait_until_finish_duration
01:47:54       is_streaming = options.view_as(StandardOptions).streaming
01:47:54     
01:47:54       # [BEAM-1889] Do not send this to remote workers also, there is no need to
01:47:54       # send this option to remote executors.
01:47:54       test_options.on_success_matcher = None
01:47:54     
01:47:54       self.result = super().run_pipeline(pipeline, options)
01:47:54       if self.result.has_job:
01:47:54         # TODO(markflyhigh)(https://github.com/apache/beam/issues/18254): Use
01:47:54         # print since Nose dosen't show logs in some cases.
01:47:54         print('Worker logs: %s' % self.build_console_url(options))
01:47:54         _LOGGER.info('Console log: ')
01:47:54         _LOGGER.info(self.build_console_url(options))
01:47:54     
01:47:54       try:
01:47:54         self.wait_until_in_state(PipelineState.RUNNING)
01:47:54     
01:47:54         if is_streaming and not wait_duration:
01:47:54           _LOGGER.warning('Waiting indefinitely for streaming job.')
01:47:54         self.result.wait_until_finish(duration=wait_duration)
01:47:54     
01:47:54         if on_success_matcher:
01:47:54           from hamcrest import assert_that as hc_assert_that
01:47:54 >         hc_assert_that(self.result, pickler.loads(on_success_matcher))
01:47:54 E         AssertionError: 
01:47:54 E         Expected: (Test pipeline expected terminated in state: RUNNING and Expected 500 messages.)
01:47:54 E              but: Expected 500 messages. Got 501 messages. Diffs (item, count):
01:47:54 E           Expected but not in actual: dict_items([])
01:47:54 E           Unexpected: dict_items([(b'172: 1', 1)])
01:47:54 E           Unexpected (with all details): [(b'172: 1', {}, {}, DatetimeWithNanoseconds(2022, 10, 18, 5, 44, 30, 114000, tzinfo=datetime.timezone.utc), ''), (b'172: 1', {}, {}, DatetimeWithNanoseconds(2022, 10, 18, 5, 44, 29, 912000, tzinfo=datetime.timezone.utc), '')]
01:47:54 
01:47:54 apache_beam/runners/dataflow/test_dataflow_runner.py:70: AssertionError
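The off-by-one here (b'172: 1' delivered twice, 501 messages instead of 500) is consistent with Pub/Sub's at-least-once delivery rather than a pipeline bug. A minimal sketch of one possible mitigation, collapsing duplicate payloads before matching (an illustrative helper, not the PubSubMessageMatcher API, and assuming the matcher currently compares raw counts):

from collections import Counter
from typing import List

def dedupe_messages(received: List[bytes]) -> List[bytes]:
    # Pub/Sub is at-least-once, so the same payload can legitimately arrive
    # more than once; count each distinct payload a single time before
    # comparing against the expected set.
    return list(Counter(received))

# Illustrative: the failing run saw b'172: 1' twice (501 messages total).
received = [b'171: 1', b'172: 1', b'172: 1', b'173: 1']
assert dedupe_messages(received) == [b'171: 1', b'172: 1', b'173: 1']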

@johnjcasey johnjcasey reopened this Oct 19, 2022
@Abacn
Contributor

Abacn commented Oct 19, 2022

Also seen in: https://ci-beam.apache.org/job/beam_PreCommit_Python_Commit/25339/ (this one is an actual failure)

@ryanthompson591
Contributor

Just wanted to update with what I did in case I don't come back to this.

Getting this test to run locally entailed two things.

  1. Change the timeout from 6 minutes to 12 minutes. Running locally, it takes about 3 minutes just for Dataflow to start up, so the test otherwise times out (see the sketch after the commands below).
  2. Run it with this command:
python -m pytest  -o log_cli=True -o log_level=Info apache_beam/examples/streaming_wordcount_it_test.py  --test-pipeline-options=' --runner=TestDataflowRunner --project=apache-beam-testing --region=us-central1 --staging_location=gs://temp-storage-for-end-to-end-tests/staging-it --temp_location=gs://temp-storage-for-end-to-end-tests/temp-it --output=gs://temp-storage-for-end-to-end-tests/py-it-cloud/output --sdk_location=dist/apache-beam-2.43.0.dev0.tar.gz --requirements_file=postcommit_requirements.txt --num_workers=1 --sleep_secs=20 --kms_key_name=projects/apache-beam-testing/locations/global/keyRings/beam-it/cryptoKeys/test --dataflow_kms_key=projects/apache-beam-testing/locations/global/keyRings/beam-it/cryptoKeys/test'

or

scripts/run_integration_test.sh --test_opts "apache_beam/examples/streaming_wordcount_it_test.py" --sdk_location "dist/apache-beam-2.43.0.dev0.tar.gz"
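For step 1, a minimal sketch of the timeout change, assuming WAIT_UNTIL_FINISH_DURATION in streaming_wordcount_it_test.py is expressed in milliseconds (the unit and the original value shown are assumptions; verify them in the file):

# Illustrative edit to streaming_wordcount_it_test.py; check the constant's
# unit and current value before copying.
# WAIT_UNTIL_FINISH_DURATION = 6 * 60 * 1000   # before: ~6 minutes (ms)
WAIT_UNTIL_FINISH_DURATION = 12 * 60 * 1000    # after: 12 minutes, so slow
                                               # local Dataflow startup fits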

I checked the Dataflow job linked in Abacn's comment. The job showed "about 500" messages in each transform, matching the expected count, though the UI counts are approximate (the failure reported 501 messages).

I am convinced this is a real breakage that may indicate a more serious issue. I just couldn't reproduce the flake in my local runs, as each took 8 minutes.

@tvalentyn
Contributor

Encountered again:

_____ StreamingWordCountIT.test_streaming_wordcount_it _______________
[gw0] linux -- Python 3.7.12 /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Commit/src/build/gradleenv/-1734967052/bin/python3.7

self = <apache_beam.examples.streaming_wordcount_it_test.StreamingWordCountIT testMethod=test_streaming_wordcount_it>

    @pytest.mark.it_postcommit
    def test_streaming_wordcount_it(self):
      # Build expected dataset.
      expected_msg = [('%d: 1' % num).encode('utf-8')
                      for num in range(DEFAULT_INPUT_NUMBERS)]
    
      # Set extra options to the pipeline for test purpose
      state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
      pubsub_msg_verifier = PubSubMessageMatcher(
          self.project, self.output_sub.name, expected_msg, timeout=400)
      extra_opts = {
          'input_subscription': self.input_sub.name,
          'output_topic': self.output_topic.name,
          'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION,
          'on_success_matcher': all_of(state_verifier, pubsub_msg_verifier)
      }
    
      # Generate input data and inject to PubSub.
      self._inject_numbers(self.input_topic, DEFAULT_INPUT_NUMBERS)
    
      # Get pipeline options from command argument: --test-pipeline-options,
      # and start pipeline job by calling pipeline main function.
      streaming_wordcount.run(
          self.test_pipeline.get_full_options_as_args(**extra_opts),
>         save_main_session=False)

apache_beam/examples/streaming_wordcount_it_test.py:120: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/examples/streaming_wordcount.py:103: in run
    output | beam.io.WriteToPubSub(known_args.output_topic)
apache_beam/pipeline.py:600: in __exit__
    self.result = self.run()
apache_beam/pipeline.py:577: in run
    return self.runner.run_pipeline(self, self._options)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.runners.dataflow.test_dataflow_runner.TestDataflowRunner object at 0x7fb48857f0d0>
pipeline = <apache_beam.pipeline.Pipeline object at 0x7fb488584490>
options = <apache_beam.options.pipeline_options.PipelineOptions object at 0x7fb4885841d0>

    def run_pipeline(self, pipeline, options):
      """Execute test pipeline and verify test matcher"""
      test_options = options.view_as(TestOptions)
      on_success_matcher = test_options.on_success_matcher
      wait_duration = test_options.wait_until_finish_duration
      is_streaming = options.view_as(StandardOptions).streaming
    
      # [BEAM-1889] Do not send this to remote workers also, there is no need to
      # send this option to remote executors.
      test_options.on_success_matcher = None
    
      self.result = super().run_pipeline(pipeline, options)
      if self.result.has_job:
        # TODO(markflyhigh)(https://github.com/apache/beam/issues/18254): Use
        # print since Nose dosen't show logs in some cases.
        print('Worker logs: %s' % self.build_console_url(options))
        _LOGGER.info('Console log: ')
        _LOGGER.info(self.build_console_url(options))
    
      try:
        self.wait_until_in_state(PipelineState.RUNNING)
    
        if is_streaming and not wait_duration:
          _LOGGER.warning('Waiting indefinitely for streaming job.')
        self.result.wait_until_finish(duration=wait_duration)
    
        if on_success_matcher:
          from hamcrest import assert_that as hc_assert_that
>         hc_assert_that(self.result, pickler.loads(on_success_matcher))
E         AssertionError: 
E         Expected: (Test pipeline expected terminated in state: RUNNING and Expected 500 messages.)
E              but: Expected 500 messages. Got 486 messages. Diffs (item, count):
E           Expected but not in actual: dict_items([(b'11: 1', 1), (b'159: 1', 1), (b'161: 1', 1), (b'176: 1', 1), (b'195: 1', 1), (b'202: 1', 1), (b'203: 1', 1), (b'217: 1', 1), (b'219: 1', 1), (b'277: 1', 1), (b'320: 1', 1), (b'446: 1', 1), (b'466: 1', 1), (b'485: 1', 1)])
E           Unexpected: dict_items([])
E           Unexpected (with all details): []

apache_beam/runners/dataflow/test_dataflow_runner.py:70: AssertionError
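This failure is the opposite shape of the earlier one: 14 of the 500 expected messages never arrived before the matcher gave up. One hedged mitigation, assuming the limiting factor is the matcher's 400-second timeout rather than real message loss, is simply to wait longer; a sketch of the one-line change inside the test method quoted above (illustrative, not a confirmed fix):

from apache_beam.io.gcp.tests.pubsub_matcher import PubSubMessageMatcher

# Inside StreamingWordCountIT.test_streaming_wordcount_it: give the streaming
# job more time to publish all 500 messages before the matcher fails
# (timeout is in seconds; the test currently uses 400).
pubsub_msg_verifier = PubSubMessageMatcher(
    self.project, self.output_sub.name, expected_msg, timeout=900)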

@ryanthompson591 ryanthompson591 removed their assignment Nov 17, 2022
@damccorm
Contributor Author

I think this is fixed. If not, it should get auto-flagged by our tooling anyway, so this should be safe to close.

@github-actions github-actions bot added this to the 2.61.0 Release milestone Oct 31, 2024