
Commit

Merge remote-tracking branch 'algorand/master' into docs-update
Zeph Grunschlag committed May 31, 2023
2 parents 152b458 + 32c1ad2 commit a53f7bc
Showing 5 changed files with 70 additions and 24 deletions.
3 changes: 2 additions & 1 deletion conduit/pipeline/pipeline.go
@@ -383,12 +383,13 @@ func (p *pipelineImpl) Start() {
for {
pipelineRun:
metrics.PipelineRetryCount.Observe(float64(retry))
-if retry > p.cfg.RetryCount {
+if retry > p.cfg.RetryCount && p.cfg.RetryCount != 0 {
p.logger.Errorf("Pipeline has exceeded maximum retry count (%d) - stopping...", p.cfg.RetryCount)
return
}

if retry > 0 {
+p.logger.Infof("Retry number %d resuming after a %s retry delay.", retry, p.cfg.RetryDelay)
time.Sleep(p.cfg.RetryDelay)
}

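Note on the change above: with the new condition, a configured RetryCount of 0 now means "retry forever". Below is a minimal, self-contained Go sketch of that retry semantics; runWithRetries and its arguments are illustrative and not part of the Conduit codebase.

package main

import (
	"errors"
	"fmt"
	"time"
)

// runWithRetries keeps calling op until it succeeds or the retry budget is spent.
// A retryCount of 0 is treated as unlimited, mirroring the new condition
// `retry > cfg.RetryCount && cfg.RetryCount != 0` in the hunk above.
func runWithRetries(retryCount uint64, retryDelay time.Duration, op func() error) error {
	for retry := uint64(0); ; retry++ {
		if retry > retryCount && retryCount != 0 {
			return fmt.Errorf("exceeded maximum retry count (%d)", retryCount)
		}
		if retry > 0 {
			fmt.Printf("Retry number %d resuming after a %s retry delay.\n", retry, retryDelay)
			time.Sleep(retryDelay)
		}
		if err := op(); err == nil {
			return nil
		}
	}
}

func main() {
	attempts := 0
	err := runWithRetries(2, 10*time.Millisecond, func() error {
		attempts++
		return errors.New("always fails")
	})
	// With retryCount=2 the operation runs 3 times: the initial attempt plus 2 retries.
	fmt.Println(err, "- attempts:", attempts)
}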
66 changes: 55 additions & 11 deletions conduit/pipeline/pipeline_test.go
@@ -10,6 +10,7 @@ import (
"os"
"path"
"path/filepath"
"strings"
"sync"
"testing"
"time"
@@ -20,6 +21,7 @@ import (
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/algorand/conduit/conduit"
"github.com/algorand/conduit/conduit/data"
@@ -685,17 +687,37 @@ func (e *errorImporter) GetBlock(_ uint64) (data.BlockData, error) {

// TestPipelineRetryVariables tests that modifying the retry variables results in longer time taken for a pipeline to run
func TestPipelineRetryVariables(t *testing.T) {
+maxDuration := 5 * time.Second
+epsilon := 250 * time.Millisecond // allow for some error in timing
tests := []struct {
name string
retryDelay time.Duration
retryCount uint64
totalDuration time.Duration
-epsilon time.Duration
}{
{"0 seconds", 2 * time.Second, 0, 0 * time.Second, 1 * time.Second},
{"2 seconds", 2 * time.Second, 1, 2 * time.Second, 1 * time.Second},
{"4 seconds", 2 * time.Second, 2, 4 * time.Second, 1 * time.Second},
{"10 seconds", 2 * time.Second, 5, 10 * time.Second, 1 * time.Second},
+{
+name: "retryCount=0 (unlimited)",
+retryDelay: 500 * time.Millisecond,
+totalDuration: maxDuration,
+},
+{
+name: "retryCount=1",
+retryDelay: 500 * time.Millisecond,
+retryCount: 1,
+totalDuration: 500 * time.Millisecond,
+},
+{
+name: "retryCount=2",
+retryDelay: 500 * time.Millisecond,
+retryCount: 2,
+totalDuration: 1 * time.Second,
+},
+{
+name: "retryCount=5",
+retryDelay: 500 * time.Millisecond,
+retryCount: 5,
+totalDuration: 2500 * time.Millisecond,
+},
}
for _, testCase := range tests {
t.Run(testCase.name, func(t *testing.T) {
@@ -704,9 +726,10 @@ func TestPipelineRetryVariables(t *testing.T) {
var pImporter importers.Importer = errImporter
var pProcessor processors.Processor = &mockProcessor{}
var pExporter exporters.Exporter = &mockExporter{}
-l, _ := test.NewNullLogger()
+l, hook := test.NewNullLogger()
+ctx, cf := context.WithCancel(context.Background())
pImpl := pipelineImpl{
-ctx: context.Background(),
+ctx: ctx,
cfg: &data.Config{
RetryCount: testCase.retryCount,
RetryDelay: testCase.retryDelay,
@@ -745,15 +768,36 @@ func TestPipelineRetryVariables(t *testing.T) {
err := pImpl.Init()
assert.Nil(t, err)
before := time.Now()
+done := false
+// test for "unlimited" timeout
+go func() {
+time.Sleep(maxDuration)
+if !done {
+cf()
+assert.Equal(t, testCase.totalDuration, maxDuration)
+}
+}()
pImpl.Start()
pImpl.wg.Wait()
after := time.Now()
timeTaken := after.Sub(before)

-msg := fmt.Sprintf("seconds taken: %s, expected duration seconds: %s, epsilon: %s", timeTaken.String(), testCase.totalDuration.String(), testCase.epsilon.String())
-assert.WithinDurationf(t, before.Add(testCase.totalDuration), after, testCase.epsilon, msg)
-assert.Equal(t, errImporter.GetBlockCount, testCase.retryCount+1)

+msg := fmt.Sprintf("seconds taken: %s, expected duration seconds: %s, epsilon: %s", timeTaken.String(), testCase.totalDuration.String(), epsilon)
+assert.WithinDurationf(t, before.Add(testCase.totalDuration), after, epsilon, msg)
+if testCase.retryCount == 0 {
+assert.GreaterOrEqual(t, errImporter.GetBlockCount, uint64(1))
+} else {
+assert.Equal(t, errImporter.GetBlockCount, testCase.retryCount+1)
+}
+done = true
+fmt.Println(hook.AllEntries())
+for _, entry := range hook.AllEntries() {
+str, err := entry.String()
+require.NoError(t, err)
+if strings.HasPrefix(str, "Retry number 1") {
+assert.Equal(t, "Retry number 1 resuming after a 500ms retry delay.", str)
+}
+}
})
}
}
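Note on the test change above: the retryCount=0 (unlimited) case is bounded by a watchdog goroutine that cancels the pipeline context after maxDuration. A minimal, self-contained Go sketch of that watchdog pattern follows; the names and timings are illustrative and not the test code itself.

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	maxDuration := 200 * time.Millisecond
	ctx, cancel := context.WithCancel(context.Background())

	// Watchdog: cancel the context once maxDuration has elapsed so an
	// otherwise unlimited retry loop cannot hang the test.
	go func() {
		time.Sleep(maxDuration)
		cancel()
	}()

	retries := 0
	for {
		select {
		case <-ctx.Done():
			fmt.Println("stopped after", retries, "retries")
			return
		case <-time.After(20 * time.Millisecond): // stand-in for one failing attempt plus its delay
			retries++
		}
	}
}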
10 changes: 4 additions & 6 deletions conduit/plugins/importers/algod/algod_importer.go
@@ -229,11 +229,7 @@ func checkRounds(logger *logrus.Logger, catchpointRound, nodeRound, targetRound
}

func (algodImp *algodImporter) needsCatchup(targetRound uint64) bool {
-if algodImp.mode == followerMode {
-if targetRound == 0 {
-algodImp.logger.Info("No state deltas are ever available for round 0")
-return true
-}
+if algodImp.mode == followerMode && targetRound != 0 {
// If we are in follower mode, check if the round delta is available.
_, err := algodImp.getDelta(targetRound)
if err != nil {
@@ -272,7 +268,9 @@ func (algodImp *algodImporter) catchupNode(network string, targetRound uint64) e
var err error
catchpoint, err = getMissingCatchpointLabel(URL, targetRound)
if err != nil {
-return fmt.Errorf("unable to lookup catchpoint: %w", err)
+// catchpoints are only available for past 6 months.
+// This case handles the scenario where the catchpoint is not available.
+algodImp.logger.Warnf("unable to lookup catchpoint: %s", err)
}
}
}
14 changes: 8 additions & 6 deletions conduit/plugins/importers/algod/algod_importer_test.go
@@ -230,8 +230,10 @@ func TestInitCatchup(t *testing.T) {
catchpoint: "",
algodServer: NewAlgodServer(
GenesisResponder,
+MakePostSyncRoundResponder(http.StatusOK),
+MakeJsonResponderSeries("/v2/status", []int{http.StatusOK, http.StatusBadRequest}, []interface{}{models.NodeStatus{LastRound: 1235}}),
),
err: "failed to lookup catchpoint label list",
logs: []string{"failed to lookup catchpoint label list"},
}, {
name: "wait for node to catchup error",
adminToken: "admin",
@@ -727,20 +729,20 @@ func TestNeedsCatchup(t *testing.T) {
result: false,
},
{
name: "Follower mode round 0, no delta",
name: "Follower mode round 0, no block",
mode: followerMode,
round: 0,
responders: []algodCustomHandler{},
logMsg: "No state deltas are ever available for round 0",
logMsg: "Unable to fetch block for round 0",
result: true,
},
{
name: "Follower mode round 0, delta",
mode: followerMode,
round: 0,
-responders: []algodCustomHandler{LedgerStateDeltaResponder},
-logMsg: "No state deltas are ever available for round 0",
-result: true,
+responders: []algodCustomHandler{BlockResponder},
+logMsg: "",
+result: false,
},
{
name: "Archival mode, no block",
1 change: 1 addition & 0 deletions pkg/cli/internal/initialize/conduit.yml.example
@@ -5,6 +5,7 @@ log-level: INFO
#log-file:

# Number of retries to perform after a pipeline plugin error.
+# Set to 0 to retry forever.
retry-count: 10

# Time duration to wait between retry attempts.
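For context, a hedged illustration of how the documented option might be set in a pipeline config; the value below is illustrative, not the example file's default.

# Retry forever, waiting the configured retry delay between attempts.
retry-count: 0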
