Kinesis: More robust default fetch settings. #13539
1) Default recordsPerFetch and recordBufferSize based on available memory rather than using hardcoded numbers. For this, we need an estimate of record size. Use 10 KB for regular records and 1 MB for aggregated records. With 1 GB heaps, 2 processors per task, and nonaggregated records, recordBufferSize comes out to the same as the old default (10000), and recordsPerFetch comes out slightly lower (1250 instead of 4000).
2) Default maxRecordsPerPoll based on whether records are aggregated or not (100 if not aggregated, 1 if aggregated). Prior default was 100.
3) Default fetchThreads based on processors divided by task count on Indexers, rather than overall processor count.
4) Additionally clean up the serialized JSON a bit by adding various JsonInclude annotations.
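As a rough illustration of items 1 and 2, the sketch below derives recordBufferSize and maxRecordsPerPoll from heap size and the aggregation flag. It is not the PR's code: the class and method names are hypothetical, the 10% buffer fraction is inferred from the 15% total minus the 5% fetch fraction discussed in the review, and the 100 MB cap on buffer memory is an assumption. "10 KB" and "1 MB" are taken as 10,000 and 1,000,000 bytes.

```java
// Hypothetical sketch of memory-based defaults; not the actual Druid implementation.
final class KinesisDefaultsSketch {
    // Record size estimates from the PR description (decimal bytes assumed).
    static final int ASSUMED_RECORD_SIZE = 10_000;
    static final int ASSUMED_RECORD_SIZE_AGGREGATE = 1_000_000;

    // Assumption: 15% total heap budget minus the 5% fetch fraction.
    static final double RECORD_BUFFER_MEMORY_MAX_HEAP_FRACTION = 0.1;

    // Assumption: absolute cap on buffer memory, in bytes.
    static final long MAX_RECORD_BUFFER_MEMORY = 100_000_000L;

    /** Number of records the buffer may hold, given the heap size in bytes. */
    static int defaultRecordBufferSize(long maxHeapBytes, boolean deaggregate) {
        long memoryToUse = Math.min(
            MAX_RECORD_BUFFER_MEMORY,
            (long) (maxHeapBytes * RECORD_BUFFER_MEMORY_MAX_HEAP_FRACTION));
        int assumedRecordSize = deaggregate ? ASSUMED_RECORD_SIZE_AGGREGATE : ASSUMED_RECORD_SIZE;
        return Math.toIntExact(Math.max(1, memoryToUse / assumedRecordSize));
    }

    /** 1 for aggregated records, 100 otherwise, as stated in the PR description. */
    static int defaultMaxRecordsPerPoll(boolean deaggregate) {
        return deaggregate ? 1 : 100;
    }

    public static void main(String[] args) {
        long oneGb = 1_000_000_000L;
        System.out.println(defaultRecordBufferSize(oneGb, false)); // 10000
        System.out.println(defaultRecordBufferSize(oneGb, true));  // 100
        System.out.println(defaultMaxRecordsPerPoll(false));       // 100
        System.out.println(defaultMaxRecordsPerPoll(true));        // 1
    }
}
```

Under these assumptions a 1 GB heap with nonaggregated records yields 10000, matching the old default quoted above.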
 * Together with {@link KinesisIndexTaskTuningConfig#RECORD_BUFFER_MEMORY_MAX_HEAP_FRACTION}, don't take up more
 * than 15% of the heap.
 */
private static final double RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION = 0.05;
Seems like a typo since 15% is mentioned above
I believe the intention here is that RECORD_BUFFER_MEMORY_MAX_HEAP_FRACTION + RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION makes up the 15% threshold.
Sorry, I misread it.
    ? KinesisIndexTaskTuningConfig.ASSUMED_RECORD_SIZE_AGGREGATE
    : KinesisIndexTaskTuningConfig.ASSUMED_RECORD_SIZE;

return Ints.checkedCast(Math.max(1, memoryToUse / assumedRecordSize / fetchThreads));
Am I right to assume that for a 1GB-heap task with deaggregate=true and 1 available processor, the default recordsPerFetch from this calculation is 25?
memoryToUse = min(100_000_000, 1_000_000_000 * 0.05) = 50_000_000
assumedRecordSize = 1_000_000
recordsPerFetch = 50_000_000 / 1_000_000 / 2 = 25
If we carry the assumption that each record is 1MB, I think this result conflicts with the maximum size of data that GetRecords can return, which is 10 MB.
My understanding of the Kinesis API is that in that case, the GetRecords API will return fewer records, using our limit as a cap. That would be OK if so; nothing wrong with getting less than this from the API.
Ah, true
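The arithmetic in this thread can be reproduced with a small standalone sketch. It mirrors the quoted `return` line but is not the PR's code: the class and method names are hypothetical, `Math.toIntExact` stands in for Guava's `Ints.checkedCast` to avoid a dependency, the 100 MB cap is taken from the reviewer's `min(100_000_000, ...)`, and the fetch thread counts (2 threads for 1 processor, 4 for 2 processors) are inferred from the numbers quoted in the thread and the PR description rather than confirmed by the diff shown here.

```java
// Hypothetical sketch of the recordsPerFetch default; not the actual Druid implementation.
final class RecordsPerFetchSketch {
    // Record size estimates from the PR description (decimal bytes assumed).
    static final int ASSUMED_RECORD_SIZE = 10_000;
    static final int ASSUMED_RECORD_SIZE_AGGREGATE = 1_000_000;

    // From the diff: at most 5% of the heap goes to in-flight fetches.
    static final double RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION = 0.05;

    // Cap taken from the reviewer's calculation, in bytes.
    static final long MAX_RECORD_FETCH_MEMORY = 100_000_000L;

    /** Records requested per fetch call, splitting the fetch memory across fetch threads. */
    static int defaultRecordsPerFetch(long maxHeapBytes, int fetchThreads, boolean deaggregate) {
        long memoryToUse = Math.min(
            MAX_RECORD_FETCH_MEMORY,
            (long) (maxHeapBytes * RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION));
        int assumedRecordSize = deaggregate ? ASSUMED_RECORD_SIZE_AGGREGATE : ASSUMED_RECORD_SIZE;
        return Math.toIntExact(Math.max(1, memoryToUse / assumedRecordSize / fetchThreads));
    }

    public static void main(String[] args) {
        long oneGb = 1_000_000_000L;
        // Reviewer's case: aggregated records, 2 fetch threads.
        System.out.println(defaultRecordsPerFetch(oneGb, 2, true));  // 25
        // PR description's case: nonaggregated records, 4 fetch threads assumed.
        System.out.println(defaultRecordsPerFetch(oneGb, 4, false)); // 1250
    }
}
```

The result acts only as an upper bound on the request: as noted in the reply above, GetRecords may return fewer records than the limit asks for.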
Merging since IT failure was unrelated.
Fix NPE in KinesisSupervisor#setupRecordSupplier. PR #13539 refactored record supplier creation and introduced a bug: this method would throw an NPE when recordsPerFetch was not provided by the user. recordsPerFetch isn't needed in this context at all, since the supervisor-side supplier doesn't fetch records, so this patch sets it to zero. It also removes unused imports.
The current defaults can cause issues for people because they do not take into account the generally large difference in size between aggregated and nonaggregated records, and they do not take into account the amount of available memory. This patch improves the defaults such that they do take these things into account and would be appropriate in a wider variety of situations.