Skip to content

Commit 6f8c74b

Browse files
committed
feat: implement ancestor cleanup during termination
- Add ancestor cleanup logic in checkpoint manager termination evaluation
- Use original stepId from metadata to avoid hashed stepId issues
- Clean up operations with finished ancestors in RETRY_WAITING, IDLE_NOT_AWAITED, and IDLE_AWAITED states
- Add comprehensive unit tests for ancestor cleanup functionality
- Update CompletionConfig TSDoc with race condition behavior note

Fixes memory leaks and ensures proper resource cleanup when ancestor contexts complete.
1 parent 2fd7f66 commit 6f8c74b

File tree

6 files changed

+276
-1
lines changed

6 files changed

+276
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import { handler } from "./min-successful-with-passing-threshold";
2+
import { createTests } from "../../../utils/test-helper";
3+
import { OperationStatus } from "@aws/durable-execution-sdk-js-testing";
4+
5+
createTests({
6+
localRunnerConfig: {
7+
skipTime: false,
8+
checkpointDelay: 100,
9+
},
10+
handler,
11+
tests: (runner) => {
12+
it("should complete early when minSuccessful is reached", async () => {
13+
const execution = await runner.run();
14+
const result = execution.getResult() as any;
15+
16+
// Assert overall results
17+
expect(result.successCount).toBe(2);
18+
expect(result.completionReason).toBe("MIN_SUCCESSFUL_REACHED");
19+
expect(result.totalCount).toBe(5);
20+
21+
// Verify individual branch results via each branch's operation
22+
// Get individual branch operations
23+
const branch1 = runner.getOperation("branch-1");
24+
const branch2 = runner.getOperation("branch-2");
25+
const branch3 = runner.getOperation("branch-3");
26+
const branch4 = runner.getOperation("branch-4");
27+
const branch5 = runner.getOperation("branch-5");
28+
29+
// Branches 1-4 should all report SUCCEEDED (branches 2-4 share the same 50 ms delay and finish together)
30+
expect(branch1?.getStatus()).toBe(OperationStatus.SUCCEEDED);
31+
expect(branch2?.getStatus()).toBe(OperationStatus.SUCCEEDED);
32+
expect(branch3?.getStatus()).toBe(OperationStatus.SUCCEEDED);
33+
expect(branch4?.getStatus()).toBe(OperationStatus.SUCCEEDED);
34+
35+
// The remaining branch (branch-5) should still be in STARTED state (not completed)
36+
expect(branch5?.getStatus()).toBe(OperationStatus.STARTED);
37+
38+
// Verify the results array matches
39+
expect(result.results).toEqual(["Branch 1 result", "Branch 2 result"]);
40+
});
41+
},
42+
});
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
import {
2+
DurableContext,
3+
withDurableExecution,
4+
} from "@aws/durable-execution-sdk-js";
5+
import { ExampleConfig } from "../../../types";
6+
import { log } from "../../../utils/logger";
7+
8+
export const config: ExampleConfig = {
9+
name: "Parallel minSuccessful",
10+
description: "Parallel execution with minSuccessful completion config",
11+
};
12+
13+
export const handler = withDurableExecution(
14+
async (event: any, context: DurableContext) => {
15+
log("Starting parallel execution with minSuccessful: 2");
16+
17+
// The first branch finishes first
18+
// Branches 2 to 4 finish at the same time
19+
// Branch 5 will finish later
20+
const results = await context.parallel(
21+
"min-successful-branches",
22+
[
23+
{
24+
name: "branch-1",
25+
func: async (ctx) => {
26+
return await ctx.step("branch-1", async () => {
27+
await new Promise((resolve) => setTimeout(resolve, 10));
28+
return "Branch 1 result";
29+
});
30+
},
31+
},
32+
{
33+
name: "branch-2",
34+
func: async (ctx) => {
35+
return await ctx.step("branch-2", async () => {
36+
await new Promise((resolve) => setTimeout(resolve, 50));
37+
return "Branch 2 result";
38+
});
39+
},
40+
},
41+
{
42+
name: "branch-3",
43+
func: async (ctx) => {
44+
return await ctx.step("branch-3", async () => {
45+
await new Promise((resolve) => setTimeout(resolve, 50));
46+
return "Branch 3 result";
47+
});
48+
},
49+
},
50+
{
51+
name: "branch-4",
52+
func: async (ctx) => {
53+
return await ctx.step("branch-4", async () => {
54+
await new Promise((resolve) => setTimeout(resolve, 50));
55+
return "Branch 4 result";
56+
});
57+
},
58+
},
59+
{
60+
name: "branch-5",
61+
func: async (ctx) => {
62+
return await ctx.step("branch-4", async () => {
63+
await new Promise((resolve) => setTimeout(resolve, 2000));
64+
return "Branch 4 result";
65+
});
66+
},
67+
},
68+
],
69+
{
70+
completionConfig: {
71+
minSuccessful: 2,
72+
},
73+
},
74+
);
75+
76+
await context.wait({ seconds: 1 });
77+
78+
log(`Completed with ${results.successCount} successes`);
79+
log(`Completion reason: ${results.completionReason}`);
80+
81+
return {
82+
successCount: results.successCount,
83+
totalCount: results.totalCount,
84+
completionReason: results.completionReason,
85+
results: results.getResults(),
86+
};
87+
},
88+
);

