Skip to content

Commit

Permalink
Add clarifying log message during consumer init
Browse files Browse the repository at this point in the history
  • Loading branch information
weeco committed Aug 16, 2021
1 parent f6ab00e commit 1545c20
Showing 1 changed file with 2 additions and 7 deletions.
9 changes: 2 additions & 7 deletions e2e/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,12 @@ func (s *Service) Start(ctx context.Context) error {
// consumer is ready. Only if the consumer is ready we want to start the producer to ensure that we will not
// miss messages because the consumer wasn't ready.
initCh := make(chan bool)
s.logger.Info("initializing consumer and waiting until it has received the first messages")
s.logger.Info("initializing consumer and waiting until it has received the first record batch")
go s.startConsumeMessages(ctx, initCh)

// Produce an init message until the consumer received at least one fetch
initTicker := time.NewTicker(500 * time.Millisecond)
initTicker := time.NewTicker(1000 * time.Second)
isInitialized := false
timeoutCtx, timeoutCancel := context.WithTimeout(ctx, 60*time.Second)
defer timeoutCancel()

for !isInitialized {
select {
Expand All @@ -189,9 +187,6 @@ func (s *Service) Start(ctx context.Context) error {
isInitialized = true
s.logger.Info("consumer has been successfully initialized")
break
case <-timeoutCtx.Done():
s.logger.Error("failed to initialize consumer successfully")
return fmt.Errorf("failed to initialize consumer within 30s")
case <-ctx.Done():
return nil
}
Expand Down

0 comments on commit 1545c20

Please sign in to comment.