@@ -168,7 +168,10 @@ impl SubgraphExecutor for TimeoutExecutor {
168168
169169#[ cfg( test) ]
170170mod tests {
171+ use std:: time:: Duration ;
172+
171173 use async_trait:: async_trait;
174+ use axum:: { extract:: State , http:: Response , Router } ;
172175 use hive_router_config:: parse_yaml_config;
173176 use http:: Method ;
174177 use ntex_http:: HeaderMap ;
@@ -177,6 +180,7 @@ mod tests {
177180 execution:: plan:: { ClientRequestDetails , OperationDetails } ,
178181 executors:: {
179182 common:: { HttpExecutionRequest , HttpExecutionResponse , SubgraphExecutor } ,
183+ map:: from_traffic_shaping_config_to_client,
180184 timeout:: TimeoutExecutor ,
181185 } ,
182186 } ;
@@ -291,4 +295,134 @@ mod tests {
291295 let duration = timeout_executor. get_timeout_duration ( & client_request) ;
292296 assert_eq ! ( duration, Some ( std:: time:: Duration :: from_millis( 7000 ) ) ) ;
293297 }
298+
299+ #[ tokio:: test]
300+ async fn cancels_http_request_when_timeout_expires ( ) {
301+ /**
302+ * We will test here that when the timeout expires, the request is cancelled on the server-end as well.
303+ * For that, we will create a server that sets a flag when the request is dropped/cancelled.
304+ */
305+ use std:: sync:: Arc ;
306+
307+ use http:: Method ;
308+
309+ let ( tx, mut rx) = tokio:: sync:: broadcast:: channel ( 16 ) ;
310+
311+ struct AppState {
312+ tx : Arc < tokio:: sync:: broadcast:: Sender < Duration > > ,
313+ }
314+
315+ let app_state = AppState { tx : Arc :: new ( tx) } ;
316+
317+ let app_state_arc = Arc :: new ( app_state) ;
318+
319+ struct CancelOnDrop {
320+ start : std:: time:: Instant ,
321+ tx : Arc < tokio:: sync:: broadcast:: Sender < Duration > > ,
322+ }
323+
324+ impl Drop for CancelOnDrop {
325+ fn drop ( & mut self ) {
326+ self . tx . send ( self . start . elapsed ( ) ) . unwrap ( ) ;
327+ }
328+ }
329+
330+ #[ axum:: debug_handler]
331+ async fn handler ( State ( state) : State < Arc < AppState > > ) -> Response < String > {
332+ let _cancel_on_drop = CancelOnDrop {
333+ start : std:: time:: Instant :: now ( ) ,
334+ tx : state. tx . clone ( ) ,
335+ } ;
336+ // Never resolve the request, just wait until it's cancelled
337+ let fut = futures:: future:: pending :: < Response < String > > ( ) ;
338+ fut. await
339+ }
340+
341+ println ! ( "Starting server..." ) ;
342+ let app = Router :: new ( )
343+ . fallback ( handler)
344+ . with_state ( app_state_arc. clone ( ) ) ;
345+ println ! ( "Router created, binding to port..." ) ;
346+ let listener = tokio:: net:: TcpListener :: bind ( "0.0.0.0:0" ) . await . unwrap ( ) ;
347+ println ! ( "Listener bound, starting server..." ) ;
348+ let addr = listener. local_addr ( ) . unwrap ( ) ;
349+ tokio:: spawn ( async move {
350+ if let Err ( e) = axum:: serve ( listener, app) . await {
351+ eprintln ! ( "Server error: {}" , e) ;
352+ }
353+ } ) ;
354+ println ! ( "Server started on {}" , addr) ;
355+ let graphql_path = "graphql" ;
356+ let endpoint: http:: Uri = format ! ( "http://{}/{}" , addr, graphql_path) . parse ( ) . unwrap ( ) ;
357+ println ! ( "Endpoint: {}" , endpoint) ;
358+
359+ let config = r#"
360+ traffic_shaping:
361+ all:
362+ timeout:
363+ duration: 5s
364+ "# ;
365+
366+ let config = hive_router_config:: parse_yaml_config ( config. to_string ( ) ) . unwrap ( ) ;
367+ let http_client = from_traffic_shaping_config_to_client ( & config. traffic_shaping . all ) ;
368+ let http_executor = crate :: executors:: http:: HTTPSubgraphExecutor :: new (
369+ endpoint. clone ( ) ,
370+ http_client,
371+ Arc :: new ( tokio:: sync:: Semaphore :: new ( 10 ) ) ,
372+ Arc :: new ( config. traffic_shaping . all . clone ( ) ) ,
373+ Default :: default ( ) ,
374+ ) ;
375+ let timeout_executor = TimeoutExecutor :: try_new (
376+ endpoint,
377+ & config. traffic_shaping . all . timeout . unwrap ( ) ,
378+ http_executor. to_boxed_arc ( ) ,
379+ )
380+ . unwrap ( ) ;
381+
382+ let headers = HeaderMap :: new ( ) ;
383+ let client_request = ClientRequestDetails {
384+ operation : OperationDetails {
385+ name : Some ( "TestQuery" . to_string ( ) ) ,
386+ kind : "query" ,
387+ query : "query TestQuery { field }" . into ( ) ,
388+ } ,
389+ url : "http://example.com/graphql" . parse ( ) . unwrap ( ) ,
390+ headers : & headers,
391+ method : Method :: POST ,
392+ } ;
393+
394+ let execution_request = HttpExecutionRequest {
395+ operation_name : Some ( "TestQuery" ) ,
396+ query : r#"{ field }"# ,
397+ variables : None ,
398+ representations : None ,
399+ headers : http:: HeaderMap :: new ( ) ,
400+ client_request : & client_request,
401+ dedupe : true ,
402+ } ;
403+
404+ println ! ( "Sending request to executor with 5s timeout..." ) ;
405+ let response = timeout_executor. execute ( execution_request) . await ;
406+
407+ println ! ( "Received response from executor." ) ;
408+ assert ! (
409+ response
410+ . body
411+ . starts_with( b"{\" errors\" :[{\" message\" :\" Failed to execute request to subgraph" ) ,
412+ "Expected error response due to timeout"
413+ ) ;
414+
415+ println ! ( "Waiting to see if server was notified of cancellation..." ) ;
416+
417+ // Wait for the server to be notified that the request was cancelled
418+ let elapsed = rx. recv ( ) . await . unwrap ( ) ;
419+ println ! ( "Server was notified of cancellation after {:?}" , elapsed) ;
420+ assert ! (
421+ elapsed >= Duration :: from_secs_f32( 4.9 ) ,
422+ "Expected server to be notified of cancellation after at least 5s, but was {:?}" ,
423+ elapsed
424+ ) ;
425+
426+ println ! ( "Test completed." ) ;
427+ }
294428}
0 commit comments