From 5c90f8b7962b5a6e73e98803cf29e7d229e7b681 Mon Sep 17 00:00:00 2001 From: NthTensor Date: Sun, 20 Jul 2025 16:13:24 -0400 Subject: [PATCH] feat: add advanced worker sleeping --- Cargo.lock | 268 ++++++++++++++++++++++++++ Cargo.toml | 8 + benches/bevy_tasks.rs | 110 +++++++++++ benches/fork_join.rs | 6 +- ci/src/commands/clippy.rs | 5 +- ci/src/commands/doc_check.rs | 18 +- ci/src/commands/loom.rs | 23 --- src/blocker.rs | 100 ---------- src/job.rs | 100 ++++++---- src/latch.rs | 173 +++++++++++++++++ src/lib.rs | 5 +- src/scope.rs | 22 ++- src/signal.rs | 244 ------------------------ src/thread_pool.rs | 359 ++++++++++++++++++++--------------- src/unwind.rs | 2 +- src/util.rs | 46 +++++ 16 files changed, 905 insertions(+), 584 deletions(-) create mode 100644 benches/bevy_tasks.rs delete mode 100644 ci/src/commands/loom.rs delete mode 100644 src/blocker.rs create mode 100644 src/latch.rs delete mode 100644 src/signal.rs create mode 100644 src/util.rs diff --git a/Cargo.lock b/Cargo.lock index 536aefd..3efbd92 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -55,17 +55,52 @@ dependencies = [ "serde", ] +[[package]] +name = "arraydeque" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d902e3d592a523def97af8f317b08ce16b7ab854c1985a0c671e6f15cebc236" + [[package]] name = "assoc" version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bfdc70193dadb9d7287fa4b633f15f90c876915b31f6af17da307fc59c9859a8" +[[package]] +name = "async-channel" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a" +dependencies = [ + "concurrent-queue", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-executor" +version = "1.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb812ffb58524bdd10860d7d974e2f01cc0950c2438a74ee5ec2e2280c6c4ffa" +dependencies = [ + "async-task", + "concurrent-queue", + "fastrand", + "futures-lite", + "pin-project-lite", + "slab", +] + [[package]] name = "async-task" version = "4.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" +dependencies = [ + "portable-atomic", +] [[package]] name = "atomic-wait" @@ -77,12 +112,56 @@ dependencies = [ "windows-sys 0.42.0", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" +dependencies = [ + "portable-atomic", +] + [[package]] name = "autocfg" version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "bevy_platform" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7573dc824a1b08b4c93fdbe421c53e1e8188e9ca1dd74a414455fe571facb47" +dependencies = [ + "cfg-if", + "critical-section", + "foldhash", + "hashbrown", + "portable-atomic", + "portable-atomic-util", + "serde", + "spin", +] + +[[package]] +name = "bevy_tasks" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b674242641cab680688fc3b850243b351c1af49d4f3417a576debd6cca8dcf5" +dependencies = [ + "async-channel", + "async-executor", + "async-task", + 
"atomic-waker", + "bevy_platform", + "cfg-if", + "concurrent-queue", + "crossbeam-queue", + "derive_more", + "futures-lite", + "heapless", +] + [[package]] name = "bitflags" version = "2.9.1" @@ -107,6 +186,12 @@ version = "3.18.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "793db76d6187cd04dff33004d8e6c9cc4e05cd330500379d2394209271b4aeee" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "cast" version = "0.3.0" @@ -196,6 +281,15 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b94f61472cee1439c0b966b47e3aca9ae07e45d070759512cd390ea2bebc6675" +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "condtype" version = "1.3.0" @@ -238,6 +332,12 @@ dependencies = [ "itertools", ] +[[package]] +name = "critical-section" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "790eea4361631c5e7d22598ecd5723ff611904e3344ce8720784c93e3d83d40b" + [[package]] name = "crossbeam-deque" version = "0.8.6" @@ -257,6 +357,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -269,6 +378,26 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43da5946c66ffcc7745f48db692ffbb10a83bfe0afd96235c5c2a4fb23994929" +[[package]] +name = "derive_more" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a9b99b9cbbe49445b21764dc0625032a89b145a2642e67603e1c936f5458d05" +dependencies = [ + "derive_more-impl", +] + +[[package]] +name = "derive_more-impl" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7330aeadfbe296029522e6c40f315320aba36fc43a5b3632f3795348f3bd22" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "divan" version = "0.1.21" @@ -300,6 +429,12 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "equivalent" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" + [[package]] name = "errno" version = "0.3.12" @@ -310,14 +445,51 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "event-listener" +version = "5.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3492acde4c3fc54c845eaab3eed8bd00c7a7d881f78bfc801e43a93dec1331ae" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" +dependencies = [ + "event-listener", + "pin-project-lite", 
+] + +[[package]] +name = "fastrand" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" + +[[package]] +name = "foldhash" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" + [[package]] name = "forte" version = "1.0.0-dev" dependencies = [ + "arraydeque", "async-task", "atomic-wait", + "bevy_tasks", "chili", "criterion", + "crossbeam-queue", + "crossbeam-utils", "divan", "rayon", "shuttle", @@ -338,6 +510,31 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" +[[package]] +name = "futures-core" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" + +[[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + +[[package]] +name = "futures-lite" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5edaec856126859abb19ed65f39e90fea3a9574b9707f13539acf4abf7eb532" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "parking", + "pin-project-lite", +] + [[package]] name = "generator" version = "0.8.5" @@ -373,6 +570,35 @@ dependencies = [ "crunchy", ] +[[package]] +name = "hash32" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47d60b12902ba28e2730cd37e95b8c9223af2808df9e902d4df49588d1470606" +dependencies = [ + "byteorder", +] + +[[package]] +name = "hashbrown" +version = "0.15.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5971ac85611da7067dbfcabef3c70ebb5606018acd9e2a3903a0da507521e0d5" +dependencies = [ + "equivalent", +] + +[[package]] +name = "heapless" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bfb9eb618601c89945a70e254898da93b13be0388091d42117462b265bb3fad" +dependencies = [ + "hash32", + "portable-atomic", + "stable_deref_trait", +] + [[package]] name = "hermit-abi" version = "0.5.2" @@ -494,6 +720,12 @@ version = "3.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f" +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "pin-project-lite" version = "0.2.16" @@ -528,6 +760,21 @@ dependencies = [ "plotters-backend", ] +[[package]] +name = "portable-atomic" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483" + +[[package]] +name = "portable-atomic-util" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507" +dependencies = [ + "portable-atomic", +] + [[package]] name = "ppv-lite86" version = "0.2.21" @@ -768,12 +1015,33 @@ dependencies = [ "tracing", ] +[[package]] +name = "slab" +version = "0.4.10" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "04dc19736151f35336d325007ac991178d504a119863a2fcb3758cdb5e52c50d" + [[package]] name = "smallvec" version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "portable-atomic", +] + +[[package]] +name = "stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + [[package]] name = "syn" version = "2.0.103" diff --git a/Cargo.toml b/Cargo.toml index 5385bca..583149d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,8 +11,11 @@ resolver = "2" members = ["ci", "rayon-compat"] [dependencies] +arraydeque = "0.5.1" async-task = "4.7.1" atomic-wait = "1.1.0" +crossbeam-queue = "0.3.12" +crossbeam-utils = "0.8.21" shuttle = { version = "0.8.0", optional = true } tracing = { version = "0.1.41", features = ["release_max_level_off"] } tracing-subscriber = "0.3.19" @@ -22,6 +25,7 @@ tracing-subscriber = "0.3.19" divan = "0.1.17" rayon = "1.10.0" chili = "0.2" +bevy_tasks = { version = "0.16.1", features = [ "multi_threaded" ] } # Used for A/B perf testing criterion = { version = "0.5" } @@ -71,3 +75,7 @@ path = "tests/tests.rs" [[bench]] name = "fork_join" harness = false + +[[bench]] +name = "bevy_tasks" +harness = false diff --git a/benches/bevy_tasks.rs b/benches/bevy_tasks.rs new file mode 100644 index 0000000..87ad01e --- /dev/null +++ b/benches/bevy_tasks.rs @@ -0,0 +1,110 @@ +//! 
Comparative benchmarks against bevy_tasks
+
+struct BevyParChunks<'a, T>(core::slice::Chunks<'a, T>);
+impl<'a, T> bevy_tasks::ParallelIterator<core::slice::Iter<'a, T>> for BevyParChunks<'a, T>
+where
+    T: 'a + Send + Sync,
+{
+    fn next_batch(&mut self) -> Option<core::slice::Iter<'a, T>> {
+        self.0.next().map(|s| s.iter())
+    }
+}
+
+struct BevyParChunksMut<'a, T>(core::slice::ChunksMut<'a, T>);
+impl<'a, T> bevy_tasks::ParallelIterator<core::slice::IterMut<'a, T>> for BevyParChunksMut<'a, T>
+where
+    T: 'a + Send + Sync,
+{
+    fn next_batch(&mut self) -> Option<core::slice::IterMut<'a, T>> {
+        self.0.next().map(|s| s.iter_mut())
+    }
+}
+
+static THREAD_POOL: forte::ThreadPool = forte::ThreadPool::new();
+
+fn forte_chunks<const CHUNK_SIZE: usize, T, F>(worker: &forte::Worker, data: &mut [T], func: &F)
+where
+    T: Send + Sync,
+    F: Fn(&mut [T]) + Send + Sync,
+{
+    if data.len() <= CHUNK_SIZE {
+        func(data);
+    } else {
+        let split_index = data.len() / 2;
+        let (left, right) = data.split_at_mut(split_index);
+        worker.join(
+            |worker| forte_chunks::<CHUNK_SIZE, T, F>(worker, left, func),
+            |worker| forte_chunks::<CHUNK_SIZE, T, F>(worker, right, func),
+        );
+    }
+}
+
+#[divan::bench_group]
+mod overhead {
+
+    use divan::prelude::*;
+
+    const LEN: &[usize] = &[100, 1000, 10_000, 100_000, 1_000_000, 10_000_000];
+
+    fn work(value: &mut usize) {
+        for i in 0..80 {
+            black_box(i);
+        }
+        // std::thread::sleep(Duration::from_nanos(100));
+        black_box(value);
+    }
+
+    #[divan::bench(args = LEN)]
+    fn serial(bencher: Bencher, len: usize) {
+        let mut vec: Vec<_> = (0..len).collect();
+        bencher.bench_local(|| vec.iter_mut().for_each(work));
+    }
+
+    #[divan::bench(args = LEN)]
+    fn bevy_tasks(bencher: Bencher, len: usize) {
+        use crate::BevyParChunksMut;
+        use bevy_tasks::ParallelIterator;
+
+        let mut vec: Vec<_> = (0..len).collect();
+        let pool = bevy_tasks::TaskPoolBuilder::new()
+            .thread_name("bevy_tasks".to_string())
+            .build();
+        bencher.bench_local(|| {
+            BevyParChunksMut(vec.chunks_mut(100)).for_each(&pool, work);
+        });
+    }
+
+    #[divan::bench(args = LEN)]
+    fn rayon(bencher: Bencher, len: usize) {
+        use rayon::iter::ParallelIterator;
+        use rayon::slice::ParallelSliceMut;
+
+        let mut vec: Vec<_> = (0..len).collect();
+        bencher.bench_local(|| {
+            vec.par_chunks_mut(100)
+                .for_each(|c| c.iter_mut().for_each(work))
+        });
+    }
+
+    #[divan::bench(args = LEN)]
+    fn forte(bencher: Bencher, len: usize) {
+        use crate::THREAD_POOL;
+        use crate::forte_chunks;
+
+        let mut vec: Vec<_> = (0..len).collect();
+
+        THREAD_POOL.resize_to_available();
+
+        bencher.bench_local(|| {
+            THREAD_POOL.with_worker(|worker| {
+                forte_chunks::<100, _, _>(worker, &mut vec, &|c| {
+                    c.iter_mut().for_each(work);
+                });
+            })
+        });
+    }
+}
+
+fn main() {
+    divan::main();
+}
diff --git a/benches/fork_join.rs b/benches/fork_join.rs
index 1d823ab..286554a 100644
--- a/benches/fork_join.rs
+++ b/benches/fork_join.rs
@@ -1,9 +1,9 @@
 //! A benchmark for fork-join workloads adapted from `chili`.
 
 use chili::Scope;
+use criterion::black_box;
 use divan::Bencher;
 use forte::Worker;
-use tracing::debug;
 use tracing::info;
 use tracing_subscriber::fmt;
 use tracing_subscriber::layer::SubscriberExt;
@@ -33,7 +33,7 @@ impl Node {
 
 // Returns an iterator over the number of layers. Also returns the total number
 // of nodes.
const LAYERS: &[usize] = &[ - 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, + 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, // 10, 24, 27, ]; fn nodes() -> impl Iterator { @@ -80,12 +80,10 @@ fn forte(bencher: Bencher, nodes: (usize, usize)) { let (left, right) = worker.join( |w| { let sum = node.left.as_deref().map(|n| sum(n, w)).unwrap_or_default(); - debug!("sum: {}", sum); sum }, |w| { let sum = node.right.as_deref().map(|n| sum(n, w)).unwrap_or_default(); - debug!("sum: {}", sum); sum }, ); diff --git a/ci/src/commands/clippy.rs b/ci/src/commands/clippy.rs index 7f0ebac..eef88a2 100644 --- a/ci/src/commands/clippy.rs +++ b/ci/src/commands/clippy.rs @@ -13,10 +13,7 @@ pub struct ClippyCommand {} impl Prepare for ClippyCommand { fn prepare<'a>(&self, sh: &'a xshell::Shell, _flags: Flag) -> Vec> { vec![PreparedCommand::new::( - cmd!( - sh, - "cargo clippy --workspace --all-targets --all-features -- -Dwarnings" - ), + cmd!(sh, "cargo clippy --workspace -- -Dwarnings"), "Please fix clippy errors in output above.", )] } diff --git a/ci/src/commands/doc_check.rs b/ci/src/commands/doc_check.rs index 3880f98..f2a7d59 100644 --- a/ci/src/commands/doc_check.rs +++ b/ci/src/commands/doc_check.rs @@ -12,13 +12,15 @@ pub struct DocCheckCommand {} impl Prepare for DocCheckCommand { fn prepare<'a>(&self, sh: &'a xshell::Shell, _flags: Flag) -> Vec> { - vec![PreparedCommand::new::( - cmd!( - sh, - "cargo doc --workspace --all-features --no-deps --document-private-items --keep-going" - ), - "Please fix doc warnings in output above.", - ) - .with_env_var("RUSTDOCFLAGS", "-D warnings")] + vec![ + PreparedCommand::new::( + cmd!( + sh, + "cargo doc --workspace --no-deps --document-private-items --keep-going" + ), + "Please fix doc warnings in output above.", + ) + .with_env_var("RUSTDOCFLAGS", "-D warnings"), + ] } } diff --git a/ci/src/commands/loom.rs b/ci/src/commands/loom.rs deleted file mode 100644 index 46fadc5..0000000 --- a/ci/src/commands/loom.rs +++ /dev/null @@ -1,23 +0,0 @@ -use argh::FromArgs; - -use crate::Flag; -use crate::Prepare; -use crate::PreparedCommand; -use crate::commands::LoomCheckCommand; -use crate::commands::LoomClippyCommand; -use crate::commands::LoomTestCommand; - -/// Alias for running the `loom-check`, `loom-clippy` and `loom-test` subcommands. -#[derive(FromArgs, Default)] -#[argh(subcommand, name = "loom")] -pub struct LoomCommand {} - -impl Prepare for LoomCommand { - fn prepare<'a>(&self, sh: &'a xshell::Shell, flags: Flag) -> Vec> { - let mut commands = vec![]; - commands.append(&mut LoomCheckCommand::default().prepare(sh, flags)); - commands.append(&mut LoomClippyCommand::default().prepare(sh, flags)); - commands.append(&mut LoomTestCommand::default().prepare(sh, flags)); - commands - } -} diff --git a/src/blocker.rs b/src/blocker.rs deleted file mode 100644 index b9fbaa5..0000000 --- a/src/blocker.rs +++ /dev/null @@ -1,100 +0,0 @@ -//! Async blocking utilities. - -use core::task::RawWaker; -use core::task::RawWakerVTable; -use core::task::Waker; - -use crate::platform::*; - -// ----------------------------------------------------------------------------- -// States - -/// The blocker is not sleeping and has not been woken. -const IDLE: u32 = 0; - -// The blocker is sleeping or is about to go to sleep. -const WAIT: u32 = 1; - -// The blocker has been woken at least once since the last time it slept. 
-const WAKE: u32 = 2; - -// ----------------------------------------------------------------------------- -// Blocker - -/// A blocker lets you block a thread on the progress of a future. -pub struct Blocker { - /// The state of a blocker. - state: AtomicU32, - // When using shuttle, futex is not available so we must fall back to a mutex and condvar - #[cfg(feature = "shuttle")] - mutex: Mutex<()>, - #[cfg(feature = "shuttle")] - condvar: Condvar, -} - -impl Blocker { - /// Creates a new blocker. - pub fn new() -> Self { - Self { - state: AtomicU32::new(IDLE), - #[cfg(feature = "shuttle")] - mutex: Mutex::new(()), - #[cfg(feature = "shuttle")] - condvar: Condvar::new(), - } - } - - /// Creates an async waker from a signal. This can be used to schedule a - /// signal when a future completes. - /// - /// # Safety - /// - /// The blocker must outlive the waker. - pub unsafe fn as_waker(&self) -> Waker { - let this: *const Self = self; - let raw_waker = RawWaker::new(this.cast::<()>(), &RAW_WAKER_VTABLE); - // SAFETY: The RawWakerVTable api contract is upheald and these - // functions are all thread-safe. - unsafe { Waker::from_raw(raw_waker) } - } - - /// Returns true if calling `block` would have blocked the thread. - #[inline] - pub fn would_block(&self) -> bool { - self.state.load(Ordering::Relaxed) != WAKE - } - - // Blocks the thread until the future makes progress. - #[inline] - pub fn block(&self) { - let state = self.state.swap(WAIT, Ordering::Relaxed); - if state != WAKE { - #[cfg(not(feature = "shuttle"))] - atomic_wait::wait(&self.state, state); - #[cfg(feature = "shuttle")] - let _ = self.condvar.wait(self.mutex.lock().unwrap()); - } - self.state.store(IDLE, Ordering::Relaxed); - } -} - -const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( - #[inline(always)] - |ptr| RawWaker::new(ptr, &RAW_WAKER_VTABLE), - wake, - wake, - |_| {}, -); - -fn wake(this: *const ()) { - // SAFETY: This was constructed to be non-null, and the blocker must outlive - // the waker, and we do not ever access blockers mutably, so it must be - // valid to convert this into an immutable reference. - let blocker = unsafe { &*this.cast::() }; - if blocker.state.swap(WAKE, Ordering::Relaxed) == WAIT { - #[cfg(not(feature = "shuttle"))] - atomic_wait::wake_all(&blocker.state); - #[cfg(feature = "shuttle")] - blocker.condvar.notify_all(); - } -} diff --git a/src/job.rs b/src/job.rs index 854d3f2..9aba42f 100644 --- a/src/job.rs +++ b/src/job.rs @@ -12,13 +12,14 @@ //! (c) Each job reference is executed exactly once. use alloc::boxed::Box; -use alloc::collections::VecDeque; +use arraydeque::ArrayDeque; use core::cell::UnsafeCell; -use core::mem::ManuallyDrop; +use core::mem::{ManuallyDrop, MaybeUninit}; use core::ptr::NonNull; +use core::sync::atomic::{Ordering, fence}; use std::thread::Result as ThreadResult; -use crate::signal::Signal; +use crate::latch::Latch; use crate::thread_pool::Worker; use crate::unwind; @@ -72,7 +73,7 @@ impl JobRef { /// The caller must ensure that `job_pointer` remains valid to pass to /// `execute_fn` until the job is executed. What exactly this means is /// dependent on the implementation of the execute function. - #[inline] + #[inline(always)] pub unsafe fn new_raw( job_pointer: NonNull<()>, execute_fn: unsafe fn(NonNull<()>, &Worker), @@ -85,13 +86,13 @@ impl JobRef { /// Returns an opaque handle that can be saved and compared, without making /// `JobRef` itself `Copy + Eq`. 
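+    ///
+    /// For illustration (a hypothetical use, not taken from this patch), an
+    /// id can be saved and later compared to recognize the same job:
+    ///
+    /// ```ignore
+    /// let id = job_ref.id();
+    /// worker.enqueue(job_ref);
+    /// // ... later, when inspecting some other `JobRef` ...
+    /// let is_same_job = other.id() == id;
+    /// ```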
-    #[inline]
+    #[inline(always)]
     pub fn id(&self) -> impl Eq + use<> {
         (self.job_pointer, self.execute_fn)
     }
 
     /// Executes the `JobRef` by passing the execute function on the job pointer.
-    #[inline]
+    #[inline(always)]
     pub fn execute(self, worker: &Worker) {
         // SAFETY: The constructor of `JobRef` is required to ensure this is valid.
         unsafe { (self.execute_fn)(self.job_pointer, worker) }
@@ -105,23 +106,27 @@ unsafe impl Send for JobRef {}
 // Job queue
 
 pub struct JobQueue {
-    job_refs: UnsafeCell<VecDeque<JobRef>>,
+    job_refs: UnsafeCell>,
 }
 
 impl JobQueue {
     pub fn new() -> JobQueue {
         JobQueue {
-            job_refs: UnsafeCell::new(VecDeque::new()),
+            job_refs: UnsafeCell::new(ArrayDeque::new()),
        }
    }
 
    #[inline(always)]
-    pub fn push_back(&self, job_ref: JobRef) {
+    pub fn push_back(&self, job_ref: JobRef) -> Option<JobRef> {
        // SAFETY: The queue itself is only accessed mutably within `push_back`,
        // `pop_back` and `pop_front`. Since these functions never call each
        // other, we must have exclusive access to the queue.
        let job_refs = unsafe { &mut *self.job_refs.get() };
-        job_refs.push_back(job_ref);
+        if let Err(full) = job_refs.push_back(job_ref) {
+            Some(full.element)
+        } else {
+            None
+        }
    }
 
    #[inline(always)]
@@ -154,7 +159,8 @@
 /// This is analogous to the chili type `JobStack` and the rayon type `StackJob`.
 pub struct StackJob<F, T> {
     f: UnsafeCell<ManuallyDrop<F>>,
-    signal: Signal<ThreadResult<T>>,
+    completed: Latch,
+    return_value: UnsafeCell<MaybeUninit<ThreadResult<T>>>,
 }
 
 impl<F, T> StackJob<F, T>
 where
     F: FnOnce(&Worker) -> T + Send,
     T: Send,
 {
-    /// Creates a new `StackJob` and returns it directly.
-    pub fn new(f: F) -> StackJob<F, T> {
+    /// Creates a new `StackJob` owned by the current worker.
+    #[inline(always)]
+    pub fn new(f: F, worker: &Worker) -> StackJob<F, T> {
         StackJob {
             f: UnsafeCell::new(ManuallyDrop::new(f)),
-            signal: Signal::new(),
+            completed: worker.new_latch(),
+            return_value: UnsafeCell::new(MaybeUninit::uninit()),
         }
     }
@@ -182,6 +190,7 @@
     /// least until the `JobRef` is executed or dropped. Additionally, the
     /// caller must ensure that they never create two different `JobRef`s that
     /// point to the same `StackJob`.
+    #[inline(always)]
    pub unsafe fn as_job_ref(&self) -> JobRef {
        let job_pointer = NonNull::from(self).cast();
        // SAFETY: The caller ensures the `StackJob` will outlive the `JobRef`,
@@ -200,7 +209,7 @@
     ///
     /// # Safety
     ///
-    /// This must not be called after `execute`.
+    /// This may only be called before the job is executed.
     #[inline(always)]
     pub unsafe fn unwrap(mut self) -> F {
         // SAFETY: This will not be used again. Given that `execute` has not
@@ -212,8 +221,25 @@
-    /// closure's return value is sent over this signal after the job is
-    /// executed.
+    /// closure's completion is signaled through this latch after the job is
+    /// executed.
     #[inline(always)]
-    pub fn signal(&self) -> &Signal<ThreadResult<T>> {
-        &self.signal
+    pub fn completion_latch(&self) -> &Latch {
+        &self.completed
+    }
+
+    /// Unwraps the job into its return value.
+    ///
+    /// # Safety
+    ///
+    /// This may only be called after the job has finished executing.
+    #[inline(always)]
+    pub unsafe fn return_value(mut self) -> ThreadResult<T> {
+        // Synchronize with the fence in `StackJob::execute`.
+        fence(Ordering::Acquire);
+        // Get a ref to the result.
+        let result_ref = self.return_value.get_mut();
+        // SAFETY: The job has completed, which means the return value must have
+        // been initialized. This consumes the job, so there's no chance of this
+        // accidentally duplicating data.
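+        // To spell out the synchronization: `execute` writes the result and
+        // issues a release fence before storing `SIGNAL` into the latch, and
+        // callers only reach this point after observing that store, so the
+        // acquire fence above is what makes the written result visible here.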
+ unsafe { result_ref.assume_init_read() } } } @@ -226,34 +252,41 @@ where /// /// # Safety /// - /// The caller must ensure that `this` can be converted into an immutable - /// reference to a `StackJob`. If the stack job is allocated on the stack - /// (and it should be) then this amounts to ensuring this is called before - /// the stack frame containing the allocation is popped. Put another way: - /// don't leave a block where you allocate a stack job until you run it. - /// - /// This function must be called only once. + /// The caller must ensure that `this` is valid to access a `StackJob` + /// immutably at least until the `Latch` within the `StackJob` has been set. + /// As a consequence, this may not be run after a latch has been set. Since + /// this function sets the latch, the caller must ensure to only call this + /// function once. + #[inline(always)] unsafe fn execute(this: NonNull<()>, worker: &Worker) { // SAFETY: The caller ensures `this` can be converted into an immutable - // reference. + // reference until we set the latch, and the latch has not yet been set. let this = unsafe { this.cast::().as_ref() }; // Create an abort guard. If the closure panics, this will convert the // panic into an abort. Doing so prevents use-after-free for other elements of the stack. let abort_guard = unwind::AbortOnDrop; // SAFETY: This memory location is accessed only in this function and in - // `unwrap`. The latter cannot have been called, because it drops the - // stack job, so, since this function is called only once, we can + // `unwrap`. The latter cannot have been called because it consumes the + // stack job. And since this function is called only once, we can // guarantee that we have exclusive access. let f_ref = unsafe { &mut *this.f.get() }; // SAFETY: The caller ensures this function is called only once. let f = unsafe { ManuallyDrop::take(f_ref) }; // Run the job. If the job panics, we propagate the panic back to the main thread. let result = unwind::halt_unwinding(|| f(worker)); - // SAFETY: This is valid for the access used by `send` because - // `&this.signal` is an immutable reference to a `Signal`. Because - // `send` is only called in this function, and this function is never - // called again, `send` is never called again. - unsafe { Signal::send(&this.signal, result) } + // Get the uninitialized memory where we should put the return value. + let return_value = this.return_value.get(); + // SAFETY: The return value is only accessed here and in + // `StackJob::return_value`. Since the other method consumes the stack + // job, it's not possible for it to run concurrently. Therefore, we must + // have exclusive access to the return value. + unsafe { (*return_value).write(result) }; + // Latches do not participate in memory ordering, so we need to do this manually. + fence(Ordering::Release); + // SAFETY: The caller ensures the job is valid until the latch is set. + // Since the latch is a field of the job, the latch must be valid until + // it is set. + unsafe { Latch::set(&this.completed) }; // Forget the abort guard, re-enabling panics. core::mem::forget(abort_guard); } @@ -274,6 +307,7 @@ where F: FnOnce(&Worker) + Send, { /// Allocates a new `HeapJob` on the heap. + #[inline(always)] pub fn new(f: F) -> Box { Box::new(HeapJob { f }) } @@ -291,6 +325,7 @@ where /// outlived the data it closes over. In other words, if the closure /// references something, that thing must live until the `JobRef` is /// executed or dropped. 
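+    ///
+    /// For illustration, the typical flow (mirroring what `Worker::spawn`
+    /// does later in this patch) is:
+    ///
+    /// ```ignore
+    /// let job = HeapJob::new(|_: &Worker| { /* work */ });
+    /// // SAFETY: the closure is `'static`, so it outlives the `JobRef`.
+    /// let job_ref = unsafe { job.into_job_ref() };
+    /// worker.enqueue(job_ref);
+    /// ```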
+    #[inline(always)]
     pub unsafe fn into_job_ref(self: Box<Self>) -> JobRef {
         // SAFETY: Pointers produced by `Box::into_raw` are never null.
         let job_pointer = unsafe { NonNull::new_unchecked(Box::into_raw(self)).cast() };
@@ -317,6 +352,7 @@
     /// The caller must ensure that `this` is a pointer, created by calling
     /// `Box::into_raw` on a `Box<HeapJob<F>>`. After the call `this` must be
     /// treated as dangling.
+    #[inline(always)]
     unsafe fn execute(this: NonNull<()>, worker: &Worker) {
         // SAFETY: The caller ensures `this` was created by `Box::into_raw` and
         // that this is called only once.
diff --git a/src/latch.rs b/src/latch.rs
new file mode 100644
index 0000000..7a65c19
--- /dev/null
+++ b/src/latch.rs
@@ -0,0 +1,173 @@
+//! A core concept in Rayon is the *latch*.
+
+use core::{
+    pin::Pin,
+    task::{RawWaker, RawWakerVTable, Waker},
+};
+
+use crate::platform::*;
+
+// -----------------------------------------------------------------------------
+// States
+
+/// The default state of a latch is `LOCKED`. When in the locked state, `check`
+/// returns `false` and `wait` blocks.
+const LOCKED: u32 = 0b00;
+
+/// The latch enters the `SIGNAL` state when it is set. When in this state,
+/// `check` returns `true` and `wait` does not block.
+const SIGNAL: u32 = 0b01;
+
+/// The latch enters the `ASLEEP` state when blocking with `wait`.
+const ASLEEP: u32 = 0b10;
+
+// -----------------------------------------------------------------------------
+// Latch
+
+/// A [Latch] is a signaling mechanism used to indicate when an event has
+/// occurred. The latch begins as *unset* (in the `LOCKED` state), and can later
+/// be *set* by any thread (entering the `SIGNAL` state).
+///
+/// Each latch is associated with one *owner thread*. This is the thread that
+/// may block while waiting for the latch to be set.
+///
+/// The general idea and spirit for latches (as well as some of the
+/// documentation) is due to rayon. However, the implementation is specific to
+/// forte.
+pub struct Latch {
+    /// Holds the internal state of the latch. This tracks if the latch has been
+    /// set or not.
+    state: AtomicU32,
+    /// The sleep controller for the owning thread.
+    sleep_controller: &'static SleepController,
+}
+
+impl Latch {
+    /// Creates a new latch, owned by a specific thread.
+    pub fn new(sleep_controller: &'static SleepController) -> Latch {
+        Latch {
+            state: AtomicU32::new(LOCKED),
+            sleep_controller,
+        }
+    }
+
+    /// Checks to see if the latch has been set. Returns true if it has been.
+    #[inline(always)]
+    pub fn check(&self) -> bool {
+        self.state.load(Ordering::Relaxed) == SIGNAL
+    }
+
+    /// Waits for the latch to be set. In practice, the thread may also be
+    /// woken before the latch is actually set.
+    ///
+    /// Returns `true` if the latch signal was received, and `false` otherwise.
+    #[inline(always)]
+    pub fn wait(&self) -> bool {
+        if self.state.load(Ordering::Relaxed) == SIGNAL {
+            return true;
+        }
+        let slept = self.sleep_controller.sleep();
+        if slept {
+            self.state.load(Ordering::Relaxed) == SIGNAL
+        } else {
+            false
+        }
+    }
+
+    /// Activates the latch, potentially unblocking the owning thread.
+    ///
+    /// This takes a raw pointer because the latch may be de-allocated by a
+    /// different thread while this function is executing.
+    ///
+    /// # Safety
+    ///
+    /// The latch pointer must be valid when passed to this function, and must
+    /// not be allowed to become dangling until after the latch is set.
+    #[inline(always)]
+    pub unsafe fn set(latch: *const Latch) {
+        // SAFETY: At this point, the latch must still be valid to dereference.
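+        // (The controller reference is deliberately read *before* the
+        // `SIGNAL` store below: once `SIGNAL` is visible, the owner may free
+        // the latch at any moment, while the controller itself is `&'static`
+        // and so remains valid for the wake-up call.)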
+        let sleep_controller = unsafe { (*latch).sleep_controller };
+        // SAFETY: At this point, the latch must still be valid to dereference.
+        unsafe { (*latch).state.store(SIGNAL, Ordering::Relaxed) };
+        sleep_controller.wake();
+    }
+
+    /// Restores the latch to the default state.
+    ///
+    /// # Safety
+    ///
+    /// This may only be called when in the `SIGNAL` state, e.g. after either
+    /// `wait` or `check` has returned `true`.
+    #[inline(always)]
+    pub unsafe fn reset(&self) {
+        self.state.store(LOCKED, Ordering::Relaxed);
+    }
+}
+
+// -----------------------------------------------------------------------------
+// Sleeper
+
+/// Used in combination with a latch to park and unpark threads.
+pub struct SleepController {
+    state: AtomicU32,
+}
+
+impl Default for SleepController {
+    fn default() -> SleepController {
+        SleepController {
+            state: AtomicU32::new(LOCKED),
+        }
+    }
+}
+
+impl SleepController {
+    pub fn wake(&self) -> bool {
+        let sleep_state = self.state.swap(SIGNAL, Ordering::Relaxed);
+        let asleep = sleep_state == ASLEEP;
+        if asleep {
+            atomic_wait::wake_one(&self.state);
+        }
+        asleep
+    }
+
+    pub fn sleep(&self) -> bool {
+        let state = self.state.swap(ASLEEP, Ordering::Relaxed);
+        let sleep = state == LOCKED;
+        if sleep {
+            atomic_wait::wait(&self.state, ASLEEP);
+        }
+        self.state.store(LOCKED, Ordering::Relaxed);
+        sleep
+    }
+}
+
+// -----------------------------------------------------------------------------
+// Async waker
+
+impl Latch {
+    /// Creates an async waker from a reference to a latch.
+    ///
+    /// # Safety
+    ///
+    /// The latch must outlive the waker.
+    pub unsafe fn as_waker(self: Pin<&Self>) -> Waker {
+        let this: *const Self = Pin::get_ref(self);
+        let raw_waker = RawWaker::new(this.cast::<()>(), &RAW_WAKER_VTABLE);
+        // SAFETY: The RawWakerVTable api contract is upheld and these
+        // functions are all thread-safe.
+        unsafe { Waker::from_raw(raw_waker) }
+    }
+}
+
+const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
+    #[inline(always)]
+    |ptr| RawWaker::new(ptr, &RAW_WAKER_VTABLE),
+    wake,
+    wake,
+    |_| {},
+);
+
+fn wake(this: *const ()) {
+    let latch = this.cast::<Latch>();
+    // SAFETY: The caller of `as_waker` guarantees the latch outlives the
+    // waker, so the latch is still valid to set here.
+    unsafe { Latch::set(latch) };
+}
diff --git a/src/lib.rs b/src/lib.rs
index 4aa6c49..16e1f14 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -35,12 +35,12 @@ extern crate std;
 // -----------------------------------------------------------------------------
 // Modules
 
-mod blocker;
 mod job;
+mod latch;
 mod scope;
-mod signal;
 mod thread_pool;
 mod unwind;
+mod util;
 
 // -----------------------------------------------------------------------------
 // Top-level exports
@@ -71,7 +71,6 @@ mod platform {
     // Core exports
 
     pub use alloc::sync::Arc;
-    pub use alloc::sync::Weak;
     pub use core::sync::atomic::AtomicBool;
     pub use core::sync::atomic::AtomicPtr;
     pub use core::sync::atomic::AtomicU32;
diff --git a/src/scope.rs b/src/scope.rs
index 7bc5e09..eaa6b5d 100644
--- a/src/scope.rs
+++ b/src/scope.rs
@@ -14,8 +14,8 @@
 use scope_ptr::ScopePtr;
 
 use crate::ThreadPool;
 use crate::job::HeapJob;
 use crate::job::JobRef;
+use crate::latch::Latch;
 use crate::platform::*;
-use crate::signal::Signal;
 use crate::thread_pool::Worker;
 use crate::unwind;
 use crate::unwind::AbortOnDrop;
@@ -30,8 +30,8 @@ pub struct Scope<'scope, 'env: 'scope> {
     /// and decremented when a `ScopePtr` is dropped or the owning thread is done
     /// using it.
     count: AtomicU32,
-    /// A signal used to communicate when the scope has been completed.
-    signal: Signal<()>,
+    /// A latch used to communicate when the scope has been completed.
+    completed: Latch,
     /// If any job panics, we store the result here to propagate it.
     panic: AtomicPtr>,
     /// Makes `Scope` invariant over 'scope
@@ -52,7 +52,7 @@
     // dropped at the end of this function, after the call to `complete`. The
     // abort guard above prevents the stack from being dropped early during a
     // panic unwind.
-    let scope = unsafe { Scope::new() };
+    let scope = unsafe { Scope::new(worker) };
     // Panics that occur within the closure should be caught and propagated once
     // all spawned work is complete. This is not a safety requirement, it's just
     // a nicer behavior than aborting.
@@ -66,7 +66,8 @@
     // Now that the user has (presumably) spawned some work onto the scope, we
     // must wait for it to complete.
     //
-    // SAFETY: This is called only once.
+    // SAFETY: This is called only once, and we provide the same worker used to
+    // create the scope.
     unsafe { scope.complete(worker) };
     // At this point all work on the scope is complete, so it is safe to drop
     // the scope. This also means we can relinquish our abort guard (returning
@@ -86,10 +87,10 @@
     /// The caller must promise not to move or mutably reference this scope
     /// until it is dropped, and must not allow the scope to be dropped until
     /// after `Scope::complete` is run and returns.
-    unsafe fn new() -> Scope<'scope, 'env> {
+    unsafe fn new(worker: &Worker) -> Scope<'scope, 'env> {
         Scope {
             count: AtomicU32::new(1),
-            signal: Signal::new(),
+            completed: worker.new_latch(),
             panic: AtomicPtr::new(ptr::null_mut()),
             _scope: PhantomData,
             _env: PhantomData,
@@ -249,7 +250,7 @@
         //
-        // SAFETY: The signal is passed as a reference, and is live for the
-        // duration of the function.
-        unsafe { Signal::send(&self.signal, ()) };
+        // SAFETY: The latch is passed as a reference, and is live for the
+        // duration of the function.
+        unsafe { Latch::set(&self.completed) };
     }
 }
 
@@ -294,7 +295,8 @@
     ///
     /// # Safety
     ///
-    /// This must be called only once.
+    /// This must be called only once. This must be called with a reference to
+    /// the same worker the scope was created with.
     unsafe fn complete(&self, worker: &Worker) {
         // SAFETY: This is explicitly allowed, because every scope starts off
         // with a counter of 1. Because this is called only once, the following
@@ -306,7 +308,7 @@
         // return.
         unsafe { self.remove_reference() };
         // Wait for the remaining work to complete.
-        worker.wait_for_signal(&self.signal);
+        worker.wait_for(&self.completed);
     }
 }
 
diff --git a/src/signal.rs b/src/signal.rs
deleted file mode 100644
index 31b615f..0000000
--- a/src/signal.rs
+++ /dev/null
@@ -1,244 +0,0 @@
-//! This modules defines a basic signal that can be used to notify a waiting
-//! thread about the completion of a job. Signals can be used to transport the
-//! outcome or return value of a job.
-//!
-//! The implementation here is loosely adapted from chili and the oneshot crate,
-//! modified to use a futex instead of a CAS loop.
-
-use core::cell::UnsafeCell;
-
-use crate::platform::*;
-
-// -----------------------------------------------------------------------------
-// States
-
-/// The default state of a signal, with no waiting recever and no sent value.
-pub const IDLE: u32 = 0b00;
-
-/// A bit set by the recever when it is waiting, and needs the sender to wake it up.
-pub const WAIT: u32 = 0b01;
-
-/// A bit set by the sender when data has been transmitted to the recever.
-pub const SENT: u32 = 0b10; - -// ----------------------------------------------------------------------------- -// Signal - -/// A signal transmits a single value across threads, exactly once. Signals are -/// Forte's core synchronization primitive. They take the place of rayon's -/// latches or chili's oneshot channels. -/// -/// The api contract for signals is somewhat subtle, but it is governed by one -/// general principle: A signal must be dropped after used (data has been sent -/// over it). -#[cfg(not(feature = "shuttle"))] -pub struct Signal { - /// The state of the signal, used for synchronization and sleeping. - state: AtomicU32, - /// The value transmitted by the signal. - value: UnsafeCell>, -} - -#[cfg(not(feature = "shuttle"))] -impl Signal { - /// Creates a new signal. - pub fn new() -> Self { - Self { - state: AtomicU32::new(IDLE), - value: UnsafeCell::new(None), - } - } - - /// Receives the signal if it has been sent, without blocking. - /// - /// # Panics - /// - /// This panics if called on a signal on which data has already been - /// received. - /// - /// # Safety - /// - /// The caller must ensure that `recv` and `try_receive` are only called - /// from a single thread. - pub unsafe fn try_recv(&self) -> Option { - // If the SENT bit has been set, read it and return it. - if self.state.load(Ordering::Acquire) & SENT != 0 { - // SAFETY: The other thread only ever accesses this memory - // location once, before entering the SENT state. Because we are - // now in the SENT state, and there can be no other calls to - // `recv` or `try_recv` happening on other threads, we can - // guarantee that we have exclusive access to this memory - // location. - let value_ref = unsafe { &mut *self.value.get() }; - - // Read the value from the signal. - // - // Panics if `recv` or `try_recv` has already returned data. - let value = value_ref.take().unwrap(); - - Some(value) - } else { - None - } - } - - /// Receives the signal, or waits for it to be sent. - /// - /// # Panics - /// - /// This panics if called on a signal on which data has already been - /// received. - /// - /// # Safety - /// - /// The caller must ensure that `recv` and `try_receive` are only called - /// from a single thread. - #[cold] - pub unsafe fn recv(&self) -> T { - // Loop to mitigate spurious wake-ups. - loop { - // Set the WAIT bit and load the current state. - let state = self.state.fetch_or(WAIT, Ordering::Acquire); - - // If the SENT bit has been set, read it and return it. - if state & SENT != 0 { - // SAFETY: The other thread only ever accesses this memory - // location once, before entering the SENT state. Because we are - // now in the SENT state, and there can be no other calls to - // `recv` or `try_recv` happening on other threads, we can - // guarantee that we have exclusive access to this memory - // location. - let value_ref = unsafe { &mut *self.value.get() }; - - // Read the value from the signal. - // - // Panisc if `recv` or `try_recv` has already returned data. - return value_ref.take().unwrap(); - } - - // If a value has not been sent, wait until it is. - atomic_wait::wait(&self.state, state); - } - } - - /// Sends the signal to the receiving thread. - /// - /// # Panics - /// - /// This panics if called more than once on the same signal. - /// - /// # Safety - /// - /// Sending a signal may wake other threads, which may cause signals - /// allocated on that thread's stack to be deallocated. 
- /// - /// This function operates on `*const Self` instead of `&self` to allow it - /// to become dangling during this call. The caller must ensure that the - /// pointer is convertible to an immutable reference upon entry, and not - /// invalidated during the call by any actions other than `send` itself. - #[inline(always)] - pub unsafe fn send(signal: *const Self, value: T) { - // Load the current state of the signal. - // - // SAFETY: The caller ensures that this pointer is convertible to a - // reference, and we have not yet done anything that would cause another - // thread to invalidate it. - let state = unsafe { (*signal).state.load(Ordering::Relaxed) }; - - // Panic if the signal has already been sent. - if state & SENT != 0 { - panic!("attempted to send value over signal, but signal has already been sent"); - } - - // Access the value of the signal. - // - // SAFETY: The caller ensures that this pointer is convertible to a - // reference, and we have not yet done anything that would cause another - // thread to invalidate it. - let value_ref = unsafe { &(*signal).value }; - - // Write the value into the signal. - // - // SAFETY: For the unsafe cell: The other thread only ever accesses this - // memory location when the signal is in the SENT state. Because we are - // responsible for setting that state, and the assert above ensures that - // we are not already in that state, we can be sure that we have unique - // access to the memory location. - unsafe { *value_ref.get() = Some(value) }; - - // Set the bit for the SENT state. Note: This can cause the `signal` - // pointer to become dangling. - // - // SAFETY: The caller ensures that this pointer is convertible to a - // reference, and we have not yet done anything that would cause another - // thread to invalidate it. - let state = unsafe { (*signal).state.fetch_or(SENT, Ordering::Release) }; - if state & WAIT != 0 { - // If the WAIT bit is set, then we receiving thread is asleep and we must wake it. - // - // SAFETY: The caller ensures that this pointer is convertible to a - // reference. It's not possible for it to have been invalidated - // because when in the WAIT state, the receiver thread must either - // be asleep or about to sleep, so setting the SENT bit cannot have - // caused the signal to be deallocated. - atomic_wait::wake_one(unsafe { &(*signal).state }); - } - } -} - -impl Default for Signal { - fn default() -> Self { - Self::new() - } -} - -// SAFETY: References to signals have to be sent between threads for them to -// work, so they must be `Sync`. And signals themselves transmit values between -// threads, so the type `T` must be `Send`. -unsafe impl Sync for Signal {} - -// ----------------------------------------------------------------------------- -// Shuttle Compat - -#[cfg(feature = "shuttle")] -pub struct Signal { - mutex: Mutex>, - condvar: Condvar, -} - -#[cfg(feature = "shuttle")] -impl Signal { - pub fn new() -> Self { - Self { - mutex: Mutex::new(None), - condvar: Condvar::new(), - } - } - - /// Marked unsafe only for compatibility. - pub unsafe fn try_recv(&self) -> Option { - self.mutex.lock().unwrap().take() - } - - /// Marked unsafe only for compatibility. 
- #[cold] - pub unsafe fn recv(&self) -> T { - let mut state = self.mutex.lock().unwrap(); - loop { - match state.take() { - Some(value) => return value, - None => state = self.condvar.wait(state).unwrap(), - } - } - } - - #[inline(always)] - pub unsafe fn send(signal: *const Self, value: T) { - // SAFETY: The caller ensures that this pointer is convertible to a - // reference, and we have not yet done anything that would cause another - // thread to invalidate it. - let state = unsafe { &*signal }; - *state.mutex.lock().unwrap() = Some(value); - state.condvar.notify_all(); - } -} diff --git a/src/thread_pool.rs b/src/thread_pool.rs index d2ddf94..dc7304e 100644 --- a/src/thread_pool.rs +++ b/src/thread_pool.rs @@ -1,6 +1,6 @@ //! This module contains the api and worker logic for the Forte thread pool. -use alloc::collections::VecDeque; +use alloc::boxed::Box; use alloc::format; use alloc::string::ToString; use alloc::vec::Vec; @@ -15,6 +15,8 @@ use core::ptr::NonNull; use core::task::Context; use core::task::Poll; use core::time::Duration; +use crossbeam_queue::SegQueue; +use crossbeam_utils::CachePadded; use async_task::Runnable; use async_task::Task; @@ -22,34 +24,24 @@ use tracing::debug; use tracing::trace; use tracing::trace_span; -use crate::Scope; -use crate::blocker::Blocker; use crate::job::HeapJob; use crate::job::JobQueue; use crate::job::JobRef; use crate::job::StackJob; +use crate::latch::Latch; +use crate::latch::SleepController; use crate::platform::*; +use crate::scope::Scope; use crate::scope::with_scope; -use crate::signal::Signal; use crate::unwind; +use crate::util::XorShift64Star; // ----------------------------------------------------------------------------- -// Thread pool worker leases - -/// A lease is a capability that the thread pool hands out to threads, allowing -/// them to act as a worker on that pool. -pub struct Lease { - thread_pool: &'static ThreadPool, - index: usize, - heartbeat: Arc, -} - -// ----------------------------------------------------------------------------- -// Thread pool types +// Thread pool /// The "heartbeat interval" controls the frequency at which workers share work. #[cfg(not(feature = "shuttle"))] -pub const HEARTBEAT_INTERVAL: Duration = Duration::from_micros(100); +pub const HEARTBEAT_INTERVAL: Duration = Duration::from_micros(5); /// The `ThreadPool` object is used to orchestrate and distribute work to a pool /// of threads, and is generally the main entry point to using `Forte`. @@ -80,7 +72,7 @@ pub const HEARTBEAT_INTERVAL: Duration = Duration::from_micros(100); /// worker.spawn(|_| { }); /// // Spawn another job using the thread pool directly (this will be slower). /// THREAD_POOL.spawn(|_| { }); -/// // Spawn a third job, which will automagically use the parent thread pool. +/// // Spawn a third job, which will automatically use the parent thread pool. /// // This will also be slower than using the worker. /// forte::spawn(|_| { }); /// }); @@ -120,49 +112,59 @@ pub const HEARTBEAT_INTERVAL: Duration = Duration::from_micros(100); /// space. More granular control is possible through other methods such as /// [`ThreadPool::grow`], [`ThreadPool::shrink`], or [`ThreadPool::resize_to`]. pub struct ThreadPool { + /// The internal state of the thread pool. This mutex should only be + /// accessed infrequently. state: Mutex, - job_is_ready: Condvar, + /// A queue used for cooperatively sharing jobs between workers. + shared_jobs: SegQueue, + /// A condvar that is used to signal a new worker taking a lease on a seat. 
+    new_participant: Condvar,
 }
 
+/// The internal state of a thread pool.
 struct ThreadPoolState {
-    shared_jobs: VecDeque<JobRef>,
-    tenants: Vec<Option<Tenant>>,
+    /// The registry of seats. These seats may be "leased out" to different
+    /// threads temporarily, and will be re-used. The seats themselves are
+    /// leaked, and will never move or be deallocated.
+    seats: Vec<Seat>,
+    /// Threads managed directly by this thread pool.
     managed_threads: ManagedThreads,
 }
 
 impl ThreadPoolState {
-    fn claim_shared_job(&mut self) -> Option<JobRef> {
-        self.shared_jobs.pop_front()
-    }
-
     /// Claims a lease on the thread pool. A lease can be passed to
     /// [`Worker::occupy`] to enter a worker context for the thread pool.
     ///
-    /// There are a finite number of leases available on each pool. If they are
-    /// already claimed, this returns `None`.
+    /// If every existing seat is already occupied, a new seat is created, so
+    /// this always succeeds.
     fn claim_lease(&mut self, thread_pool: &'static ThreadPool) -> Lease {
-        let heartbeat = Arc::new(AtomicBool::new(false));
-        let tenant = Tenant {
-            heartbeat: Arc::downgrade(&heartbeat),
-        };
-
-        for (index, occupant) in self.tenants.iter_mut().enumerate() {
-            if occupant.is_none() {
-                *occupant = Some(tenant);
+        // First try to claim an unoccupied seat.
+        for (index, seat) in self.seats.iter_mut().enumerate() {
+            if !seat.occupied {
+                seat.occupied = true;
                 return Lease {
                     thread_pool,
                     index,
-                    heartbeat,
+                    seat_data: seat.data,
                 };
             }
         }
 
-        self.tenants.push(Some(tenant));
+        // If none are available, add a new seat.
+        let index = self.seats.len();
+        let seat_data = Box::leak(Box::new(CachePadded::new(SeatData {
+            heartbeat: AtomicBool::new(false),
+            sleep_controller: SleepController::default(),
+        })));
+        let seat = Seat {
+            occupied: true,
+            data: seat_data,
+        };
+        self.seats.push(seat);
 
         Lease {
             thread_pool,
-            index: self.tenants.len(),
-            heartbeat,
+            index,
+            seat_data,
         }
     }
 
@@ -172,35 +174,38 @@
     fn claim_leases(&mut self, thread_pool: &'static ThreadPool, num: usize) -> Vec<Lease> {
         let mut leases = Vec::with_capacity(num);
 
-        for (index, occupant) in self.tenants.iter_mut().enumerate() {
+        // First try to claim unoccupied seats.
+        for (index, seat) in self.seats.iter_mut().enumerate() {
             if leases.len() == num {
                 return leases;
             }
-            if occupant.is_none() {
-                let heartbeat = Arc::new(AtomicBool::new(false));
-                let tenant = Tenant {
-                    heartbeat: Arc::downgrade(&heartbeat),
-                };
-                *occupant = Some(tenant);
+            if !seat.occupied {
+                seat.occupied = true;
                 leases.push(Lease {
                     thread_pool,
                     index,
-                    heartbeat,
+                    seat_data: seat.data,
                 });
             }
         }
 
+        // Then create new seats as needed.
         while leases.len() != num {
-            let heartbeat = Arc::new(AtomicBool::new(false));
-            let tenant = Tenant {
-                heartbeat: Arc::downgrade(&heartbeat),
+            let index = self.seats.len();
+            let seat_data = Box::leak(Box::new(CachePadded::new(SeatData {
+                heartbeat: AtomicBool::new(false),
+                sleep_controller: SleepController::default(),
+            })));
+            let seat = Seat {
+                occupied: true,
+                data: seat_data,
             };
-            self.tenants.push(Some(tenant));
+            self.seats.push(seat);
             leases.push(Lease {
                 thread_pool,
-                index: self.tenants.len(),
-                heartbeat,
+                index,
+                seat_data,
             });
         }
 
         leases
     }
 }
 
@@ -208,8 +213,35 @@
-struct Tenant {
-    heartbeat: Weak<AtomicBool>,
+struct Seat {
+    occupied: bool,
+    data: &'static SeatData,
+}
+
+/// A public interface that can be claimed and used by a worker.
+struct SeatData {
+    /// The heartbeat signal sent to the worker.
+    heartbeat: AtomicBool,
+    /// Allows other threads to wake the worker.
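+    /// Latches created through `Worker::new_latch` hold this controller as a
+    /// `&'static SleepController`, which is why seat data is leaked in
+    /// `claim_lease` rather than owned: the reference must never dangle.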
+ sleep_controller: SleepController, +} + +/// A lease represents ownership of one of a "seats" in a thread pool, and +/// allows the owning thread to participate in that pool as a worker. +pub struct Lease { + /// The thread pool against which this lease is held. + thread_pool: &'static ThreadPool, + /// The index of the claimed seat. + index: usize, + /// The seat being claimed by this lease. + seat_data: &'static SeatData, +} + +impl Drop for Lease { + fn drop(&mut self) { + let mut state = self.thread_pool.state.lock().unwrap(); + state.seats[self.index].occupied = false; + } } /// Manages threads spawned by the pool. @@ -246,14 +278,13 @@ impl ThreadPool { pub const fn new() -> ThreadPool { ThreadPool { state: Mutex::new(ThreadPoolState { - shared_jobs: VecDeque::new(), - tenants: Vec::new(), + seats: Vec::new(), managed_threads: ManagedThreads { workers: Vec::new(), heartbeat: None, }, }), - job_is_ready: Condvar::new(), + shared_jobs: SegQueue::new(), new_participant: Condvar::new(), } } @@ -427,7 +458,7 @@ impl ThreadPool { } // Wake any sleeping workers to ensure they will eventually see the termination notice. - self.job_is_ready.notify_all(); + // self.job_is_ready.notify_all(); let own_lease = Worker::map_current(|worker| worker.lease.index); @@ -496,7 +527,7 @@ impl ThreadPool { /// Spawns a job into the thread pool. /// /// See also: [`Worker::spawn`] and [`spawn`]. - #[inline] + #[inline(always)] pub fn spawn(&'static self, f: F) where F: FnOnce(&Worker) + Send + 'static, @@ -507,7 +538,7 @@ impl ThreadPool { /// Spawns a future onto the thread pool. /// /// See also: [`Worker::spawn_future`] and [`spawn_future`]. - #[inline] + #[inline(always)] pub fn spawn_future(&'static self, future: F) -> Task where F: Future + Send + 'static, @@ -522,13 +553,13 @@ impl ThreadPool { // second allocation. let job_pointer = runnable.into_raw(); - // Define a function to run the runnable that will be comparable with `JobRef`. - #[inline] + // Define a function to run the runnable that will be compatible with `JobRef`. + #[inline(always)] fn execute_runnable(this: NonNull<()>, _worker: &Worker) { // SAFETY: This pointer was created by the call to `Runnable::into_raw` just above. let runnable = unsafe { Runnable::<()>::from_raw(this) }; // Poll the task. This will drop the future if the task is - // canceled or the future completes.2 + // canceled or the future completes. runnable.run(); } @@ -538,7 +569,7 @@ impl ThreadPool { // Send this job off to be executed. self.with_worker(|worker| { - worker.queue.push_back(job_ref); + worker.enqueue(job_ref); }); }; @@ -564,7 +595,7 @@ impl ThreadPool { /// Spawns an async closure onto the thread pool. /// /// See also: [`Worker::spawn_async`] and [`spawn_async`]. - #[inline] + #[inline(always)] pub fn spawn_async(&'static self, f: Fn) -> Task where Fn: FnOnce() -> Fut + Send + 'static, @@ -580,7 +611,7 @@ impl ThreadPool { /// Blocks the thread waiting for a future to complete. /// /// See also: [`Worker::block_on`] and [`block_on`]. - #[inline] + #[inline(always)] pub fn block_on(&'static self, future: F) -> T where F: Future + Send, @@ -593,7 +624,7 @@ impl ThreadPool { /// results. /// /// See also: [`Worker::join`] and [`join`]. - #[inline] + #[inline(always)] pub fn join(&'static self, a: A, b: B) -> (RA, RB) where A: FnOnce(&Worker) -> RA + Send, @@ -608,6 +639,7 @@ impl ThreadPool { /// /// For more complete docs, see [`scope`]. If you have a reference to a /// worker, you should call [`Worker::scope`] instead. 
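+    ///
+    /// A sketch of typical usage (assuming, consistent with the other spawn
+    /// APIs in this crate, that `Scope::spawn` takes an `FnOnce(&Worker)`
+    /// closure):
+    ///
+    /// ```ignore
+    /// static THREAD_POOL: forte::ThreadPool = forte::ThreadPool::new();
+    ///
+    /// let mut left = 0;
+    /// let mut right = 0;
+    /// THREAD_POOL.scope(|scope| {
+    ///     scope.spawn(|_| left = 1);
+    ///     scope.spawn(|_| right = 2);
+    /// });
+    /// assert_eq!(left + right, 3);
+    /// ```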
+    #[inline(always)]
     pub fn scope<'env, F, T>(&'static self, f: F) -> T
     where
         F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> T,
@@ -646,6 +678,7 @@ pub struct Worker {
     migrated: Cell<bool>,
     lease: Lease,
     pub(crate) queue: JobQueue,
+    rng: XorShift64Star,
     // Make non-send
     _phantom: PhantomData<*const ()>,
 }
@@ -665,7 +698,7 @@ impl Worker {
     ///
     /// Rust's thread locals are fairly costly, so this function is expensive.
     /// If you can avoid calling it, do so.
-    #[inline]
+    #[inline(always)]
     pub fn map_current<F, R>(f: F) -> Option<R>
     where
         F: FnOnce(&Worker) -> R,
@@ -692,7 +725,7 @@ impl Worker {
     ///
     /// Rust's thread locals are fairly costly, so this function is expensive.
     /// If you can avoid calling it, do so.
-    #[inline]
+    #[inline(always)]
     pub fn with_current<F, R>(f: F) -> R
     where
         F: FnOnce(Option<&Worker>) -> R,
@@ -721,7 +754,7 @@ impl Worker {
     ///
     /// Rust's thread locals are fairly costly, so this function is expensive.
     /// If you can avoid calling it, do so.
-    #[inline]
+    #[inline(always)]
     pub fn occupy<F, R>(lease: Lease, f: F) -> R
     where
         F: FnOnce(&Worker) -> R,
@@ -738,6 +771,7 @@ impl Worker {
             migrated: Cell::new(false),
             lease,
             queue: JobQueue::new(),
+            rng: XorShift64Star::new(),
             _phantom: PhantomData,
         };
 
@@ -764,24 +798,53 @@ impl Worker {
     }
 
     /// Returns the index of the worker in the leases list.
-    #[inline]
+    #[inline(always)]
     pub fn index(&self) -> usize {
         self.lease.index
     }
 
+    /// Pushes a job onto the local queue, overflowing to the shared queue when
+    /// full.
+    #[inline(always)]
+    pub fn enqueue(&self, job_ref: JobRef) {
+        if let Some(job_ref) = self.queue.push_back(job_ref) {
+            self.lease.thread_pool.shared_jobs.push(job_ref);
+        }
+    }
+
     /// Promotes a job to the shared queue, then tries to wake another
     /// worker to accept it. The push onto the shared queue is lock-free,
     /// but scanning for a sleeping worker to wake briefly locks the
     /// pool state.
     #[cold]
-    fn promote(&self) {
-        let mut state = self.lease.thread_pool.state.lock().unwrap();
-        if let Some(job) = self.queue.pop_front() {
-            state.shared_jobs.push_back(job);
-            self.lease.thread_pool.job_is_ready.notify_one();
+    fn promote(&self, job_ref: JobRef) {
+        // Push the job onto the shared queue.
+        self.lease.thread_pool.shared_jobs.push(job_ref);
+
+        // Try to wake a worker to work on it.
+        let state = self.lease.thread_pool.state.lock().unwrap();
+        let num_seats = state.seats.len();
+        let offset = self.rng.next_usize(num_seats);
+        for i in 0..num_seats {
+            let i = (i + offset) % num_seats;
+            if i == self.lease.index {
+                continue;
+            }
+            if state.seats[i].occupied {
+                let ready = state.seats[i].data.sleep_controller.wake();
+                if ready {
+                    return;
+                }
+            }
         }
     }
 
+    /// Creates a new latch owned by the worker.
+    #[inline(always)]
+    pub fn new_latch(&self) -> Latch {
+        Latch::new(&self.lease.seat_data.sleep_controller)
+    }
+
    /// Runs jobs until the provided latch is set. When this thread runs
    /// out of local or shared work and the latch is still unset, this puts
    /// the thread to sleep, and the thread will not wake again until
    /// another thread sets the latch.
-    #[inline]
-    pub fn wait_for_signal<T>(&self, signal: &Signal<T>) -> T
-    where
-        T: Send,
-    {
-        loop {
-            // Short-circuit if the signal has already been sent.
-            //
-            // Panics if a value has already been received over this signal.
-            //
-            // SAFETY: The `try_recv` and `recv` functions are only called in
-            // this function, and are therefore only called on the current thread.
-            if let Some(value) = unsafe { signal.try_recv() } {
-                return value;
-            }
-
+    #[inline(always)]
+    pub fn wait_for(&self, latch: &Latch) {
+        while !latch.check() {
             if self.yield_now() == Yield::Idle {
-                // If we run out of jobs, just sleep until the signal is received.
-                //
-                // SAFETY: The `try_recv` and `recv` functions are only called in
-                // this function, and are therefore only called on the current thread.
-                return unsafe { signal.recv() };
+                let ready = latch.wait();
+                if ready {
+                    return;
+                }
             }
         }
     }
@@ -821,7 +870,7 @@ impl Worker {
     /// the threadpool.
     ///
     /// The second value is true if the job was shared, or false if it was spawned locally.
-    #[inline]
+    #[inline(always)]
     pub fn find_work(&self) -> Option<(JobRef, bool)> {
         // We give preference first to things in our local deque, then in other
         // workers deques, and finally to injected jobs from the outside. The
@@ -833,22 +882,17 @@ impl Worker {
     }
 
     /// Claims a shared job from the thread pool.
-    #[cold]
+    #[inline(always)]
     pub fn claim_shared_job(&self) -> Option<JobRef> {
-        self.lease
-            .thread_pool
-            .state
-            .lock()
-            .unwrap()
-            .claim_shared_job()
+        self.lease.thread_pool.shared_jobs.pop()
     }
 
     /// Cooperatively yields execution to the threadpool, allowing it to execute
     /// some work.
     ///
     /// This function only executes local work: work already queued on the
-    /// worker. It will never claim shaired work.
-    #[inline]
+    /// worker. It will never claim shared work.
+    #[inline(always)]
     pub fn yield_local(&self) -> Yield {
         match self.queue.pop_back() {
             Some(job_ref) => {
@@ -867,7 +911,7 @@ impl Worker {
     /// is no local work, this claims from the shared queue, which can contend
     /// with other workers, so it should not be called within a hot loop.
     /// Consider using [`Worker::yield_local`] instead.
-    #[inline]
+    #[inline(always)]
     pub fn yield_now(&self) -> Yield {
         match self.find_work() {
             Some((job_ref, migrated)) => {
@@ -881,18 +925,18 @@ impl Worker {
     /// Returns `true` if the current job is executing on a different thread
     /// from the one on which it was created. Returns `false` if not executing a
     /// job, or if the current job was created on the current thread.
-    #[inline]
+    #[inline(always)]
     pub fn migrated(&self) -> bool {
         self.migrated.get()
     }
 
     /// Executes a job. This wrapper swaps in the correct thread-migration flag
     /// before the job runs, then swaps it back to what it was before.
-    #[inline]
+    #[inline(always)]
     fn execute(&self, job_ref: JobRef, migrated: bool) {
-        let migrated = self.migrated.replace(migrated);
+        let outer_migrated = self.migrated.replace(migrated);
         job_ref.execute(self);
-        self.migrated.set(migrated);
+        self.migrated.set(outer_migrated);
     }
 }
 
@@ -917,7 +961,7 @@ impl Worker {
     ///
     /// If you do not have access to a [`Worker`], you may call
     /// [`ThreadPool::spawn`] or simply [`spawn`].
-    #[inline]
+    #[inline(always)]
     pub fn spawn<F>(&self, f: F)
     where
         F: FnOnce(&Worker) + Send + 'static,
@@ -935,7 +979,7 @@ impl Worker {
         let job_ref = unsafe { job.into_job_ref() };
 
         // Queue the `JobRef` on the worker so that it will be evaluated.
-        self.queue.push_back(job_ref);
+        self.enqueue(job_ref);
     }
 
     /// Spawns a future onto the thread pool. See [`Worker::spawn`] for more
@@ -962,7 +1006,7 @@ impl Worker {
     ///
     /// If you do not have access to a [`Worker`], you may call
     /// [`ThreadPool::spawn_future`] or simply [`spawn_future`].
-    #[inline]
+    #[inline(always)]
     pub fn spawn_future<F, T>(&self, future: F) -> Task<T>
     where
         F: Future<Output = T> + Send + 'static,
@@ -976,7 +1020,7 @@ impl Worker {
     ///
     /// If you do not have access to a [`Worker`], you may call
     /// [`ThreadPool::spawn_async`] or simply [`spawn_async`].
-    #[inline]
+    #[inline(always)]
     pub fn spawn_async<Fn, Fut, T>(&self, f: Fn) -> Task<T>
     where
         Fn: FnOnce() -> Fut + Send + 'static,
@@ -993,21 +1037,21 @@ impl Worker {
     ///
     /// If you do not have access to a [`Worker`], you may call
     /// [`ThreadPool::block_on`] or simply [`block_on`].
-    #[inline]
+    #[inline(always)]
     pub fn block_on<F, T>(&self, future: F) -> T
     where
         F: Future<Output = T> + Send,
         T: Send,
     {
-        // Create a new blocker, which will be used to block the thread until
-        // the future completes.
-        let blocker = Blocker::new();
+        // Create a new latch to block the thread until the future completes.
+        let latch = pin!(self.new_latch());
+        let latch = latch.into_ref();
         // Convert the latch into an async waker.
         //
        // SAFETY: The latch lasts for the duration of this function, and the
        // waker is only used within this function, so the latch outlives the
        // waker.
-        let waker = unsafe { blocker.as_waker() };
+        let waker = unsafe { latch.as_waker() };
         // Put the waker into an async context that can be used to poll futures.
         let mut ctx = Context::from_waker(&waker);
         // Pin the future, promising not to move it while it's being polled.
@@ -1017,12 +1061,15 @@ impl Worker {
             match future.as_mut().poll(&mut ctx) {
                 // While the future is incomplete, run other tasks or sleep.
                 Poll::Pending => {
-                    while blocker.would_block() {
-                        if self.yield_now() == Yield::Idle {
-                            blocker.block();
-                            break;
-                        }
-                    }
+                    // This will not return until the latch is set.
+                    self.wait_for(latch.get_ref());
+                    // We want to keep using the same latch every time we wait
+                    // for the future to become ready, so we have to reset it
+                    // here.
+                    //
+                    // SAFETY: The latch must be in the set state because we
+                    // just waited for it.
+                    unsafe { latch.reset() };
                 }
                 // When it is complete, pull out the result and return it.
                 Poll::Ready(res) => return res,
@@ -1035,7 +1082,7 @@ impl Worker {
     ///
     /// If you do not have access to a [`Worker`], you may call
     /// [`ThreadPool::join`] or simply [`join`].
-    #[inline]
+    #[inline(always)]
     pub fn join<A, B, RA, RB>(&self, a: A, b: B) -> (RA, RB)
     where
         A: FnOnce(&Worker) -> RA + Send,
@@ -1044,7 +1091,7 @@ impl Worker {
         RB: Send,
     {
         // Allocate a job to run the closure `a` on the stack.
-        let stack_job = StackJob::new(a);
+        let stack_job = StackJob::new(a, self);
 
         // SAFETY: The `StackJob` is allocated on the stack just above, is never
         // moved, and so will live for the entirety of this function in the same
@@ -1060,13 +1107,18 @@ impl Worker {
         let job_ref_id = job_ref.id();
 
         // Push the job onto the queue.
-        self.queue.push_back(job_ref);
+        self.enqueue(job_ref);
 
         // Check for a heartbeat, potentially promoting the job we just pushed
         // to a shared job.
-        if self.lease.heartbeat.load(Ordering::Relaxed) {
-            self.promote();
-            self.lease.heartbeat.store(false, Ordering::Relaxed);
+        if self.lease.seat_data.heartbeat.load(Ordering::Relaxed)
+            && let Some(job_ref) = self.queue.pop_front()
+        {
+            self.promote(job_ref);
+            self.lease
+                .seat_data
+                .heartbeat
+                .store(false, Ordering::Relaxed);
         }
 
         // Run the second closure directly.
@@ -1094,10 +1146,13 @@ impl Worker {
         }
 
         // Wait for the job to complete.
-        let result_a = self.wait_for_signal(stack_job.signal());
+        self.wait_for(stack_job.completion_latch());
+
+        // SAFETY: The job must be complete, because we just waited on the latch.
+        let job_return_value = unsafe { stack_job.return_value() };
 
         // If the job panicked, resume the panic on this thread.
-        match result_a {
+        match job_return_value {
             Ok(result_a) => (result_a, result_b),
             Err(error) => unwind::resume_unwinding(error),
         }
     }
 
     /// Creates a scope onto which non-static work can be spawned. For more complete docs, see [`scope`].
     ///
     /// If you do not have access to a worker, you can use [`ThreadPool::scope`] or simply [`scope`].
-    #[inline]
+    #[inline(always)]
     pub fn scope<'env, F, T>(&self, f: F) -> T
     where
         F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> T,
@@ -1440,16 +1495,13 @@ fn managed_worker(lease: Lease, halt: Arc<AtomicBool>, barrier: Arc<Barrier>) {
                 continue;
             }
 
-            let mut state = worker.lease.thread_pool.state.lock().unwrap();
-
             while !halt.load(Ordering::Relaxed) {
-                if let Some(job) = state.claim_shared_job() {
-                    drop(state);
+                if let Some(job) = worker.claim_shared_job() {
                     worker.execute(job, true);
                     break;
                 }
 
-                state = worker.lease.thread_pool.job_is_ready.wait(state).unwrap();
+                worker.lease.seat_data.sleep_controller.sleep();
             }
         }
     });
@@ -1481,27 +1533,24 @@ fn heartbeat_loop(thread_pool: &'static ThreadPool, halt: Arc<AtomicBool>) {
     let mut state = thread_pool.state.lock().unwrap();
     while !halt.load(Ordering::Relaxed) {
-        let num_slots = state.tenants.len();
-        let mut num_occupied: u32 = 0;
+        let num_seats = state.seats.len();
+        let mut num_occupied: usize = 0;
         let mut sent_heartbeat = false;
         // Iterate through all the seats, starting at the one queued for a heartbeat.
-        for i in 0..num_slots {
-            let tenant_index = (queued_to_heartbeat + i) % num_slots;
-            // Just ignore slots that don't have a current tenant.
-            if let Some(tenant) = &mut state.tenants[tenant_index] {
-                // Clean up any old tenants whose heartbeat atomics have been de-allocated.
-                let Some(heartbeat) = tenant.heartbeat.upgrade() else {
-                    state.tenants[tenant_index] = None;
-                    continue;
-                };
-
+        for i in 0..num_seats {
+            let seat_index = (queued_to_heartbeat + i) % num_seats;
+            // Ignore unoccupied seats.
+            if state.seats[seat_index].occupied {
                // Send a single heartbeat to the first occupied seat we find.
                 if !sent_heartbeat {
-                    heartbeat.store(true, Ordering::Relaxed);
+                    state.seats[seat_index]
+                        .data
+                        .heartbeat
+                        .store(true, Ordering::Relaxed);
                     sent_heartbeat = true;
                    // Start with the next seat on the next invocation of the loop.
-                    queued_to_heartbeat = (tenant_index + 1) % num_slots;
+                    queued_to_heartbeat = (seat_index + 1) % num_seats;
                 }
 
                // Count every occupied seat, even if we didn't send it a heartbeat.
@@ -1511,7 +1560,7 @@ fn heartbeat_loop(thread_pool: &'static ThreadPool, halt: Arc<AtomicBool>) {
 
         if num_occupied > 0 {
             drop(state);
-            let sleep_interval = HEARTBEAT_INTERVAL / num_occupied;
+            let sleep_interval = HEARTBEAT_INTERVAL / num_occupied as u32;
             thread::sleep(sleep_interval);
             state = thread_pool.state.lock().unwrap();
         } else {
diff --git a/src/unwind.rs b/src/unwind.rs
index 11f3fdf..fa21c17 100644
--- a/src/unwind.rs
+++ b/src/unwind.rs
@@ -13,7 +13,7 @@ use std::thread::Result;
 /// `Err` result. The assumption is that any panic will be propagated
 /// later with `resume_unwinding`, and hence `f` can be treated as
 /// exception safe.
-#[cold]
+#[inline(always)]
 pub fn halt_unwinding<F, R>(func: F) -> Result<R>
 where
     F: FnOnce() -> R,
diff --git a/src/util.rs b/src/util.rs
new file mode 100644
index 0000000..6f3a4d1
--- /dev/null
+++ b/src/util.rs
@@ -0,0 +1,46 @@
+use core::{
+    cell::Cell,
+    hash::Hasher,
+    sync::atomic::{AtomicUsize, Ordering},
+};
+use std::hash::DefaultHasher;
+
+/// [xorshift*] is a fast pseudorandom number generator which will
+/// even tolerate weak seeding, as long as the seed is not zero.
+///
+/// [xorshift*]: https://en.wikipedia.org/wiki/Xorshift#xorshift*
+pub struct XorShift64Star {
+    state: Cell<u64>,
+}
+
+impl XorShift64Star {
+    pub fn new() -> Self {
+        // Any non-zero seed will do -- this uses the hash of a global counter.
+        let mut seed = 0;
+        while seed == 0 {
+            let mut hasher = DefaultHasher::new();
+            static COUNTER: AtomicUsize = AtomicUsize::new(0);
+            hasher.write_usize(COUNTER.fetch_add(1, Ordering::Relaxed));
+            seed = hasher.finish();
+        }
+
+        XorShift64Star {
+            state: Cell::new(seed),
+        }
+    }
+
+    fn next(&self) -> u64 {
+        let mut x = self.state.get();
+        debug_assert_ne!(x, 0);
+        x ^= x >> 12;
+        x ^= x << 25;
+        x ^= x >> 27;
+        self.state.set(x);
+        x.wrapping_mul(0x2545_f491_4f6c_dd1d)
+    }
+
+    /// Returns a value in `0..n`. `n` must be non-zero.
+    pub fn next_usize(&self, n: usize) -> usize {
+        (self.next() % n as u64) as usize
+    }
+}
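---

A few appendix sketches follow; none are part of the patch, and every name they introduce (`MiniSeat`, `claim`, `release`, and so on) is an illustrative stand-in, not crate API. First, the seat/lease life cycle from `ThreadPoolState` in miniature: claim the first unoccupied seat, leak a new one when the registry is full, and mark the seat reusable on release, which is the shape of `claim_lease` plus `Drop for Lease`:

    use std::sync::Mutex;

    // Stand-in for the patch's cache-padded `SeatData`.
    struct MiniSeatData;

    struct MiniSeat {
        occupied: bool,
        data: &'static MiniSeatData,
    }

    static SEATS: Mutex<Vec<MiniSeat>> = Mutex::new(Vec::new());

    /// Claim the first free seat, or leak a new one if all are occupied.
    fn claim() -> usize {
        let mut seats = SEATS.lock().unwrap();
        for (index, seat) in seats.iter_mut().enumerate() {
            if !seat.occupied {
                seat.occupied = true;
                return index;
            }
        }
        // Leaking keeps the `&'static` reference valid even while the
        // registry vector itself grows and reallocates.
        let data: &'static MiniSeatData = Box::leak(Box::new(MiniSeatData));
        seats.push(MiniSeat { occupied: true, data });
        seats.len() - 1
    }

    /// What `Drop for Lease` does: mark the seat reusable, never deallocate it.
    fn release(index: usize) {
        SEATS.lock().unwrap()[index].occupied = false;
    }

    fn main() {
        let a = claim();
        let b = claim();
        release(a);
        assert_eq!(claim(), a); // seat indices are reused
        release(b);
    }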
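The worker-side `enqueue` relies on a contract the hunk only implies: `JobQueue::push_back` is assumed to return `Some(job)` when the fixed-capacity local queue rejects the job, which then spills into the pool-wide shared queue. The same push-locally-overflow-globally shape with standard-library types (the capacity and all names here are illustrative; the real shared queue is a lock-free `SegQueue`, the mutexed `VecDeque` just keeps this sketch dependency-free):

    use std::collections::VecDeque;
    use std::sync::Mutex;

    const LOCAL_CAPACITY: usize = 4; // the real bound lives inside `JobQueue`

    struct LocalQueue<T>(VecDeque<T>);

    impl<T> LocalQueue<T> {
        // Assumed `JobQueue::push_back` contract: `None` on success,
        // `Some(job)` handed back when the queue is full.
        fn push_back(&mut self, job: T) -> Option<T> {
            if self.0.len() == LOCAL_CAPACITY {
                Some(job)
            } else {
                self.0.push_back(job);
                None
            }
        }
    }

    // The shape of `Worker::enqueue`: local fast path, shared overflow.
    fn enqueue<T>(local: &mut LocalQueue<T>, shared: &Mutex<VecDeque<T>>, job: T) {
        if let Some(job) = local.push_back(job) {
            shared.lock().unwrap().push_back(job);
        }
    }

    fn main() {
        let mut local = LocalQueue(VecDeque::new());
        let shared = Mutex::new(VecDeque::new());
        for job in 0..6 {
            enqueue(&mut local, &shared, job);
        }
        assert_eq!(local.0.len(), 4);
        assert_eq!(shared.lock().unwrap().len(), 2); // jobs 4 and 5 overflowed
    }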
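`promote` scans seats starting from a random offset so that repeated promotions do not always wake the same low-numbered seat. The scan in isolation, with `wake` standing in for `SleepController::wake` and assumed to report whether it actually roused a sleeper (in the real code `num_seats` is at least one, since the caller itself holds a seat):

    fn wake_one(occupied: &[bool], own_index: usize, offset: usize, wake: impl Fn(usize) -> bool) {
        let num_seats = occupied.len();
        for i in 0..num_seats {
            let i = (i + offset) % num_seats;
            // Skip our own seat and unoccupied seats.
            if i == own_index || !occupied[i] {
                continue;
            }
            // Stop at the first wake that lands on a sleeping worker.
            if wake(i) {
                return;
            }
        }
        // No sleeper accepted the wake; the promoted job stays on the shared
        // queue until some worker runs out of local work and finds it.
    }

    fn main() {
        let occupied = [true, false, true, true];
        wake_one(&occupied, 0, 2, |i| {
            println!("woke seat {i}");
            true
        });
    }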
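`wait_for` is the core of the advanced worker sleeping: keep doing useful work while the latch is unset, and only sleep once the worker is idle. Its control flow, with the latch reduced to an atomic flag plus thread parking (the real `Latch` routes through the seat's `SleepController` so that `promote` and friends can do the waking):

    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::thread;

    struct ToyLatch {
        set: AtomicBool,
    }

    impl ToyLatch {
        fn check(&self) -> bool {
            self.set.load(Ordering::Acquire)
        }
        // Park until unparked; report whether the latch was set on wake.
        // A spurious wakeup returns `false` and the caller loops.
        fn wait(&self) -> bool {
            thread::park();
            self.check()
        }
    }

    // Same control flow as `Worker::wait_for`: drain work while the latch
    // is unset, and park only when there is nothing left to run.
    fn wait_for(latch: &ToyLatch, mut run_one_job: impl FnMut() -> bool) {
        while !latch.check() {
            let ran_a_job = run_one_job(); // stands in for `yield_now`
            if !ran_a_job && latch.wait() {
                return;
            }
        }
    }

    fn main() {
        let latch = Arc::new(ToyLatch { set: AtomicBool::new(false) });
        let waiter = {
            let latch = Arc::clone(&latch);
            thread::spawn(move || wait_for(&latch, || false))
        };
        latch.set.store(true, Ordering::Release);
        waiter.thread().unpark(); // a buffered unpark, so ordering is safe
        waiter.join().unwrap();
    }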
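`block_on` then builds on `wait_for`: turn the latch into a `Waker`, poll, wait, reset, repeat. The same poll/wait/reset cycle can be written entirely with safe standard-library parts using `std::task::Wake` over a mutex-and-condvar latch, which is exactly the locking the patch's `Latch`/`SleepController` pair exists to avoid (this sketch also omits running pool jobs while waiting):

    use std::future::Future;
    use std::pin::pin;
    use std::sync::{Arc, Condvar, Mutex};
    use std::task::{Context, Poll, Wake, Waker};

    #[derive(Default)]
    struct CondvarLatch {
        set: Mutex<bool>,
        cond: Condvar,
    }

    impl CondvarLatch {
        // Plays the role of `wait_for` followed by `unsafe { latch.reset() }`.
        fn wait_and_reset(&self) {
            let mut set = self.set.lock().unwrap();
            while !*set {
                set = self.cond.wait(set).unwrap();
            }
            *set = false;
        }
    }

    impl Wake for CondvarLatch {
        fn wake(self: Arc<Self>) {
            *self.set.lock().unwrap() = true;
            self.cond.notify_one();
        }
    }

    fn block_on<F: Future>(future: F) -> F::Output {
        let latch = Arc::new(CondvarLatch::default());
        let waker = Waker::from(latch.clone());
        let mut ctx = Context::from_waker(&waker);
        let mut future = pin!(future);
        loop {
            match future.as_mut().poll(&mut ctx) {
                Poll::Pending => latch.wait_and_reset(),
                Poll::Ready(value) => return value,
            }
        }
    }

    fn main() {
        // A future that pends once, waking itself immediately.
        let mut polled = false;
        let fut = std::future::poll_fn(move |cx| {
            if polled {
                Poll::Ready(42)
            } else {
                polled = true;
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        });
        assert_eq!(block_on(fut), 42);
    }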
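On the heartbeat arithmetic at the end of `heartbeat_loop`: one heartbeat is sent per wakeup, round-robin, so dividing the interval by the occupied-seat count keeps each seat's heartbeat period constant while the heartbeat thread wakes proportionally more often. Assuming a 100 µs `HEARTBEAT_INTERVAL` (the constant's real value is defined elsewhere in the crate), four occupied seats mean the loop wakes every 25 µs while each seat is still heartbeated once per 100 µs:

    use std::time::Duration;

    // Assumed value, for illustration only.
    const HEARTBEAT_INTERVAL: Duration = Duration::from_micros(100);

    fn sleep_interval(num_occupied: usize) -> Duration {
        // The loop only divides when `num_occupied > 0`.
        HEARTBEAT_INTERVAL / num_occupied as u32
    }

    fn main() {
        assert_eq!(sleep_interval(1), Duration::from_micros(100));
        assert_eq!(sleep_interval(4), Duration::from_micros(25));
    }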
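Last, the new `XorShift64Star`: it is seeded from the hash of a global counter so each worker starts with a distinct non-zero state, and `promote` uses it to pick its scan offset. One caveat for callers: `next_usize` computes `next() % n`, so `n` must be non-zero, and the result carries a slight modulo bias when `n` is not a power of two (harmless for picking a starting seat). Hypothetical usage from another module of the crate:

    use crate::util::XorShift64Star;

    /// Pick a random seat index to start a wake scan from, as `promote`
    /// does with its per-worker `rng`.
    fn random_offset(num_seats: usize) -> usize {
        debug_assert!(num_seats > 0, "`next_usize` computes `next() % n`");
        XorShift64Star::new().next_usize(num_seats)
    }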