Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import { describe, test, expect, vi, beforeEach, afterEach } from 'vitest';
import { SupabaseBroadcastAdapter } from '../src/lib/SupabaseBroadcastAdapter';
import {
createMockClient,
} from './helpers/test-utils';
import { RUN_ID } from './fixtures';
import { mockChannelSubscription } from './mocks';

/**
* Tests for configurable stabilization delay
* Uses fake timers to verify the delay behavior without actual waiting
*/
describe('SupabaseBroadcastAdapter - Configurable Stabilization Delay', () => {
beforeEach(() => {
vi.useFakeTimers();
// Silence console logs/errors in tests
vi.spyOn(console, 'error').mockImplementation(() => { /* intentionally empty */ });
vi.spyOn(console, 'log').mockImplementation(() => { /* intentionally empty */ });
});

afterEach(() => {
vi.useRealTimers();
vi.restoreAllMocks();
});

test('should wait for custom delay before subscription completes', async () => {
const customDelay = 500;
const { client, mocks } = createMockClient();

const adapter = new SupabaseBroadcastAdapter(client, {
stabilizationDelayMs: customDelay
});

// Setup channel subscription that emits SUBSCRIBED immediately
mockChannelSubscription(mocks);

// Start subscription (returns promise)
const subscribePromise = adapter.subscribeToRun(RUN_ID);

// Track whether promise has resolved
let isResolved = false;
subscribePromise.then(() => { isResolved = true; });

// Flush only microtasks (not timers) to process the SUBSCRIBED event
await Promise.resolve();

// At this point, SUBSCRIBED has been received but we should still be waiting
// for the stabilization delay
expect(isResolved).toBe(false);

// Advance time by less than custom delay
await vi.advanceTimersByTimeAsync(customDelay - 100);
expect(isResolved).toBe(false); // Still waiting

// Advance past the custom delay
await vi.advanceTimersByTimeAsync(100);
expect(isResolved).toBe(true); // Now it's ready!
});

test('should use default 300ms delay when not configured', async () => {
const { client, mocks } = createMockClient();

const adapter = new SupabaseBroadcastAdapter(client);

mockChannelSubscription(mocks);

const subscribePromise = adapter.subscribeToRun(RUN_ID);

let isResolved = false;
subscribePromise.then(() => { isResolved = true; });

// Flush only microtasks
await Promise.resolve();

// Should NOT be ready before 300ms
await vi.advanceTimersByTimeAsync(299);
expect(isResolved).toBe(false);

// Should be ready after 300ms
await vi.advanceTimersByTimeAsync(1);
expect(isResolved).toBe(true);
});

test('should be immediately ready when delay is 0', async () => {
const { client, mocks } = createMockClient();

const adapter = new SupabaseBroadcastAdapter(client, {
stabilizationDelayMs: 0
});

mockChannelSubscription(mocks);

const subscribePromise = adapter.subscribeToRun(RUN_ID);

let isResolved = false;
subscribePromise.then(() => { isResolved = true; });

// Flush microtasks and timers
await vi.runAllTimersAsync();

// Should be ready immediately
expect(isResolved).toBe(true);
});

test('should allow different delays for different adapter instances', async () => {
const { client: client1, mocks: mocks1 } = createMockClient();
const { client: client2, mocks: mocks2 } = createMockClient();

const adapter1 = new SupabaseBroadcastAdapter(client1, {
stabilizationDelayMs: 200
});

const adapter2 = new SupabaseBroadcastAdapter(client2, {
stabilizationDelayMs: 400
});

mockChannelSubscription(mocks1);
mockChannelSubscription(mocks2);

const promise1 = adapter1.subscribeToRun('run-1');
const promise2 = adapter2.subscribeToRun('run-2');

let resolved1 = false;
let resolved2 = false;
promise1.then(() => { resolved1 = true; });
promise2.then(() => { resolved2 = true; });

// Flush microtasks only
await Promise.resolve();

// After 200ms, adapter1 should be ready but adapter2 should not
await vi.advanceTimersByTimeAsync(200);
expect(resolved1).toBe(true);
expect(resolved2).toBe(false);

// After 400ms total, both should be ready
await vi.advanceTimersByTimeAsync(200);
expect(resolved1).toBe(true);
expect(resolved2).toBe(true);
});
});
26 changes: 19 additions & 7 deletions pkgs/client/__tests__/integration/concurrent-operations.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ describe('Concurrent Operations Tests', () => {

const sqlClient = new PgflowSqlClient(sql);
const supabaseClient = createTestSupabaseClient();
const pgflowClient = new PgflowClient(supabaseClient);
const pgflowClient = new PgflowClient(supabaseClient, {
realtimeStabilizationDelayMs: 1000,
});

// Start flows sequentially to avoid overwhelming the system
console.log('=== Starting flows ===');
Expand Down Expand Up @@ -100,10 +102,16 @@ describe('Concurrent Operations Tests', () => {
const supabaseClient1 = createTestSupabaseClient();
const supabaseClient2 = createTestSupabaseClient();
const supabaseClient3 = createTestSupabaseClient();

const pgflowClient1 = new PgflowClient(supabaseClient1);
const pgflowClient2 = new PgflowClient(supabaseClient2);
const pgflowClient3 = new PgflowClient(supabaseClient3);

const pgflowClient1 = new PgflowClient(supabaseClient1, {
realtimeStabilizationDelayMs: 1000,
});
const pgflowClient2 = new PgflowClient(supabaseClient2, {
realtimeStabilizationDelayMs: 1000,
});
const pgflowClient3 = new PgflowClient(supabaseClient3, {
realtimeStabilizationDelayMs: 1000,
});

// Client 1 starts the flow
const input = { data: 'multi-client-test' };
Expand Down Expand Up @@ -186,7 +194,9 @@ describe('Concurrent Operations Tests', () => {

const sqlClient = new PgflowSqlClient(sql);
const supabaseClient = createTestSupabaseClient();
const pgflowClient = new PgflowClient(supabaseClient);
const pgflowClient = new PgflowClient(supabaseClient, {
realtimeStabilizationDelayMs: 1000,
});

// Start fewer runs to reduce system load
const runs = await Promise.all([
Expand Down Expand Up @@ -268,7 +278,9 @@ describe('Concurrent Operations Tests', () => {

const sqlClient = new PgflowSqlClient(sql);
const supabaseClient = createTestSupabaseClient();
const pgflowClient = new PgflowClient(supabaseClient);
const pgflowClient = new PgflowClient(supabaseClient, {
realtimeStabilizationDelayMs: 1000,
});

// Start flows sequentially for reliability
const runA = await pgflowClient.startFlow(flowA.slug, { type: 'flow-a' });
Expand Down
48 changes: 36 additions & 12 deletions pkgs/client/__tests__/integration/flow-lifecycle.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ describe('Flow Lifecycle Integration', () => {
await sql`SELECT pgflow.add_step(${testFlow.slug}, 'first_step')`;

const supabaseClient = createTestSupabaseClient();
const pgflowClient = new PgflowClient(supabaseClient);
const pgflowClient = new PgflowClient(supabaseClient, {
realtimeStabilizationDelayMs: 1000,
});

const input = { url: 'https://example.com' };
const run = await pgflowClient.startFlow(testFlow.slug, input);
Expand Down Expand Up @@ -51,7 +53,9 @@ describe('Flow Lifecycle Integration', () => {
await sql`SELECT pgflow.add_step(${testFlow.slug}, 'step_two')`;

const supabaseClient = createTestSupabaseClient();
const pgflowClient = new PgflowClient(supabaseClient);
const pgflowClient = new PgflowClient(supabaseClient, {
realtimeStabilizationDelayMs: 1000,
});

const run = await pgflowClient.startFlow(testFlow.slug, {
data: 'test',
Expand Down Expand Up @@ -83,7 +87,9 @@ describe('Flow Lifecycle Integration', () => {
await sql`SELECT pgflow.add_step(${testFlow.slug}, 'step_two')`;

const supabaseClient = createTestSupabaseClient();
const pgflowClient = new PgflowClient(supabaseClient);
const pgflowClient = new PgflowClient(supabaseClient, {
realtimeStabilizationDelayMs: 1000,
});

const run = await pgflowClient.startFlow(testFlow.slug, {
data: 'consistency-test',
Expand Down Expand Up @@ -119,7 +125,9 @@ describe('Flow Lifecycle Integration', () => {

const sqlClient = new PgflowSqlClient(sql);
const supabaseClient = createTestSupabaseClient();
const pgflowClient = new PgflowClient(supabaseClient);
const pgflowClient = new PgflowClient(supabaseClient, {
realtimeStabilizationDelayMs: 1000,
});

const input = { data: 'lifecycle-test' };
const run = await pgflowClient.startFlow(testFlow.slug, input);
Expand Down Expand Up @@ -154,7 +162,9 @@ describe('Flow Lifecycle Integration', () => {
await grantMinimalPgflowPermissions(sql);

const supabaseClient = createTestSupabaseClient();
const pgflowClient = new PgflowClient(supabaseClient);
const pgflowClient = new PgflowClient(supabaseClient, {
realtimeStabilizationDelayMs: 1000,
});

await expect(
pgflowClient.startFlow('nonexistent-flow', { data: 'test' })
Expand All @@ -178,7 +188,9 @@ describe('Flow Lifecycle Integration', () => {

const sqlClient = new PgflowSqlClient(sql);
const supabaseClient = createTestSupabaseClient();
const pgflowClient = new PgflowClient(supabaseClient);
const pgflowClient = new PgflowClient(supabaseClient, {
realtimeStabilizationDelayMs: 1000,
});

const run = await pgflowClient.startFlow(testFlow.slug, {
data: 'will-fail',
Expand Down Expand Up @@ -212,7 +224,9 @@ describe('Flow Lifecycle Integration', () => {

const sqlClient = new PgflowSqlClient(sql);
const supabaseClient = createTestSupabaseClient();
const pgflowClient = new PgflowClient(supabaseClient);
const pgflowClient = new PgflowClient(supabaseClient, {
realtimeStabilizationDelayMs: 1000,
});

const originalRun = await pgflowClient.startFlow(testFlow.slug, {
data: 'retrieve-test',
Expand Down Expand Up @@ -255,7 +269,9 @@ describe('Flow Lifecycle Integration', () => {
await grantMinimalPgflowPermissions(sql);

const supabaseClient = createTestSupabaseClient();
const pgflowClient = new PgflowClient(supabaseClient);
const pgflowClient = new PgflowClient(supabaseClient, {
realtimeStabilizationDelayMs: 1000,
});

const run = await pgflowClient.getRun(
'00000000-0000-0000-0000-000000000000'
Expand All @@ -279,7 +295,9 @@ describe('Flow Lifecycle Integration', () => {
await sql`SELECT pgflow.add_step(${testFlow.slug}, 'cached_step')`;

const supabaseClient = createTestSupabaseClient();
const pgflowClient = new PgflowClient(supabaseClient);
const pgflowClient = new PgflowClient(supabaseClient, {
realtimeStabilizationDelayMs: 1000,
});

const run1 = await pgflowClient.startFlow(testFlow.slug, {
data: 'cache-test',
Expand Down Expand Up @@ -312,7 +330,9 @@ describe('Flow Lifecycle Integration', () => {
await sql`SELECT pgflow.add_step(${testFlow2.slug}, 'step2')`;

const supabaseClient = createTestSupabaseClient();
const pgflowClient = new PgflowClient(supabaseClient);
const pgflowClient = new PgflowClient(supabaseClient, {
realtimeStabilizationDelayMs: 1000,
});

const run1 = await pgflowClient.startFlow(testFlow1.slug, {
data: 'flow1',
Expand Down Expand Up @@ -354,7 +374,9 @@ describe('Flow Lifecycle Integration', () => {
await sql`SELECT pgflow.add_step(${testFlow.slug}, 'custom_step')`;

const supabaseClient = createTestSupabaseClient();
const pgflowClient = new PgflowClient(supabaseClient);
const pgflowClient = new PgflowClient(supabaseClient, {
realtimeStabilizationDelayMs: 1000,
});

const customRunId = `12345678-1234-1234-1234-${Date.now()
.toString()
Expand Down Expand Up @@ -384,7 +406,9 @@ describe('Flow Lifecycle Integration', () => {
await sql`SELECT pgflow.add_step(${testFlow.slug}, 'complex_step')`;

const supabaseClient = createTestSupabaseClient();
const pgflowClient = new PgflowClient(supabaseClient);
const pgflowClient = new PgflowClient(supabaseClient, {
realtimeStabilizationDelayMs: 1000,
});

const complexInput = {
user: {
Expand Down
4 changes: 3 additions & 1 deletion pkgs/client/__tests__/integration/full-stack-dsl.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ describe('Full Stack DSL Integration', () => {
// 5. Start flow via client
const sqlClient = new PgflowSqlClient(sql);
const supabaseClient = createTestSupabaseClient();
const pgflowClient = new PgflowClient(supabaseClient);
const pgflowClient = new PgflowClient(supabaseClient, {
realtimeStabilizationDelayMs: 1000,
});

const input = { url: 'https://api.example.com/test' };
const run = await pgflowClient.startFlow(SimpleFlow.slug, input);
Expand Down
4 changes: 3 additions & 1 deletion pkgs/client/__tests__/integration/happy-path-e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ describe('Happy Path E2E Integration', () => {

const sqlClient = new PgflowSqlClient(sql);
const supabaseClient = createTestSupabaseClient();
const pgflowClient = new PgflowClient(supabaseClient);
const pgflowClient = new PgflowClient(supabaseClient, {
realtimeStabilizationDelayMs: 1000,
});

// Track all events received
const receivedRunEvents: any[] = [];
Expand Down
Loading
Loading