Kinesis: More robust default fetch settings. #13539

Merged · 4 commits · Jan 13, 2023
75 changes: 38 additions & 37 deletions docs/development/extensions-core/kinesis-ingestion.md
@@ -101,9 +101,7 @@ Where the file `supervisor-spec.json` contains a Kinesis supervisor spec:
"endpoint": "kinesis.us-east-1.amazonaws.com",
"taskCount": 1,
"replicas": 1,
"taskDuration": "PT1H",
"recordsPerFetch": 4000,
"fetchDelayMillis": 0
"taskDuration": "PT1H"
},
"tuningConfig": {
"type": "kinesis",
@@ -121,10 +119,10 @@ Where the file `supervisor-spec.json` contains a Kinesis supervisor spec:
|`type`|The supervisor type; this should always be `kinesis`.|yes|
|`spec`|Container object for the supervisor configuration.|yes|
|`dataSchema`|The schema that will be used by the Kinesis indexing task during ingestion. See [`dataSchema`](../../ingestion/ingestion-spec.md#dataschema).|yes|
|`ioConfig`|A `KinesisSupervisorIOConfig` object for configuring Kinesis connection and I/O-related settings for the supervisor and indexing task. See [KinesisSupervisorIOConfig](#kinesissupervisorioconfig) below.|yes|
|`tuningConfig`|A `KinesisSupervisorTuningConfig` object for configuring performance-related settings for the supervisor and indexing tasks. See [KinesisSupervisorTuningConfig](#kinesissupervisortuningconfig) below.|no|
|`ioConfig`|An [`ioConfig`](#ioconfig) object for configuring Kinesis connection and I/O-related settings for the supervisor and indexing task.|yes|
|`tuningConfig`|A [`tuningConfig`](#tuningconfig) object for configuring performance-related settings for the supervisor and indexing tasks.|no|

### KinesisSupervisorIOConfig
### `ioConfig`

|Field|Type|Description|Required|
|-----|----|-----------|--------|
@@ -140,7 +138,7 @@ Where the file `supervisor-spec.json` contains a Kinesis supervisor spec:
|`completionTimeout`|ISO8601 Period|The length of time to wait before declaring a publishing task as failed and terminating it. If this is set too low, your tasks may never publish. The publishing clock for a task begins roughly after `taskDuration` elapses.|no (default == PT6H)|
|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*. Messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting `earlyMessageRejectionPeriod` too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
|`recordsPerFetch`|Integer|The number of records to request per call to fetch records from Kinesis. See [Determining fetch settings](#determining-fetch-settings).|no (default == 4000)|
|`recordsPerFetch`|Integer|The number of records to request per call to fetch records from Kinesis. See [Determining fetch settings](#determining-fetch-settings).|no (see [Determining fetch settings](#determining-fetch-settings) for defaults)|
|`fetchDelayMillis`|Integer|Time in milliseconds to wait between subsequent calls to fetch records from Kinesis. See [Determining fetch settings](#determining-fetch-settings).|no (default == 0)|
|`awsAssumedRoleArn`|String|The AWS assumed role to use for additional permissions.|no|
|`awsExternalId`|String|The AWS external id to use for additional permissions.|no|
@@ -243,9 +241,7 @@ The following example demonstrates a supervisor spec with `lagBased` autoScaler
"endpoint": "kinesis.us-east-1.amazonaws.com",
"taskCount": 1,
"replicas": 1,
"taskDuration": "PT1H",
"recordsPerFetch": 4000,
"fetchDelayMillis": 0
"taskDuration": "PT1H"
},
"tuningConfig": {
"type": "kinesis",
@@ -271,7 +267,7 @@ For more information, see [Data formats](../../ingestion/data-formats.md). You c

<a name="tuningconfig"></a>

### KinesisSupervisorTuningConfig
### `tuningConfig`

The `tuningConfig` is optional. If no `tuningConfig` is specified, default parameters are used.

@@ -296,16 +292,16 @@ The `tuningConfig` is optional. If no `tuningConfig` is specified, default param
|`chatRetries`|Integer|The number of times HTTP requests to indexing tasks will be retried before considering tasks unresponsive.| no (default == 8)|
|`httpTimeout`|ISO8601 Period|How long to wait for a HTTP response from an indexing task.|no (default == PT10S)|
|`shutdownTimeout`|ISO8601 Period|How long to wait for the supervisor to attempt a graceful shutdown of tasks before exiting.|no (default == PT80S)|
|`recordBufferSize`|Integer|Size of the buffer (number of events) used between the Kinesis fetch threads and the main ingestion thread.|no (default == 10000)|
|`recordBufferSize`|Integer|Size of the buffer (number of events) used between the Kinesis fetch threads and the main ingestion thread.|no (see [Determining fetch settings](#determining-fetch-settings) for defaults)|
|`recordBufferOfferTimeout`|Integer|Length of time in milliseconds to wait for space to become available in the buffer before timing out.| no (default == 5000)|
|`recordBufferFullWait`|Integer|Length of time in milliseconds to wait for the buffer to drain before attempting to fetch records from Kinesis again.|no (default == 5000)|
|`fetchThreads`|Integer|Size of the pool of threads fetching data from Kinesis. There is no benefit in having more threads than Kinesis shards.|no (default == procs * 2, where "procs" is the number of processors on the server that the task is running on) |
|`fetchThreads`|Integer|Size of the pool of threads fetching data from Kinesis. There is no benefit in having more threads than Kinesis shards.|no (default == procs * 2, where "procs" is the number of processors available to the task) |
|`segmentWriteOutMediumFactory`|Object|Segment write-out medium to use when creating segments. See below for more information.|no (not specified by default, the value from `druid.peon.defaultSegmentWriteOutMediumFactory.type` is used)|
|`intermediateHandoffPeriod`|ISO8601 Period|How often the tasks should hand off segments. Handoff will happen either if `maxRowsPerSegment` or `maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.| no (default == P2147483647D)|
|`logParseExceptions`|Boolean|If true, log an error message when a parsing exception occurs, containing information about the row where the error occurred.|no, default == false|
|`maxParseExceptions`|Integer|The maximum number of parse exceptions that can occur before the task halts ingestion and fails. Overridden if `reportParseExceptions` is set.|no, unlimited default|
|`maxSavedParseExceptions`|Integer|When a parse exception occurs, Druid can keep track of the most recent parse exceptions. "maxSavedParseExceptions" limits how many exception instances will be saved. These saved exceptions will be made available after the task finishes in the [task completion report](../../ingestion/tasks.md#task-reports). Overridden if `reportParseExceptions` is set.|no, default == 0|
|`maxRecordsPerPoll`|Integer|The maximum number of records/events to be fetched from buffer per poll. The actual maximum will be `Max(maxRecordsPerPoll, Max(bufferSize, 1))`|no, default == 100|
|`maxRecordsPerPoll`|Integer|The maximum number of records/events to be fetched from buffer per poll. The actual maximum will be `Max(maxRecordsPerPoll, Max(bufferSize, 1))`|no (see [Determining fetch settings](#determining-fetch-settings) for defaults)|
|`repartitionTransitionDuration`|ISO8601 Period|When shards are split or merged, the supervisor will recompute shard -> task group mappings, and signal any running tasks created under the old mappings to stop early at (current time + `repartitionTransitionDuration`). Stopping the tasks early allows Druid to begin reading from the new shards more quickly. The repartition transition wait time controlled by this property gives the stream additional time to write records to the new shards after the split/merge, which helps avoid the issues with empty shard handling described at https://github.com/apache/druid/issues/7600.|no, (default == PT2M)|
|`offsetFetchPeriod`|ISO8601 Period|How often the supervisor queries Kinesis and the indexing tasks to fetch current offsets and calculate lag. If the user-specified value is below the minimum value (`PT5S`), the supervisor ignores the value and uses the minimum value instead.|no (default == PT30S, min == PT5S)|
|`useListShards`|Boolean|Indicates if `listShards` API of AWS Kinesis SDK can be used to prevent `LimitExceededException` during ingestion. Please note that the necessary `IAM` permissions must be set for this to work.|no (default == false)|
@@ -593,16 +589,27 @@ There is also ongoing work to support automatic segment compaction of sharded se
Hadoop (see [here](https://github.com/apache/druid/pull/5102)).

### Determining Fetch Settings
Internally, the Kinesis Indexing Service uses the Kinesis Record Supplier abstraction for fetching Kinesis data records and storing the records
locally. The Kinesis Record Supplier fetches records by running a separate fetching thread for each Kinesis shard; the
maximum number of threads is determined by `fetchThreads`. For example, a Kinesis stream with 3 shards will have 3 threads, each fetching from a shard separately.
There is a delay between each fetching operation, which is controlled by `fetchDelayMillis`. The maximum number of records to be fetched per thread per
operation is controlled by `recordsPerFetch`. Note that this is not the same as `maxRecordsPerPoll`.

The records fetched by each thread are pushed to a queue in the order that they are fetched. The records are stored in this queue until `poll()` is called
by either the supervisor or the indexing task. `poll()` will attempt to drain the internal buffer queue up to a limit of `max(maxRecordsPerPoll, q.size())`.
Here `maxRecordsPerPoll` controls the theoretical maximum number of records to drain out of the buffer queue, so setting this parameter to a reasonable value is essential
to prevent the queue from overflowing or memory usage from exceeding the heap size.
Kinesis indexing tasks fetch records using `fetchThreads` threads.
If `fetchThreads` is higher than the number of Kinesis shards, the excess threads are unused.
Each fetch thread fetches up to `recordsPerFetch` records at once from a Kinesis shard, with a delay between fetches
of `fetchDelayMillis`.
The records fetched by each thread are pushed into a shared queue of size `recordBufferSize`.
The main runner thread for each task polls up to `maxRecordsPerPoll` records from the queue at once.

When using Kinesis Producer Library's aggregation feature (i.e. when [`deaggregate`](#deaggregation) is set),
each of these parameters refers to aggregated records rather than individual records.
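
The following sketch shows the fetch/poll hand-off described above, assuming a shared blocking queue. This is illustrative only — the real `KinesisRecordSupplier` is considerably more involved — and `getRecordsFromShard` is a hypothetical stand-in for a single GetRecords call:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class FetchPollSketch
{
  private final BlockingQueue<byte[]> buffer;
  private final int recordsPerFetch;
  private final long fetchDelayMillis;
  private final long offerTimeoutMillis;

  FetchPollSketch(int recordBufferSize, int recordsPerFetch, long fetchDelayMillis, long offerTimeoutMillis)
  {
    // Bounded at recordBufferSize, so fetch threads cannot outrun the ingestion thread.
    this.buffer = new LinkedBlockingQueue<>(recordBufferSize);
    this.recordsPerFetch = recordsPerFetch;
    this.fetchDelayMillis = fetchDelayMillis;
    this.offerTimeoutMillis = offerTimeoutMillis;
  }

  // One of `fetchThreads` threads runs a loop like this per shard.
  void fetchLoop() throws InterruptedException
  {
    while (!Thread.currentThread().isInterrupted()) {
      for (byte[] record : getRecordsFromShard(recordsPerFetch)) {
        // Waits up to recordBufferOfferTimeout for space if the buffer is full.
        buffer.offer(record, offerTimeoutMillis, TimeUnit.MILLISECONDS);
      }
      Thread.sleep(fetchDelayMillis); // fetchDelayMillis between fetches
    }
  }

  // The task's main runner thread drains up to maxRecordsPerPoll per poll.
  List<byte[]> poll(int maxRecordsPerPoll)
  {
    List<byte[]> polled = new ArrayList<>();
    buffer.drainTo(polled, maxRecordsPerPoll);
    return polled;
  }

  private List<byte[]> getRecordsFromShard(int limit)
  {
    return new ArrayList<>(); // placeholder for the actual Kinesis fetch
  }
}
```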

The default values for these parameters are listed below (a sketch of the computation follows the list):

- `fetchThreads`: Twice the number of processors available to the task. The number of processors available to the task
is the total number of processors on the server, divided by `druid.worker.capacity` (the number of task slots on that
particular server).
- `fetchDelayMillis`: 0 (no delay between fetches).
- `recordsPerFetch`: 100 MB or an estimated 5% of available heap, whichever is smaller, divided by `fetchThreads`.
For estimation purposes, Druid uses a figure of 10 KB for regular records and 1 MB for [aggregated records](#deaggregation).
- `recordBufferSize`: 100 MB or an estimated 10% of available heap, whichever is smaller.
For estimation purposes, Druid uses a figure of 10 KB for regular records and 1 MB for [aggregated records](#deaggregation).
- `maxRecordsPerPoll`: 100 for regular records, 1 for [aggregated records](#deaggregation).
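
A minimal sketch of how these defaults could be derived — not the actual Druid implementation; the 100 MB cap, the 5% / 10% heap fractions, and the 10 KB / 1 MB record-size figures are taken from the list above:

```java
class KinesisFetchDefaultsSketch
{
  static final long MAX_FETCH_MEMORY_BYTES = 100_000_000L;        // 100 MB cap
  static final long ASSUMED_RECORD_SIZE = 10_000L;                // regular record, ~10 KB
  static final long ASSUMED_AGGREGATED_RECORD_SIZE = 1_000_000L;  // KPL-aggregated record, ~1 MB

  // Processors available to the task: total processors divided by the
  // worker's task slots (druid.worker.capacity), then doubled.
  static int defaultFetchThreads(int totalProcessors, int workerCapacity)
  {
    return Math.max(1, totalProcessors / workerCapacity) * 2;
  }

  // min(100 MB, 5% of heap), divided by the estimated record size and fetchThreads.
  static int defaultRecordsPerFetch(long maxHeapBytes, boolean deaggregate, int fetchThreads)
  {
    final long recordSize = deaggregate ? ASSUMED_AGGREGATED_RECORD_SIZE : ASSUMED_RECORD_SIZE;
    final long fetchMemory = Math.min(MAX_FETCH_MEMORY_BYTES, (long) (maxHeapBytes * 0.05));
    return Math.max(1, (int) (fetchMemory / recordSize / fetchThreads));
  }

  // min(100 MB, 10% of heap), divided by the estimated record size.
  static int defaultRecordBufferSize(long maxHeapBytes, boolean deaggregate)
  {
    final long recordSize = deaggregate ? ASSUMED_AGGREGATED_RECORD_SIZE : ASSUMED_RECORD_SIZE;
    final long bufferMemory = Math.min(MAX_FETCH_MEMORY_BYTES, (long) (maxHeapBytes * 0.10));
    return Math.max(1, (int) (bufferMemory / recordSize));
  }

  static int defaultMaxRecordsPerPoll(boolean deaggregate)
  {
    return deaggregate ? 1 : 100;
  }
}
```

For example, with an 8 GB heap, regular records, and 8 fetch threads, 5% of heap (~410 MB) exceeds the 100 MB cap, so `recordsPerFetch` works out to roughly 100 MB / 10 KB / 8 ≈ 1,250 records, and `recordBufferSize` to roughly 100 MB / 10 KB = 10,000 records.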

Kinesis places the following restrictions on calls to fetch records:

@@ -611,25 +618,19 @@
- Each shard can read up to 2 MB per second.
- The maximum size of data that GetRecords can return is 10 MB.
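
As a back-of-the-envelope aid — an illustrative sketch, not part of Druid — you can check whether a candidate `recordsPerFetch` / `fetchDelayMillis` combination stays within these limits for a given average record size and consumer count. The five-reads-per-second figure used below is Kinesis's standard per-shard read limit:

```java
class ShardLimitCheck
{
  static boolean withinShardLimits(
      int recordsPerFetch,
      long fetchDelayMillis,
      long avgRecordBytes,
      int consumersPerShard // typically `replicas`, plus any other consumers of the stream
  )
  {
    // Each consumer issues roughly one GetRecords call per fetchDelayMillis.
    double callsPerSecond = consumersPerShard * (1000.0 / Math.max(1, fetchDelayMillis));
    double bytesPerSecond = callsPerSecond * recordsPerFetch * (double) avgRecordBytes;
    return callsPerSecond <= 5                                    // at most 5 read calls per second per shard
        && bytesPerSecond <= 2_000_000                            // at most 2 MB per second per shard
        && (long) recordsPerFetch * avgRecordBytes <= 10_000_000; // at most 10 MB per GetRecords response
  }
}
```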

Values for `recordsPerFetch` and `fetchDelayMillis` should be chosen to maximize throughput under the above constraints.
The values that you choose will depend on the average size of a record and the number of consumers you have reading from
a given shard (which will be `replicas` unless you have other consumers also reading from this Kinesis stream).

If the above limits are violated, AWS will throw ProvisionedThroughputExceededException errors on subsequent calls to
read data. When this happens, the Kinesis indexing service will pause for `fetchDelayMillis` and then attempt the call
again.
If the above limits are exceeded, Kinesis throws ProvisionedThroughputExceededException errors. If this happens, Druid
Kinesis tasks pause for `fetchDelayMillis` or 3 seconds, whichever is larger, and then attempt the call again.
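
A minimal sketch of that back-off, assuming a hypothetical `fetchRecords` helper standing in for one GetRecords call:

```java
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;

import java.util.Collections;
import java.util.List;

class ThroughputBackoffSketch
{
  static List<byte[]> fetchWithBackoff(long fetchDelayMillis) throws InterruptedException
  {
    while (true) {
      try {
        return fetchRecords(); // stand-in for one GetRecords call
      }
      catch (ProvisionedThroughputExceededException e) {
        // Pause for fetchDelayMillis or 3 seconds, whichever is larger, then retry.
        Thread.sleep(Math.max(fetchDelayMillis, 3_000L));
      }
    }
  }

  private static List<byte[]> fetchRecords()
  {
    return Collections.emptyList(); // placeholder
  }
}
```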

Internally, each indexing task maintains a buffer that stores fetched but not yet processed records. `recordsPerFetch` and `fetchDelayMillis`
control this behavior. The number of records that the indexing task fetches from the buffer is controlled by `maxRecordsPerPoll`, which
determines the number of records to be processed in each ingestion loop in the task.
In most cases, the default settings for fetch parameters are sufficient to achieve good performance without excessive
memory usage. However, in some cases, you may need to adjust these parameters in order to more finely control fetch rate
and memory usage. Optimal values depend on the average size of a record and the number of consumers you have reading
from a given shard, which will be `replicas` unless you have other consumers also reading from this Kinesis stream.

## Deaggregation
See [issue](https://github.com/apache/druid/issues/6714)

The Kinesis indexing service supports de-aggregation of multiple rows packed into a single record by the Kinesis
Producer Library's aggregate method for more efficient data transfer.

To enable this feature, set `deaggregate` to true when submitting a supervisor-spec.
To enable this feature, set `deaggregate` to true in your `ioConfig` when submitting a supervisor spec.

## Resharding

KinesisIndexTask.java
@@ -27,19 +27,25 @@
import com.google.inject.name.Named;
import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.utils.RuntimeInfo;

import java.util.Map;

public class KinesisIndexTask extends SeekableStreamIndexTask<String, String, ByteEntity>
{
private static final String TYPE = "index_kinesis";
private static final Logger log = new Logger(KinesisIndexTask.class);

private final boolean useListShards;
private final AWSCredentialsConfig awsCredentialsConfig;
private RuntimeInfo runtimeInfo;

@JsonCreator
public KinesisIndexTask(
@@ -66,6 +72,13 @@ public KinesisIndexTask(
this.awsCredentialsConfig = awsCredentialsConfig;
}

@Override
public TaskStatus runTask(TaskToolbox toolbox)
{
this.runtimeInfo = toolbox.getAdjustedRuntimeInfo();
return super.runTask(toolbox);
}

@Override
protected SeekableStreamIndexTaskRunner<String, String, ByteEntity> createTaskRunner()
{
@@ -84,29 +97,38 @@ protected KinesisRecordSupplier newTaskRecordSupplier()
{
KinesisIndexTaskIOConfig ioConfig = ((KinesisIndexTaskIOConfig) super.ioConfig);
KinesisIndexTaskTuningConfig tuningConfig = ((KinesisIndexTaskTuningConfig) super.tuningConfig);
int fetchThreads = tuningConfig.getFetchThreads() != null
? tuningConfig.getFetchThreads()
: Runtime.getRuntime().availableProcessors() * 2;
final int fetchThreads = computeFetchThreads(runtimeInfo, tuningConfig.getFetchThreads());
final int recordsPerFetch = ioConfig.getRecordsPerFetchOrDefault(runtimeInfo.getMaxHeapSizeBytes(), fetchThreads);
final int recordBufferSize =
tuningConfig.getRecordBufferSizeOrDefault(runtimeInfo.getMaxHeapSizeBytes(), ioConfig.isDeaggregate());
final int maxRecordsPerPoll = tuningConfig.getMaxRecordsPerPollOrDefault(ioConfig.isDeaggregate());

Preconditions.checkArgument(
fetchThreads > 0,
"Must have at least one background fetch thread for the record supplier"
log.info(
"Starting record supplier with fetchThreads [%d], fetchDelayMillis [%d], recordsPerFetch [%d], "
+ "recordBufferSize [%d], maxRecordsPerPoll [%d], deaggregate [%s].",
fetchThreads,
ioConfig.getFetchDelayMillis(),
recordsPerFetch,
recordBufferSize,
maxRecordsPerPoll,
ioConfig.isDeaggregate()
);

return new KinesisRecordSupplier(
KinesisRecordSupplier.getAmazonKinesisClient(
ioConfig.getEndpoint(),
awsCredentialsConfig,
ioConfig.getAwsAssumedRoleArn(),
ioConfig.getAwsExternalId()
),
ioConfig.getRecordsPerFetch(),
recordsPerFetch,
ioConfig.getFetchDelayMillis(),
fetchThreads,
ioConfig.isDeaggregate(),
tuningConfig.getRecordBufferSize(),
recordBufferSize,
tuningConfig.getRecordBufferOfferTimeout(),
tuningConfig.getRecordBufferFullWait(),
tuningConfig.getMaxRecordsPerPoll(),
maxRecordsPerPoll,
false,
useListShards
);
@@ -136,4 +158,22 @@ AWSCredentialsConfig getAwsCredentialsConfig()
{
return awsCredentialsConfig;
}

@VisibleForTesting
static int computeFetchThreads(final RuntimeInfo runtimeInfo, final Integer configuredFetchThreads)
{
final int fetchThreads;
if (configuredFetchThreads != null) {
fetchThreads = configuredFetchThreads;
} else {
fetchThreads = runtimeInfo.getAvailableProcessors() * 2;
}

Preconditions.checkArgument(
fetchThreads > 0,
"Must have at least one background fetch thread for the record supplier"
);

return fetchThreads;
}
}
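
For illustration, a hypothetical test-style check of `computeFetchThreads`, assumed to live in the same package as `KinesisIndexTask` (the method is package-private) and assuming `RuntimeInfo` can be subclassed to stub `getAvailableProcessors()`:

```java
import org.apache.druid.utils.RuntimeInfo;

class FetchThreadsSketch
{
  static void checkFetchThreadDefaults()
  {
    final RuntimeInfo fourProcessors = new RuntimeInfo()
    {
      @Override
      public int getAvailableProcessors()
      {
        return 4; // stubbed processor count
      }
    };

    // No explicit setting: the default is procs * 2.
    assert KinesisIndexTask.computeFetchThreads(fourProcessors, null) == 8;

    // An explicit setting wins over the default.
    assert KinesisIndexTask.computeFetchThreads(fourProcessors, 3) == 3;

    // A zero or negative setting would fail the Preconditions check above.
  }
}
```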