Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
0b2771f
perf: implement event-sourced architecture
pranaygp Dec 18, 2025
ce7fc09
fix(world-vercel): handle wire format for step in event results
pranaygp Dec 23, 2025
0b26b2b
feat(world-postgres): handle hook_disposed event in event handler
pranaygp Dec 23, 2025
3b918e6
feat(world-postgres): enforce hook token uniqueness per tenant
pranaygp Dec 23, 2025
a73d077
feat: add e2e test for hook token conflict
pranaygp Dec 23, 2025
3e42977
chore: remove flaky hookTokenConflictWorkflow test
pranaygp Dec 23, 2025
5b45de6
fix: address code review issues
pranaygp Dec 23, 2025
8566245
fix: throw error on corrupted event log instead of warning
pranaygp Dec 23, 2025
6e11240
Move event log corruption check to step_created event
pranaygp Dec 23, 2025
0a3431f
Remove unused paused/resumed run events and states
pranaygp Dec 23, 2025
969da00
Add changeset for paused/resumed removal
pranaygp Dec 23, 2025
ce24493
Fix hook token conflict error to use WorkflowAPIError with status 409
pranaygp Dec 23, 2025
6bd8baa
Remove obsolete tests for deprecated direct create methods
pranaygp Dec 23, 2025
038cdd6
Remove fatal field from step_failed event
pranaygp Dec 23, 2025
6f26c56
Rename step's lastKnownError to error
pranaygp Dec 23, 2025
88e61fd
Add terminal state validation and comprehensive event lifecycle tests
pranaygp Dec 24, 2025
7419729
Add terminal state validation to world-postgres and rename firstStart…
pranaygp Dec 24, 2025
736cecd
Add step_retrying on terminal step tests
pranaygp Dec 24, 2025
faecbea
Fix performance regression: reuse validation reads
pranaygp Dec 24, 2025
32f21aa
perf: add events.createBatch() for batch event creation
pranaygp Dec 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .changeset/batch-event-creation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
"@workflow/core": patch
"@workflow/world": patch
"@workflow/world-local": patch
"@workflow/world-postgres": patch
"@workflow/world-vercel": patch
---

perf: add events.createBatch() for batch event creation

- Add `createBatch()` method to Storage interface for creating multiple events atomically
- Use batch event creation in suspension handler for improved performance
- Use batch event creation for wait_completed events in runtime
19 changes: 19 additions & 0 deletions .changeset/event-sourced-entities.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
---
"@workflow/core": patch
"@workflow/world": patch
"@workflow/world-local": patch
"@workflow/world-postgres": patch
"@workflow/world-vercel": patch
"@workflow/web": patch
"@workflow/web-shared": patch
---

perf: implement event-sourced architecture for runs, steps, and hooks

- Add run lifecycle events (run_created, run_started, run_completed, run_failed, run_cancelled)
- Add step_retrying event for non-fatal step failures that will be retried
- Remove `fatal` field from step_failed event (step_failed now implies terminal failure)
- Rename step's `lastKnownError` to `error` for consistency with server
- Update world implementations to create/update entities from events via events.create()
- Entities (runs, steps, hooks) are now materializations of the event log
- This makes the system faster, easier to reason about, and resilient to data inconsistencies
15 changes: 15 additions & 0 deletions .changeset/remove-paused-resumed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
---
"@workflow/world": patch
"@workflow/world-local": patch
"@workflow/world-vercel": patch
"@workflow/cli": patch
"@workflow/web": patch
"@workflow/web-shared": patch
---

**BREAKING CHANGE**: Remove unused paused/resumed run events and states

- Remove `run_paused` and `run_resumed` event types
- Remove `paused` status from `WorkflowRunStatus`
- Remove `PauseWorkflowRunParams` and `ResumeWorkflowRunParams` types
- Remove `pauseWorkflowRun` and `resumeWorkflowRun` functions from world-vercel
2 changes: 0 additions & 2 deletions packages/cli/src/lib/inspect/output.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ const STATUS_COLORS: Record<
failed: chalk.red,
cancelled: chalk.strikethrough.yellow,
pending: chalk.blue,
paused: chalk.yellow,
};

