Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ repository = "https://github.com/NthTensor/Forte"

[workspace]
resolver = "2"
members = ["ci", "rayon-compat"]
members = ["ci"]

[dependencies]
arraydeque = "0.5.1"
Expand Down
13 changes: 0 additions & 13 deletions rayon-compat/Cargo.toml

This file was deleted.

14 changes: 0 additions & 14 deletions rayon-compat/README.md

This file was deleted.

186 changes: 0 additions & 186 deletions rayon-compat/src/lib.rs

This file was deleted.

61 changes: 59 additions & 2 deletions src/latch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ const ASLEEP: u32 = 0b10;
// -----------------------------------------------------------------------------
// Latch

/// A [Latch] is a signaling mechanism used to indicate when an event has
/// A Latch is a signaling mechanism used to indicate when an event has
/// occurred. The latch begins as *unset* (In the `LOCKED` state), and can later
/// be *set* by any thread (entering the *SIGNAL*) state.
/// be *set* by any thread (entering the `SIGNAL`) state.
///
/// Each latch is associated with one *owner thread*. This is the thread that
/// may be blocking, waiting for the latch to complete.
Expand Down Expand Up @@ -62,10 +62,21 @@ impl Latch {
/// Returns true if the latch signal was received, and false otherwise.
#[inline(always)]
pub fn wait(&self) -> bool {
// First, check if the latch has been set.
//
// In the event of a race with `set`:
// + If this happens before the store, then we will go to sleep.
// + If this happens after the store, then we notice and return.
if self.state.load(Ordering::Relaxed) == SIGNAL {
return true;
}
// If it has not been set, go to sleep.
//
// In the event of a race with `set`, the `wake` will always cause this
// to return regardless of memory ordering.
let slept = self.sleep_controller.sleep();
// If we actually slept, check the status again to see if it has
// changed. Otherwise assume it hasn't.
if slept {
self.state.load(Ordering::Relaxed) == SIGNAL
} else {
Expand All @@ -86,8 +97,16 @@ impl Latch {
pub unsafe fn set(latch: *const Latch) {
// SAFETY: At this point, the latch must still be valid to dereference.
let sleep_controller = unsafe { (*latch).sleep_controller };
// First we set the state to true.
//
// In the event of a race with `wait`, this may cause `wait` to return.
// Otherwise the other thread will sleep within `wait.
//
// SAFETY: At this point, the latch must still be valid to dereference.
unsafe { (*latch).state.store(SIGNAL, Ordering::Relaxed) };
// We must try to wake the other thread, just in case it missed the
// notification and went to sleep. This garentees that the other thread
// will make progress.
sleep_controller.wake();
}

Expand Down Expand Up @@ -120,21 +139,59 @@ impl Default for SleepController {
}

impl SleepController {
// Attempt to wake the thread to which this belongs.
//
// Returns true if this allows the thread to make progress (by waking it up
// or catching it before it goes to sleep) and false if the thread was
// running.
pub fn wake(&self) -> bool {
// Set set the state to SIGNAL and read the current state, which must be
// either LOCKED or ASLEEP.
let sleep_state = self.state.swap(SIGNAL, Ordering::Relaxed);
let asleep = sleep_state == ASLEEP;
if asleep {
// If the state was ASLEEP, the thread is either asleep or about to
// go to sleep.
//
// + If it is about to go to sleep (but has not yet called
// `atomic_wait::wait`) then setting the state to SIGNAL above
// should prevent it from going to sleep.
//
// + If it is already waiting, the following notification will wake
// it up.
//
// Either way, after this call the other thread must make progress.
atomic_wait::wake_one(&self.state);
}
asleep
}

// Attempt to send the thread to sleep. This should only be called on a
// single thread, and we say that this controller "belongs" to that thread.
//
// Returns true if this thread makes a syscall to suspend the thread, and
// false if the thread was already woken (letting us skip the syscall).
pub fn sleep(&self) -> bool {
// Set the state to ASLEEP and read the current state, which must be
// either LOCKED or SIGNAL.
let state = self.state.swap(ASLEEP, Ordering::Relaxed);
// If the state is LOCKED, then we have not yet received a signal, and
// we should try to put the thread to sleep. Otherwise we should return
// early.
let sleep = state == LOCKED;
if sleep {
// If we have received a signal since entering the sleep state
// (meaning the state is not longer set to ASLEEP) then this will
// return emediately.
//
// If the state is still ASLEEP, then the next call to `wake` will
// register that and call `wake_on`.
//
// Either way, there is no way we can fail to receive a `wake`.
atomic_wait::wait(&self.state, ASLEEP);
}
// Set the state back to LOCKED so that we are ready to receive new
// signals.
self.state.store(LOCKED, Ordering::Relaxed);
sleep
}
Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ mod util;
// Top-level exports

pub use scope::Scope;
pub use scope::ScopedSpawn;
pub use thread_pool::Spawn;
pub use thread_pool::ThreadPool;
pub use thread_pool::Worker;
pub use thread_pool::Yield;
pub use thread_pool::block_on;
pub use thread_pool::join;
pub use thread_pool::scope;
pub use thread_pool::spawn;
pub use thread_pool::spawn_async;
pub use thread_pool::spawn_future;

// -----------------------------------------------------------------------------
// Platform Support
Expand Down
Loading
Loading