Skip to content

Commit a76a940

Browse files
authored
event-stream headers and protocol messages (#203)
Define headers, enums, flags, etc necessary for sending and receiving protocol messages. 2 choices made for simplicity over efficiency: 1) Rather the pass complex `EventStreamHeader` class back and forth between python and C, I always pass a simple `(name, value, type)` tuple. Transforming back and forth between them is less efficient than dealing directly with the `EventStreamHeader` class in C, but it makes the C code much simpler and less likely to leak references. We can always come back and make this more efficient in the future without affecting the public API. 2) Deep-copying contents of python headers to C headers. We could get away with non-owning references to the header names and values, but the code would be more complex. Again, we can change this under the hood without affecting the public API.
1 parent 5e391b7 commit a76a940

File tree

7 files changed

+943
-35
lines changed

7 files changed

+943
-35
lines changed

awscrt/eventstream.py

Lines changed: 333 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,289 @@
1010
from awscrt import NativeResource
1111
import awscrt.exceptions
1212
from awscrt.io import ClientBootstrap, SocketOptions, TlsConnectionOptions
13+
from collections.abc import ByteString
1314
from concurrent.futures import Future
15+
from enum import IntEnum
1416
from functools import partial
15-
from typing import Optional
17+
from typing import Any, Optional, Sequence
18+
from uuid import UUID
1619
import weakref
1720

1821

