Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/@graphql-mesh_cache-redis-8830-dependencies.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@graphql-mesh/cache-redis": patch
---
dependencies updates:
- Added dependency [`@opentelemetry/api@^1.9.0` ↗︎](https://www.npmjs.com/package/@opentelemetry/api/v/1.9.0) (to `dependencies`)
5 changes: 5 additions & 0 deletions .changeset/shiny-paths-flow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@graphql-mesh/cache-redis': minor
---

Add support of OTEL tracing with spans for Get, Set, Delete and initialisation.
1 change: 1 addition & 0 deletions packages/cache/redis/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"@graphql-mesh/cross-helpers": "^0.4.10",
"@graphql-mesh/string-interpolation": "0.5.9",
"@graphql-mesh/types": "^0.104.13",
"@opentelemetry/api": "^1.9.0",
"@whatwg-node/disposablestack": "^0.0.6",
"ioredis": "^5.3.2",
"ioredis-mock": "^8.8.3",
Expand Down
241 changes: 134 additions & 107 deletions packages/cache/redis/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
type MeshPubSub,
type YamlConfig,
} from '@graphql-mesh/types';
import { trace, type Tracer } from '@opentelemetry/api';
import { DisposableSymbols } from '@whatwg-node/disposablestack';

function interpolateStrWithEnv(str: string): string {
Expand All @@ -19,113 +20,123 @@ function interpolateStrWithEnv(str: string): string {

export default class RedisCache<V = string> implements KeyValueCache<V>, Disposable {
private client: Redis | Cluster;
private tracer: Tracer;

constructor(
options: YamlConfig.Cache['redis'] & { pubsub?: MeshPubSub | HivePubSub; logger: Logger },
) {
const lazyConnect = options.lazyConnect !== false;
if ('startupNodes' in options) {
const parsedUsername =
interpolateStrWithEnv(options.username?.toString()) || process.env.REDIS_USERNAME;
const parsedPassword =
interpolateStrWithEnv(options.password?.toString()) || process.env.REDIS_PASSWORD;
const parsedDb = interpolateStrWithEnv(options.db?.toString()) || process.env.REDIS_DB;
const numDb = parseInt(parsedDb);
this.client = new Redis.Cluster(
options.startupNodes.map(s => ({
host: s.host && interpolateStrWithEnv(s.host),
port: s.port && parseInt(interpolateStrWithEnv(s.port)),
family: s.family && parseInt(interpolateStrWithEnv(s.family)),
})),
{
dnsLookup: options.dnsLookupAsIs
? (address, callback) => callback(null, address)
: undefined,
redisOptions: {
username: parsedUsername,
password: parsedPassword,
db: isNaN(numDb) ? undefined : numDb,
this.tracer = trace.getTracer('hive.cache.redis');
this.tracer.startActiveSpan('hive.cache.redis.init', span => {
try {
const lazyConnect = options.lazyConnect !== false;
if ('startupNodes' in options) {
const parsedUsername =
interpolateStrWithEnv(options.username?.toString()) || process.env.REDIS_USERNAME;
const parsedPassword =
interpolateStrWithEnv(options.password?.toString()) || process.env.REDIS_PASSWORD;
const parsedDb = interpolateStrWithEnv(options.db?.toString()) || process.env.REDIS_DB;
const numDb = parseInt(parsedDb);
this.client = new Redis.Cluster(
options.startupNodes.map(s => ({
host: s.host && interpolateStrWithEnv(s.host),
port: s.port && parseInt(interpolateStrWithEnv(s.port)),
family: s.family && parseInt(interpolateStrWithEnv(s.family)),
})),
{
dnsLookup: options.dnsLookupAsIs
? (address, callback) => callback(null, address)
: undefined,
redisOptions: {
username: parsedUsername,
password: parsedPassword,
db: isNaN(numDb) ? undefined : numDb,
enableAutoPipelining: true,
...(lazyConnect ? { lazyConnect: true } : {}),
tls: options.tls ? {} : undefined,
},
enableAutoPipelining: true,
enableOfflineQueue: true,
...(lazyConnect ? { lazyConnect: true } : {}),
},
);
} else if ('sentinels' in options) {
this.client = new Redis({
name: options.name,
sentinelPassword:
options.sentinelPassword && interpolateStrWithEnv(options.sentinelPassword),
sentinels: options.sentinels.map(s => ({
host: s.host && interpolateStrWithEnv(s.host),
port: s.port && parseInt(interpolateStrWithEnv(s.port)),
family: s.family && parseInt(interpolateStrWithEnv(s.family)),
})),
role: options.role,
enableTLSForSentinelMode: options.enableTLSForSentinelMode,
enableAutoPipelining: true,
...(lazyConnect ? { lazyConnect: true } : {}),
tls: options.tls ? {} : undefined,
},
enableAutoPipelining: true,
enableOfflineQueue: true,
...(lazyConnect ? { lazyConnect: true } : {}),
},
);
} else if ('sentinels' in options) {
this.client = new Redis({
name: options.name,
sentinelPassword:
options.sentinelPassword && interpolateStrWithEnv(options.sentinelPassword),
sentinels: options.sentinels.map(s => ({
host: s.host && interpolateStrWithEnv(s.host),
port: s.port && parseInt(interpolateStrWithEnv(s.port)),
family: s.family && parseInt(interpolateStrWithEnv(s.family)),
})),
role: options.role,
enableTLSForSentinelMode: options.enableTLSForSentinelMode,
enableAutoPipelining: true,
enableOfflineQueue: true,
lazyConnect,
});
} else if (options.url) {
const redisUrl = new URL(interpolateStrWithEnv(options.url));
enableOfflineQueue: true,
lazyConnect,
});
} else if (options.url) {
const redisUrl = new URL(interpolateStrWithEnv(options.url));

if (!['redis:', 'rediss:'].includes(redisUrl.protocol)) {
throw new Error('Redis URL must use either redis:// or rediss://');
}
if (!['redis:', 'rediss:'].includes(redisUrl.protocol)) {
throw new Error('Redis URL must use either redis:// or rediss://');
}

if (lazyConnect) {
redisUrl.searchParams.set('lazyConnect', 'true');
}
if (lazyConnect) {
redisUrl.searchParams.set('lazyConnect', 'true');
}

redisUrl.searchParams.set('enableAutoPipelining', 'true');
redisUrl.searchParams.set('enableOfflineQueue', 'true');
const IPV6_REGEX =
/^(?:(?:[a-fA-F\d]{1,4}:){7}(?:[a-fA-F\d]{1,4}|:)|(?:[a-fA-F\d]{1,4}:){6}(?:(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)(?:\\.(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)){3}|:[a-fA-F\d]{1,4}|:)|(?:[a-fA-F\d]{1,4}:){5}(?::(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)(?:\\.(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)){3}|(?::[a-fA-F\d]{1,4}){1,2}|:)|(?:[a-fA-F\d]{1,4}:){4}(?:(?::[a-fA-F\d]{1,4}){0,1}:(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)(?:\\.(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)){3}|(?::[a-fA-F\d]{1,4}){1,3}|:)|(?:[a-fA-F\d]{1,4}:){3}(?:(?::[a-fA-F\d]{1,4}){0,2}:(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)(?:\\.(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)){3}|(?::[a-fA-F\d]{1,4}){1,4}|:)|(?:[a-fA-F\d]{1,4}:){2}(?:(?::[a-fA-F\d]{1,4}){0,3}:(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)(?:\\.(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)){3}|(?::[a-fA-F\d]{1,4}){1,5}|:)|(?:[a-fA-F\d]{1,4}:){1}(?:(?::[a-fA-F\d]{1,4}){0,4}:(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)(?:\\.(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)){3}|(?::[a-fA-F\d]{1,4}){1,6}|:)|(?::(?:(?::[a-fA-F\d]{1,4}){0,5}:(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)(?:\\.(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)){3}|(?::[a-fA-F\d]{1,4}){1,7}|:)))(?:%[0-9a-zA-Z]{1,})?$/gm;
if (IPV6_REGEX.test(redisUrl.hostname)) {
redisUrl.searchParams.set('family', '6');
}
const urlStr = redisUrl.toString();
options.logger.debug(`Connecting to Redis at ${urlStr}`);
this.client = new Redis(urlStr);
} else {
const parsedHost = interpolateStrWithEnv(options.host?.toString()) || process.env.REDIS_HOST;
const parsedPort = interpolateStrWithEnv(options.port?.toString()) || process.env.REDIS_PORT;
const parsedUsername =
interpolateStrWithEnv(options.username?.toString()) || process.env.REDIS_USERNAME;
const parsedPassword =
interpolateStrWithEnv(options.password?.toString()) || process.env.REDIS_PASSWORD;
const parsedDb = interpolateStrWithEnv(options.db?.toString()) || process.env.REDIS_DB;
const parsedFamily =
interpolateStrWithEnv(options.family?.toString()) || process.env.REDIS_FAMILY;
const numPort = parseInt(parsedPort);
const numDb = parseInt(parsedDb);
if (parsedHost) {
options.logger.debug(`Connecting to Redis at ${parsedHost}:${parsedPort}`);
this.client = new Redis({
host: parsedHost,
port: isNaN(numPort) ? undefined : numPort,
username: parsedUsername,
password: parsedPassword,
db: isNaN(numDb) ? undefined : numDb,
family: parsedFamily === '6' ? 6 : undefined,
...(lazyConnect ? { lazyConnect: true } : {}),
enableAutoPipelining: true,
enableOfflineQueue: true,
redisUrl.searchParams.set('enableAutoPipelining', 'true');
redisUrl.searchParams.set('enableOfflineQueue', 'true');
const IPV6_REGEX =
/^(?:(?:[a-fA-F\d]{1,4}:){7}(?:[a-fA-F\d]{1,4}|:)|(?:[a-fA-F\d]{1,4}:){6}(?:(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)(?:\\.(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)){3}|:[a-fA-F\d]{1,4}|:)|(?:[a-fA-F\d]{1,4}:){5}(?::(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)(?:\\.(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)){3}|(?::[a-fA-F\d]{1,4}){1,2}|:)|(?:[a-fA-F\d]{1,4}:){4}(?:(?::[a-fA-F\d]{1,4}){0,1}:(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)(?:\\.(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)){3}|(?::[a-fA-F\d]{1,4}){1,3}|:)|(?:[a-fA-F\d]{1,4}:){3}(?:(?::[a-fA-F\d]{1,4}){0,2}:(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)(?:\\.(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)){3}|(?::[a-fA-F\d]{1,4}){1,4}|:)|(?:[a-fA-F\d]{1,4}:){2}(?:(?::[a-fA-F\d]{1,4}){0,3}:(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)(?:\\.(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)){3}|(?::[a-fA-F\d]{1,4}){1,5}|:)|(?:[a-fA-F\d]{1,4}:){1}(?:(?::[a-fA-F\d]{1,4}){0,4}:(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)(?:\\.(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)){3}|(?::[a-fA-F\d]{1,4}){1,6}|:)|(?::(?:(?::[a-fA-F\d]{1,4}){0,5}:(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)(?:\\.(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]\d|\d)){3}|(?::[a-fA-F\d]{1,4}){1,7}|:)))(?:%[0-9a-zA-Z]{1,})?$/gm;
if (IPV6_REGEX.test(redisUrl.hostname)) {
redisUrl.searchParams.set('family', '6');
}
const urlStr = redisUrl.toString();
options.logger.debug(`Connecting to Redis at ${urlStr}`);
this.client = new Redis(urlStr);
} else {
Comment on lines +96 to +99
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Do not log Redis credentials (secret leakage)

Logging urlStr may include username/password from options.url. Sanitize before logging.

Apply this diff:

-          const urlStr = redisUrl.toString();
-          options.logger.debug(`Connecting to Redis at ${urlStr}`);
-          this.client = new Redis(urlStr);
+          const urlStr = redisUrl.toString();
+          // Sanitize credentials for logs
+          const logUrl = new URL(urlStr);
+          if (logUrl.password) {
+            logUrl.password = '***';
+          }
+          options.logger.debug(`Connecting to Redis at ${logUrl.toString()}`);
+          this.client = new Redis(urlStr);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const urlStr = redisUrl.toString();
options.logger.debug(`Connecting to Redis at ${urlStr}`);
this.client = new Redis(urlStr);
} else {
const urlStr = redisUrl.toString();
// Sanitize credentials for logs
const logUrl = new URL(urlStr);
if (logUrl.password) {
logUrl.password = '***';
}
options.logger.debug(`Connecting to Redis at ${logUrl.toString()}`);
this.client = new Redis(urlStr);
} else {
🤖 Prompt for AI Agents
packages/cache/redis/src/index.ts lines 96-99: the code logs the full redis URL
(urlStr) which can contain username/password and leak secrets; instead, before
logging build a sanitized string that strips or redacts credentials (e.g., parse
the URL and omit username/password, or replace them with ****, or log only host
and port), use that sanitized value in options.logger.debug while still using
the original urlStr when creating this.client = new Redis(urlStr).

const parsedHost =
interpolateStrWithEnv(options.host?.toString()) || process.env.REDIS_HOST;
const parsedPort =
interpolateStrWithEnv(options.port?.toString()) || process.env.REDIS_PORT;
const parsedUsername =
interpolateStrWithEnv(options.username?.toString()) || process.env.REDIS_USERNAME;
const parsedPassword =
interpolateStrWithEnv(options.password?.toString()) || process.env.REDIS_PASSWORD;
const parsedDb = interpolateStrWithEnv(options.db?.toString()) || process.env.REDIS_DB;
const parsedFamily =
interpolateStrWithEnv(options.family?.toString()) || process.env.REDIS_FAMILY;
const numPort = parseInt(parsedPort);
const numDb = parseInt(parsedDb);
if (parsedHost) {
options.logger.debug(`Connecting to Redis at ${parsedHost}:${parsedPort}`);
this.client = new Redis({
host: parsedHost,
port: isNaN(numPort) ? undefined : numPort,
username: parsedUsername,
password: parsedPassword,
db: isNaN(numDb) ? undefined : numDb,
family: parsedFamily === '6' ? 6 : undefined,
...(lazyConnect ? { lazyConnect: true } : {}),
enableAutoPipelining: true,
enableOfflineQueue: true,
});
} else {
options.logger.debug(`Connecting to Redis mock`);
this.client = new RedisMock();
}
}
const pubsub = toMeshPubSub(options.pubsub);
// TODO: PubSub.destroy will no longer be needed after v0
const id = pubsub?.subscribe('destroy', () => {
this.client.disconnect(false);
pubsub.unsubscribe(id);
});
} else {
options.logger.debug(`Connecting to Redis mock`);
this.client = new RedisMock();
} finally {
span.end();
}
}
const pubsub = toMeshPubSub(options.pubsub);
// TODO: PubSub.destroy will no longer be needed after v0
const id = pubsub?.subscribe('destroy', () => {
this.client.disconnect(false);
pubsub.unsubscribe(id);
});
}

Expand All @@ -134,26 +145,42 @@ export default class RedisCache<V = string> implements KeyValueCache<V>, Disposa
}

set(key: string, value: V, options?: KeyValueCacheSetOptions): Promise<any> {
const stringifiedValue = JSON.stringify(value);
if (options?.ttl && options.ttl > 0) {
return this.client.set(key, stringifiedValue, 'PX', options.ttl * 1000);
} else {
return this.client.set(key, stringifiedValue);
}
return this.tracer.startActiveSpan('hive.cache.set', async span => {
try {
const stringifiedValue = JSON.stringify(value);
if (options?.ttl && options.ttl > 0) {
return await this.client.set(key, stringifiedValue, 'PX', options.ttl * 1000);
} else {
return await this.client.set(key, stringifiedValue);
}
} finally {
span.end();
}
});
}

get(key: string): Promise<V | undefined> {
return this.client.get(key).then(value => (value != null ? JSON.parse(value) : undefined));
return this.tracer.startActiveSpan('hive.cache.get', span =>
this.client
.get(key)
.then(value => (value != null ? JSON.parse(value) : undefined))
.finally(() => span.end()),
);
}

getKeysByPrefix(prefix: string): Promise<string[]> {
return scanPatterns(this.client, `${prefix}*`);
}

delete(key: string): Promise<boolean> {
return this.client.del(key).then(
value => value > 0,
() => false,
return this.tracer.startActiveSpan('hive.cache.delete', span =>
this.client
.del(key)
.then(
value => value > 0,
() => false,
)
.finally(() => span.end()),
);
}
}
Expand Down
Loading
Loading