diff --git a/e2e/config_consumer.go b/e2e/config_consumer.go
index 6c65fe1..8868c40 100644
--- a/e2e/config_consumer.go
+++ b/e2e/config_consumer.go
@@ -13,7 +13,7 @@ const (
 )
 
 type EndToEndConsumerConfig struct {
-    GroupId             string `koanf:"groupId"`
+    GroupIdPrefix       string `koanf:"groupIdPrefix"`
     RebalancingProtocol string `koanf:"rebalancingProtocol"`
 
     RoundtripSla time.Duration `koanf:"roundtripSla"`
@@ -21,7 +21,7 @@ type EndToEndConsumerConfig struct {
 }
 
 func (c *EndToEndConsumerConfig) SetDefaults() {
-    c.GroupId = "kminion-end-to-end"
+    c.GroupIdPrefix = "kminion-end-to-end"
     c.RebalancingProtocol = "cooperativeSticky"
     c.RoundtripSla = 20 * time.Second
     c.CommitSla = 10 * time.Second // no idea what to use as a good default value
diff --git a/e2e/consumer.go b/e2e/consumer.go
index ad15b26..90966bc 100644
--- a/e2e/consumer.go
+++ b/e2e/consumer.go
@@ -7,7 +7,6 @@ import (
     "time"
 
     "github.com/twmb/franz-go/pkg/kgo"
-    "github.com/twmb/franz-go/pkg/kmsg"
     "go.uber.org/zap"
 )
 
@@ -26,17 +25,17 @@ func (s *Service) ConsumeFromManagementTopic(ctx context.Context) error {
     }
 
     client.AssignPartitions(topic)
-    // todo: use minionID as part of group id
-    //
-    client.AssignGroup(s.config.Consumer.GroupId, kgo.GroupTopics(topicName), balancer, kgo.DisableAutoCommit())
-    s.logger.Info("Starting to consume " + topicName)
+    // Create a consumer group with the prefix
+    groupId := fmt.Sprintf("%v-%v", s.config.Consumer.GroupIdPrefix, s.minionID)
+    client.AssignGroup(groupId, kgo.GroupTopics(topicName), balancer, kgo.DisableAutoCommit())
+    s.logger.Info("Starting to consume end-to-end", zap.String("topicName", topicName), zap.String("groupId", groupId))
 
     for {
         select {
         case <-ctx.Done():
             return nil
         default:
-            fetches := client.PollRecords(ctx, 10)
+            fetches := client.PollFetches(ctx)
             errors := fetches.Errors()
             for _, err := range errors {
                 // Log all errors and continue afterwards as we might get errors and still have some fetch results
@@ -66,24 +65,28 @@
 
             // Commit offsets for processed messages
             // todo: the normal way to commit offsets with franz-go is pretty good, but in our special case
             //       we want to do it manually, seperately for each partition, so we can track how long it took
-            if uncommittedOffset := client.UncommittedOffsets(); uncommittedOffset != nil {
-                startCommitTimestamp := timeNowMs()
+            // todo: use findGroupCoordinatorID
+            //       maybe ask travis about return value, we want to know what coordinator the offsets was committed to
+            //       kminion probably already exposed coordinator for every group
 
-                client.CommitOffsets(ctx, uncommittedOffset, func(_ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, err error) {
-                    // got commit response
-                    if err != nil {
-                        s.logger.Error(fmt.Sprintf("record had an error on commit: %v\n", err))
-                        return
-                    }
+            // if uncommittedOffset := client.UncommittedOffsets(); uncommittedOffset != nil {
 
-                    latencyMs := timeNowMs() - startCommitTimestamp
-                    commitLatency := time.Duration(latencyMs * float64(time.Millisecond))
+            // 	startCommitTimestamp := timeNowMs()
 
-                    // todo: partitionID
-                    s.onOffsetCommit(0, commitLatency)
-                })
-            }
+            // 	client.CommitOffsets(ctx, uncommittedOffset, func(_ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, err error) {
+            // 		// got commit response
+            // 		if err != nil {
+            // 			s.logger.Error(fmt.Sprintf("record had an error on commit: %v\n", err))
+            // 			return
+            // 		}
+
+            // 		latencyMs := timeNowMs() - startCommitTimestamp
+            // 		commitLatency := time.Duration(latencyMs * float64(time.Millisecond))
+
+            // 		s.onOffsetCommit(commitLatency)
+            // 	})
+            // }
         }
     }
 }
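Note on the commit block that this diff comments out in ConsumeFromManagementTopic: the remaining todo asks for manual, per-partition commits so that commit latency can be measured per partition and later attributed to a group coordinator. A minimal sketch of that idea, assuming franz-go's UncommittedOffsets() returns map[string]map[int32]kgo.EpochOffset and reusing the commit-callback signature shown above; the empty coordinator label passed to onOffsetCommit is a placeholder (see the FindCoordinator sketch further down):

// Sketch only: commit each partition on its own so latency can be attributed per partition.
if uncommitted := client.UncommittedOffsets(); uncommitted != nil {
	for topic, partitions := range uncommitted {
		for partitionID, offset := range partitions {
			// Build a one-partition commit request
			single := map[string]map[int32]kgo.EpochOffset{topic: {partitionID: offset}}
			startCommitTimestamp := timeNowMs()
			client.CommitOffsets(ctx, single, func(_ *kmsg.OffsetCommitRequest, _ *kmsg.OffsetCommitResponse, err error) {
				if err != nil {
					s.logger.Error("offset commit failed", zap.Error(err))
					return
				}
				latencyMs := timeNowMs() - startCommitTimestamp
				// groupCoordinator label still needs to be resolved; empty for now
				s.onOffsetCommit(time.Duration(latencyMs*float64(time.Millisecond)), "")
			})
		}
	}
}

Committing one partition per request is chattier than a single batched commit, which is presumably why the block stays commented out as a todo rather than being enabled here.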
diff --git a/e2e/producer.go b/e2e/producer.go
index 02c9b02..7d25897 100644
--- a/e2e/producer.go
+++ b/e2e/producer.go
@@ -11,8 +11,9 @@ import (
 )
 
 type EndToEndMessage struct {
-    MinionID  string  `json:"minionID"`
-    Timestamp float64 `json:"timestamp"`
+    MinionID  string  `json:"minionID"`  // unique for each running kminion instance
+    MessageID string  `json:"messageID"` // unique for each message
+    Timestamp float64 `json:"timestamp"` // when the message was created, unix milliseconds
 }
 
 func (s *Service) produceToManagementTopic(ctx context.Context) error {
@@ -32,6 +33,8 @@ func (s *Service) produceToManagementTopic(ctx context.Context) error {
     startTime := timeNowMs()
     s.endToEndMessagesProduced.Inc()
 
+    s.logger.Info("producing message...", zap.Any("record", record))
+
     err = s.client.Produce(ctx, record, func(r *kgo.Record, err error) {
         endTime := timeNowMs()
         ackDurationMs := endTime - startTime
@@ -56,8 +59,11 @@ func (s *Service) produceToManagementTopic(ctx context.Context) error {
 
 func createEndToEndRecord(topicName string, minionID string) (*kgo.Record, error) {
     timestamp := timeNowMs()
+    msgId := uuid.NewString()
+
     message := EndToEndMessage{
         MinionID:  minionID,
+        MessageID: msgId,
         Timestamp: timestamp,
     }
     mjson, err := json.Marshal(message)
@@ -66,7 +72,7 @@ func createEndToEndRecord(topicName string, minionID string) (*kgo.Record, error
     }
     record := &kgo.Record{
         Topic: topicName,
-        Key:   []byte(uuid.NewString()),
+        // Key:   []byte(msgId),
         Value: []byte(mjson),
     }
 
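For context on the new MessageID and Timestamp fields: the consuming side can decode an EndToEndMessage from a fetched record and derive the roundtrip latency from the embedded timestamp. A rough sketch under that assumption; the method name processMessage and its call site are illustrative and not part of this diff, while timeNowMs, s.minionID, and s.onRoundtrip mirror names used elsewhere in the package:

// Sketch only: decode a fetched record and report its roundtrip latency.
func (s *Service) processMessage(record *kgo.Record) {
	var msg EndToEndMessage
	if err := json.Unmarshal(record.Value, &msg); err != nil {
		s.logger.Error("failed to decode end-to-end message", zap.Error(err))
		return
	}
	if msg.MinionID != s.minionID {
		// produced by another kminion instance; not ours to measure
		return
	}
	latencyMs := timeNowMs() - msg.Timestamp
	s.onRoundtrip(record.Partition, time.Duration(latencyMs*float64(time.Millisecond)))
}

Note that with the record key commented out, partition assignment is left to the client's default partitioner; the partition label for the roundtrip histogram comes from the fetched record itself.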
commit") + svc.endToEndAckLatency = makeHistogramVec("produce_latency_seconds", cfg.Producer.AckSla, []string{"partitionId"}, "Time until we received an ack for a produced message") + svc.endToEndRoundtripLatency = makeHistogramVec("roundtrip_latency_seconds", cfg.Consumer.RoundtripSla, []string{"partitionId"}, "Time it took between sending (producing) and receiving (consuming) a message") + svc.endToEndCommitLatency = makeHistogramVec("commit_latency_seconds", cfg.Consumer.CommitSla, []string{"groupCoordinator"}, "Time kafka took to respond to kminion's offset commit") return svc, nil } @@ -124,7 +124,7 @@ func (s *Service) Start(ctx context.Context) error { // called from e2e when a message is acknowledged func (s *Service) onAck(partitionId int32, duration time.Duration) { s.endToEndMessagesAcked.Inc() - s.endToEndAckLatency.Observe(duration.Seconds()) + s.endToEndAckLatency.WithLabelValues(string(partitionId)).Observe(duration.Seconds()) } // called from e2e when a message completes a roundtrip (send to kafka, receive msg from kafka again) @@ -139,18 +139,18 @@ func (s *Service) onRoundtrip(partitionId int32, duration time.Duration) { // } s.endToEndMessagesReceived.Inc() - s.endToEndRoundtripLatency.Observe(duration.Seconds()) + s.endToEndRoundtripLatency.WithLabelValues(string(partitionId)).Observe(duration.Seconds()) } // called from e2e when an offset commit is confirmed -func (s *Service) onOffsetCommit(partitionId int32, duration time.Duration) { +func (s *Service) onOffsetCommit(duration time.Duration, groupCoordinator string) { // todo: // if the commit took too long, don't count it in 'commits' but add it to the histogram? // and how do we want to handle cases where we get an error?? // should we have another metric that tells us about failed commits? or a label on the counter? 
diff --git a/e2e/topic.go b/e2e/topic.go
index 96d9e89..dd4aeab 100644
--- a/e2e/topic.go
+++ b/e2e/topic.go
@@ -3,6 +3,7 @@ package e2e
 import (
     "context"
     "fmt"
+    "strings"
     "time"
 
     "github.com/twmb/franz-go/pkg/kmsg"
@@ -217,15 +218,59 @@ func (s *Service) getTopicMetadata(ctx context.Context) (*kmsg.MetadataResponse,
     return res, nil
 }
 
+func (s *Service) checkAndDeleteOldConsumerGroups(ctx context.Context) error {
+    var groupsRq kmsg.ListGroupsRequest
+    groupsRq.Default()
+    groupsRq.StatesFilter = []string{"Empty"}
+
+    s.logger.Info("checking for empty consumer groups with kminion prefix...")
+
+    shardedResponse := s.client.RequestSharded(ctx, &groupsRq)
+    errorCount := 0
+
+    matchingGroups := make([]string, 0, 10)
+
+    for _, responseShard := range shardedResponse {
+        if responseShard.Err != nil {
+            errorCount++
+            s.logger.Error("error in response to ListGroupsRequest", zap.Error(responseShard.Err))
+            continue
+        }
+
+        r, ok := responseShard.Resp.(*kmsg.ListGroupsResponse)
+        if !ok {
+            s.logger.Error("cannot cast responseShard.Resp to kmsg.ListGroupsResponse")
+            errorCount++
+            continue
+        }
+
+        for _, group := range r.Groups {
+            name := group.Group
+            if strings.HasPrefix(name, s.config.Consumer.GroupIdPrefix) {
+                matchingGroups = append(matchingGroups, name)
+            }
+        }
+    }
+
+    s.logger.Info(fmt.Sprintf("found %v matching consumer groups", len(matchingGroups)))
+    for i, name := range matchingGroups {
+        s.logger.Info(fmt.Sprintf("consumerGroups %v: %v", i, name))
+    }
+
+    return nil
+}
+
 func (s *Service) initEndToEnd(ctx context.Context) {
 
     validateTopicTicker := time.NewTicker(s.config.TopicManagement.ReconciliationInterval)
     produceTicker := time.NewTicker(s.config.ProbeInterval)
+    deleteOldGroupsTicker := time.NewTicker(5 * time.Second)
     // stop tickers when context is cancelled
     go func() {
         <-ctx.Done()
         produceTicker.Stop()
         validateTopicTicker.Stop()
+        deleteOldGroupsTicker.Stop()
     }()
 
     // keep checking end-to-end topic
@@ -238,6 +283,15 @@ func (s *Service) initEndToEnd(ctx context.Context) {
         }
     }()
 
+    // look for old consumer groups and delete them
+    go func() {
+        for range deleteOldGroupsTicker.C {
+            err := s.checkAndDeleteOldConsumerGroups(ctx)
+            if err != nil {
+                s.logger.Error("failed to check for old consumer groups", zap.Error(err))
+            }
+        }
+    }()
     // start consuming topic
     go s.ConsumeFromManagementTopic(ctx)
 
diff --git a/main.go b/main.go
index a52da3b..2d85723 100644
--- a/main.go
+++ b/main.go
@@ -64,9 +64,11 @@ func main() {
     if err != nil {
         logger.Fatal("failed to setup minion service", zap.Error(err))
     }
-    err = minionSvc.Start(ctx)
-    if err != nil {
-        logger.Fatal("failed to start minion service", zap.Error(err))
+    if false {
+        err = minionSvc.Start(ctx)
+        if err != nil {
+            logger.Fatal("failed to start minion service", zap.Error(err))
+        }
     }
 
     // Create end to end testing service
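checkAndDeleteOldConsumerGroups in e2e/topic.go above currently only lists the matching empty groups. A possible completion that actually deletes them, meant to replace the final return nil; it is a sketch using kmsg's DeleteGroups request and the kerr package, with field names following kmsg's generated types, not something already present in this diff:

// Sketch only: delete the empty kminion-prefixed groups collected above.
if len(matchingGroups) > 0 {
	deleteReq := kmsg.NewDeleteGroupsRequest()
	deleteReq.Groups = matchingGroups
	res, err := deleteReq.RequestWith(ctx, s.client)
	if err != nil {
		return fmt.Errorf("failed to delete empty consumer groups: %w", err)
	}
	for _, g := range res.Groups {
		if err := kerr.ErrorForCode(g.ErrorCode); err != nil {
			s.logger.Error("failed to delete consumer group", zap.String("group", g.Group), zap.Error(err))
		}
	}
}

It may also be worth excluding the group id this instance builds in consumer.go, since that group can briefly report an Empty state around startup and rebalances.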