Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
304 changes: 302 additions & 2 deletions tokio-util/tests/task_join_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,78 @@

use std::panic::AssertUnwindSafe;

use futures::future::{pending, FutureExt};
#[cfg(tokio_unstable)]
use tokio::runtime::LocalRuntime;
use tokio::sync::oneshot;
use tokio::task::LocalSet;
use tokio::time::Duration;
use tokio_util::task::JoinMap;

use futures::future::FutureExt;

fn rt() -> tokio::runtime::Runtime {
tokio::runtime::Builder::new_current_thread()
.build()
.unwrap()
}

// Spawn `N` tasks that return their index (`i`).
fn spawn_index_tasks(map: &mut JoinMap<usize, usize>, n: usize, on: Option<&LocalSet>) {
for i in 0..n {
let rc = std::rc::Rc::new(i);
match on {
None => map.spawn_local(i, async move { *rc }),
Some(local) => map.spawn_local_on(i, async move { *rc }, local),
};
}
}

// Spawn `N` “pending” tasks that own a `oneshot::Sender`.
// When the task is aborted the sender is dropped, which is observed
// via the returned `Receiver`s.
fn spawn_pending_tasks(
map: &mut JoinMap<usize, ()>,
receivers: &mut Vec<oneshot::Receiver<()>>,
n: usize,
on: Option<&LocalSet>,
) {
for i in 0..n {
let (tx, rx) = oneshot::channel::<()>();
receivers.push(rx);

let fut = async move {
pending::<()>().await;
drop(tx);
};
match on {
None => map.spawn_local(i, fut),
Some(local) => map.spawn_local_on(i, fut, local),
};
}
}

/// Await every task in JoinMap and assert every task returns its own key.
async fn drain_joinmap_and_assert(mut map: JoinMap<usize, usize>, n: usize) {
let mut seen = vec![false; n];
while let Some((k, res)) = map.join_next().await {
let v = res.expect("task panicked");
assert_eq!(k, v);
seen[v] = true;
}
assert!(seen.into_iter().all(|b| b));
assert!(map.is_empty());
}

// Await every receiver and assert they all return `Err` because the
// corresponding sender (inside an aborted task) was dropped.
async fn await_receivers_and_assert(receivers: Vec<oneshot::Receiver<()>>) {
for rx in receivers {
assert!(
rx.await.is_err(),
"task should have been aborted and sender dropped"
);
}
}

