Skip to content

Commit 0c18fe1

Browse files
authored
Support Type Merging within additional type defs with subscriptions (#8733)
1 parent 48dc37e commit 0c18fe1

File tree

11 files changed

+360
-7
lines changed

11 files changed

+360
-7
lines changed

.changeset/solid-needles-shout.md

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
---
2+
'@graphql-mesh/config': patch
3+
'@graphql-mesh/types': patch
4+
'@graphql-mesh/utils': patch
5+
---
6+
7+
Support Type Merging within additional type defs for subscriptions
8+
9+
This allows subscription events to resolve fields from other subgraphs.
10+
11+
For example, if you have a `products` subgraph like this:
12+
13+
```gql filename="products.graphql"
14+
type Query {
15+
hello: String!
16+
}
17+
type Product @key(fields: "id") {
18+
id: ID!
19+
name: String!
20+
price: Float!
21+
}
22+
```
23+
24+
we need add the subscription fields like this:
25+
26+
```ts filename="mesh.config.ts"
27+
import { defineConfig, loadGraphQLHTTPSubgraph } from '@graphql-mesh/compose-cli'
28+
29+
export const composeConfig = defineConfig({
30+
subgraphs: [
31+
{
32+
sourceHandler: loadGraphQLHTTPSubgraph('products', {
33+
endpoint: `http://localhost:3000/graphql`
34+
})
35+
}
36+
],
37+
additionalTypeDefs: /* GraphQL */ `
38+
extend schema {
39+
subscription: Subscription
40+
}
41+
type Subscription {
42+
newProduct: Product! @resolveTo(pubsubTopic: "new_product", sourceName: "products")
43+
}
44+
`
45+
})
46+
```
47+
48+
you can subscribe to Hive Gateway like this:
49+
50+
```graphql
51+
subscription {
52+
newProduct {
53+
name
54+
price
55+
}
56+
}
57+
```
58+
59+
emit an event to the Redis instance on the `new_product` topic
60+
this:
61+
62+
```redis
63+
PUBLISH new_product '{"id":"roomba70x"}'
64+
```
65+
66+
The subscriber will then receive the following event:
67+
68+
```json
69+
{
70+
"data": {
71+
"newProduct": {
72+
"name": "Roomba 70x",
73+
"price": 279.99
74+
}
75+
}
76+
}
77+
```
78+
79+
Because Hive Gateway merged the `Product` type from the `products` subgraph into the root schema, it can resolve the `name` and `price` fields even though they are not defined in the `additionalTypeDefs`.
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import { Redis } from 'ioredis';
2+
import { defineConfig as defineGatewayConfig } from '@graphql-hive/gateway';
3+
import { RedisPubSub } from '@graphql-hive/pubsub/redis';
4+
import { toMeshPubSub } from '@graphql-mesh/types';
5+
6+
/**
7+
* When a Redis connection enters "subscriber mode" (after calling SUBSCRIBE), it can only execute
8+
* subscriber commands (SUBSCRIBE, UNSUBSCRIBE, etc.). Meaning, it cannot execute other commands like PUBLISH.
9+
* To avoid this, we use two separate Redis clients: one for publishing and one for subscribing.
10+
*/
11+
const pub = new Redis({
12+
host: process.env['REDIS_HOST'],
13+
port: parseInt(process.env['REDIS_PORT']!),
14+
autoResubscribe: true,
15+
});
16+
const sub = new Redis({
17+
host: process.env['REDIS_HOST'],
18+
port: parseInt(process.env['REDIS_PORT']!),
19+
autoResubscribe: true,
20+
});
21+
22+
export const gatewayConfig = defineGatewayConfig({
23+
pubsub: toMeshPubSub(
24+
new RedisPubSub(
25+
{ pub, sub },
26+
{
27+
channelPrefix: 'gw',
28+
},
29+
),
30+
),
31+
});
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import { Opts } from '@e2e/opts';
2+
import { defineConfig, loadGraphQLHTTPSubgraph } from '@graphql-mesh/compose-cli';
3+
4+
const opts = Opts(process.argv);
5+
6+
export const composeConfig = defineConfig({
7+
subgraphs: [
8+
{
9+
sourceHandler: loadGraphQLHTTPSubgraph('products', {
10+
endpoint: `http://localhost:${opts.getServicePort('products')}/graphql`,
11+
}),
12+
},
13+
],
14+
additionalTypeDefs: /* GraphQL */ `
15+
extend schema {
16+
subscription: Subscription
17+
}
18+
type Subscription {
19+
newProduct: Product! @resolveTo(pubsubTopic: "new_product", sourceName: "products")
20+
}
21+
`,
22+
});
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
{
2+
"name": "@e2e/subscriptions-type-merging",
3+
"private": true,
4+
"dependencies": {
5+
"@graphql-hive/gateway": "^1.16.3",
6+
"@graphql-hive/pubsub": "next",
7+
"graphql": "^16.9.0",
8+
"graphql-sse": "^2.5.4",
9+
"ioredis": "^5.7.0"
10+
}
11+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import { createServer } from 'node:http';
2+
import { parse } from 'graphql';
3+
import { createYoga } from 'graphql-yoga';
4+
import { buildSubgraphSchema } from '@apollo/subgraph';
5+
import { Opts } from '@e2e/opts';
6+
7+
const port = Opts(process.argv).getServicePort('products');
8+
9+
createServer(
10+
createYoga({
11+
schema: buildSubgraphSchema({
12+
typeDefs: parse(/* GraphQL */ `
13+
type Query {
14+
hello: String!
15+
}
16+
type Product @key(fields: "id") {
17+
id: ID!
18+
name: String!
19+
price: Float!
20+
}
21+
`),
22+
resolvers: {
23+
Query: {
24+
hello: () => 'world',
25+
},
26+
Product: {
27+
__resolveReference: ref => ({
28+
id: ref.id,
29+
name: `Roomba X${ref.id}`,
30+
price: 100,
31+
}),
32+
},
33+
},
34+
}),
35+
}),
36+
).listen(port, () => {
37+
console.log(`Products subgraph running on http://localhost:${port}/graphql`);
38+
});
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import { setTimeout } from 'node:timers/promises';
2+
import { createClient } from 'graphql-sse';
3+
import Redis from 'ioredis';
4+
import { createTenv } from '@e2e/tenv';
5+
import { createDeferred } from '@graphql-tools/utils';
6+
import { AsyncDisposableStack } from '@whatwg-node/disposablestack';
7+
import { fetch } from '@whatwg-node/fetch';
8+
9+
const { container, compose, service, serve } = createTenv(__dirname);
10+
11+
const redisEnv = {
12+
REDIS_HOST: '',
13+
REDIS_PORT: 0,
14+
};
15+
beforeAll(async () => {
16+
const redis = await container({
17+
name: 'redis',
18+
image: 'redis:8',
19+
containerPort: 6379,
20+
healthcheck: ['CMD-SHELL', 'redis-cli ping'],
21+
env: {
22+
LANG: '', // fixes "Failed to configure LOCALE for invalid locale name."
23+
},
24+
});
25+
redisEnv.REDIS_HOST = '0.0.0.0';
26+
redisEnv.REDIS_PORT = redis.port;
27+
});
28+
29+
const leftoverStack = new AsyncDisposableStack();
30+
afterAll(async () => {
31+
try {
32+
await leftoverStack.disposeAsync();
33+
} catch (e) {
34+
console.error('Failed to dispose leftover stack', e);
35+
}
36+
});
37+
38+
it('consumes the pubsub topics and resolves the types correctly', async () => {
39+
await using products = await service('products');
40+
await using composition = await compose({ output: 'graphql', services: [products] });
41+
await using gw = await serve({ supergraph: composition.output, env: redisEnv });
42+
const sseClient = createClient({
43+
retryAttempts: 0,
44+
url: `http://0.0.0.0:${gw.port}/graphql`,
45+
fetchFn: fetch,
46+
});
47+
const iterator = sseClient.iterate({
48+
query: /* GraphQL */ `
49+
subscription {
50+
newProduct {
51+
id
52+
name
53+
price
54+
}
55+
}
56+
`,
57+
});
58+
leftoverStack.defer(() => iterator.return?.() as any);
59+
const pub = new Redis({
60+
host: redisEnv.REDIS_HOST,
61+
port: redisEnv.REDIS_PORT,
62+
});
63+
leftoverStack.defer(() => pub.disconnect());
64+
for (let i = 0; i < 3; i++) {
65+
const id = i + '';
66+
const publishing = (async () => {
67+
// Publish messages after making sure the user's subscribed
68+
await setTimeout(500);
69+
await pub.publish('gw:new_product', JSON.stringify({ id }));
70+
})();
71+
await expect(iterator.next()).resolves.toMatchObject({
72+
value: {
73+
data: {
74+
newProduct: {
75+
id,
76+
name: 'Roomba X' + id,
77+
price: 100,
78+
},
79+
},
80+
},
81+
done: false,
82+
});
83+
// Avait publishing to ensure no unhandled rejections
84+
await publishing;
85+
}
86+
});

packages/legacy/config/yaml-config.graphql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ type AdditionalSubscriptionObject {
147147
pubsubTopic: String!
148148
result: String
149149
filterBy: String
150+
sourceName: String
150151
}
151152

152153
union PubSub = String | PubSubConfig

packages/legacy/types/src/config-schema.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -810,6 +810,9 @@
810810
},
811811
"filterBy": {
812812
"type": "string"
813+
},
814+
"sourceName": {
815+
"type": "string"
813816
}
814817
},
815818
"required": ["targetTypeName", "targetFieldName", "pubsubTopic"]

