Skip to content

Conversation

@NathanFlurry
Copy link
Member

No description provided.

@vercel
Copy link

vercel bot commented Nov 4, 2025

The latest updates on your projects. Learn more about Vercel for GitHub.

Project Deployment Preview Comments Updated (UTC)
rivetkit-serverless Ready Ready Preview Comment Nov 5, 2025 4:06am
3 Skipped Deployments
Project Deployment Preview Comments Updated (UTC)
rivet-cloud Ignored Ignored Nov 5, 2025 4:06am
rivet-inspector Ignored Ignored Preview Nov 5, 2025 4:06am
rivet-site Ignored Ignored Preview Nov 5, 2025 4:06am

Copy link
Member Author

NathanFlurry commented Nov 4, 2025

Warning

This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
Learn more


How to use the Graphite Merge Queue

Add the label merge-queue to this PR to add it to the merge queue.

You must have a Graphite account in order to use the merge queue. Sign up using this link.

An organization admin has enabled the Graphite Merge Queue in this repository.

Please do not merge from GitHub as this will restart CI on PRs being processed by the merge queue.

This stack of pull requests is managed by Graphite. Learn more about stacking.

@claude
Copy link

claude bot commented Nov 4, 2025

PR Review: Add Runner SSE Ping Interval

Summary

This PR adds a keepalive ping mechanism to the serverless runner's SSE connection to prevent premature connection closures. The implementation follows a similar pattern used elsewhere in the codebase.

Positive Aspects

Follows Established Patterns: The ping loop implementation closely mirrors the existing pattern in actor/router-endpoints.ts:416-428, which is good for consistency.

Appropriate Interval: The 1-second ping interval matches SSE_PING_INTERVAL used elsewhere in the codebase.

Proper State Management: Added #isRunnerStopped flag to track runner lifecycle and exit the ping loop cleanly.

Good Logging: Debug logs help troubleshoot connection issues with clear messages about why the loop exits.

Issues & Concerns

🔴 Critical: Potential Race Condition

There's a race condition in the state management:

#isRunnerStopped: boolean = false;

// In onShutdown callback (line 147-150):
onShutdown: () => {
    this.#runnerStopped.resolve(undefined);
    this.#isRunnerStopped = true;
}

Problem: The #isRunnerStopped flag is not thread-safe. If onShutdown is called while the ping loop is executing writeSSE or sleep, the flag update might not be immediately visible, potentially causing one extra ping to be sent after shutdown begins.

Impact: Low - likely harmless in practice since the stream will be closed, but not ideal for clean shutdown semantics.

Recommendation: Consider if the #runnerStopped.promise alone is sufficient for coordination, or document why the boolean flag is needed in addition to the promise.

🟡 Minor: Error Handling Gap

The ping loop doesn't have try-catch for writeSSE failures:

await stream.writeSSE({ event: "ping", data: "" });
await stream.sleep(RUNNER_SSE_PING_INTERVAL);

Problem: If writeSSE throws (e.g., stream already closed but not detected by the checks), the error will propagate up and potentially crash the handler.

Comparison: The similar implementation in router-endpoints.ts:351-439 wraps its entire SSE handling in try-catch.

Recommendation: Add try-catch around the ping loop or at least around writeSSE to handle write failures gracefully.

🟡 Minor: Inconsistent Constant Naming

The constant is named RUNNER_SSE_PING_INTERVAL but there's already SSE_PING_INTERVAL (exported from router-endpoints.ts:50) with the same value.

Recommendation: Consider reusing the existing SSE_PING_INTERVAL constant to avoid duplication and ensure consistency if this value ever needs to change.

🟢 Nitpick: Stream Check Order

The checks for stream.closed and stream.aborted come after checking #isRunnerStopped. While not wrong, checking the stream state first might be more efficient since it's the more common exit condition.

Performance Considerations

✅ The 1-second interval is reasonable and matches the existing pattern. No performance concerns.

Security Considerations

✅ No security issues identified. The ping mechanism is read-only and doesn't expose sensitive data.

Test Coverage

