Skip to content

Commit

Permalink
Refactor produce methods so that it is no longer sequential
Browse files Browse the repository at this point in the history
  • Loading branch information
weeco committed Aug 10, 2021
1 parent 778e87c commit a2daa17
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 40 deletions.
54 changes: 16 additions & 38 deletions e2e/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,62 +11,40 @@ import (
"go.uber.org/zap"
)

type EndToEndMessage struct {
MinionID string `json:"minionID"` // unique for each running kminion instance
MessageID string `json:"messageID"` // unique for each message
Timestamp int64 `json:"createdUtcNs"` // when the message was created, unix nanoseconds

partition int // used in message tracker
hasArrived bool // used in tracker
}

func (m *EndToEndMessage) creationTime() time.Time {
return time.Unix(0, m.Timestamp)
}

// Sends a EndToEndMessage to every partition
func (s *Service) produceLatencyMessages(ctx context.Context) {
// produceMessagesToAllPartitions sends an EndToEndMessage to every partition on the given topic
func (s *Service) produceMessagesToAllPartitions(ctx context.Context) {
for i := 0; i < s.partitionCount; i++ {
err := s.produceSingleMessage(ctx, i)
if err != nil {
s.logger.Error("failed to produce message to end-to-end topic",
zap.String("topic_name", s.config.TopicManagement.Name),
zap.Int("partition", i),
zap.Error(err))
}
s.produceMessage(ctx, i)
}
}

func (s *Service) produceSingleMessage(ctx context.Context, partition int) error {
// produceMessage produces an end to end record to a single given partition. If it succeeds producing the record
// it will add it to the message tracker. If producing fails a message will be logged and the respective metrics
// will be incremented.
func (s *Service) produceMessage(ctx context.Context, partition int) {
topicName := s.config.TopicManagement.Name
record, msg := createEndToEndRecord(s.minionID, topicName, partition)

startTime := time.Now()

errCh := make(chan error)
s.endToEndMessagesProducedInFlight.Inc()
s.client.Produce(ctx, record, func(r *kgo.Record, err error) {
ackDuration := time.Since(startTime)
s.endToEndMessagesProducedInFlight.Dec()
s.endToEndMessagesProducedTotal.Inc()

errCh <- err

if err == nil {
s.endToEndAckLatency.WithLabelValues(strconv.Itoa(int(r.Partition))).Observe(ackDuration.Seconds())
s.messageTracker.addToTracker(msg)
} else {
if err != nil {
s.endToEndMessagesProducedFailed.Inc()
s.logger.Info("failed to produce message to end-to-end topic",
zap.String("topic_name", r.Topic),
zap.Int32("partition", r.Partition),
zap.Error(err))
return
}
})

err := <-errCh
if err != nil {
s.logger.Error("error producing record", zap.Error(err))
return err
}
return nil

s.endToEndAckLatency.WithLabelValues(strconv.Itoa(int(r.Partition))).Observe(ackDuration.Seconds())
s.messageTracker.addToTracker(msg)
})
}

func createEndToEndRecord(minionID string, topicName string, partition int) (*kgo.Record, *EndToEndMessage) {
Expand Down
6 changes: 4 additions & 2 deletions e2e/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Service struct {
endToEndMessagesProducedFailed prometheus.Counter
endToEndMessagesReceived prometheus.Counter
endToEndCommits prometheus.Counter
lostMessages prometheus.Counter

endToEndAckLatency *prometheus.HistogramVec
endToEndRoundtripLatency *prometheus.HistogramVec
Expand Down Expand Up @@ -122,7 +123,8 @@ func NewService(ctx context.Context, cfg Config, logger *zap.Logger, kafkaSvc *k
svc.endToEndMessagesProducedTotal = makeCounter("messages_produced_total", "Number of all messages produced to Kafka. This counter will be incremented when we receive a response (failure/timeout or success) from Kafka")
svc.endToEndMessagesProducedFailed = makeCounter("messages_produced_failed_total", "Number of messages failed to produce to Kafka because of a timeout or failure")
svc.endToEndMessagesReceived = makeCounter("messages_received_total", "Number of *matching* messages kminion received. Every roundtrip message has a minionID (randomly generated on startup) and a timestamp. Kminion only considers a message a match if it it arrives within the configured roundtrip SLA (and it matches the minionID)")
svc.endToEndCommits = makeCounter("commits_total", "Counts how many times kminions end-to-end test has committed messages")
svc.endToEndCommits = makeCounter("offset_commits_total", "Counts how many times kminions end-to-end test has committed offsets")
svc.lostMessages = makeCounter("messages_lost_total", "Number of messages that have been produced successfully but not received within the configured SLA duration")

// Latency Histograms
// More detailed info about how long stuff took
Expand Down Expand Up @@ -185,7 +187,7 @@ func (s *Service) startProducer(ctx context.Context) {
case <-ctx.Done():
return
case <-produceTicker.C:
s.produceLatencyMessages(ctx)
s.produceMessagesToAllPartitions(ctx)
}
}
}
Expand Down

0 comments on commit a2daa17

Please sign in to comment.