Skip to content

Commit 3066eae

Browse files
authored
feat(executor): include subgraph name and code to the errors (#477)
For the propagated errors, the router adds a code to the extensions with the value `DOWNSTREAM_SERVICE_ERROR`. And all errors related to a subgraph has `serviceName` extension. This `extensions.code` and `extensions.serviceName` are a de-facto standard in the routers.
1 parent 30e643d commit 3066eae

File tree

5 files changed

+185
-65
lines changed

5 files changed

+185
-65
lines changed

bin/router/src/pipeline/error.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ use std::sync::Arc;
22

33
use graphql_tools::validation::utils::ValidationError;
44
use hive_router_plan_executor::{
5-
execution::error::PlanExecutionError, response::graphql_error::GraphQLError,
5+
execution::error::PlanExecutionError,
6+
response::graphql_error::{GraphQLError, GraphQLErrorExtensions},
67
};
78
use hive_router_query_planner::{
89
ast::normalization::error::NormalizationError, planner::PlannerError,
@@ -13,7 +14,6 @@ use ntex::{
1314
web::{self, error::QueryPayloadError, HttpRequest},
1415
};
1516
use serde::{Deserialize, Serialize};
16-
use sonic_rs::{object, Value};
1717

1818
use crate::pipeline::header::{RequestAccepts, APPLICATION_GRAPHQL_RESPONSE_JSON_STR};
1919

@@ -154,12 +154,10 @@ impl PipelineError {
154154
let code = self.error.graphql_error_code();
155155
let message = self.error.graphql_error_message();
156156

157-
let graphql_error = GraphQLError {
158-
extensions: Some(Value::from_iter(&object! {"code": code.to_string()})),
157+
let graphql_error = GraphQLError::from_message_and_extensions(
159158
message,
160-
path: None,
161-
locations: None,
162-
};
159+
GraphQLErrorExtensions::new_from_code(code),
160+
);
163161

164162
let result = FailedExecutionResult {
165163
errors: Some(vec![graphql_error]),

lib/executor/src/context.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,17 +44,20 @@ impl<'a> ExecutionContext<'a> {
4444

4545
pub fn handle_errors(
4646
&mut self,
47+
subgraph_name: &str,
4748
errors: Option<Vec<GraphQLError>>,
4849
entity_index_error_map: Option<HashMap<&usize, Vec<GraphQLErrorPath>>>,
4950
) {
5051
if let Some(response_errors) = errors {
5152
for response_error in response_errors {
53+
let response_error_with_subgraph_name =
54+
response_error.add_subgraph_name(subgraph_name);
5255
if let Some(entity_index_error_map) = &entity_index_error_map {
53-
let normalized_errors =
54-
response_error.normalize_entity_error(entity_index_error_map);
56+
let normalized_errors = response_error_with_subgraph_name
57+
.normalize_entity_error(entity_index_error_map);
5558
self.errors.extend(normalized_errors);
5659
} else {
57-
self.errors.push(response_error);
60+
self.errors.push(response_error_with_subgraph_name);
5861
}
5962
}
6063
}

lib/executor/src/execution/plan.rs

Lines changed: 109 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use crate::{
3636
response::project_by_operation,
3737
},
3838
response::{
39-
graphql_error::{GraphQLError, GraphQLErrorPath},
39+
graphql_error::{GraphQLError, GraphQLErrorExtensions, GraphQLErrorPath},
4040
merge::deep_merge,
4141
subgraph_response::SubgraphResponse,
4242
value::Value,
@@ -260,12 +260,14 @@ impl<'exec> Executor<'exec> {
260260
match self.execute_fetch_node(node, None).await {
261261
Ok(result) => self.process_job_result(ctx, result),
262262
Err(err) => {
263-
ctx.errors.push(GraphQLError {
264-
message: err.to_string(),
265-
locations: None,
266-
path: None,
267-
extensions: None,
268-
});
263+
let extensions = GraphQLErrorExtensions::new_from_code_and_service_name(
264+
"PLAN_EXECUTION_ERROR",
265+
&node.service_name,
266+
);
267+
ctx.errors.push(GraphQLError::from_message_and_extensions(
268+
err.to_string(),
269+
extensions,
270+
));
269271
Ok(())
270272
}
271273
}
@@ -302,12 +304,10 @@ impl<'exec> Executor<'exec> {
302304
Ok(job) => {
303305
self.process_job_result(ctx, job)?;
304306
}
305-
Err(err) => ctx.errors.push(GraphQLError {
306-
message: err.to_string(),
307-
locations: None,
308-
path: None,
309-
extensions: None,
310-
}),
307+
Err(err) => ctx.errors.push(GraphQLError::from_message_and_extensions(
308+
err.to_string(),
309+
GraphQLErrorExtensions::new_from_code("PLAN_EXECUTION_FAILED"),
310+
)),
311311
}
312312
}
313313

@@ -324,12 +324,13 @@ impl<'exec> Executor<'exec> {
324324
Ok(job) => {
325325
self.process_job_result(ctx, job)?;
326326
}
327-
Err(err) => ctx.errors.push(GraphQLError {
328-
message: err.to_string(),
329-
locations: None,
330-
path: None,
331-
extensions: None,
332-
}),
327+
Err(err) => ctx.errors.push(GraphQLError::from_message_and_extensions(
328+
err.to_string(),
329+
GraphQLErrorExtensions::new_from_code_and_service_name(
330+
"PLAN_EXECUTION_FAILED",
331+
&fetch_node.service_name,
332+
),
333+
)),
333334
},
334335
PlanNode::Parallel(parallel_node) => {
335336
self.execute_parallel_wave(ctx, parallel_node).await?;
@@ -350,23 +351,44 @@ impl<'exec> Executor<'exec> {
350351
self.process_job_result(ctx, job)?;
351352
}
352353
Err(err) => {
353-
ctx.errors.push(GraphQLError {
354-
message: err.to_string(),
355-
locations: None,
356-
path: None,
357-
extensions: None,
358-
});
354+
let service_name = service_name_from_plan_node(node);
355+
let extensions = service_name
356+
.map(|name| {
357+
GraphQLErrorExtensions::new_from_code_and_service_name(
358+
"PLAN_EXECUTION_ERROR",
359+
name,
360+
)
361+
})
362+
.unwrap_or_else(|| {
363+
GraphQLErrorExtensions::new_from_code(
364+
"PLAN_EXECUTION_ERROR",
365+
)
366+
});
367+
ctx.errors.push(GraphQLError::from_message_and_extensions(
368+
err.to_string(),
369+
extensions,
370+
));
359371
}
360372
}
361373
}
362374
Ok(None) => { /* do nothing */ }
363-
Err(e) => {
364-
ctx.errors.push(GraphQLError {
365-
message: e.to_string(),
366-
locations: None,
367-
path: None,
368-
extensions: None,
369-
});
375+
Err(err) => {
376+
let service_name = service_name_from_plan_node(node);
377+
let extensions = service_name
378+
.map(|name| {
379+
GraphQLErrorExtensions::new_from_code_and_service_name(
380+
"PLAN_EXECUTION_ERROR",
381+
name,
382+
)
383+
})
384+
.unwrap_or_else(|| {
385+
GraphQLErrorExtensions::new_from_code("PLAN_EXECUTION_ERROR")
386+
});
387+
388+
ctx.errors.push(GraphQLError::from_message_and_extensions(
389+
err.to_string(),
390+
extensions,
391+
));
370392
}
371393
}
372394
}
@@ -419,6 +441,7 @@ impl<'exec> Executor<'exec> {
419441

420442
fn process_subgraph_response(
421443
&self,
444+
subgraph_name: &str,
422445
ctx: &mut ExecutionContext<'exec>,
423446
response_bytes: Bytes,
424447
fetch_node_id: i64,
@@ -443,13 +466,14 @@ impl<'exec> Executor<'exec> {
443466
let response = match SubgraphResponse::deserialize(&mut deserializer) {
444467
Ok(response) => response,
445468
Err(e) => {
446-
ctx.errors
447-
.push(crate::response::graphql_error::GraphQLError {
448-
message: format!("Failed to deserialize subgraph response: {}", e),
449-
locations: None,
450-
path: None,
451-
extensions: None,
452-
});
469+
let message = format!("Failed to deserialize subgraph response: {}", e);
470+
let extensions = GraphQLErrorExtensions::new_from_code_and_service_name(
471+
"SUBGRAPH_RESPONSE_DESERIALIZATION_FAILED",
472+
subgraph_name,
473+
);
474+
let error = GraphQLError::from_message_and_extensions(message, extensions);
475+
476+
ctx.errors.push(error);
453477
return None;
454478
}
455479
};
@@ -472,10 +496,13 @@ impl<'exec> Executor<'exec> {
472496
&mut ctx.response_headers_aggregator,
473497
)?;
474498

475-
if let Some((mut response, output_rewrites)) =
476-
self.process_subgraph_response(ctx, job.response.body, job.fetch_node_id)
477-
{
478-
ctx.handle_errors(response.errors, None);
499+
if let Some((mut response, output_rewrites)) = self.process_subgraph_response(
500+
job.subgraph_name.as_ref(),
501+
ctx,
502+
job.response.body,
503+
job.fetch_node_id,
504+
) {
505+
ctx.handle_errors(&job.subgraph_name, response.errors, None);
479506
if let Some(output_rewrites) = output_rewrites {
480507
for output_rewrite in output_rewrites {
481508
output_rewrite
@@ -495,9 +522,12 @@ impl<'exec> Executor<'exec> {
495522
&mut ctx.response_headers_aggregator,
496523
)?;
497524

498-
if let Some((mut response, output_rewrites)) =
499-
self.process_subgraph_response(ctx, job.response.body, job.fetch_node_id)
500-
{
525+
if let Some((mut response, output_rewrites)) = self.process_subgraph_response(
526+
&job.subgraph_name,
527+
ctx,
528+
job.response.body,
529+
job.fetch_node_id,
530+
) {
501531
if let Some(mut entities) = response.data.take_entities() {
502532
if let Some(output_rewrites) = output_rewrites {
503533
for output_rewrite in output_rewrites {
@@ -549,7 +579,11 @@ impl<'exec> Executor<'exec> {
549579
index += 1;
550580
},
551581
);
552-
ctx.handle_errors(response.errors, entity_index_error_map);
582+
ctx.handle_errors(
583+
&job.subgraph_name,
584+
response.errors,
585+
entity_index_error_map,
586+
);
553587
}
554588
}
555589
}
@@ -700,6 +734,24 @@ impl<'exec> Executor<'exec> {
700734
}
701735
}
702736

737+
fn service_name_from_plan_node(node: &PlanNode) -> Option<&str> {
738+
match node {
739+
PlanNode::Fetch(fetch_node) => Some(fetch_node.service_name.as_ref()),
740+
PlanNode::Flatten(flatten_node) => service_name_from_plan_node(flatten_node.node.as_ref()),
741+
PlanNode::Condition(condition_node) => condition_node
742+
.if_clause
743+
.as_deref()
744+
.and_then(service_name_from_plan_node)
745+
.or_else(|| {
746+
condition_node
747+
.else_clause
748+
.as_deref()
749+
.and_then(service_name_from_plan_node)
750+
}),
751+
_ => None,
752+
}
753+
}
754+
703755
fn condition_node_by_variables<'a>(
704756
condition_node: &'a ConditionNode,
705757
variable_values: &'a Option<HashMap<String, sonic_rs::Value>>,
@@ -735,7 +787,10 @@ fn select_fetch_variables<'a>(
735787

736788
#[cfg(test)]
737789
mod tests {
738-
use crate::{context::ExecutionContext, response::graphql_error::GraphQLErrorPath};
790+
use crate::{
791+
context::ExecutionContext,
792+
response::graphql_error::{GraphQLErrorExtensions, GraphQLErrorPath},
793+
};
739794

740795
use super::select_fetch_variables;
741796
use sonic_rs::Value;
@@ -829,9 +884,13 @@ mod tests {
829884
GraphQLErrorPathSegment::String("field1".to_string()),
830885
],
831886
}),
832-
extensions: None,
887+
extensions: GraphQLErrorExtensions::default(),
833888
}];
834-
ctx.handle_errors(Some(response_errors), Some(entity_index_error_map));
889+
ctx.handle_errors(
890+
"subgraph_a",
891+
Some(response_errors),
892+
Some(entity_index_error_map),
893+
);
835894
assert_eq!(ctx.errors.len(), 2);
836895
assert_eq!(ctx.errors[0].message, "Error 1");
837896
assert_eq!(

lib/executor/src/projection/response.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::projection::error::ProjectionError;
22
use crate::projection::plan::{
33
FieldProjectionCondition, FieldProjectionConditionError, FieldProjectionPlan, TypeCondition,
44
};
5-
use crate::response::graphql_error::GraphQLError;
5+
use crate::response::graphql_error::{GraphQLError, GraphQLErrorExtensions};
66
use crate::response::value::Value;
77
use bytes::BufMut;
88
use sonic_rs::JsonValueTrait;
@@ -231,7 +231,7 @@ fn project_selection_set_with_map(
231231
message: "Value is not a valid enum value".to_string(),
232232
locations: None,
233233
path: None,
234-
extensions: None,
234+
extensions: GraphQLErrorExtensions::default(),
235235
});
236236
}
237237
Err(FieldProjectionConditionError::InvalidFieldType) => {

0 commit comments

Comments
 (0)