packages/legacy/types/src/config.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1800,6 +1800,7 @@ export interface AdditionalSubscriptionObject {
18001800
pubsubTopic: string;
18011801
result?: string;
18021802
filterBy?: string;
1803+
sourceName?: string;
18031804
}
18041805
/**
18051806
* Backend cache

packages/legacy/utils/src/resolve-additional-resolvers.ts

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,15 @@ import {
2020
type MeshPubSub,
2121
type YamlConfig,
2222
} from '@graphql-mesh/types';
23-
import type { IResolvers, MaybePromise } from '@graphql-tools/utils';
23+
import {
24+
resolveExternalValue,
25+
Subschema,
26+
type MergedTypeResolver,
27+
type StitchingInfo,
28+
} from '@graphql-tools/delegate';
29+
import type { IResolvers, Maybe, MaybePromise } from '@graphql-tools/utils';
2430
import { parseSelectionSet } from '@graphql-tools/utils';
31+
import { handleMaybePromise } from '@whatwg-node/promise-helpers';
2532
import { loadFromModuleExportExpression } from './load-from-module-export-expression.js';
2633
import { withFilter } from './with-filter.js';
2734

@@ -202,11 +209,49 @@ export function resolveAdditionalResolversWithoutImport(
202209
[additionalResolver.targetTypeName]: {
203210
[additionalResolver.targetFieldName]: {
204211
subscribe: subscribeFn,
205-
resolve: (payload: any) => {
206-
if (baseOptions.valuesFromResults) {
207-
return baseOptions.valuesFromResults(payload);
212+
resolve: (payload: any, _, ctx, info) => {
213+
function handlePayload(payload: any) {
214+
if (baseOptions.valuesFromResults) {
215+
return baseOptions.valuesFromResults(payload);
216+
}
217+
return payload;
218+
}
219+
if (additionalResolver.sourceName) {
220+
const stitchingInfo = info?.schema.extensions?.stitchingInfo as Maybe<
221+
StitchingInfo<any>
222+
>;
223+
if (!stitchingInfo) {
224+
throw new Error(
225+
`Stitching Information object not found in the resolve information, contact maintainers!`,
226+
);
227+
}
228+
const returnTypeName = getNamedType(info.returnType).name;
229+
const mergedTypeInfo = stitchingInfo?.mergedTypes?.[returnTypeName];
230+
if (!mergedTypeInfo) {
231+
throw new Error(
232+
`This "${returnTypeName}" type is not a merged type, disable typeMerging in the config!`,
233+
);
234+
}
235+
const subschema = Array.from(stitchingInfo.subschemaMap?.values() || []).find(
236+
s => s.name === additionalResolver.sourceName,
237+
);
238+
if (!subschema) {
239+
throw new Error(`The source "${additionalResolver.sourceName}" is not found`);
240+
}
241+
const resolver = mergedTypeInfo?.resolvers?.get(subschema);
242+
if (!resolver) {
243+
throw new Error(
244+
`The type "${returnTypeName}" is not resolvable from the source "${additionalResolver.sourceName}", check your typeMerging configuration!`,
245+
);
246+
}
247+
const selectionSet = info.fieldNodes[0].selectionSet;
248+
return handleMaybePromise(
249+
() =>
250+
resolver(payload, ctx, info, subschema, selectionSet, undefined, info.returnType),
251+
handlePayload,
252+
);
208253
}
209-
return payload;
254+
return handlePayload(payload);
210255
},
211256
},
212257
},

0 commit comments

Comments
 (0)