⚠️ No test changes included. Consider adding tests for:

  • Ping loop operates correctly during normal operation
  • Ping loop exits cleanly when #isRunnerStopped is set
  • Ping loop exits when stream is closed/aborted
  • No errors propagate from failed ping writes

Recommendations

  1. High Priority: Add error handling around the ping loop
  2. Medium Priority: Consider reusing SSE_PING_INTERVAL constant
  3. Low Priority: Document the relationship between #isRunnerStopped flag and #runnerStopped promise
  4. Optional: Add tests for the ping mechanism

Example Fix for Error Handling

// Send ping every second to keep the connection alive
while (true) {
    if (this.#isRunnerStopped) {
        logger().debug({ msg: "runner is stopped" });
        break;
    }

    if (stream.closed || stream.aborted) {
        logger().debug({
            msg: "runner sse stream closed",
            closed: stream.closed,
            aborted: stream.aborted,
        });
        break;
    }

    try {
        await stream.writeSSE({ event: "ping", data: "" });
    } catch (error) {
        logger().debug({ msg: "failed to write sse ping", error });
        break;
    }
    
    await stream.sleep(RUNNER_SSE_PING_INTERVAL);
}

Overall, this is a solid addition that addresses a real need for keepalive on SSE connections. The main concern is the lack of error handling, which should be addressed before merging.

@claude
Copy link

claude bot commented Nov 5, 2025

Code Review

This PR adds a ping mechanism to the SSE (Server-Sent Events) connection for the runner to keep it alive. Overall, the implementation is solid, but I have a few observations and suggestions:

Positive Aspects

  1. Clear intent: The ping mechanism addresses a common issue with long-lived SSE connections being closed by proxies/load balancers
  2. Reasonable ping interval: 1 second (1000ms) is appropriate for keeping connections alive
  3. Proper cleanup checking: The code checks both #isRunnerStopped and stream status before sending pings
  4. Structured logging: Follows the codebase pattern with structured debug logs

Potential Issues & Suggestions

1. Error Handling in the Ping Loop

The while (true) loop doesn't have error handling around stream.writeSSE(). If this throws an error, it could crash the entire SSE handler.

Suggestion: Wrap the ping write in a try-catch:

try {
    await stream.writeSSE({ event: "ping", data: "" });
} catch (err) {
    logger().debug({
        msg: "error sending ping, sse stream likely closed",
        error: err,
    });
    break;
}

2. Race Condition Risk

There's a potential race condition between checking stream.closed || stream.aborted and actually writing to the stream. If the stream closes between the check and the write, it could cause an unhandled error.

The try-catch suggestion above would also mitigate this issue.

3. Duplicate Comment

Line 623 has a comment about waiting for the runner to stop "if the SSE stream aborted early for any reason". This comment already existed before line 600, but now with the ping loop, the comment at line 623 is more relevant. Consider updating the comment to clarify why we still wait even after the ping loop exits.

Suggestion:

// Wait for the runner to fully stop. The ping loop may have exited early
// due to stream closure, but we need to ensure clean shutdown.
await this.#runnerStopped.promise;

4. Consider Making Ping Interval Configurable

The RUNNER_SSE_PING_INTERVAL is hardcoded to 1000ms. Depending on the deployment environment (different proxy timeout configurations), this might need to be adjusted. Consider making it configurable through RunnerConfig if not already.

5. Memory Leak Prevention

The ping loop keeps a reference to the stream. If #isRunnerStopped never becomes true and the stream never closes (edge case), this could be a slow memory leak. The current checks should prevent this, but worth keeping in mind.

Test Coverage

No test changes were included in this PR. Consider adding:

  • A test verifying ping messages are sent at the expected interval
  • A test ensuring the ping loop exits cleanly when the runner stops
  • A test ensuring the ping loop exits when the stream is closed/aborted

Minor Style Notes

  1. Line 604-607: The structured logging format is good, but consider if both closed and aborted flags are necessary for debugging, or if just logging "stream closed" with one flag is sufficient.

Security Considerations

