Refactor some end-to-end functionality
- Disable deletion of stale consumer groups by default
- Enforce a min prefix length of 3 chars
weeco committed Jun 13, 2021
1 parent 8157df5 commit 1a777bf
Showing 6 changed files with 121 additions and 116 deletions.
59 changes: 38 additions & 21 deletions docs/end-to-end.md
@@ -3,82 +3,94 @@
This page describes the end-to-end monitoring feature in KMinion, how it works, and what metrics it provides.

## Motivation

> What is the issue? Why did we build this feature?
We can monitor metrics like CPU usage, free disk space, or even consumer group lag. However, these metrics don't give us
a good idea of the performance characteristics that an actual, real-world client experiences when connected to the cluster.

With the "classic" metrics, lots of questions go unanswered:

- Can a client produce messages to the cluster?
- Can clients produce & consume messages as well as commit group offsets with an acceptable latency?
- Is the cluster in a healthy state from a client's perspective?

## Approach & Implementation

> How do we solve those issues? How does the feature work?
The most reliable way to get real-world performance and availability metrics is to actually run a producer/consumer
ourselves. This is exactly what the end-to-end monitoring feature does!

## High Level Overview

In order to determine whether the cluster is fully operational and its performance is within acceptable limits, KMinion
continuously produces and consumes messages to/from the cluster. That way we can measure things like ack-latency,
commit-latency, and roundtrip-time.

KMinion creates and manages its own topic for the end-to-end test messages. The name of the topic can be configured.

**The first step** is to create a message and send it to the cluster.

- Every produced message is added to an internal tracker, so we can recognize messages being "lost". A message is
considered lost if it doesn't arrive back at the consumer within the configured time span.

**The second step** is to continuously consume the topic.

- As each message arrives, we calculate its roundtrip time (time from the point the message was created, until KMinion
received it again)
- Consumer group offsets are committed periodically, while also recording the time each commit takes.

### Topic Management

The topic KMinion uses is created and managed fully automatically (the topic name can be configured, though).

KMinion continuously checks the topic and fixes issues/imbalances automatically:

- Adds partitions to the topic, so it has at least as many partitions as there are brokers.
- Reassigns partitions to ensure every broker leads at least one partition, and that all partitions' replicas are
  distributed evenly across the brokers. KMinion tries to assign partitionIDs to brokers that have the same broker id.
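
As a rough illustration of the leader part of these rules, the following Go sketch plans how many partitions to add and which broker should lead each partition. It is an assumption-laden simplification, not KMinion's implementation: `partitionsToAdd` and `planLeaders` are hypothetical helpers, replica placement is ignored, and partitions without a broker of the same ID are simply handed to brokers that do not lead anything yet.

```go
package main

import (
	"fmt"
	"sort"
)

// partitionsToAdd returns how many partitions are missing so that the topic
// has at least as many partitions as there are brokers.
func partitionsToAdd(partitionCount, brokerCount int) int {
	if brokerCount > partitionCount {
		return brokerCount - partitionCount
	}
	return 0
}

// planLeaders maps each partition ID to a preferred leader broker ID.
// A partition whose ID equals a broker ID is led by that broker; the remaining
// partitions go first to brokers without a partition, then round-robin.
func planLeaders(brokerIDs []int32, partitionCount int) map[int32]int32 {
	leaders := make(map[int32]int32, partitionCount)
	if len(brokerIDs) == 0 {
		return leaders
	}

	brokers := append([]int32(nil), brokerIDs...)
	sort.Slice(brokers, func(i, j int) bool { return brokers[i] < brokers[j] })

	known := make(map[int32]bool, len(brokers))
	for _, id := range brokers {
		known[id] = true
	}

	leading := make(map[int32]bool, len(brokers))
	var unassigned []int32
	for p := int32(0); p < int32(partitionCount); p++ {
		if known[p] {
			leaders[p] = p
			leading[p] = true
		} else {
			unassigned = append(unassigned, p)
		}
	}

	// Brokers that do not lead a partition yet are served first.
	var queue []int32
	for _, id := range brokers {
		if !leading[id] {
			queue = append(queue, id)
		}
	}
	queue = append(queue, brokers...)
	for i, p := range unassigned {
		leaders[p] = queue[i%len(queue)]
	}
	return leaders
}

func main() {
	brokers := []int32{5, 1, 2}
	fmt.Println("partitions to add:", partitionsToAdd(2, len(brokers)))
	fmt.Println("leaders:", planLeaders(brokers, 3))
}
```

As long as the topic has at least as many partitions as brokers, every broker ends up leading at least one partition in this plan.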

### Consumer Group Management

On startup each KMinion instance generates a unique identifier (UUID) that is used to create its own consumer group. It
incorporates the shared prefix from the config.

That is necessary because:

- Offsets must not be shared among multiple instances.
- Each instance must always consume **all** partitions of the topic.

Each instance's UUID is also embedded in every message, so each instance can easily filter out messages it didn't
produce. That's why it is perfectly fine to run multiple KMinion instances against the same cluster, using the same
topic.

If `deleteStaleConsumerGroups` is enabled (it is disabled by default), KMinion also monitors and deletes empty consumer
groups that use its configured prefix. That way, when an instance exits/restarts, previous consumer groups will be
cleaned up quickly (check happens every 20s).
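
The group naming, message filtering, and prefix matching described in this section could look roughly like the sketch below. All names (`endToEndMessage`, `groupID`, `isOwnMessage`, `staleCandidates`) are hypothetical, the `-` separator between prefix and instance ID is an assumption, and a fixed string stands in for the generated UUID.

```go
package main

import (
	"fmt"
	"strings"
)

// endToEndMessage is a hypothetical payload; it carries the ID of the
// instance that produced it.
type endToEndMessage struct {
	MinionID  string
	MessageID string
}

// groupID builds the consumer group name from the shared prefix and the
// instance ID. The "-" separator is an assumption made for this sketch.
func groupID(prefix, instanceID string) string {
	return prefix + "-" + instanceID
}

// isOwnMessage reports whether a consumed message was produced by this instance.
func isOwnMessage(msg endToEndMessage, instanceID string) bool {
	return msg.MinionID == instanceID
}

// staleCandidates returns the groups that share the prefix, skipping our own group.
func staleCandidates(groups []string, prefix, ownGroupID string) []string {
	var candidates []string
	for _, name := range groups {
		if name == ownGroupID {
			continue // skip our own consumer group
		}
		if strings.HasPrefix(name, prefix) {
			candidates = append(candidates, name)
		}
	}
	return candidates
}

func main() {
	const prefix = "kminion-end-to-end"
	instanceID := "instance-a" // stands in for the UUID generated on startup
	own := groupID(prefix, instanceID)

	msg := endToEndMessage{MinionID: "instance-b", MessageID: "1"}
	fmt.Println("own message:", isOwnMessage(msg, instanceID))

	groups := []string{own, groupID(prefix, "instance-old"), "some-other-group"}
	fmt.Println("stale candidates:", staleCandidates(groups, prefix, own))
}
```

Because the match is a plain prefix check, a short or shared prefix could match unrelated groups, which is one reason to use a unique prefix of reasonable length before enabling deletion.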

## Available Metrics

The end-to-end monitoring feature exports the following metrics.

### Counters

| Name | Description |
| --- | --- |
| `kminion_end_to_end_messages_produced_total` | Messages KMinion *tried* to send |
| `kminion_end_to_end_messages_acked_total` | Messages actually sent and acknowledged by the cluster |
| `kminion_end_to_end_messages_received_total` | Number of messages received (only counts those that match, i.e. that this instance actually produced itself) |
| `kminion_end_to_end_commits_total` | Number of successful offset commits |


### Histograms

| Name | Description |
| --- | --- |
| `kminion_end_to_end_produce_latency_seconds` | Duration until the cluster acknowledged a message. |
| `kminion_end_to_end_commit_latency_seconds` | Duration of offset commits. Has a label for the brokerID of the coordinator that answered the commit request. |
| `kminion_end_to_end_roundtrip_latency_seconds` | Duration from the creation of a message until it was received/consumed again. |

## Config Properties

All config properties related to this feature are located in `minion.endToEnd`.

```yaml
@@ -117,6 +129,11 @@ All config properties related to this feature are located in `minion.endToEnd`.
    consumer:
      # Prefix kminion uses when creating its consumer groups. Current kminion instance id will be appended automatically
      groupIdPrefix: kminion-end-to-end

      # Whether KMinion should try to delete empty consumer groups with the same prefix. This can be used if you want
      # KMinion to clean up its old consumer groups. It should only be used if you use a unique prefix for KMinion.
      deleteStaleConsumerGroups: false

      # Defines the time limit beyond which a message is considered "lost" (failed the roundtrip),
      # also used as the upper bound for histogram buckets in "roundtrip_latency"
      roundtripSla: 20s
15 changes: 10 additions & 5 deletions docs/reference-config.yaml
@@ -22,7 +22,7 @@ logger:
   level: info

 kafka:
-  brokers: []
+  brokers: [ ]
   clientId: "kminion"
   rackId: ""
   tls:
@@ -72,21 +72,21 @@ minion:
     # AllowedGroups are regex strings of group ids that shall be exported
     # You can specify allowed groups by providing literals like "my-consumergroup-name" or by providing regex expressions
     # like "/internal-.*/".
-    allowedGroups: [".*"]
+    allowedGroups: [ ".*" ]
     # IgnoredGroups are regex strings of group ids that shall be ignored/skipped when exporting metrics. Ignored groups
     # take precedence over allowed groups.
-    ignoredGroups: []
+    ignoredGroups: [ ]
   topics:
     # Granularity can be per topic or per partition. If you want to reduce the number of exported metric series and
     # you aren't interested in per partition metrics you could choose "topic".
     granularity: partition
     # AllowedTopics are regex strings of topic names whose topic metrics shall be exported.
     # You can specify allowed topics by providing literals like "my-topic-name" or by providing regex expressions
     # like "/internal-.*/".
-    allowedTopics: [".*"]
+    allowedTopics: [ ".*" ]
     # IgnoredTopics are regex strings of topic names that shall be ignored/skipped when exporting metrics. Ignored topics
     # take precedence over allowed topics.
-    ignoredTopics: []
+    ignoredTopics: [ ]
   logDirs:
     # Enabled specifies whether log dirs shall be scraped and exported or not. This should be disabled for clusters prior
     # to version 1.0.0 as describing log dirs was not supported back then.
@@ -132,6 +132,11 @@ minion:
     consumer:
       # Prefix kminion uses when creating its consumer groups. Current kminion instance id will be appended automatically
       groupIdPrefix: kminion-end-to-end
+
+      # Whether KMinion should try to delete empty consumer groups with the same prefix. This can be used if you want
+      # KMinion to clean up its old consumer groups. It should only be used if you use a unique prefix for KMinion.
+      deleteStaleConsumerGroups: false
+
       # This defines:
       # - Upper bound for histogram buckets in "roundtrip_latency"
       # - Time limit beyond which a message is considered "lost" (failed the roundtrip)
7 changes: 6 additions & 1 deletion e2e/config_consumer.go
@@ -6,19 +6,24 @@ import (
 )

 type EndToEndConsumerConfig struct {
-	GroupIdPrefix string `koanf:"groupIdPrefix"`
+	GroupIdPrefix             string `koanf:"groupIdPrefix"`
+	DeleteStaleConsumerGroups bool   `koanf:"deleteStaleConsumerGroups"`

 	RoundtripSla time.Duration `koanf:"roundtripSla"`
 	CommitSla    time.Duration `koanf:"commitSla"`
 }

 func (c *EndToEndConsumerConfig) SetDefaults() {
 	c.GroupIdPrefix = "kminion-end-to-end"
+	c.DeleteStaleConsumerGroups = false
 	c.RoundtripSla = 20 * time.Second
 	c.CommitSla = 10 * time.Second // no idea what to use as a good default value
 }

 func (c *EndToEndConsumerConfig) Validate() error {
+	if len(c.GroupIdPrefix) < 3 {
+		return fmt.Errorf("kminion prefix should be at least 3 characters long")
+	}

 	if c.RoundtripSla <= 0 {
 		return fmt.Errorf("consumer.roundtripSla must be greater than zero")
4 changes: 1 addition & 3 deletions e2e/consumer.go
@@ -35,9 +35,7 @@ func (s *Service) startConsumeMessages(ctx context.Context) {

 		// Process messages
 		fetches.EachRecord(func(record *kgo.Record) {
-			if record != nil {
-				s.processMessage(record, receiveTimestamp)
-			}
+			s.processMessage(record, receiveTimestamp)
 		})
 	}
 }
82 changes: 30 additions & 52 deletions e2e/group_tracker.go
@@ -2,7 +2,6 @@ package e2e

 import (
 	"context"
-	"fmt"
 	"strings"
 	"time"

@@ -23,75 +22,56 @@ const (
 // Whenever a kminion instance starts up it creates a consumer-group for itself in order to not "collide" with other kminion instances.
 // When an instance restarts (for whatever reason), it creates a new group again, so we'd end up with a lot of unused groups.
 type groupTracker struct {
-	svc    *Service // used to obtain stuff like logger, kafka client, ...
-	logger *zap.Logger
-	ctx    context.Context // cancellation context
-
-	client *kgo.Client // kafka client
-
+	cfg                    Config
+	logger                 *zap.Logger
+	client                 *kgo.Client          // kafka client
 	groupId                string               // our own groupId
 	potentiallyEmptyGroups map[string]time.Time // groupName -> utc timestamp when the group was first seen
-
-	isNotAuthorized bool // if we get a not authorized response while trying to delete old groups, this will be set to true, essentially disabling the tracker
 }

-func newGroupTracker(svc *Service, ctx context.Context) *groupTracker {
-
-	tracker := groupTracker{
-		svc:    svc,
-		logger: svc.logger.Named("groupTracker"),
-		ctx:    ctx,
-
-		client: svc.client,
-
-		groupId:                svc.groupId,
+func newGroupTracker(cfg Config, logger *zap.Logger, client *kgo.Client, groupID string) *groupTracker {
+	return &groupTracker{
+		cfg:                    cfg,
+		logger:                 logger.Named("groupTracker"),
+		client:                 client,
+		groupId:                groupID,
 		potentiallyEmptyGroups: make(map[string]time.Time),
-
-		isNotAuthorized: false,
 	}
-
-	return &tracker
 }

-func (g *groupTracker) start() {
+func (g *groupTracker) start(ctx context.Context) {
 	g.logger.Debug("starting group tracker")

 	deleteOldGroupsTicker := time.NewTicker(oldGroupCheckInterval)
-	// stop ticker when context is cancelled
-	go func() {
-		<-g.ctx.Done()
-		g.logger.Debug("stopping group tracker, context was cancelled")
-		deleteOldGroupsTicker.Stop()
-	}()
-
-	// look for old consumer groups and delete them
-	go func() {
-		for range deleteOldGroupsTicker.C {
-			err := g.checkAndDeleteOldConsumerGroups()
+	for {
+		select {
+		case <-ctx.Done():
+			g.logger.Debug("stopping group tracker, context was cancelled")
+			return
+		case <-deleteOldGroupsTicker.C:
+			childCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
+			err := g.checkAndDeleteOldConsumerGroups(childCtx)
 			if err != nil {
 				g.logger.Error("failed to check for old consumer groups", zap.Error(err))
 			}
+			cancel()
 		}
-	}()
-}
-
-func (g *groupTracker) checkAndDeleteOldConsumerGroups() error {
-	if g.isNotAuthorized {
-		return nil
 	}
+}

+func (g *groupTracker) checkAndDeleteOldConsumerGroups(ctx context.Context) error {
 	groupsRq := kmsg.NewListGroupsRequest()
 	groupsRq.StatesFilter = []string{"Empty"}

-	g.logger.Debug("checking for empty kminion consumer groups...")
+	g.logger.Debug("checking for stale kminion consumer groups")

-	shardedResponse := g.client.RequestSharded(g.ctx, &groupsRq)
+	shardedResponse := g.client.RequestSharded(ctx, &groupsRq)

 	// find groups that start with the kminion prefix
-	matchingGroups := make([]string, 0, 10)
+	matchingGroups := make([]string, 0)
 	for _, shard := range shardedResponse {
 		if shard.Err != nil {
-			g.logger.Error("error in response to ListGroupsRequest", zap.Error(shard.Err))
+			g.logger.Error("error in response to ListGroupsRequest", zap.Int32("broker_id", shard.Meta.NodeID), zap.Error(shard.Err))
 			continue
 		}

@@ -108,21 +88,20 @@ func (g *groupTracker) checkAndDeleteOldConsumerGroups() error {
 				continue // skip our own consumer group
 			}

-			if strings.HasPrefix(name, g.svc.config.Consumer.GroupIdPrefix) {
+			if strings.HasPrefix(name, g.cfg.Consumer.GroupIdPrefix) {
 				matchingGroups = append(matchingGroups, name)
 			}
 		}
 	}

 	// save new (previously unseen) groups to tracker
-	g.logger.Debug(fmt.Sprintf("found %v matching kminion consumer groups", len(matchingGroups)), zap.Strings("groups", matchingGroups))
+	g.logger.Debug("checked for stale consumer groups", zap.Int("found_groups", len(matchingGroups)), zap.Strings("groups", matchingGroups))
 	for _, name := range matchingGroups {
 		_, exists := g.potentiallyEmptyGroups[name]
 		if !exists {
 			// add it with the current timestamp
-			now := time.Now()
-			g.potentiallyEmptyGroups[name] = now
-			g.logger.Debug("new empty kminion group, adding to tracker", zap.String("group", name), zap.Time("firstSeen", now))
+			g.potentiallyEmptyGroups[name] = time.Now()
+			g.logger.Debug("found new empty kminion group, adding it to the tracker", zap.String("group", name))
 		}
 	}

@@ -154,7 +133,7 @@ func (g *groupTracker) checkAndDeleteOldConsumerGroups() error {

 	deleteRq := kmsg.NewDeleteGroupsRequest()
 	deleteRq.Groups = groupsToDelete
-	deleteResp := g.client.RequestSharded(g.ctx, &deleteRq)
+	deleteResp := g.client.RequestSharded(ctx, &deleteRq)

 	// done, now just errors
 	// if we get a not authorized error we'll disable deleting groups
@@ -190,7 +169,6 @@ func (g *groupTracker) checkAndDeleteOldConsumerGroups() error {

 	if foundNotAuthorizedError {
 		g.logger.Info("disabling trying to delete old kminion consumer-groups since one of the last delete results had a 'GroupAuthorizationFailed' error")
-		g.isNotAuthorized = true
 	}

 	return nil