Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Algod Importer: Longer Timeout #133

Merged
merged 16 commits into from
Aug 11, 2023
50 changes: 30 additions & 20 deletions conduit/plugins/importers/algod/algod_importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const (
)

var (
waitForRoundTimeout = 5 * time.Second
waitForRoundTimeout = 30 * time.Second
)

const catchpointsURL = "https://algorand-catchpoints.s3.us-east-2.amazonaws.com/consolidated/%s_catchpoints.txt"
Expand Down Expand Up @@ -418,13 +418,30 @@ func (algodImp *algodImporter) getDelta(rnd uint64) (sdk.LedgerStateDelta, error
}

// SyncError is used to indicate algod and conduit are not synchronized.
// The retrievedRound is the round returned from an algod status call.
// The expectedRound is the round conduit expected to have gotten back.
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
// err is the error that was received from the endpoint caller
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
type SyncError struct {
rnd uint64
expected uint64
retrievedRound uint64
expectedRound uint64
err error
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
}

// NewSyncError builds a SyncError from the round reported by algod
// (retrievedRound), the round the importer expected (expectedRound), and the
// underlying error that is exposed through Unwrap.
func NewSyncError(retrievedRound, expectedRound uint64, err error) *SyncError {
	syncErr := new(SyncError)
	syncErr.retrievedRound = retrievedRound
	syncErr.expectedRound = expectedRound
	syncErr.err = err
	return syncErr
}

func (e *SyncError) Error() string {
return fmt.Sprintf("wrong round returned from status for round: %d != %d", e.rnd, e.expected)
return fmt.Sprintf("wrong round returned from status for round: retrieved(%d) != expected(%d): %v", e.retrievedRound, e.expectedRound, e.err)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shiqizng - as you pointed out yesterday, at face value it's hard to tell which round was which, so I've added "retrieved" for the value gotten back from the endpoint vs. "expected", which is the importer's expectation.

}

func (e *SyncError) Unwrap() error {
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
return e.err
}

