From e607d29040948cda77fa2953a1227599ac2fa890 Mon Sep 17 00:00:00 2001 From: NthTensor Date: Tue, 19 Aug 2025 23:27:11 -0400 Subject: [PATCH] refactor: use bevy-style scopes --- rayon-compat/src/lib.rs | 29 ++- src/lib.rs | 2 + src/scope.rs | 413 ++++++++++++++++++++++++---------------- src/thread_pool.rs | 277 +++++++++++++++++++++------ 4 files changed, 494 insertions(+), 227 deletions(-) diff --git a/rayon-compat/src/lib.rs b/rayon-compat/src/lib.rs index e130e91..995e518 100644 --- a/rayon-compat/src/lib.rs +++ b/rayon-compat/src/lib.rs @@ -80,25 +80,40 @@ where // ----------------------------------------------------------------------------- // Scope -pub use forte::Scope; +pub struct Scope<'scope, 'env> { + inner_scope: &'scope forte::Scope<'scope, 'env>, +} + +impl<'scope, 'env> Scope<'scope, 'env> { + pub fn spawn(self, f: F) + where + F: FnOnce(Scope<'scope, 'env>) + Send + 'scope, + { + forte::Worker::with_current(|worker| { + let worker = worker.unwrap(); + let inner_scope = self.inner_scope; + inner_scope.spawn_on(worker, |_| f(Scope { inner_scope })) + }); + } +} #[inline(always)] -pub fn scope<'scope, OP, R>(op: OP) -> R +pub fn scope<'env, OP, R>(op: OP) -> R where - OP: FnOnce(&Scope<'scope>) -> R + Send, + OP: for<'scope> FnOnce(Scope<'scope, 'env>) -> R + Send, R: Send, { ensure_started(); - forte::scope(op) + forte::scope(|inner_scope| op(Scope { inner_scope })) } #[inline(always)] -pub fn in_place_scope<'scope, OP, R>(op: OP) -> R +pub fn in_place_scope<'env, OP, R>(op: OP) -> R where - OP: FnOnce(&Scope<'scope>) -> R, + OP: for<'scope> FnOnce(Scope<'scope, 'env>) -> R, { ensure_started(); - forte::scope(op) + forte::scope(|inner_scope| op(Scope { inner_scope })) } // ----------------------------------------------------------------------------- diff --git a/src/lib.rs b/src/lib.rs index 4afcebf..4aa6c49 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -73,6 +73,7 @@ mod platform { pub use alloc::sync::Arc; pub use alloc::sync::Weak; pub use core::sync::atomic::AtomicBool; + pub use core::sync::atomic::AtomicPtr; pub use core::sync::atomic::AtomicU32; pub use core::sync::atomic::Ordering; pub use std::sync::Barrier; @@ -95,6 +96,7 @@ mod platform { pub use shuttle::sync::Mutex; pub use shuttle::sync::Weak; pub use shuttle::sync::atomic::AtomicBool; + pub use shuttle::sync::atomic::AtomicPtr; pub use shuttle::sync::atomic::AtomicU32; pub use shuttle::sync::atomic::Ordering; pub use shuttle::thread::Builder as ThreadBuilder; diff --git a/src/scope.rs b/src/scope.rs index 87ccfa2..7bc5e09 100644 --- a/src/scope.rs +++ b/src/scope.rs @@ -2,71 +2,102 @@ //! information see [`crate::scope()`] or the [`Scope`] type. use alloc::boxed::Box; +use core::any::Any; use core::future::Future; use core::marker::PhantomData; -use core::marker::PhantomPinned; +use core::ptr; use async_task::Runnable; use async_task::Task; use scope_ptr::ScopePtr; +use crate::ThreadPool; use crate::job::HeapJob; use crate::job::JobRef; use crate::platform::*; use crate::signal::Signal; use crate::thread_pool::Worker; +use crate::unwind; +use crate::unwind::AbortOnDrop; // ----------------------------------------------------------------------------- // Scope -/// A scope which can spawn a number of non-static jobs and async tasks. See -/// [`ThreadPool::scope`](crate::ThreadPool::scope) for more information. -pub struct Scope<'scope> { +/// A scope which can spawn a number of non-static jobs and async tasks. 
+pub struct Scope<'scope, 'env: 'scope> {
     /// Number of active references to the scope (including the owning
     /// allocation). This is incremented each time a new `ScopePtr` is created,
-    /// and decremented when a `ScopePtr` or the `Scope` itself is dropped.
+    /// and decremented when a `ScopePtr` is dropped or the owning thread is done
+    /// using it.
     count: AtomicU32,
     /// A signal used to communicate when the scope has been completed.
     signal: Signal,
-    /// A marker that makes the scope behave as if it contained a vector of
-    /// closures to execute, all of which outlive `'scope`. We pretend they are
-    /// `Send + Sync` even though they're not actually required to be `Sync`.
-    /// It's still safe to let the `Scope` implement `Sync` because the closures
-    /// are only *moved* across threads to be executed.
-    #[allow(clippy::type_complexity)]
-    _marker: PhantomData<Box<dyn FnOnce(&Scope<'scope>) + Send + Sync + 'scope>>,
-    /// Opt out of Unpin behavior; this type requires strong pinning guaranties.
-    _phantom: PhantomPinned,
+    /// If any job panics, we store the result here to propagate it.
+    panic: AtomicPtr<Box<dyn Any + Send>>,
+    /// Makes `Scope` invariant over 'scope
+    _scope: PhantomData<&'scope mut &'scope ()>,
+    /// Makes `Scope` invariant over 'env
+    _env: PhantomData<&'env mut &'env ()>,
 }
 
-impl<'scope> Scope<'scope> {
-    /// Creates a new scope owned by the given worker thread. For a safe
-    /// equivalent, use [`ThreadPool::scope`](crate::ThreadPool::scope).
-    ///
-    /// Every scope contains a lifetime `'scope`, which must outlive anything
-    /// spawned onto the scope.
-    ///
-    /// When a scope is dropped, it will block the thread until all work
-    /// spawened on the scope is complete.
+/// Creates a new scope on a worker. [`Worker::scope`] is just an alias for this
+/// function.
+#[inline]
+pub fn with_scope<'env, F, T>(worker: &Worker, f: F) -> T
+where
+    F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> T,
+{
+    let abort_guard = AbortOnDrop;
+    // SAFETY: The scope is never moved or mutably referenced. The scope is only
+    // dropped at the end of this function, after the call to `complete`. The
+    // abort guard above prevents the stack from being dropped early during a
+    // panic unwind.
+    let scope = unsafe { Scope::new() };
+    // Panics that occur within the closure should be caught and propagated once
+    // all spawned work is complete. This is not a safety requirement, it's just
+    // a nicer behavior than aborting.
+    let result = match unwind::halt_unwinding(|| f(&scope)) {
+        Ok(value) => Some(value),
+        Err(err) => {
+            scope.store_panic(err);
+            None
+        }
+    };
+    // Now that the user has (presumably) spawned some work onto the scope, we
+    // must wait for it to complete.
+    //
+    // SAFETY: This is called only once.
+    unsafe { scope.complete(worker) };
+    // At this point all work on the scope is complete, so it is safe to drop
+    // the scope. This also means we can relinquish our abort guard (returning
+    // to the normal panic behavior).
+    core::mem::forget(abort_guard);
+    // If the closure or any spawned work did panic, we can now panic.
+    scope.maybe_propagate_panic();
+    // Otherwise return the result of evaluating the closure.
+    result.unwrap()
+}
+
+impl<'scope, 'env> Scope<'scope, 'env> {
+    /// Creates a new scope.
     ///
     /// # Safety
     ///
-    /// The caller must pin the scope before it can be used (This cannot be
-    /// enforced on the type-level due to compatibility requirements with rayon)
-    /// and must ensure the scope is eventually dropped.
-    pub(crate) unsafe fn new() -> Scope<'scope> {
+    /// The caller must promise not to move or mutably reference this scope
+    /// until it is dropped, and must not allow the scope to be dropped until
+    /// after `Scope::complete` is run and returns.
+    unsafe fn new() -> Scope<'scope, 'env> {
         Scope {
             count: AtomicU32::new(1),
             signal: Signal::new(),
-            _marker: PhantomData,
-            _phantom: PhantomPinned,
+            panic: AtomicPtr::new(ptr::null_mut()),
+            _scope: PhantomData,
+            _env: PhantomData,
         }
     }
 
-    /// Spawns a job into the scope. This job will execute sometime before the
-    /// scope completes. The job is specified as a closure, and this closure
-    /// receives its own reference to the scope `self` as argument. This can be
-    /// used to inject new jobs into `self`.
+    /// Spawns a scoped job onto the local worker. This job will execute
+    /// sometime before the scope completes.
     ///
     /// # Returns
     ///
@@ -76,45 +107,34 @@ impl<'scope> Scope<'scope> {
     /// channels.
     ///
     /// If you need to return a value, spawn a `Future` instead with
-    /// [`Scope::spawn_future`].
-    ///
-    /// # See also
-    ///
-    /// The [`ThreadPool::scope`](crate::ThreadPool::scope) function has more
-    /// extensive documentation about task spawning.
-    ///
-    /// # Panics
-    ///
-    /// Panics if not called from within a worker.
+    /// [`Scope::spawn_future_on`].
     ///
-    pub fn spawn<F>(&self, f: F)
+    pub fn spawn_on<F>(&self, worker: &Worker, f: F)
     where
-        F: FnOnce(&Scope<'scope>) + Send + 'scope,
+        F: FnOnce(&Worker) + Send + 'scope,
     {
         // Create a job to execute the spawned function in the scope.
-        //
-        // SAFETY: This scope must be pinned, since the only way to create a
-        // scope is via `Scope::new` and that function requires the caller pin
-        // the scope before using it.
-        let scope_ptr = unsafe { ScopePtr::new(self) };
-        let job = HeapJob::new(move |_| {
-            scope_ptr.run(f);
+        let scope_ptr = ScopePtr::new(self);
+        let job = HeapJob::new(move |worker| {
+            // Catch any panics and store them on the scope.
+            let result = unwind::halt_unwinding(|| f(worker));
+            if let Err(err) = result {
+                scope_ptr.store_panic(err);
+            };
+            drop(scope_ptr);
         });
 
         // SAFETY: We must ensure that the heap job does not outlive the data it
         // closes over. In effect, this means it must not outlive `'scope`.
         //
-        // The `'scope` will last until the scope is deallocated, which (due to
-        // reference counting) will not be until after `scope_ptr` within the
-        // heap job is dropped. So `'scope` should last at least until the heap
-        // job is dropped.
+        // This is ensured by the `scope_ptr` and the scope rules, which will
+        // keep the calling stack frame alive until this job completes,
+        // effectively extending the lifetime of `'scope` for as long as is
+        // necessary.
        let job_ref = unsafe { job.into_job_ref() };
 
         // Send the job to a queue to be executed.
-        Worker::with_current(|worker| {
-            let worker = worker.unwrap();
-            worker.queue.push_back(job_ref);
-        });
+        worker.queue.push_back(job_ref);
     }
 
     /// Spawns a future onto the scope. This future will be asynchronously
@@ -143,43 +163,18 @@ impl<'scope> Scope<'scope> {
     /// infinite loop will prevent the scope from completing, and is not
     /// recommended.
     ///
-    /// # Panics
-    ///
-    /// Panics if not called within a worker.
-    ///
-    pub fn spawn_future<F, T>(&self, future: F) -> Task<T>
+    pub fn spawn_future_on<F, T>(&self, thread_pool: &'static ThreadPool, future: F) -> Task<T>
     where
         F: Future<Output = T> + Send + 'scope,
-        T: Send + 'scope,
-    {
-        self.spawn_async(|_| future)
-    }
-
-    /// Spawns an async closure onto the scope. This future will be
-    /// asynchronously polled to completion some time before the scope
-    /// completes.
-    ///
-    /// Internally the closure is wrapped into a future and passed along to
-    /// [`Scope::spawn_future`]. See the docs on that function for more
-    /// information.
-    ///
-    /// # Panics
-    ///
-    /// Panics if not called within a worker.
-    ///
-    pub fn spawn_async<Fn, Fut, T>(&self, f: Fn) -> Task<T>
-    where
-        Fn: FnOnce(&Scope<'scope>) -> Fut + Send + 'scope,
-        Fut: Future<Output = T> + Send + 'scope,
-        T: Send + 'scope,
+        T: Send,
     {
-        // Wrap the function into a future using an async block.
-        //
-        // SAFETY: This scope must be pinned, since the only way to create a
-        // scope is via `Scope::new` and that function requires the caller pin
-        // the scope before using it.
-        let scope_ptr = unsafe { ScopePtr::new(self) };
-        let future = async move { scope_ptr.run(f).await };
+        // Embed the scope pointer into the future.
+        let scope_ptr = ScopePtr::new(self);
+        let future = async move {
+            let result = future.await;
+            drop(scope_ptr);
+            result
+        };
 
         // The schedule function will turn the future into a job when woken.
         let schedule = move |runnable: Runnable| {
@@ -201,8 +196,7 @@ impl<'scope> Scope<'scope> {
             // local queue, which will generally cause tasks to stick to the
             // same thread instead of jumping around randomly. This is also
             // faster than injecting into the global queue.
-            Worker::with_current(|worker| {
-                let worker = worker.unwrap();
+            thread_pool.with_worker(|worker| {
                 worker.queue.push_back(job_ref);
             });
         };
@@ -210,11 +204,10 @@ impl<'scope> Scope<'scope> {
         // SAFETY: We must ensure that the runnable does not outlive the data it
         // closes over. In effect, this means it must not outlive `'scope`.
         //
-        // The `'scope` will last until the scope is deallocated, which (due to
-        // reference counting) will not be until after `scope_ptr` within the
-        // future is dropped. The future will not be dropped until after the
-        // runnable is dropped, so `'scope` should last at least until the
-        // runnable is dropped.
+        // This is ensured by the `scope_ptr` and the scope rules, which will
+        // keep the calling stack frame alive until the runnable is dropped,
+        // effectively extending the lifetime of `'scope` for as long as is
+        // necessary.
         //
         // We have to use `spawn_unchecked` here instead of `spawn` because the
         // future is non-static.
@@ -233,7 +226,7 @@ impl<'scope> Scope<'scope> {
     /// `Scope::remove_reference`, or the scope will block forever on
     /// completion.
     fn add_reference(&self) {
-        let counter = self.count.fetch_add(1, Ordering::SeqCst);
+        let counter = self.count.fetch_add(1, Ordering::Release);
         tracing::trace!("scope reference counter increased to {}", counter + 1);
     }
 
@@ -245,7 +238,7 @@ impl<'scope> Scope<'scope> {
     /// `add_reference` for every call to this function, unless used within
     /// `Scope::complete`.
     unsafe fn remove_reference(&self) {
-        let counter = self.count.fetch_sub(1, Ordering::SeqCst);
+        let counter = self.count.fetch_sub(1, Ordering::Acquire);
         tracing::trace!("scope reference counter decreased to {}", counter - 1);
         if counter == 1 {
             // Alerts the owning thread that the scope has completed.
@@ -259,22 +252,61 @@ impl<'scope> Scope<'scope> {
             unsafe { Signal::send(&self.signal, ()) };
         }
     }
-}
 
-impl Drop for Scope<'_> {
-    fn drop(&mut self) {
-        // When the scope is dropped, block to prevent deallocation until the
-        // reference counter allows the scope to complete.
-        tracing::trace!("completing scope");
+    /// Stores a panic so that it can be propagated when the scope is complete.
+    /// If called multiple times, only the first panic is stored, and the
+    /// remainder are dropped.
+    fn store_panic(&self, err: Box<dyn Any + Send>) {
+        if self.panic.load(Ordering::Relaxed).is_null() {
+            let nil = ptr::null_mut();
+            let err_ptr = Box::into_raw(Box::new(err));
+            if self
+                .panic
+                .compare_exchange(nil, err_ptr, Ordering::Release, Ordering::Relaxed)
+                .is_ok()
+            {
+                // Ownership is now transferred into the panic field.
+            } else {
+                // Another panic raced in ahead of us, so we need to drop this one.
+                //
+                // SAFETY: This was created by `Box::into_raw` just above. It is
+                //         possible that this will panic, because it's a `Box<dyn Any>`,
+                //         however in the worst case this will simply trigger the
+                //         scope's abort guard, causing an abort rather than UB.
+                let _: Box<_> = unsafe { Box::from_raw(err_ptr) };
+            }
+        }
+    }
+
+    /// Propagates any panic captured while the scope was executing.
+    fn maybe_propagate_panic(&self) {
+        let panic = self.panic.swap(ptr::null_mut(), Ordering::Relaxed);
+        if !panic.is_null() {
+            // SAFETY: This was created by `Box::into_raw` in `store_panic` and,
+            //         because of the atomic swap just above, is only called once for
+            //         each box.
+            let value = unsafe { Box::from_raw(panic) };
+            unwind::resume_unwinding(*value);
+        }
+    }
+
+    /// Waits for the scope to complete.
+    ///
+    /// # Safety
+    ///
+    /// This must be called only once.
+    unsafe fn complete(&self, worker: &Worker) {
         // SAFETY: This is explicitly allowed, because every scope starts off
-        // with a counter of 1. This should be the only call to
-        // `remove_reference` without a corresponding call to `add_reference`, so
-        // the only one that can cause the reference counter to drop to zero.
+        // with a counter of 1. Because this is called only once, the following
+        // should be the only call to `remove_reference` without a corresponding
+        // call to `add_reference`.
+        //
+        // Only after the following call will the counter decrement to zero,
+        // causing the signal to become set and allowing this function to
+        // return.
         unsafe { self.remove_reference() };
-        Worker::with_current(|worker| {
-            let worker = worker.unwrap();
-            worker.wait_for_signal(&self.signal);
-        });
+        // Wait for the remaining work to complete.
+        worker.wait_for_signal(&self.signal);
     }
 }
 
@@ -284,81 +316,132 @@ impl Drop for Scope<'_> {
 mod scope_ptr {
     //! Defines a "lifetime-erased" reference-counting pointer to a scope.
 
+    use alloc::boxed::Box;
+    use core::any::Any;
+
     use super::Scope;
 
     /// A reference-counted pointer to a scope. Used to capture a scope pointer
     /// in jobs without faking a lifetime. Holding a `ScopePtr` keeps the
     /// reference scope from being deallocated.
-    pub struct ScopePtr<'scope>(*const Scope<'scope>);
+    pub struct ScopePtr<'scope, 'env>(*const Scope<'scope, 'env>);
 
     // SAFETY: !Send for raw pointers is not for safety, just as a lint.
-    unsafe impl Send for ScopePtr<'_> {}
+    unsafe impl Send for ScopePtr<'_, '_> {}
 
     // SAFETY: !Sync for raw pointers is not for safety, just as a lint.
-    unsafe impl Sync for ScopePtr<'_> {}
+    unsafe impl Sync for ScopePtr<'_, '_> {}
 
-    impl<'scope> ScopePtr<'scope> {
+    impl<'scope, 'env> ScopePtr<'scope, 'env> {
         /// Creates a new reference-counted scope pointer which can be sent to other
         /// threads.
-        ///
-        /// # SAFETY:
-        ///
-        /// The scope must be pinned (this cannot be enforced on the type level
-        /// due to compatibility requirements with rayon).
-        pub unsafe fn new(scope: &Scope<'scope>) -> ScopePtr<'scope> {
+        pub fn new(scope: &Scope<'scope, 'env>) -> ScopePtr<'scope, 'env> {
+            // Add a reference to ensure the scope will stay alive at least
+            // until this is dropped (at which point we decrement the counter).
             scope.add_reference();
             ScopePtr(scope)
         }
 
-        /// Passes the scope referred to by this pointer into a closure.
-        pub fn run<F, T>(&self, f: F) -> T
-        where
-            F: FnOnce(&Scope<'scope>) -> T + 'scope,
-        {
-            // SAFETY: This pointer is convertible to a shared reference.
-            //
-            // + It was created from an immutable reference to a pinned scope.
-            //   The only way for this to be invalidated is if the scope was
-            //   dropped in the time since we created the pointer.
-            //
-            // + We incremented the scope's reference counter and will not
-            //   decrement it until this pointer is dropped. Since the scope
-            //   will not be dropped while the reference counter is above
-            //   zero, we know the pointer is still valid.
-            //
-            // + The scope is never accessed mutably, so creating shared
-            //   references is allowed.
-            //
+        /// Stores a panic in the scope that can be resumed later.
+        pub fn store_panic(&self, err: Box<dyn Any + Send>) {
+            // SAFETY: This was created using an immutable scope reference, and
+            // by the scope rules there can be no mutable references to this
+            // scope, nor can the scope have been moved or deallocated while the
+            // scope's counter remains incremented.
            let scope_ref = unsafe { &*self.0 };
-
-            // Execute the closure on the shared reference.
-            f(scope_ref)
+            scope_ref.store_panic(err);
         }
     }
 
-    impl Drop for ScopePtr<'_> {
+    impl Drop for ScopePtr<'_, '_> {
         fn drop(&mut self) {
-            // SAFETY: This pointer is convertible to a shared reference.
-            //
-            // + It was created from an immutable reference to a pinned scope.
-            //   The only way for this to be invalidated is if the scope was
-            //   dropped in the time since we created the pointer.
-            //
-            // + We incremented the scope's reference counter and will not
-            //   decrement it until this pointer is dropped. Since the scope
-            //   will not be dropped while the reference counter is above
-            //   zero, we know the pointer is still valid.
-            //
-            // + The scope is never accessed mutably, so creating shared
-            //   references is allowed.
-            //
+            // SAFETY: This was created using an immutable scope reference, and
+            // by the scope rules there can be no mutable references to this
+            // scope, nor can the scope have been moved or deallocated while the
+            // scope's counter remains incremented.
             let scope_ref = unsafe { &*self.0 };
 
-            // Decrement the reference counter, possibly allowing the scope to
-            // complete.
+            // Decrement the reference counter, possibly allowing
+            // `Scope::complete` to return and the scope itself to be freed.
             //
             // SAFETY: We call `add_reference` in `ScopePtr::new`.
             unsafe { scope_ref.remove_reference() };
         }
     }
 }
+
+// -----------------------------------------------------------------------------
+// Tests
+
+#[cfg(all(test, not(feature = "shuttle")))]
+mod tests {
+    use core::sync::atomic::AtomicU8;
+    use core::sync::atomic::Ordering;
+
+    use crate::ThreadPool;
+    use crate::scope;
+
+    #[test]
+    fn scoped_borrow() {
+        static THREAD_POOL: ThreadPool = ThreadPool::new();
+        THREAD_POOL.populate();
+
+        let mut string = "a";
+        THREAD_POOL.with_worker(|worker| {
+            scope(|scope| {
+                scope.spawn_on(worker, |_| {
+                    string = "b";
+                })
+            });
+        });
+        assert_eq!(string, "b");
+    }
+
+    #[test]
+    fn scoped_borrow_twice() {
+        static THREAD_POOL: ThreadPool = ThreadPool::new();
+        THREAD_POOL.populate();
+
+        let mut string = "a";
+        THREAD_POOL.with_worker(|worker| {
+            scope(|scope| {
+                scope.spawn_on(worker, |worker| {
+                    string = "b";
+                    scope.spawn_on(worker, |_| {
+                        string = "c";
+                    })
+                })
+            });
+        });
+        assert_eq!(string, "c");
+    }
+
+    #[test]
+    fn scoped_concurrency() {
+        const NUM_JOBS: u8 = 128;
+
+        static THREAD_POOL: ThreadPool = ThreadPool::new();
+        THREAD_POOL.resize_to(4);
+
+        let a = AtomicU8::new(0);
+        let b = AtomicU8::new(0);
+
+        THREAD_POOL.with_worker(|worker| {
+            scope(|scope| {
+                for _ in 0..NUM_JOBS {
+                    scope.spawn_on(worker, |_| {
+                        THREAD_POOL.join(
+                            |_| a.fetch_add(1, Ordering::Relaxed),
+                            |_| b.fetch_add(1, Ordering::Relaxed),
+                        );
+                    });
+                }
+            });
+        });
+
+        assert_eq!(a.load(Ordering::Relaxed), NUM_JOBS);
+        assert_eq!(b.load(Ordering::Relaxed), NUM_JOBS);
+
+        THREAD_POOL.depopulate();
+    }
+}
diff --git a/src/thread_pool.rs b/src/thread_pool.rs
index 58361d8..d2ddf94 100644
--- a/src/thread_pool.rs
+++ b/src/thread_pool.rs
@@ -7,8 +7,8 @@ use alloc::vec::Vec;
 use core::cell::Cell;
 use core::cmp;
 use core::future::Future;
+use core::marker::PhantomData;
 use core::num::NonZero;
-use core::pin::Pin;
 use core::pin::pin;
 use core::ptr;
 use core::ptr::NonNull;
@@ -22,13 +22,14 @@ use tracing::debug;
 use tracing::trace;
 use tracing::trace_span;
 
+use crate::Scope;
 use crate::blocker::Blocker;
 use crate::job::HeapJob;
 use crate::job::JobQueue;
 use crate::job::JobRef;
 use crate::job::StackJob;
 use crate::platform::*;
-use crate::scope::Scope;
+use crate::scope::with_scope;
 use crate::signal::Signal;
 use crate::unwind;
 
@@ -603,13 +604,13 @@ impl ThreadPool {
         self.with_worker(|worker| worker.join(a, b))
     }
 
-    /// Create a scope for spawning non-static work.
+    /// Creates a scope onto which non-static work can be spawned.
     ///
-    /// See also: [`Worker::scope`] and [`scope`].
-    #[inline]
-    pub fn scope<'scope, F, T>(&'static self, f: F) -> T
+    /// For more complete docs, see [`scope`]. If you have a reference to a
+    /// worker, you should call [`Worker::scope`] instead.
+    pub fn scope<'env, F, T>(&'static self, f: F) -> T
     where
-        F: FnOnce(&Scope<'scope>) -> T,
+        F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> T,
     {
         self.with_worker(|worker| worker.scope(f))
     }
@@ -642,9 +643,11 @@ thread_local! {
 /// Workers have one core memory-safety guarantee: Any jobs added to the worker
 /// will eventually be executed.
 pub struct Worker {
-    pub(crate) migrated: Cell<bool>,
-    pub(crate) lease: Lease,
+    migrated: Cell<bool>,
+    lease: Lease,
     pub(crate) queue: JobQueue,
+    // Make non-send
+    _phantom: PhantomData<*const ()>,
 }
 
 /// Describes the outcome of a call to [`Worker::yield_now`] or [`Worker::yield_local`].
@@ -735,6 +738,7 @@ impl Worker {
             migrated: Cell::new(false),
             lease,
             queue: JobQueue::new(),
+            _phantom: PhantomData,
         };
 
         // Swap the local pointer to point to the newly allocated worker.
@@ -1099,23 +1103,15 @@ impl Worker {
         }
     }
 
-    /// Creates a scope on which non-static work can be spawned. Spawned jobs
-    /// may run asynchronously with respect to the closure; they may themselves
-    /// spawn additional tasks into the scope. When the closure returns, it will
-    /// block until all tasks that have been spawned into onto the scope complete.
+    /// Creates a scope onto which non-static work can be spawned. For more complete docs, see [`scope`].
     ///
-    /// If you do not have access to a [`Worker`], you may call
-    /// [`ThreadPool::scope`] or simply [`scope`].
+    /// If you do not have access to a worker, you can use [`ThreadPool::scope`] or simply [`scope`].
     #[inline]
-    pub fn scope<'scope, F, T>(&self, f: F) -> T
+    pub fn scope<'env, F, T>(&self, f: F) -> T
     where
-        F: FnOnce(&Scope<'scope>) -> T,
+        F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> T,
     {
-        // SAFETY: The scope is pinned upon creation and dropped when the
-        // function returns.
-        let scope = unsafe { pin!(Scope::new()) };
-        let scope_ref = Pin::get_ref(scope.into_ref());
-        f(scope_ref)
+        with_scope(self, f)
     }
 }
 
@@ -1209,14 +1205,213 @@ where
         })
 }
 
-/// Creates a scope that allows spawning non-static jobs.
+/// Creates a "fork-join" scope and invokes the closure with a reference to
+/// it. Work spawned onto this scope does not have to have a `'static`
+/// lifetime, and can borrow local variables. Local borrowing is possible
+/// because this function will not return until all work spawned on the
+/// scope has completed, thus ensuring the stack frame is kept alive for the
+/// duration.
 ///
+/// Note: This function panics if the current thread is not registered as a worker. +///
+///
+/// # Alternatives
+///
+/// Where possible, [`ThreadPool::scope`] or [`Worker::scope`] should be used
+/// instead. These functions are more efficient, and do not panic when not
+/// within a worker.
+///
+/// Scopes are a more flexible building block compared to [`join()`], since a
+/// loop can be used to spawn any number of tasks without recursing.
+/// However, that flexibility comes at a performance price: tasks spawned
+/// using `scope` must be allocated onto the heap, whereas [`join()`] can make
+/// exclusive use of the stack. Prefer [`join()`] (or ideally [`Worker::join`]) where possible.
+///
+/// [`join()`]: Worker::join
+///
+/// # Accessing stack data
+///
+/// In general, spawned tasks may borrow any stack data that lives outside
+/// the scope closure.
+///
+/// ```
+/// # use forte::ThreadPool;
+/// # static THREAD_POOL: ThreadPool = ThreadPool::new();
+/// # THREAD_POOL.populate();
+/// # THREAD_POOL.with_worker(|worker| {
+/// let ok: Vec<i32> = vec![1, 2, 3];
+/// forte::scope(|scope| {
+///     let bad: Vec<i32> = vec![4, 5, 6];
+///     scope.spawn_on(worker, |_| {
+///         // Transfer ownership of `bad` into a local variable (also named `bad`).
+///         // This will force the closure to take ownership of `bad` from the environment.
+///         let bad = bad;
+///         println!("ok: {:?}", ok); // `ok` is only borrowed.
+///         println!("bad: {:?}", bad); // refers to our local variable, above.
+///     });
+///
+///     scope.spawn_on(worker, |_| println!("ok: {:?}", ok)); // we too can borrow `ok`
+/// });
+/// # });
+/// ```
+///
+/// As the comments in the example above suggest, to reference `bad` we must
+/// take ownership of it. One way to do this is to detach the closure
+/// from the surrounding stack frame, using the `move` keyword. This
+/// will cause it to take ownership of *all* the variables it touches,
+/// in this case including both `ok` *and* `bad`:
+///
+/// ```rust
+/// # use forte::ThreadPool;
+/// # static THREAD_POOL: ThreadPool = ThreadPool::new();
+/// # THREAD_POOL.populate();
+/// # THREAD_POOL.with_worker(|worker| {
+/// let ok: Vec<i32> = vec![1, 2, 3];
+/// forte::scope(|scope| {
+///     let bad: Vec<i32> = vec![4, 5, 6];
+///     scope.spawn_on(worker, move |_| {
+///         println!("ok: {:?}", ok);
+///         println!("bad: {:?}", bad);
+///     });
+///
+///     // That closure is fine, but now we can't use `ok` anywhere else,
+///     // since it is owned by the previous task:
+///     // scope.spawn_on(worker, |_| println!("ok: {:?}", ok));
+/// });
+/// # });
+/// ```
+///
+/// While this works, it could be a problem if we want to use `ok` elsewhere.
+/// There are two choices. We can keep the closure as a `move` closure, but
+/// instead of referencing the variable `ok`, we create a shadowed variable that
+/// is a borrow of `ok` and capture *that*:
+///
+/// ```rust
+/// # use forte::ThreadPool;
+/// # static THREAD_POOL: ThreadPool = ThreadPool::new();
+/// # THREAD_POOL.populate();
+/// # THREAD_POOL.with_worker(|worker| {
+/// let ok: Vec<i32> = vec![1, 2, 3];
+/// forte::scope(|scope| {
+///     let bad: Vec<i32> = vec![4, 5, 6];
+///     let ok: &Vec<i32> = &ok; // shadow the original `ok`
+///     scope.spawn_on(worker, move |_| {
+///         println!("ok: {:?}", ok); // captures the shadowed version
+///         println!("bad: {:?}", bad);
+///     });
+///
+///     // Now we too can use the shadowed `ok`, since `&Vec<i32>` references
+///     // can be shared freely. Note that we need a `move` closure here though,
+///     // because otherwise we'd be trying to borrow the shadowed `ok`,
+///     // and that doesn't outlive `scope`.
+///     scope.spawn_on(worker, move |_| println!("ok: {:?}", ok));
+/// });
+/// # });
+/// ```
+///
+/// Another option is not to use the `move` keyword but instead to take ownership
+/// of individual variables:
+///
+/// ```rust
+/// # use forte::ThreadPool;
+/// # static THREAD_POOL: ThreadPool = ThreadPool::new();
+/// # THREAD_POOL.populate();
+/// # THREAD_POOL.with_worker(|worker| {
+/// let ok: Vec<i32> = vec![1, 2, 3];
+/// forte::scope(|scope| {
+///     let bad: Vec<i32> = vec![4, 5, 6];
+///     scope.spawn_on(worker, |_| {
+///         // Transfer ownership of `bad` into a local variable (also named `bad`).
+///         // This will force the closure to take ownership of `bad` from the environment.
+///         let bad = bad;
+///         println!("ok: {:?}", ok); // `ok` is only borrowed.
+///         println!("bad: {:?}", bad); // refers to our local variable, above.
+///     });
+///
+///     scope.spawn_on(worker, |_| println!("ok: {:?}", ok)); // we too can borrow `ok`
+/// });
+/// # });
+/// ```
+///
+/// # Referencing the scope
+///
+/// The scope passed into the closure is not allowed to leak out of this call.
+/// In other words, this will fail to compile:
 ///
-/// If there is no current thread pool, this panics.
+/// ```compile_fail
+/// # use forte::ThreadPool;
+/// # static THREAD_POOL: ThreadPool = ThreadPool::new();
+/// # THREAD_POOL.populate();
+/// # THREAD_POOL.with_worker(|worker| {
+/// let mut leak = None;
+/// forte::scope(|scope| {
+///     leak = Some(scope); // <-- scope would be leaked here
+/// });
+/// drop(leak);
+/// # });
+/// ```
+///
+/// Anything spawned onto the scope can capture a reference to it.
+/// This allows scoped work to spawn other scoped work.
+///
+/// ```
+/// # use forte::ThreadPool;
+/// # static THREAD_POOL: ThreadPool = ThreadPool::new();
+/// # THREAD_POOL.populate();
+/// # THREAD_POOL.with_worker(|worker| {
+/// let mut counter = 0;
+/// forte::scope(|scope| {
+///     scope.spawn_on(worker, |worker| {
+///         counter += 1;
+///         // Note: we borrow the scope again here.
+///         scope.spawn_on(worker, |_| {
+///             counter += 1;
+///         });
+///     });
+/// });
+/// assert_eq!(counter, 2);
+/// # });
+/// ```
+///
+/// It's possible to spawn non-scoped work within the closure, but these
+/// generally can't hold references to the scope. So for example, the
+/// following also fails to compile:
 ///
-/// See also: [`Worker::scope`] and [`ThreadPool::scope`].
-pub fn scope<'scope, F, T>(f: F) -> T
+/// ```compile_fail
+/// # use forte::ThreadPool;
+/// # static THREAD_POOL: ThreadPool = ThreadPool::new();
+/// # THREAD_POOL.populate();
+/// THREAD_POOL.with_worker(|worker| {
+///     worker.scope(|scope| {
+///         worker.spawn(|worker| {
+///             // This isn't allowed, because it makes the worker borrow the scope
+///             scope.spawn_on(worker, |_| {
+///                 // ...
+///             });
+///         });
+///     });
+/// });
+/// ```
+///
+/// # Panics
+///
+/// This function panics when not called within a worker. The
+/// [`ThreadPool::scope`] and [`Worker::scope`] functions do not, and should be
+/// preferred when possible.
+///
+/// If a panic occurs, either in the closure given to `scope()` or in a blocking
+/// (non-async) job spawned on the scope, that panic will be propagated and the
+/// call to `scope()` will panic. If multiple panics occur, it is
+/// non-deterministic which of their panic values will propagate. Regardless,
+/// once a task is spawned using `scope.spawn_on()`, it will execute, even if the
+/// spawning task should later panic. The scope returns once all work is
+/// complete, and panics are propagated at that point.
+/// +/// Note: Panics in futures are instead propagated to their [`Task`], and will +/// not cause the scope to panic. +pub fn scope<'env, F, T>(f: F) -> T where - F: FnOnce(&Scope<'scope>) -> T, + F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> T, { Worker::with_current(|worker| { worker @@ -1331,7 +1526,6 @@ fn heartbeat_loop(thread_pool: &'static ThreadPool, halt: Arc) { #[cfg(all(test, not(feature = "shuttle")))] mod tests { use alloc::vec; - use core::sync::atomic::AtomicU8; use super::*; @@ -1401,31 +1595,4 @@ mod tests { THREAD_POOL.depopulate(); } - - #[test] - fn concurrent_scopes() { - const NUM_JOBS: u8 = 128; - - static THREAD_POOL: ThreadPool = ThreadPool::new(); - THREAD_POOL.resize_to(4); - - let a = AtomicU8::new(0); - let b = AtomicU8::new(0); - - THREAD_POOL.scope(|scope| { - for _ in 0..NUM_JOBS { - scope.spawn(|_| { - THREAD_POOL.join( - |_| a.fetch_add(1, Ordering::Relaxed), - |_| b.fetch_add(1, Ordering::Relaxed), - ); - }); - } - }); - - assert_eq!(a.load(Ordering::Relaxed), NUM_JOBS); - assert_eq!(b.load(Ordering::Relaxed), NUM_JOBS); - - THREAD_POOL.depopulate(); - } }
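
Usage sketch (not part of the patch): the snippet below shows how the scope API introduced here composes with `ThreadPool::with_worker`, mirroring the `scoped_borrow` tests added above. The `THREAD_POOL` static and the `results` buffer are illustrative names only.

```rust
use forte::ThreadPool;

// Illustrative static pool, set up the same way as in the tests above.
static THREAD_POOL: ThreadPool = ThreadPool::new();

fn main() {
    THREAD_POOL.populate();

    let mut results = vec![0u32; 4];
    THREAD_POOL.with_worker(|worker| {
        forte::scope(|scope| {
            for (i, slot) in results.iter_mut().enumerate() {
                // Each job borrows a disjoint `&mut` slot of the local vector.
                // The borrow is sound because `scope` does not return until
                // every spawned job has finished running.
                scope.spawn_on(worker, move |_| *slot = i as u32);
            }
        });
    });
    assert_eq!(results, [0, 1, 2, 3]);

    THREAD_POOL.depopulate();
}
```

As with the doc examples above, the mutable borrows of `results` end before the assertion because the scope blocks until all spawned work completes.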