Skip to content

Commit c4e3152

Browse files
committed
Merge branch 'master' of github.com:launchdarkly/python-server-sdk
2 parents 61989c9 + bec80a0 commit c4e3152

File tree

5 files changed

+76
-23
lines changed

5 files changed

+76
-23
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22

33
All notable changes to the LaunchDarkly Python SDK will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org).
44

5+
## [6.9.4] - 2019-08-19
6+
### Fixed:
7+
- Under conditions where analytics events are being generated at an extremely high rate (for instance, if an application is evaluating a flag repeatedly in a tight loop on many threads), a thread could be blocked indefinitely within `variation` while waiting for the internal event processing logic to catch up with the backlog. The logic has been changed to drop events if necessary so threads will not be blocked (similar to how the SDK already drops events if the size of the event buffer is exceeded). If that happens, this warning message will be logged once: "Events are being produced faster than they can be processed; some events will be dropped". Under normal conditions this should never happen; this change is meant to avoid a concurrency bottleneck in applications that are already so busy that thread starvation is likely.
8+
59
## [6.9.3] - 2019-06-11
610
### Fixed:
711
- Usages of `Logger.warn()` were causing deprecation warnings in some versions of Python. Changed these to `Logger.warning()`. ([#125](https://github.com/launchdarkly/python-server-sdk/issues/125))

ldclient/event_processor.py

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ def __init__(self, capacity):
187187
def add_event(self, event):
188188
if len(self._events) >= self._capacity:
189189
if not self._exceeded_capacity:
190-
log.warning("Event queue is full-- dropped an event")
190+
log.warning("Exceeded event queue capacity. Increase capacity to avoid dropping events.")
191191
self._exceeded_capacity = True
192192
else:
193193
self._events.append(event)
@@ -205,13 +205,13 @@ def clear(self):
205205

206206

207207
class EventDispatcher(object):
208-
def __init__(self, queue, config, http_client):
209-
self._queue = queue
208+
def __init__(self, inbox, config, http_client):
209+
self._inbox = inbox
210210
self._config = config
211211
self._http = create_http_pool_manager(num_pools=1, verify_ssl=config.verify_ssl) if http_client is None else http_client
212212
self._close_http = (http_client is None) # so we know whether to close it later
213213
self._disabled = False
214-
self._buffer = EventBuffer(config.events_max_pending)
214+
self._outbox = EventBuffer(config.events_max_pending)
215215
self._user_keys = SimpleLRUCache(config.user_keys_capacity)
216216
self._formatter = EventOutputFormatter(config)
217217
self._last_known_past_time = 0
@@ -226,7 +226,7 @@ def _run_main_loop(self):
226226
log.info("Starting event processor")
227227
while True:
228228
try:
229-
message = self._queue.get(block=True)
229+
message = self._inbox.get(block=True)
230230
if message.type == 'event':
231231
self._process_event(message.param)
232232
elif message.type == 'flush':
@@ -248,7 +248,7 @@ def _process_event(self, event):
248248
return
249249

250250
# Always record the event in the summarizer.
251-
self._buffer.add_to_summary(event)
251+
self._outbox.add_to_summary(event)
252252

253253
# Decide whether to add the event to the payload. Feature events may be added twice, once for
254254
# the event (if tracked) and once for debugging.
@@ -271,13 +271,13 @@ def _process_event(self, event):
271271

272272
if add_index_event:
273273
ie = { 'kind': 'index', 'creationDate': event['creationDate'], 'user': user }
274-
self._buffer.add_event(ie)
274+
self._outbox.add_event(ie)
275275
if add_full_event:
276-
self._buffer.add_event(event)
276+
self._outbox.add_event(event)
277277
if add_debug_event:
278278
debug_event = event.copy()
279279
debug_event['debug'] = True
280-
self._buffer.add_event(debug_event)
280+
self._outbox.add_event(debug_event)
281281

282282
# Add to the set of users we've noticed, and return true if the user was already known to us.
283283
def notice_user(self, user):
@@ -298,13 +298,13 @@ def _should_debug_event(self, event):
298298
def _trigger_flush(self):
299299
if self._disabled:
300300
return
301-
payload = self._buffer.get_payload()
301+
payload = self._outbox.get_payload()
302302
if len(payload.events) > 0 or len(payload.summary.counters) > 0:
303303
task = EventPayloadSendTask(self._http, self._config, self._formatter, payload,
304304
self._handle_response)
305305
if self._flush_workers.execute(task.run):
306306
# The events have been handed off to a flush worker; clear them from our buffer.
307-
self._buffer.clear()
307+
self._outbox.clear()
308308
else:
309309
# We're already at our limit of concurrent flushes; leave the events in the buffer.
310310
pass
@@ -330,22 +330,23 @@ def _do_shutdown(self):
330330

331331

332332
class DefaultEventProcessor(EventProcessor):
333-
def __init__(self, config, http=None):
334-
self._queue = queue.Queue(config.events_max_pending)
333+
def __init__(self, config, http=None, dispatcher_class=None):
334+
self._inbox = queue.Queue(config.events_max_pending)
335+
self._inbox_full = False
335336
self._flush_timer = RepeatingTimer(config.flush_interval, self.flush)
336337
self._users_flush_timer = RepeatingTimer(config.user_keys_flush_interval, self._flush_users)
337338
self._flush_timer.start()
338339
self._users_flush_timer.start()
339340
self._close_lock = Lock()
340341
self._closed = False
341-
EventDispatcher(self._queue, config, http)
342+
(dispatcher_class or EventDispatcher)(self._inbox, config, http)
342343

343344
def send_event(self, event):
344345
event['creationDate'] = int(time.time() * 1000)
345-
self._queue.put(EventProcessorMessage('event', event))
346+
self._post_to_inbox(EventProcessorMessage('event', event))
346347

347348
def flush(self):
348-
self._queue.put(EventProcessorMessage('flush', None))
349+
self._post_to_inbox(EventProcessorMessage('flush', None))
349350

350351
def stop(self):
351352
with self._close_lock:
@@ -355,16 +356,34 @@ def stop(self):
355356
self._flush_timer.stop()
356357
self._users_flush_timer.stop()
357358
self.flush()
359+
# Note that here we are not calling _post_to_inbox, because we *do* want to wait if the inbox
360+
# is full; an orderly shutdown can't happen unless these messages are received.
358361
self._post_message_and_wait('stop')
359362

363+
def _post_to_inbox(self, message):
364+
try:
365+
self._inbox.put(message, block=False)
366+
except queue.Full:
367+
if not self._inbox_full:
368+
# possible race condition here, but it's of no real consequence - we'd just get an extra log line
369+
self._inbox_full = True
370+
log.warning("Events are being produced faster than they can be processed; some events will be dropped")
371+
360372
def _flush_users(self):
361-
self._queue.put(EventProcessorMessage('flush_users', None))
373+
self._inbox.put(EventProcessorMessage('flush_users', None))
362374

363375
# Used only in tests
364376
def _wait_until_inactive(self):
365377
self._post_message_and_wait('test_sync')
366378

367379
def _post_message_and_wait(self, type):
368380
reply = Event()
369-
self._queue.put(EventProcessorMessage(type, reply))
381+
self._inbox.put(EventProcessorMessage(type, reply))
370382
reply.wait()
383+
384+
# These magic methods allow use of the "with" block in tests
385+
def __enter__(self):
386+
return self
387+
388+
def __exit__(self, type, value, traceback):
389+
self.stop()

ldclient/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
VERSION = "6.9.3"
1+
VERSION = "6.9.4"

testing/test_event_processor.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import json
22
import pytest
3+
from threading import Thread
34
import time
45

56
from ldclient.config import Config
@@ -460,6 +461,35 @@ def test_will_still_send_after_429_error():
460461
def test_will_still_send_after_500_error():
461462
verify_recoverable_http_error(500)
462463

464+
def test_does_not_block_on_full_inbox():
465+
config = Config(events_max_pending=1) # this sets the size of both the inbox and the outbox to 1
466+
ep_inbox_holder = [ None ]
467+
ep_inbox = None
468+
469+
def dispatcher_factory(inbox, config, http):
470+
ep_inbox_holder[0] = inbox # it's an array because otherwise it's hard for a closure to modify a variable
471+
return None # the dispatcher object itself doesn't matter, we only manipulate the inbox
472+
def event_consumer():
473+
while True:
474+
message = ep_inbox.get(block=True)
475+
if message.type == 'stop':
476+
message.param.set()
477+
return
478+
def start_consuming_events():
479+
Thread(target=event_consumer).start()
480+
481+
with DefaultEventProcessor(config, mock_http, dispatcher_factory) as ep:
482+
ep_inbox = ep_inbox_holder[0]
483+
event1 = { 'kind': 'custom', 'key': 'event1', 'user': user }
484+
event2 = { 'kind': 'custom', 'key': 'event2', 'user': user }
485+
ep.send_event(event1)
486+
ep.send_event(event2) # this event should be dropped - inbox is full
487+
message1 = ep_inbox.get(block=False)
488+
had_no_more = ep_inbox.empty()
489+
start_consuming_events()
490+
assert message1.param == event1
491+
assert had_no_more
492+
463493
def verify_unrecoverable_http_error(status):
464494
setup_processor(Config(sdk_key = 'SDK_KEY'))
465495

testing/test_feature_store_helpers.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ def test_get_can_throw_exception(self, cached):
137137
core = MockCore()
138138
wrapper = make_wrapper(core, cached)
139139
core.error = CustomError()
140-
with pytest.raises(CustomError, message="expected exception"):
140+
with pytest.raises(CustomError):
141141
wrapper.get(THINGS, "key", lambda x: x)
142142

143143
@pytest.mark.parametrize("cached", [False, True])
@@ -204,7 +204,7 @@ def test_get_all_can_throw_exception(self, cached):
204204
core = MockCore()
205205
wrapper = make_wrapper(core, cached)
206206
core.error = CustomError()
207-
with pytest.raises(CustomError, message="expected exception"):
207+
with pytest.raises(CustomError):
208208
wrapper.all(THINGS)
209209

210210
@pytest.mark.parametrize("cached", [False, True])
@@ -255,7 +255,7 @@ def test_upsert_can_throw_exception(self, cached):
255255
core = MockCore()
256256
wrapper = make_wrapper(core, cached)
257257
core.error = CustomError()
258-
with pytest.raises(CustomError, message="expected exception"):
258+
with pytest.raises(CustomError):
259259
wrapper.upsert(THINGS, { "key": "x", "version": 1 })
260260

261261
@pytest.mark.parametrize("cached", [False, True])
@@ -281,7 +281,7 @@ def test_delete_can_throw_exception(self, cached):
281281
core = MockCore()
282282
wrapper = make_wrapper(core, cached)
283283
core.error = CustomError()
284-
with pytest.raises(CustomError, message="expected exception"):
284+
with pytest.raises(CustomError):
285285
wrapper.delete(THINGS, "x", 1)
286286

287287
def test_uncached_initialized_queries_state_only_until_inited(self):

0 commit comments

Comments
 (0)