@@ -23,7 +23,7 @@ use regex::Regex;
2323use std:: fmt:: Display ;
2424use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
2525use std:: sync:: Arc ;
26- use std:: time:: Duration ;
26+ use std:: time:: { Duration , Instant } ;
2727use thiserror:: Error ;
2828use tokio:: sync:: mpsc;
2929use tracing:: trace;
@@ -463,6 +463,51 @@ impl Client {
463463 Ok ( ( ) )
464464 }
465465
466+ /// Calculates the round trip time between this client and the server,
467+ /// if the server is currently connected.
468+ ///
469+ /// # Examples
470+ ///
471+ /// ```no_run
472+ /// # #[tokio::main]
473+ /// # async fn main() -> Result<(), async_nats::Error> {
474+ /// let client = async_nats::connect("demo.nats.io").await?;
475+ /// let rtt = client.rtt().await?;
476+ /// println!("server rtt: {:?}", rtt);
477+ /// # Ok(())
478+ /// # }
479+ /// ```
480+ pub async fn rtt ( & self ) -> Result < Duration , RttError > {
481+ let start = Instant :: now ( ) ;
482+
483+ let ( ping_tx, ping_rx) = tokio:: sync:: oneshot:: channel ( ) ;
484+ let ( pong_tx, pong_rx) = tokio:: sync:: oneshot:: channel ( ) ;
485+
486+ self . sender
487+ . send ( Command :: Ping {
488+ ping_result : Some ( ping_tx) ,
489+ pong_result : Some ( pong_tx) ,
490+ } )
491+ . await
492+ . map_err ( |err| RttError :: with_source ( RttErrorKind :: PingError , err) ) ?;
493+
494+ ping_rx
495+ . await
496+ // first handle rx error
497+ . map_err ( |err| RttError :: with_source ( RttErrorKind :: PingError , err) ) ?
498+ // second handle the atual ping error
499+ . map_err ( |err| RttError :: with_source ( RttErrorKind :: PingError , err) ) ?;
500+
501+ pong_rx
502+ . await
503+ // first handle rx error
504+ . map_err ( |err| RttError :: with_source ( RttErrorKind :: PongError , err) ) ?
505+ // second handle the actual pong error
506+ . map_err ( |err| RttError :: with_source ( RttErrorKind :: PongError , err) ) ?;
507+
508+ Ok ( start. elapsed ( ) )
509+ }
510+
466511 /// Returns the current state of the connection.
467512 ///
468513 /// # Examples
@@ -688,3 +733,48 @@ impl From<SubscribeError> for RequestError {
688733 RequestError :: with_source ( RequestErrorKind :: Other , e)
689734 }
690735}
736+
737+ /// Error returned when doing a round-trip time measurement fails.
738+ /// To enumerate over the variants, call [RttError::kind].
739+ #[ derive( Debug , Error ) ]
740+ pub struct RttError {
741+ kind : RttErrorKind ,
742+ source : Option < Box < dyn std:: error:: Error + Send + Sync > > ,
743+ }
744+
745+ impl Display for RttError {
746+ fn fmt ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
747+ let source_info = self
748+ . source
749+ . as_ref ( )
750+ . map ( |e| e. to_string ( ) )
751+ . unwrap_or_else ( || "no details" . into ( ) ) ;
752+ match self . kind {
753+ RttErrorKind :: PingError => {
754+ write ! ( f, "failed to ping server: {}" , source_info)
755+ }
756+ RttErrorKind :: PongError => write ! ( f, "pong failed: {}" , source_info) ,
757+ }
758+ }
759+ }
760+
761+ impl RttError {
762+ fn with_source < E > ( kind : RttErrorKind , source : E ) -> RttError
763+ where
764+ E : Into < Box < dyn std:: error:: Error + Send + Sync > > ,
765+ {
766+ RttError {
767+ kind,
768+ source : Some ( source. into ( ) ) ,
769+ }
770+ }
771+ pub fn kind ( & self ) -> RttErrorKind {
772+ self . kind
773+ }
774+ }
775+
776+ #[ derive( Debug , PartialEq , Clone , Copy ) ]
777+ pub enum RttErrorKind {
778+ PingError ,
779+ PongError ,
780+ }
0 commit comments