Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Algod Importer: Longer Timeout #133

Merged
merged 16 commits into from
Aug 11, 2023
52 changes: 32 additions & 20 deletions conduit/plugins/importers/algod/algod_importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const (
)

var (
waitForRoundTimeout = 5 * time.Second
waitForRoundTimeout = 30 * time.Second
)

const catchpointsURL = "https://algorand-catchpoints.s3.us-east-2.amazonaws.com/consolidated/%s_catchpoints.txt"
Expand Down Expand Up @@ -419,12 +419,31 @@ func (algodImp *algodImporter) getDelta(rnd uint64) (sdk.LedgerStateDelta, error

// SyncError is used to indicate algod and conduit are not synchronized.
type SyncError struct {
rnd uint64
expected uint64
// retrievedRound is the round returned from an algod status call.
retrievedRound uint64

// expectedRound is the round conduit expected to have gotten back.
expectedRound uint64

// err is the error that was received from the endpoint caller.
err error
}

// NewSyncError creates a new SyncError.
func NewSyncError(retrievedRound, expectedRound uint64, err error) *SyncError {
return &SyncError{
retrievedRound: retrievedRound,
expectedRound: expectedRound,
err: err,
}
}

func (e *SyncError) Error() string {
return fmt.Sprintf("wrong round returned from status for round: %d != %d", e.rnd, e.expected)
return fmt.Sprintf("wrong round returned from status for round: retrieved(%d) != expected(%d): %v", e.retrievedRound, e.expectedRound, e.err)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shiqizng - as you pointed out yesterday, at face value it's hard to tell which round was which so I've added "retrieved" for the value gotten back from the endpoint vs. "expected" which is the importer's expectation.

}

func (e *SyncError) Unwrap() error {
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
return e.err
}

func waitForRoundWithTimeout(ctx context.Context, l *logrus.Logger, c *algod.Client, rnd uint64, to time.Duration) (uint64, error) {
Expand All @@ -440,10 +459,8 @@ func waitForRoundWithTimeout(ctx context.Context, l *logrus.Logger, c *algod.Cli
if rnd <= status.LastRound {
return status.LastRound, nil
}
return 0, &SyncError{
rnd: status.LastRound,
expected: rnd,
}
// algod's timeout should not be reached because context.WithTimeout is used
return 0, NewSyncError(status.LastRound, rnd, fmt.Errorf("sync error, likely due to status after block timeout"))
}

// If there was a different error and the node is responsive, call status before returning a SyncError.
Expand All @@ -454,10 +471,7 @@ func waitForRoundWithTimeout(ctx context.Context, l *logrus.Logger, c *algod.Cli
return 0, fmt.Errorf("unable to get status after block and status: %w", errors.Join(err, err2))
}
if status2.LastRound < rnd {
return 0, &SyncError{
rnd: status.LastRound,
expected: rnd,
}
return 0, NewSyncError(status2.LastRound, rnd, fmt.Errorf("status2.LastRound mismatch: %w", err))
}

// This is probably a connection error, not a SyncError.
Expand All @@ -470,10 +484,7 @@ func (algodImp *algodImporter) getBlockInner(rnd uint64) (data.BlockData, error)

nodeRound, err := waitForRoundWithTimeout(algodImp.ctx, algodImp.logger, algodImp.aclient, rnd, waitForRoundTimeout)
if err != nil {
// If context has expired.
if algodImp.ctx.Err() != nil {
return blk, fmt.Errorf("GetBlock ctx error: %w", err)
}
err = fmt.Errorf("called waitForRoundWithTimeout: %w", err)
algodImp.logger.Errorf(err.Error())
return data.BlockData{}, err
}
Expand All @@ -483,13 +494,14 @@ func (algodImp *algodImporter) getBlockInner(rnd uint64) (data.BlockData, error)
dt := time.Since(start)
getAlgodRawBlockTimeSeconds.Observe(dt.Seconds())
if err != nil {
algodImp.logger.Errorf("error getting block for round %d: %s", rnd, err.Error())
err = fmt.Errorf("error getting block for round %d: %w", rnd, err)
algodImp.logger.Errorf(err.Error())
return data.BlockData{}, err
}
tmpBlk := new(models.BlockResponse)
err = msgpack.Decode(blockbytes, tmpBlk)
if err != nil {
return blk, err
return blk, fmt.Errorf("error decoding block for round %d: %w", rnd, err)
}

blk.BlockHeader = tmpBlk.Block.BlockHeader
Expand Down Expand Up @@ -523,10 +535,10 @@ func (algodImp *algodImporter) GetBlock(rnd uint64) (data.BlockData, error) {
if err != nil {
target := &SyncError{}
if errors.As(err, &target) {
algodImp.logger.Warnf("Sync error detected, attempting to set the sync round to recover the node: %s", err.Error())
algodImp.logger.Warnf("importer algod.GetBlock() sync error detected, attempting to set the sync round to recover the node: %s", err.Error())
_, _ = algodImp.aclient.SetSyncRound(rnd).Do(algodImp.ctx)
} else {
err = fmt.Errorf("error getting block for round %d, check node configuration: %s", rnd, err)
err = fmt.Errorf("importer algod.GetBlock() error getting block for round %d, check node configuration: %s", rnd, err)
algodImp.logger.Errorf(err.Error())
}
return data.BlockData{}, err
Expand Down
51 changes: 31 additions & 20 deletions conduit/plugins/importers/algod/algod_importer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@ package algodimporter
import (
"context"
"fmt"
"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"

"github.com/algorand/go-algorand-sdk/v2/client/v2/algod"
"github.com/algorand/go-algorand-sdk/v2/client/v2/common/models"
sdk "github.com/algorand/go-algorand-sdk/v2/types"
Expand Down Expand Up @@ -681,34 +682,39 @@ func TestGetBlockErrors(t *testing.T) {
name: "Cannot wait for block",
rnd: 123,
blockAfterResponder: MakeJsonResponderSeries("/wait-for-block-after", []int{http.StatusOK, http.StatusNotFound}, []interface{}{models.NodeStatus{LastRound: 1}}),
err: fmt.Sprintf("error getting block for round 123"),
blockResponder: nil,
deltaResponder: nil,
err: "error getting block for round 123",
logs: []string{"error getting block for round 123"},
},
{
name: "Cannot get block",
rnd: 123,
blockAfterResponder: BlockAfterResponder,
deltaResponder: MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, sdk.LedgerStateDelta{}),
blockResponder: MakeMsgpStatusResponder("get", "/v2/blocks/", http.StatusNotFound, ""),
err: fmt.Sprintf("error getting block for round 123"),
deltaResponder: MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, sdk.LedgerStateDelta{}),
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
err: "error getting block for round 123",
logs: []string{"error getting block for round 123"},
},
{
name: "Cannot get delta (node behind, re-send sync)",
name: "Cannot get delta - node behind, re-send sync",
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
rnd: 200,
blockAfterResponder: MakeBlockAfterResponder(models.NodeStatus{LastRound: 50}),
blockResponder: BlockResponder,
deltaResponder: MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, ""),
err: fmt.Sprintf("wrong round returned from status for round: 50 != 200"),
logs: []string{"wrong round returned from status for round: 50 != 200", "Sync error detected, attempting to set the sync round to recover the node"},
err: "wrong round returned from status for round: retrieved(50) != expected(200)",
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
logs: []string{
"wrong round returned from status for round: retrieved(50) != expected(200)",
"sync error detected, attempting to set the sync round to recover the node",
},
},
{
name: "Cannot get delta (caught up)",
name: "Cannot get delta - caught up",
rnd: 200,
blockAfterResponder: MakeBlockAfterResponder(models.NodeStatus{LastRound: 200}),
blockResponder: BlockResponder,
deltaResponder: MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, ""),
err: fmt.Sprintf("ledger state delta not found: node round (200), required round (200)"),
err: "ledger state delta not found: node round (200), required round (200)",
logs: []string{"ledger state delta not found: node round (200), required round (200)"},
},
}
Expand Down Expand Up @@ -752,21 +758,26 @@ func TestGetBlockErrors(t *testing.T) {
_, err = testImporter.GetBlock(tc.rnd)
noError := assert.ErrorContains(t, err, tc.err)

// Make sure each of the expected log messages are present
// Make sure each of the expected log messages are present in the hookEntries
hookEntries := hook.AllEntries()
for _, log := range tc.logs {
found := false
for _, entry := range hook.AllEntries() {
fmt.Println(strings.Contains(entry.Message, log))
found = found || strings.Contains(entry.Message, log)
for _, entry := range hookEntries {
logIsSubstring := strings.Contains(entry.Message, log)
found = found || logIsSubstring
fmt.Printf("logIsSubstring=%t, found=%t:\n\t%s\n", logIsSubstring, found, entry.Message)
}
noError = noError && assert.True(t, found, "Expected log was not found: '%s'", log)
if !found {
fmt.Printf(">>>>>>WE HAVE A PROBLEM<<<<<<\n")
}
noError = noError && assert.True(t, found, "(%s) Expected log was not found: '%s'", tc.name, log)
}

// Print logs if there was an error.
if !noError {
fmt.Println("An error was detected, printing logs")
fmt.Printf("An error was detected, printing logs (%s)\n", tc.name)
fmt.Println("------------------------------------")
for _, entry := range hook.AllEntries() {
for _, entry := range hookEntries {
fmt.Printf(" %s\n", entry.Message)
}
fmt.Println("------------------------------------")
Expand Down
6 changes: 5 additions & 1 deletion conduit/plugins/importers/algod/mock_algod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ func NewAlgodHandler(responders ...algodCustomHandler) *AlgodHandler {

// ServeHTTP implements the http.Handler interface for AlgodHandler
func (handler *AlgodHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
for _, responder := range handler.responders {
for i, responder := range handler.responders {
_ = i
if responder == nil {
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
continue
}
if responder(req, w) {
return
}
Expand Down