From c7d87d2d720925d13c0e1c6fd35ccd13e3587b4f Mon Sep 17 00:00:00 2001 From: Ben Arthur Date: Fri, 16 Dec 2016 16:28:38 -0500 Subject: [PATCH] make retry() more flexible --- base/error.jl | 75 ++++++++++++++++++++++++++++--------------- base/pmap.jl | 48 +++++++++++++-------------- base/workerpool.jl | 2 +- test/error.jl | 25 ++++++++++----- test/parallel_exec.jl | 11 +++---- 5 files changed, 95 insertions(+), 66 deletions(-) diff --git a/base/error.jl b/base/error.jl index fdf945657ebe8c..d082895782e11f 100644 --- a/base/error.jl +++ b/base/error.jl @@ -82,45 +82,70 @@ macro assert(ex, msgs...) return :($(esc(ex)) ? $(nothing) : throw(Main.Base.AssertionError($msg))) end -# NOTE: Please keep the constant values specified below in sync with the doc string -const DEFAULT_RETRY_N = 1 -const DEFAULT_RETRY_ON = e->true -const DEFAULT_RETRY_MAX_DELAY = 10.0 +immutable ExponentialBackOff + n::Int + first_delay::Float64 + max_delay::Float64 + factor::Float64 + jitter::Float64 + + function ExponentialBackOff(n, first_delay, max_delay, factor, jitter) + all(x->x>=0, (n, first_delay, max_delay, factor, jitter)) || error("all inputs must be non-negative") + new(n, first_delay, max_delay, factor, jitter) + end +end """ - retry(f, [retry_on]; n=1, max_delay=10.0) -> Function + ExponentialBackOff(; n=1, first_delay=0.05, max_delay=10.0, factor=5.0, jitter=0.1) -Returns a lambda that retries function `f` up to `n` times in the -event of an exception. If `retry_on` is a `Type` then retry only -for exceptions of that type. If `retry_on` is a function -`test_error(::Exception) -> Bool` then retry only if it is true. +A `Float64` iterator of length `n` whose elements exponentially increase at a +rate in the interval `factor` * (1 ± `jitter`). The first element is +`first_delay` and all elements are clamped to `max_delay`. +""" +ExponentialBackOff(; n=1, first_delay=0.05, max_delay=10.0, factor=5.0, jitter=0.1) = + ExponentialBackOff(n, first_delay, max_delay, factor, jitter) +start(ebo::ExponentialBackOff) = (ebo.n, min(ebo.first_delay, ebo.max_delay)) +function next(ebo::ExponentialBackOff, state) + next_n = state[1]-1 + curr_delay = state[2] + next_delay = min(ebo.max_delay, state[2] * ebo.factor * (1.0 - ebo.jitter + (rand() * 2.0 * ebo.jitter))) + (curr_delay, (next_n, next_delay)) +end +done(ebo::ExponentialBackOff, state) = state[1]<1 +length(ebo::ExponentialBackOff) = ebo.n -The first retry happens after a gap of 50 milliseconds or `max_delay`, -whichever is lower. Subsequently, the delays between retries are -exponentially increased with a random factor up to `max_delay`. +""" + retry(f::Function; delays=Base.ExponentialBackOff(), check=nothing) -> Function + +Returns an anonymous function that calls function `f`. If an exception arises, +`f` is repeatedly called again, each time `check` returns `true`, after waiting the +number of seconds specified in `delays`. `check` should input `delays`'s +current state and the `Exception`. -**Examples** +# Examples ```julia -retry(http_get, e -> e.status == "503")(url) -retry(read, UVError)(io) +retry(f, delays=fill(5.0, 3)) +retry(f, delays=rand(5:10, 2)) +retry(f, delays=Base.ExponentialBackOff(n=3, first_delay=5, max_delay=1000)) +retry(http_get, check=(s,e)->e.status == "503")(url) +retry(read, check=(s,e)->isa(e, UVError))(io) ``` """ -function retry(f::Function, retry_on::Function=DEFAULT_RETRY_ON; n=DEFAULT_RETRY_N, max_delay=DEFAULT_RETRY_MAX_DELAY) +function retry(f::Function; delays=ExponentialBackOff(), check=nothing) (args...) -> begin - delay = min(0.05, max_delay) - for i = 1:n+1 + state = start(delays) + while true try return f(args...) catch e - if i > n || try retry_on(e) end !== true - rethrow(e) + done(delays, state) && rethrow(e) + if check != nothing + state, retry_or_not = check(state, e) + retry_or_not || rethrow(e) end end - delay = min(max_delay, delay) - sleep(delay * (0.8 + (rand() * 0.2))) - delay = delay * 5 + (delay, state) = next(delays, state) + sleep(delay) end end end - -retry(f::Function, t::Type; kw...) = retry(f, e->isa(e, t); kw...) diff --git a/base/pmap.jl b/base/pmap.jl index 2e2c9920c4fb1f..876903740832a7 100644 --- a/base/pmap.jl +++ b/base/pmap.jl @@ -30,7 +30,7 @@ pgenerate(f, c) = pgenerate(default_worker_pool(), f, c) pgenerate(f, c1, c...) = pgenerate(a->f(a...), zip(c1, c...)) """ - pmap([::AbstractWorkerPool], f, c...; distributed=true, batch_size=1, on_error=nothing, retry_n=0, retry_max_delay=DEFAULT_RETRY_MAX_DELAY, retry_on=DEFAULT_RETRY_ON) -> collection + pmap([::AbstractWorkerPool], f, c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[]), retry_check=nothing) -> collection Transform collection `c` by applying `f` to each element using available workers and tasks. @@ -59,27 +59,27 @@ you can specify an error handling function via argument `on_error` which takes i the exception. The function can stop the processing by rethrowing the error, or, to continue, return any value which is then returned inline with the results to the caller. -Failed computation can also be retried via `retry_on`, `retry_n`, `retry_max_delay`, which are passed through -to `retry` as arguments `retry_on`, `n` and `max_delay` respectively. If batching is specified, and an entire batch fails, -all items in the batch are retried. +Failed computation can also be retried via `retry_delays`, `retry_check`, which +are passed through to `retry` as keyword arguments `delays` and `check`, +respectively. If batching is specified, and an entire batch fails, all items in +the batch are retried. The following are equivalent: * `pmap(f, c; distributed=false)` and `asyncmap(f,c)` -* `pmap(f, c; retry_n=1)` and `asyncmap(retry(remote(f)),c)` -* `pmap(f, c; retry_n=1, on_error=e->e)` and `asyncmap(x->try retry(remote(f))(x) catch e; e end, c)` +* `pmap(f, c; retry_delays=Base.ExponentialBackOff())` and `asyncmap(retry(remote(f)),c)` +* `pmap(f, c; retry_delays=Base.ExponentialBackOff(), on_error=e->e)` and `asyncmap(x->try retry(remote(f))(x) catch e; e end, c)` """ function pmap(p::AbstractWorkerPool, f, c; distributed=true, batch_size=1, on_error=nothing, - retry_n=0, - retry_max_delay=DEFAULT_RETRY_MAX_DELAY, - retry_on=DEFAULT_RETRY_ON, + retry_delays=[], + retry_check=nothing, # deprecated keyword args: err_retry=nothing, err_stop=nothing, pids=nothing) #15409 if err_retry !== nothing depwarn("err_retry is deprecated, use pmap(retry(f), c...).", :pmap) if err_retry == true - f = retry(f) + f = retry(f, delays=retry_delays, check=retry_check) end end if pids !== nothing @@ -110,8 +110,8 @@ function pmap(p::AbstractWorkerPool, f, c; distributed=true, batch_size=1, on_er f = remote(p, f) end - if retry_n > 0 - f = wrap_retry(f, retry_on, retry_n, retry_max_delay) + if length(retry_delays) > 0 + f = retry(f, delays=retry_delays, check=retry_check) end if on_error !== nothing f = wrap_on_error(f, on_error) @@ -122,18 +122,18 @@ function pmap(p::AbstractWorkerPool, f, c; distributed=true, batch_size=1, on_er # During batch processing, We need to ensure that if on_error is set, it is called # for each element in error, and that we return as many elements as the original list. # retry, if set, has to be called element wise and we will do a best-effort - # to ensure that we do not call mapped function on the same element more than retry_n. + # to ensure that we do not call mapped function on the same element more than length(retry_delays). # This guarantee is not possible in case of worker death / network errors, wherein # we will retry the entire batch on a new worker. - if (on_error !== nothing) || (retry_n > 0) + if (on_error !== nothing) || (length(retry_delays) > 0) f = wrap_on_error(f, (x,e)->BatchProcessingError(x,e); capture_data=true) end f = wrap_batch(f, p, on_error) results = asyncmap(f, c; ntasks=()->nworkers(p), batch_size=batch_size) # handle error processing.... - if (on_error !== nothing) || (retry_n > 0) - process_batch_errors!(p, f_orig, results, on_error, retry_on, retry_n, retry_max_delay) + if (on_error !== nothing) || (length(retry_delays) > 0) + process_batch_errors!(p, f_orig, results, on_error, retry_delays, retry_check) end return results @@ -158,8 +158,6 @@ function wrap_on_error(f, on_error; capture_data=false) end end -wrap_retry(f, retry_on, n, max_delay) = retry(f, retry_on; n=n, max_delay=max_delay) - function wrap_batch(f, p, on_error) f = asyncmap_batch(f) return batch -> begin @@ -177,9 +175,9 @@ end asyncmap_batch(f) = batch -> asyncmap(x->f(x...), batch) -function process_batch_errors!(p, f, results, on_error, retry_on, retry_n, retry_max_delay) +function process_batch_errors!(p, f, results, on_error, retry_delays, retry_check) # Handle all the ones in error in another pmap, with batch size set to 1 - if (on_error !== nothing) || (retry_n > 0) + if (on_error !== nothing) || (length(retry_delays) > 0) reprocess = [] for (idx, v) in enumerate(results) if isa(v, BatchProcessingError) @@ -190,13 +188,11 @@ function process_batch_errors!(p, f, results, on_error, retry_on, retry_n, retry if length(reprocess) > 0 errors = [x[2] for x in reprocess] exceptions = [x.ex for x in errors] - if (retry_n > 0) && all([retry_on(ex) for ex in exceptions]) - retry_n = retry_n - 1 + state = start(retry_delays) + if (length(retry_delays) > 0) && + (retry_check==nothing || all([retry_check(state,ex)[2] for ex in exceptions])) error_processed = pmap(p, f, [x.data for x in errors]; - on_error=on_error, - retry_on=retry_on, - retry_n=retry_n, - retry_max_delay=retry_max_delay) + on_error = on_error, retry_delays = retry_delays[2:end], retry_check = retry_check) elseif on_error !== nothing error_processed = map(on_error, exceptions) else diff --git a/base/workerpool.jl b/base/workerpool.jl index 5dd46b069f012e..c6ffe3f757193d 100644 --- a/base/workerpool.jl +++ b/base/workerpool.jl @@ -208,7 +208,7 @@ end """ remote([::AbstractWorkerPool], f) -> Function -Returns a lambda that executes function `f` on an available worker +Returns an anonymous function that executes function `f` on an available worker using [`remotecall_fetch`](@ref). """ remote(f) = (args...; kwargs...)->remotecall_fetch(f, default_worker_pool(), args...; kwargs...) diff --git a/test/error.jl b/test/error.jl index 4871727dc13578..c079b95a625989 100644 --- a/test/error.jl +++ b/test/error.jl @@ -1,5 +1,14 @@ # This file is a part of Julia. License is MIT: http://julialang.org/license +@test length(Base.ExponentialBackOff(n=10)) == 10 +@test collect(Base.ExponentialBackOff(n=10, first_delay=0.01))[1] == 0.01 +@test maximum(Base.ExponentialBackOff(n=10, max_delay=0.06)) == 0.06 +ratio(x) = x[2:end]./x[1:end-1] +@test all(x->isapprox(x,10.0), ratio(collect( + Base.ExponentialBackOff(n=10, max_delay=Inf, factor=10, jitter=0.0)))) +srand(12345) +@test (mean(ratio(collect(Base.ExponentialBackOff(n=100, max_delay=Inf, factor=1, jitter=0.1)))) - 1.0) < 1e-4 + let function foo_error(c, n) c[1] += 1 @@ -16,42 +25,42 @@ let # Success on second attempt c = [0] - @test retry(foo_error;n=1)(c,1) == 7 + @test retry(foo_error)(c,1) == 7 @test c[1] == 2 # 2 failed retry attempts, so exception is raised c = [0] - ex = try retry(foo_error;n=2)(c,3) catch e; e end + ex = try retry(foo_error, delays=Base.ExponentialBackOff(n=2))(c,3) catch e; e end @test ex.msg == "foo" @test c[1] == 3 c = [0] - ex = try retry(foo_error, ErrorException)(c,2) catch e; e end + ex = try retry(foo_error, check=(s,e)->(s,isa(e, ErrorException)))(c,2) catch e; e end @test typeof(ex) == ErrorException @test ex.msg == "foo" @test c[1] == 2 c = [0] - ex = try retry(foo_error, e->e.msg == "foo")(c,2) catch e; e end + ex = try retry(foo_error, check=(s,e)->(s,e.msg == "foo"))(c,2) catch e; e end @test typeof(ex) == ErrorException @test ex.msg == "foo" @test c[1] == 2 # No retry if condition does not match c = [0] - ex = try retry(foo_error, e->e.msg == "bar"; n=3)(c,2) catch e; e end + ex = try retry(foo_error, check=(s,e)->(s,e.msg == "bar"))(c,2) catch e; e end @test typeof(ex) == ErrorException @test ex.msg == "foo" @test c[1] == 1 c = [0] - ex = try retry(foo_error, e->e.http_status_code == "503")(c,2) catch e; e end + ex = try retry(foo_error, check=(s,e)->(s,try e.http_status_code == "503" end != true))(c,2) catch e; e end @test typeof(ex) == ErrorException @test ex.msg == "foo" - @test c[1] == 1 + @test c[1] == 2 c = [0] - ex = try retry(foo_error, SystemError)(c,2) catch e; e end + ex = try retry(foo_error, check=(s,e)->(s,isa(e,SystemError)))(c,2) catch e; e end @test typeof(ex) == ErrorException @test ex.msg == "foo" @test c[1] == 1 diff --git a/test/parallel_exec.jl b/test/parallel_exec.jl index 5b71135236e303..cc849897d8a57e 100644 --- a/test/parallel_exec.jl +++ b/test/parallel_exec.jl @@ -683,9 +683,8 @@ pmap_args = [ (:distributed, [:default, false]), (:batch_size, [:default,2]), (:on_error, [:default, e -> unmangle_exception(e).msg == "foobar"]), - (:retry_on, [:default, e -> unmangle_exception(e).msg == "foobar"]), - (:retry_n, [:default, typemax(Int)-1]), - (:retry_max_delay, [0, 0.001]) + (:retry_delays, [:default, fill(0.01, 1000)]), + (:retry_check, [:default, (s,e) -> (s,unmangle_exception(e).msg == "foobar")]), ] kwdict = Dict() @@ -702,7 +701,7 @@ function walk_args(i) testw = kwdict[:distributed] === false ? [1] : workers() - if (kwdict[:on_error] === :default) && (kwdict[:retry_n] === :default) + if (kwdict[:on_error] === :default) && (kwdict[:retry_delays] === :default) mapf = x -> (x*2, myid()) results_test = pmap_res -> begin results = [x[1] for x in pmap_res] @@ -712,7 +711,7 @@ function walk_args(i) @test p in pids end end - elseif kwdict[:retry_n] !== :default + elseif kwdict[:retry_delays] !== :default mapf = x -> iseven(myid()) ? error("foobar") : (x*2, myid()) results_test = pmap_res -> begin results = [x[1] for x in pmap_res] @@ -726,7 +725,7 @@ function walk_args(i) end end end - else (kwdict[:on_error] !== :default) && (kwdict[:retry_n] === :default) + else (kwdict[:on_error] !== :default) && (kwdict[:retry_delays] === :default) mapf = x -> iseven(x) ? error("foobar") : (x*2, myid()) results_test = pmap_res -> begin w = testw