Skip to content

Commit

Permalink
Add timeout for producing records
Browse files Browse the repository at this point in the history
  • Loading branch information
weeco committed Aug 11, 2021
1 parent e699e83 commit d0348bd
Showing 1 changed file with 6 additions and 1 deletion.
7 changes: 6 additions & 1 deletion e2e/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,13 @@ func (s *Service) produceMessage(ctx context.Context, partition int) {

startTime := time.Now()

// This childCtx will ensure that we will abort our efforts to produce (including retries) when we exceed
// the SLA for producers.
childCtx, cancel := context.WithTimeout(ctx, s.config.Producer.AckSla)
defer cancel()

s.endToEndMessagesProducedInFlight.Inc()
s.client.Produce(ctx, record, func(r *kgo.Record, err error) {
s.client.Produce(childCtx, record, func(r *kgo.Record, err error) {
ackDuration := time.Since(startTime)
s.endToEndMessagesProducedInFlight.Dec()
s.endToEndMessagesProducedTotal.Inc()
Expand Down

0 comments on commit d0348bd

Please sign in to comment.