22+
_BYTE_MIN = -2**7
23+
_BYTE_MAX = 2**7 - 1
24+
_INT16_MIN = -2**15
25+
_INT16_MAX = 2**15 - 1
26+
_INT32_MIN = -2**31
27+
_INT32_MAX = 2**31 - 1
28+
_INT64_MIN = -2**63
29+
_INT64_MAX = 2**63 - 1
30+
31+
32+
class EventStreamHeaderType(IntEnum):
33+
"""Supported types for the value within an EventStreamHeader"""
34+
35+
BOOL_TRUE = 0
36+
"""Value is True.
37+
38+
No actual value is transmitted on the wire."""
39+
40+
BOOL_FALSE = 1
41+
"""Value is False.
42+
43+
No actual value is transmitted on the wire."""
44+
45+
BYTE = 2
46+
"""Value is signed 8-bit int."""
47+
48+
INT16 = 3
49+
"""Value is signed 16-bit int."""
50+
51+
INT32 = 4
52+
"""Value is signed 32-bit int."""
53+
54+
INT64 = 5
55+
"""Value is signed 64-bit int."""
56+
57+
BYTE_BUF = 6
58+
"""Value is raw bytes."""
59+
60+
STRING = 7
61+
"""Value is a str.
62+
63+
Transmitted on the wire as utf-8"""
64+
65+
TIMESTAMP = 8
66+
"""Value is a posix timestamp (seconds since Unix epoch).
67+
68+
Transmitted on the wire as a 64-bit int"""
69+
70+
UUID = 9
71+
"""Value is a UUID.
72+
73+
Transmitted on the wire as 16 bytes"""
74+
75+
def __format__(self, format_spec):
76+
# override so formatted string doesn't simply look like an int
77+
return str(self)
78+
79+
80+
class EventStreamHeader:
81+
"""A header in an event-stream message.
82+
83+
Each header has a name, value, and type.
84+
class:`EventStreamHeaderType` enumerates the supported value types.
85+
"""
86+
87+
def __init__(self, name: str, value: Any, header_type: EventStreamHeaderType):
88+
# do not call directly, use EventStreamHeader.from_xyz() methods.
89+
self._name = name
90+
self._value = value
91+
self._type = header_type
92+
93+
@classmethod
94+
def from_bool(cls, name: str, value: bool) -> 'EventStreamHeader':
95+
if value:
96+
return cls(name, True, EventStreamHeaderType.BOOL_TRUE)
97+
else:
98+
return cls(name, False, EventStreamHeaderType.BOOL_FALSE)
99+
100+
@classmethod
101+
def from_byte(cls, name: str, value: int) -> 'EventStreamHeader':
102+
value = int(value)
103+
if value < _BYTE_MIN or value > _BYTE_MAX:
104+
raise ValueError("Value {} cannot fit in signed 8-bit byte".format(value))
105+
return cls(name, value, EventStreamHeaderType.BYTE)
106+
107+
@classmethod
108+
def from_int16(cls, name: str, value: int) -> 'EventStreamHeader':
109+
value = int(value)
110+
if value < _INT16_MIN or value > _INT16_MAX:
111+
raise ValueError("Value {} cannot fit in signed 16-bit int".format(value))
112+
return cls(name, value, EventStreamHeaderType.INT16)
113+
114+
@classmethod
115+
def from_int32(cls, name: str, value: int) -> 'EventStreamHeader':
116+
value = int(value)
117+
if value < _INT32_MIN or value > _INT32_MAX:
118+
raise ValueError("Value {} cannot fit in signed 32-bit int".format(value))
119+
return cls(name, value, EventStreamHeaderType.INT32)
120+
121+
@classmethod
122+
def from_int64(cls, name: str, value: int) -> 'EventStreamHeader':
123+
value = int(value)
124+
if value < _INT64_MIN or value > _INT64_MAX:
125+
raise ValueError("Value {} cannot fit in signed 64-bit int".format(value))
126+
return cls(name, value, EventStreamHeaderType.INT64)
127+
128+
@classmethod
129+
def from_byte_buf(cls, name: str, value: ByteString) -> 'EventStreamHeader':
130+
return cls(name, value, EventStreamHeaderType.BYTE_BUF)
131+
132+
@classmethod
133+
def from_string(cls, name: str, value: str) -> 'EventStreamHeader':
134+
value = str(value)
135+
return cls(name, value, EventStreamHeaderType.STRING)
136+
137+
@classmethod
138+
def from_timestamp(cls, name: str, value: int) -> 'EventStreamHeader':
139+
value = int(value)
140+
if value < _INT64_MIN or value > _INT64_MAX:
141+
raise ValueError("Value {} exceeds timestamp limits".format(value))
142+
return cls(name, value, EventStreamHeaderType.TIMESTAMP)
143+
144+
@classmethod
145+
def from_uuid(cls, name: str, value: UUID) -> 'EventStreamHeader':
146+
if not isinstance(value, UUID):
147+
raise TypeError("Value must be UUID, not {}".format(type(value)))
148+
return cls(name, value, EventStreamHeaderType.UUID)
149+
150+
@classmethod
151+
def _from_binding_tuple(cls, binding_tuple):
152+
# native code deals with a simplified tuple, rather than full class
153+
name, value, header_type = binding_tuple
154+
header_type = EventStreamHeaderType(header_type)
155+
if header_type == EventStreamHeaderType.UUID:
156+
value = UUID(bytes=value)
157+
return cls(name, value, header_type)
158+
159+
def _as_binding_tuple(self):
160+
# native code deals with a simplified tuple, rather than full class
161+
if self._type == EventStreamHeaderType.UUID:
162+
value = self._value.bytes
163+
else:
164+
value = self._value
165+
return (self._name, value, self._type)
166+
167+
@property
168+
def name(self) -> str:
169+
return self._name
170+
171+
@property
172+
def type(self) -> EventStreamHeaderType:
173+
return self._type
174+
175+
@property
176+
def value(self) -> Any:
177+
return self._value
178+
179+
def _value_as(self, header_type: EventStreamHeaderType) -> Any:
180+
if self._type != header_type:
181+
raise TypeError("Header type is {}, not {}".format(self._type, header_type))
182+
return self._value
183+
184+
def value_as_bool(self) -> bool:
185+
if self._type == EventStreamHeaderType.BOOL_TRUE:
186+
return True
187+
if self._type == EventStreamHeaderType.BOOL_FALSE:
188+
return False
189+
raise TypeError(
190+
"Header type is {}, not {} or {}".format(
191+
self._type,
192+
EventStreamHeaderType.BOOL_TRUE,
193+
EventStreamHeaderType.BOOL_FALSE))
194+
195+
def value_as_byte(self) -> int:
196+
return self._value_as(EventStreamHeaderType.BYTE)
197+
198+
def value_as_int16(self) -> int:
199+
return self._value_as(EventStreamHeaderType.INT16)
200+
201+
def value_as_int32(self) -> int:
202+
return self._value_as(EventStreamHeaderType.INT32)
203+
204+
def value_as_int64(self) -> int:
205+
return self._value_as(EventStreamHeaderType.INT64)
206+
207+
def value_as_byte_buf(self) -> ByteString:
208+
return self._value_as(EventStreamHeaderType.BYTE_BUF)
209+
210+
def value_as_string(self) -> str:
211+
return self._value_as(EventStreamHeaderType.STRING)
212+
213+
def value_as_timestamp(self) -> int:
214+
return self._value_as(EventStreamHeaderType.TIMESTAMP)
215+
216+
def value_as_uuid(self) -> UUID:
217+
return self._value_as(EventStreamHeaderType.UUID)
218+
219+
def __str__(self):
220+
return "{}: {} <{}>".format(
221+
self._name,
222+
repr(self._value),
223+
self._type.name)
224+
225+
def __repr__(self):
226+
return "{}({}, {}, {})".format(
227+
self.__class__.__name__,
228+
repr(self._name),
229+
repr(self._value),
230+
repr(self._type))
231+
232+
233+
class EventStreamRpcMessageType(IntEnum):
234+
"""Types of event-stream RPC messages.
235+
236+
Different message types expect specific headers and flags, consult documentation."""
237+
# TODO: flesh out these docs
238+
239+
APPLICATION_MESSAGE = 0
240+
"""Application message"""
241+
242+
APPLICATION_ERROR = 1
243+
"""Application error"""
244+
245+
PING = 2
246+
"""Ping"""
247+
248+
PING_RESPONSE = 3
249+
"""Ping response"""
250+
251+
CONNECT = 4
252+
"""Connect"""
253+
254+
CONNECT_ACK = 5
255+
"""Connect acknowledgement"""
256+
257+
PROTOCOL_ERROR = 6
258+
"""Protocol error"""
259+
260+
INTERNAL_ERROR = 7
261+
"""Internal error"""
262+
263+
def __format__(self, format_spec):
264+
# override so formatted string doesn't simply look like an int
265+
return str(self)
266+
267+
268+
class EventStreamRpcMessageFlag:
269+
"""Flags for event-stream RPC messages.
270+
271+
Flags may be XORed together.
272+
Not all flags can be used with all messages types, consult documentation.
273+
"""
274+
# TODO: flesh out these docs
275+
276+
# TODO: when python 3.5 is dropped this class should inherit from IntFlag.
277+
# When doing this, be sure to update type-hints and callbacks to pass
278+
# EventStreamRpcMessageFlag instead of plain int.
279+
280+
NONE = 0
281+
"""No flags"""
282+
283+
CONNECTION_ACCEPTED = 0x1
284+
"""Connection accepted
285+
286+
If this flag is absent from a CONNECT_ACK, the connection has been rejected."""
287+
288+
TERMINATE_STREAM = 0x2
289+
"""Terminate stream"""
290+
291+
def __format__(self, format_spec):
292+
# override so formatted string doesn't simply look like an int
293+
return str(self)
294+
295+
19296
class EventStreamRpcClientConnectionHandler(ABC):
20297
"""Base class for handling connection events.
21298
@@ -35,7 +312,7 @@ class EventStreamRpcClientConnectionHandler(ABC):
35312
"""
36313

