Skip to content

Commit de827b8

Browse files
committed
Add rtt to Client
1 parent 87d7f04 commit de827b8

File tree

4 files changed

+136
-4
lines changed

4 files changed

+136
-4
lines changed

.config/nats.dic

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,3 +133,4 @@ ConnectError
133133
DNS
134134
RequestErrorKind
135135
rustls
136+
RttError

async-nats/src/client.rs

Lines changed: 91 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use regex::Regex;
2323
use std::fmt::Display;
2424
use std::sync::atomic::{AtomicU64, Ordering};
2525
use std::sync::Arc;
26-
use std::time::Duration;
26+
use std::time::{Duration, Instant};
2727
use thiserror::Error;
2828
use tokio::sync::mpsc;
2929
use tracing::trace;
@@ -463,6 +463,51 @@ impl Client {
463463
Ok(())
464464
}
465465

466+
/// Calculates the round trip time between this client and the server,
467+
/// if the server is currently connected.
468+
///
469+
/// # Examples
470+
///
471+
/// ```no_run
472+
/// # #[tokio::main]
473+
/// # async fn main() -> Result<(), async_nats::Error> {
474+
/// let client = async_nats::connect("demo.nats.io").await?;
475+
/// let rtt = client.rtt().await?;
476+
/// println!("server rtt: {:?}", rtt);
477+
/// # Ok(())
478+
/// # }
479+
/// ```
480+
pub async fn rtt(&self) -> Result<Duration, RttError> {
481+
let start = Instant::now();
482+
483+
let (ping_tx, ping_rx) = tokio::sync::oneshot::channel();
484+
let (pong_tx, pong_rx) = tokio::sync::oneshot::channel();
485+
486+
self.sender
487+
.send(Command::Ping {
488+
ping_result: Some(ping_tx),
489+
pong_result: Some(pong_tx),
490+
})
491+
.await
492+
.map_err(|err| RttError::with_source(RttErrorKind::PingError, err))?;
493+
494+
ping_rx
495+
.await
496+
// first handle rx error
497+
.map_err(|err| RttError::with_source(RttErrorKind::PingError, err))?
498+
// second handle the atual ping error
499+
.map_err(|err| RttError::with_source(RttErrorKind::PingError, err))?;
500+
501+
pong_rx
502+
.await
503+
// first handle rx error
504+
.map_err(|err| RttError::with_source(RttErrorKind::PongError, err))?
505+
// second handle the actual pong error
506+
.map_err(|err| RttError::with_source(RttErrorKind::PongError, err))?;
507+
508+
Ok(start.elapsed())
509+
}
510+
466511
/// Returns the current state of the connection.
467512
///
468513
/// # Examples
@@ -688,3 +733,48 @@ impl From<SubscribeError> for RequestError {
688733
RequestError::with_source(RequestErrorKind::Other, e)
689734
}
690735
}
736+
737+
/// Error returned when doing a round-trip time measurement fails.
738+
/// To enumerate over the variants, call [RttError::kind].
739+
#[derive(Debug, Error)]
740+
pub struct RttError {
741+
kind: RttErrorKind,
742+
source: Option<Box<dyn std::error::Error + Send + Sync>>,
743+
}
744+
745+
impl Display for RttError {
746+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
747+
let source_info = self
748+
.source
749+
.as_ref()
750+
.map(|e| e.to_string())
751+
.unwrap_or_else(|| "no details".into());
752+
match self.kind {
753+
RttErrorKind::PingError => {
754+
write!(f, "failed to ping server: {}", source_info)
755+
}
756+
RttErrorKind::PongError => write!(f, "pong failed: {}", source_info),
757+
}
758+
}
759+
}
760+
761+
impl RttError {
762+
fn with_source<E>(kind: RttErrorKind, source: E) -> RttError
763+
where
764+
E: Into<Box<dyn std::error::Error + Send + Sync>>,
765+
{
766+
RttError {
767+
kind,
768+
source: Some(source.into()),
769+
}
770+
}
771+
pub fn kind(&self) -> RttErrorKind {
772+
self.kind
773+
}
774+
}
775+
776+
#[derive(Debug, PartialEq, Clone, Copy)]
777+
pub enum RttErrorKind {
778+
PingError,
779+
PongError,
780+
}

