scheduler: use explicit memory fences
These were previously implied (on TSO platforms, such as x86) by the
atomic_store to sleeping and the sleep_locks acquire before the
wake_check loop, but this change makes them explicit. We might want to
consider in the future whether it would be better (faster) to acquire each
possible lock on the sleeping path instead, so that every operation runs
with seq_cst ordering, rather than using a fence to order only the
operations we care about directly.
vtjnash committed Jan 26, 2022
1 parent 9c7cfa9 commit 6df6105
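The hazard the message and the new [^store_buffering_1] comment describe is the classic "store buffering" litmus test. The standalone sketch below is not part of this commit: `sleep_check_state` and `queue` are stand-in globals for the Julia internals, and it assumes a C11 toolchain that ships <threads.h>. It shows why the two seq_cst fences forbid the lost-wakeup outcome:

/* Store-buffering litmus test, modeled on the 1a/1b vs 2a/2b phases.
 * With relaxed stores and no fences, r1 == 0 && r2 == 0 is a permitted
 * outcome: each thread can miss the other's store. With the two
 * seq_cst fences, that cycle is forbidden. */
#include <stdatomic.h>
#include <stdio.h>
#include <threads.h>

static atomic_int sleep_check_state; /* stand-in for ptls->sleep_check_state */
static atomic_int queue;             /* stand-in for the multiqueue */
static int r1, r2;

static int dequeuer(void *arg)
{
    (void)arg;
    atomic_store_explicit(&sleep_check_state, 1, memory_order_relaxed); /* 1a: set sleeping */
    atomic_thread_fence(memory_order_seq_cst);                          /* fence in jl_task_get_next */
    r1 = atomic_load_explicit(&queue, memory_order_relaxed);            /* 1b: multiq_check_empty */
    return 0;
}

static int enqueuer(void *arg)
{
    (void)arg;
    atomic_store_explicit(&queue, 1, memory_order_relaxed);             /* 2a: multiq_insert */
    atomic_thread_fence(memory_order_seq_cst);                          /* fence in jl_wakeup_thread */
    r2 = atomic_load_explicit(&sleep_check_state, memory_order_relaxed);/* 2b: sleep-state check */
    return 0;
}

int main(void)
{
    thrd_t t1, t2;
    thrd_create(&t1, dequeuer, NULL);
    thrd_create(&t2, enqueuer, NULL);
    thrd_join(t1, NULL);
    thrd_join(t2, NULL);
    /* r1 == 0 && r2 == 0 would mean the dequeuer saw an empty queue while
     * the enqueuer saw not-sleeping: a lost wakeup. The fences rule it out. */
    printf("r1=%d r2=%d\n", r1, r2);
    return 0;
}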
Showing 1 changed file with 62 additions and 26 deletions: src/partr.c
@@ -31,6 +31,16 @@ static const int16_t sleeping = 1;
 // invariant: The transition of a thread state to sleeping must be followed by a check that there wasn't work pending for it.
 // information: Observing thread not-sleeping is sufficient to ensure the target thread will subsequently inspect its local queue.
 // information: Observing thread is-sleeping says it may be necessary to notify it at least once to wakeup. It may already be awake however for a variety of reasons.
+// information: These observations require sequentially-consistent fences to be inserted between each of those operational phases.
+// [^store_buffering_1]: These fences are used to avoid the cycle 2b -> 1a -> 1b -> 2a -> 2b where
+//  * Dequeuer:
+//    * 1a: `jl_atomic_store_relaxed(&ptls->sleep_check_state, sleeping)`
+//    * 1b: `multiq_check_empty` returns true
+//  * Enqueuer:
+//    * 2a: `multiq_insert`
+//    * 2b: `jl_atomic_load_relaxed(&ptls->sleep_check_state)` in `jl_wakeup_thread` returns `not_sleeping`
+// i.e., the dequeuer misses the enqueue and enqueuer misses the sleep state transition.
+

 JULIA_DEBUG_SLEEPWAKE(
 uint64_t wakeup_enter;
@@ -348,16 +358,20 @@ static int sleep_check_after_threshold(uint64_t *start_cycles)
 }


-static void wake_thread(int16_t tid)
+static int wake_thread(int16_t tid)
 {
     jl_ptls_t other = jl_all_tls_states[tid];
     int16_t state = sleeping;
-    jl_atomic_cmpswap(&other->sleep_check_state, &state, not_sleeping);
-    if (state == sleeping) {
-        uv_mutex_lock(&sleep_locks[tid]);
-        uv_cond_signal(&wake_signals[tid]);
-        uv_mutex_unlock(&sleep_locks[tid]);
+
+    if (jl_atomic_load_relaxed(&other->sleep_check_state) == sleeping) {
+        if (jl_atomic_cmpswap_relaxed(&other->sleep_check_state, &state, not_sleeping)) {
+            uv_mutex_lock(&sleep_locks[tid]);
+            uv_cond_signal(&wake_signals[tid]);
+            uv_mutex_unlock(&sleep_locks[tid]);
+            return 1;
+        }
     }
+    return 0;
 }

@@ -372,37 +386,48 @@ static void wake_libuv(void)
 JL_DLLEXPORT void jl_wakeup_thread(int16_t tid)
 {
     jl_task_t *ct = jl_current_task;
-    jl_ptls_t ptls = ct->ptls;
-    jl_task_t *uvlock = jl_atomic_load(&jl_uv_mutex.owner);
     int16_t self = jl_atomic_load_relaxed(&ct->tid);
+    if (tid != self)
+        jl_fence(); // [^store_buffering_1]
+    jl_task_t *uvlock = jl_atomic_load_relaxed(&jl_uv_mutex.owner);
     JULIA_DEBUG_SLEEPWAKE( wakeup_enter = cycleclock() );
     if (tid == self || tid == -1) {
         // we're already awake, but make sure we'll exit uv_run
+        jl_ptls_t ptls = ct->ptls;
         if (jl_atomic_load_relaxed(&ptls->sleep_check_state) == sleeping)
-            jl_atomic_store(&ptls->sleep_check_state, not_sleeping);
+            jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping);
         if (uvlock == ct)
             uv_stop(jl_global_event_loop());
     }
     else {
         // something added to the sticky-queue: notify that thread
-        wake_thread(tid);
-        // check if we need to notify uv_run too
-        jl_task_t *system_tid = jl_atomic_load_relaxed(&jl_all_tls_states[tid]->current_task);
-        if (uvlock != ct && jl_atomic_load(&jl_uv_mutex.owner) == system_tid)
-            wake_libuv();
+        if (wake_thread(tid)) {
+            // check if we need to notify uv_run too
+            jl_fence();
+            jl_task_t *tid_task = jl_atomic_load_relaxed(&jl_all_tls_states[tid]->current_task);
+            // now that we have changed the thread to not-sleeping, ensure that
+            // either it has not yet acquired the libuv lock, or that it will
+            // observe the change of state to not_sleeping
+            if (uvlock != ct && jl_atomic_load_relaxed(&jl_uv_mutex.owner) == tid_task)
+                wake_libuv();
+        }
     }
     // check if the other threads might be sleeping
     if (tid == -1) {
         // something added to the multi-queue: notify all threads
         // in the future, we might want to instead wake some fraction of threads,
         // and let each of those wake additional threads if they find work
+        int anysleep = 0;
         for (tid = 0; tid < jl_n_threads; tid++) {
             if (tid != self)
-                wake_thread(tid);
+                anysleep |= wake_thread(tid);
         }
         // check if we need to notify uv_run too
-        if (uvlock != ct && jl_atomic_load(&jl_uv_mutex.owner) != NULL)
-            wake_libuv();
+        if (uvlock != ct && anysleep) {
+            jl_fence();
+            if (jl_atomic_load_relaxed(&jl_uv_mutex.owner) != NULL)
+                wake_libuv();
+        }
     }
     JULIA_DEBUG_SLEEPWAKE( wakeup_leave = cycleclock() );
 }
@@ -426,7 +451,9 @@ static int may_sleep(jl_ptls_t ptls) JL_NOTSAFEPOINT
 {
     // sleep_check_state is only transitioned from not_sleeping to sleeping
     // by the thread itself. As a result, if this returns false, it will
-    // continue returning false. If it returns true, there are no guarantees.
+    // continue returning false. If it returns true, we know the total
+    // modification order of the fences.
+    jl_fence(); // [^store_buffering_1]
     return jl_atomic_load_relaxed(&ptls->sleep_check_state) == sleeping;
 }

@@ -452,18 +479,27 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q)
         jl_cpu_pause();
         jl_ptls_t ptls = ct->ptls;
         if (sleep_check_after_threshold(&start_cycles) || (!jl_atomic_load_relaxed(&_threadedregion) && ptls->tid == 0)) {
-            jl_atomic_store(&ptls->sleep_check_state, sleeping); // acquire sleep-check lock
-            if (!multiq_check_empty()) {
-                if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping)
-                    jl_atomic_store(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
+            // acquire sleep-check lock
+            jl_atomic_store_relaxed(&ptls->sleep_check_state, sleeping);
+            jl_fence(); // [^store_buffering_1]
+            if (!multiq_check_empty()) { // uses relaxed loads
+                if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping)
+                    jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
                 continue;
             }
-            task = get_next_task(trypoptask, q); // WARNING: this should not yield
-            if (ptls != ct->ptls)
-                continue; // oops, get_next_task did yield--start over
+            task = get_next_task(trypoptask, q); // note: this should not yield
+            if (ptls != ct->ptls) {
+                // sigh, a yield was detected, so let's go ahead and handle it anyway by starting over
+                ptls = ct->ptls;
+                if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping)
+                    jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
+                if (task)
+                    return task;
+                continue;
+            }
             if (task) {
                 if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping)
-                    jl_atomic_store(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
+                    jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
                 return task;
             }

@@ -519,7 +555,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q)
                 // thread 0 is the only thread permitted to run the event loop
                 // so it needs to stay alive
                 if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping)
-                    jl_atomic_store(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
+                    jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
                 start_cycles = 0;
                 continue;
             }
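Taken together, the patched functions implement a sleep/wake handshake: the sleeper publishes its intent, fences, then re-checks for work before (and again inside) the condvar wait, while the waker publishes work, fences, then claims the sleeping state with a CAS before signaling. A condensed, hedged model of that protocol in C11 follows; the identifiers are invented for the sketch (this is not Julia API), and the libuv and multiqueue details are elided:

/* Minimal model of the post-patch sleep/wake protocol; not Julia code. */
#include <stdatomic.h>
#include <threads.h>

static atomic_int sleep_state;  /* 0 = not_sleeping, 1 = sleeping */
static atomic_int work;         /* stand-in for the multiqueue */
static mtx_t lock;              /* stand-in for sleep_locks[tid] */
static cnd_t cond;              /* stand-in for wake_signals[tid] */

static void worker_sleep(void)  /* dequeuer: jl_task_get_next + may_sleep */
{
    atomic_store_explicit(&sleep_state, 1, memory_order_relaxed);      /* 1a */
    atomic_thread_fence(memory_order_seq_cst);
    if (atomic_load_explicit(&work, memory_order_relaxed)) {           /* 1b */
        atomic_store_explicit(&sleep_state, 0, memory_order_relaxed);
        return;  /* work arrived concurrently: don't sleep */
    }
    mtx_lock(&lock);
    atomic_thread_fence(memory_order_seq_cst);  /* may_sleep() re-fences */
    while (atomic_load_explicit(&sleep_state, memory_order_relaxed) == 1)
        cnd_wait(&cond, &lock);
    mtx_unlock(&lock);
}

static int try_wake(void)       /* waker: wake_thread */
{
    int expected = 1;
    if (atomic_load_explicit(&sleep_state, memory_order_relaxed) == 1 &&
        atomic_compare_exchange_strong_explicit(&sleep_state, &expected, 0,
                memory_order_relaxed, memory_order_relaxed)) {
        mtx_lock(&lock);        /* pairs with the sleeper's lock + wait */
        cnd_signal(&cond);
        mtx_unlock(&lock);
        return 1;               /* mirrors wake_thread's new return value */
    }
    return 0;
}

static void enqueue_and_wake(void) /* waker: multiq_insert + jl_wakeup_thread */
{
    atomic_store_explicit(&work, 1, memory_order_relaxed);             /* 2a */
    atomic_thread_fence(memory_order_seq_cst);
    try_wake();                                                        /* 2b */
}

static int worker_main(void *arg) { (void)arg; worker_sleep(); return 0; }

int main(void)
{
    mtx_init(&lock, mtx_plain);
    cnd_init(&cond);
    thrd_t t;
    thrd_create(&t, worker_main, NULL);
    enqueue_and_wake();  /* either the 1b re-check or the signal wakes the worker */
    thrd_join(t, NULL);
    cnd_destroy(&cond);
    mtx_destroy(&lock);
    return 0;
}

The CAS is what lets a single waker claim responsibility for the signal, which is why the commit changes wake_thread to return whether it actually woke anyone: only then does the caller need to fence again and consider notifying libuv.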
Expand Down
