Skip to content

Commit 15b7024

Browse files
authored
eventstream connection must be closed by user (#208)
Tried to infer when user wanted it to close via refcounts. This was difficult to pull off, and the strange side-effects were difficult to explain. Instead, go for the simple "you must close it" approach.
1 parent b481f3c commit 15b7024

File tree

4 files changed

+303
-301
lines changed

4 files changed

+303
-301
lines changed

awscrt/eventstream/rpc.py

Lines changed: 97 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
from enum import IntEnum
1717
from functools import partial
1818
from typing import Optional, Sequence
19-
import weakref
2019

2120
__all__ = [
2221
'MessageType',
@@ -106,27 +105,20 @@ class ClientConnectionHandler(ABC):
106105
Inherit from this class and override methods to handle connection events.
107106
All callbacks for this connection will be invoked on the same thread,
108107
and `on_connection_setup()` will always be the first callback invoked.
109-
110-
Note that if the handler is garbage-collected, its callbacks will
111-
not be invoked. To receive all events, maintain a reference to the
112-
handler until its connection shuts down (or fails setup).
113108
"""
114109

115110
@abstractmethod
116111
def on_connection_setup(self, connection, error, **kwargs) -> None:
117112
"""Invoked upon completion of the setup attempt.
118113
119114
If setup was successful, the connection is provided to the user.
120-
The user must keep a reference to the connection or it will be
121-
garbage-collected and closed. A common pattern is to store a
122-
reference to the connection in the handler. Example:
123-
```
124-
def on_connection_setup(self, connection, error, **kwargs):
125-
if error:
126-
... do error handling ...
127-
else:
128-
self.connection = connection
129-
```
115+
116+
Note that the network connection stays alive until it is closed,
117+
even if no local references to the connection object remain.
118+
The user should store a reference to this connection, and call
119+
connection.close() when they are done with it to avoid leaking
120+
resources.
121+
130122
Setup will always be the first callback invoked on the handler.
131123
If setup failed, no further callbacks will be invoked on this handler.
132124
@@ -147,9 +139,6 @@ def on_connection_shutdown(self, reason: Optional[Exception], **kwargs) -> None:
147139
148140
This event will not be invoked if connection setup failed.
149141
150-
Note that this event will not be invoked if the handler is
151-
garbage-collected before the shutdown process completes.
152-
153142
Args:
154143
reason: Reason will be None if the user initiated the shutdown,
155144
otherwise the reason will be an Exception.
@@ -214,12 +203,31 @@ def _from_binding_msg_args(headers, payload, message_type, flags):
214203
return (headers, payload, message_type, flags)
215204

216205

206+
def _on_message_flush(bound_future, bound_callback, error_code):
207+
# invoked when a message is flushed (written to wire), or canceled due to connection error.
208+
e = awscrt.exceptions.from_code(error_code) if error_code else None
209+
try:
210+
if bound_callback:
211+
bound_callback(error=e)
212+
finally:
213+
# ensure future completes, even if user callback had unhandled exception
214+
if error_code:
215+
bound_future.set_exception(e)
216+
else:
217+
bound_future.set_result(None)
218+
219+
217220
class ClientConnection(NativeResource):
218221
"""A client connection for the event-stream RPC protocol.
219222
220223
Use :meth:`ClientConnection.connect()` to establish a new
221224
connection.
222225
226+
Note that the network connection stays alive until it is closed,
227+
even if no local references to the connection object remain.
228+
The user should store a reference to any connections, and call
229+
close() when they are done with them to avoid leaking resources.
230+
223231
Attributes:
224232
host_name (str): Remote host name.
225233
@@ -228,18 +236,20 @@ class ClientConnection(NativeResource):
228236
shutdown_future (concurrent.futures.Future[None]): Completes when this
229237
connection has finished shutting down. Future will contain a
230238
result of None, or an exception indicating why shutdown occurred.
231-
Note that the connection may have been garbage-collected before
232-
this future completes.
233239
"""
234240

235-
__slots__ = ['host_name', 'port', 'shutdown_future']
241+
__slots__ = ['host_name', 'port', 'shutdown_future', '_connect_future', '_handler']
236242

237-
def __init__(self, host_name, port):
243+
def __init__(self, host_name, port, handler):
238244
# Do no instantiate directly, use static connect method
239245
super().__init__()
240246
self.host_name = host_name # type: str
241247
self.port = port # type: int
242248
self.shutdown_future = Future() # type: Future
249+
self.shutdown_future.set_running_or_notify_cancel() # prevent cancel
250+
self._connect_future = Future() # type: Future
251+
self._connect_future.set_running_or_notify_cancel() # prevent cancel
252+
self._handler = handler # type: ClientConnectionHandler
243253

244254
@classmethod
245255
def connect(
@@ -275,121 +285,76 @@ def connect(
275285
Otherwise it will contain an exception.
276286
If the connection is successful, it will be made available via
277287
the handler's on_connection_setup callback.
288+
Note that this network connection stays alive until it is closed,
289+
even if no local references to the connection object remain.
290+
The user should store a reference to any connections, and call
291+
close() when they are done with them to avoid leaking resources.
278292
"""
279293

280294
if not socket_options:
281295
socket_options = SocketOptions()
282296

283-
future = Future()
284-
285297
# Connection is not made available to user until setup callback fires
286-
connection = cls(host_name, port)
287-
288-
# We must be careful to avoid circular references that prevent the connection from getting GC'd.
289-
# Only the internal _on_setup callback binds strong references to the connection and handler.
290-
# This is ok because it fires exactly once, and references to it are cleared afterwards.
291-
# All other callbacks must bind weak references to the handler,
292-
# or references to futures within the connection rather than the connection itself.
293-
handler_weakref = weakref.ref(handler)
298+
connection = cls(host_name, port, handler)
294299

300+
# connection._binding is set within the following call */
295301
_awscrt.event_stream_rpc_client_connection_connect(
296302
host_name,
297303
port,
298304
bootstrap,
299305
socket_options,
300306
tls_connection_options,
301-
partial(cls._on_connection_setup, future, handler, connection),
302-
partial(cls._on_connection_shutdown, connection.shutdown_future, handler_weakref),
303-
partial(cls._on_protocol_message, handler_weakref))
307+
connection)
304308

305-
return future
309+
return connection._connect_future
306310

307-
@staticmethod
308-
def _on_connection_setup(bound_future, bound_handler, bound_connection, binding, error_code):
311+
def _on_connection_setup(self, error_code):
309312
if error_code:
310313
connection = None
311314
error = awscrt.exceptions.from_code(error_code)
312315
else:
313-
connection = bound_connection
314-
connection._binding = binding
316+
connection = self
315317
error = None
316318

317319
try:
318-
bound_handler.on_connection_setup(connection=connection, error=error)
320+
self._handler.on_connection_setup(connection=connection, error=error)
319321
finally:
320322
# ensure future completes, even if user callback had unhandled exception
321323
if error:
322-
bound_future.set_exception(error)
324+
self._connect_future.set_exception(error)
323325
else:
324-
bound_future.set_result(None)
326+
self._connect_future.set_result(None)
325327

326-
@staticmethod
327-
def _on_connection_shutdown(bound_future, bound_weak_handler, error_code):
328+
def _on_connection_shutdown(self, error_code):
328329
reason = awscrt.exceptions.from_code(error_code) if error_code else None
329330
try:
330-
handler = bound_weak_handler()
331-
if handler:
332-
handler.on_connection_shutdown(reason=reason)
331+
self._handler.on_connection_shutdown(reason=reason)
333332
finally:
334333
# ensure future completes, even if user callback had unhandled exception
335334
if reason:
336-
bound_future.set_exception(reason)
335+
self.shutdown_future.set_exception(reason)
337336
else:
338-
bound_future.set_result(None)
337+
self.shutdown_future.set_result(None)
339338

340-
@staticmethod
341-
def _on_continuation_closed(bound_future, bound_weak_handler):
342-
try:
343-
handler = bound_weak_handler()
344-
if handler:
345-
handler.on_continuation_closed()
346-
finally:
347-
# ensure future completes, even if user callback had unhandled exception
348-
bound_future.set_result(None)
349-
350-
@staticmethod
351-
def _on_protocol_message(bound_weak_handler, headers, payload, message_type, flags):
352-
handler = bound_weak_handler()
353-
if handler:
354-
# transform from simple types to actual classes
355-
headers, payload, message_type, flags = _from_binding_msg_args(headers, payload, message_type, flags)
356-
handler.on_protocol_message(
357-
headers=headers,
358-
payload=payload,
359-
message_type=message_type,
360-
flags=flags)
361-
362-
@staticmethod
363-
def _on_continuation_message(bound_weak_handler, headers, payload, message_type, flags):
364-
handler = bound_weak_handler()
365-
if handler:
366-
# transform from simple types to actual classes
367-
headers, payload, message_type, flags = _from_binding_msg_args(headers, payload, message_type, flags)
368-
handler.on_continuation_message(
369-
headers=headers,
370-
payload=payload,
371-
message_type=message_type,
372-
flags=flags)
373-
374-
@staticmethod
375-
def _on_flush(bound_future, bound_callback, error_code):
376-
# invoked when a message is flushed (written to wire), or canceled due to connection error.
377-
e = awscrt.exceptions.from_code(error_code) if error_code else None
378-
try:
379-
if bound_callback:
380-
bound_callback(error=e)
381-
finally:
382-
# ensure future completes, even if user callback had unhandled exception
383-
if error_code:
384-
bound_future.set_exception(e)
385-
else:
386-
bound_future.set_result(None)
339+
def _on_protocol_message(self, headers, payload, message_type, flags):
340+
# transform from simple types to actual classes
341+
headers, payload, message_type, flags = _from_binding_msg_args(headers, payload, message_type, flags)
342+
self._handler.on_protocol_message(
343+
headers=headers,
344+
payload=payload,
345+
message_type=message_type,
346+
flags=flags)
387347

388348
def close(self):
389349
"""Close the connection.
390350
391-
Shutdown is asynchronous. This call has no effect if the connection is already
392-
closing.
351+
Shutdown is asynchronous. This call has no effect if the connection is
352+
already closed or closing.
353+
354+
Note that, if the network connection hasn't already ended,
355+
close() MUST be called to avoid leaking resources. The network
356+
connection will not terminate simply because there are no references
357+
to the connection object.
393358
394359
Returns:
395360
concurrent.futures.Future: This connection's :attr:`shutdown_future`,
@@ -453,6 +418,7 @@ def send_protocol_message(
453418
or an exception if the message fails to send.
454419
"""
455420
future = Future()
421+
future.set_running_or_notify_cancel() # prevent cancel
456422

457423
# native code deals with simplified types
458424
headers, payload, message_type, flags = _to_binding_msg_args(headers, payload, message_type, flags)
@@ -463,7 +429,7 @@ def send_protocol_message(
463429
payload,
464430
message_type,
465431
flags,
466-
partial(self._on_flush, future, on_flush))
432+
partial(_on_message_flush, future, on_flush))
467433
return future
468434

469435
def new_stream(self, handler: 'ClientContinuationHandler') -> 'ClientContinuation':
@@ -479,13 +445,9 @@ def new_stream(self, handler: 'ClientContinuationHandler') -> 'ClientContinuatio
479445
Returns:
480446
The new continuation object.
481447
"""
482-
handler_weakref = weakref.ref(handler)
483-
closed_future = Future()
484-
binding = _awscrt.event_stream_rpc_client_connection_new_stream(
485-
self,
486-
partial(self._on_continuation_message, handler_weakref),
487-
partial(self._on_continuation_closed, closed_future, handler_weakref))
488-
return ClientContinuation(binding, closed_future, self)
448+
continuation = ClientContinuation(handler, self)
449+
continuation._binding = _awscrt.event_stream_rpc_client_connection_new_stream(self)
450+
return continuation
489451

490452

491453
class ClientContinuation(NativeResource):
@@ -504,12 +466,13 @@ class ClientContinuation(NativeResource):
504466
when the continuation has closed.
505467
"""
506468

507-
def __init__(self, binding, closed_future, connection):
469+
def __init__(self, handler, connection):
508470
# Do not instantiate directly, use ClientConnection.new_stream()
509471
super().__init__()
510-
self._binding = binding
472+
self._handler = handler
511473
self.connection = connection # type: ClientConnection
512-
self.closed_future = closed_future # type: Future
474+
self.closed_future = Future() # type: Future
475+
self.closed_future.set_running_or_notify_cancel() # prevent cancel
513476

514477
def activate(
515478
self,
@@ -562,18 +525,23 @@ def activate(
562525
"""
563526

564527
flush_future = Future()
528+
flush_future.set_running_or_notify_cancel() # prevent cancel
565529

566530
# native code deals with simplified types
567531
headers, payload, message_type, flags = _to_binding_msg_args(headers, payload, message_type, flags)
568532

569533
_awscrt.event_stream_rpc_client_continuation_activate(
570534
self._binding,
535+
# don't give binding a reference to self until activate() is called.
536+
# this reference is used for invoking callbacks, and its existence
537+
# keeps the python object alive until the closed callback fires
538+
self,
571539
operation,
572540
headers,
573541
payload,
574542
message_type,
575543
flags,
576-
partial(ClientConnection._on_flush, flush_future, on_flush))
544+
partial(_on_message_flush, flush_future, on_flush))
577545

578546
return flush_future
579547

@@ -626,6 +594,7 @@ def send_message(
626594
or an exception if the message fails to send.
627595
"""
628596
future = Future()
597+
future.set_running_or_notify_cancel() # prevent cancel
629598
# native code deals with simplified types
630599
headers, payload, message_type, flags = _to_binding_msg_args(headers, payload, message_type, flags)
631600

@@ -635,12 +604,28 @@ def send_message(
635604
payload,
636605
message_type,
637606
flags,
638-
partial(ClientConnection._on_flush, future, on_flush))
607+
partial(_on_message_flush, future, on_flush))
639608
return future
640609

641610
def is_closed(self):
642611
return _awscrt.event_stream_rpc_client_continuation_is_closed(self._binding)
643612

613+
def _on_continuation_closed(self):
614+
try:
615+
self._handler.on_continuation_closed()
616+
finally:
617+
# ensure future completes, even if user callback had unhandled exception
618+
self.closed_future.set_result(None)
619+
620+
def _on_continuation_message(self, headers, payload, message_type, flags):
621+
# transform from simple types to actual classes
622+
headers, payload, message_type, flags = _from_binding_msg_args(headers, payload, message_type, flags)
623+
self._handler.on_continuation_message(
624+
headers=headers,
625+
payload=payload,
626+
message_type=message_type,
627+
flags=flags)
628+
644629

645630
class ClientContinuationHandler(ABC):
646631
"""Base class for handling stream continuation events.
@@ -651,10 +636,6 @@ class ClientContinuationHandler(ABC):
651636
652637
A common pattern is to store the continuation within its handler. Ex:
653638
`continuation_handler.continuation = connection.new_stream(continuation_handler)`
654-
655-
Note that if the handler is garbage-collected, its callbacks will no
656-
longer be invoked. Maintain a reference to the handler until the
657-
continuation is closed to receive all events.
658639
"""
659640

660641
@abstractmethod
@@ -690,9 +671,6 @@ def on_continuation_closed(self, **kwargs) -> None:
690671
The continuation is closed when a message is sent or received with
691672
the TERMINATE_STREAM flag, or when the connection shuts down.
692673
693-
Note that this event will not be invoked if the handler is
694-
garbage-collected before the stream completes.
695-
696674
Args:
697675
**kwargs: Forward compatibility kwargs.
698676
"""

0 commit comments

Comments
 (0)