Skip to content
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import * as Sentry from '@sentry/browser';
import { createClient } from '@supabase/supabase-js';

window.Sentry = Sentry;

const supabaseClient = createClient('https://test.supabase.co', 'test-key', {
db: {
schema: 'pgmq_public',
},
});

Sentry.init({
dsn: 'https://public@dsn.ingest.sentry.io/1337',
integrations: [Sentry.browserTracingIntegration(), Sentry.supabaseIntegration({ supabaseClient })],
tracesSampleRate: 1.0,
});

// Simulate queue operations
async function performQueueOperations() {
try {
await supabaseClient.rpc('send', {
queue_name: 'todos',
msg: { title: 'Test Todo' },
});

await supabaseClient.rpc('pop', {
queue_name: 'todos',
});
} catch (error) {
Sentry.captureException(error);
}
}

performQueueOperations();
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import type { Page } from '@playwright/test';
import { expect } from '@playwright/test';
import type { Event } from '@sentry/core';
import { sentryTest } from '../../../../utils/fixtures';
import { getFirstSentryEnvelopeRequest, shouldSkipTracingTest } from '../../../../utils/helpers';

async function mockSupabaseRoute(page: Page) {
await page.route('**/rpc/**/send', route => {
return route.fulfill({
status: 200,
body: JSON.stringify([0]),
headers: {
'Content-Type': 'application/json',
},
});
});

await page.route('**/rpc/**/pop', route => {
return route.fulfill({
status: 200,
body: JSON.stringify([
{
msg_id: 0,
},
]),
headers: {
'Content-Type': 'application/json',
},
});
});
}

const bundle = process.env.PW_BUNDLE || '';
// We only want to run this in non-CDN bundle mode
if (bundle.startsWith('bundle')) {
sentryTest.skip();
}

