From d0348bd361b43a7e1aec4e461e171b02aa6e3514 Mon Sep 17 00:00:00 2001 From: Martin Schneppenheim Date: Wed, 11 Aug 2021 09:47:38 +0200 Subject: [PATCH] Add timeout for producing records --- e2e/producer.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/e2e/producer.go b/e2e/producer.go index 0ba5c69..8438726 100644 --- a/e2e/producer.go +++ b/e2e/producer.go @@ -27,8 +27,13 @@ func (s *Service) produceMessage(ctx context.Context, partition int) { startTime := time.Now() + // This childCtx will ensure that we will abort our efforts to produce (including retries) when we exceed + // the SLA for producers. + childCtx, cancel := context.WithTimeout(ctx, s.config.Producer.AckSla) + defer cancel() + s.endToEndMessagesProducedInFlight.Inc() - s.client.Produce(ctx, record, func(r *kgo.Record, err error) { + s.client.Produce(childCtx, record, func(r *kgo.Record, err error) { ackDuration := time.Since(startTime) s.endToEndMessagesProducedInFlight.Dec() s.endToEndMessagesProducedTotal.Inc()