From 9b09c4f36c43ed953c88957e45eba98fc8490ff6 Mon Sep 17 00:00:00 2001 From: FrancescoV1985 Date: Thu, 11 Sep 2025 10:48:46 +0200 Subject: [PATCH 01/15] Add new tests for JoinSet::spawn_local/JoinSet::spawn_local_on and LocalRuntime. Fixes: #7562 --- tokio/tests/rt_local.rs | 21 ++ tokio/tests/task_join_set.rs | 396 ++++++++++++++++++++++++++++++++++- 2 files changed, 415 insertions(+), 2 deletions(-) diff --git a/tokio/tests/rt_local.rs b/tokio/tests/rt_local.rs index 4eb88d48a4d..16ee9ccd5b9 100644 --- a/tokio/tests/rt_local.rs +++ b/tokio/tests/rt_local.rs @@ -1,8 +1,10 @@ #![warn(rust_2018_idioms)] #![cfg(all(feature = "full", tokio_unstable))] +use std::panic; use tokio::runtime::LocalOptions; use tokio::task::spawn_local; +use tokio::task::LocalSet; #[test] fn test_spawn_local_in_runtime() { @@ -111,6 +113,25 @@ fn test_spawn_local_from_guard_other_thread() { spawn_local(async {}); } +#[test] +fn test_spawn_local_panic() { + let rt = rt(); + let local = LocalSet::new(); + + rt.block_on(local.run_until(async { + let thread_result = std::thread::spawn(|| { + let panic_result = panic::catch_unwind(|| { + let _jh = tokio::task::spawn_local(async { + println!("you will never see this line"); + }); + }); + assert!(panic_result.is_err(), "Expected panic, but none occurred"); + }) + .join(); + assert!(thread_result.is_ok(), "Thread itself panicked unexpectedly"); + })); +} + fn rt() -> tokio::runtime::LocalRuntime { tokio::runtime::Builder::new_current_thread() .enable_all() diff --git a/tokio/tests/task_join_set.rs b/tokio/tests/task_join_set.rs index 027f475e25f..a80673ee493 100644 --- a/tokio/tests/task_join_set.rs +++ b/tokio/tests/task_join_set.rs @@ -1,9 +1,12 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] -use futures::future::FutureExt; +use futures::future::{pending, FutureExt}; +use std::panic; +#[cfg(tokio_unstable)] +use tokio::runtime::LocalRuntime; use tokio::sync::oneshot; -use tokio::task::JoinSet; +use tokio::task::{JoinSet, LocalSet}; use tokio::time::Duration; fn rt() -> tokio::runtime::Runtime { @@ -344,3 +347,392 @@ async fn try_join_next_with_id() { assert_eq!(count, TASK_NUM); assert_eq!(joined, spawned); } + +// JoinSet::spawn_local on a LocalRuntime +// We create a `LocalRuntime`, enter it, spawn several local tasks with +// `JoinSet::spawn_local`, and wait for every task to finish. +#[cfg(tokio_unstable)] +#[test] +fn spawn_local_local_runtime() { + const N: usize = 8; + + let rt = LocalRuntime::new().expect("failed to create LocalRuntime"); + + let completed: [bool; N] = rt.block_on(async { + let mut set = JoinSet::new(); + for i in 0..N { + let rc = std::rc::Rc::new(i); + set.spawn_local(async move { *rc }); + } + + assert!(set.try_join_next().is_none()); + + let mut seen = [false; N]; + while let Some(res) = set.join_next().await { + let idx = res.expect("task panicked"); + seen[idx] = true; + } + + assert!(set.is_empty()); + seen + }); + + assert!(completed.into_iter().all(|b| b)); +} + +// Drive a `LocalSet` on top of a `LocalRuntime` and verify that tasks +// queued with `JoinSet::spawn_local_on` run to completion only after the +// `LocalSet` starts. +#[cfg(tokio_unstable)] +#[test] +fn spawn_local_on_local_runtime() { + const N: usize = 8; + + let rt = LocalRuntime::new().expect("failed to build LocalRuntime"); + + rt.block_on(async { + let local = LocalSet::new(); + let mut set = JoinSet::new(); + for i in 0..N { + let rc = std::rc::Rc::new(i); + set.spawn_local_on(async move { *rc }, &local); + } + + assert!(set.try_join_next().is_none()); + + let mut results = local + .run_until(async move { + let mut out = Vec::::with_capacity(N); + while let Some(res) = set.join_next().await { + out.push(res.expect("task panicked")); + } + assert!(set.is_empty()); + out + }) + .await; + + results.sort(); + assert_eq!(results, (0..N).collect::>()); + }); +} + +// Calling `JoinSet::shutdown` inside a `LocalRuntime` must +// abort and drain every still-running task that was +// inserted with `JoinSet::spawn_local`. +#[cfg(tokio_unstable)] +#[test] +fn shutdown_spawn_local_local_runtime() { + const N: usize = 8; + + let rt = LocalRuntime::new().expect("failed to build LocalRuntime"); + + rt.block_on(async { + let mut set = JoinSet::new(); + let mut receivers = Vec::new(); + + for _ in 0..N { + let (tx, rx) = oneshot::channel::<()>(); + receivers.push(rx); + + set.spawn_local(async move { + pending::<()>().await; + drop(tx); + }); + } + + assert!(set.try_join_next().is_none()); + + set.shutdown().await; + + assert!(set.is_empty()); + + for rx in receivers { + assert!( + rx.await.is_err(), + "task should have been aborted and sender dropped", + ); + } + }); +} + +// Dropping a `JoinSet` created inside a **LocalRuntime** +// must abort every still-running `!Send` task that was +// added with `JoinSet::spawn_local`. +#[cfg(tokio_unstable)] +#[test] +fn drop_spawn_local_local_runtime() { + const N: usize = 8; + + let rt = LocalRuntime::new().expect("failed to build LocalRuntime"); + + rt.block_on(async { + let mut set = JoinSet::new(); + let mut receivers = Vec::new(); + + for _ in 0..N { + let (tx, rx) = oneshot::channel::<()>(); + receivers.push(rx); + set.spawn_local(async move { + pending::<()>().await; + drop(tx); + }); + } + + assert!(set.try_join_next().is_none()); + + drop(set); + + for rx in receivers { + assert!( + rx.await.is_err(), + "task should have been aborted and sender dropped", + ); + } + }); +} + +// JoinSet::spawn_local on a LocalSet. +// Every task is spawned with `spawn_local` **inside** the `LocalSet` that is +// currently running. +#[tokio::test(flavor = "current_thread")] +async fn spawn_local_running_localset() { + const N: usize = 8; + + let local = LocalSet::new(); + local + .run_until(async move { + let mut set = JoinSet::new(); + + for i in 0..N { + set.spawn_local(async move { i }); + } + let mut seen = [false; N]; + while let Some(res) = set.join_next().await { + let idx = res.expect("task failed"); + seen[idx] = true; + } + assert!(seen.into_iter().all(|b| b)); + assert!(set.is_empty()); + }) + .await; +} + +// JoinSet::spawn_local on a LocalSet. +// Tasks are queued with `spawn_local_on` **before** the `LocalSet` is +// running, then executed once the `LocalSet` is started. +#[tokio::test(flavor = "current_thread")] +async fn spawn_local_on_on_started_localset() { + const N: usize = 8; + + let local = LocalSet::new(); + let mut pending_set = JoinSet::new(); + + for i in 0..N { + pending_set.spawn_local_on(async move { i }, &local); + } + + assert!(pending_set.try_join_next().is_none()); + + local + .run_until(async move { + let mut set = pending_set; + let mut out = Vec::new(); + + while let Some(res) = set.join_next().await { + out.push(res.expect("task failed")); + } + out.sort(); + assert_eq!(out, (0..N).collect::>()); + assert!(set.is_empty()); + }) + .await; +} + +// Shutdown a `JoinSet` that was populated with `spawn_local` must abort all +// still-running tasks. +#[tokio::test(flavor = "current_thread")] +async fn shutdown_spawn_local_localset() { + const N: usize = 8; + + let local = LocalSet::new(); + local + .run_until(async { + let mut set = JoinSet::new(); + let mut receivers = Vec::new(); + + for _ in 0..N { + let (tx, rx) = oneshot::channel::<()>(); + receivers.push(rx); + set.spawn_local(async move { + pending::<()>().await; + drop(tx); + }); + } + + assert!(set.try_join_next().is_none()); + + set.shutdown().await; + + assert!(set.is_empty()); + + for rx in receivers { + assert!( + rx.await.is_err(), + "the task should have been aborted and the sender dropped" + ); + } + }) + .await; +} + +// Calling `JoinSet::shutdown` on a set whose tasks were queued with +// `spawn_local_on` must abort all of them once the `LocalSet` is driven. +#[tokio::test(flavor = "current_thread")] +async fn shutdown_spawn_local_on() { + const N: usize = 8; + + let local = LocalSet::new(); + let mut set = JoinSet::new(); + let mut receivers = Vec::new(); + + for _ in 0..N { + let (tx, rx) = oneshot::channel::<()>(); + receivers.push(rx); + + set.spawn_local_on( + async move { + pending::<()>().await; + drop(tx); + }, + &local, + ); + } + + assert!(set.try_join_next().is_none()); + + local + .run_until(async move { + set.shutdown().await; + assert!(set.is_empty()); + + for rx in receivers { + assert!( + rx.await.is_err(), + "task should have been aborted and sender dropped" + ); + } + }) + .await; +} + +// Dropping a `JoinSet` whose tasks were queued with `spawn_local_on` +// must cancel all of them once the `LocalSet` is subsequently driven. +#[tokio::test(flavor = "current_thread")] +async fn drop_spawn_local_on() { + const N: usize = 8; + + let local = LocalSet::new(); + let mut set = JoinSet::new(); + let mut receivers = Vec::new(); + + for _ in 0..N { + let (tx, rx) = oneshot::channel::<()>(); + receivers.push(rx); + + set.spawn_local_on( + async move { + pending::<()>().await; + drop(tx); + }, + &local, + ); + } + + assert!(set.try_join_next().is_none()); + + drop(set); + + local + .run_until(async move { + for rx in receivers { + assert!( + rx.await.is_err(), + "task should have been aborted and sender dropped" + ); + } + }) + .await; +} + +// Dropping a `JoinSet` that was populated with `spawn_local` must abort all +// still-running tasks. +#[tokio::test(flavor = "current_thread")] +async fn drop_spawn_local_localset() { + const N: usize = 8; + + let local = LocalSet::new(); + local + .run_until(async { + let mut set = JoinSet::new(); + let mut receivers = Vec::new(); + + for _ in 0..N { + let (tx, rx) = oneshot::channel::<()>(); + receivers.push(rx); + set.spawn_local(async move { + pending::<()>().await; + drop(tx); + }); + } + + assert!(set.try_join_next().is_none()); + + drop(set); + + for rx in receivers { + assert!( + rx.await.is_err(), + "the task should have been aborted and the sender dropped" + ); + } + }) + .await; +} + +// Dropping a `JoinSet` whose tasks were queued with `spawn_local_on` +// when the `LocalSet` is already driven. +#[tokio::test(flavor = "current_thread")] +async fn drop_spawn_local_on_running_localset() { + const N: usize = 8; + + let local = LocalSet::new(); + let mut set = JoinSet::new(); + let mut receivers = Vec::new(); + + for _ in 0..N { + let (tx, rx) = oneshot::channel::<()>(); + receivers.push(rx); + + set.spawn_local_on( + async move { + pending::<()>().await; + drop(tx); + }, + &local, + ); + } + + assert!(set.try_join_next().is_none()); + + local + .run_until(async move { + drop(set); + for rx in receivers { + assert!( + rx.await.is_err(), + "task should have been aborted and sender dropped" + ); + } + }) + .await; +} From 2a07d7fd2966cfe7a93d4ca16f06515fcebdfc53 Mon Sep 17 00:00:00 2001 From: FrancescoV1985 Date: Thu, 11 Sep 2025 15:36:26 +0200 Subject: [PATCH 02/15] ignore new test in rt_local for wasm target family --- tokio/tests/rt_local.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tokio/tests/rt_local.rs b/tokio/tests/rt_local.rs index 16ee9ccd5b9..c5529069c94 100644 --- a/tokio/tests/rt_local.rs +++ b/tokio/tests/rt_local.rs @@ -114,6 +114,7 @@ fn test_spawn_local_from_guard_other_thread() { } #[test] +#[cfg_attr(target_family = "wasm", ignore)] // threads not supported fn test_spawn_local_panic() { let rt = rt(); let local = LocalSet::new(); From 2a8deed123b18b1621135334410878638395e89b Mon Sep 17 00:00:00 2001 From: FrancescoV1985 Date: Thu, 25 Sep 2025 16:51:35 +0200 Subject: [PATCH 03/15] updated rt_local.rs and task_join_set.rs according to review's comments --- tokio/tests/rt_local.rs | 7 +++++++ tokio/tests/task_join_set.rs | 4 ++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/tokio/tests/rt_local.rs b/tokio/tests/rt_local.rs index c5529069c94..be360571a80 100644 --- a/tokio/tests/rt_local.rs +++ b/tokio/tests/rt_local.rs @@ -113,6 +113,13 @@ fn test_spawn_local_from_guard_other_thread() { spawn_local(async {}); } +// This test guarantees that **`tokio::task::spawn_local` panics** when it is invoked +// from a thread that is *not* running the `LocalRuntime` / `LocalSet` to which +// the task would belong. +// The test creates a `LocalRuntime` and `LocalSet`, drives the `LocalSet` on the `LocalRuntime`'s thread, +// then spawns a **separate OS thread** and tries to call +// `tokio::task::spawn_local` there. `std::panic::catch_unwind` is then used +// to capture the panic and to assert that it indeed occurs. #[test] #[cfg_attr(target_family = "wasm", ignore)] // threads not supported fn test_spawn_local_panic() { diff --git a/tokio/tests/task_join_set.rs b/tokio/tests/task_join_set.rs index a80673ee493..89e9eb07ea3 100644 --- a/tokio/tests/task_join_set.rs +++ b/tokio/tests/task_join_set.rs @@ -455,8 +455,8 @@ fn shutdown_spawn_local_local_runtime() { }); } -// Dropping a `JoinSet` created inside a **LocalRuntime** -// must abort every still-running `!Send` task that was +// Dropping a `JoinSet` created inside a `LocalRuntime` +// must abort every still-running task that was // added with `JoinSet::spawn_local`. #[cfg(tokio_unstable)] #[test] From ebe940eeedabbc8e5c4aa5aefcccdce5642e213b Mon Sep 17 00:00:00 2001 From: FrancescoV1985 Date: Fri, 26 Sep 2025 10:29:57 +0200 Subject: [PATCH 04/15] add tests for JoinMap in task_join_map.rs --- tokio-util/tests/task_join_map.rs | 399 +++++++++++++++++++++++++++++- tokio/tests/task_join_set.rs | 10 +- 2 files changed, 402 insertions(+), 7 deletions(-) diff --git a/tokio-util/tests/task_join_map.rs b/tokio-util/tests/task_join_map.rs index b19e3d887cb..5aad5dc131c 100644 --- a/tokio-util/tests/task_join_map.rs +++ b/tokio-util/tests/task_join_map.rs @@ -3,12 +3,13 @@ use std::panic::AssertUnwindSafe; +use futures::future::{pending, FutureExt}; +use tokio::runtime::LocalRuntime; use tokio::sync::oneshot; +use tokio::task::LocalSet; use tokio::time::Duration; use tokio_util::task::JoinMap; -use futures::future::FutureExt; - fn rt() -> tokio::runtime::Runtime { tokio::runtime::Builder::new_current_thread() .build() @@ -376,3 +377,397 @@ async fn duplicate_keys_drop() { assert!(map.join_next().await.is_none()); } + +// JoinMap::spawn_local on a LocalRuntime +// We create a `LocalRuntime`, enter it, spawn several local tasks with +// `JoinMap::spawn_local`, and wait for every task to finish. +#[cfg(tokio_unstable)] +#[test] +fn spawn_local_local_runtime() { + const N: usize = 8; + let rt = LocalRuntime::new().expect("failed to create LocalRuntime"); + + let completed: [bool; N] = rt.block_on(async { + let mut map = JoinMap::new(); + for i in 0..N { + let rc = std::rc::Rc::new(i); + map.spawn_local(i, async move { *rc }); + } + + assert!(map.join_next().now_or_never().is_none()); + + let mut seen = [false; N]; + while let Some((k, res)) = map.join_next().await { + let v = res.expect("task panicked"); + assert_eq!(k, v); + seen[v] = true; + } + + assert!(map.is_empty()); + seen + }); + + assert!(completed.into_iter().all(|b| b)); +} + +// Drive a `JoinMap` on top of a `LocalRuntime` and verify that tasks +// queued with `JoinMap::spawn_local_on` run to completion only after the +// `LocalSet` starts. +#[cfg(tokio_unstable)] +#[test] +fn spawn_local_on_local_runtime() { + const N: usize = 8; + let rt = LocalRuntime::new().expect("failed to build LocalRuntime"); + + rt.block_on(async { + let local = LocalSet::new(); + let mut map = JoinMap::new(); + for i in 0..N { + let rc = std::rc::Rc::new(i); + map.spawn_local_on(i, async move { *rc }, &local); + } + + assert!(map.join_next().now_or_never().is_none()); + + let mut results = local + .run_until(async move { + let mut out = Vec::::with_capacity(N); + while let Some((k, res)) = map.join_next().await { + let v = res.expect("task panicked"); + assert_eq!(k, v); + out.push(v); + } + assert!(map.is_empty()); + out + }) + .await; + + results.sort(); + assert_eq!(results, (0..N).collect::>()); + }); +} + +// Calling `JoinMap::shutdown` inside a `LocalRuntime` must +// abort and drain every still-running task that was +// inserted with `JoinMap::spawn_local`. +#[cfg(tokio_unstable)] +#[test] +fn shutdown_spawn_local_local_runtime() { + const N: usize = 8; + let rt = LocalRuntime::new().expect("failed to build LocalRuntime"); + + rt.block_on(async { + let mut map = JoinMap::new(); + let mut receivers = Vec::new(); + + for i in 0..N { + let (tx, rx) = oneshot::channel::<()>(); + receivers.push(rx); + + map.spawn_local(i, async move { + pending::<()>().await; + drop(tx); + }); + } + + assert!(map.join_next().now_or_never().is_none()); + + map.shutdown().await; + assert!(map.is_empty()); + + for rx in receivers { + assert!( + rx.await.is_err(), + "the task should have been aborted and the sender dropped", + ); + } + }); +} + +// Dropping a `JoinMap` created inside a `LocalRuntime` +// must abort every still-running task that was +// added with `JoinMap::spawn_local`. +#[cfg(tokio_unstable)] +#[test] +fn drop_spawn_local_local_runtime() { + const N: usize = 8; + let rt = LocalRuntime::new().expect("failed to build LocalRuntime"); + + rt.block_on(async { + let mut map = JoinMap::new(); + let mut receivers = Vec::new(); + + for i in 0..N { + let (tx, rx) = oneshot::channel::<()>(); + receivers.push(rx); + + map.spawn_local(i, async move { + pending::<()>().await; + drop(tx); + }); + } + + assert!(map.join_next().now_or_never().is_none()); + + drop(map); + + for rx in receivers { + assert!( + rx.await.is_err(), + "the task should have been aborted and the sender dropped" + ); + } + }); +} + +// JoinMap::spawn_local on a LocalSet. +// Every task is spawned with `spawn_local` **inside** the `LocalSet` that is +// currently running. +#[tokio::test(flavor = "current_thread")] +async fn spawn_local_running_localset() { + const N: usize = 8; + let local = LocalSet::new(); + + local + .run_until(async move { + let mut map = JoinMap::new(); + for i in 0..N { + map.spawn_local(i, async move { i }); + } + + let mut seen = [false; N]; + while let Some((k, res)) = map.join_next().await { + let v = res.expect("task failed"); + assert_eq!(k, v); + seen[v] = true; + } + assert!(seen.into_iter().all(|b| b)); + assert!(map.is_empty()); + }) + .await; +} + +// JoinMap::spawn_local on a LocalSet. +// Tasks are queued with `spawn_local_on` **before** the `LocalSet` is +// running, then executed once the `LocalSet` is started. +#[tokio::test(flavor = "current_thread")] +async fn spawn_local_on_on_started_localset() { + const N: usize = 8; + + let local = LocalSet::new(); + let mut pending_map = JoinMap::new(); + + for i in 0..N { + pending_map.spawn_local_on(i, async move { i }, &local); + } + + assert!(pending_map.join_next().now_or_never().is_none()); + + local + .run_until(async move { + let mut map = pending_map; + let mut out = Vec::new(); + + while let Some((k, res)) = map.join_next().await { + let v = res.expect("task failed"); + assert_eq!(k, v); + out.push(v); + } + out.sort(); + assert_eq!(out, (0..N).collect::>()); + assert!(map.is_empty()); + }) + .await; +} + +// Shutdown a `JoinMap` that was populated with `spawn_local` must abort all +// still-running tasks. +#[tokio::test(flavor = "current_thread")] +async fn shutdown_spawn_local_localset() { + const N: usize = 8; + let local = LocalSet::new(); + + local + .run_until(async { + let mut map = JoinMap::new(); + let mut receivers = Vec::new(); + + for i in 0..N { + let (tx, rx) = oneshot::channel::<()>(); + receivers.push(rx); + + map.spawn_local(i, async move { + pending::<()>().await; + drop(tx); + }); + } + + assert!(map.join_next().now_or_never().is_none()); + + map.shutdown().await; + assert!(map.is_empty()); + + for rx in receivers { + assert!( + rx.await.is_err(), + "the task should have been aborted and the sender dropped" + ); + } + }) + .await; +} + +// Calling `JoinMap::shutdown` on a set whose tasks were queued with +// `spawn_local_on` must abort all of them once the `LocalSet` is driven. +#[tokio::test(flavor = "current_thread")] +async fn shutdown_spawn_local_on() { + const N: usize = 8; + + let local = LocalSet::new(); + let mut map = JoinMap::new(); + let mut receivers = Vec::new(); + + for i in 0..N { + let (tx, rx) = oneshot::channel::<()>(); + receivers.push(rx); + + map.spawn_local_on( + i, + async move { + pending::<()>().await; + drop(tx); + }, + &local, + ); + } + + assert!(map.join_next().now_or_never().is_none()); + + local + .run_until(async move { + map.shutdown().await; + assert!(map.is_empty()); + + for rx in receivers { + assert!( + rx.await.is_err(), + "the task should have been aborted and the sender dropped" + ); + } + }) + .await; +} + +// Dropping a `JoinMap` whose tasks were queued with `spawn_local_on` +// must cancel all of them once the `LocalSet` is subsequently driven. +#[tokio::test(flavor = "current_thread")] +async fn drop_spawn_local_on() { + const N: usize = 8; + + let local = LocalSet::new(); + let mut map = JoinMap::new(); + let mut receivers = Vec::new(); + + for i in 0..N { + let (tx, rx) = oneshot::channel::<()>(); + receivers.push(rx); + + map.spawn_local_on( + i, + async move { + pending::<()>().await; + drop(tx); + }, + &local, + ); + } + + assert!(map.join_next().now_or_never().is_none()); + + drop(map); + + local + .run_until(async move { + for rx in receivers { + assert!( + rx.await.is_err(), + "the task should have been aborted and the sender dropped" + ); + } + }) + .await; +} + +// Dropping a `JoinMap` that was populated with `spawn_local` must abort all +// still-running tasks. +#[tokio::test(flavor = "current_thread")] +async fn drop_spawn_local_localset() { + const N: usize = 8; + let local = LocalSet::new(); + + local + .run_until(async { + let mut map = JoinMap::new(); + let mut receivers = Vec::new(); + + for i in 0..N { + let (tx, rx) = oneshot::channel::<()>(); + receivers.push(rx); + map.spawn_local(i, async move { + pending::<()>().await; + drop(tx); + }); + } + + assert!(map.join_next().now_or_never().is_none()); + + drop(map); + + for rx in receivers { + assert!( + rx.await.is_err(), + "the task should have been aborted and the sender dropped" + ); + } + }) + .await; +} + +// Dropping a `JoinMap` whose tasks were queued with `spawn_local_on` +// when the `LocalSet` is already driven. +#[tokio::test(flavor = "current_thread")] +async fn drop_spawn_local_on_running_localset() { + const N: usize = 8; + + let local = LocalSet::new(); + let mut map = JoinMap::new(); + let mut receivers = Vec::new(); + + for i in 0..N { + let (tx, rx) = oneshot::channel::<()>(); + receivers.push(rx); + + map.spawn_local_on( + i, + async move { + pending::<()>().await; + drop(tx); + }, + &local, + ); + } + + assert!(map.join_next().now_or_never().is_none()); + + local + .run_until(async move { + drop(map); + for rx in receivers { + assert!( + rx.await.is_err(), + "the task should have been aborted and the sender dropped" + ); + } + }) + .await; +} diff --git a/tokio/tests/task_join_set.rs b/tokio/tests/task_join_set.rs index 89e9eb07ea3..6bed8dce8d2 100644 --- a/tokio/tests/task_join_set.rs +++ b/tokio/tests/task_join_set.rs @@ -449,7 +449,7 @@ fn shutdown_spawn_local_local_runtime() { for rx in receivers { assert!( rx.await.is_err(), - "task should have been aborted and sender dropped", + "the task should have been aborted and the sender dropped", ); } }); @@ -485,7 +485,7 @@ fn drop_spawn_local_local_runtime() { for rx in receivers { assert!( rx.await.is_err(), - "task should have been aborted and sender dropped", + "the task should have been aborted and the sender dropped", ); } }); @@ -618,7 +618,7 @@ async fn shutdown_spawn_local_on() { for rx in receivers { assert!( rx.await.is_err(), - "task should have been aborted and sender dropped" + "the task should have been aborted and the sender dropped" ); } }) @@ -657,7 +657,7 @@ async fn drop_spawn_local_on() { for rx in receivers { assert!( rx.await.is_err(), - "task should have been aborted and sender dropped" + "the task should have been aborted and the sender dropped" ); } }) @@ -730,7 +730,7 @@ async fn drop_spawn_local_on_running_localset() { for rx in receivers { assert!( rx.await.is_err(), - "task should have been aborted and sender dropped" + "the task should have been aborted and the sender dropped" ); } }) From 12f4713d51afd461f5870d4864e7ff1e892543cd Mon Sep 17 00:00:00 2001 From: FrancescoV1985 Date: Fri, 26 Sep 2025 10:35:39 +0200 Subject: [PATCH 05/15] add missing configuration attribute in task_join_map.rs --- tokio-util/tests/task_join_map.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tokio-util/tests/task_join_map.rs b/tokio-util/tests/task_join_map.rs index 5aad5dc131c..26ac7ff771d 100644 --- a/tokio-util/tests/task_join_map.rs +++ b/tokio-util/tests/task_join_map.rs @@ -4,6 +4,7 @@ use std::panic::AssertUnwindSafe; use futures::future::{pending, FutureExt}; +#[cfg(tokio_unstable)] use tokio::runtime::LocalRuntime; use tokio::sync::oneshot; use tokio::task::LocalSet; From 13c211ae9b47d3d2c81ba852318122e175f0d055 Mon Sep 17 00:00:00 2001 From: FrancescoV1985 Date: Fri, 26 Sep 2025 16:58:37 +0200 Subject: [PATCH 06/15] add tests for TaskTracker in task_tracker.rs --- tokio-util/tests/task_tracker.rs | 168 +++++++++++++++++++++++++++++++ 1 file changed, 168 insertions(+) diff --git a/tokio-util/tests/task_tracker.rs b/tokio-util/tests/task_tracker.rs index f0eb2442ca6..02a7481e5b8 100644 --- a/tokio-util/tests/task_tracker.rs +++ b/tokio-util/tests/task_tracker.rs @@ -1,5 +1,11 @@ #![warn(rust_2018_idioms)] +use futures::future::pending; +use std::rc::Rc; +#[cfg(tokio_unstable)] +use tokio::runtime::LocalRuntime; +use tokio::sync::mpsc; +use tokio::task::LocalSet; use tokio_test::{assert_pending, assert_ready, task}; use tokio_util::task::TaskTracker; @@ -176,3 +182,165 @@ fn notify_many() { assert_ready!(wait.poll()); } } + +#[cfg(tokio_unstable)] +#[test] +fn local_runtime_spawn_and_wait() { + const N: usize = 8; + let rt = LocalRuntime::new().unwrap(); + + rt.block_on(async { + let tracker = TaskTracker::new(); + + for _ in 0..5 { + tracker.spawn(async {}); + } + + for _ in 0..N { + tracker.spawn_on(async {}, rt.handle()); + } + + tracker.close(); + tracker.wait().await; + + assert!(tracker.is_empty()); + assert!(tracker.is_closed()); + }); +} + +#[cfg(tokio_unstable)] +#[test] +fn local_runtime_spawn_local() { + const N: usize = 8; + let rt = LocalRuntime::new().unwrap(); + + rt.block_on(async { + let tracker = TaskTracker::new(); + + for _ in 0..N { + let rc = Rc::new(()); + tracker.spawn_local(async move { + drop(rc); + }); + } + + tracker.close(); + tracker.wait().await; + + assert!(tracker.is_empty()); + assert!(tracker.is_closed()); + }); +} + +#[cfg(tokio_unstable)] +#[test] +fn local_runtime_spawn_local_on_localset() { + const N: usize = 8; + let rt = LocalRuntime::new().unwrap(); + let local_set = LocalSet::new(); + + rt.block_on(local_set.run_until(async { + let tracker = TaskTracker::new(); + + for _ in 0..N { + let rc = Rc::new(()); + tracker.spawn_local_on( + async move { + drop(rc); + }, + &local_set, + ); + } + + tracker.close(); + tracker.wait().await; + + assert!(tracker.is_empty()); + assert!(tracker.is_closed()); + })); +} + +#[tokio::test(flavor = "current_thread")] +async fn drop_spawn_local_on_running_localset() { + const N: usize = 8; + + let local = LocalSet::new(); + let tracker = TaskTracker::new(); + let (tx, mut rx) = mpsc::unbounded_channel::<()>(); + + for _i in 0..N { + let tx = tx.clone(); + tracker.spawn_local_on( + async move { + pending::<()>().await; + drop(tx); + }, + &local, + ); + } + drop(tx); + + local + .run_until(async move { + drop(tracker); + tokio::task::yield_now().await; + + use tokio::sync::mpsc::error::TryRecvError; + + assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty))); + }) + .await; +} + +#[cfg(tokio_unstable)] +#[test] +fn spawn_local_after_close() { + const N: usize = 8; + + let rt = LocalRuntime::new().unwrap(); + rt.block_on(async { + let tracker = TaskTracker::new(); + + tracker.close(); + + for _ in 0..N { + let rc = Rc::new(()); + tracker.spawn_local(async move { + drop(rc); + }); + } + + tracker.wait().await; + + assert!(tracker.is_closed()); + assert!(tracker.is_empty()); + }); +} + +#[tokio::test(flavor = "current_thread")] +async fn spawn_local_on_after_close() { + const N: usize = 8; + + let local = LocalSet::new(); + let tracker = TaskTracker::new(); + + tracker.close(); + + for _ in 0..N { + let rc = Rc::new(()); + tracker.spawn_local_on( + async move { + drop(rc); + }, + &local, + ); + } + + local + .run_until(async move { + tracker.wait().await; + assert!(tracker.is_closed()); + assert!(tracker.is_empty()); + }) + .await; +} From c69a37a47627ebff96ea3dc9068ac12afc38bb3b Mon Sep 17 00:00:00 2001 From: FrancescoV1985 Date: Fri, 26 Sep 2025 16:59:13 +0200 Subject: [PATCH 07/15] add tests for TaskTracker in task_tracker.rs --- tokio-util/tests/task_tracker.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio-util/tests/task_tracker.rs b/tokio-util/tests/task_tracker.rs index 02a7481e5b8..35b3311c558 100644 --- a/tokio-util/tests/task_tracker.rs +++ b/tokio-util/tests/task_tracker.rs @@ -192,7 +192,7 @@ fn local_runtime_spawn_and_wait() { rt.block_on(async { let tracker = TaskTracker::new(); - for _ in 0..5 { + for _ in 0..N { tracker.spawn(async {}); } From 22511fe8dfc33dc98c3c0d1900345234e2526349 Mon Sep 17 00:00:00 2001 From: FrancescoV1985 Date: Mon, 6 Oct 2025 16:11:08 +0200 Subject: [PATCH 08/15] refactored new JoinSet tests to eliminate duplicated code --- tokio/tests/task_join_set.rs | 267 ++++++++++++----------------------- 1 file changed, 89 insertions(+), 178 deletions(-) diff --git a/tokio/tests/task_join_set.rs b/tokio/tests/task_join_set.rs index 6bed8dce8d2..8a462ef69a7 100644 --- a/tokio/tests/task_join_set.rs +++ b/tokio/tests/task_join_set.rs @@ -15,6 +15,64 @@ fn rt() -> tokio::runtime::Runtime { .unwrap() } +// Spawn `N` tasks that return their index (`i`). +fn spawn_index_tasks(set: &mut JoinSet, n: usize, on: Option<&LocalSet>) { + for i in 0..n { + let rc = std::rc::Rc::new(i); + match on { + None => set.spawn_local(async move { *rc }), + Some(local) => set.spawn_local_on(async move { *rc }, local), + }; + } +} + +// Spawn `N` “pending” tasks that own a `oneshot::Sender`. +// When the task is aborted the sender is dropped, which is observed +// via the returned `Receiver`s. +fn spawn_pending_tasks( + set: &mut JoinSet<()>, + receivers: &mut Vec>, + n: usize, + on: Option<&LocalSet>, +) { + for _ in 0..n { + let (tx, rx) = oneshot::channel::<()>(); + receivers.push(rx); + + let fut = async move { + pending::<()>().await; + drop(tx); + }; + + match on { + None => set.spawn_local(fut), + Some(local) => set.spawn_local_on(fut, local), + }; + } +} + +// Await every task in a JoinSet and assert every task returns its own index. +async fn drain_joinset_and_assert(mut set: JoinSet, n: usize) { + let mut seen = vec![false; n]; + while let Some(res) = set.join_next().await { + let idx = res.expect("task panicked"); + seen[idx] = true; + } + assert!(seen.into_iter().all(|b| b)); + assert!(set.is_empty()); +} + +// Await every receiver and assert they all return `Err` because the +// corresponding sender (inside an aborted task) was dropped. +async fn await_receivers_and_assert(receivers: Vec>) { + for rx in receivers { + assert!( + rx.await.is_err(), + "the task should have been aborted and the sender dropped" + ); + } +} + #[tokio::test(start_paused = true)] async fn test_with_sleep() { let mut set = JoinSet::new(); @@ -355,29 +413,16 @@ async fn try_join_next_with_id() { #[test] fn spawn_local_local_runtime() { const N: usize = 8; + let rt = LocalRuntime::new().unwrap(); - let rt = LocalRuntime::new().expect("failed to create LocalRuntime"); - - let completed: [bool; N] = rt.block_on(async { + rt.block_on(async { let mut set = JoinSet::new(); - for i in 0..N { - let rc = std::rc::Rc::new(i); - set.spawn_local(async move { *rc }); - } + spawn_index_tasks(&mut set, N, None); assert!(set.try_join_next().is_none()); - let mut seen = [false; N]; - while let Some(res) = set.join_next().await { - let idx = res.expect("task panicked"); - seen[idx] = true; - } - - assert!(set.is_empty()); - seen + drain_joinset_and_assert(set, N).await; }); - - assert!(completed.into_iter().all(|b| b)); } // Drive a `LocalSet` on top of a `LocalRuntime` and verify that tasks @@ -387,32 +432,21 @@ fn spawn_local_local_runtime() { #[test] fn spawn_local_on_local_runtime() { const N: usize = 8; - - let rt = LocalRuntime::new().expect("failed to build LocalRuntime"); + let rt = LocalRuntime::new().unwrap(); rt.block_on(async { let local = LocalSet::new(); let mut set = JoinSet::new(); - for i in 0..N { - let rc = std::rc::Rc::new(i); - set.spawn_local_on(async move { *rc }, &local); - } + + spawn_index_tasks(&mut set, N, Some(&local)); assert!(set.try_join_next().is_none()); - let mut results = local + local .run_until(async move { - let mut out = Vec::::with_capacity(N); - while let Some(res) = set.join_next().await { - out.push(res.expect("task panicked")); - } - assert!(set.is_empty()); - out + drain_joinset_and_assert(set, N).await; }) .await; - - results.sort(); - assert_eq!(results, (0..N).collect::>()); }); } @@ -423,22 +457,13 @@ fn spawn_local_on_local_runtime() { #[test] fn shutdown_spawn_local_local_runtime() { const N: usize = 8; - - let rt = LocalRuntime::new().expect("failed to build LocalRuntime"); + let rt = LocalRuntime::new().unwrap(); rt.block_on(async { let mut set = JoinSet::new(); let mut receivers = Vec::new(); - for _ in 0..N { - let (tx, rx) = oneshot::channel::<()>(); - receivers.push(rx); - - set.spawn_local(async move { - pending::<()>().await; - drop(tx); - }); - } + spawn_pending_tasks(&mut set, &mut receivers, N, None); assert!(set.try_join_next().is_none()); @@ -446,12 +471,7 @@ fn shutdown_spawn_local_local_runtime() { assert!(set.is_empty()); - for rx in receivers { - assert!( - rx.await.is_err(), - "the task should have been aborted and the sender dropped", - ); - } + await_receivers_and_assert(receivers).await; }); } @@ -462,32 +482,19 @@ fn shutdown_spawn_local_local_runtime() { #[test] fn drop_spawn_local_local_runtime() { const N: usize = 8; - - let rt = LocalRuntime::new().expect("failed to build LocalRuntime"); + let rt = LocalRuntime::new().unwrap(); rt.block_on(async { let mut set = JoinSet::new(); let mut receivers = Vec::new(); - for _ in 0..N { - let (tx, rx) = oneshot::channel::<()>(); - receivers.push(rx); - set.spawn_local(async move { - pending::<()>().await; - drop(tx); - }); - } + spawn_pending_tasks(&mut set, &mut receivers, N, None); assert!(set.try_join_next().is_none()); drop(set); - for rx in receivers { - assert!( - rx.await.is_err(), - "the task should have been aborted and the sender dropped", - ); - } + await_receivers_and_assert(receivers).await; }); } @@ -497,22 +504,13 @@ fn drop_spawn_local_local_runtime() { #[tokio::test(flavor = "current_thread")] async fn spawn_local_running_localset() { const N: usize = 8; - let local = LocalSet::new(); + local .run_until(async move { let mut set = JoinSet::new(); - - for i in 0..N { - set.spawn_local(async move { i }); - } - let mut seen = [false; N]; - while let Some(res) = set.join_next().await { - let idx = res.expect("task failed"); - seen[idx] = true; - } - assert!(seen.into_iter().all(|b| b)); - assert!(set.is_empty()); + spawn_index_tasks(&mut set, N, None); + drain_joinset_and_assert(set, N).await; }) .await; } @@ -523,27 +521,16 @@ async fn spawn_local_running_localset() { #[tokio::test(flavor = "current_thread")] async fn spawn_local_on_on_started_localset() { const N: usize = 8; - let local = LocalSet::new(); let mut pending_set = JoinSet::new(); - for i in 0..N { - pending_set.spawn_local_on(async move { i }, &local); - } + spawn_index_tasks(&mut pending_set, N, Some(&local)); assert!(pending_set.try_join_next().is_none()); local .run_until(async move { - let mut set = pending_set; - let mut out = Vec::new(); - - while let Some(res) = set.join_next().await { - out.push(res.expect("task failed")); - } - out.sort(); - assert_eq!(out, (0..N).collect::>()); - assert!(set.is_empty()); + drain_joinset_and_assert(pending_set, N).await; }) .await; } @@ -553,21 +540,14 @@ async fn spawn_local_on_on_started_localset() { #[tokio::test(flavor = "current_thread")] async fn shutdown_spawn_local_localset() { const N: usize = 8; - let local = LocalSet::new(); + local .run_until(async { let mut set = JoinSet::new(); let mut receivers = Vec::new(); - for _ in 0..N { - let (tx, rx) = oneshot::channel::<()>(); - receivers.push(rx); - set.spawn_local(async move { - pending::<()>().await; - drop(tx); - }); - } + spawn_pending_tasks(&mut set, &mut receivers, N, None); assert!(set.try_join_next().is_none()); @@ -575,12 +555,7 @@ async fn shutdown_spawn_local_localset() { assert!(set.is_empty()); - for rx in receivers { - assert!( - rx.await.is_err(), - "the task should have been aborted and the sender dropped" - ); - } + await_receivers_and_assert(receivers).await; }) .await; } @@ -590,23 +565,11 @@ async fn shutdown_spawn_local_localset() { #[tokio::test(flavor = "current_thread")] async fn shutdown_spawn_local_on() { const N: usize = 8; - let local = LocalSet::new(); let mut set = JoinSet::new(); let mut receivers = Vec::new(); - for _ in 0..N { - let (tx, rx) = oneshot::channel::<()>(); - receivers.push(rx); - - set.spawn_local_on( - async move { - pending::<()>().await; - drop(tx); - }, - &local, - ); - } + spawn_pending_tasks(&mut set, &mut receivers, N, Some(&local)); assert!(set.try_join_next().is_none()); @@ -614,13 +577,7 @@ async fn shutdown_spawn_local_on() { .run_until(async move { set.shutdown().await; assert!(set.is_empty()); - - for rx in receivers { - assert!( - rx.await.is_err(), - "the task should have been aborted and the sender dropped" - ); - } + await_receivers_and_assert(receivers).await; }) .await; } @@ -630,23 +587,11 @@ async fn shutdown_spawn_local_on() { #[tokio::test(flavor = "current_thread")] async fn drop_spawn_local_on() { const N: usize = 8; - let local = LocalSet::new(); let mut set = JoinSet::new(); let mut receivers = Vec::new(); - for _ in 0..N { - let (tx, rx) = oneshot::channel::<()>(); - receivers.push(rx); - - set.spawn_local_on( - async move { - pending::<()>().await; - drop(tx); - }, - &local, - ); - } + spawn_pending_tasks(&mut set, &mut receivers, N, Some(&local)); assert!(set.try_join_next().is_none()); @@ -654,12 +599,7 @@ async fn drop_spawn_local_on() { local .run_until(async move { - for rx in receivers { - assert!( - rx.await.is_err(), - "the task should have been aborted and the sender dropped" - ); - } + await_receivers_and_assert(receivers).await; }) .await; } @@ -676,25 +616,13 @@ async fn drop_spawn_local_localset() { let mut set = JoinSet::new(); let mut receivers = Vec::new(); - for _ in 0..N { - let (tx, rx) = oneshot::channel::<()>(); - receivers.push(rx); - set.spawn_local(async move { - pending::<()>().await; - drop(tx); - }); - } + spawn_pending_tasks(&mut set, &mut receivers, N, None); assert!(set.try_join_next().is_none()); drop(set); - for rx in receivers { - assert!( - rx.await.is_err(), - "the task should have been aborted and the sender dropped" - ); - } + await_receivers_and_assert(receivers).await; }) .await; } @@ -704,35 +632,18 @@ async fn drop_spawn_local_localset() { #[tokio::test(flavor = "current_thread")] async fn drop_spawn_local_on_running_localset() { const N: usize = 8; - let local = LocalSet::new(); let mut set = JoinSet::new(); let mut receivers = Vec::new(); - for _ in 0..N { - let (tx, rx) = oneshot::channel::<()>(); - receivers.push(rx); - - set.spawn_local_on( - async move { - pending::<()>().await; - drop(tx); - }, - &local, - ); - } + spawn_pending_tasks(&mut set, &mut receivers, N, Some(&local)); assert!(set.try_join_next().is_none()); local .run_until(async move { drop(set); - for rx in receivers { - assert!( - rx.await.is_err(), - "the task should have been aborted and the sender dropped" - ); - } + await_receivers_and_assert(receivers).await; }) .await; } From de05b548bbc17c5fd33acefb56adb4ddd725ffc4 Mon Sep 17 00:00:00 2001 From: FrancescoV1985 Date: Tue, 7 Oct 2025 14:42:22 +0200 Subject: [PATCH 09/15] refactored new JoinMap tests to eliminate duplicated code --- tokio-util/tests/task_join_map.rs | 275 ++++++++++-------------------- 1 file changed, 87 insertions(+), 188 deletions(-) diff --git a/tokio-util/tests/task_join_map.rs b/tokio-util/tests/task_join_map.rs index 26ac7ff771d..c175c94ec6e 100644 --- a/tokio-util/tests/task_join_map.rs +++ b/tokio-util/tests/task_join_map.rs @@ -17,6 +17,64 @@ fn rt() -> tokio::runtime::Runtime { .unwrap() } +// Spawn `N` tasks that return their index (`i`). +fn spawn_index_tasks(map: &mut JoinMap, n: usize, on: Option<&LocalSet>) { + for i in 0..n { + let rc = std::rc::Rc::new(i); + match on { + None => map.spawn_local(i, async move { *rc }), + Some(local) => map.spawn_local_on(i, async move { *rc }, local), + }; + } +} + +// Spawn `N` “pending” tasks that own a `oneshot::Sender`. +// When the task is aborted the sender is dropped, which is observed +// via the returned `Receiver`s. +fn spawn_pending_tasks( + map: &mut JoinMap, + receivers: &mut Vec>, + n: usize, + on: Option<&LocalSet>, +) { + for i in 0..n { + let (tx, rx) = oneshot::channel::<()>(); + receivers.push(rx); + + let fut = async move { + pending::<()>().await; + drop(tx); + }; + match on { + None => map.spawn_local(i, fut), + Some(local) => map.spawn_local_on(i, fut, local), + }; + } +} + +/// Await every task in JoinMap and assert every task returns its own key. +async fn drain_joinmap_and_assert(mut map: JoinMap, n: usize) { + let mut seen = vec![false; n]; + while let Some((k, res)) = map.join_next().await { + let v = res.expect("task panicked"); + assert_eq!(k, v); + seen[v] = true; + } + assert!(seen.into_iter().all(|b| b)); + assert!(map.is_empty()); +} + +// Await every receiver and assert they all return `Err` because the +// corresponding sender (inside an aborted task) was dropped. +async fn await_receivers_and_assert(receivers: Vec>) { + for rx in receivers { + assert!( + rx.await.is_err(), + "task should have been aborted and sender dropped" + ); + } +} + #[tokio::test(start_paused = true)] async fn test_with_sleep() { let mut map = JoinMap::new(); @@ -386,29 +444,15 @@ async fn duplicate_keys_drop() { #[test] fn spawn_local_local_runtime() { const N: usize = 8; - let rt = LocalRuntime::new().expect("failed to create LocalRuntime"); + let rt = LocalRuntime::new().unwrap(); - let completed: [bool; N] = rt.block_on(async { + rt.block_on(async { let mut map = JoinMap::new(); - for i in 0..N { - let rc = std::rc::Rc::new(i); - map.spawn_local(i, async move { *rc }); - } + spawn_index_tasks(&mut map, N, None); assert!(map.join_next().now_or_never().is_none()); - - let mut seen = [false; N]; - while let Some((k, res)) = map.join_next().await { - let v = res.expect("task panicked"); - assert_eq!(k, v); - seen[v] = true; - } - - assert!(map.is_empty()); - seen + drain_joinmap_and_assert(map, N).await; }); - - assert!(completed.into_iter().all(|b| b)); } // Drive a `JoinMap` on top of a `LocalRuntime` and verify that tasks @@ -418,33 +462,20 @@ fn spawn_local_local_runtime() { #[test] fn spawn_local_on_local_runtime() { const N: usize = 8; - let rt = LocalRuntime::new().expect("failed to build LocalRuntime"); + let rt = LocalRuntime::new().unwrap(); rt.block_on(async { let local = LocalSet::new(); let mut map = JoinMap::new(); - for i in 0..N { - let rc = std::rc::Rc::new(i); - map.spawn_local_on(i, async move { *rc }, &local); - } + spawn_index_tasks(&mut map, N, Some(&local)); assert!(map.join_next().now_or_never().is_none()); - let mut results = local + local .run_until(async move { - let mut out = Vec::::with_capacity(N); - while let Some((k, res)) = map.join_next().await { - let v = res.expect("task panicked"); - assert_eq!(k, v); - out.push(v); - } - assert!(map.is_empty()); - out + drain_joinmap_and_assert(map, N).await; }) .await; - - results.sort(); - assert_eq!(results, (0..N).collect::>()); }); } @@ -455,33 +486,19 @@ fn spawn_local_on_local_runtime() { #[test] fn shutdown_spawn_local_local_runtime() { const N: usize = 8; - let rt = LocalRuntime::new().expect("failed to build LocalRuntime"); + let rt = LocalRuntime::new().unwrap(); rt.block_on(async { let mut map = JoinMap::new(); let mut receivers = Vec::new(); - for i in 0..N { - let (tx, rx) = oneshot::channel::<()>(); - receivers.push(rx); - - map.spawn_local(i, async move { - pending::<()>().await; - drop(tx); - }); - } + spawn_pending_tasks(&mut map, &mut receivers, N, None); assert!(map.join_next().now_or_never().is_none()); map.shutdown().await; assert!(map.is_empty()); - - for rx in receivers { - assert!( - rx.await.is_err(), - "the task should have been aborted and the sender dropped", - ); - } + await_receivers_and_assert(receivers).await; }); } @@ -492,32 +509,19 @@ fn shutdown_spawn_local_local_runtime() { #[test] fn drop_spawn_local_local_runtime() { const N: usize = 8; - let rt = LocalRuntime::new().expect("failed to build LocalRuntime"); + let rt = LocalRuntime::new().unwrap(); rt.block_on(async { let mut map = JoinMap::new(); let mut receivers = Vec::new(); - for i in 0..N { - let (tx, rx) = oneshot::channel::<()>(); - receivers.push(rx); - - map.spawn_local(i, async move { - pending::<()>().await; - drop(tx); - }); - } + spawn_pending_tasks(&mut map, &mut receivers, N, None); assert!(map.join_next().now_or_never().is_none()); drop(map); - for rx in receivers { - assert!( - rx.await.is_err(), - "the task should have been aborted and the sender dropped" - ); - } + await_receivers_and_assert(receivers).await; }); } @@ -532,18 +536,9 @@ async fn spawn_local_running_localset() { local .run_until(async move { let mut map = JoinMap::new(); - for i in 0..N { - map.spawn_local(i, async move { i }); - } - let mut seen = [false; N]; - while let Some((k, res)) = map.join_next().await { - let v = res.expect("task failed"); - assert_eq!(k, v); - seen[v] = true; - } - assert!(seen.into_iter().all(|b| b)); - assert!(map.is_empty()); + spawn_index_tasks(&mut map, N, None); + drain_joinmap_and_assert(map, N).await; }) .await; } @@ -554,29 +549,16 @@ async fn spawn_local_running_localset() { #[tokio::test(flavor = "current_thread")] async fn spawn_local_on_on_started_localset() { const N: usize = 8; - let local = LocalSet::new(); let mut pending_map = JoinMap::new(); - for i in 0..N { - pending_map.spawn_local_on(i, async move { i }, &local); - } + spawn_index_tasks(&mut pending_map, N, Some(&local)); assert!(pending_map.join_next().now_or_never().is_none()); local .run_until(async move { - let mut map = pending_map; - let mut out = Vec::new(); - - while let Some((k, res)) = map.join_next().await { - let v = res.expect("task failed"); - assert_eq!(k, v); - out.push(v); - } - out.sort(); - assert_eq!(out, (0..N).collect::>()); - assert!(map.is_empty()); + drain_joinmap_and_assert(pending_map, N).await; }) .await; } @@ -592,28 +574,14 @@ async fn shutdown_spawn_local_localset() { .run_until(async { let mut map = JoinMap::new(); let mut receivers = Vec::new(); - - for i in 0..N { - let (tx, rx) = oneshot::channel::<()>(); - receivers.push(rx); - - map.spawn_local(i, async move { - pending::<()>().await; - drop(tx); - }); - } + spawn_pending_tasks(&mut map, &mut receivers, N, None); assert!(map.join_next().now_or_never().is_none()); map.shutdown().await; - assert!(map.is_empty()); - for rx in receivers { - assert!( - rx.await.is_err(), - "the task should have been aborted and the sender dropped" - ); - } + assert!(map.is_empty()); + await_receivers_and_assert(receivers).await; }) .await; } @@ -623,24 +591,11 @@ async fn shutdown_spawn_local_localset() { #[tokio::test(flavor = "current_thread")] async fn shutdown_spawn_local_on() { const N: usize = 8; - let local = LocalSet::new(); let mut map = JoinMap::new(); let mut receivers = Vec::new(); - for i in 0..N { - let (tx, rx) = oneshot::channel::<()>(); - receivers.push(rx); - - map.spawn_local_on( - i, - async move { - pending::<()>().await; - drop(tx); - }, - &local, - ); - } + spawn_pending_tasks(&mut map, &mut receivers, N, Some(&local)); assert!(map.join_next().now_or_never().is_none()); @@ -648,13 +603,7 @@ async fn shutdown_spawn_local_on() { .run_until(async move { map.shutdown().await; assert!(map.is_empty()); - - for rx in receivers { - assert!( - rx.await.is_err(), - "the task should have been aborted and the sender dropped" - ); - } + await_receivers_and_assert(receivers).await; }) .await; } @@ -664,38 +613,18 @@ async fn shutdown_spawn_local_on() { #[tokio::test(flavor = "current_thread")] async fn drop_spawn_local_on() { const N: usize = 8; - let local = LocalSet::new(); let mut map = JoinMap::new(); let mut receivers = Vec::new(); - for i in 0..N { - let (tx, rx) = oneshot::channel::<()>(); - receivers.push(rx); - - map.spawn_local_on( - i, - async move { - pending::<()>().await; - drop(tx); - }, - &local, - ); - } + spawn_pending_tasks(&mut map, &mut receivers, N, Some(&local)); assert!(map.join_next().now_or_never().is_none()); drop(map); local - .run_until(async move { - for rx in receivers { - assert!( - rx.await.is_err(), - "the task should have been aborted and the sender dropped" - ); - } - }) + .run_until(async move { await_receivers_and_assert(receivers).await }) .await; } @@ -711,25 +640,13 @@ async fn drop_spawn_local_localset() { let mut map = JoinMap::new(); let mut receivers = Vec::new(); - for i in 0..N { - let (tx, rx) = oneshot::channel::<()>(); - receivers.push(rx); - map.spawn_local(i, async move { - pending::<()>().await; - drop(tx); - }); - } + spawn_pending_tasks(&mut map, &mut receivers, N, None); assert!(map.join_next().now_or_never().is_none()); drop(map); - for rx in receivers { - assert!( - rx.await.is_err(), - "the task should have been aborted and the sender dropped" - ); - } + await_receivers_and_assert(receivers).await; }) .await; } @@ -739,36 +656,18 @@ async fn drop_spawn_local_localset() { #[tokio::test(flavor = "current_thread")] async fn drop_spawn_local_on_running_localset() { const N: usize = 8; - let local = LocalSet::new(); let mut map = JoinMap::new(); let mut receivers = Vec::new(); - for i in 0..N { - let (tx, rx) = oneshot::channel::<()>(); - receivers.push(rx); - - map.spawn_local_on( - i, - async move { - pending::<()>().await; - drop(tx); - }, - &local, - ); - } + spawn_pending_tasks(&mut map, &mut receivers, N, Some(&local)); assert!(map.join_next().now_or_never().is_none()); local .run_until(async move { drop(map); - for rx in receivers { - assert!( - rx.await.is_err(), - "the task should have been aborted and the sender dropped" - ); - } + await_receivers_and_assert(receivers).await; }) .await; } From 288abdce2fe81fbeb994327d68d5f2a3474405dc Mon Sep 17 00:00:00 2001 From: FrancescoV1985 Date: Tue, 7 Oct 2025 14:48:40 +0200 Subject: [PATCH 10/15] improved formatting for new JoinMap tests --- tokio-util/tests/task_join_map.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tokio-util/tests/task_join_map.rs b/tokio-util/tests/task_join_map.rs index c175c94ec6e..70efc6e78ef 100644 --- a/tokio-util/tests/task_join_map.rs +++ b/tokio-util/tests/task_join_map.rs @@ -451,6 +451,7 @@ fn spawn_local_local_runtime() { spawn_index_tasks(&mut map, N, None); assert!(map.join_next().now_or_never().is_none()); + drain_joinmap_and_assert(map, N).await; }); } @@ -538,6 +539,7 @@ async fn spawn_local_running_localset() { let mut map = JoinMap::new(); spawn_index_tasks(&mut map, N, None); + drain_joinmap_and_assert(map, N).await; }) .await; @@ -574,6 +576,7 @@ async fn shutdown_spawn_local_localset() { .run_until(async { let mut map = JoinMap::new(); let mut receivers = Vec::new(); + spawn_pending_tasks(&mut map, &mut receivers, N, None); assert!(map.join_next().now_or_never().is_none()); From d9feec035798562e7590c10d5856bbde0f66b2cb Mon Sep 17 00:00:00 2001 From: FrancescoV1985 Date: Tue, 7 Oct 2025 14:52:11 +0200 Subject: [PATCH 11/15] improved formatting (2) for new JoinMap tests --- tokio-util/tests/task_join_map.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tokio-util/tests/task_join_map.rs b/tokio-util/tests/task_join_map.rs index 70efc6e78ef..9bcaaa77978 100644 --- a/tokio-util/tests/task_join_map.rs +++ b/tokio-util/tests/task_join_map.rs @@ -448,6 +448,7 @@ fn spawn_local_local_runtime() { rt.block_on(async { let mut map = JoinMap::new(); + spawn_index_tasks(&mut map, N, None); assert!(map.join_next().now_or_never().is_none()); @@ -468,6 +469,7 @@ fn spawn_local_on_local_runtime() { rt.block_on(async { let local = LocalSet::new(); let mut map = JoinMap::new(); + spawn_index_tasks(&mut map, N, Some(&local)); assert!(map.join_next().now_or_never().is_none()); From cd21d97e81a365441fd4dd8815bfa1fb3cdc10df Mon Sep 17 00:00:00 2001 From: FrancescoV1985 Date: Wed, 8 Oct 2025 14:03:34 +0200 Subject: [PATCH 12/15] regrouped new tests for JoinSet and JoinMap thorugh 'mod' --- tokio-util/tests/task_join_map.rs | 433 +++++++++++++++--------------- tokio/tests/task_join_set.rs | 418 ++++++++++++++-------------- 2 files changed, 434 insertions(+), 417 deletions(-) diff --git a/tokio-util/tests/task_join_map.rs b/tokio-util/tests/task_join_map.rs index 9bcaaa77978..e8d40126772 100644 --- a/tokio-util/tests/task_join_map.rs +++ b/tokio-util/tests/task_join_map.rs @@ -437,242 +437,249 @@ async fn duplicate_keys_drop() { assert!(map.join_next().await.is_none()); } -// JoinMap::spawn_local on a LocalRuntime -// We create a `LocalRuntime`, enter it, spawn several local tasks with -// `JoinMap::spawn_local`, and wait for every task to finish. -#[cfg(tokio_unstable)] -#[test] -fn spawn_local_local_runtime() { - const N: usize = 8; - let rt = LocalRuntime::new().unwrap(); - - rt.block_on(async { - let mut map = JoinMap::new(); - - spawn_index_tasks(&mut map, N, None); - - assert!(map.join_next().now_or_never().is_none()); - - drain_joinmap_and_assert(map, N).await; - }); -} - -// Drive a `JoinMap` on top of a `LocalRuntime` and verify that tasks -// queued with `JoinMap::spawn_local_on` run to completion only after the -// `LocalSet` starts. -#[cfg(tokio_unstable)] -#[test] -fn spawn_local_on_local_runtime() { - const N: usize = 8; - let rt = LocalRuntime::new().unwrap(); - - rt.block_on(async { - let local = LocalSet::new(); - let mut map = JoinMap::new(); - - spawn_index_tasks(&mut map, N, Some(&local)); - - assert!(map.join_next().now_or_never().is_none()); - - local - .run_until(async move { +mod spawn_local { + use super::*; + + mod local_runtime { + use super::*; + + // JoinMap::spawn_local on a LocalRuntime + // We create a `LocalRuntime`, enter it, spawn several local tasks with + // `JoinMap::spawn_local`, and wait for every task to finish. + #[cfg(tokio_unstable)] + #[test] + fn spawn_local_local_runtime() { + const N: usize = 8; + let rt = LocalRuntime::new().unwrap(); + + rt.block_on(async { + let mut map = JoinMap::new(); + spawn_index_tasks(&mut map, N, None); + + assert!(map.join_next().now_or_never().is_none()); drain_joinmap_and_assert(map, N).await; - }) - .await; - }); -} - -// Calling `JoinMap::shutdown` inside a `LocalRuntime` must -// abort and drain every still-running task that was -// inserted with `JoinMap::spawn_local`. -#[cfg(tokio_unstable)] -#[test] -fn shutdown_spawn_local_local_runtime() { - const N: usize = 8; - let rt = LocalRuntime::new().unwrap(); - - rt.block_on(async { - let mut map = JoinMap::new(); - let mut receivers = Vec::new(); - - spawn_pending_tasks(&mut map, &mut receivers, N, None); - - assert!(map.join_next().now_or_never().is_none()); - - map.shutdown().await; - assert!(map.is_empty()); - await_receivers_and_assert(receivers).await; - }); -} - -// Dropping a `JoinMap` created inside a `LocalRuntime` -// must abort every still-running task that was -// added with `JoinMap::spawn_local`. -#[cfg(tokio_unstable)] -#[test] -fn drop_spawn_local_local_runtime() { - const N: usize = 8; - let rt = LocalRuntime::new().unwrap(); - - rt.block_on(async { - let mut map = JoinMap::new(); - let mut receivers = Vec::new(); - - spawn_pending_tasks(&mut map, &mut receivers, N, None); - - assert!(map.join_next().now_or_never().is_none()); - - drop(map); + }); + } - await_receivers_and_assert(receivers).await; - }); -} + // Calling `JoinMap::shutdown` inside a `LocalRuntime` must + // abort and drain every still-running task that was + // inserted with `JoinMap::spawn_local`. + #[cfg(tokio_unstable)] + #[test] + fn shutdown_spawn_local_local_runtime() { + const N: usize = 8; + let rt = LocalRuntime::new().unwrap(); + + rt.block_on(async { + let mut map = JoinMap::new(); + let mut receivers = Vec::new(); + + spawn_pending_tasks(&mut map, &mut receivers, N, None); + assert!(map.join_next().now_or_never().is_none()); + + map.shutdown().await; + assert!(map.is_empty()); + await_receivers_and_assert(receivers).await; + }); + } -// JoinMap::spawn_local on a LocalSet. -// Every task is spawned with `spawn_local` **inside** the `LocalSet` that is -// currently running. -#[tokio::test(flavor = "current_thread")] -async fn spawn_local_running_localset() { - const N: usize = 8; - let local = LocalSet::new(); + // Dropping a `JoinMap` created inside a `LocalRuntime` + // must abort every still-running task that was + // added with `JoinMap::spawn_local`. + #[cfg(tokio_unstable)] + #[test] + fn drop_spawn_local_local_runtime() { + const N: usize = 8; + let rt = LocalRuntime::new().unwrap(); + + rt.block_on(async { + let mut map = JoinMap::new(); + let mut receivers = Vec::new(); + + spawn_pending_tasks(&mut map, &mut receivers, N, None); + assert!(map.join_next().now_or_never().is_none()); + + drop(map); + await_receivers_and_assert(receivers).await; + }); + } + } - local - .run_until(async move { - let mut map = JoinMap::new(); + mod local_set { + use super::*; + + // JoinMap::spawn_local on a LocalSet. + // Every task is spawned with `spawn_local` **inside** the `LocalSet` that is + // currently running. + #[tokio::test(flavor = "current_thread")] + async fn spawn_local_running_localset() { + const N: usize = 8; + let local = LocalSet::new(); + + local + .run_until(async move { + let mut map = JoinMap::new(); + spawn_index_tasks(&mut map, N, None); + drain_joinmap_and_assert(map, N).await; + }) + .await; + } - spawn_index_tasks(&mut map, N, None); + // Shutdown a `JoinMap` that was populated with `spawn_local` must abort all + // still-running tasks. + #[tokio::test(flavor = "current_thread")] + async fn shutdown_spawn_local_localset() { + const N: usize = 8; + let local = LocalSet::new(); + + local + .run_until(async { + let mut map = JoinMap::new(); + let mut receivers = Vec::new(); + + spawn_pending_tasks(&mut map, &mut receivers, N, None); + assert!(map.join_next().now_or_never().is_none()); + + map.shutdown().await; + assert!(map.is_empty()); + await_receivers_and_assert(receivers).await; + }) + .await; + } - drain_joinmap_and_assert(map, N).await; - }) - .await; + // Dropping a `JoinMap` that was populated with `spawn_local` must abort all + // still-running tasks. + #[tokio::test(flavor = "current_thread")] + async fn drop_spawn_local_localset() { + const N: usize = 8; + let local = LocalSet::new(); + + local + .run_until(async { + let mut map = JoinMap::new(); + let mut receivers = Vec::new(); + + spawn_pending_tasks(&mut map, &mut receivers, N, None); + assert!(map.join_next().now_or_never().is_none()); + + drop(map); + await_receivers_and_assert(receivers).await; + }) + .await; + } + } } -// JoinMap::spawn_local on a LocalSet. -// Tasks are queued with `spawn_local_on` **before** the `LocalSet` is -// running, then executed once the `LocalSet` is started. -#[tokio::test(flavor = "current_thread")] -async fn spawn_local_on_on_started_localset() { - const N: usize = 8; - let local = LocalSet::new(); - let mut pending_map = JoinMap::new(); - - spawn_index_tasks(&mut pending_map, N, Some(&local)); - - assert!(pending_map.join_next().now_or_never().is_none()); - - local - .run_until(async move { - drain_joinmap_and_assert(pending_map, N).await; - }) - .await; -} +mod spawn_local_on { + use super::*; + + mod local_runtime { + use super::*; + + // Drive a `JoinMap` on top of a `LocalRuntime` and verify that tasks + // queued with `JoinMap::spawn_local_on` run to completion only after the + // `LocalSet` starts. + #[cfg(tokio_unstable)] + #[test] + fn spawn_local_on_local_runtime() { + const N: usize = 8; + let rt = LocalRuntime::new().unwrap(); + + rt.block_on(async { + let local = LocalSet::new(); + let mut map = JoinMap::new(); + + spawn_index_tasks(&mut map, N, Some(&local)); + assert!(map.join_next().now_or_never().is_none()); + + local + .run_until(async move { + drain_joinmap_and_assert(map, N).await; + }) + .await; + }); + } + } -// Shutdown a `JoinMap` that was populated with `spawn_local` must abort all -// still-running tasks. -#[tokio::test(flavor = "current_thread")] -async fn shutdown_spawn_local_localset() { - const N: usize = 8; - let local = LocalSet::new(); + mod local_set { + use super::*; + + // JoinMap::spawn_local on a LocalSet. + // Tasks are queued with `spawn_local_on` **before** the `LocalSet` is + // running, then executed once the `LocalSet` is started. + #[tokio::test(flavor = "current_thread")] + async fn spawn_local_on_on_started_localset() { + const N: usize = 8; + let local = LocalSet::new(); + let mut pending_map = JoinMap::new(); + + spawn_index_tasks(&mut pending_map, N, Some(&local)); + assert!(pending_map.join_next().now_or_never().is_none()); + + local + .run_until(async move { + drain_joinmap_and_assert(pending_map, N).await; + }) + .await; + } - local - .run_until(async { + // Calling `JoinMap::shutdown` on a set whose tasks were queued with + // `spawn_local_on` must abort all of them once the `LocalSet` is driven. + #[tokio::test(flavor = "current_thread")] + async fn shutdown_spawn_local_on() { + const N: usize = 8; + let local = LocalSet::new(); let mut map = JoinMap::new(); let mut receivers = Vec::new(); - spawn_pending_tasks(&mut map, &mut receivers, N, None); - + spawn_pending_tasks(&mut map, &mut receivers, N, Some(&local)); assert!(map.join_next().now_or_never().is_none()); - map.shutdown().await; - - assert!(map.is_empty()); - await_receivers_and_assert(receivers).await; - }) - .await; -} - -// Calling `JoinMap::shutdown` on a set whose tasks were queued with -// `spawn_local_on` must abort all of them once the `LocalSet` is driven. -#[tokio::test(flavor = "current_thread")] -async fn shutdown_spawn_local_on() { - const N: usize = 8; - let local = LocalSet::new(); - let mut map = JoinMap::new(); - let mut receivers = Vec::new(); - - spawn_pending_tasks(&mut map, &mut receivers, N, Some(&local)); - - assert!(map.join_next().now_or_never().is_none()); - - local - .run_until(async move { - map.shutdown().await; - assert!(map.is_empty()); - await_receivers_and_assert(receivers).await; - }) - .await; -} - -// Dropping a `JoinMap` whose tasks were queued with `spawn_local_on` -// must cancel all of them once the `LocalSet` is subsequently driven. -#[tokio::test(flavor = "current_thread")] -async fn drop_spawn_local_on() { - const N: usize = 8; - let local = LocalSet::new(); - let mut map = JoinMap::new(); - let mut receivers = Vec::new(); - - spawn_pending_tasks(&mut map, &mut receivers, N, Some(&local)); - - assert!(map.join_next().now_or_never().is_none()); - - drop(map); - - local - .run_until(async move { await_receivers_and_assert(receivers).await }) - .await; -} - -// Dropping a `JoinMap` that was populated with `spawn_local` must abort all -// still-running tasks. -#[tokio::test(flavor = "current_thread")] -async fn drop_spawn_local_localset() { - const N: usize = 8; - let local = LocalSet::new(); + local + .run_until(async move { + map.shutdown().await; + assert!(map.is_empty()); + await_receivers_and_assert(receivers).await; + }) + .await; + } - local - .run_until(async { + // Dropping a `JoinMap` whose tasks were queued with `spawn_local_on` + // must cancel all of them once the `LocalSet` is subsequently driven. + #[tokio::test(flavor = "current_thread")] + async fn drop_spawn_local_on() { + const N: usize = 8; + let local = LocalSet::new(); let mut map = JoinMap::new(); let mut receivers = Vec::new(); - spawn_pending_tasks(&mut map, &mut receivers, N, None); - + spawn_pending_tasks(&mut map, &mut receivers, N, Some(&local)); assert!(map.join_next().now_or_never().is_none()); drop(map); + local + .run_until(async move { await_receivers_and_assert(receivers).await }) + .await; + } - await_receivers_and_assert(receivers).await; - }) - .await; -} - -// Dropping a `JoinMap` whose tasks were queued with `spawn_local_on` -// when the `LocalSet` is already driven. -#[tokio::test(flavor = "current_thread")] -async fn drop_spawn_local_on_running_localset() { - const N: usize = 8; - let local = LocalSet::new(); - let mut map = JoinMap::new(); - let mut receivers = Vec::new(); - - spawn_pending_tasks(&mut map, &mut receivers, N, Some(&local)); + // Dropping a `JoinMap` whose tasks were queued with `spawn_local_on` + // when the `LocalSet` is already driven. + #[tokio::test(flavor = "current_thread")] + async fn drop_spawn_local_on_running_localset() { + const N: usize = 8; + let local = LocalSet::new(); + let mut map = JoinMap::new(); + let mut receivers = Vec::new(); - assert!(map.join_next().now_or_never().is_none()); + spawn_pending_tasks(&mut map, &mut receivers, N, Some(&local)); + assert!(map.join_next().now_or_never().is_none()); - local - .run_until(async move { - drop(map); - await_receivers_and_assert(receivers).await; - }) - .await; + local + .run_until(async move { + drop(map); + await_receivers_and_assert(receivers).await; + }) + .await; + } + } } diff --git a/tokio/tests/task_join_set.rs b/tokio/tests/task_join_set.rs index 8a462ef69a7..baaf385e9e0 100644 --- a/tokio/tests/task_join_set.rs +++ b/tokio/tests/task_join_set.rs @@ -406,244 +406,254 @@ async fn try_join_next_with_id() { assert_eq!(joined, spawned); } -// JoinSet::spawn_local on a LocalRuntime -// We create a `LocalRuntime`, enter it, spawn several local tasks with -// `JoinSet::spawn_local`, and wait for every task to finish. -#[cfg(tokio_unstable)] -#[test] -fn spawn_local_local_runtime() { - const N: usize = 8; - let rt = LocalRuntime::new().unwrap(); - - rt.block_on(async { - let mut set = JoinSet::new(); - spawn_index_tasks(&mut set, N, None); - - assert!(set.try_join_next().is_none()); - - drain_joinset_and_assert(set, N).await; - }); -} - -// Drive a `LocalSet` on top of a `LocalRuntime` and verify that tasks -// queued with `JoinSet::spawn_local_on` run to completion only after the -// `LocalSet` starts. -#[cfg(tokio_unstable)] -#[test] -fn spawn_local_on_local_runtime() { - const N: usize = 8; - let rt = LocalRuntime::new().unwrap(); - - rt.block_on(async { - let local = LocalSet::new(); - let mut set = JoinSet::new(); - - spawn_index_tasks(&mut set, N, Some(&local)); - - assert!(set.try_join_next().is_none()); - - local - .run_until(async move { +mod spawn_local { + use super::*; + + mod local_runtime { + use super::*; + + // JoinSet::spawn_local on a LocalRuntime + // We create a `LocalRuntime`, enter it, spawn several local tasks with + // `JoinSet::spawn_local`, and wait for every task to finish. + #[cfg(tokio_unstable)] + #[test] + fn spawn_local_local_runtime() { + const N: usize = 8; + let rt = LocalRuntime::new().unwrap(); + + rt.block_on(async { + let mut set = JoinSet::new(); + spawn_index_tasks(&mut set, N, None); + + assert!(set.try_join_next().is_none()); drain_joinset_and_assert(set, N).await; - }) - .await; - }); -} - -// Calling `JoinSet::shutdown` inside a `LocalRuntime` must -// abort and drain every still-running task that was -// inserted with `JoinSet::spawn_local`. -#[cfg(tokio_unstable)] -#[test] -fn shutdown_spawn_local_local_runtime() { - const N: usize = 8; - let rt = LocalRuntime::new().unwrap(); - - rt.block_on(async { - let mut set = JoinSet::new(); - let mut receivers = Vec::new(); + }); + } - spawn_pending_tasks(&mut set, &mut receivers, N, None); + // Calling `JoinSet::shutdown` inside a `LocalRuntime` must + // abort and drain every still-running task that was + // inserted with `JoinSet::spawn_local`. + #[cfg(tokio_unstable)] + #[test] + fn shutdown_spawn_local_local_runtime() { + const N: usize = 8; + let rt = LocalRuntime::new().unwrap(); - assert!(set.try_join_next().is_none()); + rt.block_on(async { + let mut set = JoinSet::new(); + let mut receivers = Vec::new(); - set.shutdown().await; + spawn_pending_tasks(&mut set, &mut receivers, N, None); - assert!(set.is_empty()); + assert!(set.try_join_next().is_none()); + set.shutdown().await; + assert!(set.is_empty()); - await_receivers_and_assert(receivers).await; - }); -} + await_receivers_and_assert(receivers).await; + }); + } -// Dropping a `JoinSet` created inside a `LocalRuntime` -// must abort every still-running task that was -// added with `JoinSet::spawn_local`. -#[cfg(tokio_unstable)] -#[test] -fn drop_spawn_local_local_runtime() { - const N: usize = 8; - let rt = LocalRuntime::new().unwrap(); + // Dropping a `JoinSet` created inside a `LocalRuntime` + // must abort every still-running task that was + // added with `JoinSet::spawn_local`. + #[cfg(tokio_unstable)] + #[test] + fn drop_spawn_local_local_runtime() { + const N: usize = 8; + let rt = LocalRuntime::new().unwrap(); - rt.block_on(async { - let mut set = JoinSet::new(); - let mut receivers = Vec::new(); + rt.block_on(async { + let mut set = JoinSet::new(); + let mut receivers = Vec::new(); - spawn_pending_tasks(&mut set, &mut receivers, N, None); + spawn_pending_tasks(&mut set, &mut receivers, N, None); - assert!(set.try_join_next().is_none()); + assert!(set.try_join_next().is_none()); + drop(set); - drop(set); + await_receivers_and_assert(receivers).await; + }); + } + } - await_receivers_and_assert(receivers).await; - }); -} + mod local_set { + use super::*; + + // JoinSet::spawn_local on a LocalSet. + // Every task is spawned with `spawn_local` **inside** the `LocalSet` + // that is currently running. + #[tokio::test(flavor = "current_thread")] + async fn spawn_local_running_localset() { + const N: usize = 8; + let local = LocalSet::new(); + + local + .run_until(async move { + let mut set = JoinSet::new(); + spawn_index_tasks(&mut set, N, None); + drain_joinset_and_assert(set, N).await; + }) + .await; + } -// JoinSet::spawn_local on a LocalSet. -// Every task is spawned with `spawn_local` **inside** the `LocalSet` that is -// currently running. -#[tokio::test(flavor = "current_thread")] -async fn spawn_local_running_localset() { - const N: usize = 8; - let local = LocalSet::new(); + // Shutdown a `JoinSet` that was populated with `spawn_local` + // must abort all still-running tasks. + #[tokio::test(flavor = "current_thread")] + async fn shutdown_spawn_local_localset() { + const N: usize = 8; + let local = LocalSet::new(); - local - .run_until(async move { - let mut set = JoinSet::new(); - spawn_index_tasks(&mut set, N, None); - drain_joinset_and_assert(set, N).await; - }) - .await; -} + local + .run_until(async { + let mut set = JoinSet::new(); + let mut receivers = Vec::new(); -// JoinSet::spawn_local on a LocalSet. -// Tasks are queued with `spawn_local_on` **before** the `LocalSet` is -// running, then executed once the `LocalSet` is started. -#[tokio::test(flavor = "current_thread")] -async fn spawn_local_on_on_started_localset() { - const N: usize = 8; - let local = LocalSet::new(); - let mut pending_set = JoinSet::new(); + spawn_pending_tasks(&mut set, &mut receivers, N, None); + assert!(set.try_join_next().is_none()); - spawn_index_tasks(&mut pending_set, N, Some(&local)); + set.shutdown().await; + assert!(set.is_empty()); - assert!(pending_set.try_join_next().is_none()); + await_receivers_and_assert(receivers).await; + }) + .await; + } - local - .run_until(async move { - drain_joinset_and_assert(pending_set, N).await; - }) - .await; + // Dropping a `JoinSet` that was populated with `spawn_local` + // must abort all still-running tasks. + #[tokio::test(flavor = "current_thread")] + async fn drop_spawn_local_localset() { + const N: usize = 8; + let local = LocalSet::new(); + + local + .run_until(async { + let mut set = JoinSet::new(); + let mut receivers = Vec::new(); + + spawn_pending_tasks(&mut set, &mut receivers, N, None); + assert!(set.try_join_next().is_none()); + + drop(set); + await_receivers_and_assert(receivers).await; + }) + .await; + } + } } -// Shutdown a `JoinSet` that was populated with `spawn_local` must abort all -// still-running tasks. -#[tokio::test(flavor = "current_thread")] -async fn shutdown_spawn_local_localset() { - const N: usize = 8; - let local = LocalSet::new(); +mod spawn_local_on { + use super::*; + + mod local_runtime { + use super::*; + + // Drive a `LocalSet` on top of a `LocalRuntime` and verify that tasks + // queued with `JoinSet::spawn_local_on` run to completion only after + // the `LocalSet` starts. + #[cfg(tokio_unstable)] + #[test] + fn spawn_local_on_local_runtime() { + const N: usize = 8; + let rt = LocalRuntime::new().unwrap(); + + rt.block_on(async { + let local = LocalSet::new(); + let mut set = JoinSet::new(); + + spawn_index_tasks(&mut set, N, Some(&local)); + assert!(set.try_join_next().is_none()); + + local + .run_until(async move { + drain_joinset_and_assert(set, N).await; + }) + .await; + }); + } + } - local - .run_until(async { + mod local_set { + use super::*; + + // JoinSet::spawn_local on a LocalSet. + // Tasks are queued with `spawn_local_on` **before** the `LocalSet` is + // running, then executed once the `LocalSet` is started. + #[tokio::test(flavor = "current_thread")] + async fn spawn_local_on_on_started_localset() { + const N: usize = 8; + let local = LocalSet::new(); + let mut pending_set = JoinSet::new(); + + spawn_index_tasks(&mut pending_set, N, Some(&local)); + assert!(pending_set.try_join_next().is_none()); + + local + .run_until(async move { + drain_joinset_and_assert(pending_set, N).await; + }) + .await; + } + + // Calling `JoinSet::shutdown` on a set whose tasks were queued with + // `spawn_local_on` must abort all of them once the `LocalSet` is driven. + #[tokio::test(flavor = "current_thread")] + async fn shutdown_spawn_local_on() { + const N: usize = 8; + let local = LocalSet::new(); let mut set = JoinSet::new(); let mut receivers = Vec::new(); - spawn_pending_tasks(&mut set, &mut receivers, N, None); - + spawn_pending_tasks(&mut set, &mut receivers, N, Some(&local)); assert!(set.try_join_next().is_none()); - set.shutdown().await; - - assert!(set.is_empty()); - - await_receivers_and_assert(receivers).await; - }) - .await; -} - -// Calling `JoinSet::shutdown` on a set whose tasks were queued with -// `spawn_local_on` must abort all of them once the `LocalSet` is driven. -#[tokio::test(flavor = "current_thread")] -async fn shutdown_spawn_local_on() { - const N: usize = 8; - let local = LocalSet::new(); - let mut set = JoinSet::new(); - let mut receivers = Vec::new(); - - spawn_pending_tasks(&mut set, &mut receivers, N, Some(&local)); - - assert!(set.try_join_next().is_none()); - - local - .run_until(async move { - set.shutdown().await; - assert!(set.is_empty()); - await_receivers_and_assert(receivers).await; - }) - .await; -} - -// Dropping a `JoinSet` whose tasks were queued with `spawn_local_on` -// must cancel all of them once the `LocalSet` is subsequently driven. -#[tokio::test(flavor = "current_thread")] -async fn drop_spawn_local_on() { - const N: usize = 8; - let local = LocalSet::new(); - let mut set = JoinSet::new(); - let mut receivers = Vec::new(); - - spawn_pending_tasks(&mut set, &mut receivers, N, Some(&local)); - - assert!(set.try_join_next().is_none()); - - drop(set); - - local - .run_until(async move { - await_receivers_and_assert(receivers).await; - }) - .await; -} - -// Dropping a `JoinSet` that was populated with `spawn_local` must abort all -// still-running tasks. -#[tokio::test(flavor = "current_thread")] -async fn drop_spawn_local_localset() { - const N: usize = 8; + local + .run_until(async move { + set.shutdown().await; + assert!(set.is_empty()); + await_receivers_and_assert(receivers).await; + }) + .await; + } - let local = LocalSet::new(); - local - .run_until(async { + // Dropping a `JoinSet` whose tasks were queued with `spawn_local_on` + // must cancel all of them once the `LocalSet` is subsequently driven. + #[tokio::test(flavor = "current_thread")] + async fn drop_spawn_local_on() { + const N: usize = 8; + let local = LocalSet::new(); let mut set = JoinSet::new(); let mut receivers = Vec::new(); - spawn_pending_tasks(&mut set, &mut receivers, N, None); - + spawn_pending_tasks(&mut set, &mut receivers, N, Some(&local)); assert!(set.try_join_next().is_none()); - drop(set); - await_receivers_and_assert(receivers).await; - }) - .await; -} - -// Dropping a `JoinSet` whose tasks were queued with `spawn_local_on` -// when the `LocalSet` is already driven. -#[tokio::test(flavor = "current_thread")] -async fn drop_spawn_local_on_running_localset() { - const N: usize = 8; - let local = LocalSet::new(); - let mut set = JoinSet::new(); - let mut receivers = Vec::new(); + local + .run_until(async move { + await_receivers_and_assert(receivers).await; + }) + .await; + } - spawn_pending_tasks(&mut set, &mut receivers, N, Some(&local)); + // Dropping a `JoinSet` whose tasks were queued with `spawn_local_on` + // when the `LocalSet` is already driven. + #[tokio::test(flavor = "current_thread")] + async fn drop_spawn_local_on_running_localset() { + const N: usize = 8; + let local = LocalSet::new(); + let mut set = JoinSet::new(); + let mut receivers = Vec::new(); - assert!(set.try_join_next().is_none()); + spawn_pending_tasks(&mut set, &mut receivers, N, Some(&local)); + assert!(set.try_join_next().is_none()); - local - .run_until(async move { - drop(set); - await_receivers_and_assert(receivers).await; - }) - .await; + local + .run_until(async move { + drop(set); + await_receivers_and_assert(receivers).await; + }) + .await; + } + } } From c7cf4e102c15210a9820309c4a22c7c3aa4aa182 Mon Sep 17 00:00:00 2001 From: FrancescoV1985 Date: Wed, 8 Oct 2025 14:18:48 +0200 Subject: [PATCH 13/15] fixed warning during compilation --- tokio-util/tests/task_join_map.rs | 2 ++ tokio/tests/task_join_set.rs | 2 ++ 2 files changed, 4 insertions(+) diff --git a/tokio-util/tests/task_join_map.rs b/tokio-util/tests/task_join_map.rs index e8d40126772..3930bee48f6 100644 --- a/tokio-util/tests/task_join_map.rs +++ b/tokio-util/tests/task_join_map.rs @@ -441,6 +441,7 @@ mod spawn_local { use super::*; mod local_runtime { + #[cfg(tokio_unstable)] use super::*; // JoinMap::spawn_local on a LocalRuntime @@ -574,6 +575,7 @@ mod spawn_local_on { use super::*; mod local_runtime { + #[cfg(tokio_unstable)] use super::*; // Drive a `JoinMap` on top of a `LocalRuntime` and verify that tasks diff --git a/tokio/tests/task_join_set.rs b/tokio/tests/task_join_set.rs index baaf385e9e0..05e95fe7cca 100644 --- a/tokio/tests/task_join_set.rs +++ b/tokio/tests/task_join_set.rs @@ -410,6 +410,7 @@ mod spawn_local { use super::*; mod local_runtime { + #[cfg(tokio_unstable)] use super::*; // JoinSet::spawn_local on a LocalRuntime @@ -546,6 +547,7 @@ mod spawn_local_on { use super::*; mod local_runtime { + #[cfg(tokio_unstable)] use super::*; // Drive a `LocalSet` on top of a `LocalRuntime` and verify that tasks From 86405b6209c92df886b6ce73285df7804ee41b93 Mon Sep 17 00:00:00 2001 From: FrancescoV1985 <62872737+FrancescoV1985@users.noreply.github.com> Date: Thu, 9 Oct 2025 09:29:12 +0200 Subject: [PATCH 14/15] Apply suggestions from code review Commit suggestions for task_join_set.rs Co-authored-by: Qi --- tokio/tests/task_join_set.rs | 51 +++++++++++++----------------------- 1 file changed, 18 insertions(+), 33 deletions(-) diff --git a/tokio/tests/task_join_set.rs b/tokio/tests/task_join_set.rs index 05e95fe7cca..99b181a9324 100644 --- a/tokio/tests/task_join_set.rs +++ b/tokio/tests/task_join_set.rs @@ -413,12 +413,10 @@ mod spawn_local { #[cfg(tokio_unstable)] use super::*; - // JoinSet::spawn_local on a LocalRuntime - // We create a `LocalRuntime`, enter it, spawn several local tasks with - // `JoinSet::spawn_local`, and wait for every task to finish. + /// Spawn several tasks, and then join all tasks. #[cfg(tokio_unstable)] #[test] - fn spawn_local_local_runtime() { + fn spawn_then_join_next() { const N: usize = 8; let rt = LocalRuntime::new().unwrap(); @@ -431,12 +429,10 @@ mod spawn_local { }); } - // Calling `JoinSet::shutdown` inside a `LocalRuntime` must - // abort and drain every still-running task that was - // inserted with `JoinSet::spawn_local`. + /// Spawn several pending-forever tasks, and then shutdown the [`JoinSet`]. #[cfg(tokio_unstable)] #[test] - fn shutdown_spawn_local_local_runtime() { + fn spawn_then_shutdown() { const N: usize = 8; let rt = LocalRuntime::new().unwrap(); @@ -454,12 +450,10 @@ mod spawn_local { }); } - // Dropping a `JoinSet` created inside a `LocalRuntime` - // must abort every still-running task that was - // added with `JoinSet::spawn_local`. + /// Spawn several pending-forever tasks, and then drop the [`JoinSet`]. #[cfg(tokio_unstable)] #[test] - fn drop_spawn_local_local_runtime() { + fn spawn_then_drop() { const N: usize = 8; let rt = LocalRuntime::new().unwrap(); @@ -480,11 +474,9 @@ mod spawn_local { mod local_set { use super::*; - // JoinSet::spawn_local on a LocalSet. - // Every task is spawned with `spawn_local` **inside** the `LocalSet` - // that is currently running. + /// Spawn several tasks, and then join all tasks. #[tokio::test(flavor = "current_thread")] - async fn spawn_local_running_localset() { + async fn spawn_then_join_next() { const N: usize = 8; let local = LocalSet::new(); @@ -497,10 +489,9 @@ mod spawn_local { .await; } - // Shutdown a `JoinSet` that was populated with `spawn_local` - // must abort all still-running tasks. + /// Spawn several pending-forever tasks, and then shutdown the [`JoinSet`]. #[tokio::test(flavor = "current_thread")] - async fn shutdown_spawn_local_localset() { + async fn spawn_then_shutdown() { const N: usize = 8; let local = LocalSet::new(); @@ -520,10 +511,9 @@ mod spawn_local { .await; } - // Dropping a `JoinSet` that was populated with `spawn_local` - // must abort all still-running tasks. + /// Spawn several pending-forever tasks, and then drop the [`JoinSet`]. #[tokio::test(flavor = "current_thread")] - async fn drop_spawn_local_localset() { + async fn spawn_then_drop() { const N: usize = 8; let local = LocalSet::new(); @@ -550,12 +540,10 @@ mod spawn_local_on { #[cfg(tokio_unstable)] use super::*; - // Drive a `LocalSet` on top of a `LocalRuntime` and verify that tasks - // queued with `JoinSet::spawn_local_on` run to completion only after - // the `LocalSet` starts. + /// Spawn several tasks, and then join all tasks. #[cfg(tokio_unstable)] #[test] - fn spawn_local_on_local_runtime() { + fn spawn_then_join_next() { const N: usize = 8; let rt = LocalRuntime::new().unwrap(); @@ -578,11 +566,9 @@ mod spawn_local_on { mod local_set { use super::*; - // JoinSet::spawn_local on a LocalSet. - // Tasks are queued with `spawn_local_on` **before** the `LocalSet` is - // running, then executed once the `LocalSet` is started. + /// Spawn several tasks, and then join all tasks. #[tokio::test(flavor = "current_thread")] - async fn spawn_local_on_on_started_localset() { + async fn spawn_then_join_next() { const N: usize = 8; let local = LocalSet::new(); let mut pending_set = JoinSet::new(); @@ -597,10 +583,9 @@ mod spawn_local_on { .await; } - // Calling `JoinSet::shutdown` on a set whose tasks were queued with - // `spawn_local_on` must abort all of them once the `LocalSet` is driven. + /// Spawn several pending-forever tasks, and then shutdown the [`JoinSet`]. #[tokio::test(flavor = "current_thread")] - async fn shutdown_spawn_local_on() { + async fn spawn_then_shutdown() { const N: usize = 8; let local = LocalSet::new(); let mut set = JoinSet::new(); From 3fdb1c1457a5c9f41868f432a4d24b2cc072da31 Mon Sep 17 00:00:00 2001 From: FrancescoV1985 Date: Thu, 9 Oct 2025 09:46:29 +0200 Subject: [PATCH 15/15] commit suggested changes for task_join_map.rs --- tokio-util/tests/task_join_map.rs | 51 +++++++++++-------------------- 1 file changed, 18 insertions(+), 33 deletions(-) diff --git a/tokio-util/tests/task_join_map.rs b/tokio-util/tests/task_join_map.rs index 3930bee48f6..e3c4ca9bc88 100644 --- a/tokio-util/tests/task_join_map.rs +++ b/tokio-util/tests/task_join_map.rs @@ -444,12 +444,10 @@ mod spawn_local { #[cfg(tokio_unstable)] use super::*; - // JoinMap::spawn_local on a LocalRuntime - // We create a `LocalRuntime`, enter it, spawn several local tasks with - // `JoinMap::spawn_local`, and wait for every task to finish. + /// Spawn several tasks, and then join all tasks. #[cfg(tokio_unstable)] #[test] - fn spawn_local_local_runtime() { + fn spawn_then_join_next() { const N: usize = 8; let rt = LocalRuntime::new().unwrap(); @@ -462,12 +460,10 @@ mod spawn_local { }); } - // Calling `JoinMap::shutdown` inside a `LocalRuntime` must - // abort and drain every still-running task that was - // inserted with `JoinMap::spawn_local`. + /// Spawn several pending-forever tasks, and then shutdown the [`JoinMap`]. #[cfg(tokio_unstable)] #[test] - fn shutdown_spawn_local_local_runtime() { + fn spawn_then_shutdown() { const N: usize = 8; let rt = LocalRuntime::new().unwrap(); @@ -484,12 +480,10 @@ mod spawn_local { }); } - // Dropping a `JoinMap` created inside a `LocalRuntime` - // must abort every still-running task that was - // added with `JoinMap::spawn_local`. + /// Spawn several pending-forever tasks, and then drop the [`JoinMap`]. #[cfg(tokio_unstable)] #[test] - fn drop_spawn_local_local_runtime() { + fn spawn_then_drop() { const N: usize = 8; let rt = LocalRuntime::new().unwrap(); @@ -509,11 +503,9 @@ mod spawn_local { mod local_set { use super::*; - // JoinMap::spawn_local on a LocalSet. - // Every task is spawned with `spawn_local` **inside** the `LocalSet` that is - // currently running. + /// Spawn several tasks, and then join all tasks. #[tokio::test(flavor = "current_thread")] - async fn spawn_local_running_localset() { + async fn spawn_then_join_next() { const N: usize = 8; let local = LocalSet::new(); @@ -526,10 +518,9 @@ mod spawn_local { .await; } - // Shutdown a `JoinMap` that was populated with `spawn_local` must abort all - // still-running tasks. + /// Spawn several pending-forever tasks, and then shutdown the [`JoinMap`]. #[tokio::test(flavor = "current_thread")] - async fn shutdown_spawn_local_localset() { + async fn spawn_then_shutdown() { const N: usize = 8; let local = LocalSet::new(); @@ -548,10 +539,9 @@ mod spawn_local { .await; } - // Dropping a `JoinMap` that was populated with `spawn_local` must abort all - // still-running tasks. + /// Spawn several pending-forever tasks, and then drop the [`JoinMap`]. #[tokio::test(flavor = "current_thread")] - async fn drop_spawn_local_localset() { + async fn spawn_then_drop() { const N: usize = 8; let local = LocalSet::new(); @@ -578,12 +568,10 @@ mod spawn_local_on { #[cfg(tokio_unstable)] use super::*; - // Drive a `JoinMap` on top of a `LocalRuntime` and verify that tasks - // queued with `JoinMap::spawn_local_on` run to completion only after the - // `LocalSet` starts. + /// Spawn several tasks, and then join all tasks. #[cfg(tokio_unstable)] #[test] - fn spawn_local_on_local_runtime() { + fn spawn_then_join_next() { const N: usize = 8; let rt = LocalRuntime::new().unwrap(); @@ -606,11 +594,9 @@ mod spawn_local_on { mod local_set { use super::*; - // JoinMap::spawn_local on a LocalSet. - // Tasks are queued with `spawn_local_on` **before** the `LocalSet` is - // running, then executed once the `LocalSet` is started. + /// Spawn several tasks, and then join all tasks. #[tokio::test(flavor = "current_thread")] - async fn spawn_local_on_on_started_localset() { + async fn spawn_then_join_next() { const N: usize = 8; let local = LocalSet::new(); let mut pending_map = JoinMap::new(); @@ -625,10 +611,9 @@ mod spawn_local_on { .await; } - // Calling `JoinMap::shutdown` on a set whose tasks were queued with - // `spawn_local_on` must abort all of them once the `LocalSet` is driven. + /// Spawn several pending-forever tasks, and then shutdown the [`JoinMap`]. #[tokio::test(flavor = "current_thread")] - async fn shutdown_spawn_local_on() { + async fn spawn_then_shutdown() { const N: usize = 8; let local = LocalSet::new(); let mut map = JoinMap::new();