Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Algod Importer: Longer Timeout #133

Merged
merged 16 commits into from
Aug 11, 2023
50 changes: 30 additions & 20 deletions conduit/plugins/importers/algod/algod_importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const (
)

var (
waitForRoundTimeout = 5 * time.Second
waitForRoundTimeout = 30 * time.Second
)

const catchpointsURL = "https://algorand-catchpoints.s3.us-east-2.amazonaws.com/consolidated/%s_catchpoints.txt"
Expand Down Expand Up @@ -418,13 +418,29 @@ func (algodImp *algodImporter) getDelta(rnd uint64) (sdk.LedgerStateDelta, error
}

// SyncError is used to indicate algod and conduit are not synchronized.
// The retrievedRound is the round returned from an algod status call.
// The expectedRound is the round conduit expected to have gotten back.
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
type SyncError struct {
rnd uint64
expected uint64
retrievedRound uint64
expectedRound uint64
err error
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
}

// NewSyncError creates a new SyncError.
func NewSyncError(retrievedRound, expectedRound uint64, err error) *SyncError {
return &SyncError{
retrievedRound: retrievedRound,
expectedRound: expectedRound,
err: err,
}
}

func (e *SyncError) Error() string {
return fmt.Sprintf("wrong round returned from status for round: %d != %d", e.rnd, e.expected)
return fmt.Sprintf("wrong round returned from status for round: retrieved(%d) != expected(%d): %v", e.retrievedRound, e.expectedRound, e.err)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shiqizng - as you pointed out yesterday, at face value it's hard to tell which round was which so I've added "retrieved" for the value gotten back from the endpoint vs. "expected" which is the importer's expectation.

}

func (e *SyncError) Unwrap() error {
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
return e.err
}

