-
Notifications
You must be signed in to change notification settings - Fork 51
feat: Add real-time feature flags support via SSE #389
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
3 files reviewed, 4 comments
- Import threading module at top of client.py - Add threading.Lock (_sse_lock) for sse_connected flag - Wrap all sse_connected reads/writes with lock - Store SSE response and call close() for graceful shutdown - Ensure thread-safe cleanup in _close_sse_connection() 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
Protects feature_flags and feature_flags_by_key with _flags_lock: - Added threading.Lock() for all flag data structure access - Protected reads in get_feature_flag and _compute_payload_locally - Protected writes in _update_flag_state and _process_flag_update - Protected reads in _load_feature_flags and _fetch_feature_flags_from_api - Copies flag dict before holding lock during computation Prevents race conditions between: - Background SSE thread modifying flags - Main thread reading flags for evaluation - Poller thread updating flags from API 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
Set realtime_flags to False in _close_sse_connection() to stop the SSE listener thread from attempting reconnection after the client has been shut down. Without this, the check on line 2304 would still pass even after shutdown since realtime_flags remained True. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
Remove unused json and Thread imports flagged by ruff linter. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
- Add type annotation for feature_flags_by_key: Optional[dict[str, dict]] - Add type annotation for sse_response: Optional[Any] - Add type annotation for old_flags_copy: dict[str, dict] - Refactor _compute_payload_locally to avoid unreachable code - Add assertion after None check in _process_flag_update - Move json import inside sse_listener function scope - Remove unreachable local imports (threading was already at top) All mypy errors in client.py are now resolved. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
Add type: ignore[import-untyped] comment for the local requests import in _setup_sse_connection to suppress mypy warning about missing stubs. Update mypy-baseline.txt to reflect the 3 fixed violations. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR introduces real-time feature flag updates via Server-Sent Events (SSE), enabling the PostHog SDK to receive flag changes instantly without polling. When the realtime_flags configuration option is enabled (default: false), the client establishes a persistent SSE connection to stream flag updates in real-time.
Key changes:
- Added
realtime_flagsandon_feature_flags_updateconfiguration options to the Client class - Implemented SSE connection management with automatic reconnection on failures (5-second delay)
- Added thread-safe flag update processing with locks to protect concurrent access to flag data structures
- Integrated SSE connection lifecycle with client shutdown for proper cleanup
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 11 comments.
| File | Description |
|---|---|
| posthog/client.py | Core implementation: adds threading support, SSE connection management (_setup_sse_connection, _close_sse_connection), flag update processing (_process_flag_update), thread-safe flag access with locks, and integration with client lifecycle (initialization and shutdown) |
| posthog/test/test_realtime_feature_flags.py | Comprehensive test suite covering SSE connection setup, flag updates/deletions/additions, callback functionality, error handling, and cleanup behavior |
| realtime_flags_example.py | Well-documented example demonstrating real-time flag usage with callbacks and proper shutdown handling |
| mypy-baseline.txt | Removes 3 resolved type errors related to dict assignment and unreachable code statements |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| with self._sse_lock: | ||
| if self.sse_connected: | ||
| self.log.debug("[FEATURE FLAGS] SSE connection already established") | ||
| return | ||
| self.sse_connected = True |
Copilot
AI
Dec 15, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The sse_connected flag is set to True at line 2266 before the SSE connection is actually established. If an exception occurs during thread creation or if the SSE listener thread fails immediately (e.g., ImportError at line 2345 or any other exception at line 2349), the flag remains True even though no connection exists.
This could prevent future reconnection attempts since _setup_sse_connection checks this flag at line 2263 and returns early if it's already true. The flag should only be set to True after the connection is successfully established (e.g., after receiving a 200 status code in the listener thread).
| self.log.warning( | ||
| "[FEATURE FLAGS] requests library required for real-time flags" | ||
| ) | ||
| except Exception as e: | ||
| self.log.exception(f"[FEATURE FLAGS] Failed to setup SSE connection: {e}") | ||
|
|
Copilot
AI
Dec 15, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If an ImportError or other exception occurs during SSE setup (lines 2345-2350), the sse_connected flag is never reset to False. The flag was set to True at line 2266 before the connection attempt, but if the thread creation or initialization fails with an exception, the flag remains True even though no connection was established.
This prevents future reconnection attempts because _setup_sse_connection checks sse_connected at line 2263 and returns early if it's already True. Consider resetting sse_connected to False in the exception handlers at lines 2345-2350.
| self.log.warning( | |
| "[FEATURE FLAGS] requests library required for real-time flags" | |
| ) | |
| except Exception as e: | |
| self.log.exception(f"[FEATURE FLAGS] Failed to setup SSE connection: {e}") | |
| with self._sse_lock: | |
| self.sse_connected = False | |
| self.log.warning( | |
| "[FEATURE FLAGS] requests library required for real-time flags" | |
| ) | |
| except Exception as e: | |
| with self._sse_lock: | |
| self.sse_connected = False | |
| self.log.exception(f"[FEATURE FLAGS] Failed to setup SSE connection: {e}") |
| response = requests.get( | ||
| url, headers=headers, stream=True, timeout=None | ||
| ) | ||
| self.sse_response = response |
Copilot
AI
Dec 15, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The assignment to self.sse_response at line 2286 is not thread-safe. The sse_response attribute is accessed and modified from multiple threads (the SSE listener thread and the main thread during shutdown) without synchronization. This could lead to race conditions where _close_sse_connection tries to close a response that is being assigned or is in an inconsistent state.
Consider protecting access to sse_response with the _sse_lock or a separate lock to ensure thread-safe access.
| if self.sse_response: | ||
| try: | ||
| self.sse_response.close() | ||
| except Exception: | ||
| pass | ||
| self.sse_response = None |
Copilot
AI
Dec 15, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The assignment to self.sse_response = None in the finally block is not thread-safe. This could create a race condition where _close_sse_connection attempts to close sse_response while it's being set to None here, potentially causing an AttributeError or missing the close operation.
Consider protecting this assignment with the _sse_lock to ensure thread-safe cleanup.
| if self.sse_response: | |
| try: | |
| self.sse_response.close() | |
| except Exception: | |
| pass | |
| self.sse_response = None | |
| with self._sse_lock: | |
| if self.sse_response: | |
| try: | |
| self.sse_response.close() | |
| except Exception: | |
| pass | |
| self.sse_response = None |
| if self.feature_flags_by_key is None: | ||
| self.feature_flags_by_key = {} | ||
|
|
||
| if self.feature_flags is None: | ||
| self.feature_flags = [] | ||
|
|
||
| # Update the lookup table | ||
| # mypy doesn't track that the setter ensures feature_flags_by_key is a dict | ||
| assert self.feature_flags_by_key is not None | ||
| self.feature_flags_by_key[flag_key] = flag_data | ||
|
|
||
| # Update or add to the array | ||
| flag_exists = False | ||
| for i, f in enumerate(self.feature_flags): | ||
| if f.get("key") == flag_key: | ||
| self.feature_flags[i] = flag_data | ||
| flag_exists = True | ||
| break | ||
|
|
||
| if not flag_exists: | ||
| self.feature_flags.append(flag_data) |
Copilot
AI
Dec 15, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Setting self.feature_flags_by_key and self.feature_flags directly bypasses the feature_flags setter, which means they can become inconsistent. The setter at lines 367-380 ensures that feature_flags_by_key is properly synchronized with feature_flags, but this direct manipulation breaks that invariant.
If self.feature_flags is modified directly (line 2418-2419, 2428-2435), the setter's synchronization logic won't run. This could lead to inconsistencies where a flag exists in one data structure but not the other, or where the dictionary and list representations don't match.
Consider either: (1) using the setter by assigning to self.feature_flags after making all modifications, or (2) ensuring both data structures are kept in sync manually when bypassing the setter.
| del self.feature_flags_by_key[flag_key] | ||
|
|
||
| # Also remove from the array | ||
| if self.feature_flags: | ||
| self.feature_flags = [ | ||
| f for f in self.feature_flags if f.get("key") != flag_key | ||
| ] |
Copilot
AI
Dec 15, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The flag deletion logic has a consistency issue. First, it deletes the flag from feature_flags_by_key at line 2397, then it assigns a filtered list to self.feature_flags at line 2401-2403. However, the feature_flags setter (lines 367-380) rebuilds feature_flags_by_key from scratch based on feature_flags, which would overwrite the manual deletion at line 2397.
The order of operations is:
- Delete from
feature_flags_by_key(line 2397) - Assign to
feature_flagsproperty (line 2401), which triggers the setter - The setter rebuilds
feature_flags_by_keyfrom the filteredfeature_flagslist
This means the manual deletion at line 2397 is unnecessary and could be removed since the setter will rebuild the dictionary correctly. Alternatively, if the intent is to avoid using the setter, then both data structures should be updated manually without triggering the setter.
| f"[FEATURE FLAGS] SSE connection failed with status {response.status_code}" | ||
| ) | ||
| with self._sse_lock: | ||
| self.sse_connected = False |
Copilot
AI
Dec 15, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the SSE connection returns a non-200 status code, the response object is not properly closed before returning. The response is assigned at line 2286 but only the sse_connected flag is updated before the early return at line 2294. This could lead to a resource leak as the HTTP connection remains open.
Consider closing the response before returning, or ensure it's closed in the finally block which currently only checks self.sse_response after it's been set.
| self.sse_connected = False | |
| self.sse_connected = False | |
| response.close() |
| @@ -0,0 +1,415 @@ | |||
| import time | |||
| import unittest | |||
Copilot
AI
Dec 15, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Module 'unittest' is imported with both 'import' and 'import from'.
| except Exception: | ||
| pass |
Copilot
AI
Dec 15, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
'except' clause does nothing but pass and there is no explanatory comment.
| except Exception: | |
| pass | |
| except Exception as e: | |
| self.log.warning(f"[FEATURE FLAGS] Exception while closing SSE response: {e}") |
| except Exception: | ||
| pass |
Copilot
AI
Dec 15, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
'except' clause does nothing but pass and there is no explanatory comment.
| except Exception: | |
| pass | |
| except Exception as e: | |
| self.log.exception(f"[FEATURE FLAGS] Failed to close SSE response: {e}") |
Introduce a new
realtime_flagsconfiguration option that enables real-time feature flag updates through Server-Sent Events (SSE). When enabled, the SDK establishes a persistent connection to receive flag updates without polling.Key changes:
realtime_flagsconfig option (default: false)