diff --git a/src/partr.c b/src/partr.c index 048a841158153..a57328a6aa5e4 100644 --- a/src/partr.c +++ b/src/partr.c @@ -31,6 +31,16 @@ static const int16_t sleeping = 1; // invariant: The transition of a thread state to sleeping must be followed by a check that there wasn't work pending for it. // information: Observing thread not-sleeping is sufficient to ensure the target thread will subsequently inspect its local queue. // information: Observing thread is-sleeping says it may be necessary to notify it at least once to wakeup. It may already be awake however for a variety of reasons. +// information: These observations require sequentially-consistent fences to be inserted between each of those operational phases. +// [^store_buffering_1]: These fences are used to avoid the cycle 2b -> 1a -> 1b -> 2a -> 2b where +// * Dequeuer: +// * 1a: `jl_atomic_store_relaxed(&ptls->sleep_check_state, sleeping)` +// * 1b: `multiq_check_empty` returns true +// * Enqueuer: +// * 2a: `multiq_insert` +// * 2b: `jl_atomic_load_relaxed(&ptls->sleep_check_state)` in `jl_wakeup_thread` returns `not_sleeping` +// i.e., the dequeuer misses the enqueue and enqueuer misses the sleep state transition. + JULIA_DEBUG_SLEEPWAKE( uint64_t wakeup_enter; @@ -348,16 +358,20 @@ static int sleep_check_after_threshold(uint64_t *start_cycles) } -static void wake_thread(int16_t tid) +static int wake_thread(int16_t tid) { jl_ptls_t other = jl_all_tls_states[tid]; int8_t state = sleeping; - jl_atomic_cmpswap(&other->sleep_check_state, &state, not_sleeping); - if (state == sleeping) { - uv_mutex_lock(&sleep_locks[tid]); - uv_cond_signal(&wake_signals[tid]); - uv_mutex_unlock(&sleep_locks[tid]); + + if (jl_atomic_load_relaxed(&other->sleep_check_state) == sleeping) { + if (jl_atomic_cmpswap_relaxed(&other->sleep_check_state, &state, not_sleeping)) { + uv_mutex_lock(&sleep_locks[tid]); + uv_cond_signal(&wake_signals[tid]); + uv_mutex_unlock(&sleep_locks[tid]); + return 1; + } } + return 0; } @@ -372,37 +386,48 @@ static void wake_libuv(void) JL_DLLEXPORT void jl_wakeup_thread(int16_t tid) { jl_task_t *ct = jl_current_task; - jl_ptls_t ptls = ct->ptls; - jl_task_t *uvlock = jl_atomic_load(&jl_uv_mutex.owner); int16_t self = jl_atomic_load_relaxed(&ct->tid); + if (tid != self) + jl_fence(); // [^store_buffering_1] + jl_task_t *uvlock = jl_atomic_load_relaxed(&jl_uv_mutex.owner); JULIA_DEBUG_SLEEPWAKE( wakeup_enter = cycleclock() ); if (tid == self || tid == -1) { // we're already awake, but make sure we'll exit uv_run + jl_ptls_t ptls = ct->ptls; if (jl_atomic_load_relaxed(&ptls->sleep_check_state) == sleeping) - jl_atomic_store(&ptls->sleep_check_state, not_sleeping); + jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); if (uvlock == ct) uv_stop(jl_global_event_loop()); } else { // something added to the sticky-queue: notify that thread - wake_thread(tid); - // check if we need to notify uv_run too - jl_task_t *system_tid = jl_atomic_load_relaxed(&jl_all_tls_states[tid]->current_task); - if (uvlock != ct && jl_atomic_load(&jl_uv_mutex.owner) == system_tid) - wake_libuv(); + if (wake_thread(tid)) { + // check if we need to notify uv_run too + jl_fence(); + jl_task_t *tid_task = jl_atomic_load_relaxed(&jl_all_tls_states[tid]->current_task); + // now that we have changed the thread to not-sleeping, ensure that + // either it has not yet acquired the libuv lock, or that it will + // observe the change of state to not_sleeping + if (uvlock != ct && jl_atomic_load_relaxed(&jl_uv_mutex.owner) == tid_task) + wake_libuv(); + } } // check if the other threads might be sleeping if (tid == -1) { // something added to the multi-queue: notify all threads // in the future, we might want to instead wake some fraction of threads, // and let each of those wake additional threads if they find work + int anysleep = 0; for (tid = 0; tid < jl_n_threads; tid++) { if (tid != self) - wake_thread(tid); + anysleep |= wake_thread(tid); } // check if we need to notify uv_run too - if (uvlock != ct && jl_atomic_load(&jl_uv_mutex.owner) != NULL) - wake_libuv(); + if (uvlock != ct && anysleep) { + jl_fence(); + if (jl_atomic_load_relaxed(&jl_uv_mutex.owner) != NULL) + wake_libuv(); + } } JULIA_DEBUG_SLEEPWAKE( wakeup_leave = cycleclock() ); } @@ -426,7 +451,9 @@ static int may_sleep(jl_ptls_t ptls) JL_NOTSAFEPOINT { // sleep_check_state is only transitioned from not_sleeping to sleeping // by the thread itself. As a result, if this returns false, it will - // continue returning false. If it returns true, there are no guarantees. + // continue returning false. If it returns true, we know the total + // modification order of the fences. + jl_fence(); // [^store_buffering_1] return jl_atomic_load_relaxed(&ptls->sleep_check_state) == sleeping; } @@ -452,18 +479,27 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q) jl_cpu_pause(); jl_ptls_t ptls = ct->ptls; if (sleep_check_after_threshold(&start_cycles) || (!jl_atomic_load_relaxed(&_threadedregion) && ptls->tid == 0)) { - jl_atomic_store(&ptls->sleep_check_state, sleeping); // acquire sleep-check lock - if (!multiq_check_empty()) { + // acquire sleep-check lock + jl_atomic_store_relaxed(&ptls->sleep_check_state, sleeping); + jl_fence(); // [^store_buffering_1] + if (!multiq_check_empty()) { // uses relaxed loads + if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) + jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us + continue; + } + task = get_next_task(trypoptask, q); // note: this should not yield + if (ptls != ct->ptls) { + // sigh, a yield was detected, so let's go ahead and handle it anyway by starting over + ptls = ct->ptls; if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) - jl_atomic_store(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us + jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us + if (task) + return task; continue; } - task = get_next_task(trypoptask, q); // WARNING: this should not yield - if (ptls != ct->ptls) - continue; // oops, get_next_task did yield--start over if (task) { if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) - jl_atomic_store(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us + jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us return task; } @@ -519,7 +555,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *trypoptask, jl_value_t *q) // thread 0 is the only thread permitted to run the event loop // so it needs to stay alive if (jl_atomic_load_relaxed(&ptls->sleep_check_state) != not_sleeping) - jl_atomic_store(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us + jl_atomic_store_relaxed(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us start_cycles = 0; continue; }