
[Bug][prism]: Excessive Splitting #29180

Closed
1 of 16 tasks
lostluck opened this issue Oct 27, 2023 · 10 comments · Fixed by #29968
lostluck commented Oct 27, 2023

What happened?

The current policy for Prism is to split very aggressively when the element index hasn't advanced.

It should split, but only when a bundle shows no other indication of progress. Optimizing the splits for processing throughput is a separate issue.

(Previously, this issue was about a hang caused by unregistered DoFns, but separate deflake work now reliably catches bundle failures, so the original problem no longer occurs. The discussion below repurposed this issue for the aggressive-split problem.)
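The intended policy can be sketched as a small predicate over successive progress snapshots. This is a minimal illustration, not prism's actual internals; the type and field names are made up:

```go
package main

import "fmt"

// progressSnapshot is a hypothetical view of one bundle progress report.
type progressSnapshot struct {
	ElementIndex int64 // index of the current input element within the bundle
	OutputCount  int64 // total elements emitted across all output PCollections
}

// shouldSplit returns true only when neither the element index nor the
// output count advanced between two consecutive progress checks, i.e.
// the bundle shows no indication of progress at all.
func shouldSplit(prev, cur progressSnapshot) bool {
	return cur.ElementIndex == prev.ElementIndex && cur.OutputCount <= prev.OutputCount
}

func main() {
	// Stuck on one element but still emitting output: leave it alone.
	fmt.Println(shouldSplit(progressSnapshot{3, 100}, progressSnapshot{3, 250})) // false
	// No movement at all: split.
	fmt.Println(shouldSplit(progressSnapshot{3, 100}, progressSnapshot{3, 100})) // true
}
```

Under the old policy, only ElementIndex was consulted, so a slow element that was still producing output looked identical to a stuck one.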

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@johannaojeling

Referencing my comment from the other issue here, since we may be seeing the same problem of the pipeline hanging; it may be related to excessive splitting rather than failures caused by unregistered functions. It also happens in other textio.Read pipelines, e.g. the standard wordcount example.

Dumping some logs below.

go run ./go/examples/wordcount/wordcount.go --output wordcount.txt

Success
[...]
2023/10/28 09:16:19 Prepared job with id: job-001 and staging token: job-001
2023/10/28 09:16:19 Staged binary artifact with token: job-001
2023/10/28 09:16:19 Submitted job: job-001
2023/10/28 09:16:19  (): starting job-001[go-job-1-1698477379421403000]
2023/10/28 09:16:19  (): running job-001[go-job-1-1698477379421403000]
2023/10/28 09:16:19 Job[job-001] state: RUNNING
2023/10/28 09:16:19 starting worker job-001[go-job-1-1698477379421403000]_go
2023/10/28 09:16:20 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/Users/johannaojeling/repo/johannaojeling/beam/sdks/go/pkg/beam/io/textio/textio.go:226 time=2023-10-28T07:16:20.504Z worker.ID=job-001[go-job-1-1698477379421403000]_go worker.endpoint=localhost:64061
2023/10/28 09:16:20 INFO Writing to wordcount.txt source=/Users/johannaojeling/repo/johannaojeling/beam/sdks/go/pkg/beam/io/textio/textio.go:343 time=2023-10-28T07:16:20.978Z worker.ID=job-001[go-job-1-1698477379421403000]_go worker.endpoint=localhost:64061
2023/10/28 09:16:20 INFO  (): pipeline completed job-001[go-job-1-1698477379421403000] source=/Users/johannaojeling/repo/johannaojeling/beam/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go:109 time=2023-10-28T07:16:20.979Z worker.ID=job-001[go-job-1-1698477379421403000]_go worker.endpoint=localhost:64061
2023/10/28 09:16:20 INFO  (): terminating job-001[go-job-1-1698477379421403000] source=/Users/johannaojeling/repo/johannaojeling/beam/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go:109 time=2023-10-28T07:16:20.979Z worker.ID=job-001[go-job-1-1698477379421403000]_go worker.endpoint=localhost:64061
2023/10/28 09:16:20 INFO Job[job-001] state: DONE source=/Users/johannaojeling/repo/johannaojeling/beam/sdks/go/pkg/beam/runners/universal/runnerlib/job.go:126 time=2023-10-28T07:16:20.979Z worker.ID=job-001[go-job-1-1698477379421403000]_go worker.endpoint=localhost:64061
Unexpected behavior
[...]
2023/10/28 09:17:36 Prepared job with id: job-001 and staging token: job-001
2023/10/28 09:17:36 Staged binary artifact with token: job-001
2023/10/28 09:17:36 Submitted job: job-001
2023/10/28 09:17:36  (): starting job-001[go-job-1-1698477456790632000]
2023/10/28 09:17:36  (): running job-001[go-job-1-1698477456790632000]
2023/10/28 09:17:36 Job[job-001] state: RUNNING
2023/10/28 09:17:36 starting worker job-001[go-job-1-1698477456790632000]_go
2023/10/28 09:17:37 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/Users/johannaojeling/repo/johannaojeling/beam/sdks/go/pkg/beam/io/textio/textio.go:226 time=2023-10-28T07:17:37.781Z worker.ID=job-001[go-job-1-1698477456790632000]_go worker.endpoint=localhost:64078
2023/10/28 09:17:38 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/Users/johannaojeling/repo/johannaojeling/beam/sdks/go/pkg/beam/io/textio/textio.go:226 time=2023-10-28T07:17:38.083Z worker.ID=job-001[go-job-1-1698477456790632000]_go worker.endpoint=localhost:64078
2023/10/28 09:17:38 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/Users/johannaojeling/repo/johannaojeling/beam/sdks/go/pkg/beam/io/textio/textio.go:226 time=2023-10-28T07:17:38.285Z worker.ID=job-001[go-job-1-1698477456790632000]_go worker.endpoint=localhost:64078
2023/10/28 09:17:38 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/Users/johannaojeling/repo/johannaojeling/beam/sdks/go/pkg/beam/io/textio/textio.go:226 time=2023-10-28T07:17:38.488Z worker.ID=job-001[go-job-1-1698477456790632000]_go worker.endpoint=localhost:64078
2023/10/28 09:17:38 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/Users/johannaojeling/repo/johannaojeling/beam/sdks/go/pkg/beam/io/textio/textio.go:226 time=2023-10-28T07:17:38.690Z worker.ID=job-001[go-job-1-1698477456790632000]_go worker.endpoint=localhost:64078
2023/10/28 09:17:38 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/Users/johannaojeling/repo/johannaojeling/beam/sdks/go/pkg/beam/io/textio/textio.go:226 time=2023-10-28T07:17:38.891Z worker.ID=job-001[go-job-1-1698477456790632000]_go worker.endpoint=localhost:64078
2023/10/28 09:17:39 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/Users/johannaojeling/repo/johannaojeling/beam/sdks/go/pkg/beam/io/textio/textio.go:226 time=2023-10-28T07:17:39.093Z worker.ID=job-001[go-job-1-1698477456790632000]_go worker.endpoint=localhost:64078
2023/10/28 09:17:39 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/Users/johannaojeling/repo/johannaojeling/beam/sdks/go/pkg/beam/io/textio/textio.go:226 time=2023-10-28T07:17:39.295Z worker.ID=job-001[go-job-1-1698477456790632000]_go worker.endpoint=localhost:64078
2023/10/28 09:17:39 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/Users/johannaojeling/repo/johannaojeling/beam/sdks/go/pkg/beam/io/textio/textio.go:226 time=2023-10-28T07:17:39.496Z worker.ID=job-001[go-job-1-1698477456790632000]_go worker.endpoint=localhost:64078
2023/10/28 09:17:39 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/Users/johannaojeling/repo/johannaojeling/beam/sdks/go/pkg/beam/io/textio/textio.go:226 time=2023-10-28T07:17:39.597Z worker.ID=job-001[go-job-1-1698477456790632000]_go worker.endpoint=localhost:64078
2023/10/28 09:17:39 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/Users/johannaojeling/repo/johannaojeling/beam/sdks/go/pkg/beam/io/textio/textio.go:226 time=2023-10-28T07:17:39.697Z worker.ID=job-001[go-job-1-1698477456790632000]_go worker.endpoint=localhost:64078
2023/10/28 09:17:39 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/Users/johannaojeling/repo/johannaojeling/beam/sdks/go/pkg/beam/io/textio/textio.go:226 time=2023-10-28T07:17:39.697Z worker.ID=job-001[go-job-1-1698477456790632000]_go worker.endpoint=localhost:64078
2023/10/28 09:17:39 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/Users/johannaojeling/repo/johannaojeling/beam/sdks/go/pkg/beam/io/textio/textio.go:226 time=2023-10-28T07:17:39.797Z worker.ID=job-001[go-job-1-1698477456790632000]_go worker.endpoint=localhost:64078
2023/10/28 09:17:39 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/Users/johannaojeling/repo/johannaojeling/beam/sdks/go/pkg/beam/io/textio/textio.go:226 time=2023-10-28T07:17:39.898Z worker.ID=job-001[go-job-1-1698477456790632000]_go worker.endpoint=localhost:64078
2023/10/28 09:17:39 INFO Reading from gs://apache-beam-samples/shakespeare/kinglear.txt source=/Users/johannaojeling/repo/johannaojeling/beam/sdks/go/pkg/beam/io/textio/textio.go:226 time=2023-10-28T07:17:39.998Z worker.ID=job-001[go-job-1-1698477456790632000]_go worker.endpoint=localhost:64078
[...]

ronoaldo commented Jan 9, 2024

I'm facing an issue similar to the one posted in the previous comment by @johannaojeling, in this sample pipeline:

https://github.com/ronoaldo/micro-beam/blob/d074d5fe72b32d36e84c1007ae68efebe13a3821/05_xlang/go/pipeline.go#L126

It's derived from the minimal wordcount and changed to demonstrate xlang transform execution. However, it can also run as a "standalone" pipeline, using just Go pipeline code, if no expansion server is provided.

Without any expansion server (i.e., just Go code), it works fine under DirectRunner or DataflowRunner, but on Prism it stalls in a loop reading from the TextIO:

requirements: "beam:requirement:pardo:splittable_dofn:v1"
2024/01/08 21:21:44 Prepared job with id: job-001 and staging token: job-001
2024/01/08 21:21:44 Staged binary artifact with token: job-001
2024/01/08 21:21:44 Submitted job: job-001
2024/01/08 21:21:44  (): starting job-001[go-job-1-1704759704240538359]
2024/01/08 21:21:44  (): running job-001[go-job-1-1704759704240538359]
2024/01/08 21:21:44 Job[job-001] state: RUNNING
2024/01/08 21:21:44 starting worker job-001[go-job-1-1704759704240538359]_go
2024/01/08 21:21:45 INFO Reading from gs://dataflow-samples/shakespeare/kinglear.txt source=/home/ronoaldo/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/io/textio/textio.go:226 time=2024-01-09T00:21:45.075Z worker.ID=job-001[go-job-1-1704759704240538359]_go worker.endpoint=localhost:46495
2024/01/08 21:21:45 INFO Reading from gs://dataflow-samples/shakespeare/kinglear.txt source=/home/ronoaldo/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/io/textio/textio.go:226 time=2024-01-09T00:21:45.376Z worker.ID=job-001[go-job-1-1704759704240538359]_go worker.endpoint=localhost:46495
2024/01/08 21:21:45 INFO Reading from gs://dataflow-samples/shakespeare/kinglear.txt source=/home/ronoaldo/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/io/textio/textio.go:226 time=2024-01-09T00:21:45.476Z worker.ID=job-001[go-job-1-1704759704240538359]_go worker.endpoint=localhost:46495
2024/01/08 21:21:45 INFO Reading from gs://dataflow-samples/shakespeare/kinglear.txt source=/home/ronoaldo/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/io/textio/textio.go:226 time=2024-01-09T00:21:45.576Z worker.ID=job-001[go-job-1-1704759704240538359]_go worker.endpoint=localhost:46495
2024/01/08 21:21:45 INFO Reading from gs://dataflow-samples/shakespeare/kinglear.txt source=/home/ronoaldo/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/io/textio/textio.go:226 time=2024-01-09T00:21:45.677Z worker.ID=job-001[go-job-1-1704759704240538359]_go worker.endpoint=localhost:46495
2024/01/08 21:21:45 INFO Reading from gs://dataflow-samples/shakespeare/kinglear.txt source=/home/ronoaldo/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/io/textio/textio.go:226 time=2024-01-09T00:21:45.677Z worker.ID=job-001[go-job-1-1704759704240538359]_go worker.endpoint=localhost:46495
2024/01/08 21:21:45 INFO Reading from gs://dataflow-samples/shakespeare/kinglear.txt source=/home/ronoaldo/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/io/textio/textio.go:226 time=2024-01-09T00:21:45.778Z worker.ID=job-001[go-job-1-1704759704240538359]_go worker.endpoint=localhost:46495
2024/01/08 21:21:45 INFO Reading from gs://dataflow-samples/shakespeare/kinglear.txt source=/home/ronoaldo/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/io/textio/textio.go:226 time=2024-01-09T00:21:45.778Z worker.ID=job-001[go-job-1-1704759704240538359]_go worker.endpoint=localhost:46495
2024/01/08 21:21:45 INFO Reading from gs://dataflow-samples/shakespeare/kinglear.txt source=/home/ronoaldo/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/io/textio/textio.go:226 time=2024-01-09T00:21:45.778Z worker.ID=job-001[go-job-1-1704759704240538359]_go worker.endpoint=localhost:46495
2024/01/08 21:21:45 INFO Reading from gs://dataflow-samples/shakespeare/kinglear.txt source=/home/ronoaldo/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/io/textio/textio.go:226 time=2024-01-09T00:21:45.922Z worker.ID=job-001[go-job-1-1704759704240538359]_go worker.endpoint=localhost:46495
2024/01/08 21:21:45 INFO Reading from gs://dataflow-samples/shakespeare/kinglear.txt source=/home/ronoaldo/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/io/textio/textio.go:226 time=2024-01-09T00:21:45.991Z worker.ID=job-001[go-job-1-1704759704240538359]_go worker.endpoint=localhost:46495
2024/01/08 21:21:46 INFO Reading from gs://dataflow-samples/shakespeare/kinglear.txt source=/home/ronoaldo/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/io/textio/textio.go:226 time=2024-01-09T00:21:46.223Z worker.ID=job-001[go-job-1-1704759704240538359]_go worker.endpoint=localhost:46495
2024/01/08 21:21:46 INFO Reading from gs://dataflow-samples/shakespeare/kinglear.txt source=/home/ronoaldo/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/io/textio/textio.go:226 time=2024-01-09T00:21:46.262Z worker.ID=job-001[go-job-1-1704759704240538359]_go worker.endpoint=localhost:46495
2024/01/08 21:21:46 INFO Reading from gs://dataflow-samples/shakespeare/kinglear.txt source=/home/ronoaldo/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/io/textio/textio.go:226 time=2024-01-09T00:21:46.361Z worker.ID=job-001[go-job-1-1704759704240538359]_go worker.endpoint=localhost:46495
2024/01/08 21:21:46 INFO Reading from gs://dataflow-samples/shakespeare/kinglear.txt source=/home/ronoaldo/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/io/textio/textio.go:226 time=2024-01-09T00:21:46.363Z worker.ID=job-001[go-job-1-1704759704240538359]_go worker.endpoint=localhost:46495
2024/01/08 21:21:46 INFO Reading from gs://dataflow-samples/shakespeare/kinglear.txt source=/home/ronoaldo/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/io/textio/textio.go:226 time=2024-01-09T00:21:46.459Z worker.ID=job-001[go-job-1-1704759704240538359]_go worker.endpoint=localhost:46495
2024/01/08 21:21:46 INFO Reading from gs://dataflow-samples/shakespeare/kinglear.txt source=/home/ronoaldo/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/io/textio/textio.go:226 time=2024-01-09T00:21:46.484Z worker.ID=job-001[go-job-1-1704759704240538359]_go worker.endpoint=localhost:46495
2024/01/08 21:21:46 INFO Reading from gs://dataflow-samples/shakespeare/kinglear.txt source=/home/ronoaldo/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/io/textio/textio.go:226 time=2024-01-09T00:21:46.537Z worker.ID=job-001[go-job-1-1704759704240538359]_go worker.endpoint=localhost:46495
2024/01/08 21:21:46 INFO Reading from gs://dataflow-samples/shakespeare/kinglear.txt source=/home/ronoaldo/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/io/textio/textio.go:226 time=2024-01-09T00:21:46.697Z worker.ID=job-001[go-job-1-1704759704240538359]_go worker.endpoint=localhost:46495
2024/01/08 21:21:46 INFO Reading from gs://dataflow-samples/shakespeare/kinglear.txt source=/home/ronoaldo/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/io/textio/textio.go:226 time=2024-01-09T00:21:46.883Z worker.ID=job-001[go-job-1-1704759704240538359]_go worker.endpoint=localhost:46495
^Csignal: interrupt

It does not seem to complete the pipeline regardless of how long it is left running.

I tried using register.Function2x1 as a possible fix, but it still consistently fails when executing with Prism. I'm not sure whether this is caused by unregistered functions, but the symptom is the same. Should I open a separate issue to track this?
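For reference, register.Function2x1 covers a DoFn with two inputs and one return value, like wordcount's formatFn. A minimal sketch of that shape; the registration call is shown only in a comment because it needs the Beam Go SDK module, and the function here is illustrative:

```go
package main

import "fmt"

// formatFn is a wordcount-style DoFn with two inputs and one return value,
// the shape that register.Function2x1 matches. In pipeline code it would be
// registered in an init() as:
//
//	register.Function2x1[string, int, string](formatFn)
//
// which generates type-specialized shims so workers can execute the function
// without reflection. (That line requires
// github.com/apache/beam/sdks/v2/go/pkg/beam/register.)
func formatFn(w string, c int) string {
	return fmt.Sprintf("%v: %v", w, c)
}

func main() {
	fmt.Println(formatFn("the", 26)) // the: 26
}
```

Registration changes how the function is looked up and invoked on workers; it would not, by itself, change a runner's splitting behavior.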

lostluck commented Jan 9, 2024

I suspect this is an issue with Prism's overly aggressive splitting, which is probably not playing nicely with textio, rather than anything related to function registration.

lostluck commented Jan 9, 2024

@ronoaldo I think a different issue should track what you've seen. Do tag me in that issue, as I think it shouldn't be too hard to start improving the split approach to better tolerate fileio while still quickly passing the separation harness test. The splitting decision is very open to change right now.

I opened this one to track a specific bug in Prism WRT pipeline hangs, and I seem to have missed closing it after resolving the race condition*. The minimal_wordcount now correctly fails with errors relating to registration, instead of sometimes hanging forever, so I'd like to close this issue.

(*) The issue was that I didn't have a consolidated handler for waiting on both pipeline termination and bundle termination, so the error was sometimes caught and sometimes lost.

lostluck commented Jan 9, 2024

Lastly, @ronoaldo I'm also unable to replicate the bug you're seeing at repo HEAD or through versions of prism from Beam releases 2.50.0 to 2.53.0. What version are you seeing it on?

(edit: you do say it's all the time...)
(edit2: it also shows that the version is 2.53.0...)

OK. So far I'm simply unable to replicate it. Hmmm.

@lostluck

I can trigger the "aggressive splitting" if I add a time.Sleep(time.Millisecond) to the pipeline, and I can avoid it if I account for any PCollection's ElementCount increasing during the progress check interval. That is: if the bundle isn't producing any output at all, then split; otherwise, don't touch it.

While this is still a basic heuristic, having the total count of elements emitted since the last progress report available will let us make it more sophisticated in the future, based on the ratio of dataChannel inputs consumed to total outputs produced and so on, so it's not only splitting when a bundle appears to be stuck (e.g. split if we produced less than half an output per input since the last progress tick).
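That ratio idea could look something like the following. This is a hypothetical refinement, not the PR's code, and the 0.5 threshold is made up for illustration:

```go
package main

import "fmt"

// splitForThroughput sketches the future ratio-based heuristic: beyond the
// stuck-bundle case, also split when the bundle emitted fewer than half an
// output element per consumed input since the last progress tick. Both the
// function and the threshold are illustrative.
func splitForThroughput(inputsConsumed, outputsProduced int64) bool {
	if inputsConsumed == 0 {
		// Nothing consumed since the last tick: split only if the bundle
		// also produced nothing, i.e. it looks stuck.
		return outputsProduced == 0
	}
	return float64(outputsProduced)/float64(inputsConsumed) < 0.5
}

func main() {
	fmt.Println(splitForThroughput(100, 10))  // true: low output ratio
	fmt.Println(splitForThroughput(100, 200)) // false: healthy expansion
	fmt.Println(splitForThroughput(0, 0))     // true: stuck bundle
}
```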

Anyway... #29968 might solve your issue, if you're able to patch it in and test it.

@lostluck lostluck changed the title [Bug][prism]: Unregistered Go functions cause prism execution to hang sometimes. [Bug][prism]: Excessive Splitting Jan 11, 2024
@lostluck lostluck self-assigned this Jan 11, 2024
@lostluck

Updated this issue to track the split discussion, since I have a PR that appears to avoid the splitting aggression, at some cost to general throughput (described elsewhere).

lostluck added a commit that referenced this issue Jan 12, 2024
[prism] Return total element count to progress loop. Split less aggressively. (#29968)

* [prism] Return total element count to progress loop. Split less aggressively.

* Update comments.

---------

Co-authored-by: lostluck <[email protected]>
@github-actions github-actions bot added this to the 2.54.0 Release milestone Jan 12, 2024
@ronoaldo

Sorry for the lack of a quick reply! I'll try to run the same pipeline inside a Docker container; perhaps that could help reproduce it?

I'll try the fix as well and report back, thanks for the quick reply!

@ronoaldo

tl;dr: network speed / storage caching / CDN was causing the latency and triggering the bug I reported; I guess using the sleep is the best way to reproduce it.

Just sharing my findings while testing... Previously it was consistently failing for me, perhaps due to network speed/latency. At first I thought it was due to Go 1.21.4: testing with another Go version did not trigger it, but I was able to reproduce the issue with that specific version using Docker.

Then I went ahead and started a for loop to test several Go minor versions, and they all passed without issues. I then switched the file from kinglear to othello and it failed once, then never failed again. I guess Cloud Storage somehow cached the file, which reduced network latency during the local pipeline execution.

Combining this with your comment, it does seem to be latency-related, since adding a time.Sleep reproduced it.

I'll test with the newer version of the library now to confirm that it passes as well. Thanks!

@lostluck

Thank you for reporting! Being inside Google's VPN definitely doesn't hurt my personal latency.

Longer term I'd like prism to track metrics like this, so it's easier to write a test that reproduces the problem and guards against regression. I've added some initial considerations to #29650.
