Skip to content

Commit 5e391b7

Browse files
authored
event-stream RPC connection (#201)
implemented connect() and close() so far. Chose to use a handler, instead of individual callbacks, so that it's harder to create circular references that would prevent the connection from ever cleaning up.
1 parent cf70714 commit 5e391b7

File tree

6 files changed

+625
-1
lines changed

6 files changed

+625
-1
lines changed

awscrt/eventstream.py

Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
"""
2+
event-stream library for `awscrt`.
3+
"""
4+
5+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
6+
# SPDX-License-Identifier: Apache-2.0.
7+
8+
import _awscrt
9+
from abc import ABC, abstractmethod
10+
from awscrt import NativeResource
11+
import awscrt.exceptions
12+
from awscrt.io import ClientBootstrap, SocketOptions, TlsConnectionOptions
13+
from concurrent.futures import Future
14+
from functools import partial
15+
from typing import Optional
16+
import weakref
17+
18+
19+
class EventStreamRpcClientConnectionHandler(ABC):
20+
"""Base class for handling connection events.
21+
22+
Inherit from this class and override functions to handle connection events.
23+
All events for this connection will be invoked on the same thread,
24+
and `on_connection_setup()` will always be the first event invoked.
25+
The `connection` property will be set before any events are invoked.
26+
If the connect attempt is unsuccessful, no events will be invoked.
27+
28+
Note that the `on_connection_shutdown()` event will not be invoked
29+
if the handler is garbage-collected before the connection's internal
30+
resources finish shutting down.
31+
32+
Attributes:
33+
connection (Optional[EventStreamRpcClientConnection]): Initially None.
34+
Will be set to the actual connection before any events are invoked.
35+
"""
36+
37+
def __init__(self):
38+
self.connection = None
39+
40+
@abstractmethod
41+
def on_connection_setup(self, **kwargs) -> None:
42+
"""Invoked when connection has been successfully established.
43+
44+
This will always be the first callback invoked on the handler.
45+
"""
46+
pass
47+
48+
@abstractmethod
49+
def on_connection_shutdown(self, reason: Optional[Exception], **kwargs) -> None:
50+
"""Invoked when the connection finishes shutting down.
51+
52+
Note that this event will not be invoked if the handler is
53+
garbage-collected before the shutdown process completes"""
54+
pass
55+
56+
@abstractmethod
57+
def on_protocol_message(self, **kwargs) -> None:
58+
# TODO define signature
59+
pass
60+
61+
62+
class EventStreamRpcClientConnection(NativeResource):
63+
"""A client connection for the event-stream RPC protocol.
64+
65+
Use :meth:`EventStreamRpcClientConnection.connect()` to establish a new
66+
connection.
67+
68+
Attributes:
69+
host_name (str): Remote host name.
70+
71+
port (int): Remote port.
72+
73+
shutdown_future (concurrent.futures.Future[None]): Completes when this
74+
connection has finished shutting down. Future will contain a
75+
result of None, or an exception indicating why shutdown occurred.
76+
Note that the connection may have been garbage-collected before
77+
this future completes.
78+
"""
79+
80+
__slots__ = ['host_name', 'port', 'shutdown_future']
81+
82+
def __init__(self, host_name, port):
83+
# Do no instantiate directly, use static connect method
84+
super().__init__()
85+
self.host_name = host_name
86+
self.port = port
87+
self.shutdown_future = Future()
88+
89+
@classmethod
90+
def connect(
91+
cls,
92+
*,
93+
handler: EventStreamRpcClientConnectionHandler,
94+
host_name: str,
95+
port: int,
96+
bootstrap: ClientBootstrap,
97+
socket_options: Optional[SocketOptions] = None,
98+
tls_connection_options: Optional[TlsConnectionOptions] = None) -> Future:
99+
"""Asynchronously establish a new EventStreamRpcClientConnection.
100+
101+
Args:
102+
TODO (int): fill this out
103+
Returns:
104+
concurrent.futures.Future: A Future which completes when the connection succeeds or fails.
105+
If successful, the Future will contain None.
106+
Otherwise it will contain an exception.
107+
If the connection is successful, it is accessible via `handler.connection`.
108+
"""
109+
110+
if not socket_options:
111+
socket_options = SocketOptions()
112+
113+
future = Future()
114+
115+
# Connection is not made available to user until setup callback fires
116+
connection = cls(host_name, port)
117+
118+
# We must be careful to avoid circular references that prevent the connection from getting GC'd.
119+
# Only the internal _on_setup callback binds strong references to the connection and handler.
120+
# This is ok because it fires exactly once, and references to it are cleared afterwards.
121+
# All other callbacks must bind weak references to the handler,
122+
# or references to futures within the connection rather than the connection itself.
123+
handler_weakref = weakref.ref(handler)
124+
125+
connection._binding = _awscrt.event_stream_rpc_client_connection_connect(
126+
host_name,
127+
port,
128+
bootstrap,
129+
socket_options,
130+
tls_connection_options,
131+
partial(cls._on_connection_setup, future, handler, connection),
132+
partial(cls._on_connection_shutdown, connection.shutdown_future, handler_weakref),
133+
partial(cls._on_protocol_message, handler_weakref))
134+
135+
return future
136+
137+
@staticmethod
138+
def _on_connection_setup(bound_future, bound_handler, bound_connection, error_code):
139+
try:
140+
if error_code:
141+
e = awscrt.exceptions.from_code(error_code)
142+
bound_future.set_exception(e)
143+
else:
144+
bound_handler.connection = bound_connection
145+
bound_handler.on_connection_setup()
146+
bound_future.set_result(None)
147+
except Exception as e:
148+
# user callback had unhandled exception, set future as failed
149+
bound_future.set_exception(e)
150+
raise
151+
152+
@staticmethod
153+
def _on_connection_shutdown(bound_future, bound_weak_handler, error_code):
154+
reason = awscrt.exceptions.from_code(error_code) if error_code else None
155+
try:
156+
handler = bound_weak_handler()
157+
if handler:
158+
handler.on_connection_shutdown(reason=reason)
159+
finally:
160+
# user callback had unhandled exception, use finally to ensure future gets set
161+
if reason:
162+
bound_future.set_exception(reason)
163+
else:
164+
bound_future.set_result(None)
165+
166+
@staticmethod
167+
def _on_protocol_message(bound_weak_header, headers, payload, message_type, flags):
168+
handler = bound_weak_handler()
169+
if handler:
170+
handler.on_protocol_message(
171+
headers=headers,
172+
payload=payload,
173+
message_type=message_type,
174+
flags=flags)
175+
176+
def close(self):
177+
"""Close the connection.
178+
179+
Shutdown is asynchronous. This call has no effect if the connection is already
180+
closing.
181+
182+
Returns:
183+
concurrent.futures.Future: This connection's :attr:`shutdown_future`,
184+
which completes when shutdown has finished.
185+
"""
186+
# TODO: let user pass their own exception/error-code/reason for closing
187+
_awscrt.event_stream_rpc_client_connection_close(self._binding)
188+
return self.shutdown_future
189+
190+
def is_open(self):
191+
"""
192+
Returns:
193+
bool: True if this connection is open and usable, False otherwise.
194+
Check :attr:`shutdown_future` to know when the connection is completely
195+
finished shutting down.
196+
"""
197+
return _awscrt.event_stream_rpc_client_connection_is_open(self._binding)

source/event_stream.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
#ifndef AWS_CRT_PYTHON_EVENT_STREAM_H
2+
#define AWS_CRT_PYTHON_EVENT_STREAM_H
3+
/**
4+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
* SPDX-License-Identifier: Apache-2.0.
6+
*/
7+
#include "module.h"
8+
9+
PyObject *aws_py_event_stream_rpc_client_connection_connect(PyObject *self, PyObject *args);
10+
PyObject *aws_py_event_stream_rpc_client_connection_close(PyObject *self, PyObject *args);
11+
PyObject *aws_py_event_stream_rpc_client_connection_is_open(PyObject *self, PyObject *args);
12+
13+
#endif /* AWS_CRT_PYTHON_EVENT_STREAM_H */

0 commit comments

Comments
 (0)