diff --git a/e2e/producer.go b/e2e/producer.go index 0ba5c69..8438726 100644 --- a/e2e/producer.go +++ b/e2e/producer.go @@ -27,8 +27,13 @@ func (s *Service) produceMessage(ctx context.Context, partition int) { startTime := time.Now() + // This childCtx will ensure that we will abort our efforts to produce (including retries) when we exceed + // the SLA for producers. + childCtx, cancel := context.WithTimeout(ctx, s.config.Producer.AckSla) + defer cancel() + s.endToEndMessagesProducedInFlight.Inc() - s.client.Produce(ctx, record, func(r *kgo.Record, err error) { + s.client.Produce(childCtx, record, func(r *kgo.Record, err error) { ackDuration := time.Since(startTime) s.endToEndMessagesProducedInFlight.Dec() s.endToEndMessagesProducedTotal.Inc()