-
Notifications
You must be signed in to change notification settings - Fork 59
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Replace use of Task.WaitAny for Task.WhenAny #73
Conversation
Will push other commit after CI shows red for failing test |
Travis red, but looks like there are some changes pending to it in another PR. |
Hey, thx for the PR. Regarding WaitAny vs WhenAny - I recall that some benchmarks show that the blocking call is faster. I think it depends on the scenario and something I've struggled to find a great solution for. On the one hand, it can be more efficient to have a few threads blocked, rather than incurring the cost of the Async machinery. However, if there are lots of computations like this, then more efficient to go the non-blocking route. |
Any insights into the various scenarios favoring one approach vs another for perf? I can take a deeper look and do some benchmark/profiling to see if we can come up with a single approach. |
Hey, just following up on this. I tested with the following workload: let N = 100L
let bufferSize = 100
let bufferTime = 1000
let P = 1000
let go n = async {
return!
AsyncSeq.init n id
|> AsyncSeq.mapAsync (fun i -> async {
do! Async.Sleep 0
return i })
|> AsyncSeq.bufferByCountAndTime2 bufferSize bufferTime
|> AsyncSeq.iter ignore
}
Seq.init P id
|> Seq.map (fun _ -> go N)
|> Async.Parallel
|> Async.RunSynchronously And your solution does quite a bit better, especially in cases where there is more parallelism. This is expected, since the current solution blocks, increasing contention. In the following, the first result is the existing solution and the second, your non-blocking solution:
I also tried an alternate implementation: static member chooseChoice (a:Async<'a>) (b:Async<'b>) : Async<Choice<'a * Async<'b>, 'b * Async<'a>>> = async {
let! ct = Async.CancellationToken
return!
Async.FromContinuations <| fun (ok,err,cnc) ->
let state = ref 0
let resA = TaskCompletionSource<_>()
let resB = TaskCompletionSource<_>()
let inline oka a =
if (Interlocked.CompareExchange(state, 1, 0) = 0) then
ok (Choice1Of2 (a, resB.Task |> Async.AwaitTask))
else
resA.SetResult a
let inline okb b =
if (Interlocked.CompareExchange(state, 1, 0) = 0) then
ok (Choice2Of2 (b, resA.Task |> Async.AwaitTask))
else
resB.SetResult b
let inline err (ex:exn) =
if (Interlocked.CompareExchange(state, 1, 0) = 0) then
err ex
let inline cnc ex =
if (Interlocked.CompareExchange(state, 1, 0) = 0) then
cnc ex
Async.startThreadPoolWithContinuations (a, oka, err, cnc, ct)
Async.startThreadPoolWithContinuations (b, okb, err, cnc, ct) } That actually does slightly better, but needs more testing with respect to how it handles cancellations. So, I'm tempted to merge this PR as is. |
Sweet, thanks for following up! |
Added tests for bufferByTime/bufferByCountAndTime (timing based tests, but should be little risk of false positives from them I think). Could probably be simpler, but this way worked out using the reported repro.
While I focused on those two for the tests since that's what the original issue reported (and bufferByCountAndTime was my use case too), I noticed chooseTasks/2 are being used in quite a few functions, so not sure if there should be more tests added.
Should fix #65