-
Notifications
You must be signed in to change notification settings - Fork 26
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'master' into file-plugins-msgp
- Loading branch information
Showing
9 changed files
with
966 additions
and
151 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,83 @@ | ||
package pipeline | ||
|
||
import log "github.com/sirupsen/logrus" | ||
import ( | ||
"context" | ||
"fmt" | ||
"time" | ||
|
||
log "github.com/sirupsen/logrus" | ||
|
||
"github.com/algorand/conduit/conduit/data" | ||
) | ||
|
||
// HandlePanic function to log panics in a common way | ||
func HandlePanic(logger *log.Logger) { | ||
if r := recover(); r != nil { | ||
logger.Panicf("conduit pipeline experienced a panic: %v", r) | ||
} | ||
} | ||
|
||
type empty struct{} | ||
|
||
type pluginInput interface { | ||
uint64 | data.BlockData | string | ||
} | ||
|
||
type pluginOutput interface { | ||
pluginInput | empty | ||
} | ||
|
||
// Retries is a wrapper for retrying a function call f() with a cancellation context, | ||
// a delay and a max retry count. It attempts to call the wrapped function at least once | ||
// and only after the first attempt will pay attention to a context cancellation. | ||
// This can allow the pipeline to receive a cancellation and guarantee attempting to finish | ||
// the round with at least one attempt for each pipeline component. | ||
// - Retry behavior is configured by p.cfg.retryCount. | ||
// - when 0, the function will retry forever or until the context is canceled | ||
// - when > 0, the function will retry p.cfg.retryCount times before giving up | ||
// | ||
// - Upon success: | ||
// - a nil error is returned even if there were intermediate failures | ||
// - the returned duration dur measures the time spent in the call to f() that succeeded | ||
// | ||
// - Upon failure: | ||
// - the return value y is the zero value of type Y and a non-nil error is returned | ||
// - when p.cfg.retryCount > 0, the error will be a join of all the errors encountered during the retries | ||
// - when p.cfg.retryCount == 0, the error will be the last error encountered | ||
// - the returned duration dur is the total time spent in the function, including retries | ||
func Retries[X pluginInput, Y pluginOutput](f func(x X) (Y, error), x X, p *pipelineImpl, msg string) (y Y, dur time.Duration, err error) { | ||
start := time.Now() | ||
|
||
for i := uint64(0); p.cfg.RetryCount == 0 || i <= p.cfg.RetryCount; i++ { | ||
// the first time through, we don't sleep or mind ctx's done signal | ||
if i > 0 { | ||
select { | ||
case <-p.ctx.Done(): | ||
dur = time.Since(start) | ||
err = fmt.Errorf("%s: retry number %d/%d with err: %w. Done signal received: %w", msg, i, p.cfg.RetryCount, err, context.Cause(p.ctx)) | ||
return | ||
default: | ||
time.Sleep(p.cfg.RetryDelay) | ||
} | ||
} | ||
opStart := time.Now() | ||
y, err = f(x) | ||
if err == nil { | ||
dur = time.Since(opStart) | ||
return | ||
} | ||
p.logger.Infof("%s: retry number %d/%d with err: %v", msg, i, p.cfg.RetryCount, err) | ||
} | ||
|
||
dur = time.Since(start) | ||
err = fmt.Errorf("%s: giving up after %d retries: %w", msg, p.cfg.RetryCount, err) | ||
return | ||
} | ||
|
||
// RetriesNoOutput applies the same logic as Retries, but for functions that return no output. | ||
func RetriesNoOutput[X pluginInput](f func(x X) error, a X, p *pipelineImpl, msg string) (time.Duration, error) { | ||
_, d, err := Retries(func(x X) (empty, error) { | ||
return empty{}, f(x) | ||
}, a, p, msg) | ||
return d, err | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,213 @@ | ||
package pipeline | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"testing" | ||
"time" | ||
|
||
"github.com/algorand/conduit/conduit/data" | ||
log "github.com/sirupsen/logrus" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
// TestRetries tests the retry logic | ||
func TestRetries(t *testing.T) { | ||
errSentinelCause := errors.New("succeed after has failed") | ||
|
||
succeedAfterFactory := func(succeedAfter uint64, never bool) func(uint64) (uint64, error) { | ||
tries := uint64(0) | ||
|
||
return func(x uint64) (uint64, error) { | ||
if tries >= succeedAfter && !never { | ||
// ensure not to return the zero value on success | ||
return tries + 1, nil | ||
} | ||
tries++ | ||
return 0, fmt.Errorf("%w: tries=%d", errSentinelCause, tries-1) | ||
} | ||
} | ||
|
||
succeedAfterFactoryNoOutput := func(succeedAfter uint64, never bool) func(uint64) error { | ||
tries := uint64(0) | ||
|
||
return func(x uint64) error { | ||
if tries >= succeedAfter && !never { | ||
return nil | ||
} | ||
tries++ | ||
return fmt.Errorf("%w: tries=%d", errSentinelCause, tries-1) | ||
} | ||
} | ||
|
||
cases := []struct { | ||
name string | ||
retryCount uint64 | ||
succeedAfter uint64 | ||
neverSucceed bool // neverSucceed trumps succeedAfter | ||
}{ | ||
{ | ||
name: "retry forever succeeds after 0", | ||
retryCount: 0, | ||
succeedAfter: 0, | ||
neverSucceed: false, | ||
}, | ||
{ | ||
name: "retry forever succeeds after 1", | ||
retryCount: 0, | ||
succeedAfter: 1, | ||
neverSucceed: false, | ||
}, | ||
{ | ||
name: "retry forever succeeds after 7", | ||
retryCount: 0, | ||
succeedAfter: 7, | ||
neverSucceed: false, | ||
}, | ||
{ | ||
name: "retry 5 succeeds after 0", | ||
retryCount: 5, | ||
succeedAfter: 0, | ||
neverSucceed: false, | ||
}, | ||
{ | ||
name: "retry 5 succeeds after 1", | ||
retryCount: 5, | ||
succeedAfter: 1, | ||
neverSucceed: false, | ||
}, | ||
{ | ||
name: "retry 5 succeeds after 5", | ||
retryCount: 5, | ||
succeedAfter: 5, | ||
neverSucceed: false, | ||
}, | ||
{ | ||
name: "retry 5 succeeds after 7", | ||
retryCount: 5, | ||
succeedAfter: 7, | ||
neverSucceed: false, | ||
}, | ||
{ | ||
name: "retry 5 never succeeds", | ||
retryCount: 5, | ||
succeedAfter: 0, | ||
neverSucceed: true, | ||
}, | ||
{ | ||
name: "retry foerever never succeeds", | ||
retryCount: 0, | ||
succeedAfter: 0, | ||
neverSucceed: true, | ||
}, | ||
} | ||
|
||
for _, tc := range cases { | ||
tc := tc | ||
|
||
// run cases for Retries() | ||
t.Run("Retries() "+tc.name, func(t *testing.T) { | ||
t.Parallel() | ||
ctx, ccf := context.WithCancelCause(context.Background()) | ||
p := &pipelineImpl{ | ||
ctx: ctx, | ||
ccf: ccf, | ||
logger: log.New(), | ||
cfg: &data.Config{ | ||
RetryCount: tc.retryCount, | ||
RetryDelay: 1 * time.Millisecond, | ||
}, | ||
} | ||
succeedAfter := succeedAfterFactory(tc.succeedAfter, tc.neverSucceed) | ||
|
||
if tc.retryCount == 0 && tc.neverSucceed { | ||
// avoid infinite loop by cancelling the context | ||
|
||
yChan := make(chan uint64) | ||
errChan := make(chan error) | ||
go func() { | ||
y, _, err := Retries(succeedAfter, 0, p, "test") | ||
yChan <- y | ||
errChan <- err | ||
}() | ||
time.Sleep(5 * time.Millisecond) | ||
errTestCancelled := errors.New("test cancelled") | ||
go func() { | ||
ccf(errTestCancelled) | ||
}() | ||
y := <-yChan | ||
err := <-errChan | ||
require.ErrorIs(t, err, errTestCancelled, tc.name) | ||
require.ErrorIs(t, err, errSentinelCause, tc.name) | ||
require.Zero(t, y, tc.name) | ||
return | ||
} | ||
|
||
y, _, err := Retries(succeedAfter, 0, p, "test") | ||
if tc.retryCount == 0 { // WLOG tc.neverSucceed == false | ||
require.NoError(t, err, tc.name) | ||
|
||
// note we subtract 1 from y below because succeedAfter has added 1 to its output | ||
// to disambiguate with the zero value which occurs on failure | ||
require.Equal(t, tc.succeedAfter, y-1, tc.name) | ||
} else { // retryCount > 0 so doesn't retry forever | ||
if tc.neverSucceed || tc.succeedAfter > tc.retryCount { | ||
require.ErrorContains(t, err, fmt.Sprintf("%d", tc.retryCount), tc.name) | ||
require.ErrorIs(t, err, errSentinelCause, tc.name) | ||
require.Zero(t, y, tc.name) | ||
} else { // !tc.neverSucceed && succeedAfter <= retryCount | ||
require.NoError(t, err, tc.name) | ||
require.Equal(t, tc.succeedAfter, y-1, tc.name) | ||
} | ||
} | ||
}) | ||
|
||
// run cases for RetriesNoOutput() | ||
t.Run("RetriesNoOutput() "+tc.name, func(t *testing.T) { | ||
t.Parallel() | ||
ctx, ccf := context.WithCancelCause(context.Background()) | ||
p := &pipelineImpl{ | ||
ctx: ctx, | ||
ccf: ccf, | ||
logger: log.New(), | ||
cfg: &data.Config{ | ||
RetryCount: tc.retryCount, | ||
RetryDelay: 1 * time.Millisecond, | ||
}, | ||
} | ||
succeedAfterNoOutput := succeedAfterFactoryNoOutput(tc.succeedAfter, tc.neverSucceed) | ||
|
||
if tc.retryCount == 0 && tc.neverSucceed { | ||
// avoid infinite loop by cancelling the context | ||
|
||
errChan := make(chan error) | ||
go func() { | ||
_, err := RetriesNoOutput(succeedAfterNoOutput, 0, p, "test") | ||
errChan <- err | ||
}() | ||
time.Sleep(5 * time.Millisecond) | ||
errTestCancelled := errors.New("test cancelled") | ||
go func() { | ||
ccf(errTestCancelled) | ||
}() | ||
err := <-errChan | ||
require.ErrorIs(t, err, errTestCancelled, tc.name) | ||
require.ErrorIs(t, err, errSentinelCause, tc.name) | ||
return | ||
} | ||
|
||
_, err := RetriesNoOutput(succeedAfterNoOutput, 0, p, "test") | ||
if tc.retryCount == 0 { // WLOG tc.neverSucceed == false | ||
require.NoError(t, err, tc.name) | ||
} else { // retryCount > 0 so doesn't retry forever | ||
if tc.neverSucceed || tc.succeedAfter > tc.retryCount { | ||
require.ErrorContains(t, err, fmt.Sprintf("%d", tc.retryCount), tc.name) | ||
require.ErrorIs(t, err, errSentinelCause, tc.name) | ||
} else { // !tc.neverSucceed && succeedAfter <= retryCount | ||
require.NoError(t, err, tc.name) | ||
} | ||
} | ||
}) | ||
} | ||
} |
Oops, something went wrong.