No security concerns identified. The ping mechanism doesn't expose sensitive data.

Performance Considerations

The 1-second ping interval is reasonable and shouldn't cause performance issues. Each ping is minimal overhead (empty data payload).


Recommendation: The core implementation is good. I'd suggest adding error handling around the stream.writeSSE() call in the ping loop to make it more robust. The lack of tests is a minor concern but not blocking if this is consistent with the rest of the codebase's testing practices.

@NathanFlurry NathanFlurry force-pushed the 11-04-chore_rivetkit_add_runner_sse_ping_interval branch from 504772d to ff28677 Compare November 5, 2025 02:07
@NathanFlurry NathanFlurry force-pushed the 11-04-chore_rivetkit_log_close_code_when_client_ws_closes branch from 31223d5 to b41010e Compare November 5, 2025 02:07
@claude
Copy link

claude bot commented Nov 5, 2025

Code Review

I've reviewed this PR which adds SSE ping functionality to keep the runner connection alive. Overall, the implementation looks solid and follows existing patterns in the codebase. Here are my findings:

Positive Aspects ✅

  1. Consistent with existing patterns: The implementation mirrors the SSE ping pattern already used in router-endpoints.ts:416-428, which is good for maintainability.

  2. Proper loop exit conditions: The code correctly checks both #isRunnerStopped, stream.closed, and stream.aborted to exit the ping loop.

  3. Appropriate ping interval: Using 1000ms (1 second) matches the SSE_PING_INTERVAL constant used elsewhere in the codebase.

  4. Good logging: Debug logs provide visibility into why the ping loop exits.

Potential Issues & Recommendations 🔍

1. Race Condition Risk (Minor)

The #isRunnerStopped flag is set in the onShutdown callback (line 149), but there's no synchronization mechanism. While JavaScript's single-threaded nature makes this less critical, consider this edge case:

  • The ping loop checks #isRunnerStopped (false)
  • Before the next line executes, onShutdown fires and sets it to true
  • The ping continues and calls stream.writeSSE()

Recommendation: This is likely acceptable given JavaScript's execution model, but be aware of potential timing issues if the stream is closing simultaneously.

2. Error Handling (Moderate)

The while (true) loop doesn't have try-catch around stream.writeSSE(). If the write fails (e.g., connection already closed), it could throw an exception.

Looking at router-endpoints.ts:429-438, there's a try-catch wrapper around the entire ping loop. Consider adding similar error handling here:

```typescript
try {
while (true) {
// ... existing checks ...
await stream.writeSSE({ event: "ping", data: "" });
await stream.sleep(RUNNER_SSE_PING_INTERVAL);
}
} catch (error) {
logger().error({
msg: "error in runner sse ping loop",
error: stringifyError(error),
});
}
```

3. Constant Consistency (Minor)

Consider exporting RUNNER_SSE_PING_INTERVAL or reusing the existing SSE_PING_INTERVAL from router-endpoints.ts to maintain a single source of truth for the ping interval across the codebase.

4. Code Comment Clarity (Minor)

The comment on line 623 says "Wait for the runner to stop if the SSE stream aborted early for any reason". This is accurate but could be clearer about why this is necessary. Consider:

```typescript
// Ensure runner has fully stopped before closing the SSE stream.
// If the stream aborted early (e.g., network issue), we still need to wait
// for graceful shutdown to complete.
await this.#runnerStopped.promise;
```

Security Considerations 🔒

No security concerns identified. The ping mechanism doesn't expose sensitive data.

Performance Considerations ⚡

  1. Network overhead: 1-second pings are reasonable for connection keep-alive
  2. CPU usage: Minimal - just a sleep loop with simple checks
  3. Memory: No leaks detected - the loop exits properly when the stream closes

Test Coverage 🧪

Missing: There don't appear to be tests for this new functionality. Consider adding tests for:

  • SSE connection stays alive with pings
  • Ping loop exits when runner stops
  • Ping loop exits when stream closes/aborts
  • Graceful handling of write errors during ping

You could add tests in the driver test suite following patterns from other RivetKit tests.

