Skip to content

Commit 1940cbb

Browse files
authored
Fix union overfetching (#279)
by adding type conditions to the FetchNodes. Summary of changes -> #279 (comment). Closes #277
1 parent b078df1 commit 1940cbb

22 files changed

+1035
-190
lines changed

.cargo/config.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ dev = "run --package qp-dev-cli"
55
subgraphs = "run --package subgraphs"
66
test_all = "test --workspace"
77
test_qp = "test --package query-planner -- --nocapture"
8+
test_qpe = "test --package query-plan-executor -- --nocapture"
89
"clippy:fix" = "clippy --all --fix --allow-dirty --allow-staged"

lib/query-plan-executor/benches/executor_benches.rs

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use query_plan_executor::executors::map::SubgraphExecutorMap;
88
use query_planner::ast::selection_item::SelectionItem;
99
use query_planner::ast::selection_set::InlineFragmentSelection;
1010
use query_planner::graph::PlannerOverrideContext;
11+
use query_planner::planner::plan_nodes::FlattenNodePathSegment;
1112
use std::hint::black_box;
1213

1314
use query_plan_executor::execute_query_plan;
@@ -242,8 +243,17 @@ fn project_data_by_operation(c: &mut Criterion) {
242243

243244
fn traverse_and_collect(c: &mut Criterion) {
244245
let path = [
245-
"users", "@", "reviews", "@", "product", "reviews", "@", "author", "reviews", "@",
246-
"product",
246+
FlattenNodePathSegment::Field("users".into()),
247+
FlattenNodePathSegment::List,
248+
FlattenNodePathSegment::Field("reviews".into()),
249+
FlattenNodePathSegment::List,
250+
FlattenNodePathSegment::Field("product".into()),
251+
FlattenNodePathSegment::Field("reviews".into()),
252+
FlattenNodePathSegment::List,
253+
FlattenNodePathSegment::Field("author".into()),
254+
FlattenNodePathSegment::Field("reviews".into()),
255+
FlattenNodePathSegment::List,
256+
FlattenNodePathSegment::Field("product".into()),
247257
];
248258
let mut result: Value = non_projected_result::get_result();
249259
c.bench_function("traverse_and_collect", |b| {
@@ -328,8 +338,19 @@ fn deep_merge_with_complex(c: &mut Criterion) {
328338

329339
fn project_requires(c: &mut Criterion) {
330340
let path = [
331-
"users", "@", "reviews", "@", "product", "reviews", "@", "author", "reviews", "@",
332-
"product",
341+
FlattenNodePathSegment::Field("users".into()),
342+
FlattenNodePathSegment::List,
343+
FlattenNodePathSegment::Field("reviews".into()),
344+
FlattenNodePathSegment::List,
345+
FlattenNodePathSegment::Field("product".into()),
346+
FlattenNodePathSegment::List,
347+
FlattenNodePathSegment::Field("reviews".into()),
348+
FlattenNodePathSegment::List,
349+
FlattenNodePathSegment::Field("author".into()),
350+
FlattenNodePathSegment::List,
351+
FlattenNodePathSegment::Field("reviews".into()),
352+
FlattenNodePathSegment::List,
353+
FlattenNodePathSegment::Field("product".into()),
333354
];
334355
let mut result: Value = non_projected_result::get_result();
335356
let data = result.get_mut("data").unwrap();

lib/query-plan-executor/src/lib.rs

Lines changed: 93 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@ use query_planner::{
55
operation::OperationDefinition, selection_item::SelectionItem, selection_set::SelectionSet,
66
},
77
planner::plan_nodes::{
8-
ConditionNode, FetchNode, FetchNodePathSegment, FetchRewrite, FlattenNode, KeyRenamer,
9-
ParallelNode, PlanNode, QueryPlan, SequenceNode, ValueSetter,
8+
ConditionNode, FetchNode, FetchNodePathSegment, FetchRewrite, FlattenNode, FlattenNodePath,
9+
FlattenNodePathSegment, KeyRenamer, ParallelNode, PlanNode, QueryPlan, SequenceNode,
10+
ValueSetter,
1011
},
1112
state::supergraph_state::OperationKind,
1213
};
@@ -550,7 +551,7 @@ type ExecutionStepJob<'a, T> = BoxFuture<'a, T>;
550551
struct ExecutionStep<'a> {
551552
fetch_job: Option<ExecutionStepJob<'a, ExecutionResult>>,
552553
flatten_job: Option<ExecutionStepJob<'a, ExecuteForRepresentationsResult>>,
553-
flatten_path: Option<Vec<&'a str>>,
554+
flatten_path: Option<&'a FlattenNodePath>,
554555
}
555556

556557
fn create_execution_step<'a>(
@@ -572,12 +573,22 @@ fn create_execution_step<'a>(
572573
return ExecutionStep::default();
573574
};
574575

575-
let normalized_path: Vec<&str> = flatten_node.path.iter().map(String::as_str).collect();
576-
let collected_representations = traverse_and_collect(data, &normalized_path);
576+
let collected_representations =
577+
traverse_and_collect(data, flatten_node.path.as_slice());
578+
579+
if collected_representations.is_empty() {
580+
// No representations collected, skip execution
581+
return ExecutionStep::default();
582+
}
577583

578584
let project_result =
579585
fetch_node.project_representations(execution_context, &collected_representations);
580586

587+
if project_result.representations.is_empty() {
588+
// No representations left after projection, skip execution
589+
return ExecutionStep::default();
590+
}
591+
581592
let job = fetch_node.execute_for_projected_representations(
582593
execution_context,
583594
project_result.representations,
@@ -586,7 +597,7 @@ fn create_execution_step<'a>(
586597

587598
ExecutionStep {
588599
flatten_job: Some(job),
589-
flatten_path: Some(normalized_path),
600+
flatten_path: Some(&flatten_node.path),
590601
..Default::default()
591602
}
592603
}
@@ -643,7 +654,7 @@ impl ExecutablePlanNode for ParallelNode {
643654
flatten_jobs.push(flatten_job);
644655
}
645656
if let Some(flatten_path) = res.flatten_path {
646-
flatten_paths.push(flatten_path);
657+
flatten_paths.push(flatten_path.as_slice());
647658
}
648659
}
649660
trace!(
@@ -671,7 +682,7 @@ impl ExecutablePlanNode for ParallelNode {
671682
for (result, path) in flatten_results.into_iter().zip(flatten_paths) {
672683
// Process FlattenNode results
673684
if let Some(entities) = result.entities {
674-
let mut collected_representations = traverse_and_collect(data, &path);
685+
let mut collected_representations = traverse_and_collect(data, path);
675686
for (entity, index) in entities.into_iter().zip(result.indexes.into_iter()) {
676687
if let Some(representation) = collected_representations.get_mut(index) {
677688
// Merge the entity into the representation
@@ -739,14 +750,20 @@ impl ExecutablePlanNode for FlattenNode {
739750
) {
740751
// Execute the child node. `execution_context` can be borrowed mutably
741752
// because `collected_representations` borrows `data_for_flatten`, not `execution_context.data`.
742-
let normalized_path: Vec<&str> = self.path.iter().map(String::as_str).collect();
743753
let now = std::time::Instant::now();
744-
let mut representations = traverse_and_collect(data, normalized_path.as_slice());
754+
let mut representations = traverse_and_collect(data, self.path.as_slice());
745755
trace!(
746756
"traversed and collected representations: {:?} in {:#?}",
747757
representations.len(),
748758
now.elapsed()
749759
);
760+
761+
if representations.is_empty() {
762+
// If there are no representations,
763+
// return early without executing the child node.
764+
return;
765+
}
766+
750767
match self.node.as_ref() {
751768
PlanNode::Fetch(fetch_node) => {
752769
let now = std::time::Instant::now();
@@ -760,6 +777,12 @@ impl ExecutablePlanNode for FlattenNode {
760777
now.elapsed()
761778
);
762779

780+
if filtered_representations.is_empty() {
781+
// If there are no filtered representations,
782+
// return early without executing the child node.
783+
return;
784+
}
785+
763786
let now = std::time::Instant::now();
764787
let result = fetch_node
765788
.execute_for_projected_representations(
@@ -1033,26 +1056,72 @@ fn entity_satisfies_type_condition(
10331056
))]
10341057
pub fn traverse_and_collect<'a>(
10351058
current_data: &'a mut Value,
1036-
remaining_path: &[&str],
1059+
remaining_path: &[FlattenNodePathSegment],
10371060
) -> Vec<&'a mut Value> {
1038-
match (current_data, remaining_path) {
1039-
(Value::Array(arr), []) => arr.iter_mut().collect(), // Base case: No more path segments, return all items in the array
1040-
(current_data, []) => vec![current_data], // Base case: No more path segments,
1041-
(Value::Object(obj), [next_segment, next_remaining_path @ ..]) => {
1042-
if let Some(next_value) = obj.get_mut(*next_segment) {
1043-
traverse_and_collect(next_value, next_remaining_path)
1044-
} else {
1045-
vec![] // No valid path segment
1061+
// If the path is empty, we're done traversing.
1062+
let Some((segment, remaining_path)) = remaining_path.split_first() else {
1063+
return match current_data {
1064+
// If the final result is an array, return all its items.
1065+
Value::Array(arr) => arr.iter_mut().collect(),
1066+
// Otherwise, return the value itself in a vector.
1067+
_ => vec![current_data],
1068+
};
1069+
};
1070+
1071+
match segment {
1072+
FlattenNodePathSegment::Field(field) => {
1073+
// Attempt to access a field on an object
1074+
match current_data.get_mut(field) {
1075+
Some(next_value) => traverse_and_collect(next_value, remaining_path),
1076+
// Either the field doesn't exist, or the current value is not an object
1077+
None => vec![],
1078+
}
1079+
}
1080+
1081+
FlattenNodePathSegment::List => {
1082+
match current_data {
1083+
Value::Array(arr) => arr
1084+
.iter_mut()
1085+
.flat_map(|item| traverse_and_collect(item, remaining_path))
1086+
.collect(),
1087+
// List is only valid for arrays
1088+
_ => vec![],
1089+
}
1090+
}
1091+
1092+
FlattenNodePathSegment::Cast(type_name) => {
1093+
match current_data {
1094+
Value::Object(_) => {
1095+
// For a single object, a missing `__typename` is a pass-through
1096+
if contains_typename(current_data, type_name, true) {
1097+
traverse_and_collect(current_data, remaining_path)
1098+
} else {
1099+
vec![]
1100+
}
1101+
}
1102+
Value::Array(arr) => {
1103+
// Filter an array based on matching `__typename`
1104+
arr.iter_mut()
1105+
.filter(|item| contains_typename(item, type_name, false))
1106+
.flat_map(|item| traverse_and_collect(item, remaining_path))
1107+
.collect()
1108+
}
1109+
// Cast is only valid for objects and arrays
1110+
_ => vec![],
10461111
}
10471112
}
1048-
(Value::Array(arr), ["@", next_remaining_path @ ..]) => arr
1049-
.iter_mut()
1050-
.flat_map(|item| traverse_and_collect(item, next_remaining_path))
1051-
.collect(),
1052-
_ => vec![], // No valid path segment
10531113
}
10541114
}
10551115

1116+
/// Checks if a serde_json::Value has a `__typename` that matches the given `type_name`; returns `default_for_missing` when the value is not an object or has no string `__typename`.
1117+
fn contains_typename(value: &Value, type_name: &str, default_for_missing: bool) -> bool {
1118+
value
1119+
.as_object()
1120+
.and_then(|obj| obj.get("__typename"))
1121+
.and_then(Value::as_str)
1122+
.map_or(default_for_missing, |s| s == type_name)
1123+
}
1124+
10561125
// --- Helper Functions ---
10571126

10581127
// --- Main Function (for testing) ---

lib/query-plan-executor/src/tests/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ use subgraphs::accounts;
33

44
use crate::executors::{common::SubgraphExecutor, map::SubgraphExecutorMap};
55

6+
mod traverse_and_collect;
7+
68
#[test]
79
fn query_executor_pipeline_locally() {
810
tokio_test::block_on(async {

0 commit comments

Comments
 (0)