Pipeline function does not run in parallel in GCP using DataflowRunner #29275

Closed

alexjolig opened this issue Nov 2, 2023 · 1 comment

@alexjolig

I have a DoFn in my pipeline, which runs on GCP Dataflow and is supposed to do some processing per product in parallel.

import logging
from datetime import datetime

from apache_beam import DoFn, GroupBy, ParDo, Pipeline, io

logger = logging.getLogger(__name__)

class Step1(DoFn):
    def process(self, element):
        # Get a list of products
        for idx, item in enumerate(product_list):
            yield item, idx

class Step2(DoFn):
    def process(self, element):
        product, idx = element  # unpack the (product, index) pair from Step1
        logger.info(f"::: Processing product number {idx} STARTED at {datetime.now()} :::::")
        # Do some process ....
        logger.info(f"::: FINISHED product number {idx} at {datetime.now()} :::::")
        yield element  # emit for the downstream GroupBy (processing body elided)

with Pipeline(options=pipeline_options) as pipeline:
    results = (
        pipeline
        | "Read from PubSub" >> io.ReadFromPubSub()
        | "Product list"     >> ParDo(Step1())
        | "Process Product"  >> ParDo(Step2())
        | "Group data" >> GroupBy()
        ...
    )

So Step2 is supposed to run per product in parallel, but what I actually get in the logs is:

::: Processing product number 0 STARTED at <some_time> :::::
::: FINISHED product number 0 at <some_time>:::::
::: Processing product number 1 STARTED at <some_time> :::::
::: FINISHED product number 1 at <some_time>:::::
::: Processing product number 2 STARTED at <some_time> :::::
::: FINISHED product number 2 at <some_time>:::::
::: Processing product number 3 STARTED at <some_time> :::::
::: FINISHED product number 3 at <some_time>:::::
...

This shows that instead of running Step2 in parallel, everything runs sequentially, which takes a long time to finish for a huge number of products.

As the Apache Beam documentation suggests, I tried the following options in PipelineOptions, and I double-checked that they were actually set on the job in GCP, but the result was the same (see the sketch after this list):

  • direct_num_workers=0
  • direct_running_mode='multi_threading'
  • direct_running_mode='multi_processing'
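
A minimal sketch of how such options can be passed, assuming hypothetical project and region values; note that direct_num_workers and direct_running_mode belong to the DirectRunner's DirectOptions and are not consumed by the DataflowRunner:

from apache_beam.options.pipeline_options import PipelineOptions

pipeline_options = PipelineOptions(
    runner="DataflowRunner",
    project="my-gcp-project",               # hypothetical project id
    region="us-central1",                   # hypothetical region
    direct_num_workers=0,                   # 0 = one worker per available core (DirectRunner only)
    direct_running_mode="multi_threading",  # or "multi_processing" (DirectRunner only)
)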

Also, as mentioned in the issue title, the runner is Google Cloud's DataflowRunner.

Is there something I'm missing here? Aren't ParDo transforms supposed to run in parallel?

@alexjolig
Author

I actually found the issue and solved it. I've added the solution in a Stack Overflow post.
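
A common cause of this symptom on Dataflow is fusion optimization: consecutive ParDo steps are fused into a single stage, so every element fanned out by Step1 from one Pub/Sub message is processed serially on the same worker. A minimal sketch of one widely used remedy, inserting a Reshuffle between the steps to break fusion (this may or may not be what the linked Stack Overflow post describes; the subscription argument is assumed):

import apache_beam as beam

# Reshuffle between the fan-out and the per-product work lets Dataflow
# redistribute Step2's inputs across workers instead of fusing the steps.
with beam.Pipeline(options=pipeline_options) as pipeline:
    results = (
        pipeline
        | "Read from PubSub" >> beam.io.ReadFromPubSub(subscription=subscription)  # assumed
        | "Product list"     >> beam.ParDo(Step1())
        | "Break fusion"     >> beam.Reshuffle()
        | "Process Product"  >> beam.ParDo(Step2())
    )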

github-actions bot added this to the 2.53.0 Release milestone Nov 3, 2023