diff --git a/e2e/service.go b/e2e/service.go index be6f23b..5631405 100644 --- a/e2e/service.go +++ b/e2e/service.go @@ -62,7 +62,8 @@ func NewService(ctx context.Context, cfg Config, logger *zap.Logger, kafkaSvc *k kgo.ConsumerGroup(groupID), kgo.ConsumeTopics(cfg.TopicManagement.Name), kgo.Balancers(kgo.CooperativeStickyBalancer()), - kgo.DisableAutoCommit()) + kgo.DisableAutoCommit(), + kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd())) // Prepare hooks hooks := newEndToEndClientHooks(logger) @@ -213,16 +214,6 @@ func (s *Service) startOffsetCommits(ctx context.Context) { } -// called from e2e when a message completes a roundtrip (send to kafka, receive msg from kafka again) -func (s *Service) onRoundtrip(partitionId int32, duration time.Duration) { - if duration > s.config.Consumer.RoundtripSla { - return // message is too old - } - - s.messagesReceived.Inc() - s.endToEndRoundtripLatency.WithLabelValues(fmt.Sprintf("%v", partitionId)).Observe(duration.Seconds()) -} - // called from e2e when an offset commit is confirmed func (s *Service) onOffsetCommit(brokerId int32, duration time.Duration) {