From 4e449090ace73f01191969840aa1ab6ccd96ccfc Mon Sep 17 00:00:00 2001
From: Matthew Keeler
Date: Wed, 22 Oct 2025 15:35:13 -0400
Subject: [PATCH] chore: Add persistent store support for FDv2

---
 ldclient/config.py                            |  13 +-
 ldclient/impl/datasourcev2/status.py          |  54 +-
 ldclient/impl/datasystem/config.py            |  48 +-
 ldclient/impl/datasystem/fdv2.py              | 209 +++++--
 ldclient/impl/datasystem/store.py             |   3 +-
 ldclient/interfaces.py                        |  44 +-
 .../testing/impl/datasystem/test_config.py    |  19 -
 .../impl/datasystem/test_fdv2_persistence.py  | 524 ++++++++++++++++++
 8 files changed, 820 insertions(+), 94 deletions(-)
 create mode 100644 ldclient/testing/impl/datasystem/test_fdv2_persistence.py

diff --git a/ldclient/config.py b/ldclient/config.py
index af5e62b7..01e12fec 100644
--- a/ldclient/config.py
+++ b/ldclient/config.py
@@ -19,6 +19,7 @@ from ldclient.interfaces import (
     BigSegmentStore,
     DataSourceUpdateSink,
+    DataStoreMode,
     EventProcessor,
     FeatureStore,
     UpdateProcessor
 )
@@ -161,19 +162,23 @@ def disable_ssl_verification(self) -> bool:
 
 @dataclass(frozen=True)
 class DataSystemConfig:
-    """
-    Configuration for LaunchDarkly's data acquisition strategy.
-    """
+    """Configuration for LaunchDarkly's data acquisition strategy."""
 
     initializers: Optional[List[Builder[Initializer]]]
     """The initializers for the data system."""
 
-    primary_synchronizer: Builder[Synchronizer]
+    primary_synchronizer: Optional[Builder[Synchronizer]]
     """The primary synchronizer for the data system."""
 
     secondary_synchronizer: Optional[Builder[Synchronizer]] = None
     """The secondary synchronizers for the data system."""
 
+    data_store_mode: DataStoreMode = DataStoreMode.READ_WRITE
+    """The mode in which the persistent store, if present, will operate."""
+
+    data_store: Optional[FeatureStore] = None
+    """The (optional) persistent data store instance."""
+
     # TODO(fdv2): Implement this synchronizer up and hook it up everywhere.
     # TODO(fdv2): Remove this when FDv2 is fully launched
     fdv1_fallback_synchronizer: Optional[Builder[Synchronizer]] = None
diff --git a/ldclient/impl/datasourcev2/status.py b/ldclient/impl/datasourcev2/status.py
index ca384415..3f417f34 100644
--- a/ldclient/impl/datasourcev2/status.py
+++ b/ldclient/impl/datasourcev2/status.py
@@ -1,13 +1,18 @@
 import time
+from copy import copy
 from typing import Callable, Optional
 
+from ldclient.impl.datasystem.store import Store
 from ldclient.impl.listeners import Listeners
 from ldclient.impl.rwlock import ReadWriteLock
 from ldclient.interfaces import (
     DataSourceErrorInfo,
     DataSourceState,
     DataSourceStatus,
-    DataSourceStatusProvider
+    DataSourceStatusProvider,
+    DataStoreStatus,
+    DataStoreStatusProvider,
+    FeatureStore
 )
 
 
@@ -55,3 +60,50 @@ def add_listener(self, listener: Callable[[DataSourceStatus], None]):
 
     def remove_listener(self, listener: Callable[[DataSourceStatus], None]):
         self.__listeners.remove(listener)
+
+
+class DataStoreStatusProviderImpl(DataStoreStatusProvider):
+    def __init__(self, store: Optional[FeatureStore], listeners: Listeners):
+        self.__store = store
+        self.__listeners = listeners
+
+        self.__lock = ReadWriteLock()
+        self.__status = DataStoreStatus(True, False)
+
+    def update_status(self, status: DataStoreStatus):
+        """
+        update_status is called from the data store to push a status update.
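+
+        Listeners are notified only when the new status differs from the
+        previously recorded status.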
+ """ + self.__lock.lock() + modified = False + + if self.__status != status: + self.__status = status + modified = True + + self.__lock.unlock() + + if modified: + self.__listeners.notify(status) + + @property + def status(self) -> DataStoreStatus: + self.__lock.rlock() + status = copy(self.__status) + self.__lock.runlock() + + return status + + def is_monitoring_enabled(self) -> bool: + if self.__store is None: + return False + if hasattr(self.__store, "is_monitoring_enabled") is False: + return False + + return self.__store.is_monitoring_enabled() # type: ignore + + def add_listener(self, listener: Callable[[DataStoreStatus], None]): + self.__listeners.add(listener) + + def remove_listener(self, listener: Callable[[DataStoreStatus], None]): + self.__listeners.remove(listener) diff --git a/ldclient/impl/datasystem/config.py b/ldclient/impl/datasystem/config.py index e9c42efd..d2755865 100644 --- a/ldclient/impl/datasystem/config.py +++ b/ldclient/impl/datasystem/config.py @@ -16,6 +16,7 @@ StreamingDataSourceBuilder ) from ldclient.impl.datasystem import Initializer, Synchronizer +from ldclient.interfaces import DataStoreMode, FeatureStore T = TypeVar("T") @@ -30,6 +31,8 @@ class ConfigBuilder: # pylint: disable=too-few-public-methods _initializers: Optional[List[Builder[Initializer]]] = None _primary_synchronizer: Optional[Builder[Synchronizer]] = None _secondary_synchronizer: Optional[Builder[Synchronizer]] = None + _store_mode: DataStoreMode = DataStoreMode.READ_ONLY + _data_store: Optional[FeatureStore] = None def initializers(self, initializers: Optional[List[Builder[Initializer]]]) -> "ConfigBuilder": """ @@ -50,17 +53,27 @@ def synchronizers( self._secondary_synchronizer = secondary return self + def data_store(self, data_store: FeatureStore, store_mode: DataStoreMode) -> "ConfigBuilder": + """ + Sets the data store configuration for the data system. + """ + self._data_store = data_store + self._store_mode = store_mode + return self + def build(self) -> DataSystemConfig: """ Builds the data system configuration. """ - if self._primary_synchronizer is None: - raise ValueError("Primary synchronizer must be set") + if self._secondary_synchronizer is not None and self._primary_synchronizer is None: + raise ValueError("Primary synchronizer must be set if secondary is set") return DataSystemConfig( initializers=self._initializers, primary_synchronizer=self._primary_synchronizer, secondary_synchronizer=self._secondary_synchronizer, + data_store_mode=self._store_mode, + data_store=self._data_store, ) @@ -147,18 +160,29 @@ def custom() -> ConfigBuilder: return ConfigBuilder() -# TODO(fdv2): Implement these methods -# -# Daemon configures the SDK to read from a persistent store integration -# that is populated by Relay Proxy or other SDKs. The SDK will not connect -# to LaunchDarkly. In this mode, the SDK never writes to the data store. +# TODO(fdv2): Need to update these so they don't rely on the LDConfig +def daemon(config: LDConfig, store: FeatureStore) -> ConfigBuilder: + """ + Daemon configures the SDK to read from a persistent store integration + that is populated by Relay Proxy or other SDKs. The SDK will not connect + to LaunchDarkly. In this mode, the SDK never writes to the data store. + """ + return default(config).data_store(store, DataStoreMode.READ_ONLY) + -# PersistentStore is similar to Default, with the addition of a persistent -# store integration. Before data has arrived from LaunchDarkly, the SDK is -# able to evaluate flags using data from the persistent store. 
Once fresh -# data is available, the SDK will no longer read from the persistent store, -# although it will keep it up-to-date. +def persistent_store(config: LDConfig, store: FeatureStore) -> ConfigBuilder: + """ + PersistentStore is similar to Default, with the addition of a persistent + store integration. Before data has arrived from LaunchDarkly, the SDK is + able to evaluate flags using data from the persistent store. Once fresh + data is available, the SDK will no longer read from the persistent store, + although it will keep it up-to-date. + """ + return default(config).data_store(store, DataStoreMode.READ_WRITE) + +# TODO(fdv2): Implement these methods +# # WithEndpoints configures the data system with custom endpoints for # LaunchDarkly's streaming and polling synchronizers. This method is not # necessary for most use-cases, but can be useful for testing or custom diff --git a/ldclient/impl/datasystem/fdv2.py b/ldclient/impl/datasystem/fdv2.py index cfb61750..3106074f 100644 --- a/ldclient/impl/datasystem/fdv2.py +++ b/ldclient/impl/datasystem/fdv2.py @@ -1,22 +1,147 @@ +import logging import time from threading import Event, Thread -from typing import Callable, List, Optional +from typing import Any, Callable, Dict, List, Mapping, Optional from ldclient.config import Builder, DataSystemConfig -from ldclient.impl.datasourcev2.status import DataSourceStatusProviderImpl +from ldclient.feature_store import _FeatureStoreDataSetSorter +from ldclient.impl.datasourcev2.status import ( + DataSourceStatusProviderImpl, + DataStoreStatusProviderImpl +) from ldclient.impl.datasystem import DataAvailability, Synchronizer from ldclient.impl.datasystem.store import Store from ldclient.impl.flag_tracker import FlagTrackerImpl from ldclient.impl.listeners import Listeners -from ldclient.impl.util import _Fail +from ldclient.impl.repeating_task import RepeatingTask +from ldclient.impl.rwlock import ReadWriteLock +from ldclient.impl.util import _Fail, log from ldclient.interfaces import ( DataSourceState, DataSourceStatus, DataSourceStatusProvider, + DataStoreMode, + DataStoreStatus, DataStoreStatusProvider, FeatureStore, FlagTracker ) +from ldclient.versioned_data_kind import VersionedDataKind + + +class FeatureStoreClientWrapper(FeatureStore): + """Provides additional behavior that the client requires before or after feature store operations. + Currently this just means sorting the data set for init() and dealing with data store status listeners. 
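+
+    If a wrapped operation raises and the store supports monitoring, the
+    wrapper marks the store unavailable and polls its is_available() method
+    until the store recovers, notifying data store status listeners on each
+    transition.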
+    """
+
+    def __init__(self, store: FeatureStore, store_update_sink: DataStoreStatusProviderImpl):
+        self.store = store
+        self.__store_update_sink = store_update_sink
+        self.__monitoring_enabled = self.is_monitoring_enabled()
+
+        # This lock guards the variables that follow it
+        self.__lock = ReadWriteLock()
+        self.__last_available = True
+        self.__poller: Optional[RepeatingTask] = None
+
+    def init(self, all_data: Mapping[VersionedDataKind, Mapping[str, Dict[Any, Any]]]):
+        return self.__wrapper(lambda: self.store.init(_FeatureStoreDataSetSorter.sort_all_collections(all_data)))
+
+    def get(self, kind, key, callback):
+        return self.__wrapper(lambda: self.store.get(kind, key, callback))
+
+    def all(self, kind, callback):
+        return self.__wrapper(lambda: self.store.all(kind, callback))
+
+    def delete(self, kind, key, version):
+        return self.__wrapper(lambda: self.store.delete(kind, key, version))
+
+    def upsert(self, kind, item):
+        return self.__wrapper(lambda: self.store.upsert(kind, item))
+
+    @property
+    def initialized(self) -> bool:
+        return self.store.initialized
+
+    def __wrapper(self, fn: Callable):
+        try:
+            return fn()
+        except BaseException:
+            if self.__monitoring_enabled:
+                self.__update_availability(False)
+            raise
+
+    def __update_availability(self, available: bool):
+        try:
+            self.__lock.lock()
+            if available == self.__last_available:
+                return
+            self.__last_available = available
+        finally:
+            self.__lock.unlock()
+
+        if available:
+            log.warning("Persistent store is available again")
+
+        status = DataStoreStatus(available, False)
+        self.__store_update_sink.update_status(status)
+
+        if available:
+            try:
+                self.__lock.lock()
+                if self.__poller is not None:
+                    self.__poller.stop()
+                    self.__poller = None
+            finally:
+                self.__lock.unlock()
+
+            return
+
+        log.warning("Detected persistent store unavailability; updates will be cached until it recovers")
+        task = RepeatingTask("ldclient.check-availability", 0.5, 0, self.__check_availability)
+
+        self.__lock.lock()
+        self.__poller = task
+        self.__poller.start()
+        self.__lock.unlock()
+
+    def __check_availability(self):
+        try:
+            if self.store.is_available():
+                self.__update_availability(True)
+        except BaseException as e:
+            log.error("Unexpected error from data store status function: %s", e)
+
+    def is_monitoring_enabled(self) -> bool:
+        """
+        This method determines whether the wrapped store can support enabling monitoring.
+
+        The wrapped store must provide an is_monitoring_enabled method, which
+        must return true. But this alone is not sufficient.
+
+        Because this class wraps all interactions with a provided store, it can
+        technically "monitor" any store. However, monitoring also requires that
+        we notify listeners when the store is available again.
+
+        We determine this by checking the store's is_available method, so this
+        is also a requirement for monitoring support.
+
+        These extra checks won't be necessary once is_available becomes a part
+        of the core interface requirements and this class no longer wraps every
+        feature store.
+        """
+
+        if not hasattr(self.store, 'is_monitoring_enabled'):
+            return False
+
+        if not hasattr(self.store, 'is_available'):
+            return False
+
+        monitoring_enabled = getattr(self.store, 'is_monitoring_enabled')
+        if not callable(monitoring_enabled):
+            return False
+
+        return monitoring_enabled()
 
 
 class FDv2:
@@ -29,9 +154,6 @@ class FDv2:
     def __init__(
         self,
        config: DataSystemConfig,
-        # # TODO: These next 2 parameters should be moved into the Config.
- # persistent_store: Optional[FeatureStore] = None, - # store_writable: bool = True, disabled: bool = False, ): """ @@ -56,19 +178,24 @@ def __init__( # Set up event listeners self._flag_change_listeners = Listeners() self._change_set_listeners = Listeners() + self._data_store_listeners = Listeners() # Create the store self._store = Store(self._flag_change_listeners, self._change_set_listeners) # Status providers self._data_source_status_provider = DataSourceStatusProviderImpl(Listeners()) + self._data_store_status_provider = DataStoreStatusProviderImpl(None, Listeners()) + + # Configure persistent store if provided + if self._config.data_store is not None: + self._data_store_status_provider = DataStoreStatusProviderImpl(self._config.data_store, Listeners()) + writable = self._config.data_store_mode == DataStoreMode.READ_WRITE + wrapper = FeatureStoreClientWrapper(self._config.data_store, self._data_store_status_provider) + self._store.with_persistence( + wrapper, writable, self._data_store_status_provider + ) - # # Configure persistent store if provided - # if persistent_store is not None: - # self._store.with_persistence( - # persistent_store, store_writable, self._data_source_status_provider - # ) - # # Flag tracker (evaluation function set later by client) self._flag_tracker = FlagTrackerImpl( self._flag_change_listeners, @@ -80,8 +207,6 @@ def __init__( self._threads: List[Thread] = [] # Track configuration - # TODO: What is the point of checking if primary_synchronizer is not - # None? Doesn't it have to be set? self._configured_with_data_sources = ( (config.initializers is not None and len(config.initializers) > 0) or config.primary_synchronizer is not None @@ -94,7 +219,7 @@ def start(self, set_on_ready: Event): :param set_on_ready: Event to set when the system is ready or has failed """ if self._disabled: - print("Data system is disabled, SDK will return application-defined default values") + log.warning("Data system is disabled, SDK will return application-defined default values") set_on_ready.set() return @@ -139,25 +264,11 @@ def _run_main_loop(self, set_on_ready: Event): # Run initializers first self._run_initializers(set_on_ready) - # # If we have persistent store with status monitoring, start recovery monitoring - # if ( - # self._configured_with_data_sources - # and self._data_store_status_provider is not None - # and hasattr(self._data_store_status_provider, 'add_listener') - # ): - # recovery_thread = Thread( - # target=self._run_persistent_store_outage_recovery, - # name="FDv2-store-recovery", - # daemon=True - # ) - # recovery_thread.start() - # self._threads.append(recovery_thread) - # Run synchronizers self._run_synchronizers(set_on_ready) except Exception as e: - print(f"Error in FDv2 main loop: {e}") + log.error(f"Error in FDv2 main loop: {e}") # Ensure ready event is set even on error if not set_on_ready.is_set(): set_on_ready.set() @@ -173,16 +284,16 @@ def _run_initializers(self, set_on_ready: Event): try: initializer = initializer_builder() - print(f"Attempting to initialize via {initializer.name}") + log.info(f"Attempting to initialize via {initializer.name}") basis_result = initializer.fetch() if isinstance(basis_result, _Fail): - print(f"Initializer {initializer.name} failed: {basis_result.error}") + log.warning(f"Initializer {initializer.name} failed: {basis_result.error}") continue basis = basis_result.value - print(f"Initialized via {initializer.name}") + log.info(f"Initialized via {initializer.name}") # Apply the basis to the store 
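                 # basis.persist indicates whether this basis should also be
                 # written through to the persistent store, when one is configured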
self._store.apply(basis.change_set, basis.persist) @@ -191,7 +302,7 @@ def _run_initializers(self, set_on_ready: Event): if not set_on_ready.is_set(): set_on_ready.set() except Exception as e: - print(f"Initializer failed with exception: {e}") + log.error(f"Initializer failed with exception: {e}") def _run_synchronizers(self, set_on_ready: Event): """Run synchronizers to keep data up-to-date.""" @@ -208,7 +319,7 @@ def synchronizer_loop(self: 'FDv2'): # Try primary synchronizer try: primary_sync = self._primary_synchronizer_builder() - print(f"Primary synchronizer {primary_sync.name} is starting") + log.info(f"Primary synchronizer {primary_sync.name} is starting") remove_sync, fallback_v1 = self._consume_synchronizer_results( primary_sync, set_on_ready, self._fallback_condition @@ -222,20 +333,20 @@ def synchronizer_loop(self: 'FDv2'): self._primary_synchronizer_builder = self._fdv1_fallback_synchronizer_builder if self._primary_synchronizer_builder is None: - print("No more synchronizers available") + log.warning("No more synchronizers available") self._data_source_status_provider.update_status( DataSourceState.OFF, self._data_source_status_provider.status.error ) break else: - print("Fallback condition met") + log.info("Fallback condition met") if self._secondary_synchronizer_builder is None: continue secondary_sync = self._secondary_synchronizer_builder() - print(f"Secondary synchronizer {secondary_sync.name} is starting") + log.info(f"Secondary synchronizer {secondary_sync.name} is starting") remove_sync, fallback_v1 = self._consume_synchronizer_results( secondary_sync, set_on_ready, self._recovery_condition @@ -247,7 +358,7 @@ def synchronizer_loop(self: 'FDv2'): self._primary_synchronizer_builder = self._fdv1_fallback_synchronizer_builder if self._primary_synchronizer_builder is None: - print("No more synchronizers available") + log.warning("No more synchronizers available") self._data_source_status_provider.update_status( DataSourceState.OFF, self._data_source_status_provider.status.error @@ -255,13 +366,13 @@ def synchronizer_loop(self: 'FDv2'): # TODO: WE might need to also set that threading.Event here break - print("Recovery condition met, returning to primary synchronizer") + log.info("Recovery condition met, returning to primary synchronizer") except Exception as e: - print(f"Failed to build primary synchronizer: {e}") + log.error(f"Failed to build primary synchronizer: {e}") break except Exception as e: - print(f"Error in synchronizer loop: {e}") + log.error(f"Error in synchronizer loop: {e}") finally: # Ensure we always set the ready event when exiting if not set_on_ready.is_set(): @@ -289,7 +400,7 @@ def _consume_synchronizer_results( """ try: for update in synchronizer.sync(): - print(f"Synchronizer {synchronizer.name} update: {update.state}") + log.info(f"Synchronizer {synchronizer.name} update: {update.state}") if self._stop_event.is_set(): return False, False @@ -314,18 +425,11 @@ def _consume_synchronizer_results( return False, False except Exception as e: - print(f"Error consuming synchronizer results: {e}") + log.error(f"Error consuming synchronizer results: {e}") return True, False return True, False - # def _run_persistent_store_outage_recovery(self): - # """Monitor persistent store status and trigger recovery when needed.""" - # # This is a simplified version - in a full implementation we'd need - # # to properly monitor store status and trigger commit operations - # # when the store comes back online after an outage - # pass - # def _fallback_condition(self, 
status: DataSourceStatus) -> bool: """ Determine if we should fallback to secondary synchronizer. @@ -387,8 +491,7 @@ def data_source_status_provider(self) -> DataSourceStatusProvider: @property def data_store_status_provider(self) -> DataStoreStatusProvider: """Get the data store status provider.""" - raise NotImplementedError - # return self._data_store_status_provider + return self._data_store_status_provider @property def flag_tracker(self) -> FlagTracker: diff --git a/ldclient/impl/datasystem/store.py b/ldclient/impl/datasystem/store.py index 435a0faf..94f015e7 100644 --- a/ldclient/impl/datasystem/store.py +++ b/ldclient/impl/datasystem/store.py @@ -20,6 +20,7 @@ ) from ldclient.impl.dependency_tracker import DependencyTracker, KindAndKey from ldclient.impl.listeners import Listeners +from ldclient.impl.util import log from ldclient.interfaces import ( DataStoreStatusProvider, FeatureStore, @@ -144,7 +145,7 @@ def apply(self, change_set: ChangeSet, persist: bool) -> None: except Exception as e: # Log error but don't re-raise - matches Go behavior - print(f"Store: couldn't apply changeset: {e}") + log.error(f"Store: couldn't apply changeset: {e}") def _set_basis(self, change_set: ChangeSet, persist: bool) -> None: """ diff --git a/ldclient/interfaces.py b/ldclient/interfaces.py index 86a023fa..cae5c237 100644 --- a/ldclient/interfaces.py +++ b/ldclient/interfaces.py @@ -14,6 +14,31 @@ from .versioned_data_kind import VersionedDataKind +class DataStoreMode(Enum): + """ + DataStoreMode represents the mode of operation of a Data Store in FDV2 + mode. + + This enum is not stable, and not subject to any backwards compatibility + guarantees or semantic versioning. It is not suitable for production usage. + + Do not use it. + You have been warned. + """ + + READ_ONLY = 'read-only' + """ + READ_ONLY indicates that the data store is read-only. Data will never be + written back to the store by the SDK. + """ + + READ_WRITE = 'read-write' + """ + READ_WRITE indicates that the data store is read-write. Data from + initializers/synchronizers may be written to the store as necessary. + """ + + class FeatureStore: """ Interface for a versioned store for feature flags and related objects received from LaunchDarkly. @@ -923,8 +948,8 @@ class DataStoreStatus: __metaclass__ = ABCMeta def __init__(self, available: bool, stale: bool): - self.__available = available - self.__stale = stale + self._available = available + self._stale = stale @property def available(self) -> bool: @@ -939,7 +964,7 @@ def available(self) -> bool: :return: if store is available """ - return self.__available + return self._available @property def stale(self) -> bool: @@ -952,7 +977,18 @@ def stale(self) -> bool: :return: true if data should be rewritten """ - return self.__stale + return self._stale + + def __eq__(self, other): + """ + Ensures two instances of DataStoreStatus are the same if their properties are the same. 
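+        Equality is determined solely by the available and stale properties.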
+ + :param other: The other instance to compare + :return: True if instances are equal, False otherwise + """ + if isinstance(other, DataStoreStatus): + return self._available == other._available and self._stale == other._stale + return False class DataStoreUpdateSink: diff --git a/ldclient/testing/impl/datasystem/test_config.py b/ldclient/testing/impl/datasystem/test_config.py index db73aece..5142fb82 100644 --- a/ldclient/testing/impl/datasystem/test_config.py +++ b/ldclient/testing/impl/datasystem/test_config.py @@ -68,25 +68,6 @@ def test_config_builder_build_success(): assert config.secondary_synchronizer == mock_secondary -def test_config_builder_build_missing_primary_synchronizer(): - """Test that build fails when primary synchronizer is not set.""" - builder = ConfigBuilder() - - with pytest.raises(ValueError, match="Primary synchronizer must be set"): - builder.build() - - -def test_config_builder_build_with_initializers_only(): - """Test that build fails when only initializers are set.""" - builder = ConfigBuilder() - mock_initializer = Mock() - - builder.initializers([mock_initializer]) - - with pytest.raises(ValueError, match="Primary synchronizer must be set"): - builder.build() - - def test_config_builder_method_chaining(): """Test that all builder methods support method chaining.""" builder = ConfigBuilder() diff --git a/ldclient/testing/impl/datasystem/test_fdv2_persistence.py b/ldclient/testing/impl/datasystem/test_fdv2_persistence.py new file mode 100644 index 00000000..c64757ab --- /dev/null +++ b/ldclient/testing/impl/datasystem/test_fdv2_persistence.py @@ -0,0 +1,524 @@ +# pylint: disable=missing-docstring + +from threading import Event +from typing import Any, Callable, Dict, List, Mapping, Optional + +from ldclient.config import DataSystemConfig +from ldclient.impl.datasystem import DataAvailability +from ldclient.impl.datasystem.fdv2 import FDv2 +from ldclient.integrations.test_datav2 import TestDataV2 +from ldclient.interfaces import ( + DataStoreMode, + DataStoreStatus, + FeatureStore, + FlagChange +) +from ldclient.versioned_data_kind import FEATURES, SEGMENTS, VersionedDataKind + + +class StubFeatureStore(FeatureStore): + """ + A simple stub implementation of FeatureStore for testing. + Records all operations and allows inspection of state. 
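+
+    The *_calls attributes record every operation that reaches the store so
+    tests can make precise assertions about read and write behavior.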
+ """ + def __init__(self, initial_data: Optional[Mapping[VersionedDataKind, Mapping[str, Dict[Any, Any]]]] = None): + self._data: Dict[VersionedDataKind, Dict[str, dict]] = { + FEATURES: {}, + SEGMENTS: {} + } + self._initialized = False + self._available = True + self._monitoring_enabled = False + + # Track operations for assertions + self.init_called_count = 0 + self.upsert_calls: List[tuple] = [] + self.delete_calls: List[tuple] = [] + self.get_calls: List[tuple] = [] + self.all_calls: List[VersionedDataKind] = [] + + if initial_data: + self.init(initial_data) + + def init(self, all_data: Mapping[VersionedDataKind, Mapping[str, Dict[Any, Any]]]): + self.init_called_count += 1 + self._data = { + FEATURES: dict(all_data.get(FEATURES, {})), + SEGMENTS: dict(all_data.get(SEGMENTS, {})) + } + self._initialized = True + + def get(self, kind: VersionedDataKind, key: str, callback: Callable[[Any], Any] = lambda x: x): + self.get_calls.append((kind, key)) + item = self._data.get(kind, {}).get(key) + return callback(item) if item else None + + def all(self, kind: VersionedDataKind, callback: Callable[[Any], Any] = lambda x: x): + self.all_calls.append(kind) + items = self._data.get(kind, {}) + return {key: callback(value) for key, value in items.items()} + + def delete(self, kind: VersionedDataKind, key: str, version: int): + self.delete_calls.append((kind, key, version)) + existing = self._data.get(kind, {}).get(key) + if existing and existing.get('version', 0) < version: + self._data[kind][key] = {'key': key, 'version': version, 'deleted': True} + + def upsert(self, kind: VersionedDataKind, item: dict): + self.upsert_calls.append((kind, item.get('key'), item.get('version'))) + key = item['key'] + existing = self._data.get(kind, {}).get(key) + if not existing or existing.get('version', 0) < item.get('version', 0): + self._data[kind][key] = item + + @property + def initialized(self) -> bool: + return self._initialized + + def is_available(self) -> bool: + """For monitoring support""" + return self._available + + def is_monitoring_enabled(self) -> bool: + """For monitoring support""" + return self._monitoring_enabled + + def set_available(self, available: bool): + """Test helper to simulate availability changes""" + self._available = available + + def enable_monitoring(self): + """Test helper to enable monitoring""" + self._monitoring_enabled = True + + def get_data_snapshot(self) -> Mapping[VersionedDataKind, Mapping[str, dict]]: + """Test helper to get a snapshot of current data""" + return { + FEATURES: dict(self._data[FEATURES]), + SEGMENTS: dict(self._data[SEGMENTS]) + } + + def reset_operation_tracking(self): + """Test helper to reset operation tracking""" + self.init_called_count = 0 + self.upsert_calls = [] + self.delete_calls = [] + self.get_calls = [] + self.all_calls = [] + + +def test_persistent_store_read_only_mode(): + """Test that READ_ONLY mode reads from store but never writes""" + # Pre-populate persistent store with a flag + initial_data = { + FEATURES: { + 'existing-flag': { + 'key': 'existing-flag', + 'version': 1, + 'on': True, + 'variations': [True, False], + 'fallthrough': {'variation': 0} + } + }, + SEGMENTS: {} + } + + persistent_store = StubFeatureStore(initial_data) + + # Create synchronizer that will provide new data + td_synchronizer = TestDataV2.data_source() + td_synchronizer.update(td_synchronizer.flag("new-flag").on(True)) + + config = DataSystemConfig( + data_store_mode=DataStoreMode.READ_ONLY, + data_store=persistent_store, + initializers=None, + 
primary_synchronizer=td_synchronizer.build_synchronizer, + ) + + set_on_ready = Event() + fdv2 = FDv2(config) + fdv2.start(set_on_ready) + + assert set_on_ready.wait(1), "Data system did not become ready in time" + + # Verify data system is initialized and available + assert fdv2.data_availability.at_least(DataAvailability.REFRESHED) + + # Verify the store was initialized once (by us) but no additional writes happened + # The persistent store should have been read from, but not written to + assert persistent_store.init_called_count == 1 # Only our initial setup + assert len(persistent_store.upsert_calls) == 0 # No upserts in READ_ONLY mode + + fdv2.stop() + + +def test_persistent_store_read_write_mode(): + """Test that READ_WRITE mode reads from store and writes updates back""" + # Pre-populate persistent store with a flag + initial_data = { + FEATURES: { + 'existing-flag': { + 'key': 'existing-flag', + 'version': 1, + 'on': True, + 'variations': [True, False], + 'fallthrough': {'variation': 0} + } + }, + SEGMENTS: {} + } + + persistent_store = StubFeatureStore(initial_data) + persistent_store.reset_operation_tracking() # Reset tracking after initial setup + + # Create synchronizer that will provide new data + td_synchronizer = TestDataV2.data_source() + td_synchronizer.update(td_synchronizer.flag("new-flag").on(True)) + + config = DataSystemConfig( + data_store_mode=DataStoreMode.READ_WRITE, + data_store=persistent_store, + initializers=None, + primary_synchronizer=td_synchronizer.build_synchronizer, + ) + + set_on_ready = Event() + fdv2 = FDv2(config) + fdv2.start(set_on_ready) + + assert set_on_ready.wait(1), "Data system did not become ready in time" + + # In READ_WRITE mode, the store should be initialized with new data + assert persistent_store.init_called_count >= 1 # At least one init call for the new data + + # Verify the new flag was written to persistent store + snapshot = persistent_store.get_data_snapshot() + assert 'new-flag' in snapshot[FEATURES] + + fdv2.stop() + + +def test_persistent_store_delta_updates_read_write(): + """Test that delta updates are written to persistent store in READ_WRITE mode""" + persistent_store = StubFeatureStore() + + # Create synchronizer + td_synchronizer = TestDataV2.data_source() + td_synchronizer.update(td_synchronizer.flag("feature-flag").on(True)) + + config = DataSystemConfig( + data_store_mode=DataStoreMode.READ_WRITE, + data_store=persistent_store, + initializers=None, + primary_synchronizer=td_synchronizer.build_synchronizer, + ) + + set_on_ready = Event() + fdv2 = FDv2(config) + + # Set up flag change listener to detect the update + flag_changed = Event() + change_count = [0] # Use list to allow modification in nested function + + def listener(flag_change: FlagChange): + change_count[0] += 1 + if change_count[0] == 2: # First change is from initial sync, second is our update + flag_changed.set() + + fdv2.flag_tracker.add_listener(listener) + fdv2.start(set_on_ready) + + assert set_on_ready.wait(1), "Data system did not become ready in time" + + persistent_store.reset_operation_tracking() + + # Make a delta update + td_synchronizer.update(td_synchronizer.flag("feature-flag").on(False)) + + # Wait for the flag change to propagate + assert flag_changed.wait(1), "Flag change did not propagate in time" + + # Verify the update was written to persistent store + assert len(persistent_store.upsert_calls) > 0 + assert any(call[1] == 'feature-flag' for call in persistent_store.upsert_calls) + + # Verify the updated flag is in the store + snapshot 
= persistent_store.get_data_snapshot() + assert 'feature-flag' in snapshot[FEATURES] + assert snapshot[FEATURES]['feature-flag']['on'] is False + + fdv2.stop() + + +def test_persistent_store_delta_updates_read_only(): + """Test that delta updates are NOT written to persistent store in READ_ONLY mode""" + persistent_store = StubFeatureStore() + + # Create synchronizer + td_synchronizer = TestDataV2.data_source() + td_synchronizer.update(td_synchronizer.flag("feature-flag").on(True)) + + config = DataSystemConfig( + data_store_mode=DataStoreMode.READ_ONLY, + data_store=persistent_store, + initializers=None, + primary_synchronizer=td_synchronizer.build_synchronizer, + ) + + set_on_ready = Event() + fdv2 = FDv2(config) + + # Set up flag change listener to detect the update + flag_changed = Event() + change_count = [0] # Use list to allow modification in nested function + + def listener(flag_change: FlagChange): + change_count[0] += 1 + if change_count[0] == 2: # First change is from initial sync, second is our update + flag_changed.set() + + fdv2.flag_tracker.add_listener(listener) + fdv2.start(set_on_ready) + + assert set_on_ready.wait(1), "Data system did not become ready in time" + + persistent_store.reset_operation_tracking() + + # Make a delta update + td_synchronizer.update(td_synchronizer.flag("feature-flag").on(False)) + + # Wait for the flag change to propagate + assert flag_changed.wait(1), "Flag change did not propagate in time" + + # Verify NO updates were written to persistent store in READ_ONLY mode + assert len(persistent_store.upsert_calls) == 0 + + fdv2.stop() + + +def test_persistent_store_with_initializer_and_synchronizer(): + """Test that both initializer and synchronizer data are persisted in READ_WRITE mode""" + persistent_store = StubFeatureStore() + + # Create initializer with one flag + td_initializer = TestDataV2.data_source() + td_initializer.update(td_initializer.flag("init-flag").on(True)) + + # Create synchronizer with another flag + td_synchronizer = TestDataV2.data_source() + td_synchronizer.update(td_synchronizer.flag("sync-flag").on(False)) + + config = DataSystemConfig( + data_store_mode=DataStoreMode.READ_WRITE, + data_store=persistent_store, + initializers=[td_initializer.build_initializer], + primary_synchronizer=td_synchronizer.build_synchronizer, + ) + + set_on_ready = Event() + fdv2 = FDv2(config) + + # Set up flag change listener to detect when synchronizer data arrives + sync_flag_arrived = Event() + + def listener(flag_change: FlagChange): + if flag_change.key == "sync-flag": + sync_flag_arrived.set() + + fdv2.flag_tracker.add_listener(listener) + fdv2.start(set_on_ready) + + assert set_on_ready.wait(1), "Data system did not become ready in time" + + # Wait for synchronizer to fully initialize + # The synchronizer does a full data set transfer, so it replaces the initializer data + assert sync_flag_arrived.wait(1), "Synchronizer data did not arrive in time" + + # The synchronizer flag should be in the persistent store + # (it replaces the init-flag since synchronizer does a full data set) + snapshot = persistent_store.get_data_snapshot() + assert 'init-flag' not in snapshot[FEATURES] + assert 'sync-flag' in snapshot[FEATURES] + + fdv2.stop() + + +def test_persistent_store_delete_operations(): + """Test that delete operations are written to persistent store in READ_WRITE mode""" + # We'll need to manually trigger a delete via the store + # This is more of an integration test with the Store class + from ldclient.impl.datasystem.protocolv2 import ( + 
Change, + ChangeSet, + ChangeType, + IntentCode, + ObjectKind + ) + from ldclient.impl.datasystem.store import Store + from ldclient.impl.listeners import Listeners + + # Pre-populate with a flag + initial_data = { + FEATURES: { + 'deletable-flag': { + 'key': 'deletable-flag', + 'version': 1, + 'on': True, + 'variations': [True, False], + 'fallthrough': {'variation': 0} + } + }, + SEGMENTS: {} + } + + persistent_store = StubFeatureStore(initial_data) + + store = Store(Listeners(), Listeners()) + store.with_persistence(persistent_store, True, None) + + # First, initialize the store with the data so it's in memory + init_changeset = ChangeSet( + intent_code=IntentCode.TRANSFER_FULL, + changes=[ + Change( + action=ChangeType.PUT, + kind=ObjectKind.FLAG, + key='deletable-flag', + version=1, + object={ + 'key': 'deletable-flag', + 'version': 1, + 'on': True, + 'variations': [True, False], + 'fallthrough': {'variation': 0} + } + ) + ], + selector=None + ) + store.apply(init_changeset, True) + + persistent_store.reset_operation_tracking() + + # Now apply a changeset with a delete + delete_changeset = ChangeSet( + intent_code=IntentCode.TRANSFER_CHANGES, + changes=[ + Change( + action=ChangeType.DELETE, + kind=ObjectKind.FLAG, + key='deletable-flag', + version=2, + object=None + ) + ], + selector=None + ) + + store.apply(delete_changeset, True) + + # Verify delete was called on persistent store + assert len(persistent_store.delete_calls) > 0 + assert any(call[1] == 'deletable-flag' for call in persistent_store.delete_calls) + + +def test_data_store_status_provider(): + """Test that data store status provider is correctly initialized""" + persistent_store = StubFeatureStore() + + td_synchronizer = TestDataV2.data_source() + td_synchronizer.update(td_synchronizer.flag("feature-flag").on(True)) + + config = DataSystemConfig( + data_store_mode=DataStoreMode.READ_WRITE, + data_store=persistent_store, + initializers=None, + primary_synchronizer=td_synchronizer.build_synchronizer, + ) + + set_on_ready = Event() + fdv2 = FDv2(config) + + # Verify data store status provider exists + status_provider = fdv2.data_store_status_provider + assert status_provider is not None + + # Get initial status + status = status_provider.status + assert status is not None + assert status.available is True + + fdv2.start(set_on_ready) + assert set_on_ready.wait(1), "Data system did not become ready in time" + + fdv2.stop() + + +def test_data_store_status_monitoring_not_enabled_by_default(): + """Test that monitoring is not enabled by default""" + persistent_store = StubFeatureStore() + + td_synchronizer = TestDataV2.data_source() + td_synchronizer.update(td_synchronizer.flag("feature-flag").on(True)) + + config = DataSystemConfig( + data_store_mode=DataStoreMode.READ_WRITE, + data_store=persistent_store, + initializers=None, + primary_synchronizer=td_synchronizer.build_synchronizer, + ) + + fdv2 = FDv2(config) + + # Monitoring should not be enabled because the store doesn't support it + status_provider = fdv2.data_store_status_provider + assert status_provider.is_monitoring_enabled() is False + + +def test_data_store_status_monitoring_enabled_when_supported(): + """Test that monitoring is enabled when the store supports it""" + persistent_store = StubFeatureStore() + persistent_store.enable_monitoring() + + td_synchronizer = TestDataV2.data_source() + td_synchronizer.update(td_synchronizer.flag("feature-flag").on(True)) + + config = DataSystemConfig( + data_store_mode=DataStoreMode.READ_WRITE, + data_store=persistent_store, + 
initializers=None, + primary_synchronizer=td_synchronizer.build_synchronizer, + ) + + fdv2 = FDv2(config) + + # Monitoring should be enabled + status_provider = fdv2.data_store_status_provider + assert status_provider.is_monitoring_enabled() is True + + +def test_no_persistent_store_status_provider_without_store(): + """Test that data store status provider exists even without a persistent store""" + td_synchronizer = TestDataV2.data_source() + td_synchronizer.update(td_synchronizer.flag("feature-flag").on(True)) + + config = DataSystemConfig( + data_store_mode=DataStoreMode.READ_WRITE, + data_store=None, + initializers=None, + primary_synchronizer=td_synchronizer.build_synchronizer, + ) + + set_on_ready = Event() + fdv2 = FDv2(config) + + # Status provider should exist but not be monitoring + status_provider = fdv2.data_store_status_provider + assert status_provider is not None + assert status_provider.is_monitoring_enabled() is False + + fdv2.start(set_on_ready) + assert set_on_ready.wait(1), "Data system did not become ready in time" + + fdv2.stop()
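+
+
+def test_config_builder_wires_persistent_store():
+    """
+    Minimal sketch: wiring a persistent store through the ConfigBuilder API
+    added in this patch. Assumes custom() and data_store() behave as defined
+    in ldclient/impl/datasystem/config.py above; no names outside this patch
+    are introduced.
+    """
+    from ldclient.impl.datasystem.config import custom
+
+    store = StubFeatureStore()
+
+    # READ_WRITE mirrors persistent_store(); READ_ONLY would mirror daemon()
+    config = custom().data_store(store, DataStoreMode.READ_WRITE).build()
+
+    assert config.data_store is store
+    assert config.data_store_mode == DataStoreMode.READ_WRITE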