Skip to content

Commit 6ed3131

Browse files
committed
chore: Initial structure and implementation of FDv2 datasystem
1 parent 42dbbf7 commit 6ed3131

File tree

16 files changed

+1093
-125
lines changed

16 files changed

+1093
-125
lines changed

ldclient/client.py

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
DataStoreStatusProviderImpl,
3131
DataStoreUpdateSinkImpl
3232
)
33+
from ldclient.impl.datasystem import DataAvailability, DataSystem
34+
from ldclient.impl.datasystem.fdv2 import FDv2
3335
from ldclient.impl.evaluator import Evaluator, error_reason
3436
from ldclient.impl.events.diagnostics import (
3537
_DiagnosticAccumulator,
@@ -249,14 +251,19 @@ def __start_up(self, start_wait: float):
249251
self.__hooks_lock = ReadWriteLock()
250252
self.__hooks = self._config.hooks + plugin_hooks # type: List[Hook]
251253

252-
# Initialize data system (FDv1) to encapsulate v1 data plumbing
253-
from ldclient.impl.datasystem.fdv1 import ( # local import to avoid circular dependency
254-
FDv1
255-
)
254+
datasystem_config = self._config.datasystem_config
255+
if datasystem_config is None:
256+
# Initialize data system (FDv1) to encapsulate v1 data plumbing
257+
from ldclient.impl.datasystem.fdv1 import ( # local import to avoid circular dependency
258+
FDv1
259+
)
260+
261+
self._data_system: DataSystem = FDv1(self._config)
262+
else:
263+
self._data_system = FDv2(datasystem_config, disabled=self._config.offline)
256264

257-
self._data_system = FDv1(self._config)
258265
# Provide flag evaluation function for value-change tracking
259-
self._data_system.set_flag_value_eval_fn(
266+
self._data_system.set_flag_value_eval_fn( # type: ignore
260267
lambda key, context: self.variation(key, context, None)
261268
)
262269
# Expose providers and store from data system
@@ -265,7 +272,7 @@ def __start_up(self, start_wait: float):
265272
self._data_system.data_source_status_provider
266273
)
267274
self.__flag_tracker = self._data_system.flag_tracker
268-
self._store = self._data_system.store # type: FeatureStore
275+
self._store: FeatureStore = self._data_system.store # type: ignore
269276

270277
big_segment_store_manager = BigSegmentStoreManager(self._config.big_segments)
271278
self.__big_segment_store_manager = big_segment_store_manager
@@ -286,7 +293,7 @@ def __start_up(self, start_wait: float):
286293
diagnostic_accumulator = self._set_event_processor(self._config)
287294

288295
# Pass diagnostic accumulator to data system for streaming metrics
289-
self._data_system.set_diagnostic_accumulator(diagnostic_accumulator)
296+
self._data_system.set_diagnostic_accumulator(diagnostic_accumulator) # type: ignore
290297

291298
self.__register_plugins(environment_metadata)
292299

@@ -475,11 +482,7 @@ def is_initialized(self) -> bool:
475482
if self.is_offline() or self._config.use_ldd:
476483
return True
477484

478-
return (
479-
self._data_system._update_processor.initialized()
480-
if self._data_system._update_processor
481-
else False
482-
)
485+
return self._data_system.data_availability.at_least(DataAvailability.CACHED)
483486

484487
def flush(self):
485488
"""Flushes all pending analytics events.

ldclient/config.py

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@
44
Note that the same class can also be imported from the ``ldclient.client`` submodule.
55
"""
66

7+
from dataclasses import dataclass
78
from threading import Event
8-
from typing import Callable, List, Optional, Set
9+
from typing import Callable, List, Optional, Set, TypeVar
910

1011
from ldclient.feature_store import InMemoryFeatureStore
1112
from ldclient.hook import Hook
13+
from ldclient.impl.datasystem import Initializer, Synchronizer
1214
from ldclient.impl.util import (
1315
log,
1416
validate_application_info,
@@ -152,6 +154,32 @@ def disable_ssl_verification(self) -> bool:
152154
return self.__disable_ssl_verification
153155

154156

157+
T = TypeVar("T")
158+
159+
Builder = Callable[[], T]
160+
161+
162+
@dataclass(frozen=True)
163+
class DataSystemConfig:
164+
"""
165+
Configuration for LaunchDarkly's data acquisition strategy.
166+
"""
167+
168+
initializers: Optional[List[Builder[Initializer]]]
169+
"""The initializers for the data system."""
170+
171+
primary_synchronizer: Builder[Synchronizer]
172+
"""The primary synchronizer for the data system."""
173+
174+
secondary_synchronizer: Optional[Builder[Synchronizer]] = None
175+
"""The secondary synchronizers for the data system."""
176+
177+
# TODO(fdv2): Implement this synchronizer up and hook it up everywhere.
178+
# TODO(fdv2): Remove this when FDv2 is fully launched
179+
fdv1_fallback_synchronizer: Optional[Builder[Synchronizer]] = None
180+
"""An optional fallback synchronizer that will read from FDv1"""
181+
182+
155183
class Config:
156184
"""Advanced configuration options for the SDK client.
157185
@@ -194,6 +222,7 @@ def __init__(
194222
enable_event_compression: bool = False,
195223
omit_anonymous_contexts: bool = False,
196224
payload_filter_key: Optional[str] = None,
225+
datasystem_config: Optional[DataSystemConfig] = None,
197226
):
198227
"""
199228
:param sdk_key: The SDK key for your LaunchDarkly account. This is always required.
@@ -264,6 +293,7 @@ def __init__(
264293
:param enable_event_compression: Whether or not to enable GZIP compression for outgoing events.
265294
:param omit_anonymous_contexts: Sets whether anonymous contexts should be omitted from index and identify events.
266295
:param payload_filter_key: The payload filter is used to selectively limited the flags and segments delivered in the data source payload.
296+
:param datasystem_config: Configuration for the upcoming enhanced data system design. This is experimental and should not be set without direction from LaunchDarkly support.
267297
"""
268298
self.__sdk_key = validate_sdk_key_format(sdk_key, log)
269299

@@ -303,6 +333,7 @@ def __init__(
303333
self.__payload_filter_key = payload_filter_key
304334
self._data_source_update_sink: Optional[DataSourceUpdateSink] = None
305335
self._instance_id: Optional[str] = None
336+
self._datasystem_config = datasystem_config
306337

307338
def copy_with_new_sdk_key(self, new_sdk_key: str) -> 'Config':
308339
"""Returns a new ``Config`` instance that is the same as this one, except for having a different SDK key.
@@ -546,6 +577,15 @@ def data_source_update_sink(self) -> Optional[DataSourceUpdateSink]:
546577
"""
547578
return self._data_source_update_sink
548579

580+
@property
581+
def datasystem_config(self) -> Optional[DataSystemConfig]:
582+
"""
583+
Configuration for the upcoming enhanced data system design. This is
584+
experimental and should not be set without direction from LaunchDarkly
585+
support.
586+
"""
587+
return self._datasystem_config
588+
549589
def _validate(self):
550590
if self.offline is False and self.sdk_key == '':
551591
log.warning("Missing or blank SDK key")

ldclient/impl/datasourcev2/polling.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ def __init__(
9090
"ldclient.datasource.polling", poll_interval, 0, self._poll
9191
)
9292

93+
@property
9394
def name(self) -> str:
9495
"""Returns the name of the initializer."""
9596
return "PollingDataSourceV2"
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
import time
2+
from typing import Callable, Optional
3+
4+
from ldclient.impl.listeners import Listeners
5+
from ldclient.impl.rwlock import ReadWriteLock
6+
from ldclient.interfaces import (
7+
DataSourceErrorInfo,
8+
DataSourceState,
9+
DataSourceStatus,
10+
DataSourceStatusProvider
11+
)
12+
13+
14+
class DataSourceStatusProviderImpl(DataSourceStatusProvider):
15+
def __init__(self, listeners: Listeners):
16+
self.__listeners = listeners
17+
self.__status = DataSourceStatus(DataSourceState.INITIALIZING, 0, None)
18+
self.__lock = ReadWriteLock()
19+
20+
@property
21+
def status(self) -> DataSourceStatus:
22+
self.__lock.rlock()
23+
status = self.__status
24+
self.__lock.runlock()
25+
26+
return status
27+
28+
def update_status(self, new_state: DataSourceState, new_error: Optional[DataSourceErrorInfo]):
29+
status_to_broadcast = None
30+
31+
try:
32+
self.__lock.lock()
33+
old_status = self.__status
34+
35+
if new_state == DataSourceState.INTERRUPTED and old_status.state == DataSourceState.INITIALIZING:
36+
new_state = DataSourceState.INITIALIZING
37+
38+
if new_state == old_status.state and new_error is None:
39+
return
40+
41+
new_since = self.__status.since if new_state == self.__status.state else time.time()
42+
new_error = self.__status.error if new_error is None else new_error
43+
44+
self.__status = DataSourceStatus(new_state, new_since, new_error)
45+
46+
status_to_broadcast = self.__status
47+
finally:
48+
self.__lock.unlock()
49+
50+
if status_to_broadcast is not None:
51+
self.__listeners.notify(status_to_broadcast)
52+
53+
def add_listener(self, listener: Callable[[DataSourceStatus], None]):
54+
self.__listeners.add(listener)
55+
56+
def remove_listener(self, listener: Callable[[DataSourceStatus], None]):
57+
self.__listeners.remove(listener)

ldclient/impl/datasourcev2/streaming.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,13 @@ def __init__(
129129
self._config = config
130130
self._sse: Optional[SSEClient] = None
131131

132+
@property
133+
def name(self) -> str:
134+
"""
135+
Returns the name of the synchronizer, which is used for logging and debugging.
136+
"""
137+
return "streaming"
138+
132139
def sync(self) -> Generator[Update, None, None]:
133140
"""
134141
sync should begin the synchronization process for the data source, yielding

ldclient/impl/datastore/status.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from __future__ import annotations
22

33
from copy import copy
4-
from typing import TYPE_CHECKING, Callable
4+
from typing import TYPE_CHECKING, Callable, Protocol
55

66
from ldclient.impl.listeners import Listeners
77
from ldclient.impl.rwlock import ReadWriteLock

ldclient/impl/datasystem/__init__.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,14 @@ class Initializer(Protocol): # pylint: disable=too-few-public-methods
156156
as new changes occur.
157157
"""
158158

159+
@property
160+
@abstractmethod
161+
def name(self) -> str:
162+
"""
163+
Returns the name of the initializer, which is used for logging and debugging.
164+
"""
165+
raise NotImplementedError
166+
159167
@abstractmethod
160168
def fetch(self) -> BasisResult:
161169
"""
@@ -188,6 +196,13 @@ class Synchronizer(Protocol): # pylint: disable=too-few-public-methods
188196
of the data source, including any changes that have occurred since the last
189197
synchronization.
190198
"""
199+
@property
200+
@abstractmethod
201+
def name(self) -> str:
202+
"""
203+
Returns the name of the synchronizer, which is used for logging and debugging.
204+
"""
205+
raise NotImplementedError
191206

192207
@abstractmethod
193208
def sync(self) -> Generator[Update, None, None]:

ldclient/impl/datasystem/config.py

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22
Configuration for LaunchDarkly's data acquisition strategy.
33
"""
44

5-
from dataclasses import dataclass
65
from typing import Callable, List, Optional, TypeVar
76

87
from ldclient.config import Config as LDConfig
8+
from ldclient.config import DataSystemConfig
99
from ldclient.impl.datasourcev2.polling import (
1010
PollingDataSource,
1111
PollingDataSourceBuilder,
@@ -22,22 +22,6 @@
2222
Builder = Callable[[], T]
2323

2424

25-
@dataclass(frozen=True)
26-
class Config:
27-
"""
28-
Configuration for LaunchDarkly's data acquisition strategy.
29-
"""
30-
31-
initializers: Optional[List[Builder[Initializer]]]
32-
"""The initializers for the data system."""
33-
34-
primary_synchronizer: Builder[Synchronizer]
35-
"""The primary synchronizer for the data system."""
36-
37-
secondary_synchronizer: Optional[Builder[Synchronizer]]
38-
"""The secondary synchronizers for the data system."""
39-
40-
4125
class ConfigBuilder: # pylint: disable=too-few-public-methods
4226
"""
4327
Builder for the data system configuration.
@@ -47,7 +31,7 @@ class ConfigBuilder: # pylint: disable=too-few-public-methods
4731
_primary_synchronizer: Optional[Builder[Synchronizer]] = None
4832
_secondary_synchronizer: Optional[Builder[Synchronizer]] = None
4933

50-
def initializers(self, initializers: List[Builder[Initializer]]) -> "ConfigBuilder":
34+
def initializers(self, initializers: Optional[List[Builder[Initializer]]]) -> "ConfigBuilder":
5135
"""
5236
Sets the initializers for the data system.
5337
"""
@@ -66,14 +50,14 @@ def synchronizers(
6650
self._secondary_synchronizer = secondary
6751
return self
6852

69-
def build(self) -> Config:
53+
def build(self) -> DataSystemConfig:
7054
"""
7155
Builds the data system configuration.
7256
"""
7357
if self._primary_synchronizer is None:
7458
raise ValueError("Primary synchronizer must be set")
7559

76-
return Config(
60+
return DataSystemConfig(
7761
initializers=self._initializers,
7862
primary_synchronizer=self._primary_synchronizer,
7963
secondary_synchronizer=self._secondary_synchronizer,
@@ -144,7 +128,7 @@ def polling(config: LDConfig) -> ConfigBuilder:
144128
streaming, but may be necessary in some network environments.
145129
"""
146130

147-
polling_builder = __polling_ds_builder(config)
131+
polling_builder: Builder[Synchronizer] = __polling_ds_builder(config)
148132

149133
builder = ConfigBuilder()
150134
builder.synchronizers(polling_builder)

ldclient/impl/datasystem/fdv1.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,16 @@ def flag_tracker(self) -> FlagTracker:
142142

143143
@property
144144
def data_availability(self) -> DataAvailability:
145-
return self._data_availability
145+
if self._config.offline:
146+
return DataAvailability.DEFAULTS
147+
148+
if self._update_processor is not None and self._update_processor.initialized():
149+
return DataAvailability.REFRESHED
150+
151+
if self._store_wrapper.initialized:
152+
return DataAvailability.CACHED
153+
154+
return DataAvailability.DEFAULTS
146155

147156
@property
148157
def target_availability(self) -> DataAvailability:

0 commit comments

Comments
 (0)