packages/aws-durable-execution-sdk-js-examples/template.yml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1123,6 +1123,31 @@ Resources:
11231123
DURABLE_EXAMPLES_VERBOSE: "true"
11241124
Metadata:
11251125
SkipBuild: "True"
1126+
MinSuccessfulWithPassingThreshold:
1127+
Type: AWS::Serverless::Function
1128+
Properties:
1129+
FunctionName: ParallelminSuccessful-22x-NodeJS-Local
1130+
CodeUri: ./dist
1131+
Handler: min-successful-with-passing-threshold.handler
1132+
Runtime: nodejs22.x
1133+
Architectures:
1134+
- x86_64
1135+
MemorySize: 128
1136+
Timeout: 60
1137+
Role:
1138+
Fn::GetAtt:
1139+
- DurableFunctionRole
1140+
- Arn
1141+
DurableConfig:
1142+
ExecutionTimeout: 60
1143+
RetentionPeriodInDays: 7
1144+
Environment:
1145+
Variables:
1146+
AWS_ENDPOINT_URL_LAMBDA: http://host.docker.internal:5000
1147+
DURABLE_VERBOSE_MODE: "false"
1148+
DURABLE_EXAMPLES_VERBOSE: "true"
1149+
Metadata:
1150+
SkipBuild: "True"
11261151
ParallelToleratedFailureCount:
11271152
Type: AWS::Serverless::Function
11281153
Properties:

packages/aws-durable-execution-sdk-js/src/types/batch.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,14 @@ export interface BatchResult<TResult> {
6969
}
7070

