Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 60 additions & 4 deletions tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
use crate::runtime::handle::Handle;
use crate::runtime::{blocking, driver, Callback, HistogramBuilder, Runtime, TaskCallback};
#[cfg(tokio_unstable)]
use crate::runtime::{metrics::HistogramConfiguration, LocalOptions, LocalRuntime, TaskMeta};
use crate::runtime::{
metrics::HistogramConfiguration, LocalOptions, LocalRuntime, TaskMeta, TaskSpawnCallback,
UserData,
};
use crate::util::rand::{RngSeed, RngSeedGenerator};

use crate::runtime::blocking::BlockingPool;
Expand Down Expand Up @@ -89,6 +92,9 @@ pub struct Builder {
pub(super) after_unpark: Option<Callback>,

/// To run before each task is spawned.
#[cfg(tokio_unstable)]
pub(super) before_spawn: Option<TaskSpawnCallback>,
#[cfg(not(tokio_unstable))]
pub(super) before_spawn: Option<TaskCallback>,

/// To run before each poll
Expand Down Expand Up @@ -731,8 +737,15 @@ impl Builder {
/// Executes function `f` just before a task is spawned.
///
/// `f` is called within the Tokio context, so functions like
/// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
/// invoked immediately.
/// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback
/// being invoked immediately.
///
/// `f` must return an `Option<&'static dyn Any>`. A value returned by this callback
/// is attached to the task and can be retrieved using [`TaskMeta::get_data`] in
/// subsequent calls to other hooks for this task such as
/// [`on_before_task_poll`](crate::runtime::Builder::on_before_task_poll),
/// [`on_after_task_poll`](crate::runtime::Builder::on_after_task_poll), and
/// [`on_task_terminate`](crate::runtime::Builder::on_task_terminate).
///
/// This can be used for bookkeeping or monitoring purposes.
///
Expand All @@ -755,6 +768,7 @@ impl Builder {
/// let runtime = runtime::Builder::new_current_thread()
/// .on_task_spawn(|_| {
/// println!("spawning task");
/// None
/// })
/// .build()
/// .unwrap();
Expand All @@ -768,11 +782,53 @@ impl Builder {
/// })
/// # }
/// ```
///
/// ```
/// # use tokio::runtime;
/// # use std::sync::atomic::{AtomicUsize, Ordering};
/// # pub fn main() {
/// struct YieldingTaskMetadata {
/// pub yield_count: AtomicUsize,
/// }
/// let runtime = runtime::Builder::new_current_thread()
/// .on_task_spawn(|meta| {
/// println!("spawning task {}", meta.id());
/// let meta = Box::new(YieldingTaskMetadata { yield_count: AtomicUsize::new(0) });
/// Some(Box::leak(meta) as &dyn std::any::Any)
/// })
/// .on_after_task_poll(|meta| {
/// if let Some(data) = meta.get_data().and_then(|data| data.downcast_ref::<YieldingTaskMetadata>()) {
/// println!("task {} yield count: {}", meta.id(), data.yield_count.fetch_add(1, Ordering::Relaxed));
/// }
/// })
/// .on_task_terminate(|meta| {
/// match meta.get_data().and_then(|data| data.downcast_ref::<YieldingTaskMetadata>()) {
/// Some(data) => {
/// let yield_count = data.yield_count.load(Ordering::Relaxed);
/// println!("task {} total yield count: {}", meta.id(), yield_count);
/// assert!(yield_count == 64);
/// },
/// None => panic!("task has missing or incorrect user data"),
/// }
/// })
/// .build()
/// .unwrap();
///
/// runtime.block_on(async {
/// let _ = tokio::task::spawn(async {
/// for _ in 0..64 {
/// println!("yielding");
/// tokio::task::yield_now().await;
/// }
/// }).await.unwrap();
/// })
/// # }
/// ```
#[cfg(all(not(loom), tokio_unstable))]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
pub fn on_task_spawn<F>(&mut self, f: F) -> &mut Self
where
F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
F: Fn(&TaskMeta<'_>) -> UserData + Send + Sync + 'static,
{
self.before_spawn = Some(std::sync::Arc::new(f));
self
Expand Down
5 changes: 5 additions & 0 deletions tokio/src/runtime/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
any(not(all(tokio_unstable, feature = "full")), target_family = "wasm"),
allow(dead_code)
)]
#[cfg(tokio_unstable)]
use crate::runtime::TaskSpawnCallback;
use crate::runtime::{Callback, TaskCallback};
use crate::util::RngSeedGenerator;

Expand All @@ -19,6 +21,9 @@ pub(crate) struct Config {
pub(crate) after_unpark: Option<Callback>,

/// To run before each task is spawned.
#[cfg(tokio_unstable)]
pub(crate) before_spawn: Option<TaskSpawnCallback>,
#[cfg(not(tokio_unstable))]
pub(crate) before_spawn: Option<TaskCallback>,

/// To run after each task is terminated.
Expand Down
2 changes: 2 additions & 0 deletions tokio/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,8 @@ cfg_rt! {

mod task_hooks;
pub(crate) use task_hooks::{TaskHooks, TaskCallback};
#[cfg(tokio_unstable)]
pub(crate) use task_hooks::{TaskSpawnCallback, UserData};
cfg_unstable! {
pub use task_hooks::TaskMeta;
}
Expand Down
69 changes: 59 additions & 10 deletions tokio/src/runtime/scheduler/current_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use crate::runtime::scheduler::{self, Defer, Inject};
use crate::runtime::task::{
self, JoinHandle, OwnedTasks, Schedule, SpawnLocation, Task, TaskHarnessScheduleHooks,
};
#[cfg(tokio_unstable)]
use crate::runtime::UserData;
use crate::runtime::{
blocking, context, Config, MetricsBatch, SchedulerMetrics, TaskHooks, TaskMeta, WorkerMetrics,
};
Expand Down Expand Up @@ -456,13 +458,49 @@ impl Handle {
F: crate::future::Future + Send + 'static,
F::Output: Send + 'static,
{
let (handle, notified) = me.shared.owned.bind(future, me.clone(), id, spawned_at);
Self::spawn_with_user_data(
me,
future,
id,
spawned_at,
#[cfg(tokio_unstable)]
None,
)
}

me.task_hooks.spawn(&TaskMeta {
#[track_caller]
pub(crate) fn spawn_with_user_data<F>(
me: &Arc<Self>,
future: F,
id: crate::runtime::task::Id,
spawned_at: SpawnLocation,
#[cfg(tokio_unstable)] user_data: UserData,
) -> JoinHandle<F::Output>
where
F: crate::future::Future + Send + 'static,
F::Output: Send + 'static,
{
let task_meta = TaskMeta {
id,
spawned_at,
#[cfg(tokio_unstable)]
user_data,
_phantom: Default::default(),
});
};

#[cfg(not(tokio_unstable))]
{
me.task_hooks.spawn(&task_meta);
}

let (handle, notified) = me.shared.owned.bind(
future,
me.clone(),
id,
spawned_at,
#[cfg(tokio_unstable)]
me.task_hooks.spawn(&task_meta),
);

if let Some(notified) = notified {
me.schedule(notified);
Expand All @@ -488,16 +526,27 @@ impl Handle {
F: crate::future::Future + 'static,
F::Output: 'static,
{
let (handle, notified) = me
.shared
.owned
.bind_local(future, me.clone(), id, spawned_at);

me.task_hooks.spawn(&TaskMeta {
let task_meta = TaskMeta {
id,
spawned_at,
#[cfg(tokio_unstable)]
user_data: None,
_phantom: Default::default(),
});
};

#[cfg(not(tokio_unstable))]
{
me.task_hooks.spawn(&task_meta);
}

let (handle, notified) = me.shared.owned.bind_local(
future,
me.clone(),
id,
spawned_at,
#[cfg(tokio_unstable)]
me.task_hooks.spawn(&task_meta),
);

if let Some(notified) = notified {
me.schedule(notified);
Expand Down
16 changes: 16 additions & 0 deletions tokio/src/runtime/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ impl Handle {
cfg_rt! {
use crate::future::Future;
use crate::loom::sync::Arc;
#[cfg(tokio_unstable)]
use crate::runtime::UserData;
use crate::runtime::{blocking, task::{Id, SpawnLocation}};
use crate::runtime::context;
use crate::task::JoinHandle;
Expand Down Expand Up @@ -130,6 +132,20 @@ cfg_rt! {
}
}

#[cfg(tokio_unstable)]
pub(crate) fn spawn_with_user_data<F>(&self, future: F, id: Id, spawned_at: SpawnLocation, user_data: UserData) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
match self {
Handle::CurrentThread(h) => current_thread::Handle::spawn_with_user_data(h, future, id, spawned_at, user_data),

#[cfg(feature = "rt-multi-thread")]
Handle::MultiThread(h) => multi_thread::Handle::spawn_with_user_data(h, future, id, spawned_at, user_data),
}
}

/// Spawn a local task
///
/// # Safety
Expand Down
50 changes: 45 additions & 5 deletions tokio/src/runtime/scheduler/multi_thread/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use crate::future::Future;
use crate::loom::sync::Arc;
use crate::runtime::scheduler::multi_thread::worker;
use crate::runtime::task::{Notified, Task, TaskHarnessScheduleHooks};
#[cfg(tokio_unstable)]
use crate::runtime::task_hooks::UserData;
use crate::runtime::{
blocking, driver,
task::{self, JoinHandle, SpawnLocation},
Expand Down Expand Up @@ -47,7 +49,30 @@ impl Handle {
F: crate::future::Future + Send + 'static,
F::Output: Send + 'static,
{
Self::bind_new_task(me, future, id, spawned_at)
Self::bind_new_task(
me,
future,
id,
spawned_at,
#[cfg(tokio_unstable)]
None,
)
}

/// Spawns a future with user data onto the thread pool
#[cfg(tokio_unstable)]
pub(crate) fn spawn_with_user_data<F>(
me: &Arc<Self>,
future: F,
id: task::Id,
spawned_at: SpawnLocation,
user_data: UserData,
) -> JoinHandle<F::Output>
where
F: crate::future::Future + Send + 'static,
F::Output: Send + 'static,
{
Self::bind_new_task(me, future, id, spawned_at, user_data)
}

pub(crate) fn shutdown(&self) {
Expand All @@ -60,18 +85,33 @@ impl Handle {
future: T,
id: task::Id,
spawned_at: SpawnLocation,
#[cfg(tokio_unstable)] user_data: UserData,
) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
let (handle, notified) = me.shared.owned.bind(future, me.clone(), id, spawned_at);

me.task_hooks.spawn(&TaskMeta {
let task_meta = TaskMeta {
id,
spawned_at,
#[cfg(tokio_unstable)]
user_data,
_phantom: Default::default(),
});
};

#[cfg(not(tokio_unstable))]
{
me.task_hooks.spawn(&task_meta);
}

let (handle, notified) = me.shared.owned.bind(
future,
me.clone(),
id,
spawned_at,
#[cfg(tokio_unstable)]
me.task_hooks.spawn(&task_meta),
);

me.schedule_option_task_without_yield(notified);

Expand Down
Loading
Loading