Skip to content

Commit ffec6e7

Browse files
authored
Export PubSub operation factory (#8747)
* Export PubSub operation factory * uPDATE SNAPSHOT
1 parent 15b2ab5 commit ffec6e7

File tree

5 files changed

+161
-162
lines changed

5 files changed

+161
-162
lines changed

.changeset/tangy-otters-search.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
'@graphql-mesh/transport-rest': patch
3+
'@graphql-mesh/utils': patch
4+
---
5+
6+
Export PubSub resolver factory

examples/openapi-location-weather/tests/__snapshots__/location-weather.test.ts.snap

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1659,11 +1659,11 @@ type Query {
16591659
lon: Float!
16601660
output_type: queryInput_current_lightning_output_type
16611661
1662-
"""Radial search distance from point in KM. (Default 50 KM - Max 150 KM)"""
1662+
"""Radial search distance from point in KM. (Default 20 KM - Max 75 KM)"""
16631663
search_distance_km: Int
16641664
16651665
"""
1666-
Search time backwards from current time in minutes. (Default 30 minutes - Max 240 minutes)
1666+
Search time backwards from current time in minutes. (Default 30 minutes - Max 45 minutes)
16671667
"""
16681668
search_mins: Int
16691669

packages/legacy/utils/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,4 @@ export * from './getAdditionalResolversFromTypeDefs.js';
2525
export * from './get-def-directives.js';
2626
export * from './disposable.js';
2727
export * from './in-context-sdk.js';
28+
export * from './with-filter.js';

packages/legacy/utils/src/resolve-additional-resolvers.ts

Lines changed: 144 additions & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,142 @@ function generateValuesFromResults(resultExpression: string): (result: any) => a
171171
};
172172
}
173173