func waitForRoundWithTimeout(ctx context.Context, l *logrus.Logger, c *algod.Client, rnd uint64, to time.Duration) (uint64, error) {
Expand All @@ -440,10 +457,8 @@ func waitForRoundWithTimeout(ctx context.Context, l *logrus.Logger, c *algod.Cli
if rnd <= status.LastRound {
return status.LastRound, nil
}
return 0, &SyncError{
rnd: status.LastRound,
expected: rnd,
}
// algod's timeout should not be reached because context.WithTimeout is used
return 0, NewSyncError(status.LastRound, rnd, fmt.Errorf("sync error, likely due to status after block timeout"))
}

// If there was a different error and the node is responsive, call status before returning a SyncError.
Expand All @@ -454,10 +469,7 @@ func waitForRoundWithTimeout(ctx context.Context, l *logrus.Logger, c *algod.Cli
return 0, fmt.Errorf("unable to get status after block and status: %w", errors.Join(err, err2))
}
if status2.LastRound < rnd {
return 0, &SyncError{
rnd: status.LastRound,
expected: rnd,
}
return 0, NewSyncError(status2.LastRound, rnd, fmt.Errorf("status2.LastRound mismatch: %w", err))
}

// This is probably a connection error, not a SyncError.
Expand All @@ -470,10 +482,7 @@ func (algodImp *algodImporter) getBlockInner(rnd uint64) (data.BlockData, error)

nodeRound, err := waitForRoundWithTimeout(algodImp.ctx, algodImp.logger, algodImp.aclient, rnd, waitForRoundTimeout)
if err != nil {
// If context has expired.
if algodImp.ctx.Err() != nil {
return blk, fmt.Errorf("GetBlock ctx error: %w", err)
}
err = fmt.Errorf("called waitForRoundWithTimeout: %w", err)
algodImp.logger.Errorf(err.Error())
return data.BlockData{}, err
}
Expand All @@ -483,13 +492,14 @@ func (algodImp *algodImporter) getBlockInner(rnd uint64) (data.BlockData, error)
dt := time.Since(start)
getAlgodRawBlockTimeSeconds.Observe(dt.Seconds())
if err != nil {
algodImp.logger.Errorf("error getting block for round %d: %s", rnd, err.Error())
err = fmt.Errorf("error getting block for round %d: %w", rnd, err)
algodImp.logger.Errorf(err.Error())
return data.BlockData{}, err
}
tmpBlk := new(models.BlockResponse)
err = msgpack.Decode(blockbytes, tmpBlk)
if err != nil {
return blk, err
return blk, fmt.Errorf("error decoding block for round %d: %w", rnd, err)
}

blk.BlockHeader = tmpBlk.Block.BlockHeader
Expand Down Expand Up @@ -523,10 +533,10 @@ func (algodImp *algodImporter) GetBlock(rnd uint64) (data.BlockData, error) {
if err != nil {
target := &SyncError{}
if errors.As(err, &target) {
algodImp.logger.Warnf("Sync error detected, attempting to set the sync round to recover the node: %s", err.Error())
algodImp.logger.Warnf("importer algod.GetBlock() sync error detected, attempting to set the sync round to recover the node: %s", err.Error())
_, _ = algodImp.aclient.SetSyncRound(rnd).Do(algodImp.ctx)
} else {
err = fmt.Errorf("error getting block for round %d, check node configuration: %s", rnd, err)
err = fmt.Errorf("importer algod.GetBlock() error getting block for round %d, check node configuration: %s", rnd, err)
algodImp.logger.Errorf(err.Error())
}
return data.BlockData{}, err
Expand Down
52 changes: 32 additions & 20 deletions conduit/plugins/importers/algod/algod_importer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@ package algodimporter
import (
"context"
"fmt"
"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"

"github.com/algorand/go-algorand-sdk/v2/client/v2/algod"
"github.com/algorand/go-algorand-sdk/v2/client/v2/common/models"
sdk "github.com/algorand/go-algorand-sdk/v2/types"
Expand Down Expand Up @@ -681,34 +682,39 @@ func TestGetBlockErrors(t *testing.T) {
name: "Cannot wait for block",
rnd: 123,
blockAfterResponder: MakeJsonResponderSeries("/wait-for-block-after", []int{http.StatusOK, http.StatusNotFound}, []interface{}{models.NodeStatus{LastRound: 1}}),
err: fmt.Sprintf("error getting block for round 123"),
blockResponder: nil,
deltaResponder: nil,
err: "error getting block for round 123",
logs: []string{"error getting block for round 123"},
},
{
name: "Cannot get block",
rnd: 123,
blockAfterResponder: BlockAfterResponder,
deltaResponder: MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, sdk.LedgerStateDelta{}),
blockResponder: MakeMsgpStatusResponder("get", "/v2/blocks/", http.StatusNotFound, ""),
err: fmt.Sprintf("error getting block for round 123"),
deltaResponder: MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, sdk.LedgerStateDelta{}),
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
err: "error getting block for round 123",
logs: []string{"error getting block for round 123"},
},
{
name: "Cannot get delta (node behind, re-send sync)",
name: "Cannot get delta - node behind, re-send sync",
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
rnd: 200,
blockAfterResponder: MakeBlockAfterResponder(models.NodeStatus{LastRound: 50}),
blockResponder: BlockResponder,
deltaResponder: MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, ""),
err: fmt.Sprintf("wrong round returned from status for round: 50 != 200"),
logs: []string{"wrong round returned from status for round: 50 != 200", "Sync error detected, attempting to set the sync round to recover the node"},
err: "wrong round returned from status for round: retrieved(50) != expected(200)",
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
logs: []string{
"wrong round returned from status for round: retrieved(50) != expected(200)",
"sync error detected, attempting to set the sync round to recover the node",
},
},
{
name: "Cannot get delta (caught up)",
name: "Cannot get delta - caught up",
rnd: 200,
blockAfterResponder: MakeBlockAfterResponder(models.NodeStatus{LastRound: 200}),
blockResponder: BlockResponder,
deltaResponder: MakeMsgpStatusResponder("get", "/v2/deltas/", http.StatusNotFound, ""),
err: fmt.Sprintf("ledger state delta not found: node round (200), required round (200)"),
err: "ledger state delta not found: node round (200), required round (200)",
logs: []string{"ledger state delta not found: node round (200), required round (200)"},
},
}
Expand Down Expand Up @@ -752,21 +758,27 @@ func TestGetBlockErrors(t *testing.T) {
_, err = testImporter.GetBlock(tc.rnd)
noError := assert.ErrorContains(t, err, tc.err)

// Make sure each of the expected log messages are present
// Make sure each of the expected log messages are present in the hookEntries
hookEntries := hook.AllEntries()
for _, log := range tc.logs {
found := false
for _, entry := range hook.AllEntries() {
fmt.Println(strings.Contains(entry.Message, log))
found = found || strings.Contains(entry.Message, log)
fmt.Println("~~~\nCurrent log: ", log)
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
for _, entry := range hookEntries {
logIsSubstring := strings.Contains(entry.Message, log)
found = found || logIsSubstring
fmt.Printf("logIsSubstring=%t, found=%t:\n\t%s\n", logIsSubstring, found, entry.Message)
}
noError = noError && assert.True(t, found, "(%s) Expected log was not found: '%s'", tc.name, log)
if !noError {
fmt.Printf(">>>>>WE HAVE A PROBLEM<<<<<<\n")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make it a little easier to find the failing needle in the printouts haystack since we aren't using require assertions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should use !found as the condition; otherwise, tests that provide multiple log entries would print WE HAVE A PROBLEM for all checks after the first one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
noError = noError && assert.True(t, found, "Expected log was not found: '%s'", log)
}

// Print logs if there was an error.
if !noError {
fmt.Println("An error was detected, printing logs")
fmt.Printf("An error was detected, printing logs (%s)\n", tc.name)
fmt.Println("------------------------------------")
for _, entry := range hook.AllEntries() {
for _, entry := range hookEntries {
fmt.Printf(" %s\n", entry.Message)
}
fmt.Println("------------------------------------")
Expand Down
6 changes: 5 additions & 1 deletion conduit/plugins/importers/algod/mock_algod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ func NewAlgodHandler(responders ...algodCustomHandler) *AlgodHandler {

// ServeHTTP implements the http.Handler interface for AlgodHandler
func (handler *AlgodHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
for _, responder := range handler.responders {
for i, responder := range handler.responders {
_ = i
if responder == nil {
tzaffi marked this conversation as resolved.
Show resolved Hide resolved
continue
}
if responder(req, w) {
return
}
Expand Down