Skip to content

Commit

Permalink
fix(SQS): healthcheck false positive when node is stuck (#7234) (#7239)
Browse files Browse the repository at this point in the history
(cherry picked from commit 5fd717a)

Co-authored-by: Roman <[email protected]>
  • Loading branch information
mergify[bot] and p0mvn authored Jan 4, 2024
1 parent a180f12 commit f5b4fd0
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"strconv"
"time"

"github.com/osmosis-labs/osmosis/v21/ingest/sqs/domain/json"
"github.com/osmosis-labs/osmosis/v21/ingest/sqs/domain/mvc"
)

Expand All @@ -20,9 +19,8 @@ type TimeWrapper struct {
}

const (
latestHeightKey = "latestHeight"
latestHeightField = "height"
latestHeightTimeKey = "timeLatestHeight"
latestHeightKey = "latestHeight"
latestHeightField = "height"
)

// NewChainInfoRepo creates a new repository for chain information
Expand Down Expand Up @@ -55,6 +53,12 @@ func (r *chainInfoRepo) StoreLatestHeight(ctx context.Context, tx mvc.Tx, height
}

// GetLatestHeight retrieves the latest blockchain height from Redis
//
// N.B. sometimes the node gets stuck and does not make progress.
// However, it returns 200 OK for the status endpoint and claims to be not catching up.
// This has caused the healthcheck to pass with false positives in production.
// As a result, we need to keep track of the last seen height that chain ingester pushes into
// the Redis repository.
func (r *chainInfoRepo) GetLatestHeight(ctx context.Context) (uint64, error) {
tx := r.repositoryManager.StartTx()
redisTx, err := tx.AsRedisTx()
Expand Down Expand Up @@ -82,68 +86,3 @@ func (r *chainInfoRepo) GetLatestHeight(ctx context.Context) (uint64, error) {

return height, nil
}

// GetLatestHeightRetrievalTime implements mvc.ChainInfoRepository.
func (r *chainInfoRepo) GetLatestHeightRetrievalTime(ctx context.Context) (time.Time, error) {
tx := r.repositoryManager.StartTx()
redisTx, err := tx.AsRedisTx()
if err != nil {
return time.Time{}, err
}

pipeliner, err := redisTx.GetPipeliner(ctx)
if err != nil {
return time.Time{}, err
}

cmd := pipeliner.Get(ctx, latestHeightTimeKey)

if err := tx.Exec(ctx); err != nil {
return time.Time{}, err
}

heightStr := cmd.Val()

var timeWrapper TimeWrapper
if err := json.Unmarshal([]byte(heightStr), &timeWrapper); err != nil {
return time.Time{}, err
}

return timeWrapper.Time, nil
}

// StoreLatestHeightRetrievalTime implements mvc.ChainInfoRepository.
func (r *chainInfoRepo) StoreLatestHeightRetrievalTime(ctx context.Context, time time.Time) error {
tx := r.repositoryManager.StartTx()
redisTx, err := tx.AsRedisTx()
if err != nil {
return err
}

pipeliner, err := redisTx.GetPipeliner(ctx)
if err != nil {
return err
}

timeWrapper := TimeWrapper{
Time: time.UTC(), // always in UTC
}

bz, err := json.Marshal(timeWrapper)
if err != nil {
return err
}

cmd := pipeliner.Set(ctx, latestHeightTimeKey, bz, 0)

if err := tx.Exec(ctx); err != nil {
return err
}

err = cmd.Err()
if err != nil {
return err
}

return nil
}
57 changes: 31 additions & 26 deletions ingest/sqs/chain_info/usecase/chain_info_usecase.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ package usecase

import (
"context"
"sync"
"time"

"github.com/go-redis/redis"

"github.com/osmosis-labs/osmosis/v21/ingest/sqs/domain"
"github.com/osmosis-labs/osmosis/v21/ingest/sqs/domain/mvc"
)
Expand All @@ -14,6 +13,15 @@ type chainInfoUseCase struct {
contextTimeout time.Duration
chainInfoRepository mvc.ChainInfoRepository
redisRepositoryManager mvc.TxManager

// N.B. sometimes the node gets stuck and does not make progress.
// However, it returns 200 OK for the status endpoint and claims to be not catching up.
// This has caused the healthcheck to pass with false positives in production.
// As a result, we need to keep track of the last seen height and time to ensure that the height is
// updated within a reasonable time frame.
lastSeenMx sync.Mutex
lastSeenUpdatedHeight uint64
lastSeenUpdatedTime time.Time
}

// The max number of seconds allowed for there to be no updates
Expand All @@ -27,9 +35,20 @@ func NewChainInfoUsecase(timeout time.Duration, chainInfoRepository mvc.ChainInf
contextTimeout: timeout,
chainInfoRepository: chainInfoRepository,
redisRepositoryManager: redisRepositoryManager,

lastSeenMx: sync.Mutex{},
}
}

