33import json
44from threading import Thread
55
6- import time
7-
8- from requests import HTTPError
6+ import backoff
7+ import requests
98from sseclient import SSEClient
109from ldclient .interfaces import UpdateProcessor
1110from ldclient .util import _stream_headers , log
@@ -16,38 +15,32 @@ def __init__(self, sdk_key, config, requester, store, ready):
1615 Thread .__init__ (self )
1716 self .daemon = True
1817 self ._sdk_key = sdk_key
18+ self ._uri = config .stream_uri
1919 self ._config = config
2020 self ._requester = requester
2121 self ._store = store
2222 self ._running = False
2323 self ._ready = ready
24+ self ._headers = _stream_headers (self ._sdk_key )
2425
2526 def run (self ):
26- log .info ("Starting StreamingUpdateProcessor connecting to uri: " + self ._config . stream_uri )
27+ log .info ("Starting StreamingUpdateProcessor connecting to uri: " + self ._uri )
2728 self ._running = True
28- hdrs = _stream_headers (self ._sdk_key )
29- uri = self ._config .stream_uri
3029 while self ._running :
31- try :
32- messages = SSEClient (uri , verify = self ._config .verify_ssl , headers = hdrs )
33- for msg in messages :
34- if not self ._running :
35- break
36- message_ok = self .process_message (self ._store , self ._requester , msg , self ._ready )
37- if message_ok is True and self ._ready .is_set () is False :
38- self ._ready .set ()
39- except HTTPError as e :
40- if e .response is not None and e .response .status_code is not None :
41- if 400 <= e .response .status_code < 500 :
42- log .error ("StreamingUpdateProcessor response: " + str (e ) + ". Retries will not be attempted." )
43- if self ._ready .is_set () is False :
44- self ._ready .set ()
45- self ._running = False
46- return
47- except Exception as e :
48- log .error ("Could not connect to LaunchDarkly stream: " + str (e .message ) +
49- " waiting 1 second before trying again." )
50- time .sleep (1 )
30+ self ._connect ()
31+
32+ def _backoff_expo ():
33+ return backoff .expo (max_value = 30 )
34+
35+ @backoff .on_exception (_backoff_expo , requests .exceptions .RequestException , max_tries = None , jitter = backoff .full_jitter )
36+ def _connect (self ):
37+ messages = SSEClient (self ._uri , verify = self ._config .verify_ssl , headers = self ._headers )
38+ for msg in messages :
39+ if not self ._running :
40+ break
41+ message_ok = self .process_message (self ._store , self ._requester , msg , self ._ready )
42+ if message_ok is True and self ._ready .is_set () is False :
43+ self ._ready .set ()
5144
5245 def stop (self ):
5346 log .info ("Stopping StreamingUpdateProcessor" )
0 commit comments