33from threading import Event , Thread
44from typing import Any , Callable , Dict , List , Mapping , Optional
55
6- from ldclient .config import Builder , DataSystemConfig
6+ from ldclient .config import Builder , Config , DataSystemConfig
77from ldclient .feature_store import _FeatureStoreDataSetSorter
88from ldclient .impl .datasourcev2 .status import (
99 DataSourceStatusProviderImpl ,
@@ -153,8 +153,8 @@ class FDv2:
153153
154154 def __init__ (
155155 self ,
156- config : DataSystemConfig ,
157- disabled : bool = False ,
156+ config : Config ,
157+ data_system_config : DataSystemConfig ,
158158 ):
159159 """
160160 Initialize a new FDv2 data system.
@@ -165,10 +165,11 @@ def __init__(
165165 :param disabled: Whether the data system is disabled (offline mode)
166166 """
167167 self ._config = config
168- self ._primary_synchronizer_builder : Optional [Builder [Synchronizer ]] = config .primary_synchronizer
169- self ._secondary_synchronizer_builder = config .secondary_synchronizer
170- self ._fdv1_fallback_synchronizer_builder = config .fdv1_fallback_synchronizer
171- self ._disabled = disabled
168+ self ._data_system_config = data_system_config
169+ self ._primary_synchronizer_builder : Optional [Builder [Synchronizer ]] = data_system_config .primary_synchronizer
170+ self ._secondary_synchronizer_builder = data_system_config .secondary_synchronizer
171+ self ._fdv1_fallback_synchronizer_builder = data_system_config .fdv1_fallback_synchronizer
172+ self ._disabled = self ._config .offline
172173
173174 # Diagnostic accumulator provided by client for streaming metrics
174175 # TODO(fdv2): Either we need to use this, or we need to provide it to
@@ -188,10 +189,10 @@ def __init__(
188189 self ._data_store_status_provider = DataStoreStatusProviderImpl (None , Listeners ())
189190
190191 # Configure persistent store if provided
191- if self ._config .data_store is not None :
192- self ._data_store_status_provider = DataStoreStatusProviderImpl (self ._config .data_store , Listeners ())
193- writable = self ._config .data_store_mode == DataStoreMode .READ_WRITE
194- wrapper = FeatureStoreClientWrapper (self ._config .data_store , self ._data_store_status_provider )
192+ if self ._data_system_config .data_store is not None :
193+ self ._data_store_status_provider = DataStoreStatusProviderImpl (self ._data_system_config .data_store , Listeners ())
194+ writable = self ._data_system_config .data_store_mode == DataStoreMode .READ_WRITE
195+ wrapper = FeatureStoreClientWrapper (self ._data_system_config .data_store , self ._data_store_status_provider )
195196 self ._store .with_persistence (
196197 wrapper , writable , self ._data_store_status_provider
197198 )
@@ -208,8 +209,8 @@ def __init__(
208209
209210 # Track configuration
210211 self ._configured_with_data_sources = (
211- (config .initializers is not None and len (config .initializers ) > 0 )
212- or config .primary_synchronizer is not None
212+ (data_system_config .initializers is not None and len (data_system_config .initializers ) > 0 )
213+ or data_system_config .primary_synchronizer is not None
213214 )
214215
215216 def start (self , set_on_ready : Event ):
@@ -268,32 +269,32 @@ def _run_main_loop(self, set_on_ready: Event):
268269 self ._run_synchronizers (set_on_ready )
269270
270271 except Exception as e :
271- log .error (f "Error in FDv2 main loop: { e } " )
272+ log .error ("Error in FDv2 main loop: %s" , e )
272273 # Ensure ready event is set even on error
273274 if not set_on_ready .is_set ():
274275 set_on_ready .set ()
275276
276277 def _run_initializers (self , set_on_ready : Event ):
277278 """Run initializers to get initial data."""
278- if self ._config .initializers is None :
279+ if self ._data_system_config .initializers is None :
279280 return
280281
281- for initializer_builder in self ._config .initializers :
282+ for initializer_builder in self ._data_system_config .initializers :
282283 if self ._stop_event .is_set ():
283284 return
284285
285286 try :
286- initializer = initializer_builder ()
287- log .info (f "Attempting to initialize via { initializer .name } " )
287+ initializer = initializer_builder (self . _config )
288+ log .info ("Attempting to initialize via %s" , initializer .name )
288289
289290 basis_result = initializer .fetch ()
290291
291292 if isinstance (basis_result , _Fail ):
292- log .warning (f "Initializer { initializer . name } failed: { basis_result .error } " )
293+ log .warning ("Initializer %s failed: %s" , initializer . name , basis_result .error )
293294 continue
294295
295296 basis = basis_result .value
296- log .info (f "Initialized via { initializer .name } " )
297+ log .info ("Initialized via %s" , initializer .name )
297298
298299 # Apply the basis to the store
299300 self ._store .apply (basis .change_set , basis .persist )
@@ -302,12 +303,12 @@ def _run_initializers(self, set_on_ready: Event):
302303 if not set_on_ready .is_set ():
303304 set_on_ready .set ()
304305 except Exception as e :
305- log .error (f "Initializer failed with exception: { e } " )
306+ log .error ("Initializer failed with exception: %s" , e )
306307
307308 def _run_synchronizers (self , set_on_ready : Event ):
308309 """Run synchronizers to keep data up-to-date."""
309310 # If no primary synchronizer configured, just set ready and return
310- if self ._config .primary_synchronizer is None :
311+ if self ._data_system_config .primary_synchronizer is None :
311312 if not set_on_ready .is_set ():
312313 set_on_ready .set ()
313314 return
@@ -318,8 +319,8 @@ def synchronizer_loop(self: 'FDv2'):
318319 while not self ._stop_event .is_set () and self ._primary_synchronizer_builder is not None :
319320 # Try primary synchronizer
320321 try :
321- primary_sync = self ._primary_synchronizer_builder ()
322- log .info (f "Primary synchronizer { primary_sync . name } is starting" )
322+ primary_sync = self ._primary_synchronizer_builder (self . _config )
323+ log .info ("Primary synchronizer %s is starting" , primary_sync . name )
323324
324325 remove_sync , fallback_v1 = self ._consume_synchronizer_results (
325326 primary_sync , set_on_ready , self ._fallback_condition
@@ -345,8 +346,8 @@ def synchronizer_loop(self: 'FDv2'):
345346 if self ._secondary_synchronizer_builder is None :
346347 continue
347348
348- secondary_sync = self ._secondary_synchronizer_builder ()
349- log .info (f "Secondary synchronizer { secondary_sync . name } is starting" )
349+ secondary_sync = self ._secondary_synchronizer_builder (self . _config )
350+ log .info ("Secondary synchronizer %s is starting" , secondary_sync . name )
350351
351352 remove_sync , fallback_v1 = self ._consume_synchronizer_results (
352353 secondary_sync , set_on_ready , self ._recovery_condition
@@ -368,11 +369,11 @@ def synchronizer_loop(self: 'FDv2'):
368369
369370 log .info ("Recovery condition met, returning to primary synchronizer" )
370371 except Exception as e :
371- log .error (f "Failed to build primary synchronizer: { e } " )
372+ log .error ("Failed to build primary synchronizer: %s" , e )
372373 break
373374
374375 except Exception as e :
375- log .error (f "Error in synchronizer loop: { e } " )
376+ log .error ("Error in synchronizer loop: %s" , e )
376377 finally :
377378 # Ensure we always set the ready event when exiting
378379 if not set_on_ready .is_set ():
@@ -400,7 +401,7 @@ def _consume_synchronizer_results(
400401 """
401402 try :
402403 for update in synchronizer .sync ():
403- log .info (f "Synchronizer { synchronizer .name } update: { update .state } " )
404+ log .info ("Synchronizer %s update: %s" , synchronizer .name , update .state )
404405 if self ._stop_event .is_set ():
405406 return False , False
406407
@@ -425,7 +426,7 @@ def _consume_synchronizer_results(
425426 return False , False
426427
427428 except Exception as e :
428- log .error (f "Error consuming synchronizer results: { e } " )
429+ log .error ("Error consuming synchronizer results: %s" , e )
429430 return True , False
430431
431432 return True , False
0 commit comments