Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions ldclient/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from ldclient.interfaces import (
BigSegmentStore,
DataSourceUpdateSink,
DataStoreMode,
EventProcessor,
FeatureStore,
UpdateProcessor
Expand Down Expand Up @@ -161,19 +162,23 @@ def disable_ssl_verification(self) -> bool:

@dataclass(frozen=True)
class DataSystemConfig:
"""
Configuration for LaunchDarkly's data acquisition strategy.
"""
"""Configuration for LaunchDarkly's data acquisition strategy."""

initializers: Optional[List[Builder[Initializer]]]
"""The initializers for the data system."""

primary_synchronizer: Builder[Synchronizer]
primary_synchronizer: Optional[Builder[Synchronizer]]
"""The primary synchronizer for the data system."""

secondary_synchronizer: Optional[Builder[Synchronizer]] = None
"""The secondary synchronizers for the data system."""

data_store_mode: DataStoreMode = DataStoreMode.READ_WRITE
"""The data store mode specifies the mode in which the persistent store will operate, if present."""

data_store: Optional[FeatureStore] = None
"""The (optional) persistent data store instance."""

# TODO(fdv2): Implement this synchronizer up and hook it up everywhere.
# TODO(fdv2): Remove this when FDv2 is fully launched
fdv1_fallback_synchronizer: Optional[Builder[Synchronizer]] = None
Expand Down
54 changes: 53 additions & 1 deletion ldclient/impl/datasourcev2/status.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
import time
from copy import copy
from typing import Callable, Optional

from ldclient.impl.datasystem.store import Store
from ldclient.impl.listeners import Listeners
from ldclient.impl.rwlock import ReadWriteLock
from ldclient.interfaces import (
DataSourceErrorInfo,
DataSourceState,
DataSourceStatus,
DataSourceStatusProvider
DataSourceStatusProvider,
DataStoreStatus,
DataStoreStatusProvider,
FeatureStore
)


Expand Down Expand Up @@ -55,3 +60,50 @@ def add_listener(self, listener: Callable[[DataSourceStatus], None]):

def remove_listener(self, listener: Callable[[DataSourceStatus], None]):
self.__listeners.remove(listener)


class DataStoreStatusProviderImpl(DataStoreStatusProvider):
def __init__(self, store: Optional[FeatureStore], listeners: Listeners):
self.__store = store
self.__listeners = listeners

self.__lock = ReadWriteLock()
self.__status = DataStoreStatus(True, False)

def update_status(self, status: DataStoreStatus):
"""
update_status is called from the data store to push a status update.
"""
self.__lock.lock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason this has to use lock/unlock vs with?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the LD provided RW lock. There is no context usage for this class.

modified = False

if self.__status != status:
self.__status = status
modified = True

self.__lock.unlock()

if modified:
self.__listeners.notify(status)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the threading situation like for this dispatch? Is the caller thread always the same thread?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, unfortunately that's the way the current listeners work in python. The notify method itself catches all exceptions, which is why we aren't doing it here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. Well it is good in a sense, because it helps prevent out of order handling from unfortunate thread context switching. If it was multi-threaded, then I would be concerned we would need some async queue type situation.


@property
def status(self) -> DataStoreStatus:
self.__lock.rlock()
status = copy(self.__status)
self.__lock.runlock()

return status

def is_monitoring_enabled(self) -> bool:
if self.__store is None:
return False
if hasattr(self.__store, "is_monitoring_enabled") is False:
return False

return self.__store.is_monitoring_enabled() # type: ignore

def add_listener(self, listener: Callable[[DataStoreStatus], None]):
self.__listeners.add(listener)

def remove_listener(self, listener: Callable[[DataStoreStatus], None]):
self.__listeners.remove(listener)
48 changes: 36 additions & 12 deletions ldclient/impl/datasystem/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
StreamingDataSourceBuilder
)
from ldclient.impl.datasystem import Initializer, Synchronizer
from ldclient.interfaces import DataStoreMode, FeatureStore

T = TypeVar("T")

Expand All @@ -30,6 +31,8 @@ class ConfigBuilder: # pylint: disable=too-few-public-methods
_initializers: Optional[List[Builder[Initializer]]] = None
_primary_synchronizer: Optional[Builder[Synchronizer]] = None
_secondary_synchronizer: Optional[Builder[Synchronizer]] = None
_store_mode: DataStoreMode = DataStoreMode.READ_ONLY
_data_store: Optional[FeatureStore] = None

def initializers(self, initializers: Optional[List[Builder[Initializer]]]) -> "ConfigBuilder":
"""
Expand All @@ -50,17 +53,27 @@ def synchronizers(
self._secondary_synchronizer = secondary
return self

def data_store(self, data_store: FeatureStore, store_mode: DataStoreMode) -> "ConfigBuilder":
"""
Sets the data store configuration for the data system.
"""
self._data_store = data_store
self._store_mode = store_mode
return self

def build(self) -> DataSystemConfig:
"""
Builds the data system configuration.
"""
if self._primary_synchronizer is None:
raise ValueError("Primary synchronizer must be set")
if self._secondary_synchronizer is not None and self._primary_synchronizer is None:
raise ValueError("Primary synchronizer must be set if secondary is set")

return DataSystemConfig(
initializers=self._initializers,
primary_synchronizer=self._primary_synchronizer,
secondary_synchronizer=self._secondary_synchronizer,
data_store_mode=self._store_mode,
data_store=self._data_store,
)


Expand Down Expand Up @@ -147,18 +160,29 @@ def custom() -> ConfigBuilder:
return ConfigBuilder()


# TODO(fdv2): Implement these methods
#
# Daemon configures the SDK to read from a persistent store integration
# that is populated by Relay Proxy or other SDKs. The SDK will not connect
# to LaunchDarkly. In this mode, the SDK never writes to the data store.
# TODO(fdv2): Need to update these so they don't rely on the LDConfig
def daemon(config: LDConfig, store: FeatureStore) -> ConfigBuilder:
"""
Daemon configures the SDK to read from a persistent store integration
that is populated by Relay Proxy or other SDKs. The SDK will not connect
to LaunchDarkly. In this mode, the SDK never writes to the data store.
"""
return default(config).data_store(store, DataStoreMode.READ_ONLY)


# PersistentStore is similar to Default, with the addition of a persistent
# store integration. Before data has arrived from LaunchDarkly, the SDK is
# able to evaluate flags using data from the persistent store. Once fresh
# data is available, the SDK will no longer read from the persistent store,
# although it will keep it up-to-date.
def persistent_store(config: LDConfig, store: FeatureStore) -> ConfigBuilder:
"""
PersistentStore is similar to Default, with the addition of a persistent
store integration. Before data has arrived from LaunchDarkly, the SDK is
able to evaluate flags using data from the persistent store. Once fresh
data is available, the SDK will no longer read from the persistent store,
although it will keep it up-to-date.
"""
return default(config).data_store(store, DataStoreMode.READ_WRITE)


# TODO(fdv2): Implement these methods
#
# WithEndpoints configures the data system with custom endpoints for
# LaunchDarkly's streaming and polling synchronizers. This method is not
# necessary for most use-cases, but can be useful for testing or custom
Expand Down
Loading