Summary

This is a solid implementation that addresses a real need (keeping SSE connections alive). The main recommendation is to add error handling around the writeSSE() call to prevent potential unhandled exceptions. The other suggestions are minor improvements for robustness and clarity.

Approval Status: Approve with minor recommendations ✅


Review generated using Claude Code

@claude
Copy link

claude bot commented Nov 5, 2025

PR Review

Summary

This PR adds a ping interval mechanism to the engine actor driver's SSE (Server-Sent Events) stream to keep connections alive, similar to the existing pattern in router-endpoints.ts.

Positive Aspects

Consistent with existing patterns - The implementation mirrors the pattern already used in router-endpoints.ts:416-428, which is good for codebase consistency.

Proper ping interval - Uses the same 1000ms interval as the existing SSE_PING_INTERVAL constant.

Dual exit conditions - Checks both #isRunnerStopped flag and stream state (closed or aborted).

Issues & Recommendations

🔴 Critical: Missing Error Handling

The new ping loop lacks try-catch error handling. The reference implementation in router-endpoints.ts:370-443 wraps the entire SSE logic in a try-catch block:

try {
  // ... SSE ping loop ...
} catch (error) {
  loggerWithoutContext().error({
    msg: "error in sse connection",
    error,
  });
  // Cleanup on error
  stream.close();
}

Recommendation: Wrap the ping loop in a try-catch to handle potential errors from stream.writeSSE() or stream.sleep().


⚠️ Potential Race Condition

The #isRunnerStopped flag is set in the onShutdown callback at actor-driver.ts:149, but there's a potential race condition:

  1. The flag is checked at the start of each loop iteration
  2. await stream.sleep(RUNNER_SSE_PING_INTERVAL) introduces a 1-second delay
  3. During this delay, the runner could stop, but the loop won't check until after the sleep completes

Recommendation: Consider checking #isRunnerStopped after the sleep as well, or use an abortable sleep pattern.


📝 Code Quality: Inconsistent Constant Naming

The constant is named RUNNER_SSE_PING_INTERVAL in this file, but the same value in router-endpoints.ts is SSE_PING_INTERVAL.

Recommendation: Consider either:

  1. Extracting this to a shared constant
  2. Using a consistent naming convention (e.g., both files use SSE_PING_INTERVAL)

🤔 Minor: Structured Logging Inconsistency

Per CLAUDE.md guidelines:

Use tracing for logging. Do not format parameters into the main message, instead use tracing's structured logging.

The logging follows this correctly, but the messages could be more descriptive:

logger().debug({
  msg: "runner is stopped",
});

Could be:

logger().debug({
  msg: "exiting sse ping loop, runner is stopped",
});

Testing Consideration

No tests were added for this functionality. Consider adding a test that:

  1. Verifies ping events are sent at the correct interval
  2. Confirms the loop exits when the runner stops
  3. Ensures the loop exits when the stream closes/aborts

Security Considerations

✅ No security concerns identified - this is a keep-alive mechanism with no user input.

Performance Considerations

✅ The 1-second ping interval is reasonable and matches existing patterns.
✅ Minimal overhead from the simple loop.


Suggested Fixes

// Send ping every second to keep the connection alive
try {
  while (true) {
    if (this.#isRunnerStopped) {
      logger().debug({
        msg: "exiting sse ping loop, runner is stopped",
      });
      break;
    }

    if (stream.closed || stream.aborted) {
      logger().debug({
        msg: "exiting sse ping loop, stream closed",
        closed: stream.closed,
        aborted: stream.aborted,
      });
      break;
    }

    await stream.writeSSE({ event: "ping", data: "" });
    await stream.sleep(RUNNER_SSE_PING_INTERVAL);
  }
} catch (error) {
  logger().error({
    msg: "error in runner sse ping loop",
    error,
  });
  // Stream will be cleaned up when streamSSE callback returns
}

Verdict

Approve with recommendations - The core functionality is sound, but error handling should be added before merging. The other suggestions are non-blocking improvements.

