
[Bug]: Possibly unnecessary prefetch during GroupIntoBatches #26395

Open · 2 of 15 tasks

nbali opened this issue Apr 23, 2023 · 16 comments

Comments

@nbali
Contributor

nbali commented Apr 23, 2023

What happened?

While investigating why the Dataflow "Total streaming data processed" metric was so much higher (2-4× depending on the pipeline) than the amount of data actually being processed, even though the pipeline has only one shuffle/GBK/GIB-like transform, I stumbled upon something that I couldn't justify being there:

if (num % prefetchFrequency == 0) {
// Prefetch data and modify batch state (readLater() modifies this)
batch.readLater();
}
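
For context, this check sits inside the stateful DoFn that backs GroupIntoBatches. A heavily simplified, illustrative sketch of that pattern (not the actual Beam source; the String/Long coders and the batchSize / 5 prefetch frequency here are assumptions for the example):

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Illustrative only: a stripped-down stateful DoFn showing where the
// batch.readLater() prefetch sits relative to the per-key element counter.
class SimplifiedBatchFn extends DoFn<KV<String, String>, KV<String, Iterable<String>>> {

  private static final long BATCH_SIZE = 500_000L;
  // Assumed ratio; in the real transform the prefetch frequency is derived from the batch size.
  private static final long PREFETCH_FREQUENCY = BATCH_SIZE / 5;

  @StateId("batch")
  private final StateSpec<BagState<String>> batchSpec = StateSpecs.bag(StringUtf8Coder.of());

  @StateId("numElements")
  private final StateSpec<ValueState<Long>> numElementsSpec = StateSpecs.value(VarLongCoder.of());

  @ProcessElement
  public void process(
      @Element KV<String, String> element,
      @StateId("batch") BagState<String> batch,
      @StateId("numElements") ValueState<Long> numElements,
      OutputReceiver<KV<String, Iterable<String>>> out) {
    Long stored = numElements.read();
    long num = (stored == null ? 0L : stored) + 1;
    numElements.write(num);
    batch.add(element.getValue());

    if (num % PREFETCH_FREQUENCY == 0) {
      // The call under discussion: ask the runner to start fetching the bag
      // contents asynchronously before they are needed at flush time.
      batch.readLater();
    }
    if (num >= BATCH_SIZE) {
      out.output(KV.of(element.getKey(), batch.read()));
      batch.clear();
      numElements.clear();
    }
  }
}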

I might be missing some context here, but I'm not sure what benefit this provides for any generic GroupIntoBatches transform, and it seems especially pointless when called on a GroupIntoBatches.WithShardedKey (where all the data a batch contains already comes from the same worker).

I would appreciate any insight before I submit a PR to change this.

Issue Priority

Priority: 3 (minor)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@nbali
Contributor Author

nbali commented May 2, 2023

.

@Abacn
Contributor

Abacn commented May 3, 2023

Thanks for reporting this. This if block has been there since GroupIntoBatches was first implemented (#2610), so presumably there was some reason for it. WithShardedKey was added some time later (still about 3 years ago). I haven't looked into the details yet, but I'd like to dig into it.

"Total streaming data processed" metric was so much higher (2-4× depending on the pipeline) than the actual data

Have you verified that this is caused by the code path you pointed to? It would be nice if there were some benchmark data to share.

@nbali
Contributor Author

nbali commented May 3, 2023

"assuming there was some reason for that"
I assume so too, but there is literally no comment or discussion about it anywhere that I could find. Meanwhile, when I think it through, I can't find any justification for it.

"Have you tested that this is caused by the pointed code path? Would be nice if there is some benchmark data to share with"
Kind of. I haven't launched a pipeline with modified code to pinpoint this with 100% certainty, though.

Also, I don't think my pipeline results are consistent and straightforward enough to be considered a proper benchmark.

... but basically what I did was run the pipelines concurrently on the same Kafka streams with different configs. In theory the "shuffled" data amount should therefore be equal; it wasn't. Meanwhile everything else (data read from Kafka, data written to BQ, etc.) was essentially identical. My original goal wasn't to test this, but to optimize the pipelines by using non-default config values, because the defaults seemed rather unoptimized based on the GCS storage usage pattern and the average batch size, and sometimes the pipeline couldn't even scale down due to the estimated backlog size (when CPU was clearly available).

In order to do that I was playing with these configs:

options.setMaxBufferingDurationMilliSec(...);
options.setGcsUploadBufferSizeBytes(...);
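
(For completeness, a minimal sketch of how these get wired up. It assumes the setters live on GcsOptions and BigQueryOptions as in recent Java SDK versions; the exact hosting options interface may differ by Beam version.)

import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class TuningOptionsExample {
  public static void main(String[] args) {
    // Assumption: gcsUploadBufferSizeBytes is hosted on GcsOptions and
    // maxBufferingDurationMilliSec on BigQueryOptions.
    GcsOptions gcsOptions =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(GcsOptions.class);
    gcsOptions.setGcsUploadBufferSizeBytes(16 * 1024 * 1024); // e.g. 16 MB GCS upload buffer

    BigQueryOptions bqOptions = gcsOptions.as(BigQueryOptions.class);
    bqOptions.setMaxBufferingDurationMilliSec(30_000); // e.g. 30 s of buffering
  }
}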

When I increased them, the behaviour, CPU utilization, storage usage pattern, etc. changed in a way that corresponds to having bigger batches, as expected, but I also noticed increased costs due to the "processed data amount". So the data was obviously being handled differently for some reason; I started checking the code for what that might be, and this seems like the only thing that could cause it.

For example one run like I mentioned:
Pipeline1:

  • gcsUploadBufferSizeBytes: 2MB
  • maxBufferingDurationMilliSec: 30s
  • every other config identical
  • Total vCPU time 2 139,443 vCPU hr
  • Kafka read:
    • Elements added (Approximate) 8 617 148 267
    • Estimated size 5,89 TB
  • BQ Write:
    • 8 617 148 273 + 8 617 148 224
    • 5,97 TB + 5,99 TB
  • Total streaming data processed 26,66 TB
    • depending on whether both reading and writing count, or only one of them, this should be around ~12 TB or ~24 TB, so 26.66 seems okay

Pipeline2:

  • gcsUploadBufferSizeBytes: 16MB
  • maxBufferingDurationMilliSec: 30s
  • every other config identical
  • Total vCPU time 2 232,822 vCPU hr
  • Kafka read:
    • Elements added (Approximate) 8 637 509 081
    • Estimated size 6,16 TB
  • BQ Write:
    • 8 637 509 008 + 8 637 509 087
    • 6,06 TB + 6,09 TB
  • Total streaming data processed 46,06 TB
    • 12/24 vs 46

@nbali
Contributor Author

nbali commented May 10, 2023

nbali added a commit to nbali/beam that referenced this issue May 10, 2023
@Abacn
Contributor

Abacn commented May 10, 2023

Thanks for sharing the benchmark results! Let me help find people with experience to review.

@Abacn
Contributor

Abacn commented May 10, 2023

Also, thanks for being persistent. I am looking into it in the meantime as well, and the benchmark results look impressive. To my understanding, at the very least there is no harm in adding a parameter to the GroupIntoBatches PTransform specifically, something like GroupIntoBatches.disablePreFetch.

@nbali
Contributor Author

nbali commented May 10, 2023

Before I created the PRs I was thinking about which option to choose, and I decided against a dedicated method on GroupIntoBatches (GIB from now on) itself, for various reasons:

  • GIB is becoming a widely used transform; not quite as popular as GBK (and it never will be), but it's getting there
    • it's already used in multiple places, and I don't see why this change would be recommended only for some of them and not the others, so we would have to add it everywhere
    • I would assume every streaming sink that could benefit from batched insertion will eventually utilize GIB, and I see no reason to force developers to be aware of this performance optimization (not to mention "non-Beam" devs who use GIB in business pipelines)
  • if we let developers configure this externally at the transform-instance level, it would mean a lot of boilerplate code, as GIB functionality is usually buried deep inside other transforms
  • this functionality isn't strictly related to GIB itself; grouping by size, count, duration, etc. is, but whether or not to prefetch data sounds like an implementation detail of the stateful processing

So to sum it up: either stateful processing needs this and we need it everywhere, or it doesn't and we don't need it at all. I saw no reason not to, so I would have just removed it completely, but I also created the experiment-flag version, so we can release/use it without potentially breaking a release.

@Abacn
Contributor

Abacn commented May 12, 2023

Yeah, thanks, got it. The experimental flag sounds good in terms of not changing the behavior of the default config.

I am also trying to run some tests this weekend.

@Abacn
Contributor

Abacn commented May 12, 2023

CC: @kennknowles (from file history) may have insight

@Abacn
Contributor

Abacn commented May 12, 2023

Update: I tested #26618 with a test pipeline and found no noticeable change with the switch on/off. Basically, I am generating a steady stream of 100k elements per second, each of 1 kB, so it is 100 MB/s of throughput.

pipeline option provided: -Dexec.args="--project=*** --tempLocation=gs://***/temp --region=us-central1 --runner=DataflowRunner --numWorkers=5 --autoscalingAlgorithm=NONE --dataflowWorkerJar=/Users/***beam/runners/google-cloud-dataflow-java/worker/build/libs/beam-runners-google-cloud-dataflow-java-legacy-worker-2.48.0-SNAPSHOT.jar --experiments=enable_streaming_engine"

and, for the comparison job, added the disable-prefetch experiments option.

Both jobs were drained after 15 min.

| Metric | prefetch disabled | prefetch enabled (current master) |
| --- | --- | --- |
| Elapsed time | 19 min 23 sec | 19 min 10 sec |
| elements_read | 28,003,335 | 54,323,517 |
| Total streaming data | 84.76 GB | 167.62 GB |
| ratio (kB / element) | 3.03 | 3.09 |

Both settings have a similar ratio (~3 kB per element, which is 3 times the actual data size); however, disabling the prefetch shows a significant regression in throughput and causes a surging backlog:

Backlog:

(prefetch disabled)
[backlog chart]

(prefetch enabled)
[backlog chart]

@nbali Have you been able to test your changed code with a pipeline? To test modified java-core code with your pipeline, one can build both sdks-java-core and the Dataflow worker jar (for Runner v1) with something like

./gradlew -Ppublishing  sdks:java:core:jar
./gradlew :runners:google-cloud-dataflow-java:worker:shadowJar

and use the compiled jar as the dependency for java core, and pass the shadow worker jar to Dataflow via the pipeline option shown above.


test pipeline:

import java.util.Objects;
import java.util.Random;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

public class GroupIntoBatchesTest {

  public static void main(String[] argv) {
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(argv).withValidation().as(PipelineOptions.class);
    Pipeline p = Pipeline.create(options);

    // ~100k elements/s of 1 kB values, hashed onto 10 keys, batched in groups of 50k.
    p.apply(GenerateSequence.from(0).withRate(100000, Duration.standardSeconds(1)))
        .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
        .apply(MapElements.via(new MapToKVFn(1000)))
        .apply(GroupIntoBatches.ofSize(50000))
        .apply(ParDo.of(new DoFn<KV<Integer, Iterable<byte[]>>, Void>() {
          @ProcessElement
          public void process(ProcessContext ctx) {
            System.out.println("grouped key:" + Objects.requireNonNull(ctx.element()).getKey());
          }
        }));
    p.run();
  }

  static class MapToKVFn extends SimpleFunction<Long, KV<Integer, byte[]>> {

    private transient Random rd;

    private final int valueSize;

    public MapToKVFn(int valueSize) {
      this.valueSize = valueSize;
    }

    @Override
    public KV<Integer, byte[]> apply(Long input) {
      if (rd == null) {
        rd = new Random();
      }
      byte[] data = new byte[valueSize];
      rd.nextBytes(data);
      return KV.of(Long.hashCode(input) % 10, data);
    }
  }
}

@nbali
Contributor Author

nbali commented May 15, 2023

I haven't had the time yet (I will try to make time in the upcoming days), but your example has a theoretical maximum of 100k elements/sec, executed for 900 s, yet it only read 28M and 54M elements, which means 30-60k/sec. The hashing splits that across 10 keys, so a single key received about 3-6k msg/sec. The windowing essentially closed every batch after 10 s, so a window for a key received 30-60k elements, which indicates a totally different amount of prefetching in the two runs (even though both use the master version), yet the per-element data amount is the same. That seems odd to me. Do you have the input/output stats of the GIB transform? Also, over a minute of delay in both cases with 10 s windowing? It seems CPU-limited. I mean, isn't this essentially single-threaded?
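
Spelling that arithmetic out (a throwaway sketch; the ~900 s of generation, the 10 hash keys, the 10 s windows and the ofSize / 5 prefetch frequency are the assumptions behind the numbers above):

public class RateCheck {
  public static void main(String[] args) {
    long seconds = 900;                                  // ~15 min of generation
    long[] elementsRead = {28_003_335L, 54_323_517L};    // the two jobs above
    for (long read : elementsRead) {
      double perSec = read / (double) seconds;           // ~31k and ~60k element/s
      double perKeyPerSec = perSec / 10;                 // 10 hash keys
      double perKeyPerWindow = perKeyPerSec * 10;        // 10 s fixed windows
      // Assumed prefetch frequency: ofSize / 5 = 50,000 / 5 = 10,000 elements.
      double prefetchesPerWindow = perKeyPerWindow / (50_000 / 5);
      System.out.printf("%,.0f element/s total, %,.0f per key and window, ~%.1f prefetches%n",
          perSec, perKeyPerWindow, prefetchesPerWindow);
    }
  }
}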

@nbali
Contributor Author

nbali commented Sep 26, 2023

@Abacn
Well, I only now had time at work, and only a little. I haven't tried running it with the modified runner yet, only experimented by modifying parameters.

First of all, I'm not saying your tests are invalid in any way, but I also did mine and saw huge differences. I didn't mention it originally, as I thought it was irrelevant and should happen with every GIB (my bad), but where I noticed this is the GroupIntoBatches inside BigQueryIO. More specifically, this one:

GroupIntoBatches.<DestinationT, ElementT>ofSize(FILE_TRIGGERING_RECORD_COUNT)
.withByteSize(byteSize)
.withMaxBufferingDuration(maxBufferingDuration)
.withShardedKey())

This is what I use on the BQ.Write:

.withMethod(BigQueryIO.Write.Method.FILE_LOADS)
.withTriggeringFrequency(...)
.withAutoSharding()
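
For reference, a self-contained sketch of that write configuration (the table, schema, and one-minute triggering frequency below are placeholders, not my actual values):

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Collections;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.joda.time.Duration;

class FileLoadsWriteExample {

  // Placeholder table, schema and triggering frequency; only the three
  // settings quoted above are the point of this sketch.
  static BigQueryIO.Write<TableRow> fileLoadsWrite() {
    TableSchema schema =
        new TableSchema()
            .setFields(
                Collections.singletonList(
                    new TableFieldSchema().setName("payload").setType("STRING")));
    return BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.my_table")
        .withSchema(schema)
        .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
        // With streaming FILE_LOADS, the triggering frequency bounds how often
        // load jobs are kicked off; withAutoSharding() is what routes batching
        // through the GroupIntoBatches.WithShardedKey quoted above.
        .withTriggeringFrequency(Duration.standardMinutes(1))
        .withAutoSharding();
  }
}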

For the benchmark I removed some irrelevant parts from my pipeline, but kept the core Kafka.Read -> BigQuery.Write logic. I first deployed a flex template, then launched the template with different configurations to start consuming.

About the configurations:

  • the size is hardcoded, so that is 500'000 in every test.
  • the byteSize is configurable, I tested with 2 MB and 100 MB.
  • the bufferingDuration is configurable as well, I tested with 1 minute and 1 hour.

That gives me 4 scenarios.

The source is a Kafka stream with 12 partitions, with about 1k elements/sec and 1 MB/sec per partition.

Reaching the prefetch threshold (500'000 / 5) typically takes almost 2 minutes, so if we trigger the batches more often, we never reach the "prefetch" code. The 2 MB limit most likely triggers a flush within a few seconds, so we should never "prefetch". The 100 MB limit most likely triggers 1 prefetch (100 MB / 1 kB ≈ 100'000).
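
Put as numbers (a quick sketch; the ~1 kB element size and the ofSize / 5 prefetch frequency are assumptions based on the stream described above):

public class ThresholdCheck {
  public static void main(String[] args) {
    long elementBytes = 1_024L;                       // ~1 kB per element
    long prefetchEveryElements = 500_000L / 5;        // assumed prefetch frequency: 100,000 elements
    long[] byteSizeLimits = {2L << 20, 100L << 20};   // the 2 MB and 100 MB byteSize limits
    for (long byteLimit : byteSizeLimits) {
      long elementsPerBatch = byteLimit / elementBytes;
      long prefetchesPerBatch = elementsPerBatch / prefetchEveryElements;
      System.out.printf("byte limit %3d MB -> ~%,d elements per batch -> %d prefetch call(s)%n",
          byteLimit >> 20, elementsPerBatch, prefetchesPerBatch);
    }
  }
}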

After 22 min of runtime:

  • read from kafka: 13-15 GB, 17 million
  • streamed data:
    2 MB + 1 min -> 30,77 GB (*~2)
    2 MB + 1 hour -> 30,99 GB (*~2)
    100 MB + 1 min -> 63,38 GB (*~4)
    100 MB + 1 hour -> 65,09 GB (*~4)
  • GroupIntoBatches.WithShardedKey metrics from 18 million element input:
    2 MB + 1 min ->~7000 batches
    2 MB + 1 hour -> ~7000 batches
    100 MB + 1 min -> ~700 batches
    100 MB + 1 hour -> ~120 batches

As you can see, there are huge differences in the results, and also compared to your test pipeline: for example, you don't use .withShardedKey() and you trigger at a much higher frequency.

For the next run I increased the 100 MB option to 256 MB to trigger prefetching more often (and also switched from n1-standard-8 to n1-highmem-8, just to be sure to avoid an OOME). Everything else is the same.

Drained after 40 min:

  • read from kafka: 16-19 GB, 20-21 million
  • streamed data:
    2 MB + 1 min -> 39 GB (*~2)
    2 MB + 1 hour -> 39 GB (*~2)
    256 MB + 1 min -> 75 GB (*~4)
    256 MB + 1 hour -> 96 GB (*~5)

I can only guess that the reason you couldn't see it is that you fire too quickly and the data might still be cached? I would reduce the rate from 100'000/sec to 10'000/sec, for example (so with the key creation it would be 1'000/sec per key on average).

When I have time again, I will try with a modified runner.

@nbali
Contributor Author

nbali commented Sep 26, 2023

Quick update: runtime 11 h, same as the previous example except that the message size has been reduced to ~1/10, but I process every message 10 times, so it's still 1 MB/sec per partition, but 10k elements/sec.

  • read from kafka: 270-280 GB, 380 million (it's 470 GB and 3.8 billion at the GIB)
  • streamed data:
    2 MB + 1 min -> 1 TB (*~2)
    2 MB + 1 hour -> 1 TB (*~2)
    256 MB + 1 min -> 1,48 TB (*~3)
    256 MB + 1 hour -> 1,58 TB (*~3)

@nbali
Contributor Author

nbali commented Sep 29, 2023

@Abacn I have now tested with the modified GroupIntoBatches transform too, and it didn't influence the processed data amount for me either, in any of the tested scenarios. Something is still causing a non-linear increase in the processed data amount, but this isn't it.

@Abacn
Contributor

Abacn commented Nov 28, 2023

We identified one cause of a large (shuffled data) / (processed data) ratio: #27283, which was recently fixed in #29517. It may or may not be related to this use case.

@nbali
Contributor Author

nbali commented Nov 29, 2023

@Abacn I see how that would cause a similar pattern of unexpected shuffled data, and I also get how that change fixes that issue, but what I can't see is where we use an Iterable in the tested scenarios that would cause this. Looking at GroupIntoBatchesDoFn, the obvious pick would be BagState, but that is exactly what this test was about, and we clearly didn't read it. Looking at the other states (CombiningState, ValueState), nothing really sticks out for me... OR is it that registerByteObserver() was called regularly on the BagState's Iterable content by the framework, so that although we clearly didn't explicitly read it, it was read as an unintended side effect, thereby causing the costs?

EDIT:
Never mind, I did a quick test. The shuffled data is still non-linear; the diff between 1 MB + 1 minute and 256 MB + 1 hour is over 2×.
