port network from master (#234)
rusq committed Sep 23, 2023
1 parent 5d9c385 commit 6cbea12
Showing 3 changed files with 65 additions and 14 deletions.
2 changes: 1 addition & 1 deletion internal/network/limiter.go
@@ -13,7 +13,7 @@ const (
// Tier1 Tier = 1
Tier2 Tier = 20
Tier3 Tier = 50
- // Tier4 Tier = 100
+ Tier4 Tier = 100

// secPerMin is the number of seconds in a minute, it is here to allow easy
// modification of the program, should this value change.
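The Tier constants appear to correspond to Slack Web API rate-limit tiers expressed in events per minute (the neighbouring secPerMin constant supports that reading); the only change in this file is that Tier4 is no longer commented out. Below is a minimal sketch of how such a tier could feed a golang.org/x/time/rate limiter. limiterFor and the burst of 1 are assumptions for illustration; the real constructor lives elsewhere in limiter.go and is not part of this diff.

package main

import (
	"fmt"

	"golang.org/x/time/rate"
)

// Tier mirrors the constants in the hunk above: Slack events per minute.
type Tier int

const (
	Tier2 Tier = 20
	Tier3 Tier = 50
	Tier4 Tier = 100 // enabled by this commit

	secPerMin = 60.0 // seconds in a minute, as in limiter.go
)

// limiterFor is a hypothetical helper (not in the diff): it converts an
// events-per-minute tier into an events-per-second token bucket.
func limiterFor(t Tier) *rate.Limiter {
	return rate.NewLimiter(rate.Limit(float64(t)/secPerMin), 1)
}

func main() {
	l := limiterFor(Tier4)
	fmt.Printf("Tier4 => %.2f events/sec, burst %d\n", l.Limit(), l.Burst())
	// Tier4 => 1.67 events/sec, burst 1
}
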
34 changes: 27 additions & 7 deletions internal/network/network.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"net"
"net/http"
"runtime/trace"
"sync"
@@ -29,7 +30,8 @@ var (
lg logger.Interface = logger.Default
// waitFn returns the amount of time to wait before retrying depending on
// the current attempt. This variable exists to reduce the test time.
- waitFn = cubicWait
+ waitFn    = cubicWait
+ netWaitFn = expWait

mu sync.RWMutex
)
@@ -38,7 +40,7 @@ var (
// function wasn't able to complete without errors.
var ErrRetryFailed = errors.New("callback was unable to complete without errors within the allowed number of retries")

- // withRetry will run the callback function fn. If the function returns
+ // WithRetry will run the callback function fn. If the function returns
// slack.RateLimitedError, it will delay, and then call it again up to
// maxAttempts times. It will return an error if it runs out of attempts.
func WithRetry(ctx context.Context, lim *rate.Limiter, maxAttempts int, fn func() error) error {
@@ -48,7 +50,7 @@ func WithRetry(ctx context.Context, lim *rate.Limiter, maxAttempts int, fn func(
}
for attempt := 0; attempt < maxAttempts; attempt++ {
var err error
trace.WithRegion(ctx, "withRetry.wait", func() {
trace.WithRegion(ctx, "WithRetry.wait", func() {
err = lim.Wait(ctx)
})
if err != nil {
@@ -61,23 +63,33 @@ func WithRetry(ctx context.Context, lim *rate.Limiter, maxAttempts int, fn func(
break
}

tracelogf(ctx, "error", "slackRetry: %s after %d attempts", cbErr, attempt+1)
tracelogf(ctx, "error", "WithRetry: %[1]s (%[1]T) after %[2]d attempts", cbErr, attempt+1)
var (
rle *slack.RateLimitedError
sce slack.StatusCodeError
+ ne *net.OpError // read tcp error: see #234
)
- if errors.As(cbErr, &rle) {
+ switch {
+ case errors.As(cbErr, &rle):
tracelogf(ctx, "info", "got rate limited, sleeping %s", rle.RetryAfter)
time.Sleep(rle.RetryAfter)
continue
- } else if errors.As(cbErr, &sce) {
+ case errors.As(cbErr, &sce):
if isRecoverable(sce.Code) {
// possibly transient error
delay := waitFn(attempt)
tracelogf(ctx, "info", "got server error %d, sleeping %s", sce.Code, delay)
time.Sleep(delay)
continue
}
+ case errors.As(cbErr, &ne):
+ if ne.Op == "read" || ne.Op == "write" {
+ // possibly transient error
+ delay := netWaitFn(attempt)
+ tracelogf(ctx, "info", "got network error %s, sleeping %s", ne.Op, delay)
+ time.Sleep(delay)
+ continue
+ }
}

return fmt.Errorf("callback error: %w", cbErr)
@@ -90,7 +102,7 @@ func WithRetry(ctx context.Context, lim *rate.Limiter, maxAttempts int, fn func(

// isRecoverable returns true if the status code is a recoverable error.
func isRecoverable(statusCode int) bool {
- return (statusCode >= http.StatusInternalServerError && statusCode <= 599) || statusCode == 408
+ return (statusCode >= http.StatusInternalServerError && statusCode <= 599 && statusCode != 501) || statusCode == 408
}

// cubicWait is the wait time function. Time is calculated as (x+2)^3 seconds,
@@ -105,6 +117,14 @@ func cubicWait(attempt int) time.Duration {
return delay
}

+ func expWait(attempt int) time.Duration {
+ delay := time.Duration(2<<uint(attempt)) * time.Second
+ if delay > maxAllowedWaitTime {
+ return maxAllowedWaitTime
+ }
+ return delay
+ }

func tracelogf(ctx context.Context, category string, fmt string, a ...any) {
mu.RLock()
defer mu.RUnlock()
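The substance of the network.go change: a read or write *net.OpError is now treated as a possibly transient failure and retried with the new exponential backoff (netWaitFn/expWait), while recoverable HTTP status codes (5xx except 501, plus 408) keep the existing cubic backoff. A standalone sketch comparing the two schedules follows; the 5-minute maxAllowedWaitTime cap is an assumption here, since the real constant is defined outside this diff.

package main

import (
	"fmt"
	"time"
)

// maxAllowedWaitTime is assumed for illustration; the package defines its own value.
const maxAllowedWaitTime = 5 * time.Minute

// cubicWait mirrors the formula documented above: (attempt+2)^3 seconds, capped.
func cubicWait(attempt int) time.Duration {
	x := attempt + 2
	delay := time.Duration(x*x*x) * time.Second
	if delay > maxAllowedWaitTime {
		return maxAllowedWaitTime
	}
	return delay
}

// expWait mirrors the new backoff added above: 2<<attempt seconds, capped.
func expWait(attempt int) time.Duration {
	delay := time.Duration(2<<uint(attempt)) * time.Second
	if delay > maxAllowedWaitTime {
		return maxAllowedWaitTime
	}
	return delay
}

func main() {
	for attempt := 0; attempt < 5; attempt++ {
		fmt.Printf("attempt %d: cubic=%v exp=%v\n", attempt, cubicWait(attempt), expWait(attempt))
	}
	// attempt 0: cubic=8s exp=2s
	// attempt 1: cubic=27s exp=4s
	// attempt 2: cubic=1m4s exp=8s
	// attempt 3: cubic=2m5s exp=16s
	// attempt 4: cubic=3m36s exp=32s
}

The exponential schedule starts much lower than the cubic one, which suits short-lived connection hiccups better than server-side 5xx responses.
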
43 changes: 37 additions & 6 deletions internal/network/network_test.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"net"
"net/http"
"net/http/httptest"
"reflect"
@@ -23,6 +24,14 @@ func calcRunDuration(rateLimit float64, attempts int) time.Duration {
return time.Duration(attempts) * time.Duration(float64(time.Second)/rateLimit)
}

+ func calcExpRunDuration(attempts int) time.Duration {
+ var sec time.Duration
+ for i := 0; i < attempts; i++ {
+ sec += expWait(i)
+ }
+ return sec
+ }

// retryFn will return slack.RateLimitedError for numAttempts times and err after.
func retryFn(numAttempts int, retryAfter time.Duration, err error) func() error {
i := 0
@@ -35,6 +44,18 @@ func retryFn(numAttempts int, retryAfter time.Duration, err error) func() error
}
}

+ // errSeqFn will return err for forTimes times and thenErr after.
+ func errSeqFn(err error, forTimes int, thenErr error) func() error {
+ i := 0
+ return func() error {
+ if i < forTimes {
+ i++
+ return err
+ }
+ return thenErr
+ }
+ }

func dAbs(d time.Duration) time.Duration {
if d < 0 {
return -d
@@ -110,7 +131,7 @@ func Test_withRetry(t *testing.T) {
false,
calcRunDuration(10.0, 4),
},
{"slackRetry should honour the value in the rate limit error",
{"should honour the value in the rate limit error",
args{
context.Background(),
rate.NewLimiter(1000, 1),
@@ -130,6 +151,17 @@ func Test_withRetry(t *testing.T) {
true,
calcRunDuration(10.0, 4),
},
+ {
+ "network error (#234)",
+ args{
+ context.Background(),
+ rate.NewLimiter(10.0, 1),
+ 3,
+ errSeqFn(&net.OpError{Op: "read"}, 2, nil),
+ },
+ false,
+ calcExpRunDuration(2),
+ },
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -138,10 +170,10 @@ func Test_withRetry(t *testing.T) {
t.Errorf("withRetry() error = %v, wantErr %v", err, tt.wantErr)
}
runTime := time.Since(start)
- runTimeError := dAbs(runTime - tt.mustCompleteIn)
- t.Logf("runtime = %s, mustCompleteIn = %s, error = ABS(%[1]s - %[2]s) = %[3]s", runTime, tt.mustCompleteIn, runTimeError)
- if runTimeError > maxRunDurationError {
- t.Errorf("runtime error %s is not within allowed threshold: %s", runTimeError, maxRunDurationError)
+ ξ := dAbs(runTime - tt.mustCompleteIn)
+ t.Logf("runtime = %s, mustCompleteIn = %s, ξ = ABS(%[1]s - %[2]s) = %[3]s", runTime, tt.mustCompleteIn, ξ)
+ if ξ > maxRunDurationError {
+ t.Errorf("runtime error %s is not within allowed threshold: %s", ξ, maxRunDurationError)
}
})
}
@@ -198,7 +230,6 @@ func Test500ErrorHandling(t *testing.T) {

const (
testRetryCount = 1
- waitThreshold = 100 * time.Millisecond
)

// Create a test server that returns a 404 error.
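The expected runtime for the new "network error (#234)" case follows directly from the exponential backoff: the callback fails twice, so WithRetry sleeps expWait(0) and then expWait(1) before the third attempt succeeds, and calcExpRunDuration(2) sums exactly those delays. A quick check of that arithmetic (the maxAllowedWaitTime cap is irrelevant at these small values):

package main

import (
	"fmt"
	"time"
)

// expWait as added in network.go, without the cap, which never triggers
// for two attempts.
func expWait(attempt int) time.Duration {
	return time.Duration(2<<uint(attempt)) * time.Second
}

func main() {
	var total time.Duration
	for i := 0; i < 2; i++ { // two failing attempts before the callback succeeds
		total += expWait(i)
	}
	fmt.Println(total) // 6s, the mustCompleteIn value produced by calcExpRunDuration(2)
}
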