@NathanFlurry NathanFlurry force-pushed the 11-04-chore_rivetkit_log_close_code_when_client_ws_closes branch from b41010e to ed48aff Compare November 5, 2025 02:18
@NathanFlurry NathanFlurry force-pushed the 11-04-chore_rivetkit_add_runner_sse_ping_interval branch from ff28677 to 691501d Compare November 5, 2025 02:18
@claude
Copy link

claude bot commented Nov 5, 2025

Code Review

Summary

This PR adds a ping mechanism to keep the SSE (Server-Sent Events) connection alive between the runner and the engine. The implementation sends periodic ping events every second to detect connection closures, particularly important for Cloudflare Workers environments.

Positive Aspects ✅

  1. Consistent with existing patterns: The implementation mirrors the existing SSE ping pattern in router-endpoints.ts:413-428, which is good for maintainability
  2. Proper shutdown handling: The addition of #isRunnerStopped flag provides a clean way to break out of the ping loop when the runner shuts down
  3. Good logging: Debug logs help troubleshoot connection issues
  4. Addresses a real issue: SSE connections need keep-alive pings, especially on platforms like Cloudflare Workers

Issues & Recommendations 🔍

1. Potential Race Condition (Medium Priority)

The #isRunnerStopped flag is set in the onShutdown callback at line 149, but there's a potential race condition:

  • If shutdownRunner() is called after the SSE stream closes but before the ping loop checks #isRunnerStopped
  • The flag might not be visible immediately across different async contexts

Recommendation: Consider using an AbortController pattern instead, which is more robust for async cancellation.

2. Missing Constant Export (Low Priority)

The constant RUNNER_SSE_PING_INTERVAL is defined locally at line 57, while the similar SSE_PING_INTERVAL in router-endpoints.ts:50 is exported. This inconsistency could make testing harder.

Recommendation: Consider exporting RUNNER_SSE_PING_INTERVAL for consistency and testability.

3. No Test Coverage (Medium Priority)

This critical connection management logic lacks test coverage. The ping mechanism and shutdown behavior should be tested.

Recommendation: Add tests to verify:

  • Ping events are sent at the correct interval
  • Loop exits when runner stops
  • Loop exits when stream closes/aborts
  • No memory leaks from the infinite loop

4. Error Handling in Ping Loop (Low Priority)

If stream.writeSSE() throws an error, the loop will crash without cleanup.

Recommendation: Wrap the ping write in a try-catch:

try {
    await stream.writeSSE({ event: "ping", data: "" });
} catch (error) {
    logger().debug({ msg: "ping write failed", error });
    break;
}

5. Comment Clarity (Minor)

Line 623 says "Wait for the runner to stop if the SSE stream aborted early for any reason" - but this always waits, not just when aborting early.

Recommendation: Update comment to: "Ensure runner cleanup completes before closing SSE handler"

Performance Considerations

  • The 1-second ping interval matches the existing pattern, which is reasonable
  • The busy loop with await stream.sleep() is appropriate for this use case
  • Memory impact is minimal (just one boolean flag added)

Security Considerations

No security concerns identified. The ping mechanism doesn't expose sensitive data and follows secure SSE patterns.

Conclusion

This is a solid addition that addresses a real need. The main concerns are around test coverage and the potential race condition with #isRunnerStopped. Consider the recommendations above to make the implementation more robust.

Overall: ✅ Approve with suggestions

@graphite-app graphite-app bot force-pushed the 11-04-chore_rivetkit_log_close_code_when_client_ws_closes branch from ed48aff to a0b95ae Compare November 5, 2025 02:34
@graphite-app graphite-app bot force-pushed the 11-04-chore_rivetkit_add_runner_sse_ping_interval branch from 691501d to e9b6bde Compare November 5, 2025 02:35
@claude
Copy link

claude bot commented Nov 5, 2025

Code Review for PR #3362

Summary

This PR adds a ping mechanism to keep the SSE (Server-Sent Events) connection alive for serverless runners by sending a ping event every second.


