
beam.CombineValues on Dataflow runner causes ambiguous failure with Python SDK #21432

Closed

damccorm opened this issue Jun 4, 2022 · 6 comments

@damccorm (Contributor) commented Jun 4, 2022

The following Beam pipeline works correctly using DirectRunner but fails with a very vague error when using DataflowRunner.


(
    pipeline
    | beam.io.ReadFromPubSub(input_topic, with_attributes=True)
    | beam.Map(pubsub_message_to_row)
    | beam.WindowInto(beam.transforms.window.FixedWindows(5))
    | beam.GroupBy(<beam.Row col name>)
    | beam.CombineValues(<instance of beam.CombineFn subclass>)
    | beam.Values()
    | beam.io.gcp.bigquery.WriteToBigQuery(. . .)
)

Stacktrace:


Traceback (most recent call last):
  File "src/read_quality_pipeline/__init__.py", line 128, in <module>
    (
  File "/home/pkg_dev/.cache/pypoetry/virtualenvs/apache-beam-poc-5nxBvN9R-py3.8/lib/python3.8/site-packages/apache_beam/pipeline.py", line 597, in __exit__
    self.result.wait_until_finish()
  File "/home/pkg_dev/.cache/pypoetry/virtualenvs/apache-beam-poc-5nxBvN9R-py3.8/lib/python3.8/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 1633, in wait_until_finish
    raise DataflowRuntimeException(
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Error processing pipeline.

Log output:


2022-02-01T16:54:43.645Z: JOB_MESSAGE_WARNING: Autoscaling is enabled for Dataflow Streaming Engine. Workers will scale between 1 and 100 unless maxNumWorkers is specified.
2022-02-01T16:54:43.736Z: JOB_MESSAGE_DETAILED: Autoscaling is enabled for job 2022-02-01_08_54_40-8791019287477103665. The number of workers will be between 1 and 100.
2022-02-01T16:54:43.757Z: JOB_MESSAGE_DETAILED: Autoscaling was automatically enabled for job 2022-02-01_08_54_40-8791019287477103665.
2022-02-01T16:54:44.624Z: JOB_MESSAGE_ERROR: Error processing pipeline.

With the CombineValues step removed, this pipeline starts successfully on Dataflow.


I thought this was an issue with Dataflow on the server side, since the Dataflow API (v1b3.projects.locations.jobs.messages) just returns the textPayload "Error processing pipeline". But then I found the issue BEAM-12636, where a Go SDK user hits the same error message, seemingly as a result of bugs in the Go SDK?
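For reference, the job messages can be pulled directly from that API; below is a minimal sketch, assuming google-api-python-client is installed and application default credentials are set up (the project, region, and job IDs are placeholders). In this failure mode the list reportedly contains nothing more specific than "Error processing pipeline".

# Minimal sketch: list Dataflow job messages via the v1b3 API.
# "my-project", "us-central1", and "my-job-id" are placeholder values.
from googleapiclient.discovery import build

dataflow = build("dataflow", "v1b3")
response = (
    dataflow.projects()
    .locations()
    .jobs()
    .messages()
    .list(projectId="my-project", location="us-central1", jobId="my-job-id")
    .execute()
)
for message in response.get("jobMessages", []):
    # Each message carries a timestamp, an importance level, and the text
    # that surfaces as textPayload in the logs.
    print(message.get("time"), message.get("messageImportance"), message.get("messageText"))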

Imported from Jira BEAM-13795. Original Jira may contain additional context.
Reported by: Jake_Zuliani.

@mingyao1993

Hi, I have faced the same issue as well. Is there any workaround or a stable version to use? Thanks!

with beam.Pipeline(options=pipeline_options) as p:
    (
        p
        | "Read From PubSub Subscription" >> ReadFromPubSub(subscription=subscription)
        | beam.Map(lambda row: logging.info(row))
    )
Traceback (most recent call last):
  File "src/main.py", line 101, in <module>
    run()
  File "src/main.py", line 54, in run
    p \
  File "/Users/name/.local/share/virtualenvs/QqobXVVz/lib/python3.8/site-packages/apache_beam/pipeline.py", line 597, in __exit__
    self.result.wait_until_finish()
  File "/Users/name/.local/share/virtualenvs/QqobXVVz/lib/python3.8/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 1667, in wait_until_finish
    raise DataflowRuntimeException(
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Error processing pipeline.

@viniciusdsmello

Hi, I'm facing the same issue here. Did anyone figure out a workaround?

@MOscity

MOscity commented Jan 20, 2023

Hey, I was facing the same issue: the whole pipeline worked with DirectRunner (all steps), but DataflowRunner failed after 1-3 seconds and emitted "ambiguous" logs that didn't point at the CombineValues line. Without the CountCombineFn step, it worked fine on DataflowRunner too.
Update: I found a workaround, see below.

Original Error Logs:

ERROR:apache_beam.runners.dataflow.dataflow_runner:Console URL: https://console.cloud.google.com/dataflow/jobs/<RegionId>/2023-01-20_06_59_03-4426498189309546663?project=<ProjectId>
Traceback (most recent call last):
  File "./path/to/file/my_python.py", line 618, in <module>
    run_pipeline()
  File "./path/to/file/my_python.py", line 598, in run_pipeline
    print(f'----- After Step: {step}.')
  File "/home/myusername/.local/share/virtualenvs/pipenv_20-Y278SNFx/lib/python3.8/site-packages/apache_beam/pipeline.py", line 598, in __exit__
    self.result.wait_until_finish()
  File "/home/myusername/.local/share/virtualenvs/pipenv_20-Y278SNFx/lib/python3.8/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 1641, in wait_until_finish
    raise DataflowRuntimeException(
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Error processing pipeline.

Update: Workaround

I'm not sure if this is related to the Jira ticket (BEAM-10297), but I found inspiration in the Apache documentation about "CombinePerKey", and I wondered if I could just replace beam.CombineValues(beam.combiners.CountCombineFn()) with another function.
First, beam.CombineValues(beam.transforms.combiners.CountCombineFn()) didn't work with the DataflowRunner either (but it worked locally). With CombinePerKey instead of CombineValues, it worked!

Solution: use beam.CombinePerKey with a custom function (appropriate to your application)!
With beam.CombinePerKey(CustomFn()) it also works with the DataflowRunner!
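As a self-contained illustration of the swap (a minimal sketch with made-up data, not the reporter's exact pipeline): CombinePerKey(fn) is documented as equivalent to GroupByKey() followed by CombineValues(fn), so the two failing steps collapse into one.

import apache_beam as beam

# Minimal sketch: counting values per key with CombinePerKey
# instead of GroupByKey + CombineValues.
with beam.Pipeline() as p:
    (
        p
        | beam.Create([("a", 1), ("a", 2), ("b", 3)])
        # Shape reported to fail on DataflowRunner in this thread:
        #   | beam.GroupByKey()
        #   | beam.CombineValues(beam.combiners.CountCombineFn())
        # Working shape:
        | "Count per key" >> beam.CombinePerKey(beam.combiners.CountCombineFn())
        | beam.Map(print)  # ('a', 2), ('b', 1)
    )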


See the original "AverageFn" here: https://beam.apache.org/documentation/transforms/python/aggregation/combineperkey/#example-5-combining-with-a-combinefn
New custom combiner:

import apache_beam as beam

# [START COPIED CODE]
class CustomFn(beam.CombineFn):
    def create_accumulator(self):
        # Running (sum, count) pair.
        sum = 0.0
        count = 0
        accumulator = sum, count
        return accumulator

    def add_input(self, accumulator, input):
        # Note: this adds len(input), so inputs are expected to be sized
        # (e.g. strings or lists), not plain numbers.
        sum, count = accumulator
        return sum + len(input), count + 1

    def merge_accumulators(self, accumulators):
        sums, counts = zip(*accumulators)
        return sum(sums), sum(counts)

    # [END COPIED CODE] - MODIFIED LINES:
    def extract_output(self, accumulator, MODE='AVERAGE'):
        # Beam calls this with only the accumulator, so MODE keeps its default.
        sum, count = accumulator
        if count == 0:
            return float('NaN')

        if MODE == 'SUM':
            return int(sum)
        elif MODE == 'COUNT':
            return int(count)
        elif MODE == 'INT_AVERAGE':
            return int(sum / count)
        else:
            return sum / count
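A quick sanity check of the combiner outside any pipeline (hypothetical inputs, just exercising the CombineFn methods by hand):

fn = CustomFn()
acc = fn.create_accumulator()
acc = fn.add_input(acc, "abc")    # sum of lengths: 3.0, count: 1
acc = fn.add_input(acc, "de")     # sum of lengths: 5.0, count: 2
merged = fn.merge_accumulators([acc])
print(fn.extract_output(merged))  # default MODE='AVERAGE' -> 5.0 / 2 = 2.5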


Functions to group and combine values:

# ...
def prepare_key_value_pairs(element):
    # Build a composite key from the key columns; the value column
    # becomes the value of the pair.
    key_value_tuple = (
        (
            element['key_column_1'],
            element['key_column_...'],
            element['key_column_n'],
        ),
        element['value_column_1'],
    )
    return key_value_tuple

def transform_data(data):
    data_out = (
        data
        | 'Step 1' >> beam.Map(prepare_key_value_pairs)
        | 'Step 2' >> beam.GroupByKey()

        # This line fails with DataflowRunner, but not with DirectRunner (locally):
        # | 'Old Step 3' >> beam.CombineValues(beam.combiners.CountCombineFn())

        # This works with DataflowRunner: CombinePerKey instead of CombineValues
        # https://beam.apache.org/documentation/transforms/python/aggregation/combineperkey/#example-5-combining-with-a-combinefn
        | 'New Step 3' >> beam.CombinePerKey(CustomFn())
    )
    return data_out

# ...

Later in the Beam pipeline:

with beam.Pipeline(options=pipeline_options) as pipeline:
    # ...
    data_raw = read_data(...)
    data_transformed = transform_data(data_raw)
    # ...

@linamartensson

We started encountering this issue on Nov 2, 2022 with a job that runs daily.
It runs from a template created on June 14, 2022, and it ran just fine at first. We're able to work around it with the suggestion here, but this is odd - and it doesn't line up with anything in the Dataflow release schedule as far as I can tell.

So - how could this have happened?
It's worrisome that a job that was already running could just stop. I'm also wondering if we made some Cloud change on our end that might have suddenly triggered it. Also, clearly we should have discovered this issue sooner, but here we are. ;)

@liferoad
Collaborator

.take-issue

@liferoad
Collaborator

Tried to reproduce this issue with Dataflow. Here is my E2E code:

# standard libraries
import json
import logging

# third party libraries
import apache_beam as beam
import google.auth
from apache_beam import Map
from apache_beam.io import ReadFromPubSub
from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions
from apache_beam.runners import DataflowRunner


# data
# {'ride_id': 'e7b87b46-2174-4029-bb32-6498d01367c6', 'point_idx': 1298,
# 'latitude': 40.74747, 'longitude': -73.84869, 'timestamp': '2024-09-21T14:47:29.20965-04:00',
# 'meter_reading': 30.108835, 'meter_increment': 0.023196328, 'ride_status': 'enroute', 'passenger_count': 1}
def run():
    class ReadPubSubOptions(PipelineOptions):
        @classmethod
        def _add_argparse_args(cls, parser):
            parser.add_argument(
                "--topic",
                # Run on Dataflow or authenticate to not get
                # 403 PermissionDenied
                default="projects/pubsub-public-data/topics/taxirides-realtime",
                help="PubSub topic to read",
            )

    options = ReadPubSubOptions(streaming=True)
    _, options.view_as(GoogleCloudOptions).project = google.auth.default()
    # Sets the Google Cloud Region in which Cloud Dataflow runs.
    options.view_as(GoogleCloudOptions).region = "us-central1"

    dataflow_gcs_location = "gs://tmp_xqhu/dataflow"
    # Dataflow Staging Location. This location is used to stage the Dataflow Pipeline and SDK binary.
    options.view_as(GoogleCloudOptions).staging_location = (
        "%s/staging" % dataflow_gcs_location
    )

    # Dataflow Temp Location. This location is used to store temporary files or intermediate results before finally outputting to the sink.
    options.view_as(GoogleCloudOptions).temp_location = (
        "%s/temp" % dataflow_gcs_location
    )

    with beam.Pipeline(options=options, runner=DataflowRunner()) as p:
        # When reading from a topic, a new subscription is created.
        (
            p
            | "Read PubSub topic" >> ReadFromPubSub(topic=options.topic)
            | "Message" >> Map(lambda msg: json.loads(msg.decode("utf-8")))
            | "Convert" >> Map(lambda msg: (msg["timestamp"], [msg["passenger_count"]]))
            | "Combine" >> beam.CombineValues(sum)
            | Map(logging.info)
        )

I do not see any error:

(screenshot: Dataflow job graph running with no errors)

github-actions bot added this to the 2.62.0 Release milestone Nov 23, 2024