37314
def __init__(self):
38-
self.connection = None
315+
self.connection = None # type: Optional[EventStreamRpcClientConnection]
39316

40317
@abstractmethod
41318
def on_connection_setup(self, **kwargs) -> None:
@@ -54,8 +331,13 @@ def on_connection_shutdown(self, reason: Optional[Exception], **kwargs) -> None:
54331
pass
55332

56333
@abstractmethod
57-
def on_protocol_message(self, **kwargs) -> None:
58-
# TODO define signature
334+
def on_protocol_message(
335+
self,
336+
headers: Sequence[EventStreamHeader],
337+
payload: bytes,
338+
message_type: EventStreamRpcMessageType,
339+
flags: int,
340+
**kwargs) -> None:
59341
pass
60342

61343

@@ -164,9 +446,12 @@ def _on_connection_shutdown(bound_future, bound_weak_handler, error_code):
164446
bound_future.set_result(None)
165447

166448
@staticmethod
167-
def _on_protocol_message(bound_weak_header, headers, payload, message_type, flags):
449+
def _on_protocol_message(bound_weak_handler, headers, payload, message_type, flags):
168450
handler = bound_weak_handler()
169451
if handler:
452+
# transform from simple types to actual classes
453+
headers = [EventStreamHeader._from_binding_tuple(i) for i in headers]
454+
message_type = EventStreamRpcMessageType(message_type)
170455
handler.on_protocol_message(
171456
headers=headers,
172457
payload=payload,
@@ -195,3 +480,46 @@ def is_open(self):
195480
finished shutting down.
196481
"""
197482
return _awscrt.event_stream_rpc_client_connection_is_open(self._binding)
483+
484+
def send_protocol_message(
485+
self,
486+
*,
487+
headers: Optional[Sequence[EventStreamHeader]] = [],
488+
payload: Optional[ByteString] = b'',
489+
message_type: EventStreamRpcMessageType,
490+
flags: int = EventStreamRpcMessageFlag.NONE) -> Future:
491+
"""Send a protocol message.
492+
493+
Protocol messages use stream 0.
494+
495+
Keyword Args:
496+
headers: Message headers.
497+
payload: Binary message payload.
498+
message_type: Message type.
499+
flags: Message flags. Values from EventStreamRpcMessageFlag may be
500+
XORed together. Not all flags can be used with all messages
501+
types, consult documentation.
502+
Returns:
503+
A future which completes with a result of None if the the message
504+
successfully written to the wire. This is still no guarantee
505+
that the peer received or processed the message.
506+
The future will complete with an exception if the connection
507+
is closed before the message can be sent.
508+
509+
"""
510+
511+
future = Future()
512+
513+
def _on_flush(error_code):
514+
if error_code:
515+
e = awscrt.exceptions.from_code(error_code)
516+
future.set_exception(e)
517+
else:
518+
future.set_result(None)
519+
520+
# native code deals with simplified types
521+
headers = [i._as_binding_tuple() for i in headers]
522+
523+
_awscrt.event_stream_rpc_client_connection_send_protocol_message(
524+
self._binding, headers, payload, message_type, flags, _on_flush)
525+
return future

crt/aws-c-event-stream

0 commit comments

Comments
 (0)