async-nats/src/lib.rs

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,10 @@ pub enum Command {
255255
sid: u64,
256256
max: Option<u64>,
257257
},
258-
Ping,
258+
Ping {
259+
ping_result: Option<oneshot::Sender<Result<(), io::Error>>>,
260+
pong_result: Option<oneshot::Sender<Result<(), io::Error>>>,
261+
},
259262
Flush {
260263
result: oneshot::Sender<Result<(), io::Error>>,
261264
},
@@ -305,6 +308,7 @@ pub(crate) struct ConnectionHandler {
305308
info_sender: tokio::sync::watch::Sender<ServerInfo>,
306309
ping_interval: Interval,
307310
flush_interval: Interval,
311+
pending_pongs: HashMap<usize, oneshot::Sender<Result<(), io::Error>>>,
308312
}
309313

310314
impl ConnectionHandler {
@@ -330,6 +334,7 @@ impl ConnectionHandler {
330334
info_sender,
331335
ping_interval,
332336
flush_interval,
337+
pending_pongs: HashMap::new(),
333338
}
334339
}
335340

@@ -397,6 +402,13 @@ impl ConnectionHandler {
397402
}
398403
ServerOp::Pong => {
399404
debug!("received PONG");
405+
406+
while let Some(sender) = self.pending_pongs.remove(&self.pending_pings) {
407+
sender.send(Ok(())).map_err(|_| {
408+
io::Error::new(io::ErrorKind::Other, "one shot failed to be received")
409+
})?;
410+
}
411+
400412
self.pending_pings = self.pending_pings.saturating_sub(1);
401413
}
402414
ServerOp::Error(error) => {
@@ -508,7 +520,10 @@ impl ConnectionHandler {
508520
}
509521
}
510522
}
511-
Command::Ping => {
523+
Command::Ping {
524+
ping_result,
525+
pong_result,
526+
} => {
512527
debug!(
513528
"PING command. Pending pings {}, max pings {}",
514529
self.pending_pings, self.max_pings
@@ -524,8 +539,23 @@ impl ConnectionHandler {
524539
self.handle_disconnect().await?;
525540
}
526541

527-
if let Err(_err) = self.connection.write_op(&ClientOp::Ping).await {
542+
if let Err(err) = self.connection.write_op(&ClientOp::Ping).await {
528543
self.handle_disconnect().await?;
544+
545+
if let Some(ping_result) = ping_result {
546+
ping_result.send(Err(err)).map_err(|_| {
547+
io::Error::new(io::ErrorKind::Other, "one shot failed to be received")
548+
})?;
549+
}
550+
} else if let Some(ping_result) = ping_result {
551+
if let Some(pong_result) = pong_result {
552+
// Use this channel to return back a PONG
553+
self.pending_pongs.insert(self.pending_pings, pong_result);
554+
}
555+
556+
ping_result.send(Ok(())).map_err(|_| {
557+
io::Error::new(io::ErrorKind::Other, "one shot failed to be received")
558+
})?;
529559
}
530560

531561
self.handle_flush().await?;

async-nats/tests/client_tests.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -764,4 +764,15 @@ mod client {
764764
drop(servers.remove(0));
765765
rx.recv().await;
766766
}
767+
768+
#[tokio::test]
769+
async fn rtt() {
770+
let server = nats_server::run_basic_server();
771+
let client = async_nats::connect(server.client_url()).await.unwrap();
772+
773+
let rtt = client.rtt().await.unwrap();
774+
775+
println!("rtt: {:?}", rtt);
776+
assert!(rtt.as_nanos() > 0);
777+
}
767778
}

0 commit comments

Comments
 (0)