Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
9b09c4f
Add new tests for JoinSet::spawn_local/JoinSet::spawn_local_on and Lo…
FrancescoV1985 Sep 11, 2025
2a07d7f
ignore new test in rt_local for wasm target family
FrancescoV1985 Sep 11, 2025
2a8deed
updated rt_local.rs and task_join_set.rs according to review's comments
FrancescoV1985 Sep 25, 2025
ebe940e
add tests for JoinMap in task_join_map.rs
FrancescoV1985 Sep 26, 2025
12f4713
add missing configuration attribute in task_join_map.rs
FrancescoV1985 Sep 26, 2025
13c211a
add tests for TaskTracker in task_tracker.rs
FrancescoV1985 Sep 26, 2025
c69a37a
add tests for TaskTracker in task_tracker.rs
FrancescoV1985 Sep 26, 2025
22511fe
refactored new JoinSet tests to eliminate duplicated code
FrancescoV1985 Oct 6, 2025
7556da8
Merge branch 'tokio-rs:master' into issue-7562-fix
FrancescoV1985 Oct 7, 2025
de05b54
refactored new JoinMap tests to eliminate duplicated code
FrancescoV1985 Oct 7, 2025
288abdc
improved formatting for new JoinMap tests
FrancescoV1985 Oct 7, 2025
d9feec0
improved formatting (2) for new JoinMap tests
FrancescoV1985 Oct 7, 2025
cd21d97
regrouped new tests for JoinSet and JoinMap thorugh 'mod'
FrancescoV1985 Oct 8, 2025
c7cf4e1
fixed warning during compilation
FrancescoV1985 Oct 8, 2025
86405b6
Apply suggestions from code review
FrancescoV1985 Oct 9, 2025
3fdb1c1
commit suggested changes for task_join_map.rs
FrancescoV1985 Oct 9, 2025
a78c9d2
changed code according to review's feedback
FrancescoV1985 Oct 15, 2025
3f149f7
Merge branch 'tokio-rs:master' into issue-7562-fix
FrancescoV1985 Oct 15, 2025
b86d8d9
Merge branch 'master' into issue-7562-fix
FrancescoV1985 Oct 16, 2025
21eb095
fix missing delimiter
FrancescoV1985 Oct 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
279 changes: 277 additions & 2 deletions tokio-util/tests/task_join_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,76 @@

use std::panic::AssertUnwindSafe;

use futures::future::{pending, FutureExt};
use tokio::sync::oneshot;
use tokio::task::LocalSet;
use tokio::time::Duration;
use tokio_util::task::JoinMap;

use futures::future::FutureExt;

fn rt() -> tokio::runtime::Runtime {
tokio::runtime::Builder::new_current_thread()
.build()
.unwrap()
}

// Spawn `N` tasks that return their index (`i`).
fn spawn_index_tasks(map: &mut JoinMap<usize, usize>, n: usize, on: Option<&LocalSet>) {
for i in 0..n {
let rc = std::rc::Rc::new(i);
match on {
None => map.spawn_local(i, async move { *rc }),
Some(local) => map.spawn_local_on(i, async move { *rc }, local),
};
}
}

// Spawn `N` “pending” tasks that own a `oneshot::Sender`.
// When the task is aborted the sender is dropped, which is observed
// via the returned `Receiver`s.
fn spawn_pending_tasks(
map: &mut JoinMap<usize, ()>,
receivers: &mut Vec<oneshot::Receiver<()>>,
n: usize,
on: Option<&LocalSet>,
) {
for i in 0..n {
let (tx, rx) = oneshot::channel::<()>();
receivers.push(rx);

let fut = async move {
pending::<()>().await;
drop(tx);
};
match on {
None => map.spawn_local(i, fut),
Some(local) => map.spawn_local_on(i, fut, local),
};
}
}

/// Await every task in JoinMap and assert every task returns its own key.
async fn drain_joinmap_and_assert(mut map: JoinMap<usize, usize>, n: usize) {
let mut seen = vec![false; n];
while let Some((k, res)) = map.join_next().await {
let v = res.expect("task panicked");
assert_eq!(k, v);
seen[v] = true;
}
assert!(seen.into_iter().all(|b| b));
assert!(map.is_empty());
}

// Await every receiver and assert they all return `Err` because the
// corresponding sender (inside an aborted task) was dropped.
async fn await_receivers_and_assert(receivers: Vec<oneshot::Receiver<()>>) {
for rx in receivers {
assert!(
rx.await.is_err(),
"task should have been aborted and sender dropped"
);
}
}