174+
export interface PubSubOperationOptions {
175+
pubsubTopic: string;
176+
pubsub?: MeshPubSub | HivePubSub;
177+
filterBy?: string;
178+
result?: string;
179+
}
180+
181+
export function getResolverForPubSubOperation(opts: PubSubOperationOptions) {
182+
const pubsubTopic = opts.pubsubTopic;
183+
let subscribeFn = function subscriber(
184+
root: any,
185+
args: Record<string, any>,
186+
context: MeshContext,
187+
info: GraphQLResolveInfo,
188+
): MaybePromise<AsyncIterator<any>> {
189+
const resolverData = { root, args, context, info, env: process.env };
190+
const topic = stringInterpolator.parse(pubsubTopic, resolverData);
191+
const ps = context?.pubsub || opts?.pubsub;
192+
if (isHivePubSub(ps)) {
193+
return ps.subscribe(topic)[Symbol.asyncIterator]();
194+
}
195+
return ps.asyncIterator(topic)[Symbol.asyncIterator]();
196+
};
197+
if (opts.filterBy) {
198+
let filterFunction: any;
199+
try {
200+
// eslint-disable-next-line no-new-func
201+
filterFunction = new Function('root', 'args', 'context', 'info', `return ${opts.filterBy};`);
202+
} catch (e) {
203+
throw new Error(
204+
`Error while parsing filterBy expression "${opts.filterBy}" in additional subscription resolver: ${e.message}`,
205+
);
206+
}
207+
subscribeFn = withFilter(subscribeFn as any, filterFunction);
208+
}
209+
const valuesFromResults = opts.result ? generateValuesFromResults(opts.result) : undefined;
210+
211+
return {
212+
subscribe: subscribeFn,
213+
resolve: (payload: any, _: any, ctx, info: GraphQLResolveInfo) => {
214+
function resolvePayload(payload: any) {
215+
if (valuesFromResults) {
216+
return valuesFromResults(payload);
217+
}
218+
return payload;
219+
}
220+
const stitchingInfo = info?.schema.extensions?.stitchingInfo as Maybe<StitchingInfo<any>>;
221+
if (!stitchingInfo) {
222+
return resolvePayload(payload); // no stitching, cannot be resolved anywhere else
223+
}
224+
const returnTypeName = getNamedType(info.returnType).name;
225+
const mergedTypeInfo = stitchingInfo.mergedTypes[returnTypeName];
226+
if (!mergedTypeInfo) {
227+
return resolvePayload(payload); // this type is not merged or resolvable
228+
}
229+
230+
// we dont compare fragment definitions because they mean there are type-conditions
231+
// more advanced behavior. if we encounter such a case, the missing selection set
232+
// will have fields and we will perform a call to the subschema
233+
const requestedSelSet = info.fieldNodes[0]?.selectionSet;
234+
if (!requestedSelSet) {
235+
return resolvePayload(payload); // should never happen, but hey 🤷‍♂️
236+
}
237+
238+
const availableSelSet = selectionSetOfData(resolvePayload(payload));
239+
const missingSelectionSet = subtractSelectionSets(requestedSelSet, availableSelSet);
240+
if (!missingSelectionSet.selections.length) {
241+
// all of the fields are already in the payload
242+
return resolvePayload(payload);
243+
}
244+
245+
// find the best subgraph by diffing the selection sets
246+
let subschema: Subschema | null = null;
247+
let mergedTypeConfig: MergedTypeConfig | null = null;
248+
for (const [requiredSubschema, requiredSelSet] of mergedTypeInfo.selectionSets) {
249+
const tentativeMergedTypeConfig = requiredSubschema.merge?.[returnTypeName];
250+
if (tentativeMergedTypeConfig?.fields) {
251+
// this resolver requires additional fields (think `@requires(fields: "x")`)
252+
// TODO: actually implement whether the payload already contains those fields
253+
// TODO: is there a better way for finding a match?
254+
continue;
255+
}
256+
const diff = subtractSelectionSets(requiredSelSet, availableSelSet);
257+
if (!diff.selections.length) {
258+
// all of the fields of the requesting (available) selection set is exist in the required selection set
259+
subschema = requiredSubschema;
260+
mergedTypeConfig = tentativeMergedTypeConfig;
261+
break;
262+
}
263+
}
264+
if (!subschema || !mergedTypeConfig) {
265+
// the type cannot be resolved
266+
return resolvePayload(payload);
267+
}
268+
269+
return handleMaybePromise(
270+
() => {
271+
if (mergedTypeConfig.argsFromKeys) {
272+
return batchDelegateToSchema({
273+
schema: subschema,
274+
operation: 'query' as OperationTypeNode,
275+
fieldName: mergedTypeConfig.fieldName,
276+
returnType: new GraphQLList(info.returnType),
277+
key: mergedTypeConfig.key?.(payload) || payload, // TODO: should use valueFromResults on the args too?
278+
argsFromKeys: mergedTypeConfig.argsFromKeys,
279+
valuesFromResults: mergedTypeConfig.valuesFromResults,
280+
selectionSet: missingSelectionSet,
281+
context: ctx,
282+
info,
283+
dataLoaderOptions: mergedTypeConfig.dataLoaderOptions,
284+
skipTypeMerging: false, // important to be false so that fields outside this subgraph can be resolved properly
285+
});
286+
}
287+
if (mergedTypeConfig.args) {
288+
return delegateToSchema({
289+
schema: subschema,
290+
operation: 'query' as OperationTypeNode,
291+
fieldName: mergedTypeConfig.fieldName,
292+
returnType: info.returnType,
293+
args: mergedTypeConfig.args(payload), // TODO: should use valueFromResults on the args too?
294+
selectionSet: missingSelectionSet,
295+
context: ctx,
296+
info,
297+
skipTypeMerging: false, // important to be false so that fields outside this subgraph can be resolved properly
298+
});
299+
}
300+
// no way to delegate to anything, return empty - i.e. resolve just payload
301+
// should not happen though, there'll be something to use
302+
return {};
303+
},
304+
resolved => resolvePayload(mergeDeep([payload, resolved])),
305+
);
306+
},
307+
};
308+
}
309+
174310
export function resolveAdditionalResolversWithoutImport(
175311
additionalResolver:
176312
| YamlConfig.AdditionalStitchingResolverObject
@@ -179,143 +315,18 @@ export function resolveAdditionalResolversWithoutImport(
179315
pubsub?: MeshPubSub | HivePubSub,
180316
): IResolvers {
181317
const baseOptions: any = {};
182-
if (additionalResolver.result) {
183-
baseOptions.valuesFromResults = generateValuesFromResults(additionalResolver.result);
184-
}
185318
if ('pubsubTopic' in additionalResolver) {
186-
const pubsubTopic = additionalResolver.pubsubTopic;
187-
let subscribeFn = function subscriber(
188-
root: any,
189-
args: Record<string, any>,
190-
context: MeshContext,
191-
info: GraphQLResolveInfo,
192-
): MaybePromise<AsyncIterator<any>> {
193-
const resolverData = { root, args, context, info, env: process.env };
194-
const topic = stringInterpolator.parse(pubsubTopic, resolverData);
195-
const ps = context?.pubsub || pubsub;
196-
if (isHivePubSub(ps)) {
197-
return ps.subscribe(topic)[Symbol.asyncIterator]();
198-
}
199-
return ps.asyncIterator(topic)[Symbol.asyncIterator]();
200-
};
201-
if (additionalResolver.filterBy) {
202-
let filterFunction: any;
203-
try {
204-
// eslint-disable-next-line no-new-func
205-
filterFunction = new Function(
206-
'root',
207-
'args',
208-
'context',
209-
'info',
210-
`return ${additionalResolver.filterBy};`,
211-
);
212-
} catch (e) {
213-
throw new Error(
214-
`Error while parsing filterBy expression "${additionalResolver.filterBy}" in additional subscription resolver: ${e.message}`,
215-
);
216-
}
217-
subscribeFn = withFilter(subscribeFn, filterFunction);
218-
}
319+
const { subscribe, resolve } = getResolverForPubSubOperation({
320+
pubsubTopic: additionalResolver.pubsubTopic,
321+
pubsub,
322+
filterBy: additionalResolver.filterBy,
323+
result: additionalResolver.result,
324+
});
219325
return {
220326
[additionalResolver.targetTypeName]: {
221327
[additionalResolver.targetFieldName]: {
222-
subscribe: subscribeFn,
223-
resolve: (payload: any, _, ctx, info) => {
224-
function resolvePayload(payload: any) {
225-
if (baseOptions.valuesFromResults) {
226-
return baseOptions.valuesFromResults(payload);
227-
}
228-
return payload;
229-
}
230-
const stitchingInfo = info?.schema.extensions?.stitchingInfo as Maybe<
231-
StitchingInfo<any>
232-
>;
233-
if (!stitchingInfo) {
234-
return resolvePayload(payload); // no stitching, cannot be resolved anywhere else
235-
}
236-
const returnTypeName = getNamedType(info.returnType).name;
237-
const mergedTypeInfo = stitchingInfo.mergedTypes[returnTypeName];
238-
if (!mergedTypeInfo) {
239-
return resolvePayload(payload); // this type is not merged or resolvable
240-
}
241-
242-
// we dont compare fragment definitions because they mean there are type-conditions
243-
// more advanced behavior. if we encounter such a case, the missing selection set
244-
// will have fields and we will perform a call to the subschema
245-
const requestedSelSet = info.fieldNodes[0]?.selectionSet;
246-
if (!requestedSelSet) {
247-
return resolvePayload(payload); // should never happen, but hey 🤷‍♂️
248-
}
249-
250-
const availableSelSet = selectionSetOfData(resolvePayload(payload));
251-
const missingSelectionSet = subtractSelectionSets(requestedSelSet, availableSelSet);
252-
if (!missingSelectionSet.selections.length) {
253-
// all of the fields are already in the payload
254-
return resolvePayload(payload);
255-
}
256-
257-
// find the best subgraph by diffing the selection sets
258-
let subschema: Subschema | null = null;
259-
let mergedTypeConfig: MergedTypeConfig | null = null;
260-
for (const [requiredSubschema, requiredSelSet] of mergedTypeInfo.selectionSets) {
261-
const tentativeMergedTypeConfig = requiredSubschema.merge?.[returnTypeName];
262-
if (tentativeMergedTypeConfig?.fields) {
263-
// this resolver requires additional fields (think `@requires(fields: "x")`)
264-
// TODO: actually implement whether the payload already contains those fields
265-
// TODO: is there a better way for finding a match?
266-
continue;
267-
}
268-
const diff = subtractSelectionSets(requiredSelSet, availableSelSet);
269-
if (!diff.selections.length) {
270-
// all of the fields of the requesting (available) selection set is exist in the required selection set
271-
subschema = requiredSubschema;
272-
mergedTypeConfig = tentativeMergedTypeConfig;
273-
break;
274-
}
275-
}
276-
if (!subschema || !mergedTypeConfig) {
277-
// the type cannot be resolved
278-
return resolvePayload(payload);
279-
}
280-
281-
return handleMaybePromise(
282-
() => {
283-
if (mergedTypeConfig.argsFromKeys) {
284-
return batchDelegateToSchema({
285-
schema: subschema,
286-
operation: 'query' as OperationTypeNode,
287-
fieldName: mergedTypeConfig.fieldName,
288-
returnType: new GraphQLList(info.returnType),
289-
key: mergedTypeConfig.key?.(payload) || payload, // TODO: should use valueFromResults on the args too?
290-
argsFromKeys: mergedTypeConfig.argsFromKeys,
291-
valuesFromResults: mergedTypeConfig.valuesFromResults,
292-
selectionSet: missingSelectionSet,
293-
context: ctx,
294-
info,
295-
dataLoaderOptions: mergedTypeConfig.dataLoaderOptions,
296-
skipTypeMerging: false, // important to be false so that fields outside this subgraph can be resolved properly
297-
});
298-
}
299-
if (mergedTypeConfig.args) {
300-
return delegateToSchema({
301-
schema: subschema,
302-
operation: 'query' as OperationTypeNode,
303-
fieldName: mergedTypeConfig.fieldName,
304-
returnType: info.returnType,
305-
args: mergedTypeConfig.args(payload), // TODO: should use valueFromResults on the args too?
306-
selectionSet: missingSelectionSet,
307-
context: ctx,
308-
info,
309-
skipTypeMerging: false, // important to be false so that fields outside this subgraph can be resolved properly
310-
});
311-
}
312-
// no way to delegate to anything, return empty - i.e. resolve just payload
313-
// should not happen though, there'll be something to use
314-
return {};
315-
},
316-
resolved => resolvePayload(mergeDeep([payload, resolved])),
317-
);
318-
},
328+
subscribe,
329+
resolve,
319330
},
320331
},
321332
};
Lines changed: 8 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import type { GraphQLField } from 'graphql';
22
import { stringInterpolator } from '@graphql-mesh/string-interpolation';
33
import { toMeshPubSub, type HivePubSub, type Logger, type MeshPubSub } from '@graphql-mesh/types';
4-
import { createGraphQLError } from '@graphql-tools/utils';
4+
import { getResolverForPubSubOperation } from '@graphql-mesh/utils';
55

66
interface ProcessPubSubOperationAnnotationsOpts {
77
field: GraphQLField<any, any>;
@@ -16,30 +16,11 @@ export function processPubSubOperationAnnotations({
1616
pubsubTopic,
1717
logger: globalLogger,
1818
}: ProcessPubSubOperationAnnotationsOpts) {
19-
field.subscribe = function pubSubSubscribeFn(root, args, context, info) {
20-
const logger = context?.logger || globalLogger;
21-
const operationLogger = logger.child({ operation: `${info.parentType.name}.${field.name}` });
22-
const meshOrHivePubsub: MeshPubSub | HivePubSub = context?.pubsub || globalPubsub;
23-
if (!meshOrHivePubsub) {
24-
return new TypeError(`You should have PubSub defined in either the config or the context!`);
25-
}
26-
const pubsub = toMeshPubSub(meshOrHivePubsub);
27-
const interpolationData = { root, args, context, info, env: process.env };
28-
let interpolatedPubSubTopic: string = stringInterpolator.parse(pubsubTopic, interpolationData);
29-
if (interpolatedPubSubTopic.startsWith('webhook:')) {
30-
const [, expectedMethod, expectedUrl] = interpolatedPubSubTopic.split(':');
31-
const expectedPath = new URL(expectedUrl, 'http://localhost').pathname;
32-
interpolatedPubSubTopic = `webhook:${expectedMethod}:${expectedPath}`;
33-
}
34-
operationLogger.debug(
35-
`${info.parentType.name}.${field.name} => Subscribing to pubSubTopic: ${interpolatedPubSubTopic}`,
36-
);
37-
return pubsub.asyncIterator(interpolatedPubSubTopic);
38-
};
39-
field.resolve = function pubSubResolver(root, args, context, info) {
40-
const logger = context?.logger || globalLogger;
41-
const operationLogger = logger.child({ operation: `${info.parentType.name}.${field.name}` });
42-
operationLogger.debug('received', { root, pubsubTopic });
43-
return root;
44-
};
19+
const { subscribe, resolve } = getResolverForPubSubOperation({
20+
pubsubTopic,
21+
pubsub: globalPubsub,
22+
});
23+
field.subscribe = subscribe;
24+
field.resolve = resolve;
25+
return field;
4526
}

0 commit comments

Comments
 (0)