diff --git a/tokio-util/tests/task_join_map.rs b/tokio-util/tests/task_join_map.rs index b19e3d887cb..e3c4ca9bc88 100644 --- a/tokio-util/tests/task_join_map.rs +++ b/tokio-util/tests/task_join_map.rs @@ -3,18 +3,78 @@ use std::panic::AssertUnwindSafe; +use futures::future::{pending, FutureExt}; +#[cfg(tokio_unstable)] +use tokio::runtime::LocalRuntime; use tokio::sync::oneshot; +use tokio::task::LocalSet; use tokio::time::Duration; use tokio_util::task::JoinMap; -use futures::future::FutureExt; - fn rt() -> tokio::runtime::Runtime { tokio::runtime::Builder::new_current_thread() .build() .unwrap() } +// Spawn `N` tasks that return their index (`i`). +fn spawn_index_tasks(map: &mut JoinMap, n: usize, on: Option<&LocalSet>) { + for i in 0..n { + let rc = std::rc::Rc::new(i); + match on { + None => map.spawn_local(i, async move { *rc }), + Some(local) => map.spawn_local_on(i, async move { *rc }, local), + }; + } +} + +// Spawn `N` “pending” tasks that own a `oneshot::Sender`. +// When the task is aborted the sender is dropped, which is observed +// via the returned `Receiver`s. +fn spawn_pending_tasks( + map: &mut JoinMap, + receivers: &mut Vec>, + n: usize, + on: Option<&LocalSet>, +) { + for i in 0..n { + let (tx, rx) = oneshot::channel::<()>(); + receivers.push(rx); + + let fut = async move { + pending::<()>().await; + drop(tx); + }; + match on { + None => map.spawn_local(i, fut), + Some(local) => map.spawn_local_on(i, fut, local), + }; + } +} + +/// Await every task in JoinMap and assert every task returns its own key. +async fn drain_joinmap_and_assert(mut map: JoinMap, n: usize) { + let mut seen = vec![false; n]; + while let Some((k, res)) = map.join_next().await { + let v = res.expect("task panicked"); + assert_eq!(k, v); + seen[v] = true; + } + assert!(seen.into_iter().all(|b| b)); + assert!(map.is_empty()); +} + +// Await every receiver and assert they all return `Err` because the +// corresponding sender (inside an aborted task) was dropped. +async fn await_receivers_and_assert(receivers: Vec>) { + for rx in receivers { + assert!( + rx.await.is_err(), + "task should have been aborted and sender dropped" + ); + } +} + #[tokio::test(start_paused = true)] async fn test_with_sleep() { let mut map = JoinMap::new(); @@ -376,3 +436,237 @@ async fn duplicate_keys_drop() { assert!(map.join_next().await.is_none()); } + +mod spawn_local { + use super::*; + + mod local_runtime { + #[cfg(tokio_unstable)] + use super::*; + + /// Spawn several tasks, and then join all tasks. + #[cfg(tokio_unstable)] + #[test] + fn spawn_then_join_next() { + const N: usize = 8; + let rt = LocalRuntime::new().unwrap(); + + rt.block_on(async { + let mut map = JoinMap::new(); + spawn_index_tasks(&mut map, N, None); + + assert!(map.join_next().now_or_never().is_none()); + drain_joinmap_and_assert(map, N).await; + }); + } + + /// Spawn several pending-forever tasks, and then shutdown the [`JoinMap`]. + #[cfg(tokio_unstable)] + #[test] + fn spawn_then_shutdown() { + const N: usize = 8; + let rt = LocalRuntime::new().unwrap(); + + rt.block_on(async { + let mut map = JoinMap::new(); + let mut receivers = Vec::new(); + + spawn_pending_tasks(&mut map, &mut receivers, N, None); + assert!(map.join_next().now_or_never().is_none()); + + map.shutdown().await; + assert!(map.is_empty()); + await_receivers_and_assert(receivers).await; + }); + } + + /// Spawn several pending-forever tasks, and then drop the [`JoinMap`]. + #[cfg(tokio_unstable)] + #[test] + fn spawn_then_drop() { + const N: usize = 8; + let rt = LocalRuntime::new().unwrap(); + + rt.block_on(async { + let mut map = JoinMap::new(); + let mut receivers = Vec::new(); + + spawn_pending_tasks(&mut map, &mut receivers, N, None); + assert!(map.join_next().now_or_never().is_none()); + + drop(map); + await_receivers_and_assert(receivers).await; + }); + } + } + + mod local_set { + use super::*; + + /// Spawn several tasks, and then join all tasks. + #[tokio::test(flavor = "current_thread")] + async fn spawn_then_join_next() { + const N: usize = 8; + let local = LocalSet::new(); + + local + .run_until(async move { + let mut map = JoinMap::new(); + spawn_index_tasks(&mut map, N, None); + drain_joinmap_and_assert(map, N).await; + }) + .await; + } + + /// Spawn several pending-forever tasks, and then shutdown the [`JoinMap`]. + #[tokio::test(flavor = "current_thread")] + async fn spawn_then_shutdown() { + const N: usize = 8; + let local = LocalSet::new(); + + local + .run_until(async { + let mut map = JoinMap::new(); + let mut receivers = Vec::new(); + + spawn_pending_tasks(&mut map, &mut receivers, N, None); + assert!(map.join_next().now_or_never().is_none()); + + map.shutdown().await; + assert!(map.is_empty()); + await_receivers_and_assert(receivers).await; + }) + .await; + } + + /// Spawn several pending-forever tasks, and then drop the [`JoinMap`]. + #[tokio::test(flavor = "current_thread")] + async fn spawn_then_drop() { + const N: usize = 8; + let local = LocalSet::new(); + + local + .run_until(async { + let mut map = JoinMap::new(); + let mut receivers = Vec::new(); + + spawn_pending_tasks(&mut map, &mut receivers, N, None); + assert!(map.join_next().now_or_never().is_none()); + + drop(map); + await_receivers_and_assert(receivers).await; + }) + .await; + } + } +} + +mod spawn_local_on { + use super::*; + + mod local_runtime { + #[cfg(tokio_unstable)] + use super::*; + + /// Spawn several tasks, and then join all tasks. + #[cfg(tokio_unstable)] + #[test] + fn spawn_then_join_next() { + const N: usize = 8; + let rt = LocalRuntime::new().unwrap(); + + rt.block_on(async { + let local = LocalSet::new(); + let mut map = JoinMap::new(); + + spawn_index_tasks(&mut map, N, Some(&local)); + assert!(map.join_next().now_or_never().is_none()); + + local + .run_until(async move { + drain_joinmap_and_assert(map, N).await; + }) + .await; + }); + } + } + + mod local_set { + use super::*; + + /// Spawn several tasks, and then join all tasks. + #[tokio::test(flavor = "current_thread")] + async fn spawn_then_join_next() { + const N: usize = 8; + let local = LocalSet::new(); + let mut pending_map = JoinMap::new(); + + spawn_index_tasks(&mut pending_map, N, Some(&local)); + assert!(pending_map.join_next().now_or_never().is_none()); + + local + .run_until(async move { + drain_joinmap_and_assert(pending_map, N).await; + }) + .await; + } + + /// Spawn several pending-forever tasks, and then shutdown the [`JoinMap`]. + #[tokio::test(flavor = "current_thread")] + async fn spawn_then_shutdown() { + const N: usize = 8; + let local = LocalSet::new(); + let mut map = JoinMap::new(); + let mut receivers = Vec::new(); + + spawn_pending_tasks(&mut map, &mut receivers, N, Some(&local)); + assert!(map.join_next().now_or_never().is_none()); + + local + .run_until(async move { + map.shutdown().await; + assert!(map.is_empty()); + await_receivers_and_assert(receivers).await; + }) + .await; + } + + // Dropping a `JoinMap` whose tasks were queued with `spawn_local_on` + // must cancel all of them once the `LocalSet` is subsequently driven. + #[tokio::test(flavor = "current_thread")] + async fn drop_spawn_local_on() { + const N: usize = 8; + let local = LocalSet::new(); + let mut map = JoinMap::new(); + let mut receivers = Vec::new(); + + spawn_pending_tasks(&mut map, &mut receivers, N, Some(&local)); + assert!(map.join_next().now_or_never().is_none()); + + drop(map); + local + .run_until(async move { await_receivers_and_assert(receivers).await }) + .await; + } + + // Dropping a `JoinMap` whose tasks were queued with `spawn_local_on` + // when the `LocalSet` is already driven. + #[tokio::test(flavor = "current_thread")] + async fn drop_spawn_local_on_running_localset() { + const N: usize = 8; + let local = LocalSet::new(); + let mut map = JoinMap::new(); + let mut receivers = Vec::new(); + + spawn_pending_tasks(&mut map, &mut receivers, N, Some(&local)); + assert!(map.join_next().now_or_never().is_none()); + + local + .run_until(async move { + drop(map); + await_receivers_and_assert(receivers).await; + }) + .await; + } + } +} diff --git a/tokio-util/tests/task_tracker.rs b/tokio-util/tests/task_tracker.rs index f0eb2442ca6..35b3311c558 100644 --- a/tokio-util/tests/task_tracker.rs +++ b/tokio-util/tests/task_tracker.rs @@ -1,5 +1,11 @@ #![warn(rust_2018_idioms)] +use futures::future::pending; +use std::rc::Rc; +#[cfg(tokio_unstable)] +use tokio::runtime::LocalRuntime; +use tokio::sync::mpsc; +use tokio::task::LocalSet; use tokio_test::{assert_pending, assert_ready, task}; use tokio_util::task::TaskTracker; @@ -176,3 +182,165 @@ fn notify_many() { assert_ready!(wait.poll()); } } + +#[cfg(tokio_unstable)] +#[test] +fn local_runtime_spawn_and_wait() { + const N: usize = 8; + let rt = LocalRuntime::new().unwrap(); + + rt.block_on(async { + let tracker = TaskTracker::new(); + + for _ in 0..N { + tracker.spawn(async {}); + } + + for _ in 0..N { + tracker.spawn_on(async {}, rt.handle()); + } + + tracker.close(); + tracker.wait().await; + + assert!(tracker.is_empty()); + assert!(tracker.is_closed()); + }); +} + +#[cfg(tokio_unstable)] +#[test] +fn local_runtime_spawn_local() { + const N: usize = 8; + let rt = LocalRuntime::new().unwrap(); + + rt.block_on(async { + let tracker = TaskTracker::new(); + + for _ in 0..N { + let rc = Rc::new(()); + tracker.spawn_local(async move { + drop(rc); + }); + } + + tracker.close(); + tracker.wait().await; + + assert!(tracker.is_empty()); + assert!(tracker.is_closed()); + }); +} + +#[cfg(tokio_unstable)] +#[test] +fn local_runtime_spawn_local_on_localset() { + const N: usize = 8; + let rt = LocalRuntime::new().unwrap(); + let local_set = LocalSet::new(); + + rt.block_on(local_set.run_until(async { + let tracker = TaskTracker::new(); + + for _ in 0..N { + let rc = Rc::new(()); + tracker.spawn_local_on( + async move { + drop(rc); + }, + &local_set, + ); + } + + tracker.close(); + tracker.wait().await; + + assert!(tracker.is_empty()); + assert!(tracker.is_closed()); + })); +} + +#[tokio::test(flavor = "current_thread")] +async fn drop_spawn_local_on_running_localset() { + const N: usize = 8; + + let local = LocalSet::new(); + let tracker = TaskTracker::new(); + let (tx, mut rx) = mpsc::unbounded_channel::<()>(); + + for _i in 0..N { + let tx = tx.clone(); + tracker.spawn_local_on( + async move { + pending::<()>().await; + drop(tx); + }, + &local, + ); + } + drop(tx); + + local + .run_until(async move { + drop(tracker); + tokio::task::yield_now().await; + + use tokio::sync::mpsc::error::TryRecvError; + + assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty))); + }) + .await; +} + +#[cfg(tokio_unstable)] +#[test] +fn spawn_local_after_close() { + const N: usize = 8; + + let rt = LocalRuntime::new().unwrap(); + rt.block_on(async { + let tracker = TaskTracker::new(); + + tracker.close(); + + for _ in 0..N { + let rc = Rc::new(()); + tracker.spawn_local(async move { + drop(rc); + }); + } + + tracker.wait().await; + + assert!(tracker.is_closed()); + assert!(tracker.is_empty()); + }); +} + +#[tokio::test(flavor = "current_thread")] +async fn spawn_local_on_after_close() { + const N: usize = 8; + + let local = LocalSet::new(); + let tracker = TaskTracker::new(); + + tracker.close(); + + for _ in 0..N { + let rc = Rc::new(()); + tracker.spawn_local_on( + async move { + drop(rc); + }, + &local, + ); + } + + local + .run_until(async move { + tracker.wait().await; + assert!(tracker.is_closed()); + assert!(tracker.is_empty()); + }) + .await; +} diff --git a/tokio/tests/rt_local.rs b/tokio/tests/rt_local.rs index 4eb88d48a4d..be360571a80 100644 --- a/tokio/tests/rt_local.rs +++ b/tokio/tests/rt_local.rs @@ -1,8 +1,10 @@ #![warn(rust_2018_idioms)] #![cfg(all(feature = "full", tokio_unstable))] +use std::panic; use tokio::runtime::LocalOptions; use tokio::task::spawn_local; +use tokio::task::LocalSet; #[test] fn test_spawn_local_in_runtime() { @@ -111,6 +113,33 @@ fn test_spawn_local_from_guard_other_thread() { spawn_local(async {}); } +// This test guarantees that **`tokio::task::spawn_local` panics** when it is invoked +// from a thread that is *not* running the `LocalRuntime` / `LocalSet` to which +// the task would belong. +// The test creates a `LocalRuntime` and `LocalSet`, drives the `LocalSet` on the `LocalRuntime`'s thread, +// then spawns a **separate OS thread** and tries to call +// `tokio::task::spawn_local` there. `std::panic::catch_unwind` is then used +// to capture the panic and to assert that it indeed occurs. +#[test] +#[cfg_attr(target_family = "wasm", ignore)] // threads not supported +fn test_spawn_local_panic() { + let rt = rt(); + let local = LocalSet::new(); + + rt.block_on(local.run_until(async { + let thread_result = std::thread::spawn(|| { + let panic_result = panic::catch_unwind(|| { + let _jh = tokio::task::spawn_local(async { + println!("you will never see this line"); + }); + }); + assert!(panic_result.is_err(), "Expected panic, but none occurred"); + }) + .join(); + assert!(thread_result.is_ok(), "Thread itself panicked unexpectedly"); + })); +} + fn rt() -> tokio::runtime::LocalRuntime { tokio::runtime::Builder::new_current_thread() .enable_all() diff --git a/tokio/tests/task_join_set.rs b/tokio/tests/task_join_set.rs index 027f475e25f..99b181a9324 100644 --- a/tokio/tests/task_join_set.rs +++ b/tokio/tests/task_join_set.rs @@ -1,9 +1,12 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] -use futures::future::FutureExt; +use futures::future::{pending, FutureExt}; +use std::panic; +#[cfg(tokio_unstable)] +use tokio::runtime::LocalRuntime; use tokio::sync::oneshot; -use tokio::task::JoinSet; +use tokio::task::{JoinSet, LocalSet}; use tokio::time::Duration; fn rt() -> tokio::runtime::Runtime { @@ -12,6 +15,64 @@ fn rt() -> tokio::runtime::Runtime { .unwrap() } +// Spawn `N` tasks that return their index (`i`). +fn spawn_index_tasks(set: &mut JoinSet, n: usize, on: Option<&LocalSet>) { + for i in 0..n { + let rc = std::rc::Rc::new(i); + match on { + None => set.spawn_local(async move { *rc }), + Some(local) => set.spawn_local_on(async move { *rc }, local), + }; + } +} + +// Spawn `N` “pending” tasks that own a `oneshot::Sender`. +// When the task is aborted the sender is dropped, which is observed +// via the returned `Receiver`s. +fn spawn_pending_tasks( + set: &mut JoinSet<()>, + receivers: &mut Vec>, + n: usize, + on: Option<&LocalSet>, +) { + for _ in 0..n { + let (tx, rx) = oneshot::channel::<()>(); + receivers.push(rx); + + let fut = async move { + pending::<()>().await; + drop(tx); + }; + + match on { + None => set.spawn_local(fut), + Some(local) => set.spawn_local_on(fut, local), + }; + } +} + +// Await every task in a JoinSet and assert every task returns its own index. +async fn drain_joinset_and_assert(mut set: JoinSet, n: usize) { + let mut seen = vec![false; n]; + while let Some(res) = set.join_next().await { + let idx = res.expect("task panicked"); + seen[idx] = true; + } + assert!(seen.into_iter().all(|b| b)); + assert!(set.is_empty()); +} + +// Await every receiver and assert they all return `Err` because the +// corresponding sender (inside an aborted task) was dropped. +async fn await_receivers_and_assert(receivers: Vec>) { + for rx in receivers { + assert!( + rx.await.is_err(), + "the task should have been aborted and the sender dropped" + ); + } +} + #[tokio::test(start_paused = true)] async fn test_with_sleep() { let mut set = JoinSet::new(); @@ -344,3 +405,242 @@ async fn try_join_next_with_id() { assert_eq!(count, TASK_NUM); assert_eq!(joined, spawned); } + +mod spawn_local { + use super::*; + + mod local_runtime { + #[cfg(tokio_unstable)] + use super::*; + + /// Spawn several tasks, and then join all tasks. + #[cfg(tokio_unstable)] + #[test] + fn spawn_then_join_next() { + const N: usize = 8; + let rt = LocalRuntime::new().unwrap(); + + rt.block_on(async { + let mut set = JoinSet::new(); + spawn_index_tasks(&mut set, N, None); + + assert!(set.try_join_next().is_none()); + drain_joinset_and_assert(set, N).await; + }); + } + + /// Spawn several pending-forever tasks, and then shutdown the [`JoinSet`]. + #[cfg(tokio_unstable)] + #[test] + fn spawn_then_shutdown() { + const N: usize = 8; + let rt = LocalRuntime::new().unwrap(); + + rt.block_on(async { + let mut set = JoinSet::new(); + let mut receivers = Vec::new(); + + spawn_pending_tasks(&mut set, &mut receivers, N, None); + + assert!(set.try_join_next().is_none()); + set.shutdown().await; + assert!(set.is_empty()); + + await_receivers_and_assert(receivers).await; + }); + } + + /// Spawn several pending-forever tasks, and then drop the [`JoinSet`]. + #[cfg(tokio_unstable)] + #[test] + fn spawn_then_drop() { + const N: usize = 8; + let rt = LocalRuntime::new().unwrap(); + + rt.block_on(async { + let mut set = JoinSet::new(); + let mut receivers = Vec::new(); + + spawn_pending_tasks(&mut set, &mut receivers, N, None); + + assert!(set.try_join_next().is_none()); + drop(set); + + await_receivers_and_assert(receivers).await; + }); + } + } + + mod local_set { + use super::*; + + /// Spawn several tasks, and then join all tasks. + #[tokio::test(flavor = "current_thread")] + async fn spawn_then_join_next() { + const N: usize = 8; + let local = LocalSet::new(); + + local + .run_until(async move { + let mut set = JoinSet::new(); + spawn_index_tasks(&mut set, N, None); + drain_joinset_and_assert(set, N).await; + }) + .await; + } + + /// Spawn several pending-forever tasks, and then shutdown the [`JoinSet`]. + #[tokio::test(flavor = "current_thread")] + async fn spawn_then_shutdown() { + const N: usize = 8; + let local = LocalSet::new(); + + local + .run_until(async { + let mut set = JoinSet::new(); + let mut receivers = Vec::new(); + + spawn_pending_tasks(&mut set, &mut receivers, N, None); + assert!(set.try_join_next().is_none()); + + set.shutdown().await; + assert!(set.is_empty()); + + await_receivers_and_assert(receivers).await; + }) + .await; + } + + /// Spawn several pending-forever tasks, and then drop the [`JoinSet`]. + #[tokio::test(flavor = "current_thread")] + async fn spawn_then_drop() { + const N: usize = 8; + let local = LocalSet::new(); + + local + .run_until(async { + let mut set = JoinSet::new(); + let mut receivers = Vec::new(); + + spawn_pending_tasks(&mut set, &mut receivers, N, None); + assert!(set.try_join_next().is_none()); + + drop(set); + await_receivers_and_assert(receivers).await; + }) + .await; + } + } +} + +mod spawn_local_on { + use super::*; + + mod local_runtime { + #[cfg(tokio_unstable)] + use super::*; + + /// Spawn several tasks, and then join all tasks. + #[cfg(tokio_unstable)] + #[test] + fn spawn_then_join_next() { + const N: usize = 8; + let rt = LocalRuntime::new().unwrap(); + + rt.block_on(async { + let local = LocalSet::new(); + let mut set = JoinSet::new(); + + spawn_index_tasks(&mut set, N, Some(&local)); + assert!(set.try_join_next().is_none()); + + local + .run_until(async move { + drain_joinset_and_assert(set, N).await; + }) + .await; + }); + } + } + + mod local_set { + use super::*; + + /// Spawn several tasks, and then join all tasks. + #[tokio::test(flavor = "current_thread")] + async fn spawn_then_join_next() { + const N: usize = 8; + let local = LocalSet::new(); + let mut pending_set = JoinSet::new(); + + spawn_index_tasks(&mut pending_set, N, Some(&local)); + assert!(pending_set.try_join_next().is_none()); + + local + .run_until(async move { + drain_joinset_and_assert(pending_set, N).await; + }) + .await; + } + + /// Spawn several pending-forever tasks, and then shutdown the [`JoinSet`]. + #[tokio::test(flavor = "current_thread")] + async fn spawn_then_shutdown() { + const N: usize = 8; + let local = LocalSet::new(); + let mut set = JoinSet::new(); + let mut receivers = Vec::new(); + + spawn_pending_tasks(&mut set, &mut receivers, N, Some(&local)); + assert!(set.try_join_next().is_none()); + + local + .run_until(async move { + set.shutdown().await; + assert!(set.is_empty()); + await_receivers_and_assert(receivers).await; + }) + .await; + } + + // Dropping a `JoinSet` whose tasks were queued with `spawn_local_on` + // must cancel all of them once the `LocalSet` is subsequently driven. + #[tokio::test(flavor = "current_thread")] + async fn drop_spawn_local_on() { + const N: usize = 8; + let local = LocalSet::new(); + let mut set = JoinSet::new(); + let mut receivers = Vec::new(); + + spawn_pending_tasks(&mut set, &mut receivers, N, Some(&local)); + assert!(set.try_join_next().is_none()); + drop(set); + + local + .run_until(async move { + await_receivers_and_assert(receivers).await; + }) + .await; + } + + // Dropping a `JoinSet` whose tasks were queued with `spawn_local_on` + // when the `LocalSet` is already driven. + #[tokio::test(flavor = "current_thread")] + async fn drop_spawn_local_on_running_localset() { + const N: usize = 8; + let local = LocalSet::new(); + let mut set = JoinSet::new(); + let mut receivers = Vec::new(); + + spawn_pending_tasks(&mut set, &mut receivers, N, Some(&local)); + assert!(set.try_join_next().is_none()); + + local + .run_until(async move { + drop(set); + await_receivers_and_assert(receivers).await; + }) + .await; + } + } +}