#[tokio::test(start_paused = true)]
async fn test_with_sleep() {
let mut map = JoinMap::new();
Expand Down Expand Up @@ -376,3 +434,220 @@ async fn duplicate_keys_drop() {

assert!(map.join_next().await.is_none());
}

mod spawn_local {
use super::*;

#[cfg(tokio_unstable)]
mod local_runtime {
use super::*;

/// Spawn several tasks, and then join all tasks.
#[tokio::test(flavor = "local")]
async fn spawn_then_join_next() {
const N: usize = 8;

let mut map = JoinMap::new();
spawn_index_tasks(&mut map, N, None);

assert!(map.join_next().now_or_never().is_none());
drain_joinmap_and_assert(map, N).await;
}

/// Spawn several pending-forever tasks, and then shutdown the [`JoinMap`].
#[tokio::test(flavor = "local")]
async fn spawn_then_shutdown() {
const N: usize = 8;

let mut map = JoinMap::new();
let mut receivers = Vec::new();

spawn_pending_tasks(&mut map, &mut receivers, N, None);
assert!(map.join_next().now_or_never().is_none());

map.shutdown().await;
assert!(map.is_empty());
await_receivers_and_assert(receivers).await;
}

/// Spawn several pending-forever tasks, and then drop the [`JoinMap`].
#[tokio::test(flavor = "local")]
async fn spawn_then_drop() {
const N: usize = 8;

let mut map = JoinMap::new();
let mut receivers = Vec::new();

spawn_pending_tasks(&mut map, &mut receivers, N, None);
assert!(map.join_next().now_or_never().is_none());

drop(map);
await_receivers_and_assert(receivers).await;
}
}

mod local_set {
use super::*;

/// Spawn several tasks, and then join all tasks.
#[tokio::test(flavor = "current_thread")]
async fn spawn_then_join_next() {
const N: usize = 8;
let local = LocalSet::new();

local
.run_until(async move {
let mut map = JoinMap::new();
spawn_index_tasks(&mut map, N, None);
drain_joinmap_and_assert(map, N).await;
})
.await;
}

/// Spawn several pending-forever tasks, and then shutdown the [`JoinMap`].
#[tokio::test(flavor = "current_thread")]
async fn spawn_then_shutdown() {
const N: usize = 8;
let local = LocalSet::new();

local
.run_until(async {
let mut map = JoinMap::new();
let mut receivers = Vec::new();

spawn_pending_tasks(&mut map, &mut receivers, N, None);
assert!(map.join_next().now_or_never().is_none());

map.shutdown().await;
assert!(map.is_empty());
await_receivers_and_assert(receivers).await;
})
.await;
}

/// Spawn several pending-forever tasks, and then drop the [`JoinMap`].
#[tokio::test(flavor = "current_thread")]
async fn spawn_then_drop() {
const N: usize = 8;
let local = LocalSet::new();

local
.run_until(async {
let mut map = JoinMap::new();
let mut receivers = Vec::new();

spawn_pending_tasks(&mut map, &mut receivers, N, None);
assert!(map.join_next().now_or_never().is_none());

drop(map);
await_receivers_and_assert(receivers).await;
})
.await;
}
}
}

mod spawn_local_on {
use super::*;

#[cfg(tokio_unstable)]
mod local_runtime {
use super::*;

/// Spawn several tasks, and then join all tasks.
#[tokio::test(flavor = "local")]
async fn spawn_then_join_next() {
const N: usize = 8;

let local = LocalSet::new();
let mut map = JoinMap::new();

spawn_index_tasks(&mut map, N, Some(&local));
assert!(map.join_next().now_or_never().is_none());

local
.run_until(async move {
drain_joinmap_and_assert(map, N).await;
})
.await;
}
}

mod local_set {
use super::*;

/// Spawn several tasks, and then join all tasks.
#[tokio::test(flavor = "current_thread")]
async fn spawn_then_join_next() {
const N: usize = 8;
let local = LocalSet::new();
let mut pending_map = JoinMap::new();

spawn_index_tasks(&mut pending_map, N, Some(&local));
assert!(pending_map.join_next().now_or_never().is_none());

local
.run_until(async move {
drain_joinmap_and_assert(pending_map, N).await;
})
.await;
}

/// Spawn several pending-forever tasks, and then shutdown the [`JoinMap`].
#[tokio::test(flavor = "current_thread")]
async fn spawn_then_shutdown() {
const N: usize = 8;
let local = LocalSet::new();
let mut map = JoinMap::new();
let mut receivers = Vec::new();

spawn_pending_tasks(&mut map, &mut receivers, N, Some(&local));
assert!(map.join_next().now_or_never().is_none());

local
.run_until(async move {
map.shutdown().await;
assert!(map.is_empty());
await_receivers_and_assert(receivers).await;
})
.await;
}

/// Spawn several pending-forever tasks and then drop the [`JoinMap`]
/// before the `LocalSet` is driven and while the `LocalSet` is already driven.
#[tokio::test(flavor = "current_thread")]
async fn spawn_then_drop() {
const N: usize = 8;

{
let local = LocalSet::new();
let mut map = JoinMap::new();
let mut receivers = Vec::new();

spawn_pending_tasks(&mut map, &mut receivers, N, Some(&local));
assert!(map.join_next().now_or_never().is_none());

drop(map);
local
.run_until(async move { await_receivers_and_assert(receivers).await })
.await;
}

{
let local = LocalSet::new();
let mut map = JoinMap::new();
let mut receivers = Vec::new();

spawn_pending_tasks(&mut map, &mut receivers, N, Some(&local));
assert!(map.join_next().now_or_never().is_none());

local
.run_until(async move {
drop(map);
await_receivers_and_assert(receivers).await;
})
.await;
}
}
}
}
Loading