FAQs
- What is error_code X?
- What is offsets_not_available_at_group_coordinator?
- What is offsets_out_of_range?
- What is MessageTooBigException?
- How do I prevent data loss?
- How should I configure Kafunk to run across a WAN?
- How do I consume faster?
- How do I produce faster?
- Should I use Consumer.stream or Consumer.consume?
- I'm seeing lots of consumer heartbeat errors, and group rebalancing. What's up?
- How does Kafunk handle failures?
- How does logging work?
- What does this Kafunk log message mean?
- How do I monitor consumer lag/progress?
- Why is my consumer stalling?
The event offsets_not_available_at_group_coordinator indicates that the Kafka broker responsible for storing a consumer's offsets - the group coordinator - doesn't have offsets for some or all of the partitions of a given topic/consumer-group combination. This can happen if the consumer's ConsumerConfig.autoOffsetReset strategy is set to AutoOffsetReset.Halt or AutoOffsetReset.TryStartFromCommittedOffsets and one of the following is true:
- The consumer is running for the first time.
- The consumer hasn't committed offsets for longer than the offset retention period.
- The broker lost the offsets.
In any case, the consumer should be pointed to offsets explicitly, using either Consumer.commitOffsets or Consumer.commitOffsetsToTime, or the consumer should use AutoOffsetReset.StartFromTime. Note that due to retention policies, the consumer may skip over messages that have since been removed. See: Durability.
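As a sketch, explicit offset initialization might look like the following (the broker address, group and topic names are placeholders, and the exact Kafunk signatures should be verified against the version in use):

```fsharp
// Sketch: point a consumer group at explicit offsets before consuming.
// "localhost", "my-group" and "my-topic" are placeholders.
let conn = Kafka.connHost "localhost"
let cfg = ConsumerConfig.create ("my-group", "my-topic")
let consumer = Consumer.create conn cfg

// Commit explicit (partition, offset) pairs for the group...
Consumer.commitOffsets consumer [| 0, 0L |]
|> Async.RunSynchronously

// ...or commit offsets corresponding to a point in time, e.g. the
// earliest offsets available on the brokers.
Consumer.commitOffsetsToTime consumer Time.EarliestOffset
|> Async.RunSynchronously
```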
Scenario 2 can happen if the consumer goes offline for longer than the offset retention period: since the consumer is offline, offsets aren't being committed and are eventually deleted by the broker. The offset retention period can be extended either server-side, via the configuration offsets.retention.minutes, or client-side, by overriding ConsumerConfig.offsetRetentionTime. If consuming with periodic offset commits (with Consumer.consumePeriodicCommit, or otherwise), Kafunk will commit offsets periodically even if no new messages are received. However, it will only commit offsets that exist. Therefore, if offsets are missing, they must be committed explicitly as described above.
Scenario 3 shouldn't happen in theory; however, there have been incidents in practice.
See also: Durability
The event offsets_out_of_range indicates that the offsets requested in a FetchRequest aren't available. This can happen in the following scenarios:
- The consumer is running for the first time and doesn't have an automatic autoOffsetReset value. Consumer.commitOffsets should be called to explicitly initialize the consumer to an offset.
- The consumer was assigned invalid offsets, using Consumer.commitOffsets, for example. This is simply user error. Valid offsets for a topic can be discovered using the Offsets module.
- The consumer fell behind the topic retention window. This can happen if the consumer was offline for a period of time, or if the consumer is unable to keep up with the pace of retention even when online. In this case, messages have been lost, and action must be taken to ensure the consumer can keep up with both the producers and the retention window.
- A lagging broker became the leader for a partition and didn't have an offset that the prior leader had. This is the opposite of the previous scenario: the consumer is ahead of the broker rather than behind it. Note that simply having the consumer reset from an earlier available offset will not prevent message loss, because the desired offset will be overwritten by a different message than what was there before the new broker became leader.
By default, Kafunk is configured to halt when it detects out-of-range offsets. This behavior can be adjusted via ConsumerConfig.autoOffsetReset.
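A minimal sketch of configuring the reset behavior (the named-parameter spelling follows ConsumerConfig and may vary by Kafunk version):

```fsharp
// Sketch: start from the earliest available offsets instead of halting
// when offsets are out of range.
let cfg =
  ConsumerConfig.create (
    groupId = "my-group",
    topic = "my-topic",
    autoOffsetReset = AutoOffsetReset.StartFromTime Time.EarliestOffset)
```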
See also:
- What is offsets_not_available_at_group_coordinator?
- How do I consume faster?
- How do I prevent data loss?
Kafunk raises a MessageTooBigException when attempting to decode a FetchResponse and a message in a message set is bigger than the entire message set. This means that the message was truncated before being sent, and therefore corrupt. To fix this, make sure that ConsumerConfig.fetchMaxBytes is greater than the size of the largest message in a topic. An applicable Kafka server configuration point is max.message.bytes.
See: Durability
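Raising the fetch size client-side can be sketched as follows (the parameter name follows ConsumerConfig; the example value is arbitrary):

```fsharp
// Sketch: ensure the fetch size exceeds the largest message in the topic.
// 2000000 is an arbitrary example value; size it to your largest message.
let cfg =
  ConsumerConfig.create (
    groupId = "my-group",
    topic = "my-topic",
    fetchMaxBytes = 2000000)
```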
Increased latencies across a WAN connection can warrant larger TCP window sizes. The applicable configuration points are:
- ChanConfig.receiveBufferSize controls the TCP receive buffer size.
- ChanConfig.sendBufferSize controls the TCP send buffer size.
The following consumer and producer configuration points are also applicable:
- ConsumerConfig.fetchMaxBytes controls the maximum amount of data Kafka will send for each partition of a fetch request. Increasing this value will increase throughput. Note however that setting this value too high may cause heartbeat requests to slow down beyond the session timeout period, causing the consumer instance to be ejected from the group.
- ProducerConfig.batchSizeBytes controls the maximum size of a batch of client side produce requests before being sent to the server. Increasing this value will increase throughput at the cost of latency.
- ProducerConfig.batchLingerMs controls the maximum amount of time to wait before sending a batch of client side produce requests to the server. Increasing this value will increase throughput at the cost of latency.
Note that the tradeoff being made is that of throughput versus latency. For some rules of thumb on tuning the TCP window with respect to network latency, look here.
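A hedged sketch of wiring larger buffers into a connection follows; the KafkaConfig/ChanConfig parameter names and the URI helper are assumptions to verify against the Kafunk version in use:

```fsharp
// Sketch: larger TCP buffers for a high-latency WAN link.
let chanCfg =
  ChanConfig.create (
    receiveBufferSize = 1024 * 1024,  // 1 MiB TCP receive buffer
    sendBufferSize = 1024 * 1024)     // 1 MiB TCP send buffer
let kafkaCfg =
  KafkaConfig.create (
    [KafkaUri.parse "kafka://broker-host:9092"],
    tcpConfig = chanCfg)
let conn = Kafka.conn kafkaCfg
```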
See How should I configure Kafunk to run across a WAN?
See How should I configure Kafunk to run across a WAN?
Both Consumer.stream and Consumer.consume are implemented using Consumer.generations, but they differ in terms of delegation of control. Consumer.consume takes a function which is then called for every received message set, in parallel across partitions but sequentially within a partition; the message sets are pushed to the provided function. Consumer.stream returns an async sequence which must be pulled. Before being iterated, however, this async sequence can be transformed, allowing for data-driven parallelism.
As a rule of thumb, use Consumer.stream only if you need fine-grained control over parallelism, or if you're explicitly defining a streaming workflow. In all other cases, prefer Consumer.consume or its variants.
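The difference can be sketched as follows (the function and field names follow the Kafunk API; treat exact signatures as assumptions to verify, and assume consumer was created with Consumer.create):

```fsharp
// Push style: the handler is invoked for every message set, in parallel
// across partitions but sequentially within a partition.
Consumer.consume consumer (fun state messageSet -> async {
  printfn "topic=%s partition=%i" messageSet.topic messageSet.partition
  do! Consumer.commitOffsets consumer
        (ConsumerMessageSet.commitPartitionOffsets messageSet) })
|> Async.RunSynchronously

// Pull style: the async sequence can be transformed (buffered, mapped,
// repartitioned) before being iterated.
Consumer.stream consumer
|> AsyncSeq.iterAsync (fun (state, messageSet) -> async {
  printfn "partition=%i" messageSet.partition })
|> Async.RunSynchronously
```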
Symptoms:
- Slow or stalled consumer progress.
- Event logs:
  - heartbeat_error
  - rejoining_group
  - join_group_error
  - sync_group_error
  - commit_offset_errors
- Kafka error codes:
- 22 - IllegalGenerationCode
- 25 - UnknownMemberIdCode
- 27 - RebalanceInProgressCode
Possible causes:
- Restarting consumers: if consumers are frequently restarting due to errors, they will attempt to rejoin the consumer group and this will cause frequent rebalancing which can prevent the consumer from making progress.
- Under-resourced consumers: if the consumer instances have insufficient machine resources (CPU, memory, network) then they may take too long to connect or join a group, resulting in group instability.
- Low session timeout: if ConsumerConfig.sessionTimeout is too low, consumers may have insufficient time to join the consumer group and synchronize before a rebalance is initiated. Relatedly, if ConsumerConfig.fetchMaxBytes is set very high, it can slow the rate at which a group coordinator (which is also a leader for a partition) acknowledges heartbeats, causing it to initiate a rebalance. Increasing ConsumerConfig.sessionTimeout often fixes the issue.
See: Fault Tolerance
Kafunk publishes all logging events to Log.Event. By default, log events are written to STDOUT at the INFO level; this may be disabled by disposing Log.ConsolePrinterSubscription. The minimum log level defaults to INFO and may be adjusted by mutating the field Log.MinLevel. You may add your own logger by simply subscribing to Log.Event. This area will soon undergo changes as tracked here.
See: Events
You may use existing tools for monitoring Kafka progress. However, you may also monitor consumer progress directly using the Kafka API. The ConsumerInfo.progress function takes a connection, a group id, a topic and an optional set of partitions, and returns a ConsumerProgressInfo object representing the progress of the consumer in the topic. Take a look at the ConsumerProgress script for an example.
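As a sketch (the shape of the returned progress object, including the partitions and lag fields, is an assumption; check ConsumerProgressInfo in your version for the actual names):

```fsharp
// Sketch: print per-partition lag for a consumer group.
// An empty partition array is assumed to mean "all partitions".
let progress =
  ConsumerInfo.progress conn "my-group" "my-topic" [||]
  |> Async.RunSynchronously
for p in progress.partitions do
  printfn "partition=%i lag=%i" p.partition p.lag
```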
A common reason for a consumer stalling completely is that the consumer-provided handler function never completes. Since the consumer processes messages sequentially within a partition, subsequent message sets for that partition won't be fetched and processed if a handler stalls. To verify this, place a timeout on the consumer-provided handler function. See Async.timeoutOption for an example.
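Guarding a handler with a timeout can be sketched as follows (Async.timeoutOption is assumed to take a TimeSpan and an async computation and to return None on timeout; verify the signature in your version):

```fsharp
open System

// The real message processing goes here; a stall in this function blocks
// subsequent message sets for the same partition.
let handle state (ms:ConsumerMessageSet) = async {
  // ... process ms ...
  () }

// Wrap the handler so a stall surfaces as a log line instead of
// silently blocking the partition.
let guardedHandle state ms = async {
  let! res = Async.timeoutOption (TimeSpan.FromSeconds 30.) (handle state ms)
  match res with
  | Some () -> ()
  | None -> printfn "handler timed out on partition=%i" ms.partition }

Consumer.consume consumer guardedHandle
|> Async.RunSynchronously
```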