Skip to content

Fixed AsyncTransition #3881

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions any_spawner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,9 +472,8 @@ fn handle_uninitialized_spawn(_fut: PinnedFuture<()>) {
#[cfg(all(debug_assertions, not(feature = "tracing")))]
{
panic!(
"At {}, tried to spawn a Future with Executor::spawn() before a \
global executor was initialized.",
caller
"At {caller}, tried to spawn a Future with Executor::spawn() before a \
global executor was initialized."
);
}
// In release builds (without tracing), call the specific no-op function.
Expand Down Expand Up @@ -503,9 +502,8 @@ fn handle_uninitialized_spawn_local(_fut: PinnedLocalFuture<()>) {
#[cfg(all(debug_assertions, not(feature = "tracing")))]
{
panic!(
"At {}, tried to spawn a Future with Executor::spawn_local() \
before a global executor was initialized.",
caller
"At {caller}, tried to spawn a Future with Executor::spawn_local() \
before a global executor was initialized."
);
}
// In release builds (without tracing), call the specific no-op function (which usually panics).
Expand Down
2 changes: 1 addition & 1 deletion integrations/actix/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1230,7 +1230,7 @@ fn static_path(options: &LeptosOptions, path: &str) -> String {
// If the path ends with a trailing slash, we generate the path
// as a directory with a index.html file inside.
if path != "/" && path.ends_with("/") {
static_file_path(options, &format!("{}index", path))
static_file_path(options, &format!("{path}index"))
} else {
static_file_path(options, path)
}
Expand Down
2 changes: 1 addition & 1 deletion integrations/axum/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1543,7 +1543,7 @@ fn static_path(options: &LeptosOptions, path: &str) -> String {
// If the path ends with a trailing slash, we generate the path
// as a directory with a index.html file inside.
if path != "/" && path.ends_with("/") {
static_file_path(options, &format!("{}index", path))
static_file_path(options, &format!("{path}index"))
} else {
static_file_path(options, path)
}
Expand Down
2 changes: 1 addition & 1 deletion meta/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ where
tracing::warn!("{}", msg);

#[cfg(not(feature = "tracing"))]
eprintln!("{}", msg);
eprintln!("{msg}");
}
}

Expand Down
2 changes: 1 addition & 1 deletion reactive_graph/src/effect/immediate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ mod inner {
const MSG: &str = "ImmediateEffect recursed more than once.";
match effect.defined_at() {
Some(defined_at) => {
log_warning(format_args!("{MSG} Defined at: {}", defined_at));
log_warning(format_args!("{MSG} Defined at: {defined_at}"));
}
None => {
log_warning(format_args!("{MSG}"));
Expand Down
2 changes: 1 addition & 1 deletion reactive_graph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ pub fn log_warning(text: Arguments) {
not(all(target_arch = "wasm32", target_os = "unknown"))
))]
{
eprintln!("{}", text);
eprintln!("{text}");
}
}

Expand Down
4 changes: 2 additions & 2 deletions reactive_graph/src/send_wrapper_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,10 @@ impl<T: Debug> Debug for SendOption<T> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match &self.inner {
Inner::Threadsafe(value) => {
write!(f, "SendOption::Threadsafe({:?})", value)
write!(f, "SendOption::Threadsafe({value:?})")
}
Inner::Local(value) => {
write!(f, "SendOption::Local({:?})", value)
write!(f, "SendOption::Local({value:?})")
}
}
}
Expand Down
93 changes: 62 additions & 31 deletions reactive_graph/src/transition.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,53 @@
//! Utilities to wait for asynchronous primitives to resolve.

use futures::{channel::oneshot, future::join_all};
use or_poisoned::OrPoisoned;
use std::{
future::Future,
sync::{mpsc, OnceLock, RwLock},
};
use pin_project_lite::pin_project;
use std::{cell::RefCell, future::Future, sync::mpsc};

static TRANSITION: OnceLock<RwLock<Option<TransitionInner>>> = OnceLock::new();
thread_local! {
static TRANSITION: RefCell<Option<TransitionInner>> = const { RefCell::new(None) };
}

fn global_transition() -> &'static RwLock<Option<TransitionInner>> {
TRANSITION.get_or_init(|| RwLock::new(None))
/// A Drop guard is needed because drop is called even in case of a panic
struct TransitionGuard<'a>(&'a mut Option<TransitionInner>);
impl<'a> TransitionGuard<'a> {
fn new(value: &'a mut Option<TransitionInner>) -> Self {
TRANSITION.with(|transaction| {
std::mem::swap(&mut *transaction.borrow_mut(), value)
});
Self(value)
}
}
impl Drop for TransitionGuard<'_> {
fn drop(&mut self) {
TRANSITION.with(|transaction| {
std::mem::swap(&mut *transaction.borrow_mut(), self.0)
});
}
}