#[tokio::test(start_paused = true)]
async fn test_with_sleep() {
let mut map = JoinMap::new();
Expand Down Expand Up @@ -376,3 +436,243 @@ async fn duplicate_keys_drop() {

assert!(map.join_next().await.is_none());
}

// JoinMap::spawn_local on a LocalRuntime
// We create a `LocalRuntime`, enter it, spawn several local tasks with
// `JoinMap::spawn_local`, and wait for every task to finish.
#[cfg(tokio_unstable)]
#[test]
fn spawn_local_local_runtime() {
const N: usize = 8;
let rt = LocalRuntime::new().unwrap();

rt.block_on(async {
let mut map = JoinMap::new();

spawn_index_tasks(&mut map, N, None);

assert!(map.join_next().now_or_never().is_none());

drain_joinmap_and_assert(map, N).await;
});
}

// Drive a `JoinMap` on top of a `LocalRuntime` and verify that tasks
// queued with `JoinMap::spawn_local_on` run to completion only after the
// `LocalSet` starts.
#[cfg(tokio_unstable)]
#[test]
fn spawn_local_on_local_runtime() {
const N: usize = 8;
let rt = LocalRuntime::new().unwrap();

rt.block_on(async {
let local = LocalSet::new();
let mut map = JoinMap::new();

spawn_index_tasks(&mut map, N, Some(&local));

assert!(map.join_next().now_or_never().is_none());

local
.run_until(async move {
drain_joinmap_and_assert(map, N).await;
})
.await;
});
}

// Calling `JoinMap::shutdown` inside a `LocalRuntime` must
// abort and drain every still-running task that was
// inserted with `JoinMap::spawn_local`.
#[cfg(tokio_unstable)]
#[test]
fn shutdown_spawn_local_local_runtime() {
const N: usize = 8;
let rt = LocalRuntime::new().unwrap();

rt.block_on(async {
let mut map = JoinMap::new();
let mut receivers = Vec::new();

spawn_pending_tasks(&mut map, &mut receivers, N, None);

assert!(map.join_next().now_or_never().is_none());

map.shutdown().await;
assert!(map.is_empty());
await_receivers_and_assert(receivers).await;
});
}

// Dropping a `JoinMap` created inside a `LocalRuntime`
// must abort every still-running task that was
// added with `JoinMap::spawn_local`.
#[cfg(tokio_unstable)]
#[test]
fn drop_spawn_local_local_runtime() {
const N: usize = 8;
let rt = LocalRuntime::new().unwrap();

rt.block_on(async {
let mut map = JoinMap::new();
let mut receivers = Vec::new();

spawn_pending_tasks(&mut map, &mut receivers, N, None);

assert!(map.join_next().now_or_never().is_none());

drop(map);

await_receivers_and_assert(receivers).await;
});
}

// JoinMap::spawn_local on a LocalSet.
// Every task is spawned with `spawn_local` **inside** the `LocalSet` that is
// currently running.
#[tokio::test(flavor = "current_thread")]
async fn spawn_local_running_localset() {
const N: usize = 8;
let local = LocalSet::new();

local
.run_until(async move {
let mut map = JoinMap::new();

spawn_index_tasks(&mut map, N, None);

drain_joinmap_and_assert(map, N).await;
})
.await;
}

// JoinMap::spawn_local on a LocalSet.
// Tasks are queued with `spawn_local_on` **before** the `LocalSet` is
// running, then executed once the `LocalSet` is started.
#[tokio::test(flavor = "current_thread")]
async fn spawn_local_on_on_started_localset() {
const N: usize = 8;
let local = LocalSet::new();
let mut pending_map = JoinMap::new();

spawn_index_tasks(&mut pending_map, N, Some(&local));

assert!(pending_map.join_next().now_or_never().is_none());

local
.run_until(async move {
drain_joinmap_and_assert(pending_map, N).await;
})
.await;
}

// Shutdown a `JoinMap` that was populated with `spawn_local` must abort all
// still-running tasks.
#[tokio::test(flavor = "current_thread")]
async fn shutdown_spawn_local_localset() {
const N: usize = 8;
let local = LocalSet::new();

local
.run_until(async {
let mut map = JoinMap::new();
let mut receivers = Vec::new();

spawn_pending_tasks(&mut map, &mut receivers, N, None);

assert!(map.join_next().now_or_never().is_none());

map.shutdown().await;

assert!(map.is_empty());
await_receivers_and_assert(receivers).await;
})
.await;
}

// Calling `JoinMap::shutdown` on a set whose tasks were queued with
// `spawn_local_on` must abort all of them once the `LocalSet` is driven.
#[tokio::test(flavor = "current_thread")]
async fn shutdown_spawn_local_on() {
const N: usize = 8;
let local = LocalSet::new();
let mut map = JoinMap::new();
let mut receivers = Vec::new();

spawn_pending_tasks(&mut map, &mut receivers, N, Some(&local));

assert!(map.join_next().now_or_never().is_none());

local
.run_until(async move {
map.shutdown().await;
assert!(map.is_empty());
await_receivers_and_assert(receivers).await;
})
.await;
}

// Dropping a `JoinMap` whose tasks were queued with `spawn_local_on`
// must cancel all of them once the `LocalSet` is subsequently driven.
#[tokio::test(flavor = "current_thread")]
async fn drop_spawn_local_on() {
const N: usize = 8;
let local = LocalSet::new();
let mut map = JoinMap::new();
let mut receivers = Vec::new();

spawn_pending_tasks(&mut map, &mut receivers, N, Some(&local));

assert!(map.join_next().now_or_never().is_none());

drop(map);

local
.run_until(async move { await_receivers_and_assert(receivers).await })
.await;
}

// Dropping a `JoinMap` that was populated with `spawn_local` must abort all
// still-running tasks.
#[tokio::test(flavor = "current_thread")]
async fn drop_spawn_local_localset() {
const N: usize = 8;
let local = LocalSet::new();

local
.run_until(async {
let mut map = JoinMap::new();
let mut receivers = Vec::new();

spawn_pending_tasks(&mut map, &mut receivers, N, None);

assert!(map.join_next().now_or_never().is_none());

drop(map);

await_receivers_and_assert(receivers).await;
})
.await;
}

// Dropping a `JoinMap` whose tasks were queued with `spawn_local_on`
// when the `LocalSet` is already driven.
#[tokio::test(flavor = "current_thread")]
async fn drop_spawn_local_on_running_localset() {
const N: usize = 8;
let local = LocalSet::new();
let mut map = JoinMap::new();
let mut receivers = Vec::new();

spawn_pending_tasks(&mut map, &mut receivers, N, Some(&local));

assert!(map.join_next().now_or_never().is_none());

local
.run_until(async move {
drop(map);
await_receivers_and_assert(receivers).await;
})
.await;
}
Loading