Skip to content

Commit

Permalink
Fix offset commit metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
weeco committed Aug 16, 2021
1 parent ba20914 commit d38052f
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 68 deletions.
26 changes: 18 additions & 8 deletions e2e/client_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@ type clientHooks struct {
}

// newEndToEndClientHooks builds the hook set used by the end-to-end Kafka
// client. The returned hooks log through a child of the given logger named
// "e2e_hooks" and start out with an empty atomic value for the current
// group coordinator.
func newEndToEndClientHooks(logger *zap.Logger) *clientHooks {
	hooksLogger := logger.Named("e2e_hooks")
	coordinator := new(atomic.Value)

	return &clientHooks{
		logger:             hooksLogger,
		currentCoordinator: coordinator,
	}
}

func (c *clientHooks) OnConnect(meta kgo.BrokerMetadata, dialDur time.Duration, _ net.Conn, err error) {
func (c *clientHooks) OnBrokerConnect(meta kgo.BrokerMetadata, dialDur time.Duration, _ net.Conn, err error) {
if err != nil {
c.logger.Error("kafka connection failed", zap.String("broker_host", meta.Host), zap.Int32("broker_id", meta.NodeID), zap.Error(err))
return
Expand All @@ -41,7 +40,7 @@ func (c *clientHooks) OnDisconnect(meta kgo.BrokerMetadata, _ net.Conn) {
zap.String("host", meta.Host))
}

// OnWrite is passed the broker metadata, the key for the request that
// OnBrokerWrite is passed the broker metadata, the key for the request that
// was written, the number of bytes written, how long the request
// waited before being written, how long it took to write the request,
// and any error.
Expand All @@ -50,7 +49,7 @@ func (c *clientHooks) OnDisconnect(meta kgo.BrokerMetadata, _ net.Conn) {
// OnBrokerWrite is called after a write to a broker.
//
// OnBrokerWrite(meta BrokerMetadata, key int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error)
func (c *clientHooks) OnWrite(meta kgo.BrokerMetadata, key int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error) {
func (c *clientHooks) OnBrokerWrite(meta kgo.BrokerMetadata, key int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error) {
keyName := kmsg.NameForKey(key)
if keyName != "OffsetCommit" {
return
Expand All @@ -61,17 +60,28 @@ func (c *clientHooks) OnWrite(meta kgo.BrokerMetadata, key int16, bytesWritten i
// zap.NamedError("err", err))
}

// OnRead is passed the broker metadata, the key for the response that
// OnBrokerRead is passed the broker metadata, the key for the response that
// was read, the number of bytes read, how long the Client waited
// before reading the response, how long it took to read the response,
// and any error.
//
// The bytes read does not count any TLS overhead.
// OnBrokerRead is called after a read from a broker.
// OnBrokerRead(meta BrokerMetadata, key int16, bytesRead int, readWait, timeToRead time.Duration, err error)
func (c *clientHooks) OnRead(meta kgo.BrokerMetadata, key int16, bytesRead int, readWait, timeToRead time.Duration, err error) {
expectedRes := kmsg.NewOffsetCommitResponse()
if key != expectedRes.Key() {
func (c *clientHooks) OnBrokerRead(meta kgo.BrokerMetadata, key int16, bytesRead int, readWait, timeToRead time.Duration, err error) {
offsetCommitRes := kmsg.NewOffsetCommitResponse()
joinGroupRes := kmsg.NewJoinGroupResponse()
heartbeatRes := kmsg.NewHeartbeatResponse()
syncGroupRes := kmsg.NewSyncGroupResponse()
consumerGroupMsgKeys := []int16{
offsetCommitRes.Key(),
joinGroupRes.Key(),
heartbeatRes.Key(),
syncGroupRes.Key(),
}

isMessageFromGroupCoordinator := isInArray(key, consumerGroupMsgKeys)
if !isMessageFromGroupCoordinator {
return
}

Expand Down
26 changes: 17 additions & 9 deletions e2e/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package e2e
import (
"context"
"encoding/json"
"strconv"
"time"

"github.com/twmb/franz-go/pkg/kgo"
Expand Down Expand Up @@ -46,19 +47,26 @@ func (s *Service) commitOffsets(ctx context.Context) {
}

startCommitTimestamp := time.Now()
client.CommitOffsets(ctx, uncommittedOffset, func(_ *kgo.Client, req *kmsg.OffsetCommitRequest, r *kmsg.OffsetCommitResponse, err error) {
// Got commit response

childCtx, cancel := context.WithTimeout(ctx, s.config.Consumer.CommitSla)
client.CommitOffsets(childCtx, uncommittedOffset, func(_ *kgo.Client, req *kmsg.OffsetCommitRequest, r *kmsg.OffsetCommitResponse, err error) {
cancel()

coordinator := s.clientHooks.currentCoordinator.Load().(kgo.BrokerMetadata)
coordinatorID := strconv.Itoa(int(coordinator.NodeID))

latency := time.Since(startCommitTimestamp)
s.offsetCommitLatency.WithLabelValues(coordinatorID).Observe(latency.Seconds())
s.offsetCommitsTotal.WithLabelValues(coordinatorID).Inc()
// We do this to ensure that a series with that coordinator id is initialized
s.offsetCommitsTotal.WithLabelValues(coordinatorID).Add(0)

if s.logCommitErrors(r, err) > 0 {
// If we have at least one error in our commit response we want to report it as an error with an appropriate
// reason as label.
if errCode := s.logCommitErrors(r, err); errCode != "" {
s.offsetCommitsFailedTotal.WithLabelValues(coordinatorID, errCode).Inc()
return
}

// only report commit latency if the coordinator wasn't set too long ago
if time.Since(s.clientHooks.lastCoordinatorUpdate) < 10*time.Second {
coordinator := s.clientHooks.currentCoordinator.Load().(kgo.BrokerMetadata)
s.onOffsetCommit(coordinator.NodeID, latency)
}
})
}

Expand Down
2 changes: 1 addition & 1 deletion e2e/message_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (t *messageTracker) onMessageArrived(arrivedMessage *EndToEndMessage) {
// message arrived early enough
pID := strconv.Itoa(msg.partition)
t.svc.messagesReceived.WithLabelValues(pID).Inc()
t.svc.endToEndRoundtripLatency.WithLabelValues(pID).Observe(latency.Seconds())
t.svc.roundtripLatency.WithLabelValues(pID).Observe(latency.Seconds())

// We mark the message as arrived so that we won't mark the message as lost and overwrite that modified message
// into the cache.
Expand Down
2 changes: 1 addition & 1 deletion e2e/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (s *Service) produceMessage(ctx context.Context, partition int) {
return
}

s.endToEndAckLatency.WithLabelValues(pID).Observe(ackDuration.Seconds())
s.produceLatency.WithLabelValues(pID).Observe(ackDuration.Seconds())
})
}

Expand Down
46 changes: 11 additions & 35 deletions e2e/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@ type Service struct {
messagesProducedTotal *prometheus.CounterVec
messagesProducedFailed *prometheus.CounterVec
messagesReceived *prometheus.CounterVec
offsetCommits prometheus.Counter
offsetCommitsTotal *prometheus.CounterVec
offsetCommitsFailedTotal *prometheus.CounterVec
lostMessages *prometheus.CounterVec

endToEndAckLatency *prometheus.HistogramVec
endToEndRoundtripLatency *prometheus.HistogramVec
endToEndCommitLatency *prometheus.HistogramVec
produceLatency *prometheus.HistogramVec
roundtripLatency *prometheus.HistogramVec
offsetCommitLatency *prometheus.HistogramVec
}

// NewService creates a new instance of the e2e monitoring service (wow)
Expand Down Expand Up @@ -92,14 +93,6 @@ func NewService(ctx context.Context, cfg Config, logger *zap.Logger, kafkaSvc *k
svc.groupTracker = newGroupTracker(cfg, logger, client, groupID)
svc.messageTracker = newMessageTracker(svc)

makeCounter := func(name string, help string) prometheus.Counter {
return promauto.NewCounter(prometheus.CounterOpts{
Namespace: metricNamespace,
Subsystem: "end_to_end",
Name: name,
Help: help,
})
}
makeCounterVec := func(name string, labelNames []string, help string) *prometheus.CounterVec {
return promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: metricNamespace,
Expand Down Expand Up @@ -132,15 +125,16 @@ func NewService(ctx context.Context, cfg Config, logger *zap.Logger, kafkaSvc *k
svc.messagesProducedTotal = makeCounterVec("messages_produced_total", []string{"partition_id"}, "Number of all messages produced to Kafka. This counter will be incremented when we receive a response (failure/timeout or success) from Kafka")
svc.messagesProducedFailed = makeCounterVec("messages_produced_failed_total", []string{"partition_id"}, "Number of messages failed to produce to Kafka because of a timeout or failure")
svc.messagesReceived = makeCounterVec("messages_received_total", []string{"partition_id"}, "Number of *matching* messages kminion received. Every roundtrip message has a minionID (randomly generated on startup) and a timestamp. Kminion only considers a message a match if it it arrives within the configured roundtrip SLA (and it matches the minionID)")
svc.offsetCommits = makeCounter("offset_commits_total", "Counts how many times kminions end-to-end test has committed offsets")
svc.offsetCommitsTotal = makeCounterVec("offset_commits_total", []string{"coordinator_id"}, "Counts how many times kminions end-to-end test has committed offsets")
svc.offsetCommitsFailedTotal = makeCounterVec("offset_commits_failed_total", []string{"coordinator_id", "reason"}, "Number of offset commits that returned an error or timed out")
svc.lostMessages = makeCounterVec("messages_lost_total", []string{"partition_id"}, "Number of messages that have been produced successfully but not received within the configured SLA duration")

// Latency Histograms
// More detailed info about how long stuff took
// Since histograms also have an 'infinite' bucket, they can be used to detect small hiccups or "lost" messages
svc.endToEndAckLatency = makeHistogramVec("produce_latency_seconds", cfg.Producer.AckSla, []string{"partition_id"}, "Time until we received an ack for a produced message")
svc.endToEndRoundtripLatency = makeHistogramVec("roundtrip_latency_seconds", cfg.Consumer.RoundtripSla, []string{"partition_id"}, "Time it took between sending (producing) and receiving (consuming) a message")
svc.endToEndCommitLatency = makeHistogramVec("commit_latency_seconds", cfg.Consumer.CommitSla, []string{"group_coordinator_broker_id"}, "Time kafka took to respond to kminion's offset commit")
svc.produceLatency = makeHistogramVec("produce_latency_seconds", cfg.Producer.AckSla, []string{"partition_id"}, "Time until we received an ack for a produced message")
svc.roundtripLatency = makeHistogramVec("roundtrip_latency_seconds", cfg.Consumer.RoundtripSla, []string{"partition_id"}, "Time it took between sending (producing) and receiving (consuming) a message")
svc.offsetCommitLatency = makeHistogramVec("offset_commit_latency_seconds", cfg.Consumer.CommitSla, []string{"coordinator_id"}, "Time kafka took to respond to kminion's offset commit")

return svc, nil
}
Expand Down Expand Up @@ -191,7 +185,7 @@ func (s *Service) Start(ctx context.Context) error {
return nil
}
}

go s.startOffsetCommits(ctx)
go s.startProducer(ctx)

// keep track of groups, delete old unused groups
Expand Down Expand Up @@ -243,22 +237,4 @@ func (s *Service) startOffsetCommits(ctx context.Context) {
s.commitOffsets(ctx)
}
}

}

// onOffsetCommit is called from e2e when an offset commit is confirmed.
//
// The commit duration is always observed in the commit latency histogram,
// labelled with the given broker id. The offset commit counter is only
// incremented when the duration did not exceed the configured commit SLA.
func (s *Service) onOffsetCommit(brokerId int32, duration time.Duration) {
	// todo:
	// if the commit took too long, don't count it in 'commits' but add it to the histogram?
	// and how do we want to handle cases where we get an error??
	// should we have another metric that tells us about failed commits? or a label on the counter?
	coordinatorLabel := fmt.Sprintf("%v", brokerId)
	seconds := duration.Seconds()
	s.endToEndCommitLatency.WithLabelValues(coordinatorLabel).Observe(seconds)

	// Commits slower than the SLA show up in the histogram only.
	withinSLA := duration <= s.config.Consumer.CommitSla
	if withinSLA {
		s.offsetCommits.Inc()
	}
}
45 changes: 31 additions & 14 deletions e2e/utils.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package e2e

import (
"context"
"math"
"time"

Expand Down Expand Up @@ -33,29 +34,36 @@ func containsStr(ar []string, x string) (bool, int) {
return false, -1
}

// logs all errors, returns number of errors
func (s *Service) logCommitErrors(r *kmsg.OffsetCommitResponse, err error) int {
// logCommitErrors logs all errors in commit response and returns a well formatted error code if there was one
func (s *Service) logCommitErrors(r *kmsg.OffsetCommitResponse, err error) string {
if err != nil {
s.logger.Error("offset commit failed", zap.Error(err))
return 1
if err == context.DeadlineExceeded {
s.logger.Warn("offset commit failed because SLA has been exceeded")
return "OFFSET_COMMIT_SLA_EXCEEDED"
}

s.logger.Warn("offset commit failed", zap.Error(err))
return "RESPONSE_ERROR"
}

errCount := 0
lastErrCode := ""
for _, t := range r.Topics {
for _, p := range t.Partitions {
err := kerr.ErrorForCode(p.ErrorCode)
if err != nil {
s.logger.Error("error committing partition offset",
zap.String("topic", t.Topic),
zap.Int32("partition_id", p.Partition),
zap.Error(err),
)
errCount++
typedErr := kerr.TypedErrorForCode(p.ErrorCode)
if typedErr == nil {
continue
}

s.logger.Warn("error committing partition offset",
zap.String("topic", t.Topic),
zap.Int32("partition_id", p.Partition),
zap.Error(typedErr),
)
lastErrCode = typedErr.Message
}
}

return errCount
return lastErrCode
}

func safeUnwrap(err error) string {
Expand All @@ -64,3 +72,12 @@ func safeUnwrap(err error) string {
}
return err.Error()
}

// isInArray reports whether num occurs at least once in arr.
// A nil or empty slice never contains num.
func isInArray(num int16, arr []int16) bool {
	for i := 0; i < len(arr); i++ {
		if arr[i] == num {
			return true
		}
	}

	return false
}

0 comments on commit d38052f

Please sign in to comment.