diff --git a/Cargo.lock b/Cargo.lock index 3efbd92..4274d5d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -136,7 +136,7 @@ dependencies = [ "cfg-if", "critical-section", "foldhash", - "hashbrown", + "hashbrown 0.15.4", "portable-atomic", "portable-atomic-util", "serde", @@ -378,6 +378,20 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43da5946c66ffcc7745f48db692ffbb10a83bfe0afd96235c5c2a4fb23994929" +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "derive_more" version = "1.0.0" @@ -490,6 +504,7 @@ dependencies = [ "criterion", "crossbeam-queue", "crossbeam-utils", + "dashmap", "divan", "rayon", "shuttle", @@ -497,13 +512,6 @@ dependencies = [ "tracing-subscriber", ] -[[package]] -name = "forte-rayon-compat" -version = "1.12.0-dev" -dependencies = [ - "forte", -] - [[package]] name = "funty" version = "2.0.0" @@ -579,6 +587,12 @@ dependencies = [ "byteorder", ] +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.15.4" @@ -665,6 +679,15 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd945864f07fe9f5371a27ad7b52a172b4b499999f1d97574c9fa68373937e12" +[[package]] +name = "lock_api" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "224399e74b87b5f3557511d98dff8b14089b3dadafcab6bb93eab67d3aace965" +dependencies = [ + "scopeguard", +] + [[package]] name = "log" version = "0.4.27" @@ -726,6 +749,19 @@ version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" +[[package]] +name = "parking_lot_core" +version = "0.9.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-link 0.2.1", +] + [[package]] name = "pin-project-lite" version = "0.2.16" @@ -867,6 +903,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "redox_syscall" +version = "0.5.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" +dependencies = [ + "bitflags", +] + [[package]] name = "regex" version = "1.11.1" @@ -948,6 +993,12 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + [[package]] name = "serde" version = "1.0.219" @@ -1281,7 +1332,7 @@ dependencies = [ "windows-collections", "windows-core", "windows-future", - "windows-link", + "windows-link 0.1.3", "windows-numerics", ] @@ -1302,7 +1353,7 @@ checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3" dependencies = [ "windows-implement", 
"windows-interface", - "windows-link", + "windows-link 0.1.3", "windows-result", "windows-strings", ] @@ -1314,7 +1365,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc6a41e98427b19fe4b73c550f060b59fa592d7d686537eebf9385621bfbad8e" dependencies = [ "windows-core", - "windows-link", + "windows-link 0.1.3", "windows-threading", ] @@ -1346,6 +1397,12 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a" +[[package]] +name = "windows-link" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" + [[package]] name = "windows-numerics" version = "0.2.0" @@ -1353,7 +1410,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9150af68066c4c5c07ddc0ce30421554771e528bde427614c61038bc2c92c2b1" dependencies = [ "windows-core", - "windows-link", + "windows-link 0.1.3", ] [[package]] @@ -1362,7 +1419,7 @@ version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6" dependencies = [ - "windows-link", + "windows-link 0.1.3", ] [[package]] @@ -1371,7 +1428,7 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57" dependencies = [ - "windows-link", + "windows-link 0.1.3", ] [[package]] @@ -1420,7 +1477,7 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b66463ad2e0ea3bbf808b7f1d371311c80e115c0b71d60efc142cafbcfb057a6" dependencies = [ - "windows-link", + "windows-link 0.1.3", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 583149d..3a5057c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ repository = "https://github.com/NthTensor/Forte" [workspace] resolver = "2" -members = ["ci", "rayon-compat"] +members = ["ci"] [dependencies] arraydeque = "0.5.1" @@ -26,6 +26,8 @@ divan = "0.1.17" rayon = "1.10.0" chili = "0.2" bevy_tasks = { version = "0.16.1", features = [ "multi_threaded" ] } +# Used for some benchmarks +dashmap = "6.1.0" # Used for A/B perf testing criterion = { version = "0.5" } @@ -36,7 +38,7 @@ shuttle = ["dep:shuttle"] debug = true [profile.bench] -debug = true +opt-level = 3 # Custom profile for shuttle tests: enable release optimizations so that the shuttle # tests are less slow, but don't disable debug assertions. @@ -68,10 +70,6 @@ missing_docs = "warn" unsafe_op_in_unsafe_fn = "warn" unused_qualifications = "warn" -[[test]] -name = "integration" -path = "tests/tests.rs" - [[bench]] name = "fork_join" harness = false @@ -79,3 +77,11 @@ harness = false [[bench]] name = "bevy_tasks" harness = false + +[[bench]] +name = "flood_fill" +harness = false + +[[bench]] +name = "flat_scope" +harness = false diff --git a/benches/bevy_tasks.rs b/benches/bevy_tasks.rs index 87ad01e..d3cbe33 100644 --- a/benches/bevy_tasks.rs +++ b/benches/bevy_tasks.rs @@ -1,15 +1,5 @@ //! 
Comparative benchmarks against bevy_tasks -struct BevyParChunks<'a, T>(core::slice::Chunks<'a, T>); -impl<'a, T> bevy_tasks::ParallelIterator<core::slice::Iter<'a, T>> for BevyParChunks<'a, T> -where - T: 'a + Send + Sync, -{ - fn next_batch(&mut self) -> Option<core::slice::Iter<'a, T>> { - self.0.next().map(|s| s.iter()) - } -} - struct BevyParChunksMut<'a, T>(core::slice::ChunksMut<'a, T>); impl<'a, T> bevy_tasks::ParallelIterator<core::slice::IterMut<'a, T>> for BevyParChunksMut<'a, T> where T: 'a + Send + Sync, @@ -97,7 +87,7 @@ mod overhead { bencher.bench_local(|| { THREAD_POOL.with_worker(|worker| { - forte_chunks::<100, _, _>(worker, &mut vec, &|c| { + forte_chunks::<8, _, _>(worker, &mut vec, &|c| { c.iter_mut().for_each(work); }); }) diff --git a/benches/flat_scope.rs b/benches/flat_scope.rs new file mode 100644 index 0000000..0aca71e --- /dev/null +++ b/benches/flat_scope.rs @@ -0,0 +1,90 @@ +//! A benchmark for flat scope-spawning workloads, with a hash workload adapted from `chili`. + +use std::hash::{DefaultHasher, Hash, Hasher}; + +use criterion::black_box; +use divan::Bencher; +use tracing_subscriber::fmt; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; + +const SIZES: &[usize] = &[8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192]; + +fn sizes() -> impl Iterator<Item = usize> { + SIZES.iter().cloned() +} + +// ----------------------------------------------------------------------------- +// Benchmark + +#[divan::bench(args = sizes(), threads = false)] +fn baseline(bencher: Bencher, size: usize) { + bencher.bench_local(move || { + for i in 0..size { + for j in 0..200 { + let mut s = DefaultHasher::new(); + i.hash(&mut s); + j.hash(&mut s); + black_box(s.finish()); + } + } + }); +} + +static COMPUTE: forte::ThreadPool = forte::ThreadPool::new(); + +#[divan::bench(args = sizes(), threads = false)] +fn forte(bencher: Bencher, size: usize) { + use forte::Worker; + + COMPUTE.with_worker(|worker| { + bencher.bench_local(|| { + worker.scope(|scope| { + for i in 0..size { + scope.spawn_on(worker, move |_: &Worker| { + for j in 0..200 { + let mut s = DefaultHasher::new(); + i.hash(&mut s); + j.hash(&mut s); + black_box(s.finish()); + } + }); + } + }); + }); + }); } + +#[divan::bench(args = sizes(), threads = false)] +fn rayon(bencher: Bencher, size: usize) { + use rayon::scope; + + bencher.bench_local(|| { + scope(|scope| { + for i in 0..size { + scope.spawn(move |_| { + for j in 0..200 { + let mut s = DefaultHasher::new(); + i.hash(&mut s); + j.hash(&mut s); + black_box(s.finish()); + } + }); + } + }); + }); } + +fn main() { + let fmt_layer = fmt::layer() + .without_time() + .with_target(false) + .with_thread_names(true) + .compact(); + + tracing_subscriber::registry().with(fmt_layer).init(); + + COMPUTE.resize_to_available(); + + divan::main(); } diff --git a/benches/flood_fill.rs b/benches/flood_fill.rs new file mode 100644 index 0000000..5ccb631 --- /dev/null +++ b/benches/flood_fill.rs @@ -0,0 +1,211 @@ +//! A flood fill benchmark for irregular, dynamically discovered workloads, with a hash workload adapted from `chili`.
+ +use std::collections::{HashSet, VecDeque}; +use std::hash::{DefaultHasher, Hash, Hasher}; + +use criterion::black_box; +use dashmap::DashSet; +use divan::Bencher; +use tracing_subscriber::fmt; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; + +const SIZES: &[usize] = &[8, 16, 32, 64, 128, 256, 512]; + +fn sizes() -> impl Iterator { + SIZES.iter().cloned() +} + +// ----------------------------------------------------------------------------- +// Benchmark + +#[divan::bench(args = sizes(), threads = false)] +fn baseline(bencher: Bencher, size: usize) { + bencher.bench_local(move || { + let mut visited = HashSet::new(); + + let mut queue = VecDeque::new(); + queue.push_back((size / 2, size / 2)); + + while let Some((x, y)) = queue.pop_front() { + // Visit the y - 1 square + if y > 0 { + if visited.insert((x, y - 1)) { + queue.push_back((x, y - 1)); + } + } + + // Visit the y + 1 square + if y < size - 1 { + if visited.insert((x, y + 1)) { + queue.push_back((x, y + 1)); + } + } + + // Visit the x - 1 square + if x > 0 { + if visited.insert((x - 1, y)) { + queue.push_back((x - 1, y)); + } + } + + // Visit the x + 1 square + if x < size - 1 { + if visited.insert((x + 1, y)) { + queue.push_back((x + 1, y)); + } + } + + for i in 0..200 { + let mut s = DefaultHasher::new(); + i.hash(&mut s); + black_box(s.finish()); + } + } + }); +} + +static COMPUTE: forte::ThreadPool = forte::ThreadPool::new(); + +#[divan::bench(args = sizes(), threads = false)] +fn forte(bencher: Bencher, size: usize) { + use forte::{Scope, Worker}; + + fn visit<'scope, 'env>( + size: usize, + visited: &'env DashSet<(usize, usize)>, + x: usize, + y: usize, + scope: &'scope Scope<'scope, 'env>, + worker: &Worker, + ) { + // Visit the y - 1 square + if y > 0 { + if visited.insert((x, y - 1)) { + scope.spawn_on(worker, move |worker: &Worker| { + visit(size, visited, x, y - 1, scope, worker) + }); + } + } + + // Visit the y + 1 square + if y < size - 1 { + if visited.insert((x, y + 1)) { + scope.spawn_on(worker, move |worker: &Worker| { + visit(size, visited, x, y + 1, scope, worker) + }); + } + } + + // Visit the x - 1 square + if x > 0 { + if visited.insert((x - 1, y)) { + scope.spawn_on(worker, move |worker: &Worker| { + visit(size, visited, x - 1, y, scope, worker) + }); + } + } + + // Visit the x + 1 square + if x < size - 1 { + if visited.insert((x + 1, y)) { + scope.spawn_on(worker, move |worker: &Worker| { + visit(size, visited, x + 1, y, scope, worker) + }); + } + } + + for i in 0..200 { + let mut s = DefaultHasher::new(); + i.hash(&mut s); + black_box(s.finish()); + } + } + + COMPUTE.with_worker(|worker| { + bencher.bench_local(|| { + let visited = DashSet::new(); + + worker.scope(|scope| { + visit(size, &visited, size / 2, size / 2, scope, worker); + }); + }); + }); +} + +#[divan::bench(args = sizes(), threads = false)] +fn rayon(bencher: Bencher, size: usize) { + use rayon::{Scope, scope}; + + fn visit<'scope>( + size: usize, + visited: &'scope DashSet<(usize, usize)>, + x: usize, + y: usize, + scope: &Scope<'scope>, + ) { + // Visit the y - 1 square + if y > 0 { + if visited.insert((x, y - 1)) { + scope.spawn(move |scope| { + visit(size, visited, x, y - 1, scope); + }); + } + } + + // Visit the y + 1 square + if y < size - 1 { + if visited.insert((x, y + 1)) { + scope.spawn(move |scope| { + visit(size, visited, x, y + 1, scope); + }); + } + } + + // Visit the x - 1 square + if x > 0 { + if visited.insert((x - 1, y)) { + scope.spawn(move |scope| { + visit(size, visited, 
x - 1, y, scope); + }); + } + } + + // Visit the x + 1 square + if x < size - 1 { + if visited.insert((x + 1, y)) { + scope.spawn(move |scope| { + visit(size, visited, x + 1, y, scope); + }); + } + } + + for i in 0..200 { + let mut s = DefaultHasher::new(); + i.hash(&mut s); + black_box(s.finish()); + } + } + + bencher.bench_local(|| { + let visited = DashSet::new(); + + scope(|scope| { + visit(size, &visited, size / 2, size / 2, scope); + }); + }); +} + +fn main() { + let fmt_layer = fmt::layer() + .without_time() + .with_target(false) + .with_thread_names(true) + .compact(); + + tracing_subscriber::registry().with(fmt_layer).init(); + + COMPUTE.resize_to_available(); + + divan::main(); +} diff --git a/benches/fork_join.rs b/benches/fork_join.rs index 286554a..6772b8b 100644 --- a/benches/fork_join.rs +++ b/benches/fork_join.rs @@ -1,7 +1,6 @@ //! A benchmark for fork-join workloads adapted from `chili`. use chili::Scope; -use criterion::black_box; use divan::Bencher; use forte::Worker; use tracing::info; @@ -78,14 +77,8 @@ static COMPUTE: forte::ThreadPool = forte::ThreadPool::new(); fn forte(bencher: Bencher, nodes: (usize, usize)) { fn sum(node: &Node, worker: &Worker) -> u64 { let (left, right) = worker.join( - |w| { - let sum = node.left.as_deref().map(|n| sum(n, w)).unwrap_or_default(); - sum - }, - |w| { - let sum = node.right.as_deref().map(|n| sum(n, w)).unwrap_or_default(); - sum - }, + |w| node.left.as_deref().map(|n| sum(n, w)).unwrap_or_default(), + |w| node.right.as_deref().map(|n| sum(n, w)).unwrap_or_default(), ); node.val + left + right diff --git a/ci/src/ci.rs b/ci/src/ci.rs index a5e66b0..b3c23ba 100644 --- a/ci/src/ci.rs +++ b/ci/src/ci.rs @@ -71,12 +71,19 @@ impl CI { None => { // Note that we are running the subcommands directly rather than using any aliases let mut cmds = vec![]; + // Lint commands cmds.append(&mut commands::FormatCommand::default().prepare(sh, flags)); cmds.append(&mut commands::ClippyCommand::default().prepare(sh, flags)); cmds.append(&mut commands::LintsCommand::default().prepare(sh, flags)); + // Compile commands cmds.append(&mut commands::CompileCheckCommand::default().prepare(sh, flags)); + // Documentation commands cmds.append(&mut commands::DocCheckCommand::default().prepare(sh, flags)); cmds.append(&mut commands::DocTestCommand::default().prepare(sh, flags)); + // Shuttle commands + cmds.append(&mut commands::ShuttleCheckCommand::default().prepare(sh, flags)); + cmds.append(&mut commands::ShuttleClippyCommand::default().prepare(sh, flags)); + cmds.append(&mut commands::ShuttleTestCommand::default().prepare(sh, flags)); cmds } } diff --git a/ci/src/commands/shuttle.rs b/ci/src/commands/shuttle.rs index 2f80b06..857a7d9 100644 --- a/ci/src/commands/shuttle.rs +++ b/ci/src/commands/shuttle.rs @@ -7,9 +7,9 @@ use crate::commands::ShuttleCheckCommand; use crate::commands::ShuttleClippyCommand; use crate::commands::ShuttleTestCommand; -/// Alias for running the `loom-check`, `loom-clippy` and `loom-test` subcommands. +/// Alias for running the `shuttle-check`, `shuttle-clippy` and `shuttle-test` subcommands. 
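+/// +/// As a usage sketch (assuming this CI binary is invoked in the usual cargo-workspace +/// style; the exact command is not shown in this diff): `cargo run -p ci -- shuttle`.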
#[derive(FromArgs, Default)] -#[argh(subcommand, name = "loom")] +#[argh(subcommand, name = "shuttle")] pub struct ShuttleCommand {} impl Prepare for ShuttleCommand { diff --git a/ci/src/commands/shuttle_clippy.rs b/ci/src/commands/shuttle_clippy.rs index ecaf180..8c0111d 100644 --- a/ci/src/commands/shuttle_clippy.rs +++ b/ci/src/commands/shuttle_clippy.rs @@ -7,7 +7,7 @@ use crate::PreparedCommand; /// Checks for clippy warnings and errors in the loom test suite. #[derive(FromArgs, Default)] -#[argh(subcommand, name = "loom-clippy")] +#[argh(subcommand, name = "shuttle-clippy")] pub struct ShuttleClippyCommand {} impl Prepare for ShuttleClippyCommand { diff --git a/ci/src/commands/shuttle_test.rs b/ci/src/commands/shuttle_test.rs index 12b401f..cdfc41f 100644 --- a/ci/src/commands/shuttle_test.rs +++ b/ci/src/commands/shuttle_test.rs @@ -7,7 +7,7 @@ use crate::PreparedCommand; /// Runs the loom concurrency test suite. #[derive(FromArgs, Default)] -#[argh(subcommand, name = "loom-test")] +#[argh(subcommand, name = "shuttle-test")] pub struct ShuttleTestCommand {} impl Prepare for ShuttleTestCommand { diff --git a/rayon-compat/Cargo.toml b/rayon-compat/Cargo.toml deleted file mode 100644 index d2e4000..0000000 --- a/rayon-compat/Cargo.toml +++ /dev/null @@ -1,13 +0,0 @@ -[package] -name = "forte-rayon-compat" -version = "1.12.0-dev" -edition = "2024" -license = "MIT OR Apache-2.0" -description = "A shim that allows replacing rayon-core with forte" -repository = "https://github.com/NthTensor/Forte" - -[dependencies] -forte = { version = "1.0.0-dev", path = ".." } - -[features] -web_spin_lock = [] diff --git a/rayon-compat/README.md b/rayon-compat/README.md deleted file mode 100644 index da70597..0000000 --- a/rayon-compat/README.md +++ /dev/null @@ -1,14 +0,0 @@ -# Rayon Compat - -This is a way to run `rayon` on top of `forte`! The `rayon-compat` crate mocks the important bits of the api of `rayon_core` in a pretty simple and crude way, which is none-the-less enough to support most of what `rayon` needs. - -To use this crate, apply the following cargo patch like one of these: -``` -// If you want to clone forte and use it locally -[patch.crates-io] -rayon-core = { path = "path to this repo", package = "rayon-compat" } - -// If you want to use the latest published version of forte -[patch.crates-io] -rayon-core = { path = "https://github.com/NthTensor/Forte", package = "rayon-compat" } -``` diff --git a/rayon-compat/src/lib.rs b/rayon-compat/src/lib.rs deleted file mode 100644 index 995e518..0000000 --- a/rayon-compat/src/lib.rs +++ /dev/null @@ -1,186 +0,0 @@ -use std::sync::atomic::{AtomicBool, Ordering}; - -pub static THREAD_POOL: forte::ThreadPool = const { forte::ThreadPool::new() }; - -pub static STARTED: AtomicBool = const { AtomicBool::new(false) }; - -#[inline(always)] -fn ensure_started() { - if !STARTED.load(Ordering::Relaxed) && !STARTED.swap(true, Ordering::Relaxed) { - THREAD_POOL.resize_to_available(); - } -} - -#[inline(always)] -pub fn current_num_threads() -> usize { - 64 // Forte prefers smaller tasks, so it's better to lie to rayon about the size of the pool -} - -#[inline(always)] -pub fn current_thread_index() -> Option { - forte::Worker::map_current(|worker| worker.index()) -} - -#[inline(always)] -pub fn max_num_threads() -> usize { - usize::MAX // The number of forte workers is only bounded by the size of a vector. 
-} - -// ----------------------------------------------------------------------------- -// Join - -#[derive(Debug)] -pub struct FnContext { - /// True if the task was migrated. - migrated: bool, -} - -impl FnContext { - #[inline(always)] - pub fn migrated(&self) -> bool { - self.migrated - } -} - -#[inline(always)] -pub fn join_context(oper_a: A, oper_b: B) -> (RA, RB) -where - A: FnOnce(FnContext) -> RA + Send, - B: FnOnce(FnContext) -> RB + Send, - RA: Send, - RB: Send, -{ - ensure_started(); - THREAD_POOL.join( - |worker| { - let migrated = worker.migrated(); - let ctx = FnContext { migrated }; - oper_a(ctx) - }, - |worker| { - let migrated = worker.migrated(); - let ctx = FnContext { migrated }; - oper_b(ctx) - }, - ) -} - -#[inline(always)] -pub fn join(oper_a: A, oper_b: B) -> (RA, RB) -where - A: FnOnce() -> RA + Send, - B: FnOnce() -> RB + Send, - RA: Send, - RB: Send, -{ - ensure_started(); - THREAD_POOL.join(|_| oper_a(), |_| oper_b()) -} - -// ----------------------------------------------------------------------------- -// Scope - -pub struct Scope<'scope, 'env> { - inner_scope: &'scope forte::Scope<'scope, 'env>, -} - -impl<'scope, 'env> Scope<'scope, 'env> { - pub fn spawn(self, f: F) - where - F: FnOnce(Scope<'scope, 'env>) + Send + 'scope, - { - forte::Worker::with_current(|worker| { - let worker = worker.unwrap(); - let inner_scope = self.inner_scope; - inner_scope.spawn_on(worker, |_| f(Scope { inner_scope })) - }); - } -} - -#[inline(always)] -pub fn scope<'env, OP, R>(op: OP) -> R -where - OP: for<'scope> FnOnce(Scope<'scope, 'env>) -> R + Send, - R: Send, -{ - ensure_started(); - forte::scope(|inner_scope| op(Scope { inner_scope })) -} - -#[inline(always)] -pub fn in_place_scope<'env, OP, R>(op: OP) -> R -where - OP: for<'scope> FnOnce(Scope<'scope, 'env>) -> R, -{ - ensure_started(); - forte::scope(|inner_scope| op(Scope { inner_scope })) -} - -// ----------------------------------------------------------------------------- -// Spawn - -#[inline(always)] -pub fn spawn(func: F) -where - F: FnOnce() + Send + 'static, -{ - ensure_started(); - THREAD_POOL.spawn(|_| func()) -} - -// ----------------------------------------------------------------------------- -// Yield - -pub use forte::Yield; - -pub fn yield_local() -> Yield { - let result = forte::Worker::map_current(forte::Worker::yield_local); - match result { - Some(status) => status, - _ => Yield::Idle, - } -} - -pub fn yield_now() -> Yield { - let result = forte::Worker::map_current(forte::Worker::yield_now); - match result { - Some(status) => status, - _ => Yield::Idle, - } -} - -// ----------------------------------------------------------------------------- -// Fake stuff that dosn't work. These are here only so so that rayon can export -// them. - -pub struct ThreadBuilder; - -pub struct ThreadPool; - -pub struct ThreadPoolBuildError; - -pub struct ThreadPoolBuilder; - -pub struct BroadcastContext; - -pub struct ScopeFifo; - -pub fn broadcast() { - unimplemented!() -} - -pub fn spawn_broadcast() { - unimplemented!() -} - -pub fn scope_fifo() { - unimplemented!() -} - -pub fn in_place_scope_fifo() { - unimplemented!() -} - -pub fn spawn_fifo() { - unimplemented!() -} diff --git a/src/compile_fail.rs b/src/compile_fail.rs new file mode 100644 index 0000000..832b171 --- /dev/null +++ b/src/compile_fail.rs @@ -0,0 +1,190 @@ +//! Contains a set of compile failure doctests. + +// ----------------------------------------------------------------------------- +// Ensures non-send data cannot be moved into a join. 
/** ```compile_fail,E0277 + +use std::rc::Rc; +use forte::ThreadPool; + +static THREAD_POOL: ThreadPool = ThreadPool::new(); + +let r = Rc::new(22); +THREAD_POOL.join(|_| r.clone(), |_| r.clone()); +//~^ ERROR + +``` */ +mod nonsend_input {} + +// ----------------------------------------------------------------------------- +// Ensures non-send data cannot be returned by join. + +/** ```compile_fail,E0277 + +use std::rc::Rc; +use forte::ThreadPool; + +static THREAD_POOL: ThreadPool = ThreadPool::new(); + +THREAD_POOL.join(|_| Rc::new(22), |_| ()); //~ ERROR + +THREAD_POOL.depopulate(); + +``` */ +mod nonsend_left_join {} + +/** ```compile_fail,E0277 + +use std::rc::Rc; +use forte::ThreadPool; + +static THREAD_POOL: ThreadPool = ThreadPool::new(); + +THREAD_POOL.join(|_| (), |_| Rc::new(23)); //~ ERROR + +THREAD_POOL.depopulate(); + +``` */ +mod nonsend_right_join {} + +// ----------------------------------------------------------------------------- +// Ensures scoped jobs cannot borrow data declared within the scope closure. + +/** ```compile_fail,E0373 + +use forte::ThreadPool; +use forte::Worker; + +static THREAD_POOL: ThreadPool = ThreadPool::new(); + +fn bad_scope<F>(f: F) + where F: FnOnce(&i32) + Send, +{ + THREAD_POOL.scope(|scope| { + let x = 22; + scope.spawn(|_: &Worker| f(&x)); //~ ERROR `x` does not live long enough + }); +} + +fn good_scope<F>(f: F) + where F: FnOnce(&i32) + Send, +{ + let x = 22; + THREAD_POOL.scope(|scope| { + scope.spawn(|_: &Worker| f(&x)); + }); +} + +fn main() { } + +``` */ +mod scope_join_bad {} + +// ----------------------------------------------------------------------------- +// Ensures the two branches of a join cannot mutably borrow the same data. + +/** ```compile_fail,E0524 + +use forte::ThreadPool; +use forte::Worker; + +static THREAD_POOL: ThreadPool = ThreadPool::new(); + +fn quick_sort<T: PartialOrd + Send>(v: &mut [T]) { + if v.len() <= 1 { + return; + } + + let mid = partition(v); + let (lo, _hi) = v.split_at_mut(mid); + THREAD_POOL.join(|_| quick_sort(lo), |_| quick_sort(lo)); //~ ERROR +} + +fn partition<T: PartialOrd>(v: &mut [T]) -> usize { + let pivot = v.len() - 1; + let mut i = 0; + for j in 0..pivot { + if v[j] <= v[pivot] { + v.swap(i, j); + i += 1; + } + } + v.swap(i, pivot); + i +} + +fn main() { } + +``` */ +mod quicksort_race_1 {} + +/** ```compile_fail,E0500 + +use forte::ThreadPool; +use forte::Worker; + +static THREAD_POOL: ThreadPool = ThreadPool::new(); + +fn quick_sort<T: PartialOrd + Send>(v: &mut [T]) { + if v.len() <= 1 { + return; + } + + let mid = partition(v); + let (lo, _hi) = v.split_at_mut(mid); + THREAD_POOL.join(|_| quick_sort(lo), |_| quick_sort(v)); //~ ERROR +} + +fn partition<T: PartialOrd>(v: &mut [T]) -> usize { + let pivot = v.len() - 1; + let mut i = 0; + for j in 0..pivot { + if v[j] <= v[pivot] { + v.swap(i, j); + i += 1; + } + } + v.swap(i, pivot); + i +} + +fn main() { } + +``` */ +mod quicksort_race_2 {} + +/** ```compile_fail,E0524 + +use forte::ThreadPool; +use forte::Worker; + +static THREAD_POOL: ThreadPool = ThreadPool::new(); + +fn quick_sort<T: PartialOrd + Send>(v: &mut [T]) { + if v.len() <= 1 { + return; + } + + let mid = partition(v); + let (_lo, hi) = v.split_at_mut(mid); + THREAD_POOL.join(|_| quick_sort(hi), |_| quick_sort(hi)); //~ ERROR +} + +fn partition<T: PartialOrd>(v: &mut [T]) -> usize { + let pivot = v.len() - 1; + let mut i = 0; + for j in 0..pivot { + if v[j] <= v[pivot] { + v.swap(i, j); + i += 1; + } + } + v.swap(i, pivot); + i +} + +fn main() { } + +``` */ +mod quicksort_race_3 {} diff --git a/src/job.rs b/src/job.rs index 9aba42f..75e6347 100644 --- a/src/job.rs +++ b/src/job.rs @@ -1,5 +1,5 @@ -//!
This module defines an executable unit of work called a [`Job`]. Jobs are what -//! get scheduled on the thread-pool. There are two core job types: [`StackJob`] +//! get scheduled on the thread pool. There are two core job types: [`StackJob`] //! and [`HeapJob`]. //! //! After a job is allocated, we typically refer to it by a [`JobRef`]. Job refs //! @@ -87,8 +87,8 @@ impl JobRef { /// Returns an opaque handle that can be saved and compared, without making /// `JobRef` itself `Copy + Eq`. #[inline(always)] - pub fn id(&self) -> impl Eq + use<> { - (self.job_pointer, self.execute_fn) + pub fn id(&self) -> (usize, usize) { + (self.job_pointer.as_ptr() as usize, self.execute_fn as usize) } /// Executes the `JobRef` by passing the execute function on the job pointer. @@ -117,7 +117,7 @@ impl JobQueue { } #[inline(always)] - pub fn push_back(&self, job_ref: JobRef) -> Option<JobRef> { + pub fn push(&self, job_ref: JobRef) -> Option<JobRef> { // SAFETY: The queue itself is only access mutably within `push_back`, // `pop_back` and `pop_front`. Since these functions never call each // other, we must have exclusive access to the queue. @@ -130,7 +130,7 @@ } #[inline(always)] - pub fn pop_back(&self) -> Option<JobRef> { + pub fn pop_newest(&self) -> Option<JobRef> { // SAFETY: The queue itself is only access mutably within `push_back`, // `pop_back` and `pop_front`. Since these functions never call each // other, we must have exclusive access to the queue. job_refs.pop_back() } + /// Attempts to remove the given job ref from the back of the queue, + /// returning `true` on success. #[inline(always)] - pub fn pop_front(&self) -> Option<JobRef> { + pub fn recover_just_pushed(&self, id: (usize, usize)) -> bool { + // SAFETY: The queue is only accessed mutably within `push`, + // `pop_newest`, `recover_just_pushed` and `pop_oldest`. Since these + // functions never call each other, we must have exclusive access to + // the queue. + let job_refs = unsafe { &mut *self.job_refs.get() }; + if job_refs.back().map(JobRef::id) == Some(id) { + let _ = job_refs.pop_back(); + true + } else { + false + } + } + + #[cold] + pub fn pop_oldest(&self) -> Option<JobRef> { // SAFETY: The queue itself is only access mutably within `push_back`, // `pop_back` and `pop_front`. Since these functions never call each // other, we must have exclusive access to the queue. @@ -151,10 +166,7 @@ // ----------------------------------------------------------------------------- // Stack allocated work function -/// A [`StackJob`] is a job that's allocated on the stack. It's efficient, but -/// relies on us preventing the stack frame from being dropped. Stack jobs are -/// used mainly for `join` and other blocking thread pool operations. They -/// also support explicit return values, transmitted via an attached signal. +/// A [`StackJob`] is a job that's allocated on the stack. /// /// This is analogous to the chili type `JobStack` and the rayon type `StackJob`. pub struct StackJob { @@ -203,6 +215,13 @@ where unsafe { JobRef::new_raw(job_pointer, Self::execute) } } + /// Returns a reference to the latch embedded in this stack job. After this + /// latch is set, it becomes safe to call `StackJob::return_value`. + #[inline(always)] + pub fn completion_latch(&self) -> &Latch { + &self.completed + } + /// Unwraps the stack job back into a closure. This allows the closure to be /// executed without indirection in situations where the one still has /// direct access. /// /// # Safety /// /// This may only be called before the job is executed.
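+ /// + /// For illustration, a `join`-style caller might pair this with + /// `JobQueue::recover_just_pushed`: push the job, do other work, and if the + /// job ref is still at the back of the queue, pop it and run the closure + /// inline. A sketch (the names `as_job_ref` and `queue` are illustrative, + /// not the actual crate API): + /// + /// ```ignore + /// let mut job = StackJob::new(f); + /// let job_ref = unsafe { job.as_job_ref() }; + /// let id = job_ref.id(); + /// worker.queue.push(job_ref); + /// // ... execute the other half of the join ... + /// if worker.queue.recover_just_pushed(id) { + ///     // Nobody stole the job, so run the closure directly. + ///     let f = unsafe { job.unwrap() }; + ///     f(worker); + /// } + /// ```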
#[inline(always)] - pub unsafe fn unwrap(mut self) -> F { + pub unsafe fn unwrap(&mut self) -> F { // SAFETY: This will not be used again. Given that `execute` has not // already been, it will never be used twice. unsafe { ManuallyDrop::take(self.f.get_mut()) } } - /// Returns a reference to the signal embedded in this stack job. The - /// closure's return value is sent over this signal after the job is - /// executed. - #[inline(always)] - pub fn completion_latch(&self) -> &Latch { - &self.completed - } - /// Unwraps the job into its return value. /// /// # Safety /// - /// This may only be called after the job has finished executing. + /// This may only be called after the job has finished executing, and its + /// latch has been set. #[inline(always)] - pub unsafe fn return_value(mut self) -> ThreadResult<T> { + pub unsafe fn return_value(&mut self) -> ThreadResult<T> { // Synchronize with the fence in `StackJob::execute`. fence(Ordering::Acquire); // Get a ref to the result. @@ -315,12 +327,12 @@ where /// Converts the heap job into an "owning" `JobRef`. The job will be /// automatically dropped when the `JobRef` is executed. /// - /// # Safety - /// /// This will leak memory if the `JobRef` is not executed, so the caller /// must ensure that it is eventually executed (unless the process is /// exiting). /// + /// # Safety + /// /// If the `JobRef` is executed, the caller must ensure that it has not /// outlived the data it closes over. In other words, if the closure /// references something, that thing must live until the `JobRef` is diff --git a/src/latch.rs b/src/latch.rs index 7a65c19..8c7564c 100644 --- a/src/latch.rs +++ b/src/latch.rs @@ -1,4 +1,10 @@ -//! A core concept in Rayon is the *latch*. +//! A core concept in Rayon is the *latch*. Forte has borrowed this, in a +//! somewhat simplified form. +//! +//! Every forte worker thread has a single "sleep controller" that it uses to +//! park and unpark itself. Latches build on this to create a simple boolean +//! switch, which allows the owning thread to sleep until the latch becomes set +//! by another thread. use core::{ pin::Pin, @@ -24,9 +30,9 @@ const ASLEEP: u32 = 0b10; // ----------------------------------------------------------------------------- // Latch -/// A [Latch] is a signaling mechanism used to indicate when an event has +/// A Latch is a signaling mechanism used to indicate when an event has /// occurred. The latch begins as *unset* (In the `LOCKED` state), and can later -/// be *set* by any thread (entering the *SIGNAL*) state. +/// be *set* by any thread (entering the `SIGNAL` state). /// /// Each latch is associated with one *owner thread*. This is the thread that /// may be blocking, waiting for the latch to complete. @@ -60,17 +66,21 @@ impl Latch { - /// Waits for the latch to be set. In actuality, this may be woken. - /// - /// Returns true if the latch signal was received, and false otherwise. - #[inline(always)] - pub fn wait(&self) -> bool { + /// Waits for the latch to be set. This may also return spuriously, before + /// the latch is set, if the owning thread is woken for another reason. + #[cold] + pub fn wait(&self) { + // First, check if the latch has been set. + // + // In the event of a race with `set`: + // + If this load happens before the store, then we will go to sleep. + // + If this load happens after the store, then we notice and return. if self.state.load(Ordering::Relaxed) == SIGNAL { - return true; - } - let slept = self.sleep_controller.sleep(); - if slept { - self.state.load(Ordering::Relaxed) == SIGNAL - } else { - false + return; } + // If it has not been set, go to sleep.
+ // + // In the event of a race with `set`, the `wake` will always cause this + // to return regardless of memory ordering. + self.sleep_controller.sleep(); } /// Activates the latch, potentially unblocking the owning thread. @@ -86,8 +96,16 @@ pub unsafe fn set(latch: *const Latch) { // SAFETY: At this point, the latch must still be valid to dereference. let sleep_controller = unsafe { (*latch).sleep_controller }; + // First we set the state to SIGNAL. + // + // In the event of a race with `wait`, this may cause `wait` to return. + // Otherwise the other thread will sleep within `wait`. + // // SAFETY: At this point, the latch must still be valid to dereference. unsafe { (*latch).state.store(SIGNAL, Ordering::Relaxed) }; + // We must try to wake the other thread, just in case it missed the + // notification and went to sleep. This guarantees that the other thread + // will make progress. sleep_controller.wake(); } @@ -107,36 +125,124 @@ // Sleeper /// Used, in combination with a latch to park and unpark threads. +#[cfg(not(feature = "shuttle"))] pub struct SleepController { state: AtomicU32, + num_sleeping: &'static AtomicU32, } -impl Default for SleepController { - fn default() -> SleepController { +#[cfg(not(feature = "shuttle"))] +impl SleepController { + /// Creates a new sleep controller. Expects to be passed an atomic used for + /// tracking the number of sleeping workers. + pub fn new(num_sleeping: &'static AtomicU32) -> Self { SleepController { state: AtomicU32::new(LOCKED), + num_sleeping, } } -} -impl SleepController { + /// Attempts to wake the thread to which this controller belongs. + /// + /// Returns true if this allows the thread to make progress (by waking it up + /// or catching it before it goes to sleep) and false if the thread was + /// running. + #[inline(always)] pub fn wake(&self) -> bool { + // Swap the state to SIGNAL and read the previous state, which must be + // either LOCKED, ASLEEP or SIGNAL. let sleep_state = self.state.swap(SIGNAL, Ordering::Relaxed); let asleep = sleep_state == ASLEEP; if asleep { + // Decrement the sleeping counter by one. + self.num_sleeping.fetch_sub(1, Ordering::Relaxed); + // If the state was ASLEEP, the thread is either asleep or about to + // go to sleep. + // + // + If it is about to go to sleep (but has not yet called + // `atomic_wait::wait`) then setting the state to SIGNAL above + // should prevent it from going to sleep. + // + // + If it is already waiting, the following notification will wake + // it up. + // + // Either way, after this call the other thread must make progress. atomic_wait::wake_one(&self.state); } + // Return true if the other thread was asleep. asleep } - pub fn sleep(&self) -> bool { + /// Attempts to send the thread to sleep. This should only be called on a + /// single thread, and we say that this controller "belongs" to that thread. + /// + /// If a signal has already been received, this returns immediately, letting + /// us skip the syscall that would otherwise suspend the thread. + #[cold] + pub fn sleep(&self) { + // Swap the state to ASLEEP and read the previous state, which must be + // either LOCKED or SIGNAL. let state = self.state.swap(ASLEEP, Ordering::Relaxed); - let sleep = state == LOCKED; - if sleep { + // If the state is LOCKED, then we have not yet received a signal, and + // we should try to put the thread to sleep. Otherwise we should return + // early. + if state == LOCKED { + // Increase the sleeping count by one.
+ self.num_sleeping.fetch_add(1, Ordering::Relaxed); + // If we have received a signal since entering the sleep state + // (meaning the state is no longer set to ASLEEP) then this will + // return immediately. + // + // If the state is still ASLEEP, then the next call to `wake` will + // register that and call `wake_one`. + // + // Either way, there is no way we can fail to receive a `wake`. atomic_wait::wait(&self.state, ASLEEP); } + // Set the state back to LOCKED so that we are ready to receive new + // signals. self.state.store(LOCKED, Ordering::Relaxed); - sleep + } } + +// ----------------------------------------------------------------------------- +// Shuttle sleeper fallback + +/// This is a fallback implementation because the futex API is not available +/// under shuttle. +#[cfg(feature = "shuttle")] +pub struct SleepController { + state: Mutex<u32>, + condvar: Condvar, } + +#[cfg(feature = "shuttle")] +impl Default for SleepController { + fn default() -> SleepController { + SleepController { + state: Mutex::new(LOCKED), + condvar: Condvar::new(), + } + } } + +#[cfg(feature = "shuttle")] +impl SleepController { + pub fn wake(&self) -> bool { + let state = core::mem::replace(&mut *self.state.lock().unwrap(), SIGNAL); + let asleep = state == ASLEEP; + if asleep { + self.condvar.notify_one(); + } + asleep + } + + pub fn sleep(&self) { + let mut state = self.state.lock().unwrap(); + if *state == LOCKED { + *state = ASLEEP; + self.condvar.wait(state).unwrap(); + } } } diff --git a/src/lib.rs b/src/lib.rs index 16e1f14..368c2d0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,26 +1,214 @@ -//! An async-compatible thread-pool aiming for "speed through simplicity". -//! -//! Forte is a parallel & async work scheduler designed to accommodate very large -//! workloads with many short-lived tasks. It replicates the `rayon_core` api -//! but with native support for futures and async tasks. Its design was -//! prompted by the needs of the bevy game engine, but should be applicable to -//! any problem that involves running both synchronous and asynchronous work -//! concurrently. -//! -//! The thread-pool provided by this crate does not employ work-stealing. Forte -//! instead uses "Heartbeat Scheduling", an alternative load-balancing technique -//! that (theoretically) provides provably small overheads and good utilization. -//! The end effect is that work is only parallelized every so often, allowing -//! more work to be done sequentially on each thread and amortizing the -//! synchronization overhead. -//! -//! # Acknowledgments -//! -//! Large portions of the code are direct ports from various versions of -//! `rayon_core`, with minor simplifications and improvements. We also relied -//! upon `chili` and `spice` for reference while writing the heartbeat -//! scheduling. Support for futures is based on an approach sketched out by -//! members of the `rayon` community to whom we are deeply indebted. +//! Forte is a thread pool that uses lazy scheduling techniques to reduce latency and +//! maximize parallel utilization. It is a close cousin to `rayon_core`, and takes +//! loose inspiration from the `chili` and `spice` libraries, the BEAM virtual machine, +//! and Java's `ForkJoinPool`. +//! +//! It features: +//! + Statically defined and dynamically sized thread pools. +//! + Fully stack-allocated and inlined fork/join parallelism. +//! + The ability to execute both closures and futures on the same pool. +//! + Hybrid scopes that can contain work distributed across multiple thread pools. +//!
+ A primitive for awaiting async work in non-async contexts without spinning. +//! + An exposed unsafe API, built for low-level integration and customization. +//! +//! Here's an example of what it looks like: +//! +//! ``` +//! # use forte::ThreadPool; +//! # use forte::Worker; +//! // Allocate a new thread pool. +//! static THREAD_POOL: ThreadPool = ThreadPool::new(); +//! +//! fn main() { +//! // Resize the pool to fill the available number of cores. +//! THREAD_POOL.resize_to_available(); +//! +//! // Register this thread as a worker on the pool. +//! THREAD_POOL.with_worker(|worker| { +//! // Spawn a job onto the pool. The closure also accepts a worker, because the +//! // job may be executed on a different thread. This will be the worker for whatever +//! // thread it executes on. +//! worker.spawn(|worker: &Worker| { +//! // Spawn another job after this one runs, using the provided local worker. +//! worker.spawn(|_: &Worker| { }); +//! +//! // Spawn another job, this time using the thread pool. If the current thread is +//! // not already a participant in the pool, methods like this may temporarily +//! // turn it into one. +//! THREAD_POOL.spawn(|_: &Worker| { }); +//! +//! // Spawn a third job, which will automatically use the parent thread pool. +//! // This will panic if not called within a worker context, and is generally +//! // not recommended. +//! forte::spawn(|_: &Worker| { }); +//! }); +//! +//! // Spawn a future as a job. +//! let task = THREAD_POOL.spawn(async { "Hello World" }); +//! +//! // Do two operations in parallel, and await the result of each. This is the most +//! // heavily optimized thread pool operation. +//! let (a, b) = worker.join(|_| "a", |_| "b"); +//! assert_eq!(a, "a"); +//! assert_eq!(b, "b"); +//! +//! // Wait for that task we started earlier, without using `await`. +//! let result = worker.block_on(task); +//! assert_eq!(result, "Hello World"); +//! }); +//! +//! // Wait for all the worker threads to gracefully halt. +//! THREAD_POOL.depopulate(); +//! } +//! ``` +//! +//! # Creating Thread Pools +//! +//! Thread pools must be stored in statics and constructed in const context. You +//! don't have to worry about `LazyStatic` or anything else; to create a new thread +//! pool, just call [`ThreadPool::new`] and create a new static to name your pool. +//! +//! ```rust,no_run +//! # use forte::ThreadPool; +//! # use forte::Worker; +//! // Allocate a new thread pool. +//! static THREAD_POOL: ThreadPool = ThreadPool::new(); +//! ``` +//! +//! This attaches a new thread pool to your program named `THREAD_POOL`, which you +//! can begin to schedule work on immediately. The thread pool will exist for +//! the entire duration of your program, and will shut down when your program +//! completes. +//! +//! # Resizing Thread Pools +//! +//! Thread pools are dynamically sized; when your program starts they have size +//! zero (meaning no worker threads are running). You can change the number of +//! workers assigned to a pool using [`ThreadPool::grow`], [`ThreadPool::shrink`] +//! and [`ThreadPool::resize_to`]. But most of the time you will want to call +//! [`ThreadPool::resize_to_available`], which will resize the pool to exploit +//! all the available parallelism on your system by spawning a worker thread for +//! every core. +//! +//! When a thread pool has workers, those workers will automatically help +//! execute all work done on the pool. The number of worker threads attached to +//! a pool roughly determines the amount of parallelism available. If an +//!
external thread tries to use a pool of size zero (with no workers), it will +//! still be able to do work, it just won't be done in parallel. And if multiple +//! external threads use an empty pool at the same time, they will sometimes try +//! to collaborate and help each other out with work. +//! +//! ``` +//! # use forte::ThreadPool; +//! # static THREAD_POOL: ThreadPool = ThreadPool::new(); +//! // Create as many worker threads as possible. +//! THREAD_POOL.resize_to_available(); +//! +//! // Do some potentially parallel work. These may (or may not) run in parallel. +//! THREAD_POOL.join(|_| print!("world"), |_| print!("hello ")); +//! +//! // This may have printed "hello world" or "worldhello ". +//! +//! // Gracefully shut down all the worker threads. +//! THREAD_POOL.depopulate(); +//! +//! // Do the same work, but this time we know it will execute serially (because +//! // there are no workers to parallelize it). +//! THREAD_POOL.join(|_| print!("world"), |_| print!("hello ")); +//! +//! // This will always print "hello world" (because join happens to execute its +//! // closures in reverse order in this case). +//! ``` +//! +//! # Workers +//! +//! Thread pools are made up of (and run on) workers, represented as instances +//! of the [`Worker`] type. All work done on the pool is done in a "worker +//! context" created by [`Worker::occupy`]. The recommended way to access a +//! worker context for a specific pool is via [`ThreadPool::with_worker`]. +//! +//! ``` +//! # use forte::ThreadPool; +//! # static THREAD_POOL: ThreadPool = ThreadPool::new(); +//! THREAD_POOL.with_worker(|worker_1| { // <-- Creates a worker on the pool. +//! THREAD_POOL.with_worker(|worker_2| { // <-- Returns a reference to the existing worker. +//! // These pointers are identical. +//! assert!(std::ptr::eq(worker_1, worker_2)); +//! }); // <-- Leaving this scope does nothing. +//! }); // <-- Leaving this scope frees the worker. +//! ``` +//! +//! Every worker holds a local queue of tasks, as well as metadata that allows +//! other workers on the pool to communicate with it and wake it from sleep. +//! When exiting the outermost scope (where the worker was actually allocated), +//! any tasks left in the local queue are executed. +//! +//! You will only ever receive `&Worker` references, because the worker is not +//! allowed to move or be mutably referenced. Workers are `!Send` and `!Sync`, +//! and are meant to represent local-only data. +//! +//! To access the current worker context, you can use [`Worker::map_current`] or +//! [`Worker::with_current`]. These allow executing work on whichever pool is +//! current, and can be used to write library code that works normally despite +//! not knowing about the thread pool static defined by the application. +//! +//! ```rust +//! # use forte::Worker; +//! # fn foo() {} +//! fn my_library_function() { +//! Worker::with_current(|maybe_worker| match maybe_worker { +//! // This was called from within a worker context. +//! Some(worker) => worker.spawn(|_: &Worker| foo()), +//! // This was not called from within a worker context. +//! None => foo() +//! }) +//! } +//! +//! ``` +//! +//! # Core Operations +//! +//! Thread pools support four core operations: +//! * *Join.* Executes two non-static closures, possibly in parallel, and waits for them to complete. +//! * *Spawn.* Runs a static closure or future in the background. +//! * *Scope.* Runs multiple non-static closures or futures, and waits for them all to complete. +//!
* *Block on.* Waits for a future to complete (outside of an async context). +//! +//! All of these, with the exception of *Spawn*, are blocking; they have a +//! specific join-point where a thread must wait for all the forks of the +//! parallel operation to complete before proceeding. While waiting, a thread +//! will attempt to do background work, or help other threads out with +//! their assigned workload. +//! +//! Each operation is available in three different "flavors", depending on the +//! information available at the callsite. +//! +//! | Operation | Headless | Thread pool | Worker | +//! |-----------|----------|-------------|--------| +//! | *Join* | [`join()`] | [`ThreadPool::join()`] | [`Worker::join()`] | +//! | *Spawn* | [`spawn()`] | [`ThreadPool::spawn()`] | [`Worker::spawn()`] | +//! | *Scope* | [`scope()`] | [`ThreadPool::scope()`] | [`Worker::scope()`] | +//! | *Block on* | [`block_on()`] | [`ThreadPool::block_on()`] | [`Worker::block_on()`] | +//! +//! * *Worker.* Uses the provided worker context. +//! * *Thread pool.* Looks for an existing worker context, and creates one if it doesn't find one. +//! * *Headless.* Looks for an existing worker context, and panics if it doesn't find one. +//! +//! The headless and thread pool flavors are more or less just aliases for the +//! worker flavor. Where possible, the worker flavor should be preferred to the +//! thread pool flavor, and the thread pool flavor should be preferred to the +//! headless flavor. +//! +//! # Theory & Background +//! +//! Forte is based on `rayon_core`, to the extent that during development it was +//! often possible to port code from `rayon_core` more or less verbatim. +//! However, forte and rayon differ significantly in their goals and approach. +//! +//! Rayon uses an approach to work-stealing adapted from Cilk and Intel TBB. +//! These techniques are largely the industry standard. +//! +//! [^TZANNES]: Tzannes et al.
2024, #![no_std] #![cfg_attr(feature = "shuttle", allow(dead_code))] @@ -35,6 +223,7 @@ extern crate std; // ----------------------------------------------------------------------------- // Modules +mod compile_fail; mod job; mod latch; mod scope; @@ -42,10 +231,21 @@ mod thread_pool; mod unwind; mod util; +// ----------------------------------------------------------------------------- +// Trait markers + +#[doc(hidden)] +pub struct FnOnceMarker(); + +#[doc(hidden)] +pub struct FutureMarker(); + // ----------------------------------------------------------------------------- // Top-level exports pub use scope::Scope; +pub use scope::ScopedSpawn; +pub use thread_pool::Spawn; pub use thread_pool::ThreadPool; pub use thread_pool::Worker; pub use thread_pool::Yield; @@ -53,8 +253,6 @@ pub use thread_pool::block_on; pub use thread_pool::join; pub use thread_pool::scope; pub use thread_pool::spawn; -pub use thread_pool::spawn_async; -pub use thread_pool::spawn_future; // ----------------------------------------------------------------------------- // Platform Support @@ -102,6 +300,9 @@ mod platform { pub use shuttle::thread::JoinHandle; pub use shuttle::thread_local; + pub use shuttle::rand::Rng; + pub use shuttle::rand::thread_rng; + // Available parallelism pub fn available_parallelism() -> std::io::Result> { diff --git a/src/scope.rs b/src/scope.rs index d1d97ce..641ae97 100644 --- a/src/scope.rs +++ b/src/scope.rs @@ -3,14 +3,25 @@ use alloc::boxed::Box; use core::any::Any; +use core::cell::UnsafeCell; use core::future::Future; use core::marker::PhantomData; +use core::mem::ManuallyDrop; +use core::pin::Pin; use core::ptr; +use core::ptr::NonNull; +use core::sync::atomic::fence; +use core::task::Context; +use core::task::Poll; +use core::task::RawWaker; +use core::task::RawWakerVTable; +use core::task::Waker; +use std::process::abort; -use async_task::Runnable; -use async_task::Task; use scope_ptr::ScopePtr; +use crate::FnOnceMarker; +use crate::FutureMarker; use crate::ThreadPool; use crate::job::HeapJob; use crate::job::JobRef; @@ -23,7 +34,30 @@ use crate::unwind::AbortOnDrop; // ----------------------------------------------------------------------------- // Scope -/// A scope which can spawn a number of non-static jobs and async tasks. +/// A scope which can spawn a number of non-static jobs and async tasks. Refer +/// to [`scope`](crate::scope()) for more extensive documentation. +/// +/// # Lifetimes +/// +/// A scope has two lifetimes: `'scope` and `'env`. +/// +/// The `'scope` lifetime represents the lifetime of the scope itself. That is: +/// the time during which new scoped jobs may be spawned, and also the time +/// during which they might still be running. This lifetime starts within the +/// `scope()` function, before the closure `f` (the argument to `scope()`) is +/// executed. It ends after the closure `f` returns and after all scoped work is +/// complete, but before `scope()` returns. +/// +/// The `'env` lifetime represents the lifetime of whatever is borrowed by the +/// scoped jobs. This lifetime must outlast the call to `scope()`, and thus +/// cannot be smaller than `'scope`. It can be as small as the call to +/// `scope()`, meaning that anything that outlives this call, such as local +/// variables defined right before the scope, can be borrowed by the scoped +/// jobs. +/// +/// The `'env: 'scope` bound is part of the definition of the `Scope` type. The +/// requirement that scoped work outlive `'scope` is part of the definition of +/// the [`ScopedSpawn`] trait. 
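+/// +/// For example (a sketch of the intended usage): data declared before the +/// call to `scope()` lives for `'env` and can be borrowed by scoped jobs, +/// while the jobs themselves only need to live for `'scope`: +/// +/// ``` +/// # use forte::{ThreadPool, Worker}; +/// # static THREAD_POOL: ThreadPool = ThreadPool::new(); +/// let mut counts = vec![0u32; 4]; // Lives for 'env. +/// THREAD_POOL.scope(|scope| { +///     for chunk in counts.chunks_mut(2) { +///         // Each job borrows a disjoint part of `counts` from 'env. +///         scope.spawn(move |_: &Worker| { +///             for c in chunk { +///                 *c += 1; +///             } +///         }); +///     } +/// }); +/// // All scoped jobs are complete, so the borrow has ended. +/// assert!(counts.iter().all(|&c| c == 1)); +/// ```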
pub struct Scope<'scope, 'env: 'scope> { /// Number of active references to the scope (including the owning /// allocation). This is incremented each time a new `ScopePtr` is created, @@ -34,14 +68,34 @@ completed: Latch, /// If any job panics, we store the result here to propagate it. panic: AtomicPtr<Box<dyn Any + Send>>, - /// Makes `Scope` invariant over 'scope + /// This adds invariance over 'scope, to make sure 'scope cannot shrink, + /// which is necessary for soundness. + /// + /// Without invariance, this would compile fine but be unsound: + /// + /// ```compile_fail + /// # use forte::{ThreadPool, Worker}; + /// # static THREAD_POOL: ThreadPool = ThreadPool::new(); + /// # THREAD_POOL.resize_to_available(); + /// # THREAD_POOL.with_worker(|worker| { + /// worker.scope(|scope| { + /// scope.spawn_on(worker, |worker: &Worker| { + /// let a = String::from("abcd"); + /// scope.spawn_on(worker, |_: &Worker| println!("{a:?}")); // might run after `a` is dropped + /// }); + /// }); + /// # }); + /// ``` _scope: PhantomData<&'scope mut &'scope ()>, - /// Makes `Scope` invantiant over 'env + /// This adds invariance over 'env. _env: PhantomData<&'env mut &'env ()>, } -/// Crates a new scope on a worker. [`Worker::scope`] is just an alias for this -/// function. +/// Executes a new scope on a worker. [`Worker::scope`], +/// [`ThreadPool::scope`][crate::ThreadPool::scope] and [`scope`][crate::scope()] are all just +/// aliases for this function. +/// +/// For details about the `'scope` and `'env` lifetimes see [`Scope`]. #[inline] pub fn with_scope<'env, F, T>(worker: &Worker, f: F) -> T where @@ -57,13 +111,13 @@ // all spawned work is complete. This is not a safety requirement, it's just // a nicer behavior than aborting. let result = match unwind::halt_unwinding(|| f(&scope)) { - Ok(value) => Some(value), + Ok(result) => Some(result), Err(err) => { scope.store_panic(err); None } }; - // Now that the user has (presuamably) spawnd some work onto the scope, we + // Now that the user has (presumably) spawned some work onto the scope, we // must wait for it to complete. // // SAFETY: This is called only once, and we provide the same worker used to @@ -75,7 +129,7 @@ core::mem::forget(abort_guard); // If the closure or any spawned work did panic, we can now panic. scope.maybe_propagate_panic(); - // Otherwise return the result of evaluating the closure. + // Return the result. result.unwrap() } @@ -97,128 +151,30 @@ impl<'scope, 'env> Scope<'scope, 'env> { } } - /// Spawns a scoped job onto the local worker. This job will execute - /// sometime before the scope completes. + /// Runs a closure or future sometime before the scope completes. Valid + /// inputs to this method are: /// - /// # Returns + /// + A `for<'worker> FnOnce(&'worker Worker)` closure, with no return type. /// - /// Nothing. The spawned closures cannot pass back values to the caller - /// directly, though they can write to local variables on the stack (if - /// those variables outlive the scope) or communicate through shared - /// channels. + /// + A `Future<Output = ()>` future, with no output value. /// - /// If you need to return a value, spawn a `Future` instead with - /// [`Scope::spawn_future_on`]. + /// # Panics /// - pub fn spawn_on<F>(&self, worker: &Worker, f: F) - where - F: FnOnce(&Worker) + Send + 'scope, - { - // Create a job to execute the spawned function in the scope. - let scope_ptr = ScopePtr::new(self); - let job = HeapJob::new(move |worker| { - // Catch any panics and store them on the scope.
-            let result = unwind::halt_unwinding(|| f(worker));
-            if let Err(err) = result {
-                scope_ptr.store_panic(err);
-            };
-            drop(scope_ptr);
-        });
-
-        // SAFETY: We must ensure that the heap job does not outlive the data it
-        // closes over. In effect, this means it must not outlive `'scope`.
-        //
-        // This is ensured by the `scope_ptr` and the scope rules, which will
-        // keep the calling stack frame alive until this job completes,
-        // effectively extending the lifetime of `'scope` for as long as is
-        // nessicary.
-        let job_ref = unsafe { job.into_job_ref() };
-
-        // Send the job to a queue to be executed.
-        worker.enqueue(job_ref);
+    /// If not in a worker, this panics.
+    pub fn spawn<M, S: ScopedSpawn<'scope, M>>(&'scope self, scoped_work: S) {
+        Worker::with_current(|worker| scoped_work.spawn_on(worker.unwrap(), self));
     }

-    /// Spawns a future onto the scope. This future will be asynchronously
-    /// polled to completion some time before the scope completes.
-    ///
-    /// # Returns
-    ///
-    /// This returns a task, which represents a handle to the async computation
-    /// and is itself a future that can be awaited to receive the output of the
-    /// future. There's four ways to interact with a task:
+    /// Runs a closure or future sometime before the scope completes. Valid
+    /// inputs to this method are:
     ///
-    /// 1. Await the task. This will eventually produce the output of the
-    ///    provided future. The scope will not complete until the output is
-    ///    returned to the awaiting logic.
+    /// + A `for<'worker> FnOnce(&'worker Worker)` closure, with no return type.
     ///
-    /// 2. Drop the task. This will stop execution of the future and potentially
-    ///    allow the scope to complete immediately.
+    /// + A `Future<Output = ()>` future, with no return type.
     ///
-    /// 3. Cancel the task. This has the same effect as dropping the task, but
-    ///    waits until the futures stops running (which in the worst-case means
-    ///    waiting for the scope to complete).
-    ///
-    /// 4. Detach the task. This will allow the future to continue executing
-    ///    even after the task itself is dropped. The scope will only complete
-    ///    after the future polls to completion. Detaching a task with an
-    ///    infinite loop will prevent the scope from completing, and is not
-    ///    recommended.
-    ///
-    pub fn spawn_future_on<F, T>(&self, thread_pool: &'static ThreadPool, future: F) -> Task<T>
-    where
-        F: Future<Output = T> + Send + 'scope,
-        T: Send,
-    {
-        // Embed the scope pointer into the future.
-        let scope_ptr = ScopePtr::new(self);
-        let future = async move {
-            let result = future.await;
-            drop(scope_ptr);
-            result
-        };
-
-        // The schedule function will turn the future into a job when woken.
-        let schedule = move |runnable: Runnable| {
-            // Turn the runnable into a job-ref that we can send to a worker.
-
-            // SAFETY: We provide a pointer to a non-null runnable, and we turn
-            // it back into a non-null runnable. The runnable will remain valid
-            // until the task is run.
-            let job_ref = unsafe {
-                JobRef::new_raw(runnable.into_raw(), |this, _| {
-                    let runnable = Runnable::<()>::from_raw(this);
-                    // Poll the task.
-                    runnable.run();
-                })
-            };
-
-            // Send this job off to be executed. When this schedule function is
-            // called on a worker thread this re-schedules it onto the worker's
-            // local queue, which will generally cause tasks to stick to the
-            // same thread instead of jumping around randomly. This is also
-            // faster than injecting into the global queue.
-            thread_pool.with_worker(|worker| {
-                worker.enqueue(job_ref);
-            });
-        };
-
-        // SAFETY: We must ensure that the runnable does not outlive the data it
-        // closes over. In effect, this means it must not outlive `'scope`.
-        //
-        // This is ensured by the `scope_ptr` and the scope rules, which will
-        // keep the calling stack frame alive until the runnable is dropped,
-        // effectively extending the lifetime of `'scope` for as long as is
-        // nessicary.
-        //
-        // We have to use `spawn_unchecked` here instead of `spawn` because the
-        // future is non-static.
-        let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) };
-
-        // Call the schedule function once to create the initial job.
-        runnable.schedule();
-
-        // Return the task handle.
-        task
+    /// Unlike [`Scope::spawn`], this accepts the current worker as a parameter.
+    pub fn spawn_on<M, S: ScopedSpawn<'scope, M>>(&'scope self, worker: &Worker, scoped_work: S) {
+        scoped_work.spawn_on(worker, self);
     }

     /// Adds an additional reference to the scope's reference counter.
@@ -248,7 +204,7 @@ impl<'scope, 'env> Scope<'scope, 'env> {
         // once, when the scope has been dropped and all work has been
         // completed.
         //
-        // SAFETY: The signal is passed as a reference, and is live for the
+        // SAFETY: The latch is passed as a reference, and is live for the
         // duration of the function.
         unsafe { Latch::set(&self.completed) };
     }
@@ -257,6 +213,7 @@ impl<'scope, 'env> Scope<'scope, 'env> {
     /// Stores a panic so that it can be propagated when the scope is complete.
     /// If called multiple times, only the first panic is stored, and the
     /// remainder are dropped.
+    #[cold]
     fn store_panic(&self, err: Box<dyn Any + Send>) {
         if self.panic.load(Ordering::Relaxed).is_null() {
             let nil = ptr::null_mut();
@@ -304,7 +261,7 @@ impl<'scope, 'env> Scope<'scope, 'env> {
         // call to `add_reference`.
         //
         // Only after the following call will the counter decrement to zero,
-        // causing the signal to become set and allowing this function to
+        // causing the latch to become set and allowing this function to
         // return.
         unsafe { self.remove_reference() };
         // Wait for the remaining work to complete.
@@ -312,6 +269,391 @@ impl<'scope, 'env> Scope<'scope, 'env> {
     }
 }

+// -----------------------------------------------------------------------------
+// Generalized scoped spawn trait
+
+/// A trait for types that can be spawned onto a [`Scope`]. It is implemented for:
+///
+/// + Closures that satisfy `for<'worker> FnOnce(&'worker Worker) + Send + 'scope`.
+///
+/// + Futures that satisfy `Future<Output = ()> + Send + 'scope`.
+///
+/// Due to a bug in rustc, you may be given errors when using closures
+/// with inferred types. If you encounter the following:
+///
+/// ```compile_fail
+/// # use forte::ThreadPool;
+/// # use forte::Worker;
+/// # static THREAD_POOL: ThreadPool = ThreadPool::new();
+/// THREAD_POOL.scope(|scope| {
+///     scope.spawn(|_| { });
+///     //          ^^^^^^^ the trait `ScopedSpawn<'_, _>` is not implemented for closure ...
+/// });
+/// ```
+/// Try adding a type hint to the closure's parameters, like so:
+/// ```
+/// # use forte::ThreadPool;
+/// # use forte::Worker;
+/// # static THREAD_POOL: ThreadPool = ThreadPool::new();
+/// THREAD_POOL.scope(|scope| {
+///     scope.spawn(|_: &Worker| { });
+/// });
+/// ```
+/// Hopefully rustc will fix this type inference failure eventually.
+pub trait ScopedSpawn<'scope, M>: Send + 'scope {
+    /// Spawns the value of self as scoped work on the provided worker.
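+    ///
+    /// Note: callers normally reach this through [`Scope::spawn`] or
+    /// [`Scope::spawn_on`], which delegate to it, rather than invoking the
+    /// trait method directly.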
+    fn spawn_on<'env>(self, worker: &Worker, scope: &'scope Scope<'scope, 'env>);
+}
+
+impl<'scope, F> ScopedSpawn<'scope, FnOnceMarker> for F
+where
+    F: FnOnce(&Worker) + Send + 'scope,
+{
+    #[inline]
+    fn spawn_on<'env>(self, worker: &Worker, scope: &'scope Scope<'scope, 'env>) {
+        // Create a job to execute the spawned function in the scope.
+        let scope_ptr = ScopePtr::new(scope);
+        let job = HeapJob::new(move |worker| {
+            // Catch any panics and store them on the scope.
+            let result = unwind::halt_unwinding(|| self(worker));
+            if let Err(err) = result {
+                scope_ptr.store_panic(err);
+            };
+            drop(scope_ptr);
+        });
+
+        // SAFETY: We must ensure that the heap job does not outlive the data it
+        // closes over. In effect, this means it must not outlive `'scope`.
+        //
+        // This is ensured by the `scope_ptr` and the scope rules, which will
+        // keep the calling stack frame alive until this job completes,
+        // effectively extending the lifetime of `'scope` for as long as is
+        // necessary.
+        let job_ref = unsafe { job.into_job_ref() };
+
+        // Send the job to a queue to be executed.
+        worker.enqueue(job_ref);
+    }
+}
+
+impl<'scope, Fut> ScopedSpawn<'scope, FutureMarker> for Fut
+where
+    Fut: Future<Output = ()> + Send + 'scope,
+{
+    #[inline]
+    fn spawn_on<'env, 'worker>(self, worker: &'worker Worker, scope: &'scope Scope<'scope, 'env>) {
+        let poll_job = ScopeFutureJob::new(worker.thread_pool(), scope, self);
+        let job_ref = poll_job.into_job_ref();
+        worker.enqueue(job_ref);
+    }
+}
+
+// The following is a small async executor built specifically to execute futures
+// spawned onto scopes. It is one of the more complex (and unsafe) areas of
+// Forte's code. Please take care when making changes.
+
+/// This value is used for the state of future-jobs that are not currently being
+/// polled or queued onto the thread pool. They may switch to the WOKEN state at
+/// any time, after which they are queued.
+const READY: u32 = 0;
+
+/// This value is used for the state of future-jobs that have already been
+/// woken. Jobs in this state may be in one of the three following categories:
+///
+/// + A pending job that has been (or is about to be) pushed to the queue
+///   so that it can be polled.
+///
+/// + A pending job that is currently being polled (or has just finished) and
+///   which was *not* queued after it was woken, because it was woken while
+///   running.
+///
+/// + A job that was woken after it completed or panicked. These jobs will stay
+///   in the WOKEN state forever, and will never be queued or polled again.
+///
+/// When a WOKEN future-job is executed by a worker, it switches into the LOCKED
+/// state. When a job finishes executing in the WOKEN state (indicating it was
+/// woken while executing), and has not been completed or panicked, it is
+/// queued, and remains in the WOKEN state.
+///
+/// Future-jobs should never move directly from WOKEN to READY.
const WOKEN: u32 = 1;
+
+/// This value is used for the state of future-jobs that have not been woken and
+/// are either executing, completed, or have been canceled due to a panic. They
+/// may switch to the WOKEN state at any time, but are not queued to the pool
+/// when this happens (they are instead queued when the future is done being
+/// polled, assuming it has not panicked or been completed).
+///
+/// When a job finishes executing and has not been WOKEN, it switches back to
+/// the READY state.
+const LOCKED: u32 = 2;
+
+/// This is a job that polls a future each time it is executed.
+/// The 'scope and 'env lifetimes are the same as the ones from scope (see
+/// [`Scope`] for a detailed explanation).
+///
+/// This struct is designed to live within an `Arc` and be fully
+/// reference-counted.
+///
+/// This type serves a dual purpose:
+///
+/// 1. It can be converted into a job ref that can be queued onto the pool. When the
+///    job executes, it polls the future.
+///
+/// 2. It can be converted into a `Waker`, which is provided to the future while
+///    it is executing and which can be used to schedule a poll of the future
+///    by queuing itself back on the pool.
+///
+/// The future is always queued on the thread on which it was woken. This allows
+/// the waker to be converted directly into a job-ref, and may mean that the
+/// cache is already primed.
+struct ScopeFutureJob<'scope, 'env, Fut> {
+    /// The future that the job exists to poll. This is stored in an unsafe cell
+    /// to allow mutable access within an `Arc`. The `state` field acts as a
+    /// kind of mutex that ensures exclusive access by preventing the job from
+    /// being queued or executed multiple times simultaneously.
+    future: UnsafeCell<Fut>,
+    /// A scope pointer. This allows the job to interact with the scope, and
+    /// also keeps the scope alive until the job is dropped.
+    scope_ptr: ScopePtr<'scope, 'env>,
+    /// The thread pool this job is attached to.
+    thread_pool: &'static ThreadPool,
+    /// The state of the job, which is either READY, WOKEN, or LOCKED.
+    state: AtomicU32,
+}
+
+impl<'scope, 'env, Fut> ScopeFutureJob<'scope, 'env, Fut>
+where
+    Fut: Future<Output = ()> + Send + 'scope,
+{
+    /// This vtable is part of what allows a `ScopeFutureJob` to act as an
+    /// async task waker.
+    const VTABLE: RawWakerVTable = RawWakerVTable::new(
+        Self::clone_as_waker,
+        Self::wake,
+        Self::wake_by_ref,
+        Self::drop_as_waker,
+    );
+
+    /// Creates a new `ScopeFutureJob` in an `Arc`. The caller is expected to
+    /// immediately call `into_job_ref` and queue it on a worker to be polled.
+    fn new(
+        thread_pool: &'static ThreadPool,
+        scope: &Scope<'scope, 'env>,
+        future: Fut,
+    ) -> Arc<Self> {
+        let scope_ptr = ScopePtr::new(scope);
+        Arc::new(Self {
+            future: UnsafeCell::new(future),
+            scope_ptr,
+            thread_pool,
+            // The job starts in the WOKEN state because we always queue it
+            // after creating it.
+            state: AtomicU32::new(WOKEN),
+        })
+    }
+
+    /// Converts an `Arc<Self>` into a job ref that can be queued on a
+    /// thread pool. The ref-count is not decremented, ensuring that the job
+    /// remains alive while this job ref exists.
+    ///
+    /// Forgetting this job ref will cause a memory leak.
+    fn into_job_ref(self: Arc<Self>) -> JobRef {
+        // SAFETY: Pointers created by `Arc::into_raw` are never null.
+        let job_pointer = unsafe { NonNull::new_unchecked(Arc::into_raw(self).cast_mut().cast()) };
+
+        // SAFETY: This pointer is an erased `Arc<Self>` which is what
+        // `Self::poll` expects to receive.
+        unsafe { JobRef::new_raw(job_pointer, Self::poll) }
+    }
+
+    /// This is what happens when the job is executed. It is this function that
+    /// is in charge of actually polling the future, and it is therefore an
+    /// extremely hot and performance sensitive function.
+    fn poll(this: NonNull<()>, worker: &Worker) {
+        // While we still have a raw pointer to the job, create a raw task waker
+        // using our vtable.
+        let raw_waker = RawWaker::new(this.as_ptr().cast_const(), &Self::VTABLE);
+
+        // Create a new waker from the raw waker. This is *non-owning* and
+        // functions like a `&Arc<Self>` rather than an `Arc<Self>`.
+        // We wrap it in a `ManuallyDrop` to prevent the waker from calling
+        // `drop_as_waker` through the vtable, which would cause the
+        // reference-count to decrement (incorrectly).
+        //
+        // SAFETY: The API contract of RawWaker and RawWakerVTable is upheld by
+        // the `Self::VTABLE` const.
+        //
+        // * The functions are all thread safe.
+        //
+        // * `clone` increments the reference count, ensuring that all resources
+        //   needed to schedule the task are retained, and returns a waker that
+        //   wakes the same task as the input.
+        //
+        // * `wake` converts the waker into a job ref that it queues for
+        //   execution (and will be freed when completed unless cloned again as a
+        //   waker).
+        //
+        // * `wake_by_ref` uses a clone to create a job ref, which will prevent
+        //   resources from being freed.
+        //
+        // * `drop` decrements the reference count, potentially allowing the job
+        //   to be freed.
+        //
+        let waker = unsafe { ManuallyDrop::new(Waker::from_raw(raw_waker)) };
+
+        // SAFETY: This was created from an `Arc<Self>` in `into_job_ref`.
+        //
+        // Within this function, this pointer is only turned into an arc once,
+        // so it can only be dropped once. Outside of this function, we are
+        // forced to assume that the reference count is maintained correctly.
+        let this = unsafe { Arc::from_raw(this.cast::<Self>().as_ptr()) };
+
+        // If our implementation is correct, we should always be in the WOKEN
+        // state at this point. To avoid potential UB, we double check that this
+        // is the case, and abort the program if it is not.
+        //
+        // We use Acquire ordering here to ensure we have the current state of
+        // the future. This synchronizes with the fence in the Poll::Pending
+        // branch.
+        if this.state.swap(LOCKED, Ordering::Acquire) != WOKEN {
+            // We abort because this function needs to be panic safe.
+            abort();
+        }
+
+        // At this point, we have acquired exclusive ownership of the future.
+
+        // SAFETY: The arc never moves, and the future cannot be aliased mutably
+        // elsewhere because this is the only place we access it, and no other
+        // threads can have gotten past the memory swap above without causing an
+        // abort.
+        let future = unsafe { Pin::new_unchecked(&mut *this.future.get()) };
+
+        // Create a new context from the waker, and poll the future.
+        let mut cx = Context::from_waker(&waker);
+        let result = unwind::halt_unwinding(|| future.poll(&mut cx));
+
+        // Update the job state depending on the outcome of polling the future.
+        match result {
+            // The job completed without panicking.
+            Ok(Poll::Ready(())) => {
+                // Drop the job without rescheduling it, leaving it in the
+                // LOCKED or WOKEN state so that it cannot be rescheduled.
+            }
+            // The job is still pending, and has not yet panicked.
+            Ok(Poll::Pending) => {
+                // The fence here ensures that our changes to the future become
+                // visible to the next thread to execute the job and poll the
+                // future.
+                fence(Ordering::Release);
+                // Try to set the state back to READY so other threads can
+                // schedule it again. This will only fail if the job was woken
+                // while running, and is already in the WOKEN state.
+                //
+                // If successful, this effectively releases our exclusive
+                // ownership of the future.
+                let rescheduled = this
+                    .state
+                    .compare_exchange(LOCKED, READY, Ordering::Relaxed, Ordering::Relaxed)
+                    .is_err();
+                // If the job was woken while running, it should be queued
+                // immediately. Conveniently, we know the state will already be
+                // WOKEN, so we can leave it as it is.
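+                // (`rescheduled` is true when the compare-exchange *fails*:
+                // a wake arrived while we were polling, so the job must be
+                // queued again by us rather than by the waker.)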
+                if rescheduled {
+                    // This converts the local `Arc` into a job ref,
+                    // preventing it from being dropped and potentially
+                    // extending the job's lifetime.
+                    let job_ref = this.into_job_ref();
+                    worker.enqueue(job_ref);
+                }
+            }
+            // The job panicked. Store the panic in the scope so it can be
+            // resumed later.
+            Err(err) => this.scope_ptr.store_panic(err),
+        }
+
+        // At this point, if we have not converted `this` into a job-ref and
+        // queued it, it will be dropped. If no wakers for this task are being
+        // held, then this will cause the reference counter to decrement to zero
+        // and free the task.
+        //
+        // The alternative, if there are wakers still being held for the task
+        // but none of them is ever invoked, is that the task will never wake
+        // and the scope will deadlock.
+    }
+
+    /// Creates a new `RawWaker` from the provided pointer.
+    ///
+    /// # Safety
+    ///
+    /// Must be called with a pointer created by calling `Arc::into_raw` on an
+    /// instance of `Arc<Self>` that is still alive.
+    unsafe fn clone_as_waker(this: *const ()) -> RawWaker {
+        // SAFETY: This is called on a pointer created by `Arc::into_raw` on an
+        // instance of `Arc<Self>` that is still alive.
+        unsafe { Arc::increment_strong_count(this.cast::<Self>()) };
+        RawWaker::new(this, &Self::VTABLE)
+    }
+
+    /// Queues self to be executed and consumes the waker.
+    ///
+    /// # Safety
+    ///
+    /// Must be called with a pointer created by calling `Arc::into_raw` on an
+    /// instance of `Arc<Self>` that is still alive.
+    unsafe fn wake(this: *const ()) {
+        // SAFETY: This is called on a pointer created by `Arc::into_raw` on an
+        // instance of `Arc<Self>` that is still alive.
+        let this = unsafe { Arc::from_raw(this.cast::<Self>()) };
+
+        if this.state.swap(WOKEN, Ordering::Relaxed) == READY {
+            this.thread_pool.with_worker(|worker| {
+                // Convert the waker into a job ref and queue it.
+                let job_ref = this.into_job_ref();
+                worker.enqueue(job_ref);
+            });
+        }
+    }
+
+    /// Queues self to be executed without consuming the waker.
+    ///
+    /// # Safety
+    ///
+    /// Must be called with a pointer created by calling `Arc::into_raw` on an
+    /// instance of `Arc<Self>` that is still alive.
+    fn wake_by_ref(this: *const ()) {
+        // We use manually drop here to prevent us from consuming the arc on
+        // drop. This functions like an `&Arc<Self>` rather than an `Arc<Self>`.
+        //
+        // SAFETY: This is called on a pointer created by `Arc::into_raw` on an
+        // instance of `Arc<Self>` that is still alive.
+        let this = unsafe { ManuallyDrop::new(Arc::from_raw(this.cast::<Self>())) };
+
+        if this.state.swap(WOKEN, Ordering::Relaxed) == READY {
+            this.thread_pool.with_worker(|worker| {
+                // Clone the waker, convert it into a job-ref and queue it.
+                let this = ManuallyDrop::into_inner(this.clone());
+                let job_ref = this.into_job_ref();
+                worker.enqueue(job_ref);
+            });
+        }
+    }
+
+    /// Frees the waker.
+    ///
+    /// # Safety
+    ///
+    /// Must be called with a pointer created by calling `Arc::into_raw` on an
+    /// instance of `Arc<Self>` that is still alive.
+    fn drop_as_waker(this: *const ()) {
+        // Rather than converting back into an arc, we can just decrement the
+        // counter here.
+        //
+        // SAFETY: This is called on a pointer created by `Arc::into_raw` on an
+        // instance of `Arc<Self>` that is still alive.
+        unsafe { Arc::decrement_strong_count(this.cast::<Self>()) };
+    }
+}
+
 // -----------------------------------------------------------------------------
 // Scope pointer

@@ -377,53 +719,207 @@ mod scope_ptr {

 #[cfg(all(test, not(feature = "shuttle")))]
 mod tests {
+    use core::iter::once;
+    use core::pin::Pin;
     use core::sync::atomic::AtomicU8;
+    use core::sync::atomic::AtomicUsize;
     use core::sync::atomic::Ordering;
+    use core::task::Context;
+    use core::task::Poll;
+    use std::sync::Mutex;
+    use std::vec;
+    use std::vec::Vec;

     use crate::ThreadPool;
+    use crate::Worker;
     use crate::scope;
+    use crate::unwind;
+    use crate::util::XorShift64Star;
+
+    use super::Scope;
+
+    /// Tests that empty scopes return properly.
+    #[test]
+    fn scope_empty() {
+        static THREAD_POOL: ThreadPool = ThreadPool::new();
+        THREAD_POOL.resize_to_available();
+
+        THREAD_POOL.scope(|_: &Scope| {});
+
+        THREAD_POOL.depopulate();
+    }
+
+    /// Tests that scopes return the output of the closure.
+    #[test]
+    fn scope_result() {
+        static THREAD_POOL: ThreadPool = ThreadPool::new();
+        THREAD_POOL.resize_to_available();
+
+        let mut x = 0;
+        THREAD_POOL.scope(|_: &Scope| {
+            x = 22;
+        });
+        assert_eq!(x, 22);
+
+        THREAD_POOL.depopulate();
+    }
+
+    /// Tests that multiple tasks in a scope run to completion.
+    #[test]
+    fn scope_two() {
+        static THREAD_POOL: ThreadPool = ThreadPool::new();
+        THREAD_POOL.resize_to_available();
+
+        let counter = &AtomicUsize::new(0);
+        THREAD_POOL.scope(|scope| {
+            scope.spawn(move |_: &Worker| {
+                counter.fetch_add(1, Ordering::SeqCst);
+            });
+            scope.spawn(move |_: &Worker| {
+                counter.fetch_add(10, Ordering::SeqCst);
+            });
+        });
+
+        let v = counter.load(Ordering::SeqCst);
+        assert_eq!(v, 11);
+
+        THREAD_POOL.depopulate();
+    }
+
+    /// Test that it is possible to borrow local data within a scope, modify it,
+    /// and then read it later. This is mostly here to ensure stuff like this
+    /// compiles.
     #[test]
-    fn scoped_borrow() {
+    fn scope_borrow() {
         static THREAD_POOL: ThreadPool = ThreadPool::new();
-        THREAD_POOL.populate();
+        THREAD_POOL.resize_to_available();

         let mut string = "a";
-        THREAD_POOL.with_worker(|worker| {
-            scope(|scope| {
-                scope.spawn_on(worker, |_| {
-                    string = "b";
-                })
+        THREAD_POOL.scope(|scope| {
+            scope.spawn(|_: &Worker| {
+                string = "b";
             });
         });
         assert_eq!(string, "b");
+
+        THREAD_POOL.depopulate();
     }

+    /// Test that it is possible to borrow local data immutably within deeply
+    /// nested scopes. This is also mostly here to ensure stuff like this
+    /// compiles.
     #[test]
-    fn scoped_borrow_twice() {
+    fn scope_borrow_twice() {
         static THREAD_POOL: ThreadPool = ThreadPool::new();
-        THREAD_POOL.populate();
+        THREAD_POOL.resize_to_available();

-        let mut string = "a";
-        THREAD_POOL.with_worker(|worker| {
-            scope(|scope| {
-                scope.spawn_on(worker, |worker| {
-                    string = "b";
-                    scope.spawn_on(worker, |_| {
-                        string = "c";
-                    })
-                })
+        let counter = AtomicU8::new(0);
+        THREAD_POOL.scope(|scope| {
+            scope.spawn(|_: &Worker| {
+                counter.fetch_add(1, Ordering::Relaxed);
+                scope.spawn(|_: &Worker| {
+                    counter.fetch_add(1, Ordering::Relaxed);
+                });
+            });
+            scope.spawn(|worker: &Worker| {
+                counter.fetch_add(1, Ordering::Relaxed);
+                scope.spawn_on(worker, |_: &Worker| {
+                    counter.fetch_add(1, Ordering::Relaxed);
+                });
             });
         });
-        assert_eq!(string, "c");
+        assert_eq!(counter.load(Ordering::Relaxed), 4);
+
+        THREAD_POOL.depopulate();
+    }
+
+    /// Tests that we can spawn futures onto the thread pool and that they can
+    /// borrow data as expected.
+    #[test]
+    fn scope_future() {
+        static THREAD_POOL: ThreadPool = ThreadPool::new();
+        THREAD_POOL.resize_to_available();
+
+        let mut value = 0;
+        THREAD_POOL.scope(|scope| {
+            scope.spawn(async {
+                value = 42;
+            });
+        });
+        assert_eq!(value, 42);
+
+        THREAD_POOL.depopulate();
+    }
+
+    /// This is a handy future that needs to be polled repeatedly before
+    /// resolving.
+    ///
+    /// Each time it is polled, it wakes itself (so it will be polled again) and
+    /// yields. It does this until it has been polled 128 times.
+    ///
+    /// This lets us test the behavior of scopes for sleeping tasks, to ensure
+    /// we do not return from the scope while tasks are still pending.
+    #[derive(Default)]
+    struct CountFuture {
+        /// The number of times the future has been polled.
+        count: usize,
+    }
+
+    impl Future for CountFuture {
+        type Output = ();
+
+        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+            if self.count == 128 {
+                Poll::Ready(())
+            } else {
+                self.count += 1;
+                cx.waker().wake_by_ref();
+                Poll::Pending
+            }
+        }
+    }
+
+    /// Tests that we can spawn futures onto a scope, and that the scope really
+    /// does wait for the future to complete before returning.
+    #[test]
+    fn scope_pending_future() {
+        static THREAD_POOL: ThreadPool = ThreadPool::new();
+        THREAD_POOL.resize_to_available();
+
+        THREAD_POOL.scope(|scope| scope.spawn(CountFuture::default()));
+
+        THREAD_POOL.depopulate();
+    }
+
+    /// Tests that a future that sleeps for a nontrivial amount of time does not
+    /// cause the scope to exit early.
+    #[test]
+    fn scope_sleeping_future() {
+        static THREAD_POOL: ThreadPool = ThreadPool::new();
+        THREAD_POOL.resize_to_available();
+
+        let mut complete = false;
+        THREAD_POOL.scope(|scope| {
+            scope.spawn(async {
+                // Spawn a count future onto the thread pool, then have this
+                // future sleep until that future is done.
+                THREAD_POOL.spawn(CountFuture::default()).await;
+                complete = true;
+            })
+        });
+
+        assert!(complete);
+
+        THREAD_POOL.depopulate();
+    }

+    /// Tests that blocking functions like `join` can be nested within scopes.
     #[test]
-    fn scoped_concurrency() {
+    fn scope_concurrency() {
         const NUM_JOBS: u8 = 128;

         static THREAD_POOL: ThreadPool = ThreadPool::new();
-        THREAD_POOL.resize_to(4);
+        THREAD_POOL.resize_to_available();

         let a = AtomicU8::new(0);
         let b = AtomicU8::new(0);
@@ -431,7 +927,7 @@ mod tests {
         THREAD_POOL.with_worker(|worker| {
             scope(|scope| {
                 for _ in 0..NUM_JOBS {
-                    scope.spawn_on(worker, |_| {
+                    scope.spawn_on(worker, |_: &Worker| {
                         THREAD_POOL.join(
                             |_| a.fetch_add(1, Ordering::Relaxed),
                             |_| b.fetch_add(1, Ordering::Relaxed),
@@ -446,4 +942,437 @@ mod tests {

         THREAD_POOL.depopulate();
     }
+
+    #[test]
+    fn scope_nesting() {
+        static THREAD_POOL: ThreadPool = ThreadPool::new();
+        THREAD_POOL.resize_to_available();
+
+        let mut completed = false;
+
+        THREAD_POOL.scope(|scope| {
+            scope.spawn(|worker: &Worker| {
+                worker.scope(|scope| {
+                    scope.spawn(|_: &Worker| {
+                        completed = true;
+                    })
+                })
+            })
+        });
+
+        assert!(completed);
+
+        THREAD_POOL.depopulate();
+    }
+
+    /// Tests that nesting two scopes on different workers will not deadlock.
+    #[test]
+    fn scope_nesting_new_worker() {
+        static THREAD_POOL: ThreadPool = ThreadPool::new();
+        THREAD_POOL.resize_to_available();
+
+        let mut completed = false;
+
+        THREAD_POOL.with_worker(|worker| {
+            worker.scope(|scope| {
+                scope.spawn_on(worker, |_: &Worker| {
+                    // Creating a new worker instead of reusing the old one is
+                    // bad form, but we may as well test it.
+                    THREAD_POOL.with_worker(|worker| {
+                        worker.scope(|scope| {
+                            scope.spawn_on(worker, |_: &Worker| {
+                                completed = true;
+                            });
+                        });
+                    });
+                });
+            });
+        });
+
+        assert!(completed);
+
+        THREAD_POOL.depopulate();
+    }
+
+    /// Tests that a serial and parallel version of a binary tree traversal
+    /// produces the same output.
+    #[test]
+    fn scope_divide_and_conquer() {
+        static THREAD_POOL: ThreadPool = ThreadPool::new();
+        THREAD_POOL.resize_to_available();
+
+        let counter_p = &AtomicUsize::new(0);
+        THREAD_POOL.with_worker(|worker| {
+            worker.scope(|scope| {
+                scope.spawn(move |worker: &Worker| {
+                    divide_and_conquer(worker, scope, counter_p, 1024)
+                })
+            });
+        });
+
+        let counter_s = &AtomicUsize::new(0);
+        divide_and_conquer_seq(counter_s, 1024);
+
+        let p = counter_p.load(Ordering::SeqCst);
+        let s = counter_s.load(Ordering::SeqCst);
+        assert_eq!(p, s);
+
+        THREAD_POOL.depopulate();
+    }
+
+    fn divide_and_conquer<'scope, 'env>(
+        worker: &Worker,
+        scope: &'scope Scope<'scope, 'env>,
+        counter: &'scope AtomicUsize,
+        size: usize,
+    ) {
+        if size > 1 {
+            scope.spawn_on(worker, move |worker: &Worker| {
+                divide_and_conquer(worker, scope, counter, size / 2)
+            });
+            scope.spawn_on(worker, move |worker: &Worker| {
+                divide_and_conquer(worker, scope, counter, size / 2)
+            });
+        } else {
+            // count the leaves
+            counter.fetch_add(1, Ordering::SeqCst);
+        }
+    }
+
+    fn divide_and_conquer_seq(counter: &AtomicUsize, size: usize) {
+        if size > 1 {
+            divide_and_conquer_seq(counter, size / 2);
+            divide_and_conquer_seq(counter, size / 2);
+        } else {
+            // count the leaves
+            counter.fetch_add(1, Ordering::SeqCst);
+        }
+    }
+
+    /// Tests for correct scope completion on a deeply nested semi-random tree.
+    #[test]
+    fn scope_update_tree() {
+        static THREAD_POOL: ThreadPool = ThreadPool::new();
+        THREAD_POOL.resize_to_available();
+
+        THREAD_POOL.with_worker(|_| {
+            let mut tree = random_tree(10, 1337);
+            let values: Vec<_> = tree.iter().cloned().collect();
+            tree.update(|v| *v += 1);
+            let new_values: Vec<usize> = tree.iter().cloned().collect();
+            assert_eq!(values.len(), new_values.len());
+            for (&i, &j) in values.iter().zip(&new_values) {
+                assert_eq!(i + 1, j);
+            }
+        });
+
+        THREAD_POOL.depopulate();
+    }
+
+    struct Tree<T> {
+        value: T,
+        children: Vec<Tree<T>>,
+    }
+
+    impl<T> Tree<T> {
+        fn iter(&self) -> vec::IntoIter<&T> {
+            once(&self.value)
+                .chain(self.children.iter().flat_map(Tree::iter))
+                .collect::<Vec<_>>() // seems like it shouldn't be needed...
+                // ...but prevents overflow
+                .into_iter()
+        }
+
+        fn update<OP>(&mut self, op: OP)
+        where
+            OP: Fn(&mut T) + Sync,
+            T: Send,
+        {
+            scope(|scope| self.update_in_scope(scope, &op));
+        }
+
+        fn update_in_scope<'scope, 'env, OP>(
+            &'env mut self,
+            scope: &'scope Scope<'scope, 'env>,
+            op: &'scope OP,
+        ) where
+            OP: Fn(&mut T) + Sync,
+        {
+            let Tree { value, children } = self;
+            scope.spawn(move |worker: &Worker| {
+                for child in children {
+                    scope.spawn_on(worker, move |_: &Worker| {
+                        let child = child;
+                        child.update_in_scope(scope, op)
+                    });
+                }
+            });
+
+            op(value);
+        }
+    }
+
+    fn random_tree(depth: usize, seed: u64) -> Tree<usize> {
+        assert!(depth > 0);
+        let mut rng = XorShift64Star::from_seed(seed);
+        random_tree_inner(depth, &mut rng)
+    }
+
+    fn random_tree_inner(depth: usize, rng: &mut XorShift64Star) -> Tree<usize> {
+        let children = if depth == 0 {
+            vec![]
+        } else {
+            (0..rng.next_usize(4)) // somewhere between 0 and 3 children at each level
+                .map(|_| random_tree_inner(depth - 1, rng))
+                .collect()
+        };
+
+        Tree {
+            value: rng.next_usize(1_000_000),
+            children,
+        }
+    }
+
+    /// Check that if you have a chain of scoped tasks where T0 spawns T1
+    /// spawns T2 and so forth down to Tn, the stack space should not grow
+    /// linearly with N. We test this by some unsafe hackery and
+    /// permitting an approx 10% change with a 100x input change.
+    ///
+    /// Ported from rayon.
+    #[test]
+    fn scope_linear_stack_growth() {
+        static THREAD_POOL: ThreadPool = ThreadPool::new();
+        THREAD_POOL.resize_to_available();
+
+        THREAD_POOL.with_worker(|_| {
+            let mut max_diff = Mutex::new(0);
+            let bottom_of_stack = 0;
+            scope(|s| the_final_countdown(s, &bottom_of_stack, &max_diff, 5));
+            let diff_when_5 = *max_diff.get_mut().unwrap() as f64;
+
+            scope(|s| the_final_countdown(s, &bottom_of_stack, &max_diff, 500));
+            let diff_when_500 = *max_diff.get_mut().unwrap() as f64;
+
+            let ratio = diff_when_5 / diff_when_500;
+            assert!(
+                ratio > 0.9 && ratio < 1.1,
+                "stack usage ratio out of bounds: {ratio}"
+            );
+        });
+
+        THREAD_POOL.depopulate();
+    }
+
+    fn the_final_countdown<'scope, 'env>(
+        scope: &'scope Scope<'scope, 'env>,
+        bottom_of_stack: &'scope i32,
+        max: &'scope Mutex<usize>,
+        n: usize,
+    ) {
+        let top_of_stack = 0;
+        let p = bottom_of_stack as *const i32 as usize;
+        let q = &top_of_stack as *const i32 as usize;
+        let diff = p.abs_diff(q);
+
+        let mut data = max.lock().unwrap();
+        *data = Ord::max(diff, *data);
+
+        if n > 0 {
+            scope.spawn(move |_: &Worker| the_final_countdown(scope, bottom_of_stack, max, n - 1));
+        }
+    }
+
+    /// Tests that panics within the scope closure are propagated.
+    #[test]
+    #[should_panic(expected = "Hello, world!")]
+    fn scope_panic_propagate_from_closure() {
+        static THREAD_POOL: ThreadPool = ThreadPool::new();
+        THREAD_POOL.resize_to_available();
+
+        THREAD_POOL.scope(|_: &Scope| panic!("Hello, world!"));
+
+        THREAD_POOL.depopulate();
+    }
+
+    /// Tests that panics within closures spawned onto a scope are propagated.
+    #[test]
+    #[should_panic(expected = "Hello, world!")]
+    fn scope_panic_propagate_from_spawn() {
+        static THREAD_POOL: ThreadPool = ThreadPool::new();
+        THREAD_POOL.resize_to_available();
+
+        THREAD_POOL.scope(|scope| scope.spawn(|_: &Worker| panic!("Hello, world!")));
+
+        THREAD_POOL.depopulate();
+    }
+
+    /// Tests that panics within nested scoped spawns are propagated.
+    #[test]
+    #[should_panic(expected = "Hello, world!")]
+    fn scope_panic_propagate_from_nested_spawn() {
+        static THREAD_POOL: ThreadPool = ThreadPool::new();
+        THREAD_POOL.resize_to_available();
+
+        THREAD_POOL.scope(|scope| {
+            scope.spawn(|_: &Worker| {
+                scope.spawn(|_: &Worker| scope.spawn(|_: &Worker| panic!("Hello, world!")))
+            })
+        });
+
+        THREAD_POOL.depopulate();
+    }
+
+    /// Tests that panics within nested scopes are propagated.
+    #[test]
+    #[should_panic(expected = "Hello, world!")]
+    fn scope_panic_propagate_from_nested_scope_spawn() {
+        static THREAD_POOL: ThreadPool = ThreadPool::new();
+        THREAD_POOL.resize_to_available();
+
+        THREAD_POOL.scope(|scope_1| {
+            scope_1.spawn(|worker: &Worker| {
+                worker.scope(|scope_2| scope_2.spawn(|_: &Worker| panic!("Hello, world!")))
+            })
+        });
+
+        THREAD_POOL.depopulate();
+    }
+
+    /// Tests that work spawned after a panicking job still completes.
+    #[test]
+    #[cfg_attr(not(panic = "unwind"), ignore)]
+    fn scope_panic_propagate_still_execute_after() {
+        static THREAD_POOL: ThreadPool = ThreadPool::new();
+        THREAD_POOL.resize_to_available();
+
+        let mut x = false;
+        let result = unwind::halt_unwinding(|| {
+            THREAD_POOL.scope(|scope| {
+                scope.spawn(|_: &Worker| panic!("Hello, world!")); // job A
+                scope.spawn(|_: &Worker| x = true); // job B, should still execute even though A panics
+            });
+        });
+
+        match result {
+            Ok(_) => panic!("failed to propagate panic"),
+            Err(_) => assert!(x, "job b failed to execute"),
+        }
+
+        THREAD_POOL.depopulate();
+    }
+
+    /// Tests that work spawned before a panicking job still completes.
+    #[test]
+    #[cfg_attr(not(panic = "unwind"), ignore)]
+    fn scope_panic_propagate_still_execute_before() {
+        static THREAD_POOL: ThreadPool = ThreadPool::new();
+        THREAD_POOL.resize_to_available();
+
+        let mut x = false;
+        let result = unwind::halt_unwinding(|| {
+            THREAD_POOL.scope(|scope| {
+                scope.spawn(|_: &Worker| x = true); // job B, should still execute even though A panics
+                scope.spawn(|_: &Worker| panic!("Hello, world!")); // job A
+            });
+        });
+        match result {
+            Ok(_) => panic!("failed to propagate panic"),
+            Err(_) => assert!(x, "job b failed to execute"),
+        }
+
+        THREAD_POOL.depopulate();
+    }
+
+    /// Tests that work spawned before the scoped closure panics still completes.
+    #[test]
+    #[cfg_attr(not(panic = "unwind"), ignore)]
+    fn scope_panic_propagate_still_execute_before_closure() {
+        static THREAD_POOL: ThreadPool = ThreadPool::new();
+        THREAD_POOL.resize_to_available();
+
+        let mut x = false;
+        let result = unwind::halt_unwinding(|| {
+            THREAD_POOL.scope(|scope| {
+                scope.spawn(|_: &Worker| x = true); // spawned job should still execute despite later panic
+                panic!("Hello, world!");
+            });
+        });
+        match result {
+            Ok(_) => panic!("failed to propagate panic"),
+            Err(_) => assert!(x, "panic after spawn, spawn failed to execute"),
+        }
+
+        THREAD_POOL.depopulate();
+    }
+
+    /// Tests that the closure still completes if one of its items panics.
+    #[test]
+    #[cfg_attr(not(panic = "unwind"), ignore)]
+    fn scope_panic_propagate_still_execute_closure_after() {
+        static THREAD_POOL: ThreadPool = ThreadPool::new();
+        THREAD_POOL.resize_to_available();
+
+        let mut x = false;
+        let result = unwind::halt_unwinding(|| {
+            THREAD_POOL.scope(|scope| {
+                scope.spawn(|_: &Worker| panic!("Hello, world!"));
+                x = true;
+            });
+        });
+        match result {
+            Ok(_) => panic!("failed to propagate panic"),
+            Err(_) => assert!(x, "panic in spawn tainted scope"),
+        }
+
+        THREAD_POOL.depopulate();
+    }
+
+    /// Tests that we can create scopes that capture a static env.
+    #[test]
+    fn scope_static() {
+        static THREAD_POOL: ThreadPool = ThreadPool::new();
+        THREAD_POOL.resize_to_available();
+
+        static COUNTER: AtomicUsize = AtomicUsize::new(0);
+
+        let mut range = 0..100;
+        let sum = range.clone().sum();
+        let iter = &mut range;
+
+        COUNTER.store(0, Ordering::Relaxed);
+        THREAD_POOL.scope::<'static>(|scope: &Scope| {
+            for i in iter {
+                scope.spawn(move |_: &Worker| {
+                    COUNTER.fetch_add(i, Ordering::Relaxed);
+                });
+            }
+        });
+
+        assert_eq!(COUNTER.load(Ordering::Relaxed), sum);
+
+        THREAD_POOL.depopulate();
+    }
+
+    /// Tests that scopes can deal with multiple lifetimes being captured.
+    #[test]
+    fn scope_mixed_lifetimes() {
+        static THREAD_POOL: ThreadPool = ThreadPool::new();
+        THREAD_POOL.resize_to_available();
+
+        fn increment<'slice, 'counter>(counters: &'slice [&'counter AtomicUsize]) {
+            THREAD_POOL.scope::<'counter>(move |scope| {
+                // We can borrow 'slice here, but the spawns can only borrow 'counter.
+                for &c in counters {
+                    scope.spawn(move |_: &Worker| {
+                        c.fetch_add(1, Ordering::Relaxed);
+                    });
+                }
+            });
+        }
+
+        let counter = AtomicUsize::new(0);
+        increment(&[&counter; 100]);
+        assert_eq!(counter.into_inner(), 100);
+
+        THREAD_POOL.depopulate();
+    }
 }
diff --git a/src/thread_pool.rs b/src/thread_pool.rs
index c7c0fba..24017fa 100644
--- a/src/thread_pool.rs
+++ b/src/thread_pool.rs
@@ -14,16 +14,16 @@ use core::ptr;
 use core::ptr::NonNull;
 use core::task::Context;
 use core::task::Poll;
-use core::time::Duration;

 use crossbeam_queue::SegQueue;
 use crossbeam_utils::CachePadded;

 use async_task::Runnable;
-use async_task::Task;

 use tracing::debug;
 use tracing::trace;
 use tracing::trace_span;

+use crate::FnOnceMarker;
+use crate::FutureMarker;
 use crate::job::HeapJob;
 use crate::job::JobQueue;
 use crate::job::JobRef;
@@ -39,86 +39,39 @@ use crate::util::XorShift64Star;
 // -----------------------------------------------------------------------------
 // Thread pool

-/// The "heartbeat interval" controls the frequency at which workers share work.
-#[cfg(not(feature = "shuttle"))]
-pub const HEARTBEAT_INTERVAL: Duration = Duration::from_micros(5);
-
-/// The `ThreadPool` object is used to orchestrate and distribute work to a pool
-/// of threads, and is generally the main entry point to using `Forte`.
-///
-/// # Creating Thread Pools
-///
-/// Thread pools must be static and const constructed. You don't have to worry
-/// about `LazyStatic` or anything else; to create a new thread pool, just call
-/// [`ThreadPool::new`].
+/// A thread pool is a set of threads.
 ///
+/// You can dispatch work to a thread pool, and it will be distributed amongst
+/// the threads and run as quickly as possible. To create a new thread pool,
+/// assign it to a `static`.
+/// ```
-/// ```rust,no_run
 /// # use forte::ThreadPool;
-/// // Allocate a new thread pool.
+/// # use forte::Worker;
 /// static THREAD_POOL: ThreadPool = ThreadPool::new();
-///
-/// fn main() {
-///
-///     // Resize the pool to fill the available number of cores.
-///     THREAD_POOL.resize_to_available();
-///
-///     // Register this thread as a worker on the pool.
-///     THREAD_POOL.with_worker(|worker| {
-///         // Spawn a job onto the pool. The closure also accepts a worker, because the
-///         // job may be executed on a different thread. This will be the worker for whatever
-///         // thread it executes on.
-///         worker.spawn(|worker| {
-///             // Spawn another job after this one runs, using the provided local worker.
-///             worker.spawn(|_| { });
-///             // Spawn another job using the thread pool directly (this will be slower).
-///             THREAD_POOL.spawn(|_| { });
-///             // Spawn a third job, which will automatically use the parent thread pool.
-///             // This will also be slower than using the worker.
-///             forte::spawn(|_| { });
-///         });
-///
-///         // Spawn an async job, which can return a value through a `Future`. This does not
-///         // provide access to a worker, because futures may move between threads while they
-///         // are suspended.
-///         let task = THREAD_POOL.spawn_async(async || { "Hello World" });
-///
-///         // Do two operations in parallel, and await the result of each. This is the most
-///         // efficient and hyper-optimized thread pool operation.
-///         let (a, b) = worker.join(|_| "a", |_| "b");
-///         assert_eq!(a, "a");
-///         assert_eq!(b, "b");
-///
-///         // Wait for that task we completed earlier, without using `await`.
-///         let result = worker.block_on(task);
-///         assert_eq!(result, "Hello World");
-///     });
-///
-///     // Halt the thread pool by removing all the managed workers.
-///     THREAD_POOL.resize_to(0);
-/// }
 /// ```
-///
-/// This attaches a new thread pool to your program named `THREAD_POOL`, which you
-/// can begin to schedule work on immediately. The thread pool will exist for
-/// the entire duration of your program, and will shut down when your program
-/// completes.
-///
-/// # Resizing Thread Pools
-///
-/// Thread pools are dynamically sized; When your program starts they have size
-/// zero (meaning no threads are running), and you will have to add threads by
-/// resizing it. The simplest way to resize a pool is via
-/// [`ThreadPool::resize_to_available`] which will simply fill all the available
-/// space. More granular control is possible through other methods such as
-/// [`ThreadPool::grow`], [`ThreadPool::shrink`], or [`ThreadPool::resize_to`].
+/// Thread pools are empty when created, and must be explicitly resized at runtime.
+/// ```
+/// # use forte::ThreadPool;
+/// # use forte::Worker;
+/// # static THREAD_POOL: ThreadPool = ThreadPool::new();
+/// THREAD_POOL.resize_to_available();
+/// ```
+/// After this, you can start sending work to the pool with
+/// [`spawn`][ThreadPool::spawn], [`block_on`][ThreadPool::block_on],
+/// [`join`][ThreadPool::join], or [`scope`][ThreadPool::scope].
 pub struct ThreadPool {
-    /// The internal state of the thread pool. This mutex should only be
-    /// accessed infrequently.
+    /// The internal state of the thread pool.
+    ///
+    /// This should only be locked infrequently for short periods of time in
+    /// cold functions.
     state: Mutex<ThreadPoolState>,
     /// A queue used for cooperatively sharing jobs between workers.
     shared_jobs: SegQueue<JobRef>,
     /// A condvar that is used to signal a new worker taking a lease on a seat.
-    new_participant: Condvar,
+    start_heartbeat: Condvar,
+    /// Tracks the number of currently sleeping workers.
+    /// Incremented when a worker goes to sleep, decremented when a worker is
+    /// woken.
+    num_sleeping: AtomicU32,
 }

 /// The internal state of a thread pool.
@@ -152,10 +105,11 @@ impl ThreadPoolState {

         // If none are available, add a new seat.
         let index = self.seats.len();
-        let seat_data = Box::leak(Box::new(CachePadded::new(SeatData {
-            heartbeat: AtomicBool::new(false),
-            sleep_controller: SleepController::default(),
-        })));
+        let seat_data = Box::leak(Box::new(SeatData {
+            #[cfg(not(feature = "shuttle"))]
+            heartbeat: AtomicBool::new(true).into(),
+            sleep_controller: SleepController::new(&thread_pool.num_sleeping),
+        }));
         let seat = Seat {
             occupied: true,
             data: seat_data,
@@ -193,10 +147,11 @@ impl ThreadPoolState {

         // Then create new seats as needed.
         while leases.len() != num {
             let index = self.seats.len();
-            let seat_data = Box::leak(Box::new(CachePadded::new(SeatData {
-                heartbeat: AtomicBool::new(false),
-                sleep_controller: SleepController::default(),
-            })));
+            let seat_data = Box::leak(Box::new(SeatData {
+                #[cfg(not(feature = "shuttle"))]
+                heartbeat: AtomicBool::new(true).into(),
+                sleep_controller: SleepController::new(&thread_pool.num_sleeping),
+            }));
             let seat = Seat {
                 occupied: true,
                 data: seat_data,
@@ -213,15 +168,17 @@ impl ThreadPoolState {
     }
 }

+#[derive(Clone)]
 struct Seat {
     occupied: bool,
     data: &'static SeatData,
 }

-/// A public interface that can be claimed and used by a worker..
+/// A public interface that can be claimed and used by a worker.
 struct SeatData {
     /// The heartbeat signal sent to the worker.
-    heartbeat: AtomicBool,
+    #[cfg(not(feature = "shuttle"))]
+    heartbeat: CachePadded<AtomicBool>,
     /// Allows other threads to wake the worker.
     sleep_controller: SleepController,
 }
@@ -249,6 +206,7 @@ struct ManagedThreads {
     /// Stores thread controls for workers spawned by the pool.
     workers: Vec,
     /// Stores thread controls for the heartbeat thread.
+    #[cfg(not(feature = "shuttle"))]
    heartbeat: Option,
 }
@@ -281,22 +239,76 @@ impl ThreadPool {
             seats: Vec::new(),
             managed_threads: ManagedThreads {
                 workers: Vec::new(),
+                #[cfg(not(feature = "shuttle"))]
                 heartbeat: None,
             },
         }),
         shared_jobs: SegQueue::new(),
-        new_participant: Condvar::new(),
+        start_heartbeat: Condvar::new(),
+        num_sleeping: AtomicU32::new(0),
     }
 }

 /// Claims a lease on the thread pool which can be occupied by a worker
 /// (using [`Worker::occupy`]), allowing a thread to participate in the pool.
+    #[cold]
     pub fn claim_lease(&'static self) -> Lease {
-        self.new_participant.notify_one();
+        self.start_heartbeat.notify_one();
         let mut state = self.state.lock().unwrap();
         state.claim_lease(self)
     }

+    /// Returns an opaque identifier for this thread pool.
+    #[inline(always)]
+    pub fn id(&self) -> usize {
+        // We can rely on `self` not to change since it's a static ref.
+        ptr::from_ref(self) as usize
+    }
+
+    /// Returns the number of workers participating in this thread pool.
+    #[inline(always)]
+    pub fn num_workers(&self) -> usize {
+        todo!()
+    }
+
+    /// Tries to ensure the calling thread is a member of the thread pool, and
+    /// then executes the provided closure. If the thread is already a member of
+    /// the pool, the closure is called directly. Otherwise, the thread will
+    /// temporarily register itself with the pool (which can be slightly
+    /// slower).
+    ///
+    /// The provided closure is never sent to another thread.
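+    ///
+    /// As a minimal sketch (the `join` call mirrors the crate-level example):
+    ///
+    /// ```
+    /// # use forte::ThreadPool;
+    /// # static THREAD_POOL: ThreadPool = ThreadPool::new();
+    /// # THREAD_POOL.resize_to_available();
+    /// THREAD_POOL.with_worker(|worker| {
+    ///     let (a, b) = worker.join(|_| "a", |_| "b");
+    ///     assert_eq!((a, b), ("a", "b"));
+    /// });
+    /// ```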
+    #[inline(always)]
+    pub fn with_worker<F, R>(&'static self, f: F) -> R
+    where
+        F: FnOnce(&Worker) -> R,
+    {
+        Worker::with_current(|worker| match worker {
+            Some(worker) if worker.lease.thread_pool.id() == self.id() => f(worker),
+            _ => self.with_worker_cold(f),
+        })
+    }
+
+    /// Tries to register the calling thread on the thread pool, and pass a
+    /// worker instance to the provided closure.
+    ///
+    /// This is the slow fallback for `with_worker` covering "external calls"
+    /// from outside the pool. Never call this directly.
+    #[cold]
+    fn with_worker_cold<F, R>(&'static self, f: F) -> R
+    where
+        F: FnOnce(&Worker) -> R,
+    {
+        let lease = self.state.lock().unwrap().claim_lease(self);
+        Worker::occupy(lease, f)
+    }
+}
+
+// -----------------------------------------------------------------------------
+// Thread pool resizing
+
+impl ThreadPool {
     /// Resizes the thread pool to fill all available space. After this returns,
     /// the pool will have at least one worker thread and at most `MAX_THREADS`.
     /// Returns the new size of the pool.
@@ -363,7 +375,7 @@ impl ThreadPool {
     where
         F: Fn(usize) -> usize,
     {
-        debug!("starting threadpool resize");
+        debug!("starting thread pool resize");

         // Resizing a pool is a critical section; only one thread can resize the
         // pool at a time. This is implemented using a mutex on the thread manager.
@@ -384,7 +396,7 @@ impl ThreadPool {
         match new_size.cmp(&current_size) {
             // The size remained the same
             cmp::Ordering::Equal => {
-                debug!("completed threadpool resize, size unchanged");
+                debug!("completed thread pool resize, size unchanged");
                 return current_size;
             }
             // The size increased
@@ -440,182 +452,199 @@ impl ThreadPool {
             }
             // The size decreased
             cmp::Ordering::Less => {
-                let mut heartbeat_handle = None;
-
-                // Halt the heartbeat thread when scaling to zero.
-                if new_size == 0
-                    && let Some(control) = state.managed_threads.heartbeat.take()
-                {
-                    control.halt.store(true, Ordering::Relaxed);
-                    heartbeat_handle = Some(control.handle);
-                }
-
                 // Pull the workers we intend to halt out of the thread manager.
                 let terminating_workers = state.managed_threads.workers.split_off(new_size);

-                // Terminate the workers.
+                // Halt the heartbeat thread when scaling to zero.
+                #[cfg(not(feature = "shuttle"))]
+                let heartbeat_control = if new_size == 0 {
+                    state.managed_threads.heartbeat.take()
+                } else {
+                    None
+                };
+
+                // Terminate and wake the workers.
                 for worker in &terminating_workers {
                     // Tell the worker to halt.
                     worker.control.halt.store(true, Ordering::Relaxed);
+                    // Wake the worker up.
+                    state.seats[worker.index].data.sleep_controller.wake();
                 }

-                // Wake any sleeping workers to ensure they will eventually see the termination notice.
-                for seat in &state.seats {
-                    seat.data.sleep_controller.wake();
-                }
-
+                // Drop the lock on the state so as not to block the workers or heartbeat.
                 drop(state);

-                let own_lease = Worker::map_current(|worker| worker.lease.index);
+                // Determine our seat index.
+                let own_seat = Worker::map_current(|worker| worker.lease.index);

-                // Wait for the workers to fully halt.
+                // Wait for the other workers to fully halt.
                 for worker in terminating_workers {
                     // It's possible we may be trying to terminate ourselves, in
                     // which case we can skip the thread-join.
-                    if Some(worker.index) != own_lease {
+                    if Some(worker.index) != own_seat {
                         let _ = worker.control.handle.join();
                     }
                 }

-                if let Some(handle) = heartbeat_handle {
-                    let _ = handle.join();
+                // If we took control of the heartbeat, halt it after the workers.
+                #[cfg(not(feature = "shuttle"))]
+                if let Some(control) = heartbeat_control {
+                    control.halt.store(true, Ordering::Relaxed);
+                    self.start_heartbeat.notify_one();
+                    let _ = control.handle.join();
+                }
             }
         }

-        debug!("completed thread pool resize");
-
-        // Return the new size of the threadpool
+        // Return the new size of the thread pool
         new_size
     }
+}

-    /// Returns an opaque identifier for this thread pool.
-    #[inline(always)]
-    pub fn id(&self) -> usize {
-        // We can rely on `self` not to change since it's a static ref.
-        ptr::from_ref(self) as usize
-    }
+// -----------------------------------------------------------------------------
+// Generalized spawn trait

-    /// Tries to ensure the calling thread is a member of the thread pool, and
-    /// then executes the provided closure. If the thread is already a member of
-    /// the pool, the closure is called directly. Otherwise, the thread will
-    /// attempt to temporarily register itself with the pool (which can be
-    /// slightly slower). If registration fails (because the pool is full to
-    /// capacity) the closure is passed `None` instead of a worker instance.
-    ///
-    /// The provided closure is never sent to another thread.
-    #[inline(always)]
-    pub fn with_worker<F, R>(&'static self, f: F) -> R
-    where
-        F: FnOnce(&Worker) -> R,
-    {
-        Worker::with_current(|worker| match worker {
-            Some(worker) if worker.lease.thread_pool.id() == self.id() => f(worker),
-            _ => self.with_worker_cold(f),
-        })
-    }
+/// A trait for types that can be spawned onto a [`ThreadPool`]. It is implemented for:
+///
+/// + Closures that satisfy `for<'worker> FnOnce(&'worker Worker) + Send + 'static`.
+///
+/// + Futures that satisfy `Future<Output = T> + Send + 'static` where `T: Send + 'static`.
+///
+/// Due to a bug in rustc, you may be given errors when using closures
+/// with inferred types. If you encounter the following:
+///
+/// ```compile_fail
+/// # use forte::ThreadPool;
+/// # use forte::Worker;
+/// # static THREAD_POOL: ThreadPool = ThreadPool::new();
+/// THREAD_POOL.spawn(|_| { });
+/// //                ^^^^^^^ the trait `Spawn<'_, _>` is not implemented for closure ...
+/// ```
+/// Try adding a type hint to the closure's parameters, like so:
+/// ```
+/// # use forte::ThreadPool;
+/// # use forte::Worker;
+/// # static THREAD_POOL: ThreadPool = ThreadPool::new();
+/// THREAD_POOL.spawn(|_: &Worker| { });
+/// ```
+/// Hopefully rustc will fix this type inference failure eventually.
+pub trait Spawn<M>: Send + 'static {
+    /// The handle returned when spawning this type.
+    type Output: Send + 'static;

-    /// Tries to register the calling thread on the thread pool, and pass a
-    /// worker instance to the provided closure.
-    ///
-    /// This is the slow fallback for `with_worker` covering "external calls"
-    /// from outside the pool. Never call this directly.
-    #[cold]
-    fn with_worker_cold<F, R>(&'static self, f: F) -> R
-    where
-        F: FnOnce(&Worker) -> R,
-    {
-        let lease = self.state.lock().unwrap().claim_lease(self);
-        Worker::occupy(lease, f)
-    }
-}
+    /// Spawns work onto the thread pool.
+    fn spawn(self, thread_pool: &'static ThreadPool, worker: Option<&Worker>) -> Self::Output;
+}

-// -----------------------------------------------------------------------------
-// Thread pool scheduling api
+impl<F> Spawn<FnOnceMarker> for F
+where
+    F: for<'worker> FnOnce(&'worker Worker) + Send + 'static,
+{
+    type Output = ();

-impl ThreadPool {
-    /// Spawns a job into the thread pool.
-    ///
-    /// See also: [`Worker::spawn`] and [`spawn`].
-    #[inline(always)]
-    pub fn spawn<F>(&'static self, f: F)
-    where
-        F: FnOnce(&Worker) + Send + 'static,
-    {
-        self.with_worker(|worker| worker.spawn(f));
-    }
+    #[inline]
+    fn spawn(self, thread_pool: &'static ThreadPool, worker: Option<&Worker>) {
+        // Allocate a new job on the heap to store the closure.
+        let job = HeapJob::new(self);

-    /// Spawns a future onto the thread pool.
-    ///
-    /// See also: [`Worker::spawn_future`] and [`spawn_future`].
-    #[inline(always)]
-    pub fn spawn_future<F, T>(&'static self, future: F) -> Task<T>
-    where
-        F: Future<Output = T> + Send + 'static,
-        T: Send + 'static,
-    {
-        // This function "schedules" work on the future, which in this case
-        // pushing a `JobRef` that knows how to run it onto the local work queue.
-        let schedule = move |runnable: Runnable| {
-            // Temporarily turn the task into a raw pointer so that it can be
-            // used as a job. We could also use `HeapJob` here, but since
-            // `Runnable` is heap allocated this would result in a needless
-            // second allocation.
-            let job_pointer = runnable.into_raw();
-
-            // Define a function to run the runnable that will be compatible with `JobRef`.
-            #[inline(always)]
-            fn execute_runnable(this: NonNull<()>, _worker: &Worker) {
-                // SAFETY: This pointer was created by the call to `Runnable::into_raw` just above.
-                let runnable = unsafe { Runnable::<()>::from_raw(this) };
-                // Poll the task. This will drop the future if the task is
-                // canceled or the future completes.
-                runnable.run();
-            }
+        // Turn the job into an "owning" `JobRef` so it can be queued.
+        //
+        // SAFETY: All jobs added to the queue are guaranteed to be executed
+        // eventually; this is one of the core invariants of the thread pool.
+        // The closure has a static lifetime, meaning it only closes over
+        // data that lasts for the duration of the program, so it's not possible
+        // for this job to outlive the data it closes over.
+        let job_ref = unsafe { job.into_job_ref() };

-            // SAFETY: The raw runnable pointer will remain valid until it is
-            // used by `execute_runnable`, after which it will be dropped.
-            let job_ref = unsafe { JobRef::new_raw(job_pointer, execute_runnable) };
+        // Queue the job for evaluation
+        if let Some(worker) = worker {
+            worker.enqueue(job_ref);
+        } else {
+            // Push the work into the shared queue and wake a worker
+            thread_pool.shared_jobs.push(job_ref);
+        }
+    }
+}

-            // Send this job off to be executed.
-            self.with_worker(|worker| {
-                worker.enqueue(job_ref);
-            });
-        };
+pub type Task<T> = async_task::Task<T, &'static ThreadPool>;
+
+// Schedules a runnable future as a job.
+//
+// Async-task prefers that this is a static function, rather than a closure,
+// which is why this is a separate function that pulls the thread pool from the
+// runnable metadata.
+fn schedule_runnable(runnable: Runnable<&'static ThreadPool>) {
+    // Get a ref to the thread pool from the runnable.
+    let thread_pool = *runnable.metadata();
+
+    // Temporarily turn the task into a raw pointer so that it can be
+    // used as a job. We could also use `HeapJob` here, but since
+    // `Runnable` is heap allocated this would result in a needless
+    // second allocation.
+    let job_pointer = runnable.into_raw();
+
+    // SAFETY: The raw runnable pointer will remain valid until it is
+    // used by `execute_runnable`, after which it will be dropped.
+    let job_ref = unsafe { JobRef::new_raw(job_pointer, execute_runnable) };
+
+    // Send this job off to be executed.
+    thread_pool.with_worker(|worker| {
+        worker.enqueue(job_ref);
+    });
+}

-        // Creates a task from the future and schedule.
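+
+// Taken together, `schedule_runnable` and `execute_runnable` mean that a
+// spawned future is polled as a chain of ordinary jobs, with the returned
+// `Task<T>` acting as the handle. A minimal usage sketch (assuming a populated
+// pool; `block_on` is defined below on `ThreadPool`):
+//
+//     let task = THREAD_POOL.spawn(async { 1 + 1 });
+//     assert_eq!(THREAD_POOL.block_on(task), 2);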
- let (runnable, task) = async_task::spawn(future, schedule); +// Executes a raw pointer to a runnable future. +#[inline(always)] +fn execute_runnable(this: NonNull<()>, _worker: &Worker) { + // SAFETY: This pointer was created by a call to `Runnable::into_raw` in + // `schedule_runnable`. + let runnable = unsafe { Runnable::<()>::from_raw(this) }; + // Poll the task. This will drop the future if the task is + // canceled or the future completes. + runnable.run(); +} - // This calls the schedule function, pushing a `JobRef` for the future - // onto the local work queue. If the future dosn't complete, it will - // schedule a waker that will call this schedule again, which will then - // add create a new `JobRef`. +impl<T, Fut> Spawn<Task<T>> for Fut +where + Fut: Future<Output = T> + Send + 'static, + T: Send + 'static, +{ + type Output = Task<T>; + + #[inline] + fn spawn(self, thread_pool: &'static ThreadPool, _worker: Option<&Worker>) -> Task<T> { + // Create a runnable and add the thread pool as metadata. + let (runnable, task) = async_task::Builder::new() + .metadata(thread_pool) + .spawn(|_| self, schedule_runnable); + + // Call the schedule function, pushing a `JobRef` for the future onto + // the local work queue. If the future doesn't complete, it can be + // woken and scheduled at a later point. // // Because we always look up the local worker within the schedule // function, woken futures will tend to run on the thread that wakes // them. This is a desirable property, as typically the next thing a // future is going to do after being woken up is read some data from the - // thread that woke it. - runnable.schedule(); + // thread/task that woke it. + // + // This is potentially more efficient than `Runnable::schedule`. + schedule_runnable(runnable); - // Return the task, which acts as a handle for this series of jobs. + // Return the task. task } +} - /// Spawns an async closure onto the thread pool. +// ----------------------------------------------------------------------------- +// Thread pool operations + +impl ThreadPool { + /// Spawns a job into the thread pool. /// - /// See also: [`Worker::spawn_async`] and [`spawn_async`]. + /// See also: [`Worker::spawn`] and [`spawn`]. #[inline(always)] - pub fn spawn_async<Fn, Fut, T>(&'static self, f: Fn) -> Task<T> - where - Fn: FnOnce() -> Fut + Send + 'static, - Fut: Future<Output = T> + Send + 'static, - T: Send + 'static, - { - // Wrap the function into a future using an async block. - let future = async move { f().await }; - // We just pass this future to `spawn_future`. - self.spawn_future(future) + pub fn spawn<M, S: Spawn<M>>(&'static self, work: S) -> S::Output { + work.spawn(self, None) } /// Blocks the thread waiting for a future to complete. @@ -659,7 +688,7 @@ impl ThreadPool { } // ----------------------------------------------------------------------------- -// Worker thread data +// Worker context thread_local! { static WORKER_PTR: Cell<*const Worker> = const { Cell::new(ptr::null()) }; @@ -687,7 +716,7 @@ thread_local! { pub struct Worker { migrated: Cell<bool>, lease: Lease, - pub(crate) queue: JobQueue, + queue: JobQueue, rng: XorShift64Star, // Make non-send _phantom: PhantomData<*const ()>, @@ -704,6 +733,56 @@ pub enum Yield { } impl Worker { + /// Temporarily sets the thread's worker. [`Worker::with_current`] always + /// returns a reference to the worker set up by the most recent call to this + /// function. + /// + /// Rust's thread locals are fairly costly, so this function is expensive. + /// If you can avoid calling it, do so.
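+    ///
+    /// A sketch of the path `ThreadPool::with_worker` takes for threads from
+    /// outside the pool (leases come from the pool's internal state, so user
+    /// code does not normally construct one):
+    ///
+    /// ```ignore
+    /// // Claim a lease from the pool's state, then occupy it to run `f`.
+    /// let lease = thread_pool.state.lock().unwrap().claim_lease(thread_pool);
+    /// let result = Worker::occupy(lease, f);
+    /// ```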
+ #[inline(always)] + pub fn occupy<F, R>(lease: Lease, f: F) -> R + where + F: FnOnce(&Worker) -> R, + { + trace!("occupying lease"); + + let span = trace_span!("occupy", lease = lease.index); + let _enter = span.enter(); + + // Create a new worker to occupy the lease. Note: It's potentially a + // problem that the same thread can occupy multiple workers. We may + // eventually need to design something to prevent this. + let worker = Worker { + migrated: Cell::new(false), + lease, + queue: JobQueue::new(), + rng: XorShift64Star::new(), + _phantom: PhantomData, + }; + + // Swap the local pointer to point to the newly allocated worker. + let outer_ptr = WORKER_PTR.with(|ptr| ptr.replace(&worker)); + + // Run the function within the context created by the worker pointer, + // and pass in a worker reference directly. + let result = f(&worker); + + // Execute the work queue until it's empty. This happens to be pulled in + // LIFO order, but it's fairly arbitrary. + while let Some(job_ref) = worker.queue.pop_newest() { + worker.execute(job_ref, false); + } + + // Swap back to pointing to the previous value (possibly null). + WORKER_PTR.with(|ptr| ptr.set(outer_ptr)); + + trace!("vacating lease"); + + // Return the result of the closure, now that the local queue has been + // drained. + result + } + /// Calls the provided closure on the thread's worker instance, if it has one. /// /// Rust's thread locals are fairly costly, so this function is expensive. @@ -758,90 +837,75 @@ impl Worker { } } - /// Temporarily sets the thread's worker. [`Worker::with_current`] always - /// returns a reference to the worker set up by the most recent call to this - /// worker. - /// - /// Rust's thread locals are fairly costly, so this function is expensive. - /// If you can avoid calling it, do so. - #[inline(always)] - pub fn occupy<F, R>(lease: Lease, f: F) -> R - where - F: FnOnce(&Worker) -> R, - { - trace!("occupying lease"); - - let span = trace_span!("occupy", lease = lease.index); - let _enter = span.enter(); - - // Create a new worker to occupy the lease. Note: It's potentially a - // problem that the same thread can occupy multiple workers on the same - // thread. We many eventually need to design something to prevent this. - let worker = Worker { - migrated: Cell::new(false), - lease, - queue: JobQueue::new(), - rng: XorShift64Star::new(), - _phantom: PhantomData, - }; - - // Swap the local pointer to point to the newly allocated worker. - let outer_ptr = WORKER_PTR.with(|ptr| ptr.replace(&worker)); - - // Run the function within the context created by the worker pointer, - // and pass in a worker reference directly. - let result = f(&worker); - - // Execute the work queue until it's empty - while let Some(job_ref) = worker.queue.pop_front() { - worker.execute(job_ref, false); - } - - // Swap back to pointing to the previous value (possibly null). - WORKER_PTR.with(|ptr| ptr.set(outer_ptr)); - - trace!("vacating lease"); - - // Return the intermediate values created while running the closure, - // namely the result and any jobs still remaining on the local queue. - result - } - /// Returns the index of the worker in the leases list. #[inline(always)] pub fn index(&self) -> usize { self.lease.index } + /// Returns the thread pool on which the worker is running.
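+    ///
+    /// For example (reusing the doc-test setup from the other examples in
+    /// this crate):
+    ///
+    /// ```
+    /// # use forte::ThreadPool;
+    /// # static THREAD_POOL: ThreadPool = ThreadPool::new();
+    /// # THREAD_POOL.populate();
+    /// THREAD_POOL.with_worker(|worker| {
+    ///     assert!(core::ptr::eq(worker.thread_pool(), &THREAD_POOL));
+    /// });
+    /// ```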
+ #[inline(always)] + pub fn thread_pool(&self) -> &'static ThreadPool { + self.lease.thread_pool + } + /// Pushes a job onto the local queue, overflowing to the shared queue when /// full. #[inline(always)] pub fn enqueue(&self, job_ref: JobRef) { - if let Some(job_ref) = self.queue.push_back(job_ref) { + if let Some(job_ref) = self.queue.push(job_ref) { + // Push the work to the shared queue. self.lease.thread_pool.shared_jobs.push(job_ref); } } - /// Tries to promote the oldest job in the local stack to a shared job. If - /// the local job queue is empty, or if the shared queue is full, this does - /// nothing. If the promotion is successful, it tries to wake another - /// thread to accept the shared work. This is lock free. + /// Tries to promote the oldest job in the local queue. + #[inline(always)] + fn promote(&self) { + // Check for a heartbeat, potentially promoting the oldest queued job + // to a shared job. + #[cfg(not(feature = "shuttle"))] + let heartbeat = self.lease.seat_data.heartbeat.load(Ordering::Relaxed); + + #[cfg(feature = "shuttle")] + let heartbeat = true; // thread_rng().gen_bool(0.5); + + if heartbeat && let Some(job_ref) = self.queue.pop_oldest() { + self.promote_cold(job_ref); + #[cfg(not(feature = "shuttle"))] + self.lease + .seat_data + .heartbeat + .store(false, Ordering::Relaxed); + } + } + + /// Pushes work onto the shared queue and wakes another worker. #[cold] - fn promote(&self, job_ref: JobRef) { - // Push the job onto the shared queue + fn promote_cold(&self, job_ref: JobRef) { + // Push the job onto the shared queue. self.lease.thread_pool.shared_jobs.push(job_ref); - // Try to wake a worker to work on it - let state = self.lease.thread_pool.state.lock().unwrap(); - let num_seats = state.seats.len(); + // Fetch the number of sleeping workers. + let num_sleeping = self.lease.thread_pool.num_sleeping.load(Ordering::Relaxed); + + if num_sleeping == 0 { + return; + } + + // Try to wake a worker to work on it. + // + // Note: This operation is extremely expensive, and should be avoided if possible. + let seats = self.lease.thread_pool.state.lock().unwrap().seats.clone(); + let num_seats = seats.len(); let offset = self.rng.next_usize(num_seats); for i in 0..num_seats { let i = (i + offset) % num_seats; if i == self.lease.index { continue; } - if state.seats[i].occupied { - let ready = state.seats[i].data.sleep_controller.wake(); + if seats[i].occupied { + let ready = seats[i].data.sleep_controller.wake(); if ready { return; } @@ -855,56 +919,62 @@ impl Worker { Latch::new(&self.lease.seat_data.sleep_controller) } - /// Runs jobs until the provided signal is received. When this thread runs - /// out of local or shared work and the signal is still yet to be received, - /// this puts the thread to sleep, and the thread will not wake again until - /// the signal is received. - /// - /// # Panics + /// Runs jobs until the provided latch is set. /// - /// This panics if a value has already been received over this signal. The - /// caller must ensure this won't be the case. + /// The thread may go to sleep if it runs out of work to do, but will wake + /// when the latch is set or more work becomes available.
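+    ///
+    /// A minimal sketch of the intended pattern (here `stack_job` stands in
+    /// for whatever produced the latch, as in `join` below):
+    ///
+    /// ```ignore
+    /// // Run local and shared jobs, sleeping when idle, until another
+    /// // worker sets the completion latch.
+    /// worker.wait_for(stack_job.completion_latch());
+    /// ```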
#[inline(always)] pub fn wait_for(&self, latch: &Latch) { while !latch.check() { + #[cfg(feature = "shuttle")] + shuttle::hint::spin_loop(); + if self.yield_now() == Yield::Idle { - let ready = latch.wait(); - if ready { - return; - } + latch.wait(); } } } /// Tries to find a job to execute, either in the local queue or shared on - /// the threadpool. + /// the thread pool. /// /// The second value is true if the job was shared, or false if it was spawned locally. #[inline(always)] - pub fn find_work(&self) -> Option<(JobRef, bool)> { + fn find_work(&self) -> Option<(JobRef, bool)> { // We give preference first to things in our local deque, then in other // workers deques, and finally to injected jobs from the outside. The // idea is to finish what we started before we take on something new. + // + // We pull from the local queue in LIFO order, which means we pop from + // the *back* of the queue (the most recently added jobs). This is + // because `yield_now` (and by extension `wait_for`, which uses it) is + // often called directly after pushing work onto the queue (as in `join` + // and `scope`). Pulling from the back of the queue can potentially + // allow these blocking operations to complete faster. In the case where + // scopes/joins are deeply nested, this also causes work to be executed + // *depth-first*, which is often desirable. self.queue - .pop_back() + .pop_newest() .map(|job| (job, false)) .or_else(|| self.claim_shared_job().map(|job| (job, true))) } /// Claims a shared job from the thread pool. #[inline(always)] - pub fn claim_shared_job(&self) -> Option<JobRef> { + fn claim_shared_job(&self) -> Option<JobRef> { self.lease.thread_pool.shared_jobs.pop() } - /// Cooperatively yields execution to the threadpool, allowing it to execute + /// Cooperatively yields execution to the thread pool, allowing it to execute /// some work. /// /// This function only executes local work: work already queued on the /// worker. It will never claim shared work. #[inline(always)] pub fn yield_local(&self) -> Yield { - match self.queue.pop_back() { + // We use LIFO order here, pulling the newest work from the queue. This + // is just for consistency with yield_now/find_work. + match self.queue.pop_newest() { Some(job_ref) => { self.execute(job_ref, false); Yield::Executed @@ -913,16 +983,19 @@ impl Worker { } } - /// Cooperatively yields execution to the threadpool, allowing it to execute + /// Cooperatively yields execution to the thread pool, allowing it to execute /// some work. /// /// This function may execute either local or shared work: work already /// queued on the worker, or work off-loaded by a different worker. If there - /// is no work on the pool, this will lock the thread-pool mutex, so it + /// is no work on the pool, this will lock the thread pool mutex, so it /// should not be called within a hot loop. Consider using /// [`Worker::yield_local`] instead. #[inline(always)] pub fn yield_now(&self) -> Yield { + // Try to promote an item from the queue. + self.promote(); + match self.find_work() { Some((job_ref, migrated)) => { self.execute(job_ref, migrated); @@ -951,93 +1024,28 @@ impl Worker { } // ----------------------------------------------------------------------------- -// Worker scheduling api +// Worker operations +/// # Operations impl Worker { - /// Spawns a new closure onto the thread pool. Just like a standard thread, - /// this task is not tied to the current stack frame, and hence it cannot - /// hold any references other than those with 'static lifetime.
If you want - /// to spawn a task that references stack data, use the - /// [`Worker::scope()`] function to create a scope. + /// Spawns work (a closure or future) onto the thread pool. Just like a + /// standard thread, this work executes concurrently (and potentially in + /// parallel) with the code that spawned it. It is not tied to the + /// current stack frame, and hence it cannot hold any references other than + /// those with `'static` lifetime. If you want to spawn a task that + /// references stack data, use the [`scope`], [`ThreadPool::scope`] or + /// [`Worker::scope`] functions. /// /// Since tasks spawned with this function cannot hold references into the /// enclosing stack frame, you almost certainly want to use a move closure /// as their argument (otherwise, the closure will typically hold references /// to any variables from the enclosing function that you happen to use). /// - /// To spawn an async closure or future, use [`Worker::spawn_async`] or - /// [`Worker::spawn_future`]. To spawn a non-static closure, use - /// [`ThreadPool::scope`]. - /// /// If you do not have access to a [`Worker`], you may call /// [`ThreadPool::spawn`] or simply [`spawn`]. - #[inline(always)] - pub fn spawn<F>(&self, f: F) - where - F: FnOnce(&Worker) + Send + 'static, - { - // Allocate a new job on the heap to store the closure. - let job = HeapJob::new(f); - - // Turn the job into an "owning" `JobRef` so it can be queued. - // - // SAFETY: All jobs added to the queue are guaranteed to be executed - // eventually, this is one of the core invariants of the thread pool. - // The closure `f` has a static lifetime, meaning it only closes over - // data that lasts for the duration of the program, so it's not possible - // for this job to outlive the data `f` closes over. - let job_ref = unsafe { job.into_job_ref() }; - - // Queue the `JobRef` on the worker so that it will be evaluated. - self.enqueue(job_ref); - } - - /// Spawns a future onto the thread pool. See [`Worker::spawn`] for more - /// information about spawning jobs. Only static futures are supported - /// through this function, but you can use [`Worker::scope`] to get a scope - /// on which non-static futures and async tasks can be spawned. - /// - /// # Returns - /// - /// Spawning a future returns a [`Task`], which represents a handle to the async - /// computation and is itself a future that can be awaited to receive the - /// return value. There's four ways to interact with a task: - /// - /// 1. Await the task. This will eventually produce the output of the - /// provided future. - /// - /// 2. Drop the task. This will stop execution of the future. - /// - /// 3. Cancel the task. This has the same effect as dropping the task, but - /// waits until the future stops running (which can take a while). - /// - /// 4. Detach the task. This will allow the future to continue executing - /// even after the task itself is dropped. - /// - /// If you do not have access to a [`Worker`], you may call - /// [`ThreadPool::spawn_future`] or simply [`spawn_future`]. - #[inline(always)] - pub fn spawn_future<F, T>(&self, future: F) -> Task<T> - where - F: Future<Output = T> + Send + 'static, - T: Send + 'static, - { - self.lease.thread_pool.spawn_future(future) - } - - /// Spawns an async closure onto the task pool. This is a simple wrapper - /// around [`Worker::spawn_future`]. - /// - /// If you do not have access to a [`Worker`], you may call - /// [`ThreadPool::spawn_async`] or simply [`spawn_async`].
- #[inline(always)] - pub fn spawn_async<Fn, Fut, T>(&self, f: Fn) -> Task<T> - where - Fn: FnOnce() -> Fut + Send + 'static, - Fut: Future<Output = T> + Send + 'static, - T: Send + 'static, - { - self.lease.thread_pool.spawn_async(f) + #[inline] + pub fn spawn<M, S: Spawn<M>>(&self, work: S) -> S::Output { + work.spawn(self.lease.thread_pool, Some(self)) } /// Polls a future to completion, then returns the outcome. This function @@ -1100,8 +1108,10 @@ impl Worker { RA: Send, RB: Send, { - // Allocate a job to run the closure `a` on the stack. - let stack_job = StackJob::new(a, self); + // Allocate a job to run the closure `a` on the stack. It is vital to + // the correctness of this function that this stack job never moves + // until it is freed. + let mut stack_job = StackJob::new(a, self); // SAFETY: The `StackJob` is allocated on the stack just above, is never // moved, and so will live for the entirety of this function in the same @@ -1119,52 +1129,37 @@ impl Worker { // Push the job onto the queue. self.enqueue(job_ref); - // Check for a heartbeat, potentially promoting the job we just pushed - // to a shared job. - if self.lease.seat_data.heartbeat.load(Ordering::Relaxed) - && let Some(job_ref) = self.queue.pop_front() - { - self.promote(job_ref); - self.lease - .seat_data - .heartbeat - .store(false, Ordering::Relaxed); - } + // If we have received a heartbeat, we remove the oldest item in the + // local queue and push it into the shared queue. This causes work to be + // shared in "breadth-first" order (as opposed to the "depth-first" + // order we use when executing). + self.promote(); // Run the second closure directly. - let result_b = b(self); + let result_a; + let result_b = unwind::halt_unwinding(|| b(self)); // Attempt to recover the job from the queue. It should still be there // if we didn't share it. - if let Some(job) = self.queue.pop_back() { - // If the shoe fits, this is our original `JobRef`, and we can - // unwrap it to recover the closure `a` to execute it directly. - if job.id() == job_ref_id { - // SAFETY: Because the ids match, the JobRef we just popped from - // the queue must point to `stack_job`, implying that - // `stack_job` cannot have been executed yet. - let a = unsafe { stack_job.unwrap() }; - // Execute the closure directly and return the results. This - // allows the compiler to inline and optimize `a`. - let result_a = a(self); - return (result_a, result_b); - } - - // Even if it's not the droid we were looking for, we must still - // execute the job. - self.execute(job, false); + if self.queue.recover_just_pushed(job_ref_id) { + // SAFETY: Because the ids match, the JobRef we just popped from + // the queue must point to `stack_job`, implying that + // `stack_job` cannot have been executed yet. + let a = unsafe { stack_job.unwrap() }; + // Execute the closure directly and return the results. This + // allows the compiler to inline and optimize `a`. + result_a = unwind::halt_unwinding(|| a(self)); + } else { + // Wait for the job to complete. + self.wait_for(stack_job.completion_latch()); + // SAFETY: The job must be complete, because we just waited on the latch. + result_a = unsafe { stack_job.return_value() }; } - // Wait for the job to complete. - self.wait_for(stack_job.completion_latch()); - - // SAFETY: The job must be complete, because we just waited on the latch. - let job_return_value = unsafe { stack_job.return_value() }; - - // If the job panicked, resume the panic on this thread.
- match job_return_value { - Ok(result_a) => (result_a, result_b), - Err(error) => unwind::resume_unwinding(error), + // Resume unwinding if either job panicked. + match (result_a, result_b) { + (Err(error), _) | (_, Err(error)) => unwind::resume_unwinding(error), + (Ok(value_a), Ok(value_b)) => (value_a, value_b), } } @@ -1188,49 +1183,11 @@ /// If there is no current thread pool, this panics. /// /// See also: [`Worker::spawn`] and [`ThreadPool::spawn`]. -pub fn spawn<F>(f: F) -where - F: FnOnce(&Worker) + Send + 'static, -{ +pub fn spawn<M, S: Spawn<M>>(work: S) -> S::Output { Worker::with_current(|worker| { worker .expect("attempt to call `forte::spawn` from outside a thread pool") - .spawn(f); - }); -} - -/// Spawns a future onto the current thread pool. -/// -/// If there is no current thread pool, this panics. -/// -/// See also: [`Worker::spawn_future`] and [`ThreadPool::spawn_future`]. -pub fn spawn_future<F, T>(future: F) -> Task<T> -where - F: Future<Output = T> + Send + 'static, - T: Send + 'static, -{ - Worker::with_current(|worker| { - worker - .expect("attempt to call `forte::spawn_future` from outside a thread pool") - .spawn_future(future) - }) -} - -/// Spawns an async closure onto the current thread pool. -/// -/// If there is no current thread pool, this panics. -/// -/// See also: [`Worker::spawn_async`] and [`ThreadPool::spawn_async`]. -pub fn spawn_async<Fn, Fut, T>(f: Fn) -> Task<T> -where - Fn: FnOnce() -> Fut + Send + 'static, - Fut: Future<Output = T> + Send + 'static, - T: Send + 'static, -{ - Worker::with_current(|worker| { - worker - .expect("attempt to call `forte::spawn_async` from outside a thread pool") - .spawn_async(f) + .spawn(work) }) } @@ -1238,7 +1195,7 @@ /// /// If there is no current thread pool, this panics. /// -/// See also: [`Worker::spawn_future`] and [`ThreadPool::spawn_future`]. +/// See also: [`Worker::block_on`] and [`ThreadPool::block_on`]. pub fn block_on<F, T>(future: F) -> T where F: Future<Output = T> + Send, @@ -1251,7 +1208,8 @@ }) } -/// Executes two closures on the current thread pool and returns the results. +/// Takes two closures and *potentially* runs them in parallel. It +/// returns a pair of the results from those closures. /// /// If there is no current thread pool, this panics. /// @@ -1302,13 +1260,14 @@ /// /// ``` /// # use forte::ThreadPool; +/// # use forte::Worker; /// # static THREAD_POOL: ThreadPool = ThreadPool::new(); /// # THREAD_POOL.populate(); /// # THREAD_POOL.with_worker(|worker| { /// let ok: Vec<i32> = vec![1, 2, 3]; /// forte::scope(|scope| { /// let bad: Vec<i32> = vec![4, 5, 6]; -/// scope.spawn_on(worker, |_| { +/// scope.spawn_on(worker, |_: &Worker| { /// // Transfer ownership of `bad` into a local variable (also named `bad`). /// // This will force the closure to take ownership of `bad` from the environment. /// let bad = bad; @@ -1316,7 +1275,7 @@ /// println!("bad: {:?}", bad); // refers to our local variable, above.
/// }); /// -/// scope.spawn_on(worker, |_| println!("ok: {:?}", ok)); // we too can borrow `ok` +/// scope.spawn_on(worker, |_: &Worker| println!("ok: {:?}", ok)); // we too can borrow `ok` /// }); /// # }); /// ``` @@ -1328,20 +1287,21 @@ /// /// ```rust /// # use forte::ThreadPool; +/// # use forte::Worker; /// # static THREAD_POOL: ThreadPool = ThreadPool::new(); /// # THREAD_POOL.populate(); /// # THREAD_POOL.with_worker(|worker| { /// let ok: Vec<i32> = vec![1, 2, 3]; /// forte::scope(|scope| { /// let bad: Vec<i32> = vec![4, 5, 6]; -/// scope.spawn_on(worker, move |_| { +/// scope.spawn_on(worker, move |_: &Worker| { /// println!("ok: {:?}", ok); /// println!("bad: {:?}", bad); /// }); /// /// // That closure is fine, but now we can't use `ok` anywhere else, /// // since it is owned by the previous task: -/// // scope.spawn_on(worker, |_| println!("ok: {:?}", ok)); +/// // scope.spawn_on(worker, |_: &Worker| println!("ok: {:?}", ok)); /// }); /// # }); /// ``` @@ -1353,6 +1313,7 @@ /// /// ```rust /// # use forte::ThreadPool; +/// # use forte::Worker; /// # static THREAD_POOL: ThreadPool = ThreadPool::new(); /// # THREAD_POOL.populate(); /// # THREAD_POOL.with_worker(|worker| { @@ -1360,7 +1321,7 @@ /// forte::scope(|scope| { /// let bad: Vec<i32> = vec![4, 5, 6]; /// let ok: &Vec<i32> = &ok; // shadow the original `ok` -/// scope.spawn_on(worker, move |_| { +/// scope.spawn_on(worker, move |_: &Worker| { /// println!("ok: {:?}", ok); // captures the shadowed version /// println!("bad: {:?}", bad); /// }); @@ -1369,7 +1330,7 @@ /// // can be shared freely. Note that we need a `move` closure here though, /// // because otherwise we'd be trying to borrow the shadowed `ok`, /// // and that doesn't outlive `scope`. -/// scope.spawn_on(worker, move |_| println!("ok: {:?}", ok)); +/// scope.spawn_on(worker, move |_: &Worker| println!("ok: {:?}", ok)); /// }); /// # }); /// ``` @@ -1379,13 +1340,14 @@ /// /// ```rust /// # use forte::ThreadPool; +/// # use forte::Worker; /// # static THREAD_POOL: ThreadPool = ThreadPool::new(); /// # THREAD_POOL.populate(); /// # THREAD_POOL.with_worker(|worker| { /// let ok: Vec<i32> = vec![1, 2, 3]; /// forte::scope(|scope| { /// let bad: Vec<i32> = vec![4, 5, 6]; -/// scope.spawn_on(worker, |_| { +/// scope.spawn_on(worker, |_: &Worker| { /// // Transfer ownership of `bad` into a local variable (also named `bad`). /// // This will force the closure to take ownership of `bad` from the environment. /// let bad = bad; @@ -1393,7 +1355,7 @@ /// println!("bad: {:?}", bad); // refers to our local variable, above. /// }); /// -/// scope.spawn_on(worker, |_| println!("ok: {:?}", ok)); // we too can borrow `ok` +/// scope.spawn_on(worker, |_: &Worker| println!("ok: {:?}", ok)); // we too can borrow `ok` /// }); /// # }); /// ``` @@ -1405,6 +1367,7 @@ /// /// ```compile_fail /// # use forte::ThreadPool; +/// # use forte::Worker; /// # static THREAD_POOL: ThreadPool = ThreadPool::new(); /// # THREAD_POOL.populate(); /// # THREAD_POOL.with_worker(|worker| { @@ -1421,16 +1384,18 @@ /// /// ``` /// # use forte::ThreadPool; +/// # use forte::Worker; /// # static THREAD_POOL: ThreadPool = ThreadPool::new(); /// # THREAD_POOL.populate(); /// # THREAD_POOL.with_worker(|worker| { /// let mut counter = 0; +/// let counter_ref = &mut counter; /// forte::scope(|scope| { -/// scope.spawn_on(worker, |worker| { -/// counter += 1; +/// scope.spawn_on(worker, |worker: &Worker| { +/// *counter_ref += 1; /// // Note: we borrow the scope again here.
-/// scope.spawn_on(worker, |_| { -/// counter += 1; +/// scope.spawn_on(worker, move |_: &Worker| { +/// *counter_ref += 1; +/// }); /// }); /// }); @@ -1442,17 +1407,20 @@ /// generally can't hold references to the scope. So for example, the /// following also fails to compile: /// -/// ```compile_fail +/// ```compile_fail,E0521 /// # use forte::ThreadPool; +/// # use forte::Worker; /// # static THREAD_POOL: ThreadPool = ThreadPool::new(); /// # THREAD_POOL.populate(); /// THREAD_POOL.with_worker(|worker| { /// worker.scope(|scope| { -/// worker.spawn(|worker| { -/// // This isn't allowed, because it makes the worker borrow the scope -/// scope.spawn_on(worker |_| { -/// // ... -/// }); +/// worker.spawn(|worker: &Worker| { +/// // ^^^^^ This creates a *non-static* job on the worker, +/// // which may outlive the scope. +/// +/// scope.spawn_on(worker, |_: &Worker| { }); +/// // ^^^^^ This requires borrowing the scope within the +/// // unscoped job, which isn't allowed by the compiler. /// }); /// }); /// }); @@ -1472,8 +1440,8 @@ /// spawning task should later panic. The scope returns once all work is /// complete, and panics are propagated at that point. /// -/// Note: Panics in futures are instead propagated to their [`Task`], and will -/// not cause the scope to panic. +/// Note: Panics in futures are instead propagated to their +/// [`Task`][async_task::Task], and will not cause the scope to panic. pub fn scope<'env, F, T>(f: F) -> T where F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> T, @@ -1500,17 +1468,12 @@ fn managed_worker(lease: Lease, halt: Arc<AtomicBool>, barrier: Arc<Barrier>) { // Register as the indicated worker, and work until we are told to halt. Worker::occupy(lease, |worker| { while !halt.load(Ordering::Relaxed) { - if let Some(job) = worker.queue.pop_back() { - worker.execute(job, false); - continue; - } - - while !halt.load(Ordering::Relaxed) { - if let Some(job) = worker.claim_shared_job() { - worker.execute(job, true); - break; - } + #[cfg(feature = "shuttle")] + shuttle::hint::spin_loop(); + if let Some((job, migrated)) = worker.find_work() { + worker.execute(job, migrated); + } else { worker.lease.seat_data.sleep_controller.sleep(); } } @@ -1533,48 +1496,30 @@ fn managed_worker(lease: Lease, halt: Arc<AtomicBool>, barrier: Arc<Barrier>) { /// This never runs when testing in shuttle. #[cfg(not(feature = "shuttle"))] fn heartbeat_loop(thread_pool: &'static ThreadPool, halt: Arc<AtomicBool>) { - use std::thread; - trace!("starting managed heartbeat thread"); - // Stores the index of the tenant we intend to send the next heartbeat to. - let mut queued_to_heartbeat = 0; - - let mut state = thread_pool.state.lock().unwrap(); + let mut seats = thread_pool.state.lock().unwrap().seats.clone(); + let mut index = 0; while !halt.load(Ordering::Relaxed) { - let num_seats = state.seats.len(); - let mut num_occupied: usize = 0; - let mut sent_heartbeat = false; - - // Iterate through all the tenants, starting at the one queued to wake - for i in 0..num_seats { - let seat_index = (queued_to_heartbeat + i) % num_seats; - // Ignore unoccupied seats. - if state.seats[seat_index].occupied { - // Send a single heartbeat to the first live tenant we find. - if !sent_heartbeat { - state.seats[seat_index] - .data - .heartbeat - .store(true, Ordering::Relaxed); - sent_heartbeat = true; - // Start with the next tenant on the next invocation of the loop. - queued_to_heartbeat = (seat_index + 1) % num_seats; - } - - // Count every occupied slot, even if we didn't send them a heartbeat.
- num_occupied += 1; - } - } - - if num_occupied > 0 { - drop(state); - let sleep_interval = HEARTBEAT_INTERVAL / num_occupied as u32; - thread::sleep(sleep_interval); - state = thread_pool.state.lock().unwrap(); + // Scan the seats in a ring, starting one past the seat that received + // the previous heartbeat, and send a heartbeat to the first occupied + // seat we find. + let num_seats = seats.len(); + let (back, front) = seats.split_at(index); + if let Some((offset, seat)) = Iterator::chain(front.iter(), back.iter()) + .enumerate() + .find(|(_, seat)| seat.occupied) + { + index = (index + offset + 1) % num_seats; + seat.data.heartbeat.store(true, Ordering::Relaxed); + std::thread::yield_now(); + seats = thread_pool.state.lock().unwrap().seats.clone(); } else { - state = thread_pool.new_participant.wait(state).unwrap(); + let state = thread_pool.state.lock().unwrap(); + seats = thread_pool + .start_heartbeat + .wait(state) + .unwrap() + .seats + .clone(); } } } @@ -1584,14 +1529,43 @@ #[cfg(all(test, not(feature = "shuttle")))] mod tests { + + use std::sync::mpsc::channel; + use alloc::vec; use super::*; + #[test] + fn spawn_then_join_in_worker() { + static THREAD_POOL: ThreadPool = ThreadPool::new(); + THREAD_POOL.resize_to_available(); + + let (tx, rx) = channel(); + THREAD_POOL.scope(move |_| { + spawn(move |_: &Worker| tx.send(22).unwrap()); + }); + assert_eq!(22, rx.recv().unwrap()); + + THREAD_POOL.depopulate(); + } + + #[test] + fn spawn_then_join_outside_worker() { + static THREAD_POOL: ThreadPool = ThreadPool::new(); + THREAD_POOL.resize_to_available(); + + let (tx, rx) = channel(); + THREAD_POOL.spawn(move |_: &Worker| tx.send(22).unwrap()); + assert_eq!(22, rx.recv().unwrap()); + + THREAD_POOL.depopulate(); + } + #[test] fn join_basic() { static THREAD_POOL: ThreadPool = ThreadPool::new(); - THREAD_POOL.populate(); + THREAD_POOL.resize_to_available(); let mut a = 0; let mut b = 0; @@ -1604,6 +1578,7 @@ mod tests { } #[test] + #[cfg(not(miri))] // This is too much for miri to handle fn join_long() { fn increment(worker: &Worker, slice: &mut [u32]) { match slice.len() { @@ -1618,7 +1593,7 @@ } static THREAD_POOL: ThreadPool = ThreadPool::new(); - THREAD_POOL.populate(); + THREAD_POOL.resize_to_available(); let mut vals = [0; 1_024]; THREAD_POOL.with_worker(|worker| increment(worker, &mut vals)); @@ -1628,6 +1603,7 @@ } #[test] + #[cfg(not(miri))] // This is too much for miri to handle fn join_very_long() { fn increment(worker: &Worker, slice: &mut [u32]) { match slice.len() { @@ -1646,11 +1622,11 @@ } static THREAD_POOL: ThreadPool = ThreadPool::new(); - THREAD_POOL.populate(); + THREAD_POOL.resize_to_available(); - let mut vals = vec![0; 1_024 * 1_024]; + let mut vals = vec![0; 512 * 512]; THREAD_POOL.with_worker(|worker| increment(worker, &mut vals)); - assert_eq!(vals, vec![1; 1_024 * 1_024]); + assert_eq!(vals, vec![1; 512 * 512]); THREAD_POOL.depopulate(); } diff --git a/src/util.rs b/src/util.rs index 6f3a4d1..33ba5da 100644 --- a/src/util.rs +++ b/src/util.rs @@ -9,10 +9,12 @@ use std::hash::DefaultHasher; /// even tolerate weak seeding, as long as it's not zero. /// /// [xorshift*]: https://en.wikipedia.org/wiki/Xorshift#xorshift* +#[cfg(not(feature = "shuttle"))] pub struct XorShift64Star { state: Cell<u64>, } +#[cfg(not(feature = "shuttle"))] impl XorShift64Star { pub fn new() -> Self { // Any non-zero seed will do -- this uses the hash of a global counter.
@@ -29,6 +31,13 @@ impl XorShift64Star { } } + #[allow(dead_code)] + pub fn from_seed(seed: u64) -> Self { + XorShift64Star { + state: Cell::new(seed), + } + } + fn next(&self) -> u64 { let mut x = self.state.get(); debug_assert_ne!(x, 0); @@ -44,3 +53,20 @@ impl XorShift64Star { (self.next() % n as u64) as usize } } + +#[cfg(feature = "shuttle")] +pub struct XorShift64Star; + +#[cfg(feature = "shuttle")] +impl XorShift64Star { + pub fn new() -> Self { + Self + } + + pub fn next_usize(&self, n: usize) -> usize { + use shuttle::rand::Rng; + use shuttle::rand::thread_rng; + + thread_rng().gen_range(0..n) + } +} diff --git a/tests/shuttle.rs b/tests/shuttle.rs index 05a8b60..9d7e14a 100644 --- a/tests/shuttle.rs +++ b/tests/shuttle.rs @@ -3,8 +3,17 @@ #![cfg(feature = "shuttle")] #![allow(unused_imports)] +use core::pin::Pin; +use core::task::Context; +use core::task::Poll; + use forte::ThreadPool; +use forte::Worker; + use shuttle::hint::black_box; +use shuttle::sync::atomic::AtomicBool; +use shuttle::sync::atomic::AtomicUsize; +use shuttle::sync::atomic::Ordering; use tracing::Level; use tracing_subscriber::fmt::Subscriber; @@ -31,22 +40,22 @@ where /// Provides access to a thread pool which can be treated as static for the /// purposes of testing. -fn with_thread_pool<F>(f: F) +fn with_thread_pool<F>(f: F) -> impl Fn() + 'static where F: Fn(&'static ThreadPool) + 'static, { - let thread_pool = Box::new(ThreadPool::new()); - let ptr = Box::into_raw(thread_pool); + move || { + let thread_pool = Box::new(ThreadPool::new()); + let thread_pool_ptr = Box::into_raw(thread_pool); - // SAFETY: TODO - unsafe { - let thread_pool = &*ptr; - f(thread_pool); - }; + // SAFETY: TODO + let thread_pool_ref = unsafe { &*thread_pool_ptr }; + f(thread_pool_ref); - // SAFETY: TODO - let thread_pool = unsafe { Box::from_raw(&mut *ptr) }; - drop(thread_pool); + // SAFETY: TODO + let thread_pool = unsafe { Box::from_raw(&mut *thread_pool_ptr) }; + drop(thread_pool); + } } // ----------------------------------------------------------------------------- /// Tests for concurrency issues within the `with_thread_pool` helper function. /// This spins up a thread pool with a single thread, then spins it back down. #[test] -pub fn thread_pool() { - fn resize(thread_pool: &'static ThreadPool) { - thread_pool.resize_to(3); - thread_pool.resize_to(0); - } +pub fn shuttle_populate_depopulate() { + let test = with_thread_pool(|pool| { + pool.populate(); + pool.depopulate(); + }); - shuttle::check_dfs(|| with_thread_pool(resize), None); + shuttle::check_dfs(test, None); } // ----------------------------------------------------------------------------- // Core API -/* - -/// Tests for concurrency issues when spawning a static closure. +/// Tests spawning a closure on a pool of size one.
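+/// The pool is kept to a single thread (`resize_to(1)`) so that the schedule
+/// space stays small enough for `check_dfs` to explore.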
#[test] -pub fn spawn_closure() { - fn spawn(thread_pool: &'static ThreadPool) { - thread_pool.spawn(|worker| { - black_box(worker); - }); +pub fn shuttle_spawn_closure() { + let test = with_thread_pool(|pool| { + pool.resize_to(1); + pool.spawn(|_: &Worker| {}); + pool.depopulate(); + }); + + shuttle::check_dfs(test, None); +} + +#[derive(Default)] +struct CountFuture { + count: usize, +} + +impl Future for CountFuture { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + if self.count == 128 { + Poll::Ready(()) + } else { + self.count += 1; + cx.waker().wake_by_ref(); + Poll::Pending + } } - shuttle::check_pct(|| with_thread_pool(spawn), 200, 200); +/// Tests spawning a nontrivial future on a pool of size one. +#[test] +pub fn shuttle_spawn_future() { + let test = with_thread_pool(|pool| { + pool.resize_to(1); + let task = pool.spawn(CountFuture::default()); + assert!(task.is_finished()); + pool.depopulate(); + }); + + shuttle::check_dfs(test, None); } -/// Tests for concurrency issues when spawning a static future. +/// Tests a two-level join operation on a pool of size one. #[test] -pub fn spawn_future() { - model(|| { - with_thread_pool(|_, worker| { - let complete = Box::leak(Box::new(AtomicBool::new(false))); - let task = worker.spawn_future(async { - complete.store(true, Ordering::Release); - }); - task.detach(); - worker.run_until(&complete); - }); +pub fn join_4_on_1() { + let test = with_thread_pool(|pool| { + pool.resize_to(1); + + let counter = AtomicUsize::new(0); + pool.join( + |worker| { + worker.join( + |_| counter.fetch_add(1, Ordering::Relaxed), + |_| counter.fetch_add(1, Ordering::Relaxed), + ) + }, + |worker| { + worker.join( + |_| counter.fetch_add(1, Ordering::Relaxed), + |_| counter.fetch_add(1, Ordering::Relaxed), + ) + }, + ); + assert_eq!(counter.load(Ordering::Relaxed), 4); + + pool.depopulate(); }); + + shuttle::check_pct(test, 100_000, 10_000); } -/// Tests for concurrency issues in join operations. +/// Tests a two-level join operation on a pool of size two. #[test] -pub fn join() { - model(|| { - with_thread_pool(|_, worker| { - worker.join(|_| black_box(()), |_| black_box(())); - }); +pub fn join_4_on_2() { + let test = with_thread_pool(|pool| { + pool.resize_to(2); + + let counter = AtomicUsize::new(0); + pool.join( + |worker| { + worker.join( + |_| counter.fetch_add(1, Ordering::Relaxed), + |_| counter.fetch_add(1, Ordering::Relaxed), + ) + }, + |worker| { + worker.join( + |_| counter.fetch_add(1, Ordering::Relaxed), + |_| counter.fetch_add(1, Ordering::Relaxed), + ) + }, + ); + assert_eq!(counter.load(Ordering::Relaxed), 4); + + pool.depopulate(); + }); + + shuttle::check_pct(test, 100_000, 10_000); +} + +/// Tests a two-level join operation on a pool of size three. +#[test] +pub fn join_4_on_3() { + let test = with_thread_pool(|pool| { + pool.resize_to(3); + + let counter = AtomicUsize::new(0); + pool.join( + |worker| { + worker.join( + |_| counter.fetch_add(1, Ordering::Relaxed), + |_| counter.fetch_add(1, Ordering::Relaxed), + ) + }, + |worker| { + worker.join( + |_| counter.fetch_add(1, Ordering::Relaxed), + |_| counter.fetch_add(1, Ordering::Relaxed), + ) + }, + ); + assert_eq!(counter.load(Ordering::Relaxed), 4); + + pool.depopulate(); }); + + shuttle::check_pct(test, 100_000, 10_000); } /// Tests a moderately deep join operation on a large pool.
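+/// The recursion peels one element off the slice per level, so this
+/// exercises a chain of ten nested `join` calls under PCT scheduling.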
+#[test] +pub fn join_long() { + let test = with_thread_pool(|pool| { + pool.resize_to(8); + + fn increment(worker: &Worker, slice: &mut [u32]) { + match slice.len() { + 0 => (), + 1 => slice[0] += 1, + _ => { + let (head, tail) = slice.split_at_mut(1); + + worker.join(|_| head[0] += 1, |worker| increment(worker, tail)); + } + } + } + + let mut vals = [0; 10]; + pool.with_worker(|worker| increment(worker, &mut vals)); + assert_eq!(vals, [1; 10]); + + pool.depopulate(); + }); + + shuttle::check_pct(test, 100_000, 10_000); +} + +/* + /// Tests for concurrency issues when blocking on a future. #[test] pub fn block_on() { diff --git a/tests/tests.rs b/tests/tests.rs deleted file mode 100644 index 29e6cd5..0000000 --- a/tests/tests.rs +++ /dev/null @@ -1,5 +0,0 @@ -//! Entrypoint for all of Forte's tests - -mod general; -mod miri; -mod shuttle;