Skip to content

Commit

Permalink
remove old methods; extract 'processMessage' for a small cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
rikimaru0345 committed Apr 29, 2021
1 parent c8e360a commit 9813ad2
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 38 deletions.
66 changes: 30 additions & 36 deletions minion/endtoend_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (s *Service) ConsumeFromManagementTopic(ctx context.Context) error {
errors := fetches.Errors()
for _, err := range errors {
// Log all errors and continue afterwards as we might get errors and still have some fetch results
s.logger.Error("failed to fetch records from kafka",
s.logger.Error("kafka fetch error",
zap.String("topic", err.Topic),
zap.Int32("partition", err.Partition),
zap.Error(err.Err))
Expand All @@ -59,34 +59,13 @@ func (s *Service) ConsumeFromManagementTopic(ctx context.Context) error {
continue
}

// Deserialize message
var msg TopicManagementRecord
if jerr := json.Unmarshal(record.Value, &msg); jerr != nil {
continue // failed, maybe sent by an older version?
}

if msg.MinionID != s.minionID {
continue // we didn't send this message
}

if msg.Timestamp < s.lastRoundtripTimestamp {
continue // received an older message
}

latencyMs := receiveTimestamp - msg.Timestamp
if latencyMs > s.Cfg.EndToEnd.Consumer.RoundtripSla.Milliseconds() {
s.endToEndWithinRoundtripSla.Set(0) // we're no longer within the roundtrip sla
continue // message is too old
}

// Message is a match and arrived in time!
s.lastRoundtripTimestamp = msg.Timestamp
s.endToEndMessagesReceived.Inc()
s.endToEndRoundtripLatency.Observe(float64(latencyMs) / 1000)
s.processMessage(record, receiveTimestamp)
}

//
// Commit offsets for processed messages
// todo:
// - do we need to keep track of what offset to commit for which partition??
if uncommittedOffset := client.UncommittedOffsets(); uncommittedOffset != nil {

startCommitTimestamp := timeNowMs()
Expand Down Expand Up @@ -114,18 +93,33 @@ func (s *Service) ConsumeFromManagementTopic(ctx context.Context) error {

}

func (s *Service) ConsumeDurationMs(ctx context.Context) (int64, bool) {
ms, exists := s.getCachedItem("end_to_end_consume_duration")
if exists {
return ms.(int64), true
// todo: extract whole end-to-end feature into its own package
// todo: then also create a "tracker" that knows about in-flight messages, and the latest successful roundtrips

// processMessage takes a message and:
// - checks if it matches minionID and latency
// - updates metrics accordingly
func (s *Service) processMessage(record *kgo.Record, receiveTimestamp int64) {
var msg EndToEndMessage
if jerr := json.Unmarshal(record.Value, &msg); jerr != nil {
return // maybe older version
}

if msg.MinionID != s.minionID {
return // not from us
}
return 0, false
}

func (s *Service) OffsetCommitAvailability(ctx context.Context) bool {
ok, exists := s.getCachedItem("end_to_end_consumer_offset_availability")
if !exists {
return false
if msg.Timestamp < s.lastRoundtripTimestamp {
return // msg older than what we recently processed (out of order, should never happen)
}
return ok.(bool)

latencyMs := receiveTimestamp - msg.Timestamp
if latencyMs > s.Cfg.EndToEnd.Consumer.RoundtripSla.Milliseconds() {
s.endToEndWithinRoundtripSla.Set(0)
return // too late!
}

s.lastRoundtripTimestamp = msg.Timestamp
s.endToEndMessagesReceived.Inc()
s.endToEndRoundtripLatency.Observe(float64(latencyMs) / 1000)
}
4 changes: 2 additions & 2 deletions minion/endtoend_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/twmb/franz-go/pkg/kgo"
)

type TopicManagementRecord struct {
type EndToEndMessage struct {
MinionID string `json:"minionID"`
Timestamp int64 `json:"timestamp"`
}
Expand Down Expand Up @@ -60,7 +60,7 @@ func (s *Service) produceToManagementTopic(ctx context.Context) error {
func createEndToEndRecord(topicName string, minionID string) (*kgo.Record, error) {

timestamp := timeNowMs()
message := TopicManagementRecord{
message := EndToEndMessage{
MinionID: minionID,
Timestamp: timestamp,
}
Expand Down

0 comments on commit 9813ad2

Please sign in to comment.