func waitForRoundWithTimeout(ctx context.Context, l *logrus.Logger, c *algod.Client, rnd uint64, to time.Duration) (uint64, error) {
Expand All @@ -440,10 +456,8 @@ func waitForRoundWithTimeout(ctx context.Context, l *logrus.Logger, c *algod.Cli
if rnd <= status.LastRound {
return status.LastRound, nil
}
return 0, &SyncError{
rnd: status.LastRound,
expected: rnd,
}
// algod's timeout should not be reached because context.WithTimeout is used
return 0, NewSyncError(status.LastRound, rnd, fmt.Errorf("sync error, likely due to status after block timeout: %w", err))
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
}

// If there was a different error and the node is responsive, call status before returning a SyncError.
Expand All @@ -454,10 +468,7 @@ func waitForRoundWithTimeout(ctx context.Context, l *logrus.Logger, c *algod.Cli
return 0, fmt.Errorf("unable to get status after block and status: %w", errors.Join(err, err2))
}
if status2.LastRound < rnd {
return 0, &SyncError{
rnd: status.LastRound,
expected: rnd,
}
return 0, NewSyncError(status2.LastRound, rnd, fmt.Errorf("status2.LastRound mismatch: %w", err))
}

// This is probably a connection error, not a SyncError.
Expand All @@ -470,11 +481,10 @@ func (algodImp *algodImporter) getBlockInner(rnd uint64) (data.BlockData, error)

nodeRound, err := waitForRoundWithTimeout(algodImp.ctx, algodImp.logger, algodImp.aclient, rnd, waitForRoundTimeout)
if err != nil {
// If context has expired.
if algodImp.ctx.Err() != nil {
return blk, fmt.Errorf("GetBlock ctx error: %w", err)
return blk, fmt.Errorf("importer algod.GetBlock() ctx cancelled: %w", err)
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
}
algodImp.logger.Errorf(err.Error())
algodImp.logger.Errorf("importer algod.GetBlock() called waitForRoundWithTimeout: %v", err)
return data.BlockData{}, err
}
start := time.Now()
Expand All @@ -483,7 +493,7 @@ func (algodImp *algodImporter) getBlockInner(rnd uint64) (data.BlockData, error)
dt := time.Since(start)
getAlgodRawBlockTimeSeconds.Observe(dt.Seconds())
if err != nil {
algodImp.logger.Errorf("error getting block for round %d: %s", rnd, err.Error())
algodImp.logger.Errorf("importer algod.GetBlock() error getting block for round %d: %s", rnd, err.Error())
return data.BlockData{}, err
}
tmpBlk := new(models.BlockResponse)
Expand All @@ -503,9 +513,9 @@ func (algodImp *algodImporter) getBlockInner(rnd uint64) (data.BlockData, error)
delta, err = algodImp.getDelta(rnd)
if err != nil {
if nodeRound < rnd {
err = fmt.Errorf("ledger state delta not found: node round (%d) is behind required round (%d), ensure follower node has its sync round set to the required round: %w", nodeRound, rnd, err)
err = fmt.Errorf("importer algod.GetBlock() ledger state delta not found: node round (%d) is behind required round (%d), ensure follower node has its sync round set to the required round: %w", nodeRound, rnd, err)
} else {
err = fmt.Errorf("ledger state delta not found: node round (%d), required round (%d): verify follower node configuration and ensure follower node has its sync round set to the required round, re-deploying the follower node may be necessary: %w", nodeRound, rnd, err)
err = fmt.Errorf("importer algod.GetBlock() ledger state delta not found: node round (%d), required round (%d): verify follower node configuration and ensure follower node has its sync round set to the required round, re-deploying the follower node may be necessary: %w", nodeRound, rnd, err)
}
algodImp.logger.Error(err.Error())
return data.BlockData{}, err
Expand All @@ -523,10 +533,10 @@ func (algodImp *algodImporter) GetBlock(rnd uint64) (data.BlockData, error) {
if err != nil {
target := &SyncError{}
if errors.As(err, &target) {
algodImp.logger.Warnf("Sync error detected, attempting to set the sync round to recover the node: %s", err.Error())
algodImp.logger.Warnf("importer algod.GetBlock() sync error detected, attempting to set the sync round to recover the node: %s", err.Error())
_, _ = algodImp.aclient.SetSyncRound(rnd).Do(algodImp.ctx)
} else {
err = fmt.Errorf("error getting block for round %d, check node configuration: %s", rnd, err)
err = fmt.Errorf("importer algod.GetBlock() error getting block for round %d, check node configuration: %s", rnd, err)
algodImp.logger.Errorf(err.Error())
}
return data.BlockData{}, err
Expand Down
18 changes: 10 additions & 8 deletions conduit/plugins/importers/algod/algod_importer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@ package algodimporter
import (
"context"
"fmt"
"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"

"github.com/algorand/go-algorand-sdk/v2/client/v2/algod"
"github.com/algorand/go-algorand-sdk/v2/client/v2/common/models"
sdk "github.com/algorand/go-algorand-sdk/v2/types"
Expand Down Expand Up @@ -699,8 +700,8 @@ func TestGetBlockErrors(t *testing.T) {
blockAfterResponder: MakeBlockAfterResponder(models.NodeStatus{LastRound: 50}),
blockResponder: BlockResponder,
deltaResponder: MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, ""),
err: fmt.Sprintf("wrong round returned from status for round: 50 != 200"),
logs: []string{"wrong round returned from status for round: 50 != 200", "Sync error detected, attempting to set the sync round to recover the node"},
err: fmt.Sprintf("wrong round returned from status for round: retrieved(50) != expected(200)"),
logs: []string{"wrong round returned from status for round: retrieved(50) != expected(200)", "sync error detected, attempting to set the sync round to recover the node"},
},
{
name: "Cannot get delta (caught up)",
Expand Down Expand Up @@ -755,7 +756,8 @@ func TestGetBlockErrors(t *testing.T) {
// Make sure each of the expected log messages are present
for _, log := range tc.logs {
found := false
for _, entry := range hook.AllEntries() {
hookEntries := hook.AllEntries()
for _, entry := range hookEntries {
fmt.Println(strings.Contains(entry.Message, log))
found = found || strings.Contains(entry.Message, log)
}
Expand Down