sentryTest('should capture Supabase queue spans from client.rpc', async ({ getLocalTestUrl, page }) => {
if (shouldSkipTracingTest()) {
return;
}

await mockSupabaseRoute(page);

const url = await getLocalTestUrl({ testDir: __dirname });

const event = await getFirstSentryEnvelopeRequest<Event>(page, url);
const queueSpans = event.spans?.filter(({ op }) => op?.startsWith('queue.'));

expect(queueSpans).toHaveLength(2);

expect(queueSpans![0]).toMatchObject({
description: 'supabase.db.rpc',
parent_span_id: event.contexts?.trace?.span_id,
span_id: expect.any(String),
start_timestamp: expect.any(Number),
timestamp: expect.any(Number),
trace_id: event.contexts?.trace?.trace_id,
data: expect.objectContaining({
'sentry.op': 'queue.publish',
'sentry.origin': 'auto.db.supabase',
'messaging.destination.name': 'todos',
'messaging.message.id': '0',
}),
});

expect(queueSpans![1]).toMatchObject({
description: 'supabase.db.rpc',
parent_span_id: event.contexts?.trace?.span_id,
span_id: expect.any(String),
start_timestamp: expect.any(Number),
timestamp: expect.any(Number),
trace_id: event.contexts?.trace?.trace_id,
data: expect.objectContaining({
'sentry.op': 'queue.process',
'sentry.origin': 'auto.db.supabase',
'messaging.destination.name': 'todos',
'messaging.message.id': '0',
}),
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import * as Sentry from '@sentry/browser';
import { createClient } from '@supabase/supabase-js';

window.Sentry = Sentry;

const supabaseClient = createClient('https://test.supabase.co', 'test-key', {
db: {
schema: 'pgmq_public',
},
});

Sentry.init({
dsn: 'https://public@dsn.ingest.sentry.io/1337',
integrations: [Sentry.browserTracingIntegration(), Sentry.supabaseIntegration({ supabaseClient })],
tracesSampleRate: 1.0,
});

// Simulate queue operations
async function performQueueOperations() {
try {
await supabaseClient.schema('pgmq_public').rpc('send', {
queue_name: 'todos',
msg: { title: 'Test Todo' },
});

await supabaseClient.schema('pgmq_public').rpc('pop', {
queue_name: 'todos',
});
} catch (error) {
Sentry.captureException(error);
}
}

performQueueOperations();
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import { type Page, expect } from '@playwright/test';
import type { Event } from '@sentry/core';
import { sentryTest } from '../../../../utils/fixtures';
import { getFirstSentryEnvelopeRequest, shouldSkipTracingTest } from '../../../../utils/helpers';

async function mockSupabaseRoute(page: Page) {
await page.route('**/rpc/**/send', route => {
return route.fulfill({
status: 200,
body: JSON.stringify([0]),
headers: {
'Content-Type': 'application/json',
},
});
});

await page.route('**/rpc/**/pop', route => {
return route.fulfill({
status: 200,
body: JSON.stringify([
{
msg_id: 0,
},
]),
headers: {
'Content-Type': 'application/json',
},
});
});
}

const bundle = process.env.PW_BUNDLE || '';
// We only want to run this in non-CDN bundle mode
if (bundle.startsWith('bundle')) {
sentryTest.skip();
}

sentryTest('should capture Supabase queue spans from client.schema(...).rpc', async ({ getLocalTestUrl, page }) => {
if (shouldSkipTracingTest()) {
return;
}

await mockSupabaseRoute(page);

const url = await getLocalTestUrl({ testDir: __dirname });

const event = await getFirstSentryEnvelopeRequest<Event>(page, url);

const queueSpans = event.spans?.filter(({ op }) => op?.startsWith('queue.'));

expect(queueSpans).toHaveLength(2);

expect(queueSpans![0]).toMatchObject({
description: 'supabase.db.rpc',
parent_span_id: event.contexts?.trace?.span_id,
span_id: expect.any(String),
start_timestamp: expect.any(Number),
timestamp: expect.any(Number),
trace_id: event.contexts?.trace?.trace_id,
data: expect.objectContaining({
'sentry.op': 'queue.publish',
'sentry.origin': 'auto.db.supabase',
'messaging.destination.name': 'todos',
'messaging.message.id': '0',
}),
});

expect(queueSpans![1]).toMatchObject({
description: 'supabase.db.rpc',
parent_span_id: event.contexts?.trace?.span_id,
span_id: expect.any(String),
start_timestamp: expect.any(Number),
timestamp: expect.any(Number),
trace_id: event.contexts?.trace?.trace_id,
data: expect.objectContaining({
'sentry.op': 'queue.process',
'sentry.origin': 'auto.db.supabase',
'messaging.destination.name': 'todos',
'messaging.message.id': '0',
}),
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"build": "next build",
"start": "next start",
"clean": "npx rimraf node_modules pnpm-lock.yaml .next",
"start-local-supabase": "supabase init --force --workdir . && supabase start -o env && supabase db reset",
"start-local-supabase": "supabase start -o env && supabase db reset",
"test:prod": "TEST_ENV=production playwright test",
"test:build": "pnpm install && pnpm start-local-supabase && pnpm build",
"test:assert": "pnpm test:prod"
Expand All @@ -25,7 +25,7 @@
"next": "14.2.25",
"react": "18.2.0",
"react-dom": "18.2.0",
"supabase": "2.19.7",
"supabase": "2.23.4",
"typescript": "4.9.5"
},
"devDependencies": {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import { NextApiRequest, NextApiResponse } from 'next';
import { createClient } from '@supabase/supabase-js';
import * as Sentry from '@sentry/nextjs';

// These are the default development keys for a local Supabase instance
const NEXT_PUBLIC_SUPABASE_URL = 'http://localhost:54321';
const SUPABASE_SERVICE_ROLE_KEY =
'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6InNlcnZpY2Vfcm9sZSIsImV4cCI6MTk4MzgxMjk5Nn0.EGIM96RAZx35lJzdJsyH-qQwv8Hdp7fsn3W0YpN81IU';

const supabaseClient = createClient(NEXT_PUBLIC_SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY, {
db: {
schema: 'pgmq_public',
},
});

Sentry.instrumentSupabaseClient(supabaseClient);

export default async function handler(req: NextApiRequest, res: NextApiResponse) {
// Step 1: Batch produce multiple messages
const { data: sendData, error: sendError } = await supabaseClient.rpc('send_batch', {
queue_name: 'batch-flow-queue',
messages: [
{
taskType: 'email',
recipient: 'user1@example.com',
subject: 'Welcome!',
},
{
taskType: 'email',
recipient: 'user2@example.com',
subject: 'Verification',
},
{
taskType: 'sms',
recipient: '+1234567890',
message: 'Your code is 123456',
},
],
});

if (sendError) {
return res.status(500).json({ error: `Send batch failed: ${sendError.message}` });
}

// Step 2: Consume multiple messages from the queue
const { data: receiveData, error: receiveError } = await supabaseClient.rpc('receive', {
queue_name: 'batch-flow-queue',
vt: 30,
qty: 3,
});

if (receiveError) {
return res.status(500).json({ error: `Receive failed: ${receiveError.message}` });
}

// Step 3: Process all messages
const processedMessages = receiveData?.map((msg: any) => ({
messageId: msg.msg_id,
taskType: msg.message?.taskType,
processed: true,
}));

// Step 4: Archive all processed messages
const messageIds = receiveData?.map((msg: any) => msg.msg_id).filter(Boolean);
if (messageIds && messageIds.length > 0) {
const { error: archiveError } = await supabaseClient.rpc('archive', {
queue_name: 'batch-flow-queue',
msg_ids: messageIds,
});

if (archiveError) {
return res.status(500).json({ error: `Archive failed: ${archiveError.message}` });
}
}

return res.status(200).json({
success: true,
batchSize: 3,
produced: { messageIds: sendData },
consumed: {
count: receiveData?.length || 0,
messages: processedMessages,
},
});
}
Loading
Loading