Skip to content

Commit bd843bc

Browse files
authored
fix(testing-sdk): only return new data in checkpoint response (#393)
*Issue #, if available:* #384 *Description of changes:* Currently the testing library local runner checkpoint API returns the entire operation list on each checkpoint, instead of only what changed since the last checkpoint call. Although this typically works correctly, this is not correct behaviour, and can make it harder to debug issues. This logic is also necessary to ensure that we re-invoke when there is stale data on the language SDK side compared to the backend (i.e. checkpoint data that has not reached the client yet). By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
1 parent 98afd9b commit bd843bc

12 files changed

+666
-398
lines changed

packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/handlers/__tests__/checkpoint-handlers.test.ts

Lines changed: 112 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -250,20 +250,23 @@ describe("checkpoint handlers", () => {
250250
expect(registerUpdatesSpy).toHaveBeenCalledWith(input.Updates);
251251

252252
expect(result.CheckpointToken).toBeDefined();
253-
expect(result.NewExecutionState?.Operations).toEqual(
254-
expect.arrayContaining([
255-
expect.objectContaining({
256-
Type: OperationType.EXECUTION,
257-
Status: OperationStatus.STARTED,
258-
}),
259-
expect.objectContaining({
260-
Id: "test-step-op",
261-
Type: OperationType.STEP,
262-
Status: OperationStatus.STARTED,
263-
Name: "TestStep",
264-
}),
265-
]),
266-
);
253+
expect(result.NewExecutionState?.Operations).toEqual([
254+
{
255+
Id: "test-step-op",
256+
Name: "TestStep",
257+
StartTimestamp: expect.any(Date),
258+
Status: "STARTED",
259+
Type: "STEP",
260+
},
261+
{
262+
EndTimestamp: expect.any(Date),
263+
Id: "test-update-op",
264+
StartTimestamp: expect.any(Date),
265+
Status: "SUCCEEDED",
266+
StepDetails: { Attempt: 1, Result: "test result" },
267+
Type: "STEP",
268+
},
269+
]);
267270
expect(result.NewExecutionState?.NextMarker).toBeUndefined();
268271
});
269272

@@ -337,20 +340,15 @@ describe("checkpoint handlers", () => {
337340

338341
expect(registerUpdatesSpy).toHaveBeenCalledWith([]);
339342
expect(result.CheckpointToken).toBeDefined();
340-
expect(result.NewExecutionState?.Operations).toEqual(
341-
expect.arrayContaining([
342-
expect.objectContaining({
343-
Type: OperationType.EXECUTION,
344-
Status: OperationStatus.STARTED,
345-
}),
346-
expect.objectContaining({
347-
Id: "test-step-op",
348-
Type: OperationType.STEP,
349-
Status: OperationStatus.STARTED,
350-
Name: "TestStep",
351-
}),
352-
]),
353-
);
343+
expect(result.NewExecutionState?.Operations).toEqual([
344+
{
345+
Id: "test-step-op",
346+
Name: "TestStep",
347+
StartTimestamp: expect.any(Date),
348+
Status: "STARTED",
349+
Type: "STEP",
350+
},
351+
]);
354352
});
355353

356354
it("should generate new checkpoint token with different UUID", () => {
@@ -382,23 +380,18 @@ describe("checkpoint handlers", () => {
382380
});
383381

384382
expect(result.CheckpointToken).toEqual(expectedNewToken);
385-
expect(result.NewExecutionState?.Operations).toEqual(
386-
expect.arrayContaining([
387-
expect.objectContaining({
388-
Type: OperationType.EXECUTION,
389-
Status: OperationStatus.STARTED,
390-
}),
391-
expect.objectContaining({
392-
Id: "test-step-op",
393-
Type: OperationType.STEP,
394-
Status: OperationStatus.STARTED,
395-
Name: "TestStep",
396-
}),
397-
]),
398-
);
383+
expect(result.NewExecutionState?.Operations).toEqual([
384+
{
385+
Id: "test-step-op",
386+
Name: "TestStep",
387+
StartTimestamp: expect.any(Date),
388+
Status: "STARTED",
389+
Type: "STEP",
390+
},
391+
]);
399392
});
400393

401-
it("should include all operations from storage in response", () => {
394+
it("should include changed operations from storage in response", () => {
402395
const executionArn = "test-execution-arn";
403396
const executionId = createExecutionId(executionArn);
404397
const { storage, invocationResult } =
@@ -429,26 +422,84 @@ describe("checkpoint handlers", () => {
429422
);
430423

431424
expect(result.CheckpointToken).toBeDefined();
432-
expect(result.NewExecutionState?.Operations).toEqual(
433-
expect.arrayContaining([
434-
expect.objectContaining({
435-
Type: OperationType.EXECUTION,
436-
Status: OperationStatus.STARTED,
437-
}),
438-
expect.objectContaining({
439-
Id: "test-step-op",
425+
expect(result.NewExecutionState?.Operations).toEqual([
426+
{
427+
Id: "test-step-op",
428+
Name: "TestStep",
429+
StartTimestamp: expect.any(Date),
430+
Status: "STARTED",
431+
Type: "STEP",
432+
},
433+
{
434+
Id: "another-op",
435+
StartTimestamp: expect.any(Date),
436+
Status: "STARTED",
437+
Type: "WAIT",
438+
WaitDetails: { ScheduledEndTimestamp: expect.any(Date) },
439+
},
440+
]);
441+
expect(result.NewExecutionState?.NextMarker).toBeUndefined();
442+
});
443+
444+
it("should call getDirtyOperations to return only changed operations", () => {
445+
const executionArn = "test-execution-arn";
446+
const executionId = createExecutionId(executionArn);
447+
const { storage, invocationResult } =
448+
createExecutionWithOperations(executionId);
449+
450+
const getDirtyOperationsSpy = jest.spyOn(storage, "getDirtyOperations");
451+
getDirtyOperationsSpy.mockReturnValue([
452+
{
453+
Id: "dirty-op-1",
454+
Type: OperationType.STEP,
455+
Status: OperationStatus.STARTED,
456+
StartTimestamp: new Date(),
457+
},
458+
{
459+
Id: "dirty-op-2",
460+
Type: OperationType.CALLBACK,
461+
Status: OperationStatus.SUCCEEDED,
462+
StartTimestamp: new Date(),
463+
},
464+
]);
465+
466+
const input: CheckpointDurableExecutionRequest = {
467+
DurableExecutionArn: executionArn,
468+
CheckpointToken: encodeCheckpointToken({
469+
executionId,
470+
invocationId: invocationResult.invocationId,
471+
token: "test-token",
472+
}),
473+
Updates: [
474+
{
475+
Id: "new-op",
440476
Type: OperationType.STEP,
441-
Status: OperationStatus.STARTED,
442-
Name: "TestStep",
443-
}),
444-
expect.objectContaining({
445-
Id: "another-op",
446-
Type: OperationType.WAIT,
447-
Status: OperationStatus.STARTED,
448-
}),
449-
]),
477+
Action: OperationAction.START,
478+
},
479+
],
480+
};
481+
482+
const result = processCheckpointDurableExecution(
483+
executionArn,
484+
input,
485+
executionManager,
450486
);
451-
expect(result.NewExecutionState?.NextMarker).toBeUndefined();
487+
488+
expect(getDirtyOperationsSpy).toHaveBeenCalledTimes(1);
489+
expect(result.NewExecutionState?.Operations).toEqual([
490+
{
491+
Id: "dirty-op-1",
492+
Type: OperationType.STEP,
493+
Status: OperationStatus.STARTED,
494+
StartTimestamp: expect.any(Date),
495+
},
496+
{
497+
Id: "dirty-op-2",
498+
Type: OperationType.CALLBACK,
499+
Status: OperationStatus.SUCCEEDED,
500+
StartTimestamp: expect.any(Date),
501+
},
502+
]);
452503
});
453504
});
454505
});

packages/aws-durable-execution-sdk-js-testing/src/checkpoint-server/handlers/checkpoint-handlers.ts

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ export function processCheckpointDurableExecution(
105105
createCheckpointToken(input.CheckpointToken),
106106
);
107107

108-
validateCheckpointUpdates(updates, storage.operationDataMap);
108+
validateCheckpointUpdates(updates, storage.getAllOperationData());
109109
storage.registerUpdates(updates);
110110

111111
const output: CheckpointDurableExecutionResponse = {
@@ -115,10 +115,7 @@ export function processCheckpointDurableExecution(
115115
token: randomUUID(),
116116
}),
117117
NewExecutionState: {
118-
// TODO: implement pagination
119-
Operations: Array.from(storage.operationDataMap.values()).map(
120-
(data) => data.operation,
121-
),
118+
Operations: storage.getDirtyOperations(),
122119
NextMarker: undefined,
123120
},
124121
};

0 commit comments

Comments
 (0)