Positive Aspects

  1. Good solution for connection keepalive - Adding periodic pings is a standard and effective way to keep SSE connections alive
  2. Reasonable ping interval - 1 second (1000ms) is a good default that balances keepalive effectiveness with network overhead
  3. Multiple exit conditions - The while loop checks both runner shutdown state and stream state before continuing
  4. Good logging - Debug logs provide visibility into why the loop exits

🔍 Potential Issues & Concerns

1. Race Condition with #isRunnerStopped flag

The #isRunnerStopped boolean flag is not thread-safe/concurrency-safe. In JavaScript/TypeScript, while you don't have true multi-threading, you do have async concurrency concerns:

  • The flag is set in the onShutdown callback (line 156)
  • It's checked in the ping loop (line 661)
  • These could execute in different microtask ticks, creating a race window

Recommendation: Since you already have this.#runnerStopped.promise, consider using that instead:

// Option 1: Use Promise.race
while (true) {
    const result = await Promise.race([
        stream.sleep(RUNNER_SSE_PING_INTERVAL).then(() => 'continue'),
        this.#runnerStopped.promise.then(() => 'stop')
    ]);
    
    if (result === 'stop') {
        logger().debug({ msg: "runner is stopped" });
        break;
    }
    
    if (stream.closed || stream.aborted) {
        logger().debug({
            msg: "runner sse stream closed",
            closed: stream.closed,
            aborted: stream.aborted,
        });
        break;
    }
    
    await stream.writeSSE({ event: "ping", data: "" });
}

2. Error Handling Missing

The stream.writeSSE() call could potentially throw errors (network issues, serialization errors, etc.), but there's no try-catch block.

Recommendation:

try {
    await stream.writeSSE({ event: "ping", data: "" });
} catch (err) {
    logger().warn({
        msg: "failed to send ping",
        error: err,
    });
    break; // Exit loop if we can't send pings
}

3. Redundant Final Wait

After the ping loop exits (either due to shutdown or stream closure), the code waits for this.#runnerStopped.promise again (line 682). This is somewhat redundant:

  • If the loop exited due to #isRunnerStopped, the promise is already resolved
  • If the loop exited due to stream closure, this blocks waiting for shutdown

The comment on line 681 explains this is intentional ("Wait for the runner to stop if the SSE stream aborted early"), but consider:

  • Is it correct to block here if the client disconnected?
  • Should there be a timeout to prevent indefinite blocking?

Question for consideration: What happens if the SSE stream closes but the runner doesn't stop? Does this create a resource leak or blocking operation?

4. Check Order Efficiency

The stream.closed || stream.aborted check happens after checking #isRunnerStopped and after the sleep. Consider checking stream state before the sleep to avoid unnecessary delays when the stream is already closed.


🧪 Test Coverage

There don't appear to be any new tests for this functionality. Consider adding tests for:

  • Ping events are sent at the correct interval
  • Loop exits cleanly when runner stops
  • Loop exits cleanly when stream closes/aborts
  • Error handling when writeSSE fails

🎯 Minor Suggestions

  1. Make ping interval configurable: Consider making RUNNER_SSE_PING_INTERVAL configurable via environment variable or config for different deployment scenarios

  2. Add metrics/telemetry: Consider tracking:

    • Number of pings sent per connection
    • Connection duration
    • Reason for connection termination
  3. Logging format consistency: The new logs use structured format (object with msg field) which is good, but verify this matches the project's logging conventions


📝 Overall Assessment

Code Quality: ⭐⭐⭐⭐ (4/5) - Well-structured with good intentions, but has some concurrency and error handling concerns

Recommendation: Request changes to address the race condition and error handling issues before merging.

@claude
Copy link

claude bot commented Nov 5, 2025

Code Review

Summary

This PR adds a ping mechanism to the SSE (Server-Sent Events) stream in the serverless runner to keep the connection alive. The implementation sends a ping event every second while the runner is active.

Positive Aspects

  • Good intent: Keeping SSE connections alive with periodic pings is a standard practice to prevent timeout issues
  • Proper lifecycle tracking: The addition of #isRunnerStopped flag provides explicit state tracking
  • Defensive checks: The while loop correctly checks both runner state and stream state before sending pings
  • Structured logging: Follows the project's logging conventions with structured fields

