From 02b3f819b6b740ac57f7d533fc46f506532b2664 Mon Sep 17 00:00:00 2001 From: Ian Butterworth Date: Fri, 11 Feb 2022 21:17:45 -0500 Subject: [PATCH] Add a `:dynamic` scheduling option for `Threads.@threads` (#43919) Co-authored-by: Julian Samaroo Co-authored-by: Takafumi Arakaki <29282+tkf@users.noreply.github.com> Co-authored-by: Valentin Churavy --- NEWS.md | 3 ++ base/threadingconstructs.jl | 85 +++++++++++++++++++++++++++++++------ test/threads_exec.jl | 61 ++++++++++++++++++++++++-- 3 files changed, 132 insertions(+), 17 deletions(-) diff --git a/NEWS.md b/NEWS.md index d182f079dfd1f6..a1167283d0b63e 100644 --- a/NEWS.md +++ b/NEWS.md @@ -73,6 +73,9 @@ Command-line option changes Multi-threading changes ----------------------- +* A new `:dynamic` schedule option for `Threads.@threads` which is similar to the default behavior except iterations + will be scheduled dynamically to available worker threads rather than pinned to each thread. This option is more + composable with (possibly nested) `@spawn` and `@threads` loops ([#43919]) Build system changes -------------------- diff --git a/base/threadingconstructs.jl b/base/threadingconstructs.jl index 9dbbf8adad66a6..123fa62b120828 100644 --- a/base/threadingconstructs.jl +++ b/base/threadingconstructs.jl @@ -22,14 +22,14 @@ See also: `BLAS.get_num_threads` and `BLAS.set_num_threads` in the """ nthreads() = Int(unsafe_load(cglobal(:jl_n_threads, Cint))) -function threading_run(func) +function threading_run(fun, static) ccall(:jl_enter_threaded_region, Cvoid, ()) n = nthreads() tasks = Vector{Task}(undef, n) for i = 1:n - t = Task(func) - t.sticky = true - ccall(:jl_set_task_tid, Cint, (Any, Cint), t, i-1) + t = Task(() -> fun(i)) # pass in tid + t.sticky = static + static && ccall(:jl_set_task_tid, Cint, (Any, Cint), t, i-1) tasks[i] = t schedule(t) end @@ -48,7 +48,7 @@ function _threadsfor(iter, lbody, schedule) quote local threadsfor_fun let range = $(esc(range)) - function threadsfor_fun(onethread=false) + function threadsfor_fun(tid=1; onethread=false) r = range # Load into local variable lenr = length(r) # divide loop iterations among threads @@ -56,7 +56,6 @@ function _threadsfor(iter, lbody, schedule) tid = 1 len, rem = lenr, 0 else - tid = threadid() len, rem = divrem(lenr, nthreads()) end # not enough iterations for all the threads? @@ -86,15 +85,17 @@ function _threadsfor(iter, lbody, schedule) end end end - if ccall(:jl_in_threaded_region, Cint, ()) != 0 + if $(schedule === :dynamic) + threading_run(threadsfor_fun, false) + elseif ccall(:jl_in_threaded_region, Cint, ()) != 0 $(if schedule === :static :(error("`@threads :static` cannot be used concurrently or nested")) else # only use threads when called from outside @threads - :(threadsfor_fun(true)) + :(threadsfor_fun(onethread = true)) end) else - threading_run(threadsfor_fun) + threading_run(threadsfor_fun, true) end nothing end @@ -110,15 +111,73 @@ A barrier is placed at the end of the loop which waits for all tasks to finish execution. The `schedule` argument can be used to request a particular scheduling policy. -The only currently supported value is `:static`, which creates one task per thread -and divides the iterations equally among them. Specifying `:static` is an error -if used from inside another `@threads` loop or from a thread other than 1. + +Except for `:static` scheduling, how the iterations are assigned to tasks, and how the tasks +are assigned to the worker threads is undefined. The exact assignments can be different +for each execution. The scheduling option is a hint. The loop body code (including any code +transitively called from it) must not make assumptions about the distribution of iterations +to tasks or the worker thread in which they are executed. The loop body for each iteration +must be able to make forward progress independent of other iterations and be free from data +races. As such, synchronizations across iterations may deadlock. + +For example, the above conditions imply that: + +- The lock taken in an iteration *must* be released within the same iteration. +- Communicating between iterations using blocking primitives like `Channel`s is incorrect. +- Write only to locations not shared across iterations (unless a lock or atomic operation is used). + + +Schedule options are: +- `:static` creates one task per thread and divides the iterations equally among + them, assigning each task specifically to each thread. + Specifying `:static` is an error if used from inside another `@threads` loop + or from a thread other than 1. +- `:dynamic` will schedule iterations dynamically to available worker threads, + assuming that the workload for each iteration is uniform. + +Without the scheduler argument, the exact scheduling is unspecified; i.e. it may be +different across Julia releases. Currently, the behavior is dependent on the calling thread. +The default is `:static` when called from thread 1. The loop will be executed without threading +when called from other threads. The default schedule (used when no `schedule` argument is present) is subject to change. +For example, an illustration of the different scheduling strategies where `busywait` +is a non-yielding timed loop that runs for a number of seconds. + +```julia-repl +julia> function busywait(seconds) + tstart = time_ns() + while (time_ns() - tstart) / 1e9 < seconds + end + end + +julia> @time begin + Threads.@spawn busywait(5) + Threads.@threads :static for i in 1:Threads.nthreads() + busywait(1) + end + end +6.003001 seconds (16.33 k allocations: 899.255 KiB, 0.25% compilation time) + +julia> @time begin + Threads.@spawn busywait(5) + Threads.@threads :dynamic for i in 1:Threads.nthreads() + busywait(1) + end + end +2.012056 seconds (16.05 k allocations: 883.919 KiB, 0.66% compilation time) +``` + +The `:dynamic` example takes 2 seconds since one of the non-occupied threads is able +to run two of the 1-second iterations to complete the for loop. + !!! compat "Julia 1.5" The `schedule` argument is available as of Julia 1.5. +!!! compat "Julia 1.8" + The `:dynamic` option for the `schedule` argument is available as of Julia 1.8. + See also: [`@spawn`](@ref Threads.@spawn), [`nthreads()`](@ref Threads.nthreads), [`threadid()`](@ref Threads.threadid), `pmap` in [`Distributed`](@ref man-distributed), `BLAS.set_num_threads` in [`LinearAlgebra`](@ref man-linalg). @@ -133,7 +192,7 @@ macro threads(args...) # for now only allow quoted symbols sched = nothing end - if sched !== :static + if sched !== :static && sched !== :dynamic throw(ArgumentError("unsupported schedule argument in @threads")) end elseif na == 1 diff --git a/test/threads_exec.jl b/test/threads_exec.jl index 5057fd5843d699..7b03f48f5eec6d 100644 --- a/test/threads_exec.jl +++ b/test/threads_exec.jl @@ -722,15 +722,68 @@ let a = zeros(nthreads()) end # static schedule -function _atthreads_static_schedule() +function _atthreads_static_schedule(n) + ids = zeros(Int, n) + Threads.@threads :static for i = 1:n + ids[i] = Threads.threadid() + end + return ids +end +@test _atthreads_static_schedule(nthreads()) == 1:nthreads() +@test _atthreads_static_schedule(1) == [1;] +@test_throws( + "`@threads :static` cannot be used concurrently or nested", + @threads(for i = 1:1; _atthreads_static_schedule(nthreads()); end), +) + +# dynamic schedule +function _atthreads_dynamic_schedule(n) + inc = Threads.Atomic{Int}(0) + flags = zeros(Int, n) + Threads.@threads :dynamic for i = 1:n + Threads.atomic_add!(inc, 1) + flags[i] = 1 + end + return inc[], flags +end +@test _atthreads_dynamic_schedule(nthreads()) == (nthreads(), ones(nthreads())) +@test _atthreads_dynamic_schedule(1) == (1, ones(1)) +@test _atthreads_dynamic_schedule(10) == (10, ones(10)) +@test _atthreads_dynamic_schedule(nthreads() * 2) == (nthreads() * 2, ones(nthreads() * 2)) + +# nested dynamic schedule +function _atthreads_dynamic_dynamic_schedule() + inc = Threads.Atomic{Int}(0) + Threads.@threads :dynamic for _ = 1:nthreads() + Threads.@threads :dynamic for _ = 1:nthreads() + Threads.atomic_add!(inc, 1) + end + end + return inc[] +end +@test _atthreads_dynamic_dynamic_schedule() == nthreads() * nthreads() + +function _atthreads_static_dynamic_schedule() ids = zeros(Int, nthreads()) + inc = Threads.Atomic{Int}(0) Threads.@threads :static for i = 1:nthreads() ids[i] = Threads.threadid() + Threads.@threads :dynamic for _ = 1:nthreads() + Threads.atomic_add!(inc, 1) + end end - return ids + return ids, inc[] +end +@test _atthreads_static_dynamic_schedule() == (1:nthreads(), nthreads() * nthreads()) + +# errors inside @threads :dynamic +function _atthreads_dynamic_with_error(a) + Threads.@threads :dynamic for i in eachindex(a) + error("user error in the loop body") + end + a end -@test _atthreads_static_schedule() == [1:nthreads();] -@test_throws TaskFailedException @threads for i = 1:1; _atthreads_static_schedule(); end +@test_throws "user error in the loop body" _atthreads_dynamic_with_error(zeros(nthreads())) try @macroexpand @threads(for i = 1:10, j = 1:10; end)