const isStreamId = (value: string) => {
Expand All @@ -116,7 +115,6 @@ const showStatusLegend = () => {
'failed',
'cancelled',
'pending',
'paused',
];

const legendItems = statuses.map((status) => {
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/lib/inspect/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,6 @@ export const startRun = async (
};

export const cancelRun = async (world: World, runId: string) => {
await world.runs.cancel(runId);
await world.events.create(runId, { eventType: 'run_cancelled' });
logger.log(chalk.green(`Cancel signal sent to run ${runId}`));
};
5 changes: 5 additions & 0 deletions packages/core/e2e/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,11 @@ describe('e2e', () => {
}
);

// TODO: Add test for concurrent hook token conflict once workflow-server PR is merged and deployed
// PR: https://github.com/vercel/workflow-server/pull/XXX (pranaygp/event-sourced-api-v3 branch)
// The test should verify that two concurrent workflows cannot use the same hook token
// See: hookCleanupTestWorkflow for sequential token reuse (after workflow completion)

test(
'stepFunctionPassingWorkflow - step function references can be passed as arguments (without closure vars)',
{ timeout: 60_000 },
Expand Down
92 changes: 64 additions & 28 deletions packages/core/src/events-consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ describe('EventsConsumer', () => {
await waitForNextTick();

expect(callback).toHaveBeenCalledWith(event);
// Without auto-advance, callback is only called once
expect(callback).toHaveBeenCalledTimes(1);
});
});
Expand All @@ -87,6 +88,7 @@ describe('EventsConsumer', () => {
await waitForNextTick();

expect(callback).toHaveBeenCalledWith(event);
// Without auto-advance, callback is only called once
expect(callback).toHaveBeenCalledTimes(1);
});

Expand All @@ -109,23 +111,27 @@ describe('EventsConsumer', () => {
consumer.subscribe(callback);
await waitForNextTick();

// callback finishes at event1, index advances to 1
// Without auto-advance, event2 is NOT processed
expect(consumer.eventIndex).toBe(1);
expect(consumer.callbacks).toHaveLength(0);
});

it('should not increment event index when callback returns false', async () => {
it('should NOT auto-advance when all callbacks return NotConsumed', async () => {
const event = createMockEvent();
const consumer = new EventsConsumer([event]);
const callback = vi.fn().mockReturnValue(EventConsumerResult.NotConsumed);

consumer.subscribe(callback);
await waitForNextTick();
await waitForNextTick(); // Extra tick to confirm no auto-advance

// Without auto-advance, eventIndex stays at 0
expect(consumer.eventIndex).toBe(0);
expect(consumer.callbacks).toContain(callback);
});

it('should process multiple callbacks until one returns true', async () => {
it('should process multiple callbacks until one returns Consumed or Finished', async () => {
const event = createMockEvent();
const consumer = new EventsConsumer([event]);
const callback1 = vi
Expand All @@ -140,15 +146,17 @@ describe('EventsConsumer', () => {
consumer.subscribe(callback2);
consumer.subscribe(callback3);
await waitForNextTick();
await waitForNextTick(); // For next event processing

expect(callback1).toHaveBeenCalledWith(event);
expect(callback2).toHaveBeenCalledWith(event);
// callback3 sees the next event (null since we only have one event)
expect(callback3).toHaveBeenCalledWith(null);
expect(consumer.eventIndex).toBe(1);
expect(consumer.callbacks).toEqual([callback1, callback3]);
});

it('should process all callbacks when none return true', async () => {
it('should NOT advance when all callbacks return NotConsumed', async () => {
const event = createMockEvent();
const consumer = new EventsConsumer([event]);
const callback1 = vi
Expand All @@ -169,6 +177,7 @@ describe('EventsConsumer', () => {
expect(callback1).toHaveBeenCalledWith(event);
expect(callback2).toHaveBeenCalledWith(event);
expect(callback3).toHaveBeenCalledWith(event);
// Without auto-advance, eventIndex stays at 0
expect(consumer.eventIndex).toBe(0);
expect(consumer.callbacks).toEqual([callback1, callback2, callback3]);
});
Expand Down Expand Up @@ -211,7 +220,7 @@ describe('EventsConsumer', () => {
expect(callback2).toHaveBeenCalledWith(null);
});

it('should handle complex event processing scenario', async () => {
it('should handle complex event processing with multiple consumers', async () => {
const events = [
createMockEvent({ id: 'event-1', event_type: 'type-a' }),
createMockEvent({ id: 'event-2', event_type: 'type-b' }),
Expand Down Expand Up @@ -241,13 +250,14 @@ describe('EventsConsumer', () => {
consumer.subscribe(typeBCallback);
await waitForNextTick();
await waitForNextTick(); // Wait for recursive processing
await waitForNextTick(); // Wait for final processing

// typeACallback processes event-1 and gets removed, so it won't process event-3
// typeACallback processes event-1 and gets removed
expect(typeACallback).toHaveBeenCalledTimes(1); // Called for event-1 only
// typeBCallback processes event-2 and gets removed
expect(typeBCallback).toHaveBeenCalledTimes(1); // Called for event-2
expect(consumer.eventIndex).toBe(2); // Only 2 events processed (event-3 remains)
expect(consumer.callbacks).toHaveLength(0); // Both callbacks removed after consuming their events
// eventIndex is at 2 (after event-1 and event-2 were consumed)
expect(consumer.eventIndex).toBe(2);
expect(consumer.callbacks).toHaveLength(0);
});
});

Expand Down Expand Up @@ -297,8 +307,9 @@ describe('EventsConsumer', () => {
consumer.subscribe(callback3);
await waitForNextTick();

// callback2 should be removed when it returns true
// callback2 should be removed when it returns Finished
expect(consumer.callbacks).toEqual([callback1, callback3]);
// callback3 is called with the next event (null after event-1)
expect(callback3).toHaveBeenCalledWith(null);
});

Expand All @@ -314,25 +325,6 @@ describe('EventsConsumer', () => {
expect(consumer.eventIndex).toBe(1);
});

it('should handle multiple subscriptions happening in sequence', async () => {
const event1 = createMockEvent({ id: 'event-1' });
const event2 = createMockEvent({ id: 'event-2' });
const consumer = new EventsConsumer([event1, event2]);

const callback1 = vi.fn().mockReturnValue(EventConsumerResult.Finished);
const callback2 = vi.fn().mockReturnValue(EventConsumerResult.Finished);

consumer.subscribe(callback1);
await waitForNextTick();

consumer.subscribe(callback2);
await waitForNextTick();

expect(callback1).toHaveBeenCalledWith(event1);
expect(callback2).toHaveBeenCalledWith(event2);
expect(consumer.eventIndex).toBe(2);
});

it('should handle empty events array gracefully', async () => {
const consumer = new EventsConsumer([]);
const callback = vi.fn().mockReturnValue(EventConsumerResult.NotConsumed);
Expand All @@ -343,5 +335,49 @@ describe('EventsConsumer', () => {
expect(callback).toHaveBeenCalledWith(null);
expect(consumer.eventIndex).toBe(0);
});

it('should process events in order with proper consumers', async () => {
// This test simulates the workflow scenario:
// - run_created consumer consumes it
// - step consumer gets step_created, step_completed
const events = [
createMockEvent({ id: 'run-created', event_type: 'run_created' }),
createMockEvent({ id: 'step-created', event_type: 'step_created' }),
createMockEvent({ id: 'step-completed', event_type: 'step_completed' }),
];
const consumer = new EventsConsumer(events);

// Run lifecycle consumer - consumes run_created
const runConsumer = vi.fn().mockImplementation((event: Event | null) => {
if (event?.event_type === 'run_created') {
return EventConsumerResult.Consumed;
}
return EventConsumerResult.NotConsumed;
});

// Step consumer - consumes step_created, finishes on step_completed
const stepConsumer = vi.fn().mockImplementation((event: Event | null) => {
if (event?.event_type === 'step_created') {
return EventConsumerResult.Consumed;
}
if (event?.event_type === 'step_completed') {
return EventConsumerResult.Finished;
}
return EventConsumerResult.NotConsumed;
});

consumer.subscribe(runConsumer);
consumer.subscribe(stepConsumer);
await waitForNextTick();
await waitForNextTick();
await waitForNextTick();

// runConsumer consumes run_created
expect(runConsumer).toHaveBeenCalledWith(events[0]);
// stepConsumer consumes step_created, then finishes on step_completed
expect(stepConsumer).toHaveBeenCalledWith(events[1]);
expect(stepConsumer).toHaveBeenCalledWith(events[2]);
expect(consumer.eventIndex).toBe(3);
});
});
});
5 changes: 5 additions & 0 deletions packages/core/src/events-consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,5 +78,10 @@ export class EventsConsumer {
return;
}
}

// If we reach here, all callbacks returned NotConsumed.
// We do NOT auto-advance - every event must have a consumer.
// With proper consumers for run_created/run_started/step_created,
// this should not cause events to get stuck.
};
}
1 change: 1 addition & 0 deletions packages/core/src/global.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ export interface StepInvocationQueueItem {
stepName: string;
args: Serializable[];
closureVars?: Record<string, Serializable>;
hasCreatedEvent?: boolean;
}

export interface HookInvocationQueueItem {
Expand Down
Loading
Loading