Issues & Concerns

1. Potential Resource Leak / Infinite Loop ⚠️

The ping loop starts immediately after sending the initial payload, but there's a potential issue:

await stream.writeSSE({ data: payload });

// Send ping every second to keep the connection alive
while (true) {
    // ... checks ...
    await stream.writeSSE({ event: "ping", data: "" });
    await stream.sleep(RUNNER_SSE_PING_INTERVAL);
}

await this.#runnerStopped.promise;

Problem: If both exit conditions fail (e.g., #isRunnerStopped is false and stream appears open but is in an unexpected state), this becomes a busy loop. While unlikely, there's no timeout or max iteration safeguard.

Recommendation: Add a maximum iteration count or overall timeout as a safety mechanism.

2. Error Handling Missing ⚠️

The stream.writeSSE() call inside the loop has no error handling. If writing to the stream fails (network issues, client disconnect, etc.), the error could propagate unexpectedly.

Recommendation: Wrap the ping write in a try-catch and break the loop on error:

try {
    await stream.writeSSE({ event: "ping", data: "" });
} catch (error) {
    logger().debug({ msg: "failed to send ping", error });
    break;
}

3. Race Condition Potential ⚠️

The #isRunnerStopped flag is set in the onShutdown callback (line 156), but there's no synchronization mechanism. While JavaScript is single-threaded, async operations could create edge cases where:

  • onShutdown is called
  • The flag is set to true
  • The ping loop check happens between iterations
  • But the check occurs before the flag update is visible

Recommendation: While this is likely not an issue in practice with JavaScript's event loop, consider using the existing promise-based mechanism (#runnerStopped.promise) with Promise.race() instead of polling a boolean flag.

4. Timing Precision

The constant RUNNER_SSE_PING_INTERVAL = 1000 (1 second) might be too aggressive for some deployments, or too slow for others.

Recommendation: Consider making this configurable through the runner config, with 1000ms as a reasonable default.

5. Test Coverage ⚠️

No tests were added to verify:

  • Pings are actually sent at the expected interval
  • The loop properly exits when the runner stops
  • The loop properly exits when the stream closes
  • Error handling works correctly

Recommendation: Add tests in tests/driver-engine.test.ts to cover these scenarios.

Suggested Implementation Pattern

Consider using Promise.race() for cleaner cancellation:

const pingInterval = setInterval(async () => {
    try {
        if (!stream.closed && !stream.aborted) {
            await stream.writeSSE({ event: "ping", data: "" });
        }
    } catch (error) {
        logger().debug({ msg: "failed to send ping", error });
        clearInterval(pingInterval);
    }
}, RUNNER_SSE_PING_INTERVAL);

try {
    await this.#runnerStopped.promise;
} finally {
    clearInterval(pingInterval);
}

This approach:

  • Avoids the while loop and manual state checking
  • Properly cleans up the interval when done
  • Handles errors gracefully
  • Works naturally with the existing promise-based lifecycle

Minor Notes

  • ✅ Logging follows structured format correctly
  • ✅ Constant naming follows convention
  • ✅ TypeScript types are implicit but correct

Verdict

Request Changes - The implementation needs error handling and the infinite loop concern should be addressed before merging. The suggested setInterval pattern would be more idiomatic and safer.

@claude
Copy link

claude bot commented Nov 5, 2025

Code Review

I've reviewed this PR which adds SSE ping functionality to keep the runner connection alive. Overall, the implementation is solid and follows existing patterns in the codebase. Here are my detailed findings:

✅ Positive Aspects

  1. Consistent with existing patterns: The implementation mirrors the SSE ping pattern already used in router-endpoints.ts:416-428, which is excellent for maintainability
  2. Appropriate ping interval: Uses 1000ms which matches the SSE_PING_INTERVAL constant used elsewhere
  3. Proper exit conditions: Correctly checks both #isRunnerStopped, stream.closed, and stream.aborted to exit the ping loop
  4. Good logging: Debug logs provide visibility into why the ping loop exits
  5. Addresses a real need: SSE connections need keepalive pings, especially on platforms like Cloudflare Workers

🔍 Issues & Recommendations

1. Missing Error Handling (High Priority)

The new ping loop lacks try-catch error handling. The reference implementation in router-endpoints.ts:429-442 wraps the entire SSE logic in a try-catch block to handle potential errors.

Current code:

while (true) {
  // ... checks ...
  await stream.writeSSE({ event: "ping", data: "" });
  await stream.sleep(RUNNER_SSE_PING_INTERVAL);
}

Recommendation: Add error handling to prevent unhandled exceptions if stream.writeSSE() fails:

try {
  while (true) {
    if (this.#isRunnerStopped) {
      logger().debug({ msg: "runner is stopped" });
      break;
    }

    if (stream.closed || stream.aborted) {
      logger().debug({
        msg: "runner sse stream closed",
        closed: stream.closed,
        aborted: stream.aborted,
      });
      break;
    }

    await stream.writeSSE({ event: "ping", data: "" });
    await stream.sleep(RUNNER_SSE_PING_INTERVAL);
  }
} catch (error) {
  logger().error({
    msg: "error in runner sse ping loop",
    error: stringifyError(error),
  });
  // Stream cleanup happens when streamSSE callback returns
}

2. Consider Reusing Existing Constant (Low Priority)

There's already a SSE_PING_INTERVAL constant exported from router-endpoints.ts:50 with the same value (1000). Consider importing and reusing it instead of defining RUNNER_SSE_PING_INTERVAL to maintain a single source of truth:

import { SSE_PING_INTERVAL } from "@/actor/router-endpoints";

This would ensure consistency if the ping interval ever needs to change globally.

3. Test Coverage (Medium Priority)

No tests were added for this functionality. Consider adding tests for:

  • Ping events are sent at the correct interval
  • Ping loop exits when runner stops (#isRunnerStopped becomes true)
  • Ping loop exits when stream closes/aborts
  • Graceful error handling when writeSSE fails

4. Minor: Potential Race Condition (Low Priority)

The #isRunnerStopped flag is set in the onShutdown callback at actor-driver.ts:156. While JavaScript's single-threaded execution model makes this mostly safe, there's a theoretical edge case where:

  1. The ping loop checks #isRunnerStopped (false)
  2. Before writeSSE executes, onShutdown fires and sets it to true
  3. One extra ping gets sent after shutdown begins

Impact: Very low - the stream checks should catch this anyway, and one extra ping is harmless. The error handling suggestion above would also mitigate this.

🔒 Security Considerations

No security concerns identified. The ping mechanism doesn't expose sensitive data.

⚡ Performance Considerations

  • Network overhead: 1-second pings are reasonable for connection keepalive
  • CPU usage: Minimal - just a sleep loop with simple checks
  • Memory: No leaks detected - the loop exits properly when conditions are met

Summary

This is a solid implementation that addresses a real need (keeping SSE connections alive). The main recommendation is to add error handling around the ping loop to prevent potential unhandled exceptions. The other suggestions are minor improvements for code quality and consistency.

Status: ✅ Approve with recommendations

The error handling is the only critical item - the rest are nice-to-haves that can be addressed now or in a follow-up PR.

@graphite-app
Copy link
Contributor

graphite-app bot commented Nov 5, 2025

Merge activity

  • Nov 5, 5:07 AM UTC: NathanFlurry added this pull request to the Graphite merge queue.
  • Nov 5, 5:08 AM UTC: CI is running for this pull request on a draft pull request (#3371) due to your merge queue CI optimization settings.
  • Nov 5, 5:11 AM UTC: Merged by the Graphite merge queue via draft PR: #3371.

@graphite-app graphite-app bot closed this Nov 5, 2025
@graphite-app graphite-app bot deleted the 11-04-chore_rivetkit_add_runner_sse_ping_interval branch November 5, 2025 05:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants