1+ from __future__ import absolute_import
2+
13import json
24from threading import Thread
35
4- import time
6+ import backoff
7+ import requests
58from sseclient import SSEClient
6-
79from ldclient .interfaces import UpdateProcessor
810from ldclient .util import _stream_headers , log
911
@@ -13,44 +15,47 @@ def __init__(self, sdk_key, config, requester, store, ready):
1315 Thread .__init__ (self )
1416 self .daemon = True
1517 self ._sdk_key = sdk_key
18+ self ._uri = config .stream_uri
1619 self ._config = config
1720 self ._requester = requester
1821 self ._store = store
1922 self ._running = False
2023 self ._ready = ready
24+ self ._headers = _stream_headers (self ._sdk_key )
2125
2226 def run (self ):
23- log .info ("Starting StreamingUpdateProcessor connecting to uri: " + self ._config . stream_uri )
27+ log .info ("Starting StreamingUpdateProcessor connecting to uri: " + self ._uri )
2428 self ._running = True
25- hdrs = _stream_headers (self ._sdk_key )
26- uri = self ._config .stream_uri
2729 while self ._running :
28- try :
29- messages = SSEClient (uri , verify = self ._config .verify_ssl , headers = hdrs )
30- for msg in messages :
31- if not self ._running :
32- break
33- if self .process_message (self ._store , self ._requester , msg , self ._ready ) is True :
34- self ._ready .set ()
35- except Exception as e :
36- log .error ("Could not connect to LaunchDarkly stream: " + str (e .message ) +
37- " waiting 1 second before trying again." )
38- time .sleep (1 )
30+ self ._connect ()
31+
32+ def _backoff_expo ():
33+ return backoff .expo (max_value = 30 )
34+
35+ @backoff .on_exception (_backoff_expo , requests .exceptions .RequestException , max_tries = None , jitter = backoff .full_jitter )
36+ def _connect (self ):
37+ messages = SSEClient (self ._uri , verify = self ._config .verify_ssl , headers = self ._headers )
38+ for msg in messages :
39+ if not self ._running :
40+ break
41+ message_ok = self .process_message (self ._store , self ._requester , msg , self ._ready )
42+ if message_ok is True and self ._ready .is_set () is False :
43+ self ._ready .set ()
3944
4045 def stop (self ):
4146 log .info ("Stopping StreamingUpdateProcessor" )
4247 self ._running = False
4348
4449 def initialized (self ):
45- return self ._running and self ._ready .is_set () and self ._store .initialized
50+ return self ._running and self ._ready .is_set () is True and self ._store .initialized is True
4651
4752 @staticmethod
4853 def process_message (store , requester , msg , ready ):
4954 log .debug ("Received stream event {} with data: {}" .format (msg .event , msg .data ))
5055 if msg .event == 'put' :
5156 payload = json .loads (msg .data )
5257 store .init (payload )
53- if not ready .is_set () and store .initialized :
58+ if not ready .is_set () is True and store .initialized is True :
5459 log .info ("StreamingUpdateProcessor initialized ok" )
5560 return True
5661 elif msg .event == 'patch' :
@@ -63,7 +68,7 @@ def process_message(store, requester, msg, ready):
6368 store .upsert (key , requester .get_one (key ))
6469 elif msg .event == "indirect/put" :
6570 store .init (requester .get_all ())
66- if not ready .is_set () and store .initialized :
71+ if not ready .is_set () is True and store .initialized is True :
6772 log .info ("StreamingUpdateProcessor initialized ok" )
6873 return True
6974 elif msg .event == 'delete' :
0 commit comments