From f9fdbb3003263d7898ccab26f816f500ab607ffb Mon Sep 17 00:00:00 2001 From: Carl Dunham Date: Tue, 11 Mar 2025 18:10:10 -0700 Subject: [PATCH 1/9] chore: ignore vscode settings --- .gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 69cbf271c6..a0dc9e0fcb 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,8 @@ .idea/* +.vscode/* *.out *.test .DS_Store pkg/parser/testdata/lotto.graphql *node_modules* -*vendor* \ No newline at end of file +*vendor* From 6d5f8f2d6625be5708a76b930583422fa905003a Mon Sep 17 00:00:00 2001 From: Carl Dunham Date: Thu, 6 Mar 2025 13:31:45 -0800 Subject: [PATCH 2/9] feat: add @defer to base schema --- v2/pkg/asttransform/baseschema.go | 16 ++++++++++++++++ v2/pkg/engine/plan/analyze_plan_kind.go | 12 ++++++++++-- v2/pkg/engine/plan/analyze_plan_kind_test.go | 12 +++++++++--- v2/pkg/engine/plan/planner_test.go | 12 ++++-------- v2/pkg/engine/plan/visitor.go | 2 -- 5 files changed, 39 insertions(+), 15 deletions(-) diff --git a/v2/pkg/asttransform/baseschema.go b/v2/pkg/asttransform/baseschema.go index fa092273d6..8668e0a654 100644 --- a/v2/pkg/asttransform/baseschema.go +++ b/v2/pkg/asttransform/baseschema.go @@ -156,6 +156,22 @@ directive @skip( "Skipped when true." if: Boolean! ) on FIELD | FRAGMENT_SPREAD | INLINE_FRAGMENT +"Directs the executor to defer this fragment when the if argument is true or undefined." +directive @defer( + "A unique identifier for the results." + label: String + "Controls whether the fragment will be deferred, usually via a variable." + if: Boolean! = true +) on FRAGMENT_SPREAD | INLINE_FRAGMENT +"Directs the executor to stream this array field when the if argument is true or undefined." +directive @stream( + "A unique identifier for the results." + label: String + "Controls streaming, usually via a variable." + if: Boolean! = true + "The number of results to include in the initial (non-streamed) response." + initialCount: Int = 0 +) on FIELD "Marks an element of a GraphQL schema as no longer supported." directive @deprecated( """ diff --git a/v2/pkg/engine/plan/analyze_plan_kind.go b/v2/pkg/engine/plan/analyze_plan_kind.go index 22d2ef086c..a00dcc2c3b 100644 --- a/v2/pkg/engine/plan/analyze_plan_kind.go +++ b/v2/pkg/engine/plan/analyze_plan_kind.go @@ -41,11 +41,19 @@ func (p *planKindVisitor) EnterDirective(ref int) { switch ancestor.Kind { case ast.NodeKindField: switch directiveName { - case "defer": - p.hasDeferDirective = true case "stream": p.hasStreamDirective = true } + case ast.NodeKindInlineFragment: + switch directiveName { + case "defer": + p.hasDeferDirective = true + } + case ast.NodeKindFragmentSpread: + switch directiveName { + case "defer": + p.hasDeferDirective = true + } } } diff --git a/v2/pkg/engine/plan/analyze_plan_kind_test.go b/v2/pkg/engine/plan/analyze_plan_kind_test.go index ec25838d49..204b4c2bec 100644 --- a/v2/pkg/engine/plan/analyze_plan_kind_test.go +++ b/v2/pkg/engine/plan/analyze_plan_kind_test.go @@ -100,7 +100,9 @@ func TestAnalyzePlanKind(t *testing.T) { name } primaryFunction - favoriteEpisode @defer + ... @defer { + favoriteEpisode + } } }`, "MyQuery", @@ -146,7 +148,9 @@ func TestAnalyzePlanKind(t *testing.T) { name } primaryFunction - favoriteEpisode @defer + ... @defer { + favoriteEpisode + } } }`, "OperationNameNotExists", @@ -167,7 +171,9 @@ func TestAnalyzePlanKind(t *testing.T) { subscription NewReviews { newReviews { id - stars @defer + ... @defer { + stars + } } }`, "NewReviews", diff --git a/v2/pkg/engine/plan/planner_test.go b/v2/pkg/engine/plan/planner_test.go index af0ecf1215..4928954e2f 100644 --- a/v2/pkg/engine/plan/planner_test.go +++ b/v2/pkg/engine/plan/planner_test.go @@ -379,7 +379,7 @@ func TestPlanner_Plan(t *testing.T) { name } } - + query MyHero { hero{ name @@ -392,15 +392,15 @@ func TestPlanner_Plan(t *testing.T) { t.Run("unescape response json", func(t *testing.T) { schema := ` scalar JSON - + schema { query: Query } - + type Query { hero: Character! } - + type Character { info: JSON! infos: [JSON!]! @@ -678,12 +678,8 @@ var testDefinitionDSConfiguration = dsb(). const testDefinition = ` -directive @defer on FIELD - directive @flushInterval(milliSeconds: Int!) on QUERY | SUBSCRIPTION -directive @stream(initialBatchSize: Int) on FIELD - union SearchResult = Human | Droid | Starship schema { diff --git a/v2/pkg/engine/plan/visitor.go b/v2/pkg/engine/plan/visitor.go index 261d475f04..28477430dc 100644 --- a/v2/pkg/engine/plan/visitor.go +++ b/v2/pkg/engine/plan/visitor.go @@ -254,8 +254,6 @@ func (v *Visitor) EnterDirective(ref int) { v.currentField.Stream = &resolve.StreamField{ InitialBatchSize: initialBatchSize, } - case "defer": - v.currentField.Defer = &resolve.DeferField{} } } } From c01fc90bbd643b4a42f312bc223868ff20f46888 Mon Sep 17 00:00:00 2001 From: Carl Dunham Date: Tue, 11 Mar 2025 18:07:43 -0700 Subject: [PATCH 3/9] feat: DeferInfo marker type --- v2/pkg/ast/path.go | 64 ++++-- v2/pkg/ast/path_test.go | 106 +++++++++ v2/pkg/engine/resolve/node_object.go | 56 ++++- v2/pkg/engine/resolve/node_object_test.go | 250 ++++++++++++++++++++++ 4 files changed, 453 insertions(+), 23 deletions(-) create mode 100644 v2/pkg/ast/path_test.go create mode 100644 v2/pkg/engine/resolve/node_object_test.go diff --git a/v2/pkg/ast/path.go b/v2/pkg/ast/path.go index 2ff55f3bfb..31f9223b0a 100644 --- a/v2/pkg/ast/path.go +++ b/v2/pkg/ast/path.go @@ -54,6 +54,24 @@ func (p Path) Equals(another Path) bool { return true } +func (p Path) Overlaps(other Path) bool { + for i, el := range p { + switch { + case i >= len(other): + return true + case el.Kind != other[i].Kind: + return false + case el.FragmentRef != other[i].FragmentRef: + return false + case el.Kind == ArrayIndex && el.ArrayIndex != other[i].ArrayIndex: + return false + case !bytes.Equal(el.FieldName, other[i].FieldName): + return false + } + } + return true +} + func (p Path) EndsWithFragment() bool { if len(p) == 0 { return false @@ -77,29 +95,35 @@ func (p Path) WithoutInlineFragmentNames() Path { return out } +func (p Path) StringSlice() []string { + ret := make([]string, len(p)) + for i, item := range p { + ret[i] = item.String() + } + return ret +} + func (p Path) String() string { - out := "[" - for i := range p { - if i != 0 { - out += "," - } - switch p[i].Kind { - case ArrayIndex: - out += strconv.Itoa(p[i].ArrayIndex) - case FieldName: - if len(p[i].FieldName) == 0 { - out += "query" - } else { - out += unsafebytes.BytesToString(p[i].FieldName) - } - case InlineFragmentName: - out += InlineFragmentPathPrefix - out += strconv.Itoa(p[i].FragmentRef) - out += unsafebytes.BytesToString(p[i].FieldName) + return "[" + strings.Join(p.StringSlice(), ",") + "]" +} + +func (p PathItem) String() string { + switch p.Kind { + case ArrayIndex: + return strconv.Itoa(p.ArrayIndex) + case FieldName: + out := "query" + if len(p.FieldName) != 0 { + out = unsafebytes.BytesToString(p.FieldName) } + return out + case InlineFragmentName: + out := InlineFragmentPathPrefix + out += strconv.Itoa(p.FragmentRef) + out += unsafebytes.BytesToString(p.FieldName) + return out } - out += "]" - return out + return "" } func (p Path) DotDelimitedString() string { diff --git a/v2/pkg/ast/path_test.go b/v2/pkg/ast/path_test.go new file mode 100644 index 0000000000..52d8b1b401 --- /dev/null +++ b/v2/pkg/ast/path_test.go @@ -0,0 +1,106 @@ +package ast_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/wundergraph/graphql-go-tools/v2/pkg/ast" +) + +func TestPath_Overlaps(t *testing.T) { + tests := []struct { + name string + a, b ast.Path + expect bool + }{ + { + name: "both empty", + a: ast.Path{}, + b: ast.Path{}, + expect: true, + }, + { + name: "same single field", + a: ast.Path{{Kind: ast.FieldName, FieldName: []byte("foo")}}, + b: ast.Path{{Kind: ast.FieldName, FieldName: []byte("foo")}}, + expect: true, + }, + { + name: "one empty", + a: ast.Path{}, + b: ast.Path{{Kind: ast.FieldName, FieldName: []byte("foo")}}, + expect: true, + }, + { + name: "different single field", + a: ast.Path{{Kind: ast.FieldName, FieldName: []byte("foo")}}, + b: ast.Path{{Kind: ast.FieldName, FieldName: []byte("bar")}}, + expect: false, + }, + { + name: "prefix matches but one is shorter", + a: ast.Path{{Kind: ast.FieldName, FieldName: []byte("foo")}}, + b: ast.Path{ + {Kind: ast.FieldName, FieldName: []byte("foo")}, + {Kind: ast.FieldName, FieldName: []byte("bar")}, + }, + expect: true, + }, + { + name: "same index array overlap", + a: ast.Path{{Kind: ast.ArrayIndex, ArrayIndex: 0}}, + b: ast.Path{{Kind: ast.ArrayIndex, ArrayIndex: 0}}, + expect: true, + }, + { + name: "different index array overlap", + a: ast.Path{{Kind: ast.ArrayIndex, ArrayIndex: 1}}, + b: ast.Path{{Kind: ast.ArrayIndex, ArrayIndex: 2}}, + expect: false, + }, + { + name: "fragment mismatch", + a: ast.Path{{Kind: ast.InlineFragmentName, FragmentRef: 1, FieldName: []byte("FragA")}}, + b: ast.Path{{Kind: ast.InlineFragmentName, FragmentRef: 2, FieldName: []byte("FragA")}}, + expect: false, + }, + { + name: "fragment match", + a: ast.Path{{Kind: ast.InlineFragmentName, FragmentRef: 1, FieldName: []byte("FragA")}}, + b: ast.Path{{Kind: ast.InlineFragmentName, FragmentRef: 1, FieldName: []byte("FragA")}}, + expect: true, + }, + { + name: "mixed path partial overlap", + a: ast.Path{ + {Kind: ast.FieldName, FieldName: []byte("foo")}, + {Kind: ast.ArrayIndex, ArrayIndex: 1}, + }, + b: ast.Path{ + {Kind: ast.FieldName, FieldName: []byte("foo")}, + {Kind: ast.ArrayIndex, ArrayIndex: 1}, + {Kind: ast.FieldName, FieldName: []byte("extra")}, + }, + expect: true, + }, + { + name: "mixed path no overlap at second item", + a: ast.Path{ + {Kind: ast.FieldName, FieldName: []byte("foo")}, + {Kind: ast.ArrayIndex, ArrayIndex: 2}, + }, + b: ast.Path{ + {Kind: ast.FieldName, FieldName: []byte("foo")}, + {Kind: ast.ArrayIndex, ArrayIndex: 3}, + }, + expect: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.a.Overlaps(tt.b), tt.expect) + assert.Equal(t, tt.b.Overlaps(tt.a), tt.expect) + }) + } +} diff --git a/v2/pkg/engine/resolve/node_object.go b/v2/pkg/engine/resolve/node_object.go index 2c3a80b4d8..0a109d628a 100644 --- a/v2/pkg/engine/resolve/node_object.go +++ b/v2/pkg/engine/resolve/node_object.go @@ -3,6 +3,8 @@ package resolve import ( "bytes" "slices" + + "github.com/wundergraph/graphql-go-tools/v2/pkg/ast" ) type Object struct { @@ -93,7 +95,7 @@ type Field struct { Name []byte Value Node Position Position - Defer *DeferField + DeferPaths []ast.Path Stream *StreamField OnTypeNames [][]byte ParentOnTypeNames []ParentOnTypeNames @@ -110,7 +112,7 @@ func (f *Field) Copy() *Field { Name: f.Name, Value: f.Value.Copy(), Position: f.Position, - Defer: f.Defer, + DeferPaths: f.DeferPaths, Stream: f.Stream, OnTypeNames: f.OnTypeNames, Info: f.Info, @@ -182,4 +184,52 @@ type StreamField struct { InitialBatchSize int } -type DeferField struct{} +type DeferInfo struct { + // TODO(cd): Label and If + Path ast.Path +} + +func (d *DeferInfo) Equals(other *DeferInfo) bool { + if d == nil || other == nil { + return d == other + } + return d.Path.Equals(other.Path) +} + +func (d *DeferInfo) Overlaps(path ast.Path) bool { + if d == nil { + return false + } + return d.Path.Overlaps(path) +} + +func (d *DeferInfo) HasPrefix(prefix []string) bool { + if len(prefix) == 0 { + return true + } + if d == nil || len(d.Path) == 0 { + return false + } + deferPath := d.Path + if !slices.Contains([]string{"query", "mutation", "subscription"}, prefix[0]) { + deferPath = deferPath[1:] + } + if len(prefix) > len(deferPath) { + return false + } + var skip int + for i, p := range deferPath { + if p.Kind == ast.InlineFragmentName { + skip++ + continue + } + idx := i + skip + if idx >= len(prefix) { + return true + } + if idx >= len(prefix) || prefix[idx] != string(p.FieldName) { + return false + } + } + return true +} diff --git a/v2/pkg/engine/resolve/node_object_test.go b/v2/pkg/engine/resolve/node_object_test.go new file mode 100644 index 0000000000..5ede6f8635 --- /dev/null +++ b/v2/pkg/engine/resolve/node_object_test.go @@ -0,0 +1,250 @@ +package resolve + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/wundergraph/graphql-go-tools/v2/pkg/ast" +) + +var ( + pathItem1 = ast.PathItem{Kind: ast.FieldName, FieldName: []byte("query")} + pathItem2 = ast.PathItem{Kind: ast.FieldName, FieldName: []byte("object1")} + pathItem3 = ast.PathItem{Kind: ast.ArrayIndex, ArrayIndex: 3, FieldName: []byte("field1")} + + fragmentItem = ast.PathItem{Kind: ast.InlineFragmentName, FieldName: []byte("frag2"), FragmentRef: 2} + arrayItem = ast.PathItem{Kind: ast.ArrayIndex, ArrayIndex: 3, FieldName: []byte("arrayField3")} + + emptyPath = ast.Path{} + shortPath1 = ast.Path{pathItem1, pathItem2} + shortPath2 = ast.Path{pathItem2, pathItem3} + longPath = ast.Path{pathItem1, pathItem2, pathItem3} // shortPath1 + pathItem3 +) + +func TestDeferInfo_Equals(t *testing.T) { + tests := []struct { + name string + first, second *DeferInfo + expected bool + }{ + { + name: "equal", + first: &DeferInfo{Path: longPath}, + second: &DeferInfo{Path: longPath}, + expected: true, + }, + { + name: "zero-valued equal", + first: &DeferInfo{}, + second: &DeferInfo{}, + expected: true, + }, + { + name: "empty paths equal", + first: &DeferInfo{Path: emptyPath}, + second: &DeferInfo{Path: emptyPath}, + expected: true, + }, + { + name: "both nil equal", + first: nil, + second: nil, + expected: true, + }, + { + name: "not equal", + first: &DeferInfo{Path: shortPath1}, + second: &DeferInfo{Path: shortPath2}, + expected: false, + }, + { + name: "one nil not equal empty", + first: nil, + second: &DeferInfo{}, + expected: false, + }, + { + name: "not equal - one empty path", + first: &DeferInfo{Path: emptyPath}, + second: &DeferInfo{Path: shortPath1}, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.expected, tt.first.Equals(tt.second)) + assert.Equal(t, tt.expected, tt.second.Equals(tt.first)) + }) + } +} + +func TestDeferInfo_Overlaps(t *testing.T) { + tests := []struct { + name string + input *DeferInfo + otherPath ast.Path + expected bool + }{ + { + name: "overlaps - equal paths", + input: &DeferInfo{Path: shortPath1}, + otherPath: shortPath1, + expected: true, + }, + { + name: "overlaps - shorter path", + input: &DeferInfo{Path: longPath}, + otherPath: shortPath1, + expected: true, + }, + { + name: "overlaps - longer path", + input: &DeferInfo{Path: shortPath1}, + otherPath: longPath, + expected: true, + }, + { + name: "overlaps - shorter path, mismatched", + input: &DeferInfo{Path: shortPath2}, + otherPath: longPath, + expected: false, + }, + { + name: "overlaps - longer path, mismatched", + input: &DeferInfo{Path: longPath}, + otherPath: shortPath2, + expected: false, + }, + { + name: "non-overlapping paths", + input: &DeferInfo{Path: shortPath1}, + otherPath: shortPath2, + expected: false, + }, + { + name: "empty paths equal", + input: &DeferInfo{Path: emptyPath}, + otherPath: emptyPath, + expected: true, + }, + { + name: "empty defer path", + input: &DeferInfo{Path: emptyPath}, + otherPath: shortPath1, + expected: true, + }, + { + name: "nil DeferInfo - empty path", + input: nil, + otherPath: emptyPath, + expected: false, + }, + { + name: "nil DeferInfo - non-empty path", + input: nil, + otherPath: shortPath1, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.expected, tt.input.Overlaps(tt.otherPath)) + // TODO: reflexive property test. + }) + } +} +func TestDeferInfo_HasPrefix(t *testing.T) { + tests := []struct { + name string + deferInfo *DeferInfo + prefix []string + expected bool + }{ + { + name: "empty prefix always returns true", + deferInfo: &DeferInfo{Path: shortPath1}, + prefix: []string{}, + expected: true, + }, + { + name: "non-empty prefix with empty DeferInfo path returns false", + deferInfo: &DeferInfo{Path: emptyPath}, + prefix: []string{"query"}, + expected: false, + }, + { + name: "exact match short path", + deferInfo: &DeferInfo{Path: shortPath1}, + prefix: []string{"query", "object1"}, + expected: true, + }, + { + name: "long path with prefix match", + deferInfo: &DeferInfo{Path: longPath}, + prefix: []string{"query", "object1"}, + expected: true, + }, + { + name: "mismatch prefix", + deferInfo: &DeferInfo{Path: shortPath1}, + prefix: []string{"x", "y"}, + expected: false, + }, + { + name: "prefix has no operation type, but matches", + deferInfo: &DeferInfo{Path: longPath}, + prefix: []string{"object1", "field1"}, + expected: true, + }, + { + name: "prefix has no operation type, and mis-matches", + deferInfo: &DeferInfo{Path: shortPath1}, + prefix: []string{"x"}, + expected: false, + }, + { + name: "prefix longer than path", + deferInfo: &DeferInfo{Path: shortPath1}, + prefix: []string{"query", "object1", "field1"}, + expected: false, + }, + { + name: "ignore inline fragment", + deferInfo: &DeferInfo{Path: ast.Path{pathItem1, pathItem2, fragmentItem, pathItem3}}, + prefix: []string{"query", "object1", "field1"}, + expected: true, + }, + { + name: "ignore terminal inline fragment", + deferInfo: &DeferInfo{Path: ast.Path{pathItem1, pathItem2, fragmentItem}}, + prefix: []string{"query", "object1"}, + expected: true, + }, + { + name: "ignore terminal inline fragment, but mis-match", + deferInfo: &DeferInfo{Path: ast.Path{pathItem1, pathItem2, fragmentItem}}, + prefix: []string{"query", "x"}, + expected: false, + }, + { + name: "nil DeferInfo, non-empty prefix", + deferInfo: nil, + prefix: []string{"query", "object1"}, + expected: false, + }, + { + name: "nil DeferInfo, empty prefix", + deferInfo: nil, + prefix: []string{}, + expected: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.expected, tt.deferInfo.HasPrefix(tt.prefix)) + }) + } +} From 25c13c8069cbbb38e74e3341cd820360cc752264 Mon Sep 17 00:00:00 2001 From: Carl Dunham Date: Tue, 11 Mar 2025 18:13:40 -0700 Subject: [PATCH 4/9] feat: planning for @defer --- v2/pkg/engine/plan/configuration_visitor.go | 47 +++++ v2/pkg/engine/plan/defer_visitor.go | 104 +++++++++++ v2/pkg/engine/plan/plan.go | 6 +- v2/pkg/engine/plan/planner.go | 38 +++- v2/pkg/engine/plan/planner_test.go | 197 ++++++++++++++++++++ v2/pkg/engine/plan/visitor.go | 35 +++- v2/pkg/engine/resolve/response.go | 11 +- 7 files changed, 428 insertions(+), 10 deletions(-) create mode 100644 v2/pkg/engine/plan/defer_visitor.go diff --git a/v2/pkg/engine/plan/configuration_visitor.go b/v2/pkg/engine/plan/configuration_visitor.go index e4eedb8ffb..cefc2c7e2a 100644 --- a/v2/pkg/engine/plan/configuration_visitor.go +++ b/v2/pkg/engine/plan/configuration_visitor.go @@ -12,6 +12,7 @@ import ( "github.com/wundergraph/graphql-go-tools/v2/pkg/astvisitor" "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/argument_templates" "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve" + "github.com/wundergraph/graphql-go-tools/v2/pkg/lexer/literal" ) // configurationVisitor - walks through the operation multiple times to collect plannings paths @@ -36,6 +37,8 @@ type configurationVisitor struct { nodeSuggestions *NodeSuggestions // nodeSuggestions holds information about suggested data sources for each field nodeSuggestionHints []NodeSuggestionHint // nodeSuggestionHints holds information about suggested data sources for key fields + targetDefer *resolve.DeferInfo // deferPath directs the planning to the specified path. + parentTypeNodes []ast.Node // parentTypeNodes is a stack of parent type nodes - used to determine if the parent is abstract arrayFields []arrayField // arrayFields is a stack of array fields - used to plan nested queries selectionSetRefs []int // selectionSetRefs is a stack of selection set refs - used to add a required fields @@ -110,6 +113,7 @@ type objectFetchConfiguration struct { dependsOnFetchIDs []int rootFields []resolve.GraphCoordinate operationType ast.OperationType + deferInfo *resolve.DeferInfo } func (c *configurationVisitor) currentSelectionSet() int { @@ -364,6 +368,43 @@ func (c *configurationVisitor) LeaveSelectionSet(ref int) { c.parentTypeNodes = c.parentTypeNodes[:len(c.parentTypeNodes)-1] } +func (c *configurationVisitor) EnterInlineFragment(ref int) { + c.deleteDeferAndSkipIfNeeded(ref) +} + +func (c *configurationVisitor) EnterFragmentSpread(ref int) { + c.deleteDeferAndSkipIfNeeded(ref) +} + +func (c *configurationVisitor) inDeferPath(item ast.PathItem) bool { + if c.targetDefer == nil { + return true + } + fullPath := append(make([]ast.PathItem, 0, len(c.walker.Path)+1), c.walker.Path...) + fullPath = append(fullPath, item) + + return c.targetDefer.Overlaps(fullPath) +} + +func (c *configurationVisitor) deleteDeferAndSkipIfNeeded(ref int) { + directives := c.operation.InlineFragments[ref].Directives.Refs + deferRef, found := c.operation.DirectiveWithNameBytes(directives, literal.DEFER) + + if !found { + // No defer directive here. + return + } + // Don't pass the directive on in any case. + if idx := slices.Index(directives, deferRef); idx >= 0 { + c.operation.InlineFragments[ref].Directives.Refs = slices.Delete(directives, idx, idx+1) + } + + // If target is nil or doesn't match, we skip the deferred fragment. + if c.targetDefer == nil || !c.targetDefer.Equals(&resolve.DeferInfo{Path: c.walker.Path}) { + c.walker.SkipNode() + } +} + func (c *configurationVisitor) EnterField(fieldRef int) { fieldName := c.operation.FieldNameUnsafeString(fieldRef) fieldAliasOrName := c.operation.FieldAliasOrNameString(fieldRef) @@ -371,6 +412,11 @@ func (c *configurationVisitor) EnterField(fieldRef int) { c.debugPrint("EnterField ref:", fieldRef, "fieldName:", fieldName, "typeName:", typeName) + if !c.inDeferPath(ast.PathItem{Kind: ast.FieldName, FieldName: c.operation.FieldAliasOrNameBytes(fieldRef)}) { + c.debugPrint(" ...", fieldName, "skipped") + c.walker.SkipNode() + return + } parentPath := c.walker.Path.DotDelimitedString() // we need to also check preceding path for inline fragments // as for the field within inline fragment the parent path will include type condition in a path @@ -835,6 +881,7 @@ func (c *configurationVisitor) addNewPlanner(fieldRef int, typeName, fieldName, sourceName: dsConfig.Name(), operationType: c.resolveRootFieldOperationType(typeName), filter: c.resolveSubscriptionFilterCondition(typeName, fieldName), + deferInfo: c.targetDefer, } plannerPathConfig := newPlannerPathsConfiguration( diff --git a/v2/pkg/engine/plan/defer_visitor.go b/v2/pkg/engine/plan/defer_visitor.go new file mode 100644 index 0000000000..3c18d10eb3 --- /dev/null +++ b/v2/pkg/engine/plan/defer_visitor.go @@ -0,0 +1,104 @@ +package plan + +import ( + "fmt" + "slices" + + "github.com/wundergraph/graphql-go-tools/v2/pkg/ast" + "github.com/wundergraph/graphql-go-tools/v2/pkg/astvisitor" + "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve" + "github.com/wundergraph/graphql-go-tools/v2/pkg/lexer/literal" +) + +type deferVisitor struct { + walker *astvisitor.Walker + operation *ast.Document + + deferredFragments []resolve.DeferInfo + deferredFragmentStack []resolve.DeferInfo + deferredFields map[int]resolve.DeferInfo +} + +var _ astvisitor.EnterDocumentVisitor = (*deferVisitor)(nil) +var _ astvisitor.InlineFragmentVisitor = (*deferVisitor)(nil) +var _ astvisitor.FragmentSpreadVisitor = (*deferVisitor)(nil) +var _ astvisitor.EnterFieldVisitor = (*deferVisitor)(nil) + +var errDuplicateDefer = fmt.Errorf("duplicate defer") + +func (v *deferVisitor) EnterDocument(operation, definition *ast.Document) { + v.operation = operation + v.deferredFragments = nil + v.deferredFragmentStack = nil + v.deferredFields = make(map[int]resolve.DeferInfo) +} + +func (v *deferVisitor) EnterInlineFragment(ref int) { + directives := v.operation.InlineFragments[ref].Directives.Refs + if _, ok := v.operation.DirectiveWithNameBytes(directives, literal.DEFER); ok { + v.enterDefer(ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: v.operation.InlineFragmentTypeConditionName(ref), + FragmentRef: v.walker.CurrentRef, + }) + } +} + +func (v *deferVisitor) LeaveInlineFragment(ref int) { + directives := v.operation.InlineFragments[ref].Directives.Refs + if _, ok := v.operation.DirectiveWithNameBytes(directives, literal.DEFER); ok { + v.leaveDefer() + } +} + +func (v *deferVisitor) EnterFragmentSpread(ref int) { + // TODO(cd): Fragment spreads are expanded to inline fragments during normalization. Skipping these. +} + +func (v *deferVisitor) LeaveFragmentSpread(ref int) { +} + +func (v *deferVisitor) EnterField(ref int) { + if v.inDefer() { + v.deferredFields[ref] = v.currentDefer() + } +} + +func (v *deferVisitor) enterDefer(item ast.PathItem) { + fullPath := v.fullPathFor(item) + + info := resolve.DeferInfo{Path: fullPath} + + if slices.ContainsFunc(v.deferredFragments, func(el resolve.DeferInfo) bool { + return el.Equals(&info) + }) { + v.walker.StopWithInternalErr(fmt.Errorf("%w for %s %d", errDuplicateDefer, v.walker.CurrentKind.String(), v.walker.CurrentRef)) + return + } + + // v.deferredFragmentStack = append(v.deferredFragmentStack, info) + v.deferredFragments = append(v.deferredFragments, info) + v.deferredFragmentStack = append(v.deferredFragmentStack, info) +} + +func (v *deferVisitor) leaveDefer() { + if !v.inDefer() { + return + } + v.deferredFragmentStack = v.deferredFragmentStack[:len(v.deferredFragmentStack)-1] +} + +func (v *deferVisitor) inDefer() bool { + return len(v.deferredFragmentStack) > 0 +} + +func (v *deferVisitor) currentDefer() resolve.DeferInfo { + return v.deferredFragmentStack[len(v.deferredFragmentStack)-1] +} + +func (v *deferVisitor) fullPathFor(item ast.PathItem) ast.Path { + fullPath := append(make([]ast.PathItem, 0, len(v.walker.Path)+1), v.walker.Path...) + fullPath = append(fullPath, item) + + return fullPath +} diff --git a/v2/pkg/engine/plan/plan.go b/v2/pkg/engine/plan/plan.go index 4fe8266c02..979c3f7981 100644 --- a/v2/pkg/engine/plan/plan.go +++ b/v2/pkg/engine/plan/plan.go @@ -17,8 +17,10 @@ type Plan interface { } type SynchronousResponsePlan struct { - Response *resolve.GraphQLResponse - FlushInterval int64 + Response *resolve.GraphQLResponse + DeferredFragments []resolve.DeferInfo + DeferredFields map[int]resolve.DeferInfo + FlushInterval int64 } func (s *SynchronousResponsePlan) SetFlushInterval(interval int64) { diff --git a/v2/pkg/engine/plan/planner.go b/v2/pkg/engine/plan/planner.go index 4265d56cdd..44f5b7f497 100644 --- a/v2/pkg/engine/plan/planner.go +++ b/v2/pkg/engine/plan/planner.go @@ -10,6 +10,7 @@ import ( "github.com/wundergraph/graphql-go-tools/v2/pkg/astnormalization" "github.com/wundergraph/graphql-go-tools/v2/pkg/astprinter" "github.com/wundergraph/graphql-go-tools/v2/pkg/astvisitor" + "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve" "github.com/wundergraph/graphql-go-tools/v2/pkg/operationreport" ) @@ -17,6 +18,7 @@ type Planner struct { config Configuration nodeSelectionsWalker *astvisitor.Walker nodeSelectionsVisitor *nodeSelectionVisitor + deferVisitor *deferVisitor configurationWalker *astvisitor.Walker configurationVisitor *configurationVisitor planningWalker *astvisitor.Walker @@ -65,6 +67,17 @@ func NewPlanner(config Configuration) (*Planner, error) { nodeSelection.RegisterEnterOperationVisitor(nodeSelectionVisitor) nodeSelection.RegisterSelectionSetVisitor(nodeSelectionVisitor) + // Defer visitor. + deferVisitor := &deferVisitor{ + walker: &nodeSelection, + } + + // Piggy-back on node selection walker. + nodeSelection.RegisterEnterDocumentVisitor(deferVisitor) + nodeSelection.RegisterInlineFragmentVisitor(deferVisitor) + nodeSelection.RegisterFragmentSpreadVisitor(deferVisitor) + nodeSelection.RegisterEnterFieldVisitor(deferVisitor) + // configuration configurationWalker := astvisitor.NewWalker(48) configVisitor := &configurationVisitor{ @@ -76,6 +89,8 @@ func NewPlanner(config Configuration) (*Planner, error) { configurationWalker.RegisterFieldVisitor(configVisitor) configurationWalker.RegisterEnterOperationVisitor(configVisitor) configurationWalker.RegisterSelectionSetVisitor(configVisitor) + configurationWalker.RegisterEnterInlineFragmentVisitor(configVisitor) + configurationWalker.RegisterEnterFragmentSpreadVisitor(configVisitor) // planning @@ -92,6 +107,7 @@ func NewPlanner(config Configuration) (*Planner, error) { configurationVisitor: configVisitor, nodeSelectionsWalker: &nodeSelection, nodeSelectionsVisitor: nodeSelectionVisitor, + deferVisitor: deferVisitor, planningWalker: &planningWalker, planningVisitor: planningVisitor, prepareOperationWalker: &prepareOperationWalker, @@ -158,6 +174,8 @@ func (p *Planner) Plan(operation, definition *ast.Document, operationName string p.planningVisitor.planners = p.configurationVisitor.planners p.planningVisitor.Config = p.config p.planningVisitor.skipFieldsRefs = p.nodeSelectionsVisitor.skipFieldsRefs + p.planningVisitor.deferredFragments = p.deferVisitor.deferredFragments + p.planningVisitor.deferredFields = p.deferVisitor.deferredFields p.planningWalker.ResetVisitors() p.planningWalker.SetVisitorFilter(p.planningVisitor) @@ -214,7 +232,20 @@ func (p *Planner) findPlanningPaths(operation, definition *ast.Document, report return } - p.createPlanningPaths(operation, definition, report) + p.createPlanningPaths(operation, definition, nil, report) + + if len(p.deferVisitor.deferredFragments) > 0 { + planners := p.configurationVisitor.planners + + for _, targetDefer := range p.deferVisitor.deferredFragments { + p.createPlanningPaths(operation, definition, &targetDefer, report) + if report.HasErrors() { + return + } + planners = append(planners, p.configurationVisitor.planners...) + } + p.configurationVisitor.planners = planners + } } func (p *Planner) selectNodes(operation, definition *ast.Document, report *operationreport.Report) { @@ -322,7 +353,7 @@ func (p *Planner) isResolvable(walker astvisitor.Walker, operation, definition * return resolvableReport } -func (p *Planner) createPlanningPaths(operation, definition *ast.Document, report *operationreport.Report) { +func (p *Planner) createPlanningPaths(operation, definition *ast.Document, targetDefer *resolve.DeferInfo, report *operationreport.Report) { if p.config.Debug.PrintPlanningPaths { p.debugMessage("Create planning paths") } @@ -337,6 +368,9 @@ func (p *Planner) createPlanningPaths(operation, definition *ast.Document, repor p.configurationVisitor.fieldDependsOn, p.configurationVisitor.fieldRequirementsConfigs = p.nodeSelectionsVisitor.fieldDependsOn, p.nodeSelectionsVisitor.fieldRequirementsConfigs + p.configurationVisitor.targetDefer = targetDefer + p.configurationVisitor.planners = nil + p.configurationVisitor.secondaryRun = false p.configurationWalker.Walk(operation, definition, report) if report.HasErrors() { diff --git a/v2/pkg/engine/plan/planner_test.go b/v2/pkg/engine/plan/planner_test.go index 4928954e2f..a06f8d7dc8 100644 --- a/v2/pkg/engine/plan/planner_test.go +++ b/v2/pkg/engine/plan/planner_test.go @@ -304,6 +304,203 @@ func TestPlanner_Plan(t *testing.T) { })) }) + t.Run("defer planning", func(t *testing.T) { + t.Run("simple inline fragment", test(testDefinition, ` + query WithInlineDefer { + hero { + name + ... on Droid @defer { + primaryFunction + favoriteEpisode + } + } + } + `, "WithInlineDefer", &SynchronousResponsePlan{ + Response: &resolve.GraphQLResponse{ + Data: &resolve.Object{ + Nullable: false, + Fields: []*resolve.Field{ + { + Name: []byte("hero"), + Value: &resolve.Object{ + Path: []string{"hero"}, + Nullable: true, + TypeName: "Character", + PossibleTypes: map[string]struct{}{"Droid": {}, "Human": {}}, + Fields: []*resolve.Field{ + { + Name: []byte("name"), + Value: &resolve.String{ + Path: []string{"name"}, + Nullable: false, + }, + }, + { + Name: []byte("primaryFunction"), + Value: &resolve.String{ + Path: []string{"primaryFunction"}, + Nullable: false, + }, + OnTypeNames: [][]byte{[]byte("Droid")}, + DeferPaths: []ast.Path{ + { + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("hero"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + }, + }, + }, + }, + { + Name: []byte("favoriteEpisode"), + Value: &resolve.Enum{ + Path: []string{"favoriteEpisode"}, + Nullable: true, + TypeName: "Episode", + Values: []string{ + "NEWHOPE", + "EMPIRE", + "JEDI", + }, + InaccessibleValues: []string{}, + }, + OnTypeNames: [][]byte{[]byte("Droid")}, + DeferPaths: []ast.Path{ + { + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("hero"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + }, + }, + }, + }, + }, + }, + DeferPaths: []ast.Path{ + { + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("hero"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + }, + }, + }, + }, + }, + Fetches: []resolve.Fetch{ + &resolve.SingleFetch{ + FetchConfiguration: resolve.FetchConfiguration{ + DataSource: &FakeDataSource{&StatefulSource{}}, + }, + DataSourceIdentifier: []byte("plan.FakeDataSource"), + }, + &resolve.SingleFetch{ + FetchConfiguration: resolve.FetchConfiguration{ + DataSource: &FakeDataSource{&StatefulSource{}}, + }, + DataSourceIdentifier: []byte("plan.FakeDataSource"), + DeferInfo: &resolve.DeferInfo{ + Path: ast.Path{ + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("hero"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + }, + }, + }, + }, + }, + }, + }, + DeferredFragments: []resolve.DeferInfo{ + { + Path: ast.Path{ + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("hero"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + }, + }, + }, + }, + DeferredFields: map[int]resolve.DeferInfo{ + 1: { + Path: ast.Path{ + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("hero"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + }, + }, + }, + 2: { + Path: ast.Path{ + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("hero"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + }, + }, + }, + }, + }, Configuration{ + DisableResolveFieldPositions: true, + DisableIncludeInfo: true, + DataSources: []DataSource{testDefinitionDSConfiguration}, + })) + }) + t.Run("operation selection", func(t *testing.T) { cfg := Configuration{ DataSources: []DataSource{testDefinitionDSConfiguration}, diff --git a/v2/pkg/engine/plan/visitor.go b/v2/pkg/engine/plan/visitor.go index 28477430dc..7f6b2229fe 100644 --- a/v2/pkg/engine/plan/visitor.go +++ b/v2/pkg/engine/plan/visitor.go @@ -3,6 +3,7 @@ package plan import ( "bytes" "fmt" + "iter" "reflect" "regexp" "slices" @@ -46,6 +47,8 @@ type Visitor struct { includeQueryPlans bool indirectInterfaceFields map[int]indirectInterfaceField pathCache map[astvisitor.VisitorKind]map[int]string + deferredFragments []resolve.DeferInfo + deferredFields map[int]resolve.DeferInfo } type indirectInterfaceField struct { @@ -333,6 +336,20 @@ func (v *Visitor) EnterField(ref int) { Info: v.resolveFieldInfo(ref, fieldDefinitionTypeRef, onTypeNames), } + if deferInfo, ok := v.deferredFields[ref]; ok { + // This field is part of a deferred fragment. + v.currentField.DeferPaths = append(v.currentField.DeferPaths, deferInfo.Path) + } + + // Add deferred paths below this field. + for path := range v.followingDeferredFragmentPaths(fieldAliasOrName.String()) { + if !slices.ContainsFunc(v.currentField.DeferPaths, func(other ast.Path) bool { + return path.Equals(other) + }) { + v.currentField.DeferPaths = append(v.currentField.DeferPaths, path) + } + } + if bytes.Equal(fieldName, literal.TYPENAME) { str := &resolve.String{ Nullable: false, @@ -351,6 +368,19 @@ func (v *Visitor) EnterField(ref int) { v.mapFieldConfig(ref) } +func (v *Visitor) followingDeferredFragmentPaths(itemName string) iter.Seq[ast.Path] { + return func(yield func(ast.Path) bool) { + for _, frag := range v.deferredFragments { + fullPath := v.Walker.Path.WithoutInlineFragmentNames().DotDelimitedString() + "." + itemName + if strings.HasPrefix(frag.Path.WithoutInlineFragmentNames().DotDelimitedString(), fullPath) { + if !yield(frag.Path) { + return + } + } + } + } +} + func (v *Visitor) mapFieldConfig(ref int) { typeName := v.Walker.EnclosingTypeDefinition.NameString(v.Definition) fieldNameStr := v.Operation.FieldNameString(ref) @@ -879,7 +909,9 @@ func (v *Visitor) EnterOperationDefinition(ref int) { } v.plan = &SynchronousResponsePlan{ - Response: graphQLResponse, + Response: graphQLResponse, + DeferredFragments: v.deferredFragments, + DeferredFields: v.deferredFields, } } @@ -1176,6 +1208,7 @@ func (v *Visitor) configureFetch(internal *objectFetchConfiguration, external re DependsOnFetchIDs: internal.dependsOnFetchIDs, }, DataSourceIdentifier: []byte(dataSourceType), + DeferInfo: internal.deferInfo, } if !v.Config.DisableIncludeInfo { diff --git a/v2/pkg/engine/resolve/response.go b/v2/pkg/engine/resolve/response.go index 51812b38a3..f51970ee01 100644 --- a/v2/pkg/engine/resolve/response.go +++ b/v2/pkg/engine/resolve/response.go @@ -22,11 +22,12 @@ type GraphQLSubscriptionTrigger struct { } type GraphQLResponse struct { - Data *Object - RenameTypeNames []RenameTypeName - Info *GraphQLResponseInfo - Fetches *FetchTreeNode - DataSources []DataSourceInfo + Data *Object + RenameTypeNames []RenameTypeName + Info *GraphQLResponseInfo + Fetches *FetchTreeNode + DataSources []DataSourceInfo + DeferredResponses []*GraphQLResponse } type GraphQLResponseInfo struct { From c2ae61bd72276acafbf1ef00b1c2fd78bbd79bde Mon Sep 17 00:00:00 2001 From: Carl Dunham Date: Tue, 11 Mar 2025 18:14:46 -0700 Subject: [PATCH 5/9] feat: postprocessing to extract deferred fields and fetches --- .../postprocess/extract_deferred_fields.go | 250 +++++++++++ .../extract_deferred_fields_test.go | 413 ++++++++++++++++++ v2/pkg/engine/postprocess/postprocess.go | 22 +- 3 files changed, 680 insertions(+), 5 deletions(-) create mode 100644 v2/pkg/engine/postprocess/extract_deferred_fields.go create mode 100644 v2/pkg/engine/postprocess/extract_deferred_fields_test.go diff --git a/v2/pkg/engine/postprocess/extract_deferred_fields.go b/v2/pkg/engine/postprocess/extract_deferred_fields.go new file mode 100644 index 0000000000..e577621c51 --- /dev/null +++ b/v2/pkg/engine/postprocess/extract_deferred_fields.go @@ -0,0 +1,250 @@ +package postprocess + +import ( + "iter" + "slices" + + "github.com/wundergraph/graphql-go-tools/v2/pkg/ast" + "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve" +) + +type extractDeferredFields struct{} + +func (e *extractDeferredFields) Process(resp *resolve.GraphQLResponse, defers []resolve.DeferInfo) { + visitor := newDeferredFieldsVisitor(resp, defers) + visitor.walker = &PlanWalker{ + objectVisitor: visitor, + fieldVisitor: visitor, + } + visitor.walker.Walk(resp.Data, resp.Info) + + for key, di := range visitor.responseItems { + if key == "" { + resp.Data.Fields = di.response.Data.Fields + resp.Data.Fetches = di.response.Data.Fetches + continue + } + resp.DeferredResponses = append(resp.DeferredResponses, di.response) + } +} + +func newDeferredFieldsVisitor(resp *resolve.GraphQLResponse, defers []resolve.DeferInfo) *deferredFieldsVisitor { + ret := &deferredFieldsVisitor{ + deferredFragments: defers, + responseItems: make(map[string]*responseItem, len(defers)), + } + + for _, di := range defers { + ret.responseItems[di.Path.DotDelimitedString()] = &responseItem{ + deferInfo: &di, + response: &resolve.GraphQLResponse{ + Info: resp.Info, + }, + } + } + + // The immediate response. + ret.responseItems[""] = &responseItem{ + response: &resolve.GraphQLResponse{ + Info: resp.Info, + }, + } + return ret +} + +type deferredFieldsVisitor struct { + deferredFragments []resolve.DeferInfo + responseItems map[string]*responseItem + + walker *PlanWalker +} + +func (v *deferredFieldsVisitor) EnterObject(obj *resolve.Object) { + var ( + currentField *resolve.Field + ) + + if len(v.walker.CurrentFields) > 0 { + currentField = v.walker.CurrentFields[len(v.walker.CurrentFields)-1] + } + + for resp := range v.matchingResponseItems() { + newObject := resp.copyObjectWithoutFields(obj) + + if currentField != nil { + if resp.deferInfo == nil || slices.ContainsFunc(currentField.DeferPaths, func(el ast.Path) bool { + return resp.deferInfo != nil && el.Equals(resp.deferInfo.Path) + }) { + resp.updateCurrentFieldObject(currentField, newObject) + } + } + resp.objectStack = append(resp.objectStack, newObject) + + if resp.response.Data == nil { + resp.response.Data = newObject + } + } +} + +func (v *deferredFieldsVisitor) LeaveObject(*resolve.Object) { + for resp := range v.matchingResponseItems() { + if depth := len(resp.objectStack); depth > 1 { + resp.objectStack = resp.objectStack[:depth-1] + } + } +} + +func (v *deferredFieldsVisitor) EnterField(field *resolve.Field) { + // Reasons to append a field: + // 1. It's above a defer fragment. matchingResponseItems does this. + // 2. It's marked as deferred for some responses, send it there. The field has this information. + // 3. It's not marked as deferred, so sent to the immediate response. + // 4. It's above a non-deferred field? TODO(cd): post-cleanup. + var deferred bool + for resp := range v.matchingResponseItems() { + if slices.ContainsFunc(field.DeferPaths, func(el ast.Path) bool { + return resp.deferInfo != nil && el.Equals(resp.deferInfo.Path) + }) { + resp.appendField(field) + deferred = true + } + } + resp := v.responseItems[""] + + switch field.Value.(type) { + case *resolve.Object, *resolve.Array: + resp.appendField(field) + default: + if !deferred { + resp.appendField(field) + } + } +} + +func (v *deferredFieldsVisitor) LeaveField(field *resolve.Field) {} + +func (v *deferredFieldsVisitor) matchingResponseItems() iter.Seq[*responseItem] { + return func(yield func(*responseItem) bool) { + for path, resp := range v.responseItems { + if path == "" || resp.deferInfo.HasPrefix(v.walker.path) { + if !yield(resp) { + return + } + } + } + } +} + +type responseItem struct { + deferInfo *resolve.DeferInfo + response *resolve.GraphQLResponse + objectStack []*resolve.Object + lastArray *resolve.Array +} + +func (r *responseItem) currentObject() *resolve.Object { + if len(r.objectStack) == 0 { + return nil + } + return r.objectStack[len(r.objectStack)-1] +} + +func (r *responseItem) appendField(field *resolve.Field) { + newField := r.copyFieldWithoutObjectFields(field) + r.currentObject().Fields = append(r.currentObject().Fields, newField) + + if _, ok := field.Value.(*resolve.Array); ok { + r.lastArray = newField.Value.(*resolve.Array) + } else { + r.lastArray = nil + } +} + +func (r *responseItem) copyFieldWithoutObjectFields(f *resolve.Field) *resolve.Field { + switch fv := f.Value.(type) { + case *resolve.Object: + ret := &resolve.Field{ + Name: f.Name, + Position: f.Position, + DeferPaths: f.DeferPaths, + Stream: f.Stream, + OnTypeNames: f.OnTypeNames, + ParentOnTypeNames: f.ParentOnTypeNames, + Info: f.Info, + } + ret.Value = r.copyObjectWithoutFields(fv) + return ret + case *resolve.Array: + arrObj, ok := fv.Item.(*resolve.Object) + if !ok { + return f.Copy() + } + ret := &resolve.Field{ + Name: f.Name, + Position: f.Position, + DeferPaths: f.DeferPaths, + Stream: f.Stream, + OnTypeNames: f.OnTypeNames, + ParentOnTypeNames: f.ParentOnTypeNames, + Info: f.Info, + } + newItem := r.copyObjectWithoutFields(arrObj) + + ret.Value = &resolve.Array{ + Path: fv.Path, + Nullable: fv.Nullable, + Item: newItem, + } + return ret + default: + return f.Copy() + } +} + +func (r *responseItem) copyObjectWithoutFields(fv *resolve.Object) *resolve.Object { + newValue := fv.Copy().(*resolve.Object) + newValue.Fields = nil + + if len(fv.PossibleTypes) > 0 { + possibleTypes := make(map[string]struct{}, len(fv.PossibleTypes)) + for k, v := range fv.PossibleTypes { + possibleTypes[k] = v + } + newValue.PossibleTypes = possibleTypes + } + newValue.SourceName = fv.SourceName + newValue.TypeName = fv.TypeName + newValue.Fetches = r.fetchesForDeferFrom(fv.Fetches) + + return newValue +} + +func (r *responseItem) updateCurrentFieldObject(field *resolve.Field, obj *resolve.Object) { + switch field.Value.(type) { + case *resolve.Array: + // This object is an item in an array. + if r.lastArray != nil { + if _, ok := r.lastArray.Item.(*resolve.Object); ok { + r.lastArray.Item = obj + } + } + case *resolve.Object: + // This object is a field in another object. + if r.currentObject() != nil && len(r.currentObject().Fields) > 0 { + r.currentObject().Fields[len(r.currentObject().Fields)-1].Value = obj + } + } +} + +func (r *responseItem) fetchesForDeferFrom(fetches []resolve.Fetch) []resolve.Fetch { + var ret []resolve.Fetch + for _, fetch := range fetches { + if single, ok := fetch.(*resolve.SingleFetch); ok && single != nil { + if !single.DeferInfo.Equals(r.deferInfo) { + continue + } + } + ret = append(ret, fetch) + } + return ret +} diff --git a/v2/pkg/engine/postprocess/extract_deferred_fields_test.go b/v2/pkg/engine/postprocess/extract_deferred_fields_test.go new file mode 100644 index 0000000000..2c60c3b7cc --- /dev/null +++ b/v2/pkg/engine/postprocess/extract_deferred_fields_test.go @@ -0,0 +1,413 @@ +package postprocess + +import ( + "fmt" + "reflect" + "testing" + + "github.com/kylelemons/godebug/pretty" + "github.com/stretchr/testify/assert" + + "github.com/wundergraph/graphql-go-tools/v2/pkg/ast" + "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve" +) + +func TestExtractDeferredFields_Process(t *testing.T) { + tests := []struct { + name string + input *resolve.GraphQLResponse + defers []resolve.DeferInfo + expected *resolve.GraphQLResponse + }{ + { + name: "trivial case", + input: &resolve.GraphQLResponse{ + Info: &resolve.GraphQLResponseInfo{ + OperationType: ast.OperationTypeQuery, + }, + Data: &resolve.Object{ + Nullable: false, + Fields: []*resolve.Field{ + { + Name: []byte("hero"), + Value: &resolve.Object{ + Path: []string{"hero"}, + Nullable: true, + TypeName: "Character", + PossibleTypes: map[string]struct{}{"Droid": {}, "Human": {}}, + Fields: []*resolve.Field{ + { + Name: []byte("name"), + Value: &resolve.String{ + Path: []string{"name"}, + Nullable: false, + }, + }, + }, + }, + }, + }, + Fetches: []resolve.Fetch{ + &resolve.SingleFetch{FetchDependencies: resolve.FetchDependencies{FetchID: 1}}, + }, + }, + }, + expected: &resolve.GraphQLResponse{ + Info: &resolve.GraphQLResponseInfo{ + OperationType: ast.OperationTypeQuery, + }, + Data: &resolve.Object{ + Nullable: false, + Fields: []*resolve.Field{ + { + Name: []byte("hero"), + Value: &resolve.Object{ + Path: []string{"hero"}, + Nullable: true, + TypeName: "Character", + PossibleTypes: map[string]struct{}{"Droid": {}, "Human": {}}, + Fields: []*resolve.Field{ + { + Name: []byte("name"), + Value: &resolve.String{ + Path: []string{"name"}, + Nullable: false, + }, + }, + }, + }, + }, + }, + Fetches: []resolve.Fetch{ + &resolve.SingleFetch{FetchDependencies: resolve.FetchDependencies{FetchID: 1}}, + }, + }, + }, + }, + { + name: "simple case", + input: &resolve.GraphQLResponse{ + Info: &resolve.GraphQLResponseInfo{ + OperationType: ast.OperationTypeQuery, + }, + Data: &resolve.Object{ + Nullable: false, + Fields: []*resolve.Field{ + { + Name: []byte("hero"), + Value: &resolve.Object{ + Path: []string{"hero"}, + Nullable: true, + TypeName: "Character", + PossibleTypes: map[string]struct{}{"Droid": {}, "Human": {}}, + Fields: []*resolve.Field{ + { + Name: []byte("name"), + Value: &resolve.String{ + Path: []string{"name"}, + Nullable: false, + }, + }, + { + Name: []byte("primaryFunction"), + Value: &resolve.String{ + Path: []string{"primaryFunction"}, + Nullable: false, + }, + OnTypeNames: [][]byte{[]byte("Droid")}, + DeferPaths: []ast.Path{ + { + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("hero"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + }, + }, + }, + }, + { + Name: []byte("favoriteEpisode"), + Value: &resolve.Enum{ + Path: []string{"favoriteEpisode"}, + Nullable: true, + TypeName: "Episode", + Values: []string{ + "NEWHOPE", + "EMPIRE", + "JEDI", + }, + }, + OnTypeNames: [][]byte{[]byte("Droid")}, + DeferPaths: []ast.Path{ + { + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("hero"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + }, + }, + }, + }, + }, + }, + DeferPaths: []ast.Path{ + { + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("hero"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + }, + }, + }, + }, + }, + Fetches: []resolve.Fetch{ + &resolve.SingleFetch{FetchDependencies: resolve.FetchDependencies{FetchID: 1}}, + &resolve.SingleFetch{ + FetchDependencies: resolve.FetchDependencies{FetchID: 1}, + DeferInfo: &resolve.DeferInfo{ + Path: ast.Path{ + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("hero"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + }, + }, + }, + }, + }, + }, + }, + defers: []resolve.DeferInfo{ + { + Path: ast.Path{ + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("hero"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + }, + }, + }, + }, + expected: &resolve.GraphQLResponse{ + Info: &resolve.GraphQLResponseInfo{ + OperationType: ast.OperationTypeQuery, + }, + Data: &resolve.Object{ + Nullable: false, + Fields: []*resolve.Field{ + { + Name: []byte("hero"), + Value: &resolve.Object{ + Path: []string{"hero"}, + Nullable: true, + TypeName: "Character", + PossibleTypes: map[string]struct{}{"Droid": {}, "Human": {}}, + Fields: []*resolve.Field{ + { + Name: []byte("name"), + Value: &resolve.String{ + Path: []string{"name"}, + Nullable: false, + }, + }, + }, + }, + DeferPaths: []ast.Path{ + { + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("hero"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + }, + }, + }, + }, + }, + Fetches: []resolve.Fetch{ + &resolve.SingleFetch{FetchDependencies: resolve.FetchDependencies{FetchID: 1}}, + }, + }, + DeferredResponses: []*resolve.GraphQLResponse{ + { + Info: &resolve.GraphQLResponseInfo{ + OperationType: ast.OperationTypeQuery, + }, + Data: &resolve.Object{ + Nullable: false, + Fields: []*resolve.Field{ + { + Name: []byte("hero"), + Value: &resolve.Object{ + Path: []string{"hero"}, + Nullable: true, + TypeName: "Character", + PossibleTypes: map[string]struct{}{"Droid": {}, "Human": {}}, + Fields: []*resolve.Field{ + { + Name: []byte("primaryFunction"), + Value: &resolve.String{ + Path: []string{"primaryFunction"}, + Nullable: false, + }, + OnTypeNames: [][]byte{[]byte("Droid")}, + DeferPaths: []ast.Path{ + { + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("hero"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + }, + }, + }, + }, + { + Name: []byte("favoriteEpisode"), + Value: &resolve.Enum{ + Path: []string{"favoriteEpisode"}, + Nullable: true, + TypeName: "Episode", + Values: []string{ + "NEWHOPE", + "EMPIRE", + "JEDI", + }, + }, + OnTypeNames: [][]byte{[]byte("Droid")}, + DeferPaths: []ast.Path{ + { + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("hero"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + }, + }, + }, + }, + }, + }, + DeferPaths: []ast.Path{ + { + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("hero"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + }, + }, + }, + }, + }, + Fetches: []resolve.Fetch{ + &resolve.SingleFetch{ + FetchDependencies: resolve.FetchDependencies{FetchID: 1}, + DeferInfo: &resolve.DeferInfo{ + Path: ast.Path{ + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("hero"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + e := &extractDeferredFields{} + e.Process(tt.input, tt.defers) + + if !assert.Equal(t, tt.expected, tt.input) { + formatterConfig := map[reflect.Type]interface{}{ + reflect.TypeOf([]byte{}): func(b []byte) string { return fmt.Sprintf(`"%s"`, string(b)) }, + } + + prettyCfg := &pretty.Config{ + Diffable: true, + IncludeUnexported: false, + Formatter: formatterConfig, + } + + if diff := prettyCfg.Compare(tt.expected, tt.input); diff != "" { + t.Errorf("Plan does not match(-want +got)\n%s", diff) + } + } + }) + } +} diff --git a/v2/pkg/engine/postprocess/postprocess.go b/v2/pkg/engine/postprocess/postprocess.go index 654b3ca14a..b0f4a6b82b 100644 --- a/v2/pkg/engine/postprocess/postprocess.go +++ b/v2/pkg/engine/postprocess/postprocess.go @@ -25,6 +25,7 @@ type Processor struct { dedupe *deduplicateSingleFetches processResponseTree []ResponseTreeProcessor processFetchTree []FetchTreeProcessor + extractDeferredFields *extractDeferredFields } type processorOptions struct { @@ -131,6 +132,7 @@ func NewProcessor(options ...ProcessorOption) *Processor { disable: opts.disableMergeFields, }, }, + extractDeferredFields: &extractDeferredFields{}, } } @@ -140,12 +142,10 @@ func (p *Processor) Process(pre plan.Plan) plan.Plan { for i := range p.processResponseTree { p.processResponseTree[i].Process(t.Response.Data) } - p.createFetchTree(t.Response) - p.dedupe.ProcessFetchTree(t.Response.Fetches) - p.resolveInputTemplates.ProcessFetchTree(t.Response.Fetches) - for i := range p.processFetchTree { - p.processFetchTree[i].ProcessFetchTree(t.Response.Fetches) + if len(t.DeferredFragments) > 0 { + p.extractDeferredFields.Process(t.Response, t.DeferredFragments) } + p.processFetchTrees(t.Response) case *plan.SubscriptionResponsePlan: for i := range p.processResponseTree { p.processResponseTree[i].ProcessSubscription(t.Response.Response.Data) @@ -195,6 +195,18 @@ func (p *Processor) createFetchTree(res *resolve.GraphQLResponse) { } } +func (p *Processor) processFetchTrees(resp *resolve.GraphQLResponse) { + p.createFetchTree(resp) + p.dedupe.ProcessFetchTree(resp.Fetches) + p.resolveInputTemplates.ProcessFetchTree(resp.Fetches) + for i := range p.processFetchTree { + p.processFetchTree[i].ProcessFetchTree(resp.Fetches) + } + for _, deferred := range resp.DeferredResponses { + p.processFetchTrees(deferred) + } +} + func (p *Processor) appendTriggerToFetchTree(res *resolve.GraphQLSubscription) { var input struct { Body struct { From 99f02495bd2cb7cf9902112af86398f95ef18704 Mon Sep 17 00:00:00 2001 From: Carl Dunham Date: Tue, 11 Mar 2025 18:15:42 -0700 Subject: [PATCH 6/9] feat: multipart response type and sample implementation --- .../engine/resolve/multipart_json_writer.go | 130 +++++++++++ .../resolve/multipart_response_writer_test.go | 221 ++++++++++++++++++ v2/pkg/engine/resolve/response.go | 6 + 3 files changed, 357 insertions(+) create mode 100644 v2/pkg/engine/resolve/multipart_json_writer.go create mode 100644 v2/pkg/engine/resolve/multipart_response_writer_test.go diff --git a/v2/pkg/engine/resolve/multipart_json_writer.go b/v2/pkg/engine/resolve/multipart_json_writer.go new file mode 100644 index 0000000000..a6676d81cf --- /dev/null +++ b/v2/pkg/engine/resolve/multipart_json_writer.go @@ -0,0 +1,130 @@ +package resolve + +import ( + "bytes" + "encoding/json" + "fmt" + "io" +) + +// TODO: should this go somewhere else? + +// MultipartJSONWriter is a writer that writes multipart incremental responses. +// Is assumes that all parts share the name contentType. +// Note that it is not smart enough to stream parts to the writer. Parts are written in bulk. +// Useful for testing and smaller applications. +type MultipartJSONWriter struct { + Writer io.Writer + BoundaryToken string + + buf bytes.Buffer + wroteInitial bool +} + +var _ IncrementalResponseWriter = (*MultipartJSONWriter)(nil) + +const ( + // DefaultBoundaryToken is the default boundary token used in multipart responses. + DefaultBoundaryToken = "graphql-go-tools" + + jsonContentType = "application/json; charset=utf-8" +) + +func (w *MultipartJSONWriter) Write(p []byte) (n int, err error) { + n, err = w.buf.Write(p) + if err != nil { + return n, fmt.Errorf("writing: %w", err) + } + return n, nil +} + +func (w *MultipartJSONWriter) Flush(path []any) (err error) { + if w.buf.Len() == 0 { + return nil + } + + var part incrementalPart + + if err := json.Unmarshal(w.buf.Bytes(), &part); err != nil { + return fmt.Errorf("unmarshaling data: %w", err) + } + part.HasNext = true + + if _, err := w.Writer.Write(w.partHeader()); err != nil { + return fmt.Errorf("writing part header: %w", err) + } + defer w.buf.Reset() + + if w.wroteInitial { + part.Incremental = []incrementalDataPart{ + { + Data: part.Data, + Path: path, + }, + } + part.Data = nil + + if len(path) > 0 { + part.Incremental[0].Path = path + } + } + + var buf bytes.Buffer + if err := json.NewEncoder(&buf).Encode(part); err != nil { + return fmt.Errorf("encoding part body: %w", err) + } + if buf.Len() > 0 { + buf.Truncate(buf.Len() - 1) // remove trailing newline + } + if _, err := buf.WriteTo(w.Writer); err != nil { + return fmt.Errorf("writing part body: %w", err) + } + + if _, err := w.Writer.Write([]byte("\r\n")); err != nil { + return fmt.Errorf("writing part terminator: %w", err) + } + w.wroteInitial = true + + return nil +} + +func (w *MultipartJSONWriter) Complete() error { + if w.wroteInitial { + // Kind of a hack, but should work. + if _, err := w.Writer.Write([]byte(string(w.partHeader()) + `{"hasNext":false,"incremental":[]}` + "\r\n")); err != nil { + return fmt.Errorf("writing final part: %w", err) + } + } + if _, err := w.Writer.Write([]byte(fmt.Sprintf("--%s--\r\n", w.boundaryToken()))); err != nil { + return fmt.Errorf("writing final boundary: %w", err) + } + return nil +} + +func (w *MultipartJSONWriter) partHeader() []byte { + return []byte(fmt.Sprintf("--%s\r\nContent-Type: %s\r\n\r\n", w.boundaryToken(), jsonContentType)) +} + +func (w *MultipartJSONWriter) boundaryToken() string { + if len(w.BoundaryToken) == 0 { + return DefaultBoundaryToken + } + return w.BoundaryToken +} + +// incrementalPart is a part of a multipart response. +// It can contain a full response (for the first or only part) in `data`, or an incremental part in `incremental`. +type incrementalPart struct { + HasNext bool `json:"hasNext"` + + Data json.RawMessage `json:"data,omitempty"` + Incremental []incrementalDataPart `json:"incremental,omitempty"` + + Errors json.RawMessage `json:"errors,omitempty"` + Extensions json.RawMessage `json:"extensions,omitempty"` +} + +type incrementalDataPart struct { + Data json.RawMessage `json:"data"` + Path []any `json:"path,omitempty"` +} diff --git a/v2/pkg/engine/resolve/multipart_response_writer_test.go b/v2/pkg/engine/resolve/multipart_response_writer_test.go new file mode 100644 index 0000000000..46c0c057ff --- /dev/null +++ b/v2/pkg/engine/resolve/multipart_response_writer_test.go @@ -0,0 +1,221 @@ +package resolve_test + +import ( + "bytes" + "errors" + "io" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve" +) + +func TestMultipartJSONWriter(t *testing.T) { + tests := []struct { + name string + boundaryToken string + path []any + parts [][]string + expected string + }{ + { + name: "simple case", + boundaryToken: "boundary", + path: []any{"path", "to", "part"}, + parts: [][]string{[]string{`{"data":"part1"}`}, []string{`{"data":"part2"}`}, []string{`{"data":"part3"}`}}, + expected: strings.ReplaceAll(`--boundary +Content-Type: application/json; charset=utf-8 + +{"hasNext":true,"data":"part1"} +--boundary +Content-Type: application/json; charset=utf-8 + +{"hasNext":true,"incremental":[{"data":"part2","path":["path","to","part"]}]} +--boundary +Content-Type: application/json; charset=utf-8 + +{"hasNext":true,"incremental":[{"data":"part3","path":["path","to","part"]}]} +--boundary +Content-Type: application/json; charset=utf-8 + +{"hasNext":false,"incremental":[]} +--boundary-- +`, "\n", "\r\n"), + }, + { + name: "multiple writes", + boundaryToken: "boundary", + path: []any{"path", 4, "part"}, + parts: [][]string{[]string{`{"data":`, `"part1a`, `part1b"}`}, []string{`{"data":"part2"}`}, []string{`{"data":"part3a`, `part3b"}`}}, + expected: strings.ReplaceAll(`--boundary +Content-Type: application/json; charset=utf-8 + +{"hasNext":true,"data":"part1apart1b"} +--boundary +Content-Type: application/json; charset=utf-8 + +{"hasNext":true,"incremental":[{"data":"part2","path":["path",4,"part"]}]} +--boundary +Content-Type: application/json; charset=utf-8 + +{"hasNext":true,"incremental":[{"data":"part3apart3b","path":["path",4,"part"]}]} +--boundary +Content-Type: application/json; charset=utf-8 + +{"hasNext":false,"incremental":[]} +--boundary-- +`, "\n", "\r\n"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var buf bytes.Buffer + writer := &resolve.MultipartJSONWriter{ + Writer: &buf, + BoundaryToken: tt.boundaryToken, + } + + for _, part := range tt.parts { + for _, p := range part { + _, err := writer.Write([]byte(p)) + require.NoError(t, err) + } + err := writer.Flush(tt.path) + require.NoError(t, err) + } + + err := writer.Complete() + require.NoError(t, err) + + assert.Equal(t, tt.expected, buf.String()) + }) + } +} + +func TestMultipartJSONWriter_Write(t *testing.T) { + tests := []struct { + name string + input []byte + expectedWrite int + expectedErr error + }{ + { + name: "successful write", + input: []byte("test data"), + expectedWrite: 9, + expectedErr: nil, + }, + { + name: "empty input", + input: []byte(""), + expectedWrite: 0, + expectedErr: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var buf bytes.Buffer + writer := &resolve.MultipartJSONWriter{ + Writer: &buf, + } + + n, err := writer.Write(tt.input) + assert.Equal(t, tt.expectedWrite, n) + assert.Equal(t, tt.expectedErr, err) + }) + } +} + +func TestMultipartJSONWriter_Flush(t *testing.T) { + tests := []struct { + name string + input []byte + expectedErr error + }{ + { + name: "successful flush", + input: []byte(`{"data":"test data"}`), + expectedErr: nil, + }, + { + name: "flush with empty buffer", + input: []byte(""), + expectedErr: nil, + }, + { + name: "flush with error", + input: []byte(`{"error": "data"}`), + expectedErr: errors.New("flush error"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var w io.Writer + if tt.expectedErr != nil { + w = &errorWriter{ + err: tt.expectedErr, + } + } else { + w = &bytes.Buffer{} + } + writer := &resolve.MultipartJSONWriter{ + Writer: w, + } + + _, err := writer.Write(tt.input) + require.NoError(t, err) + + err = writer.Flush(nil) + assert.ErrorIs(t, err, tt.expectedErr) + }) + } +} + +func TestMultipartJSONWriter_Complete(t *testing.T) { + tests := []struct { + name string + expectedErr error + }{ + { + name: "success", + expectedErr: nil, + }, + { + name: "error", + expectedErr: errors.New("write error"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var w io.Writer + if tt.expectedErr != nil { + w = &errorWriter{ + err: tt.expectedErr, + } + } else { + w = &bytes.Buffer{} + } + writer := &resolve.MultipartJSONWriter{ + Writer: w, + } + + err := writer.Complete() + assert.ErrorIs(t, err, tt.expectedErr) + }) + } +} + +type errorWriter struct { + err error +} + +func (e *errorWriter) Write(p []byte) (n int, err error) { + return 0, e.err +} diff --git a/v2/pkg/engine/resolve/response.go b/v2/pkg/engine/resolve/response.go index f51970ee01..f6cd770e78 100644 --- a/v2/pkg/engine/resolve/response.go +++ b/v2/pkg/engine/resolve/response.go @@ -48,6 +48,12 @@ type SubscriptionResponseWriter interface { Complete() } +type IncrementalResponseWriter interface { + ResponseWriter + Flush(path []any) error + Complete() error +} + func writeGraphqlResponse(buf *BufPair, writer io.Writer, ignoreData bool) (err error) { hasErrors := buf.Errors.Len() != 0 hasData := buf.Data.Len() != 0 && !ignoreData From b342da41b9afb5305411fd210b86ec70d005012e Mon Sep 17 00:00:00 2001 From: Carl Dunham Date: Tue, 11 Mar 2025 18:16:18 -0700 Subject: [PATCH 7/9] feat: fetching and resolving deferred fields --- v2/pkg/engine/resolve/fetch.go | 4 + v2/pkg/engine/resolve/resolve.go | 71 +++++++++++++-- v2/pkg/engine/resolve/resolve_test.go | 126 ++++++++++++++++++++++++++ 3 files changed, 191 insertions(+), 10 deletions(-) diff --git a/v2/pkg/engine/resolve/fetch.go b/v2/pkg/engine/resolve/fetch.go index 9f5d68fc95..a3f9fda526 100644 --- a/v2/pkg/engine/resolve/fetch.go +++ b/v2/pkg/engine/resolve/fetch.go @@ -55,6 +55,9 @@ func (f *FetchItem) Equals(other *FetchItem) bool { if !ok { return false } + if !l.DeferInfo.Equals(r.DeferInfo) { + return false + } return l.FetchConfiguration.Equals(&r.FetchConfiguration) } @@ -77,6 +80,7 @@ type SingleFetch struct { DataSourceIdentifier []byte Trace *DataSourceLoadTrace Info *FetchInfo + DeferInfo *DeferInfo } func (s *SingleFetch) Dependencies() FetchDependencies { diff --git a/v2/pkg/engine/resolve/resolve.go b/v2/pkg/engine/resolve/resolve.go index aecfdf97ce..440977d3f2 100644 --- a/v2/pkg/engine/resolve/resolve.go +++ b/v2/pkg/engine/resolve/resolve.go @@ -241,29 +241,80 @@ func (r *Resolver) ResolveGraphQLResponse(ctx *Context, response *GraphQLRespons defer func() { r.maxConcurrency <- struct{}{} }() - t := newTools(r.options, r.allowedErrorExtensionFields, r.allowedErrorFields) - err := t.resolvable.Init(ctx, data, response.Info.OperationType) - if err != nil { + if err := r.doResolve(ctx, t, response, data, writer); err != nil { return nil, err } + if iw, ok := writer.(IncrementalResponseWriter); ok { + if err := iw.Complete(); err != nil { + return nil, fmt.Errorf("completing response: %w", err) + } + } + return resp, nil +} + +var errInvalidWriter = errors.New("invalid writer") + +func (r *Resolver) doResolve(ctx *Context, t *tools, response *GraphQLResponse, data []byte, writer io.Writer) error { + err := t.resolvable.Init(ctx, data, response.Info.OperationType) + if err != nil { + return err + } if !ctx.ExecutionOptions.SkipLoader { - err = t.loader.LoadGraphQLResponseData(ctx, response, t.resolvable) + err := t.loader.LoadGraphQLResponseData(ctx, response, t.resolvable) if err != nil { - return nil, err + return err } } buf := &bytes.Buffer{} - err = t.resolvable.Resolve(ctx.ctx, response.Data, response.Fetches, buf) - if err != nil { - return nil, err + if err := t.resolvable.Resolve(ctx.ctx, response.Data, response.Fetches, buf); err != nil { + return err + } + + if _, err := buf.WriteTo(writer); err != nil { + return fmt.Errorf("writing response: %w", err) } - _, err = buf.WriteTo(writer) - return resp, err + if iw, ok := writer.(IncrementalResponseWriter); ok { + if err := iw.Flush(resolvedPath(response.Data.Path)); err != nil { + return fmt.Errorf("flushing immediate response: %w", err) + } + } + + if len(response.DeferredResponses) > 0 { + iw, ok := writer.(IncrementalResponseWriter) + if !ok { + return fmt.Errorf("%w: writer %T does not support incremental writing", errInvalidWriter, writer) + } + + for i, deferredResponse := range response.DeferredResponses { + if err := r.doResolve(ctx, t, deferredResponse, nil, iw); err != nil { + return fmt.Errorf("resolving deferred response %d: %w", i, err) + } + if err := iw.Flush(resolvedPath(deferredResponse.Data.Path)); err != nil { + return fmt.Errorf("flushing incremental response: %w", err) + } + } + } + return nil +} + +func resolvedPath(data []string) []any { + if len(data) == 0 { + return nil + } + ret := make([]any, len(data)) + for i, v := range data { + if v == "@" { + ret[i] = 0 // TODO(cd): need the real values here. + continue + } + ret[i] = v + } + return ret } type trigger struct { diff --git a/v2/pkg/engine/resolve/resolve_test.go b/v2/pkg/engine/resolve/resolve_test.go index 37b9f31e26..23365aaf18 100644 --- a/v2/pkg/engine/resolve/resolve_test.go +++ b/v2/pkg/engine/resolve/resolve_test.go @@ -8,6 +8,7 @@ import ( "io" "net" "net/http" + "strings" "sync" "sync/atomic" "testing" @@ -4769,6 +4770,131 @@ func TestResolver_WithVariableRemapping(t *testing.T) { } } +func TestResolver_ResolveGraphQLIncrementalResponse(t *testing.T) { + cases := []struct { + name string + response *GraphQLResponse + expected string + }{ + { + name: "sunny day", + response: &GraphQLResponse{ + Info: &GraphQLResponseInfo{ + OperationType: ast.OperationTypeQuery, + }, + Data: &Object{ + Nullable: false, + Fields: []*Field{ + { + Name: []byte("hero"), + Value: &Object{ + Path: []string{"hero"}, + Nullable: true, + TypeName: "Character", + PossibleTypes: map[string]struct{}{"Droid": {}, "Human": {}}, + Fields: []*Field{ + { + Name: []byte("name"), + Value: &String{ + Path: []string{"name"}, + Nullable: false, + }, + }, + }, + }, + }, + }, + }, + Fetches: Single(&SingleFetch{ + FetchConfiguration: FetchConfiguration{DataSource: FakeDataSource(`{"hero":{"name":"Luke"}}`)}, + }), + DeferredResponses: []*GraphQLResponse{ + { + Info: &GraphQLResponseInfo{ + OperationType: ast.OperationTypeQuery, + }, + Data: &Object{ + Path: []string{"hero"}, + Nullable: true, + TypeName: "Character", + PossibleTypes: map[string]struct{}{"Droid": {}, "Human": {}}, + Fields: []*Field{ + { + Name: []byte("__typename"), + Value: &String{ + Path: []string{"__typename"}, + }, + }, + { + Name: []byte("primaryFunction"), + Value: &String{ + Path: []string{"primaryFunction"}, + Nullable: false, + }, + OnTypeNames: [][]byte{[]byte("Droid")}, + // Defer: &DeferField{ + // Path: []string{"query", "hero", "$0Droid"}, + // }, + }, + { + Name: []byte("favoriteEpisode"), + Value: &Enum{ + Path: []string{"favoriteEpisode"}, + Nullable: true, + TypeName: "Episode", + Values: []string{ + "NEWHOPE", + "EMPIRE", + "JEDI", + }, + }, + OnTypeNames: [][]byte{[]byte("Droid")}, + // Defer: &DeferField{ + // Path: []string{"query", "hero", "$0Droid"}, + // }, + }, + }, + }, + Fetches: Single(&SingleFetch{ + FetchConfiguration: FetchConfiguration{DataSource: FakeDataSource(`{"hero":{"__typename":"Droid","primaryFunction":"Astromech","favoriteEpisode":"NEWHOPE"}}`)}, + }), + }, + }, + }, + expected: strings.ReplaceAll(`--graphql-go-tools +Content-Type: application/json; charset=utf-8 + +{"hasNext":true,"data":{"hero":{"name":"Luke"}}} +--graphql-go-tools +Content-Type: application/json; charset=utf-8 + +{"hasNext":true,"incremental":[{"data":{"__typename":"Droid","primaryFunction":"Astromech","favoriteEpisode":"NEWHOPE"},"path":["hero"]}]} +--graphql-go-tools +Content-Type: application/json; charset=utf-8 + +{"hasNext":false,"incremental":[]} +--graphql-go-tools-- +`, "\n", "\r\n"), + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + r := newResolver(ctx) + var buf bytes.Buffer + w := &MultipartJSONWriter{ + Writer: &buf, + BoundaryToken: "graphql-go-tools", + } + info, err := r.ResolveGraphQLResponse(&Context{ctx: ctx}, tc.response, nil, w) + require.NoError(t, err) + assert.Equal(t, tc.expected, buf.String()) + require.NotNil(t, info) + }) + } +} + type SubscriptionRecorder struct { buf *bytes.Buffer messages []string From dde793d6dc8d1afc51e057cc37691bf1c6e1170a Mon Sep 17 00:00:00 2001 From: Carl Dunham Date: Wed, 12 Mar 2025 16:11:44 -0700 Subject: [PATCH 8/9] fix: handle arrays properly, other cleanups --- v2/pkg/engine/plan/planner_test.go | 251 ++++++++++++ .../postprocess/extract_deferred_fields.go | 139 +++---- .../extract_deferred_fields_test.go | 377 +++++++++++++++++- v2/pkg/engine/postprocess/plan_visitor.go | 16 +- v2/pkg/engine/resolve/node_object.go | 19 +- v2/pkg/engine/resolve/node_object_test.go | 14 +- 6 files changed, 714 insertions(+), 102 deletions(-) diff --git a/v2/pkg/engine/plan/planner_test.go b/v2/pkg/engine/plan/planner_test.go index a06f8d7dc8..034a267dc7 100644 --- a/v2/pkg/engine/plan/planner_test.go +++ b/v2/pkg/engine/plan/planner_test.go @@ -499,6 +499,257 @@ func TestPlanner_Plan(t *testing.T) { DisableIncludeInfo: true, DataSources: []DataSource{testDefinitionDSConfiguration}, })) + + t.Run("simple inline fragment with arrays", test(testDefinition, ` + query WithInlineDeferWithArray { + searchResults { + ... on Human { + name + } + ... on Droid @defer { + name + primaryFunction + favoriteEpisode + } + } + } + `, "WithInlineDeferWithArray", &SynchronousResponsePlan{ + Response: &resolve.GraphQLResponse{ + Data: &resolve.Object{ + Nullable: false, + Fields: []*resolve.Field{ + { + Name: []byte("searchResults"), + Value: &resolve.Array{ + Path: []string{"searchResults"}, + Nullable: true, + Item: &resolve.Object{ + TypeName: "SearchResult", + Nullable: true, + PossibleTypes: map[string]struct{}{"Droid": {}, "Human": {}, "Starship": {}}, + Fields: []*resolve.Field{ + { + Name: []byte("name"), + Value: &resolve.String{ + Path: []string{"name"}, + Nullable: false, + }, + OnTypeNames: [][]byte{[]byte("Human")}, + }, + { + Name: []byte("name"), + Value: &resolve.String{ + Path: []string{"name"}, + Nullable: false, + }, + OnTypeNames: [][]byte{[]byte("Droid")}, + DeferPaths: []ast.Path{ + { + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("searchResults"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + FragmentRef: 1, + }, + }, + }, + }, + { + Name: []byte("primaryFunction"), + Value: &resolve.String{ + Path: []string{"primaryFunction"}, + Nullable: false, + }, + OnTypeNames: [][]byte{[]byte("Droid")}, + DeferPaths: []ast.Path{ + { + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("searchResults"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + FragmentRef: 1, + }, + }, + }, + }, + { + Name: []byte("favoriteEpisode"), + Value: &resolve.Enum{ + Path: []string{"favoriteEpisode"}, + Nullable: true, + TypeName: "Episode", + Values: []string{ + "NEWHOPE", + "EMPIRE", + "JEDI", + }, + InaccessibleValues: []string{}, + }, + OnTypeNames: [][]byte{[]byte("Droid")}, + DeferPaths: []ast.Path{ + { + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("searchResults"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + FragmentRef: 1, + }, + }, + }, + }, + }, + }, + }, + DeferPaths: []ast.Path{ + { + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("searchResults"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + FragmentRef: 1, + }, + }, + }, + }, + }, + Fetches: []resolve.Fetch{ + &resolve.SingleFetch{ + FetchConfiguration: resolve.FetchConfiguration{ + DataSource: &FakeDataSource{&StatefulSource{}}, + }, + DataSourceIdentifier: []byte("plan.FakeDataSource"), + }, + &resolve.SingleFetch{ + FetchConfiguration: resolve.FetchConfiguration{ + DataSource: &FakeDataSource{&StatefulSource{}}, + }, + DataSourceIdentifier: []byte("plan.FakeDataSource"), + DeferInfo: &resolve.DeferInfo{ + Path: ast.Path{ + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("searchResults"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + FragmentRef: 1, + }, + }, + }, + }, + }, + }, + }, + DeferredFragments: []resolve.DeferInfo{ + { + Path: ast.Path{ + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("searchResults"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + FragmentRef: 1, + }, + }, + }, + }, + DeferredFields: map[int]resolve.DeferInfo{ + 1: { + Path: ast.Path{ + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("searchResults"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + FragmentRef: 1, + }, + }, + }, + 2: { + Path: ast.Path{ + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("searchResults"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + FragmentRef: 1, + }, + }, + }, + 3: { + Path: ast.Path{ + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("searchResults"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + FragmentRef: 1, + }, + }, + }, + }, + }, Configuration{ + DisableResolveFieldPositions: true, + DisableIncludeInfo: true, + DataSources: []DataSource{testDefinitionDSConfiguration}, + })) }) t.Run("operation selection", func(t *testing.T) { diff --git a/v2/pkg/engine/postprocess/extract_deferred_fields.go b/v2/pkg/engine/postprocess/extract_deferred_fields.go index e577621c51..7584cab066 100644 --- a/v2/pkg/engine/postprocess/extract_deferred_fields.go +++ b/v2/pkg/engine/postprocess/extract_deferred_fields.go @@ -60,39 +60,21 @@ type deferredFieldsVisitor struct { } func (v *deferredFieldsVisitor) EnterObject(obj *resolve.Object) { - var ( - currentField *resolve.Field - ) - - if len(v.walker.CurrentFields) > 0 { - currentField = v.walker.CurrentFields[len(v.walker.CurrentFields)-1] - } - - for resp := range v.matchingResponseItems() { - newObject := resp.copyObjectWithoutFields(obj) - - if currentField != nil { - if resp.deferInfo == nil || slices.ContainsFunc(currentField.DeferPaths, func(el ast.Path) bool { - return resp.deferInfo != nil && el.Equals(resp.deferInfo.Path) - }) { - resp.updateCurrentFieldObject(currentField, newObject) + if len(v.walker.CurrentFields) == 0 { + // Set up root objects. + for _, resp := range v.responseItems { + newObject := resp.copyObjectWithoutFields(obj) + resp.objectStack = append(resp.objectStack, newObject) + + if resp.response.Data == nil { + resp.response.Data = newObject } } - resp.objectStack = append(resp.objectStack, newObject) - - if resp.response.Data == nil { - resp.response.Data = newObject - } + return } } -func (v *deferredFieldsVisitor) LeaveObject(*resolve.Object) { - for resp := range v.matchingResponseItems() { - if depth := len(resp.objectStack); depth > 1 { - resp.objectStack = resp.objectStack[:depth-1] - } - } -} +func (v *deferredFieldsVisitor) LeaveObject(*resolve.Object) {} func (v *deferredFieldsVisitor) EnterField(field *resolve.Field) { // Reasons to append a field: @@ -102,8 +84,8 @@ func (v *deferredFieldsVisitor) EnterField(field *resolve.Field) { // 4. It's above a non-deferred field? TODO(cd): post-cleanup. var deferred bool for resp := range v.matchingResponseItems() { - if slices.ContainsFunc(field.DeferPaths, func(el ast.Path) bool { - return resp.deferInfo != nil && el.Equals(resp.deferInfo.Path) + if resp.deferInfo != nil && slices.ContainsFunc(field.DeferPaths, func(el ast.Path) bool { + return el.Equals(resp.deferInfo.Path) }) { resp.appendField(field) deferred = true @@ -121,12 +103,31 @@ func (v *deferredFieldsVisitor) EnterField(field *resolve.Field) { } } -func (v *deferredFieldsVisitor) LeaveField(field *resolve.Field) {} +func (v *deferredFieldsVisitor) LeaveField(field *resolve.Field) { + popObjectStack := func(resp *responseItem) { + if depth := len(resp.objectStack); depth > 1 { + switch fv := field.Value.(type) { + case *resolve.Object: + resp.objectStack = resp.objectStack[:depth-1] + case *resolve.Array: + if _, ok := fv.Item.(*resolve.Object); ok { + resp.objectStack = resp.objectStack[:depth-1] + } + } + } + } + for resp := range v.matchingResponseItems() { + popObjectStack(resp) + } + popObjectStack(v.responseItems[""]) +} +// matchingResponseItems returns a sequence of response items that match the current path. +// It specifically excludes the immediate response. func (v *deferredFieldsVisitor) matchingResponseItems() iter.Seq[*responseItem] { return func(yield func(*responseItem) bool) { for path, resp := range v.responseItems { - if path == "" || resp.deferInfo.HasPrefix(v.walker.path) { + if path != "" && resp.deferInfo.HasPrefix(v.walker.path) { if !yield(resp) { return } @@ -139,7 +140,6 @@ type responseItem struct { deferInfo *resolve.DeferInfo response *resolve.GraphQLResponse objectStack []*resolve.Object - lastArray *resolve.Array } func (r *responseItem) currentObject() *resolve.Object { @@ -153,52 +153,36 @@ func (r *responseItem) appendField(field *resolve.Field) { newField := r.copyFieldWithoutObjectFields(field) r.currentObject().Fields = append(r.currentObject().Fields, newField) - if _, ok := field.Value.(*resolve.Array); ok { - r.lastArray = newField.Value.(*resolve.Array) - } else { - r.lastArray = nil + switch fv := newField.Value.(type) { + case *resolve.Object: + r.objectStack = append(r.objectStack, fv) + case *resolve.Array: + if item, ok := fv.Item.(*resolve.Object); ok { + r.objectStack = append(r.objectStack, item) + } } } func (r *responseItem) copyFieldWithoutObjectFields(f *resolve.Field) *resolve.Field { + ret := &resolve.Field{ + Name: f.Name, + Value: f.Value.Copy(), + Position: f.Position, + DeferPaths: f.DeferPaths, + Stream: f.Stream, + OnTypeNames: f.OnTypeNames, + ParentOnTypeNames: f.ParentOnTypeNames, + Info: f.Info, + } switch fv := f.Value.(type) { case *resolve.Object: - ret := &resolve.Field{ - Name: f.Name, - Position: f.Position, - DeferPaths: f.DeferPaths, - Stream: f.Stream, - OnTypeNames: f.OnTypeNames, - ParentOnTypeNames: f.ParentOnTypeNames, - Info: f.Info, - } ret.Value = r.copyObjectWithoutFields(fv) - return ret case *resolve.Array: - arrObj, ok := fv.Item.(*resolve.Object) - if !ok { - return f.Copy() + if arrObj, ok := fv.Item.(*resolve.Object); ok { + ret.Value.(*resolve.Array).Item = r.copyObjectWithoutFields(arrObj) } - ret := &resolve.Field{ - Name: f.Name, - Position: f.Position, - DeferPaths: f.DeferPaths, - Stream: f.Stream, - OnTypeNames: f.OnTypeNames, - ParentOnTypeNames: f.ParentOnTypeNames, - Info: f.Info, - } - newItem := r.copyObjectWithoutFields(arrObj) - - ret.Value = &resolve.Array{ - Path: fv.Path, - Nullable: fv.Nullable, - Item: newItem, - } - return ret - default: - return f.Copy() } + return ret } func (r *responseItem) copyObjectWithoutFields(fv *resolve.Object) *resolve.Object { @@ -219,23 +203,6 @@ func (r *responseItem) copyObjectWithoutFields(fv *resolve.Object) *resolve.Obje return newValue } -func (r *responseItem) updateCurrentFieldObject(field *resolve.Field, obj *resolve.Object) { - switch field.Value.(type) { - case *resolve.Array: - // This object is an item in an array. - if r.lastArray != nil { - if _, ok := r.lastArray.Item.(*resolve.Object); ok { - r.lastArray.Item = obj - } - } - case *resolve.Object: - // This object is a field in another object. - if r.currentObject() != nil && len(r.currentObject().Fields) > 0 { - r.currentObject().Fields[len(r.currentObject().Fields)-1].Value = obj - } - } -} - func (r *responseItem) fetchesForDeferFrom(fetches []resolve.Fetch) []resolve.Fetch { var ret []resolve.Fetch for _, fetch := range fetches { diff --git a/v2/pkg/engine/postprocess/extract_deferred_fields_test.go b/v2/pkg/engine/postprocess/extract_deferred_fields_test.go index 2c60c3b7cc..371cb99f06 100644 --- a/v2/pkg/engine/postprocess/extract_deferred_fields_test.go +++ b/v2/pkg/engine/postprocess/extract_deferred_fields_test.go @@ -386,6 +386,381 @@ func TestExtractDeferredFields_Process(t *testing.T) { }, }, }, + { + name: "simple case with arrays", + input: &resolve.GraphQLResponse{ + Info: &resolve.GraphQLResponseInfo{ + OperationType: ast.OperationTypeQuery, + }, + Data: &resolve.Object{ + Nullable: false, + Fields: []*resolve.Field{ + { + Name: []byte("searchResults"), + Value: &resolve.Array{ + Path: []string{"searchResults"}, + Nullable: true, + Item: &resolve.Object{ + TypeName: "SearchResult", + Nullable: true, + PossibleTypes: map[string]struct{}{"Droid": {}, "Human": {}, "Starship": {}}, + Fields: []*resolve.Field{ + { + Name: []byte("name"), + Value: &resolve.String{ + Path: []string{"name"}, + Nullable: false, + }, + OnTypeNames: [][]byte{[]byte("Human")}, + }, + { + Name: []byte("name"), + Value: &resolve.String{ + Path: []string{"name"}, + Nullable: false, + }, + OnTypeNames: [][]byte{[]byte("Droid")}, + DeferPaths: []ast.Path{ + { + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("searchResults"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + FragmentRef: 1, + }, + }, + }, + }, + { + Name: []byte("primaryFunction"), + Value: &resolve.String{ + Path: []string{"primaryFunction"}, + Nullable: false, + }, + OnTypeNames: [][]byte{[]byte("Droid")}, + DeferPaths: []ast.Path{ + { + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("searchResults"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + FragmentRef: 1, + }, + }, + }, + }, + { + Name: []byte("favoriteEpisode"), + Value: &resolve.Enum{ + Path: []string{"favoriteEpisode"}, + Nullable: true, + TypeName: "Episode", + Values: []string{ + "NEWHOPE", + "EMPIRE", + "JEDI", + }, + InaccessibleValues: []string{}, + }, + OnTypeNames: [][]byte{[]byte("Droid")}, + DeferPaths: []ast.Path{ + { + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("searchResults"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + FragmentRef: 1, + }, + }, + }, + }, + }, + }, + }, + DeferPaths: []ast.Path{ + { + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("searchResults"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + FragmentRef: 1, + }, + }, + }, + }, + }, + Fetches: []resolve.Fetch{ + &resolve.SingleFetch{DataSourceIdentifier: []byte("plan.FakeDataSource")}, + &resolve.SingleFetch{ + DataSourceIdentifier: []byte("plan.FakeDataSource"), + DeferInfo: &resolve.DeferInfo{ + Path: ast.Path{ + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("searchResults"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + FragmentRef: 1, + }, + }, + }, + }, + }, + }, + }, + defers: []resolve.DeferInfo{ + { + Path: ast.Path{ + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("searchResults"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + FragmentRef: 1, + }, + }, + }, + }, + expected: &resolve.GraphQLResponse{ + Info: &resolve.GraphQLResponseInfo{ + OperationType: ast.OperationTypeQuery, + }, + Data: &resolve.Object{ + Nullable: false, + Fields: []*resolve.Field{ + { + Name: []byte("searchResults"), + Value: &resolve.Array{ + Path: []string{"searchResults"}, + Nullable: true, + Item: &resolve.Object{ + Nullable: true, + TypeName: "SearchResult", + PossibleTypes: map[string]struct{}{"Droid": {}, "Human": {}, "Starship": {}}, + Fields: []*resolve.Field{ + { + Name: []byte("name"), + Value: &resolve.String{ + Path: []string{"name"}, + Nullable: false, + }, + OnTypeNames: [][]byte{[]byte("Human")}, + }, + }, + }, + }, + DeferPaths: []ast.Path{ + { + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("searchResults"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + FragmentRef: 1, + }, + }, + }, + }, + }, + Fetches: []resolve.Fetch{ + &resolve.SingleFetch{DataSourceIdentifier: []byte("plan.FakeDataSource")}, + }, + }, + DeferredResponses: []*resolve.GraphQLResponse{ + { + Info: &resolve.GraphQLResponseInfo{ + OperationType: ast.OperationTypeQuery, + }, + Data: &resolve.Object{ + Nullable: false, + Fields: []*resolve.Field{ + { + Name: []byte("searchResults"), + Value: &resolve.Array{ + Nullable: true, + Path: []string{"searchResults"}, + Item: &resolve.Object{ + Nullable: true, + TypeName: "SearchResult", + PossibleTypes: map[string]struct{}{"Droid": {}, "Human": {}, "Starship": {}}, + Fields: []*resolve.Field{ + { + Name: []byte("name"), + Value: &resolve.String{ + Path: []string{"name"}, + Nullable: false, + }, + OnTypeNames: [][]byte{[]byte("Droid")}, + DeferPaths: []ast.Path{ + { + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("searchResults"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + FragmentRef: 1, + }, + }, + }, + }, + { + Name: []byte("primaryFunction"), + Value: &resolve.String{ + Path: []string{"primaryFunction"}, + Nullable: false, + }, + OnTypeNames: [][]byte{[]byte("Droid")}, + DeferPaths: []ast.Path{ + { + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("searchResults"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + FragmentRef: 1, + }, + }, + }, + }, + { + Name: []byte("favoriteEpisode"), + Value: &resolve.Enum{ + Path: []string{"favoriteEpisode"}, + Nullable: true, + TypeName: "Episode", + Values: []string{ + "NEWHOPE", + "EMPIRE", + "JEDI", + }, + InaccessibleValues: []string{}, + }, + OnTypeNames: [][]byte{[]byte("Droid")}, + DeferPaths: []ast.Path{ + { + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("searchResults"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + FragmentRef: 1, + }, + }, + }, + }, + }, + }, + }, + DeferPaths: []ast.Path{ + { + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("searchResults"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + FragmentRef: 1, + }, + }, + }, + }, + }, + Fetches: []resolve.Fetch{ + &resolve.SingleFetch{ + DataSourceIdentifier: []byte("plan.FakeDataSource"), + DeferInfo: &resolve.DeferInfo{ + Path: ast.Path{ + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("query"), + }, + ast.PathItem{ + Kind: ast.FieldName, + FieldName: []byte("searchResults"), + }, + ast.PathItem{ + Kind: ast.InlineFragmentName, + FieldName: []byte("Droid"), + FragmentRef: 1, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, } for _, tt := range tests { @@ -400,7 +775,7 @@ func TestExtractDeferredFields_Process(t *testing.T) { prettyCfg := &pretty.Config{ Diffable: true, - IncludeUnexported: false, + IncludeUnexported: true, Formatter: formatterConfig, } diff --git a/v2/pkg/engine/postprocess/plan_visitor.go b/v2/pkg/engine/postprocess/plan_visitor.go index bcc2601d8e..c050e70179 100644 --- a/v2/pkg/engine/postprocess/plan_visitor.go +++ b/v2/pkg/engine/postprocess/plan_visitor.go @@ -69,15 +69,15 @@ func (e *PlanWalker) SetSkip(skip bool) { e.skip = skip } -func (e *PlanWalker) registerObjectVisitor(visitor PlanObjectVisitor) { +func (e *PlanWalker) RegisterObjectVisitor(visitor PlanObjectVisitor) { e.objectVisitor = visitor } -func (e *PlanWalker) registerArrayVisitor(visitor PlanArrayVisitor) { +func (e *PlanWalker) RegisterArrayVisitor(visitor PlanArrayVisitor) { e.arrayVisitor = visitor } -func (e *PlanWalker) registerFieldVisitor(visitor PlanFieldVisitor) { +func (e *PlanWalker) RegisterFieldVisitor(visitor PlanFieldVisitor) { e.fieldVisitor = visitor } @@ -151,12 +151,12 @@ func (e *PlanWalker) walkNode(node resolve.Node) { } func (e *PlanWalker) walkField(field *resolve.Field) { - e.pushPath(field.Value.NodePath()) - defer e.popPath(field.Value.NodePath()) - e.onEnterField(field) defer e.onLeaveField(field) + e.pushPath(field.Value.NodePath()) + defer e.popPath(field.Value.NodePath()) + e.pushField(field) defer e.popField() @@ -180,8 +180,8 @@ func (e *PlanWalker) onLeaveField(field *resolve.Field) { } func (e *PlanWalker) walkObject(object *resolve.Object) { - e.objectVisitor.EnterObject(object) - defer e.objectVisitor.LeaveObject(object) + e.onEnterObject(object) + defer e.onLeaveObject(object) e.pushObject(object) defer e.popObject() diff --git a/v2/pkg/engine/resolve/node_object.go b/v2/pkg/engine/resolve/node_object.go index 0a109d628a..6c30acc663 100644 --- a/v2/pkg/engine/resolve/node_object.go +++ b/v2/pkg/engine/resolve/node_object.go @@ -219,16 +219,23 @@ func (d *DeferInfo) HasPrefix(prefix []string) bool { } var skip int for i, p := range deferPath { - if p.Kind == ast.InlineFragmentName { - skip++ - continue - } idx := i + skip if idx >= len(prefix) { return true } - if idx >= len(prefix) || prefix[idx] != string(p.FieldName) { - return false + + switch p.Kind { + case ast.InlineFragmentName: + skip++ + continue + case ast.ArrayIndex: + if prefix[idx] != "@" { + return false + } + case ast.FieldName: + if idx >= len(prefix) || prefix[idx] != string(p.FieldName) { + return false + } } } return true diff --git a/v2/pkg/engine/resolve/node_object_test.go b/v2/pkg/engine/resolve/node_object_test.go index 5ede6f8635..4c3f9a8c50 100644 --- a/v2/pkg/engine/resolve/node_object_test.go +++ b/v2/pkg/engine/resolve/node_object_test.go @@ -10,7 +10,7 @@ import ( var ( pathItem1 = ast.PathItem{Kind: ast.FieldName, FieldName: []byte("query")} pathItem2 = ast.PathItem{Kind: ast.FieldName, FieldName: []byte("object1")} - pathItem3 = ast.PathItem{Kind: ast.ArrayIndex, ArrayIndex: 3, FieldName: []byte("field1")} + pathItem3 = ast.PathItem{Kind: ast.FieldName, FieldName: []byte("field1")} fragmentItem = ast.PathItem{Kind: ast.InlineFragmentName, FieldName: []byte("frag2"), FragmentRef: 2} arrayItem = ast.PathItem{Kind: ast.ArrayIndex, ArrayIndex: 3, FieldName: []byte("arrayField3")} @@ -228,6 +228,18 @@ func TestDeferInfo_HasPrefix(t *testing.T) { prefix: []string{"query", "x"}, expected: false, }, + { + name: "handle arrays", + deferInfo: &DeferInfo{Path: ast.Path{pathItem1, arrayItem, pathItem2}}, + prefix: []string{"query", "@", "object1"}, + expected: true, + }, + { + name: "handle arrays, but mis-match", + deferInfo: &DeferInfo{Path: ast.Path{pathItem1, arrayItem, pathItem2}}, + prefix: []string{"query", "@", "x"}, + expected: false, + }, { name: "nil DeferInfo, non-empty prefix", deferInfo: nil, From f9f55c399d65044a77e749e3b07f9461eada7cc1 Mon Sep 17 00:00:00 2001 From: Carl Dunham Date: Wed, 12 Mar 2025 16:48:56 -0700 Subject: [PATCH 9/9] fix: clean up incremental response hack --- v2/pkg/engine/resolve/multipart_json_writer.go | 10 ++-------- .../resolve/multipart_response_writer_test.go | 18 +++++------------- v2/pkg/engine/resolve/resolve.go | 14 +++++++------- v2/pkg/engine/resolve/resolve_test.go | 6 +----- v2/pkg/engine/resolve/response.go | 2 +- 5 files changed, 16 insertions(+), 34 deletions(-) diff --git a/v2/pkg/engine/resolve/multipart_json_writer.go b/v2/pkg/engine/resolve/multipart_json_writer.go index a6676d81cf..d44d02a8db 100644 --- a/v2/pkg/engine/resolve/multipart_json_writer.go +++ b/v2/pkg/engine/resolve/multipart_json_writer.go @@ -38,7 +38,7 @@ func (w *MultipartJSONWriter) Write(p []byte) (n int, err error) { return n, nil } -func (w *MultipartJSONWriter) Flush(path []any) (err error) { +func (w *MultipartJSONWriter) Flush(path []any, isFinal bool) (err error) { if w.buf.Len() == 0 { return nil } @@ -48,7 +48,7 @@ func (w *MultipartJSONWriter) Flush(path []any) (err error) { if err := json.Unmarshal(w.buf.Bytes(), &part); err != nil { return fmt.Errorf("unmarshaling data: %w", err) } - part.HasNext = true + part.HasNext = !isFinal if _, err := w.Writer.Write(w.partHeader()); err != nil { return fmt.Errorf("writing part header: %w", err) @@ -89,12 +89,6 @@ func (w *MultipartJSONWriter) Flush(path []any) (err error) { } func (w *MultipartJSONWriter) Complete() error { - if w.wroteInitial { - // Kind of a hack, but should work. - if _, err := w.Writer.Write([]byte(string(w.partHeader()) + `{"hasNext":false,"incremental":[]}` + "\r\n")); err != nil { - return fmt.Errorf("writing final part: %w", err) - } - } if _, err := w.Writer.Write([]byte(fmt.Sprintf("--%s--\r\n", w.boundaryToken()))); err != nil { return fmt.Errorf("writing final boundary: %w", err) } diff --git a/v2/pkg/engine/resolve/multipart_response_writer_test.go b/v2/pkg/engine/resolve/multipart_response_writer_test.go index 46c0c057ff..4c4288a880 100644 --- a/v2/pkg/engine/resolve/multipart_response_writer_test.go +++ b/v2/pkg/engine/resolve/multipart_response_writer_test.go @@ -37,11 +37,7 @@ Content-Type: application/json; charset=utf-8 --boundary Content-Type: application/json; charset=utf-8 -{"hasNext":true,"incremental":[{"data":"part3","path":["path","to","part"]}]} ---boundary -Content-Type: application/json; charset=utf-8 - -{"hasNext":false,"incremental":[]} +{"hasNext":false,"incremental":[{"data":"part3","path":["path","to","part"]}]} --boundary-- `, "\n", "\r\n"), }, @@ -61,11 +57,7 @@ Content-Type: application/json; charset=utf-8 --boundary Content-Type: application/json; charset=utf-8 -{"hasNext":true,"incremental":[{"data":"part3apart3b","path":["path",4,"part"]}]} ---boundary -Content-Type: application/json; charset=utf-8 - -{"hasNext":false,"incremental":[]} +{"hasNext":false,"incremental":[{"data":"part3apart3b","path":["path",4,"part"]}]} --boundary-- `, "\n", "\r\n"), }, @@ -79,12 +71,12 @@ Content-Type: application/json; charset=utf-8 BoundaryToken: tt.boundaryToken, } - for _, part := range tt.parts { + for i, part := range tt.parts { for _, p := range part { _, err := writer.Write([]byte(p)) require.NoError(t, err) } - err := writer.Flush(tt.path) + err := writer.Flush(tt.path, i == len(tt.parts)-1) require.NoError(t, err) } @@ -171,7 +163,7 @@ func TestMultipartJSONWriter_Flush(t *testing.T) { _, err := writer.Write(tt.input) require.NoError(t, err) - err = writer.Flush(nil) + err = writer.Flush(nil, true) assert.ErrorIs(t, err, tt.expectedErr) }) } diff --git a/v2/pkg/engine/resolve/resolve.go b/v2/pkg/engine/resolve/resolve.go index 440977d3f2..a4175b250c 100644 --- a/v2/pkg/engine/resolve/resolve.go +++ b/v2/pkg/engine/resolve/resolve.go @@ -243,7 +243,7 @@ func (r *Resolver) ResolveGraphQLResponse(ctx *Context, response *GraphQLRespons }() t := newTools(r.options, r.allowedErrorExtensionFields, r.allowedErrorFields) - if err := r.doResolve(ctx, t, response, data, writer); err != nil { + if err := r.doResolve(ctx, t, response, data, writer, true); err != nil { return nil, err } @@ -257,7 +257,7 @@ func (r *Resolver) ResolveGraphQLResponse(ctx *Context, response *GraphQLRespons var errInvalidWriter = errors.New("invalid writer") -func (r *Resolver) doResolve(ctx *Context, t *tools, response *GraphQLResponse, data []byte, writer io.Writer) error { +func (r *Resolver) doResolve(ctx *Context, t *tools, response *GraphQLResponse, data []byte, writer io.Writer, isFinal bool) error { err := t.resolvable.Init(ctx, data, response.Info.OperationType) if err != nil { return err @@ -279,7 +279,8 @@ func (r *Resolver) doResolve(ctx *Context, t *tools, response *GraphQLResponse, } if iw, ok := writer.(IncrementalResponseWriter); ok { - if err := iw.Flush(resolvedPath(response.Data.Path)); err != nil { + isReallyFinal := isFinal && len(response.DeferredResponses) == 0 + if err := iw.Flush(resolvedPath(response.Data.Path), isReallyFinal); err != nil { return fmt.Errorf("flushing immediate response: %w", err) } } @@ -291,12 +292,11 @@ func (r *Resolver) doResolve(ctx *Context, t *tools, response *GraphQLResponse, } for i, deferredResponse := range response.DeferredResponses { - if err := r.doResolve(ctx, t, deferredResponse, nil, iw); err != nil { + isReallyFinal := isFinal && i == len(response.DeferredResponses)-1 + + if err := r.doResolve(ctx, t, deferredResponse, nil, iw, isReallyFinal); err != nil { return fmt.Errorf("resolving deferred response %d: %w", i, err) } - if err := iw.Flush(resolvedPath(deferredResponse.Data.Path)); err != nil { - return fmt.Errorf("flushing incremental response: %w", err) - } } } return nil diff --git a/v2/pkg/engine/resolve/resolve_test.go b/v2/pkg/engine/resolve/resolve_test.go index 23365aaf18..a2324fe310 100644 --- a/v2/pkg/engine/resolve/resolve_test.go +++ b/v2/pkg/engine/resolve/resolve_test.go @@ -4868,11 +4868,7 @@ Content-Type: application/json; charset=utf-8 --graphql-go-tools Content-Type: application/json; charset=utf-8 -{"hasNext":true,"incremental":[{"data":{"__typename":"Droid","primaryFunction":"Astromech","favoriteEpisode":"NEWHOPE"},"path":["hero"]}]} ---graphql-go-tools -Content-Type: application/json; charset=utf-8 - -{"hasNext":false,"incremental":[]} +{"hasNext":false,"incremental":[{"data":{"__typename":"Droid","primaryFunction":"Astromech","favoriteEpisode":"NEWHOPE"},"path":["hero"]}]} --graphql-go-tools-- `, "\n", "\r\n"), }, diff --git a/v2/pkg/engine/resolve/response.go b/v2/pkg/engine/resolve/response.go index f6cd770e78..59d305b0da 100644 --- a/v2/pkg/engine/resolve/response.go +++ b/v2/pkg/engine/resolve/response.go @@ -50,7 +50,7 @@ type SubscriptionResponseWriter interface { type IncrementalResponseWriter interface { ResponseWriter - Flush(path []any) error + Flush(path []any, isFinal bool) error Complete() error }