7171
/**
72+
* Configuration for early completion of map/parallel operations
73+
*
74+
* @remarks
75+
* **Race Condition Behavior**: When multiple children complete simultaneously,
76+
* the parent operation may have more completed children than the specified threshold
77+
* by the time the completion check occurs. This is expected behavior due to the
78+
* asynchronous nature of concurrent execution.
79+
*
7280
* @public
7381
*/
7482
export interface CompletionConfig {

packages/aws-durable-execution-sdk-js/src/utils/checkpoint/checkpoint-ancestor.test.ts

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import { createTestCheckpointManager } from "../../testing/create-test-checkpoin
44
import { createMockExecutionContext } from "../../testing/mock-context";
55
import { EventEmitter } from "events";
66
import { createDefaultLogger } from "../logger/default-logger";
7+
import { OperationLifecycleState } from "../../types/operation-lifecycle-state";
8+
import { OperationSubType } from "../../types/core";
79

810
describe("CheckpointManager - Ancestor Functionality", () => {
911
let checkpointManager: CheckpointManager;
@@ -92,6 +94,107 @@ describe("CheckpointManager - Ancestor Functionality", () => {
9294
});
9395
});
9496

97+
describe("ancestor cleanup during termination", () => {
98+
it("should clean up operations with finished ancestors during termination", () => {
99+
// Create operations first (before marking ancestors as finished)
100+
checkpointManager.markOperationState(
101+
"1-2-3",
102+
OperationLifecycleState.RETRY_WAITING,
103+
{
104+
metadata: {
105+
stepId: "1-2-3",
106+
name: "test-step",
107+
type: "STEP",
108+
subType: OperationSubType.STEP,
109+
parentId: "1-2",
110+
},
111+
},
112+
);
113+
114+
checkpointManager.markOperationState(
115+
"1-2-4",
116+
OperationLifecycleState.IDLE_AWAITED,
117+
{
118+
metadata: {
119+
stepId: "1-2-4",
120+
name: "test-step-2",
121+
type: "STEP",
122+
subType: OperationSubType.STEP,
123+
parentId: "1-2",
124+
},
125+
},
126+
);
127+
128+
// Create operation without finished ancestor (should not be cleaned up)
129+
checkpointManager.markOperationState(
130+
"1-3-1",
131+
OperationLifecycleState.RETRY_WAITING,
132+
{
133+
metadata: {
134+
stepId: "1-3-1",
135+
name: "test-step-3",
136+
type: "STEP",
137+
subType: OperationSubType.STEP,
138+
parentId: "1-3",
139+
},
140+
},
141+
);
142+
143+
// Verify operations exist before cleanup
144+
expect(checkpointManager.getOperationState("1-2-3")).toBe(
145+
OperationLifecycleState.RETRY_WAITING,
146+
);
147+
expect(checkpointManager.getOperationState("1-2-4")).toBe(
148+
OperationLifecycleState.IDLE_AWAITED,
149+
);
150+
expect(checkpointManager.getOperationState("1-3-1")).toBe(
151+
OperationLifecycleState.RETRY_WAITING,
152+
);
153+
154+
// Now mark ancestor as finished
155+
checkpointManager.markAncestorFinished("1-2");
156+
157+
// Trigger termination logic that includes ancestor cleanup
158+
(checkpointManager as any).checkAndTerminate();
159+
160+
// Operations with finished ancestors should be cleaned up
161+
expect(checkpointManager.getOperationState("1-2-3")).toBeUndefined();
162+
expect(checkpointManager.getOperationState("1-2-4")).toBeUndefined();
163+
164+
// Operation without finished ancestor should remain
165+
expect(checkpointManager.getOperationState("1-3-1")).toBe(
166+
OperationLifecycleState.RETRY_WAITING,
167+
);
168+
});
169+
170+
it("should not clean up operations in EXECUTING state even with finished ancestors", () => {
171+
// Create operation first
172+
checkpointManager.markOperationState(
173+
"1-2-3",
174+
OperationLifecycleState.EXECUTING,
175+
{
176+
metadata: {
177+
stepId: "1-2-3",
178+
name: "test-step",
179+
type: "STEP",
180+
subType: OperationSubType.STEP,
181+
parentId: "1-2",
182+
},
183+
},
184+
);
185+
186+
// Then mark ancestor as finished
187+
checkpointManager.markAncestorFinished("1-2");
188+
189+
(checkpointManager as any).checkAndTerminate();
190+
191+
// EXECUTING operation should not be cleaned up
192+
expect(checkpointManager.getOperationState("1-2-3")).toBe(
193+
OperationLifecycleState.EXECUTING,
194+
);
195+
});
196+
});
197+
95198
describe("checkpoint with finished ancestors", () => {
96199
it("should skip checkpoint when ancestor is finished", async () => {
97200
checkpointManager.markAncestorFinished("1-2");

packages/aws-durable-execution-sdk-js/src/utils/checkpoint/checkpoint-manager.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -612,7 +612,16 @@ export class CheckpointManager implements Checkpoint {
612612
op.state === OperationLifecycleState.IDLE_NOT_AWAITED ||
613613
op.state === OperationLifecycleState.IDLE_AWAITED
614614
) {
615-
// Note: Ancestor completion checking removed - operations will continue normally
615+
// Use the original stepId from metadata, not the potentially hashed op.stepId
616+
const originalStepId = op.metadata.stepId;
617+
if (this.hasFinishedAncestor(originalStepId)) {
618+
log(
619+
"🧹",
620+
`Cleaning up operation with completed ancestor: ${originalStepId}`,
621+
);
622+
this.cleanupOperation(op.stepId);
623+
this.operations.delete(op.stepId);
624+
}
616625
}
617626
}
618627

0 commit comments

Comments
 (0)