// GetLatestHeight retrieves the latest blockchain height
//
// Despite being a getter, this method also validates that the height is updated within a reasonable time frame.
//
// Sometimes the node gets stuck and does not make progress.
// However, it returns 200 OK for the status endpoint and claims to be not catching up.
// This has caused the healthcheck to pass with false positives in production.
// As a result, we need to keep track of the last seen height and time. Chain ingester pushes
// the latest height into Redis. This method checks that the height is updated within a reasonable time frame.
func (p *chainInfoUseCase) GetLatestHeight(ctx context.Context) (uint64, error) {
ctx, cancel := context.WithTimeout(ctx, p.contextTimeout)
defer cancel()
Expand All @@ -39,42 +58,28 @@ func (p *chainInfoUseCase) GetLatestHeight(ctx context.Context) (uint64, error)
return 0, err
}

// Current UTC time
currentTimeUTC := time.Now().UTC()

latestHeightRetrievalTime, err := p.chainInfoRepository.GetLatestHeightRetrievalTime(ctx)
if err != nil {
// If there is no entry, then we can assume that the height has never been retrieved,
// so we store the current time.
// TODO: clean up this error handling
if err.Error() == redis.Nil.Error() {
// Store the latest height retrieval time
if err := p.chainInfoRepository.StoreLatestHeightRetrievalTime(ctx, currentTimeUTC); err != nil {
return 0, err
}

return latestHeight, nil
}
p.lastSeenMx.Lock()
defer p.lastSeenMx.Unlock()

return 0, err
}
currentTimeUTC := time.Now().UTC()

// Time since last height retrieval
timeDeltaSecs := int(currentTimeUTC.Sub(latestHeightRetrievalTime).Seconds())
timeDeltaSecs := int(currentTimeUTC.Sub(p.lastSeenUpdatedTime).Seconds())

isHeightUpdated := latestHeight > p.lastSeenUpdatedHeight

// Validate that it does not exceed the max allowed time delta
if timeDeltaSecs > MaxAllowedHeightUpdateTimeDeltaSecs {
if !isHeightUpdated && timeDeltaSecs > MaxAllowedHeightUpdateTimeDeltaSecs {
return 0, domain.StaleHeightError{
StoredHeight: latestHeight,
TimeSinceLastUpdate: timeDeltaSecs,
MaxAllowedTimeDeltaSecs: MaxAllowedHeightUpdateTimeDeltaSecs,
}
}

// Store the latest height retrieval time
if err := p.chainInfoRepository.StoreLatestHeightRetrievalTime(ctx, currentTimeUTC); err != nil {
return 0, err
}
// Update the last seen height and time
p.lastSeenUpdatedHeight = latestHeight
p.lastSeenUpdatedTime = currentTimeUTC

return latestHeight, nil
}
16 changes: 9 additions & 7 deletions ingest/sqs/domain/mvc/chainInfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package mvc

import (
"context"
"time"
)

// ChainInfoRepository represents the contract for a repository handling chain information
Expand All @@ -12,14 +11,17 @@ type ChainInfoRepository interface {

// GetLatestHeight retrieves the latest blockchain height
GetLatestHeight(ctx context.Context) (uint64, error)

// GetLatestHeightRetrievalTime retrieves the latest blockchain height retrieval time.
GetLatestHeightRetrievalTime(ctx context.Context) (time.Time, error)

// StoreLatestHeightRetrievalTime stores the latest blockchain height retrieval time.
StoreLatestHeightRetrievalTime(ctx context.Context, time time.Time) error
}

type ChainInfoUsecase interface {
// GetLatestHeight retrieves the latest blockchain height
//
// Despite being a getter, this method also validates that the height is updated within a reasonable time frame.
//
// Sometimes the node gets stuck and does not make progress.
// However, it returns 200 OK for the status endpoint and claims to be not catching up.
// This has caused the healthcheck to pass with false positives in production.
// As a result, we need to keep track of the last seen height and time. Chain ingester pushes
// the latest height into Redis. This method checks that the height is updated within a reasonable time frame.
GetLatestHeight(ctx context.Context) (uint64, error)
}

0 comments on commit f5b4fd0

Please sign in to comment.