put deleting old consumer groups into its own file (group_tracker.go)
rikimaru0345 committed May 17, 2021
1 parent e36eb71 commit e1a8788
Showing 6 changed files with 295 additions and 79 deletions.
89 changes: 71 additions & 18 deletions e2e/consumer.go
@@ -3,10 +3,12 @@ package e2e
import (
"context"
"encoding/json"
"fmt"
"sync/atomic"
"time"

"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
"go.uber.org/zap"
)

@@ -25,10 +27,52 @@ func (s *Service) ConsumeFromManagementTopic(ctx context.Context) error {
}
client.AssignPartitions(topic)

// Create a consumer group with the prefix
groupId := fmt.Sprintf("%v-%v", s.config.Consumer.GroupIdPrefix, s.minionID)
client.AssignGroup(groupId, kgo.GroupTopics(topicName), balancer, kgo.DisableAutoCommit())
s.logger.Info("Starting to consume end-to-end", zap.String("topicName", topicName), zap.String("groupId", groupId))
// Create our own consumer group
client.AssignGroup(s.groupId, kgo.GroupTopics(topicName), balancer, kgo.DisableAutoCommit())
s.logger.Info("Starting to consume end-to-end", zap.String("topicName", topicName), zap.String("groupId", s.groupId))

// Keep checking for the coordinator
var currentCoordinator atomic.Value
currentCoordinator.Store(kgo.BrokerMetadata{})

go func() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return

case <-ticker.C:
describeReq := kmsg.NewDescribeGroupsRequest()
describeReq.Groups = []string{s.groupId}
describeReq.IncludeAuthorizedOperations = false

shards := client.RequestSharded(ctx, &describeReq)
for _, shard := range shards {
// since we're only interested in the coordinator, we only check for broker errors on the response that contains our group
response, ok := shard.Resp.(*kmsg.DescribeGroupsResponse)
if !ok {
s.logger.Warn("cannot cast shard response to DescribeGroupsResponse")
continue
}
if len(response.Groups) == 0 {
s.logger.Warn("DescribeGroupsResponse contained no groups")
continue
}
group := response.Groups[0]
groupErr := kerr.ErrorForCode(group.ErrorCode)
if groupErr != nil {
s.logger.Error("couldn't describe end-to-end consumer group, error in group", zap.Error(groupErr), zap.Any("broker", shard.Meta))
continue
}

currentCoordinator.Store(shard.Meta)
break
}
}
}
}()

for {
select {
@@ -70,23 +114,32 @@ func (s *Service) ConsumeFromManagementTopic(ctx context.Context) error {
// maybe ask travis about the return value; we want to know which coordinator the offsets were committed to
// kminion probably already exposes the coordinator for every group

// if uncommittedOffset := client.UncommittedOffsets(); uncommittedOffset != nil {
if uncommittedOffset := client.UncommittedOffsets(); uncommittedOffset != nil {

// startCommitTimestamp := timeNowMs()
startCommitTimestamp := timeNowMs()

// client.CommitOffsets(ctx, uncommittedOffset, func(_ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, err error) {
// // got commit response
// if err != nil {
// s.logger.Error(fmt.Sprintf("record had an error on commit: %v\n", err))
// return
// }
client.CommitOffsets(ctx, uncommittedOffset, func(_ *kmsg.OffsetCommitRequest, r *kmsg.OffsetCommitResponse, err error) {
// got commit response
latencyMs := timeNowMs() - startCommitTimestamp
commitLatency := time.Duration(latencyMs * float64(time.Millisecond))

// latencyMs := timeNowMs() - startCommitTimestamp
// commitLatency := time.Duration(latencyMs * float64(time.Millisecond))
if err != nil {
s.logger.Error("offset commit failed", zap.Error(err), zap.Int64("latencyMilliseconds", commitLatency.Milliseconds()))
return
}

// s.onOffsetCommit(commitLatency)
// })
// }
// todo: check each partition's error code

// only report commit latency if the coordinator is known
coordinator := currentCoordinator.Load().(kgo.BrokerMetadata)
if len(coordinator.Host) > 0 {
s.onOffsetCommit(commitLatency, coordinator.Host)
} else {
s.logger.Warn("won't report commit latency since broker coordinator is still unknown", zap.Int64("latencyMilliseconds", commitLatency.Milliseconds()))
}

})
}
}
}

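Note on the coordinator polling above: the goroutine added to consumer.go issues a DescribeGroups request every ten seconds and publishes the coordinator's BrokerMetadata through a sync/atomic.Value, so the offset-commit callback can read the most recent coordinator without locking. The following is a minimal, self-contained sketch of just that publish/read pattern; the coordinatorInfo type, the intervals, and the hard-coded broker address are illustrative stand-ins, not part of the commit.

// Sketch only: a background goroutine periodically refreshes a value and
// publishes it through an atomic.Value, while readers load the latest
// snapshot lock-free. coordinatorInfo stands in for kgo.BrokerMetadata.
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

type coordinatorInfo struct {
	Host string
	Port int32
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	// Store a zero value first so Load never returns nil.
	var current atomic.Value
	current.Store(coordinatorInfo{})

	// Background refresher; in the real code this is a DescribeGroups round trip.
	go func() {
		ticker := time.NewTicker(500 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				current.Store(coordinatorInfo{Host: "broker-1.example.com", Port: 9092})
			}
		}
	}()

	// Reader loop standing in for the commit callback: it sees either the
	// zero value or the latest published snapshot.
	for i := 0; i < 4; i++ {
		time.Sleep(700 * time.Millisecond)
		c := current.Load().(coordinatorInfo)
		if c.Host == "" {
			fmt.Println("coordinator still unknown, skipping report")
		} else {
			fmt.Printf("coordinator: %s:%d\n", c.Host, c.Port)
		}
	}
}

Because atomic.Value requires every Store to use the same concrete type, both the commit's code and this sketch store a zero value up front so that Load never returns nil.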
197 changes: 197 additions & 0 deletions e2e/group_tracker.go
@@ -0,0 +1,197 @@
package e2e

import (
"context"
"fmt"
"strings"
"time"

"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
"go.uber.org/zap"
)

const (
oldGroupCheckInterval = 5 * time.Second // how often to check for old kminion groups
oldGroupMaxAge = 20 * time.Second // maximum age after which an old group should be deleted
)

// groupTracker keeps checking for empty consumerGroups matching the kminion prefix.
// When a group was seen empty for some time, we delete it.
// Why?
// Whenever a kminion instance starts up it creates a consumer-group for itself in order to not "collide" with other kminion instances.
// When an instance restarts (for whatever reason), it creates a new group again, so we'd end up with a lot of unused groups.
type groupTracker struct {
svc *Service // used to obtain stuff like logger, kafka client, ...
logger *zap.Logger
ctx context.Context // cancellation context

client *kgo.Client // kafka client

groupId string // our own groupId
potentiallyEmptyGroups map[string]time.Time // groupName -> utc timestamp when the group was first seen

isNotAuthorized bool
}

func newGroupTracker(svc *Service, ctx context.Context) *groupTracker {

tracker := groupTracker{
svc: svc,
logger: svc.logger.Named("groupTracker"),
ctx: ctx,

client: svc.client,

groupId: svc.groupId,
potentiallyEmptyGroups: make(map[string]time.Time),

isNotAuthorized: false,
}

return &tracker
}

func (g *groupTracker) start() {
g.logger.Debug("starting group tracker")

deleteOldGroupsTicker := time.NewTicker(oldGroupCheckInterval)
// stop ticker when context is cancelled
go func() {
<-g.ctx.Done()
g.logger.Debug("stopping group tracker, context was cancelled")
deleteOldGroupsTicker.Stop()
}()

// look for old consumer groups and delete them
go func() {
for range deleteOldGroupsTicker.C {
err := g.checkAndDeleteOldConsumerGroups()
if err != nil {
g.logger.Error("failed to check for old consumer groups: %w", zap.Error(err))
}
}
}()
}

func (g *groupTracker) checkAndDeleteOldConsumerGroups() error {
if g.isNotAuthorized {
return nil
}

groupsRq := kmsg.NewListGroupsRequest()
groupsRq.StatesFilter = []string{"Empty"}

g.logger.Debug("checking for empty kminion consumer groups...")

shardedResponse := g.client.RequestSharded(g.ctx, &groupsRq)

// find groups that start with the kminion prefix
matchingGroups := make([]string, 0, 10)
for _, shard := range shardedResponse {
if shard.Err != nil {
g.logger.Error("error in response to ListGroupsRequest", zap.Error(shard.Err))
continue
}

r, ok := shard.Resp.(*kmsg.ListGroupsResponse)
if !ok {
g.logger.Error("cannot cast responseShard.Resp to kmsg.ListGroupsResponse")
continue
}

for _, group := range r.Groups {
name := group.Group

if name == g.groupId {
continue // skip our own consumer group
}

if strings.HasPrefix(name, g.svc.config.Consumer.GroupIdPrefix) {
matchingGroups = append(matchingGroups, name)
}
}
}

// save new (previously unseen) groups to tracker
g.logger.Debug(fmt.Sprintf("found %v matching kminion consumer groups", len(matchingGroups)), zap.Strings("groups", matchingGroups))
for _, name := range matchingGroups {
_, exists := g.potentiallyEmptyGroups[name]
if !exists {
// add it with the current timestamp
now := time.Now().UTC()
g.potentiallyEmptyGroups[name] = now
g.logger.Debug("new empty kminion group, adding to tracker", zap.String("group", name), zap.Time("firstSeen", now))
}
}

// go through saved groups:
// - don't track the ones we don't see anymore (bc they got deleted or are not empty anymore)
// - mark the ones that are too old (have been observed as empty for too long)
groupsToDelete := make([]string, 0)
for name, firstSeen := range g.potentiallyEmptyGroups {
exists, _ := containsStr(matchingGroups, name)
if exists {
// still there, check age and maybe delete it
age := time.Now().UTC().Sub(firstSeen)
if age > oldGroupMaxAge {
// group was unused for too long, delete it
groupsToDelete = append(groupsToDelete, name)
delete(g.potentiallyEmptyGroups, name)
}
} else {
// does not exist anymore, it must have been deleted, or is in use now (no longer empty)
// don't track it anymore
delete(g.potentiallyEmptyGroups, name)
}
}

// actually delete the groups we've decided to delete
if len(groupsToDelete) == 0 {
return nil
}

deleteRq := kmsg.NewDeleteGroupsRequest()
deleteRq.Groups = groupsToDelete
deleteResp := g.client.RequestSharded(g.ctx, &deleteRq)

// done, now just errors
// if we get a not authorized error we'll disable deleting groups
foundNotAuthorizedError := false
deletedGroups := make([]string, 0)
for _, shard := range deleteResp {
if shard.Err != nil {
g.logger.Error("sharded consumer group delete request failed", zap.Error(shard.Err))
continue
}

resp, ok := shard.Resp.(*kmsg.DeleteGroupsResponse)
if !ok {
g.logger.Error("failed to cast shard response to DeleteGroupsResponse while handling an error for deleting groups", zap.String("shardHost", shard.Meta.Host), zap.NamedError("shardError", shard.Err))
continue
}

for _, groupResp := range resp.Groups {
err := kerr.ErrorForCode(groupResp.ErrorCode)
if err != nil {
g.logger.Error("failed to delete consumer group", zap.String("shard", shard.Meta.Host), zap.String("group", groupResp.Group), zap.Error(err))

if groupResp.ErrorCode == kerr.GroupAuthorizationFailed.Code {
foundNotAuthorizedError = true
}

} else {
deletedGroups = append(deletedGroups, groupResp.Group)
}
}
}
g.logger.Info("deleted old consumer groups", zap.Strings("deletedGroups", deletedGroups))

if foundNotAuthorizedError {
g.logger.Info("disabling trying to delete old kminion consumer-groups since one of the last delete results had an 'GroupAuthorizationFailed' error")
g.isNotAuthorized = true
}

return nil
}
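checkAndDeleteOldConsumerGroups calls a containsStr helper that is not part of this diff; judging from the call site (exists, _ := containsStr(matchingGroups, name)) it reports whether a string occurs in a slice, presumably together with its index. A hedged reconstruction under that assumed signature, for readers following along:

// Hypothetical reconstruction of the containsStr helper referenced above; the
// real implementation lives elsewhere in the repository and may differ.
// Assumed signature: first return value is "found", second is the index.
func containsStr(haystack []string, needle string) (bool, int) {
	for i, s := range haystack {
		if s == needle {
			return true, i
		}
	}
	return false, -1
}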
2 changes: 1 addition & 1 deletion e2e/producer.go
@@ -33,7 +33,7 @@ func (s *Service) produceToManagementTopic(ctx context.Context) error {
startTime := timeNowMs()
s.endToEndMessagesProduced.Inc()

s.logger.Info("producing message...", zap.Any("record", record))
s.logger.Debug("producing message...", zap.Any("record", record))

err = s.client.Produce(ctx, record, func(r *kgo.Record, err error) {
endTime := timeNowMs()
19 changes: 14 additions & 5 deletions e2e/service.go
@@ -22,8 +22,12 @@ type Service struct {
client *kgo.Client

// Service
minionID string // unique identifier, reported in metrics, in case multiple instances run at the same time
lastRoundtripTimestamp float64 // creation time (in utc ms) of the message that most recently passed the roundtripSla check
minionID string // unique identifier, reported in metrics, in case multiple instances run at the same time
groupId string // our own consumer group
groupTracker *groupTracker // tracks consumer groups starting with the kminion prefix and deletes them if they are unused for some time

// todo: tracker for in-flight messages
// lastRoundtripTimestamp float64 // creation time (in utc ms) of the message that most recently passed the roundtripSla check

// Metrics
endToEndMessagesProduced prometheus.Counter
@@ -44,15 +48,20 @@ func NewService(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, metricN
return nil, fmt.Errorf("failed to create kafka client for e2e: %w", err)
}

minionId := uuid.NewString()

svc := &Service{
config: cfg,
logger: logger.With(zap.String("source", "end_to_end")),
kafkaSvc: kafkaSvc,
client: client,

minionID: uuid.NewString(),
minionID: minionId,
groupId: fmt.Sprintf("%v-%v", cfg.Consumer.GroupIdPrefix, minionId),
}

svc.groupTracker = newGroupTracker(svc, ctx)

makeCounter := func(name string, help string) prometheus.Counter {
return promauto.NewCounter(prometheus.CounterOpts{
Namespace: metricNamespace,
@@ -124,7 +133,7 @@ func (s *Service) Start(ctx context.Context) error {
// called from e2e when a message is acknowledged
func (s *Service) onAck(partitionId int32, duration time.Duration) {
s.endToEndMessagesAcked.Inc()
s.endToEndAckLatency.WithLabelValues(string(partitionId)).Observe(duration.Seconds())
s.endToEndAckLatency.WithLabelValues(fmt.Sprintf("%v", partitionId)).Observe(duration.Seconds())
}

// called from e2e when a message completes a roundtrip (send to kafka, receive msg from kafka again)
@@ -139,7 +148,7 @@ func (s *Service) onRoundtrip(partitionId int32, duration time.Duration) {
// }

s.endToEndMessagesReceived.Inc()
s.endToEndRoundtripLatency.WithLabelValues(string(partitionId)).Observe(duration.Seconds())
s.endToEndRoundtripLatency.WithLabelValues(fmt.Sprintf("%v", partitionId)).Observe(duration.Seconds())
}

// called from e2e when an offset commit is confirmed
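The onAck and onRoundtrip changes above fix how the partition ID is turned into a Prometheus label value: in Go, string(partitionId) converts the int32 to the character with that Unicode code point rather than to its decimal text, so partition 0 would have shown up as a NUL byte instead of "0". A small stand-alone illustration of the difference (not taken from the commit):

// Illustrates why the commit replaces string(partitionId) with
// fmt.Sprintf("%v", partitionId) when building metric label values.
package main

import "fmt"

func main() {
	var partitionId int32 = 65

	// string(int32) treats the value as a Unicode code point ...
	fmt.Printf("%q\n", string(partitionId)) // "A"

	// ... while fmt.Sprintf yields the decimal text that makes sense as a label.
	fmt.Printf("%q\n", fmt.Sprintf("%v", partitionId)) // "65"
}

strconv.Itoa(int(partitionId)) would work just as well; fmt.Sprintf simply matches what the commit uses.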
(2 remaining changed files not shown)