// A future wrapper, to use in async functions
pin_project! {
struct WithTransition<Fut>{
transition: Option<TransitionInner>,
#[pin]
inner: Fut
}
}
impl<Fut> Future for WithTransition<Fut>
where
Fut: Future,
{
type Output = <Fut as Future>::Output;

fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let this = self.project();
let _guard = TransitionGuard::new(this.transition);
this.inner.poll(cx)
}
}

#[derive(Debug, Clone)]
Expand All @@ -35,36 +72,30 @@ impl AsyncTransition {
T: Future<Output = U>,
{
let (tx, rx) = mpsc::channel();
let global_transition = global_transition();
let inner = TransitionInner { tx };
let prev = Option::replace(
&mut *global_transition.write().or_poisoned(),
inner.clone(),
);
let value = action().await;
_ = std::mem::replace(
&mut *global_transition.write().or_poisoned(),
prev,
);
let transition = Some(TransitionInner { tx });
let value = WithTransition {
transition,
inner: action(),
}
.await;

let mut pending = Vec::new();
while let Ok(tx) = rx.try_recv() {
// This should never block since all tx instances have been dropped
while let Ok(tx) = rx.recv() {
pending.push(tx);
}
join_all(pending).await;
value
}

pub(crate) fn register(rx: oneshot::Receiver<()>) {
if let Some(tx) = global_transition()
.read()
.or_poisoned()
.as_ref()
.map(|n| &n.tx)
{
// if it's an Err, that just means the Receiver was dropped
// i.e., the transition is no longer listening, in which case it doesn't matter if we
// successfully register with it or not
_ = tx.send(rx);
}
TRANSITION.with_borrow(|transition| {
if let Some(transition) = transition {
// if it's an Err, that just means the Receiver was dropped
// i.e., the transition is no longer listening, in which case it doesn't matter if we
// successfully register with it or not
_ = transition.tx.send(rx);
}
})
}
}
4 changes: 2 additions & 2 deletions reactive_stores_macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ fn variant_to_tokens(
let field_ident = field.ident.as_ref().unwrap();
let field_ty = &field.ty;
let combined_ident = Ident::new(
&format!("{}_{}", ident, field_ident),
&format!("{ident}_{field_ident}"),
field_ident.span(),
);

Expand Down Expand Up @@ -481,7 +481,7 @@ fn variant_to_tokens(
let field_ident = idx;
let field_ty = &field.ty;
let combined_ident = Ident::new(
&format!("{}_{}", ident, field_ident),
&format!("{ident}_{field_ident}"),
ident.span(),
);

Expand Down
6 changes: 3 additions & 3 deletions server_fn/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,13 +259,13 @@ pub mod reqwest {
let mut websocket_server_url = get_server_url().to_string();
if let Some(postfix) = websocket_server_url.strip_prefix("http://")
{
websocket_server_url = format!("ws://{}", postfix);
websocket_server_url = format!("ws://{postfix}");
} else if let Some(postfix) =
websocket_server_url.strip_prefix("https://")
{
websocket_server_url = format!("wss://{}", postfix);
websocket_server_url = format!("wss://{postfix}");
}
let url = format!("{}{}", websocket_server_url, path);
let url = format!("{websocket_server_url}{path}");
let (ws_stream, _) =
tokio_tungstenite::connect_async(url).await.map_err(|e| {
Error::from_server_fn_error(ServerFnErrorErr::Request(
Expand Down
2 changes: 1 addition & 1 deletion server_fn/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ where

fn decode(bytes: Bytes) -> Result<ServerFnError<CustErr>, Self::Error> {
let data = String::from_utf8(bytes.to_vec())
.map_err(|err| format!("UTF-8 conversion error: {}", err))?;
.map_err(|err| format!("UTF-8 conversion error: {err}"))?;

data.split_once('|')
.ok_or_else(|| {
Expand Down
4 changes: 2 additions & 2 deletions tachys/src/ssr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ impl StreamBuilder {
self.sync_buf.reserve(11 + (id.len() * 2));
self.sync_buf.push_str("<!--s-");
for piece in id {
write!(&mut self.sync_buf, "{}-", piece).unwrap();
write!(&mut self.sync_buf, "{piece}-").unwrap();
}
if opening {
self.sync_buf.push_str("o-->");
Expand Down Expand Up @@ -206,7 +206,7 @@ impl StreamBuilder {
let mut id = String::new();
if let Some(ids) = &subbuilder.id {
for piece in ids {
write!(&mut id, "{}-", piece).unwrap();
write!(&mut id, "{piece}-").unwrap();
}
}
if let Some(id) = subbuilder.id.as_mut() {
Expand Down