Skip to content

Commit c1c25ca

Browse files
authored
Websocket flow-control (#429)
Expose (and thoroughly document) flow control via the "read window".
1 parent 885b776 commit c1c25ca

File tree

7 files changed

+226
-29
lines changed

7 files changed

+226
-29
lines changed

awscrt/websocket.py

Lines changed: 136 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,17 @@
33
44
Use the :func:`connect()` to establish a :class:`WebSocket` client connection.
55
6+
Note from the developer: This is a very low-level API, which forces the
7+
user to deal with things like data fragmentation.
8+
A higher-level API could easily be built on top of this.
9+
10+
.. _authoring-callbacks:
11+
12+
Authoring Callbacks
13+
-------------------
614
All network operations in `awscrt.websocket` are asynchronous.
715
Callbacks are always invoked on the WebSocket's networking thread.
8-
You MUST NOT perform blocking operations from any callback, or you will cause a deadlock.
16+
You MUST NOT perform blocking network operations from any callback, or you will cause a deadlock.
917
For example: do not send a frame, and then wait for that frame to complete,
1018
within a callback. The WebSocket cannot do work until your callback returns,
1119
so the thread will be stuck. You can send the frame from within the callback,
@@ -17,9 +25,66 @@
1725
All functions and methods in `awscrt.websocket` are thread-safe.
1826
They can be called from any mix of threads.
1927
20-
Note from the developer: This is a very low-level API, which forces the
21-
user to deal with things like data fragmentation.
22-
A higher-level API could easily be built on top of this.
28+
.. _flow-control-reading:
29+
30+
Flow Control (reading)
31+
----------------------
32+
By default, the WebSocket will read from the network as fast as it can hand you the data.
33+
You must prevent the WebSocket from reading data faster than you can process it,
34+
or memory usage could balloon until your application explodes.
35+
36+
There are two ways to manage this.
37+
38+
First, and simplest, is to process incoming data synchronously within the
39+
`on_incoming_frame` callbacks. Since callbacks are invoked on the WebSocket's
40+
networking thread, the WebSocket cannot read more data until the callback returns.
41+
Therefore, processing the data in a synchronous manner
42+
(i.e. writing to disk, printing to screen, etc) will naturally
43+
affect `TCP flow control <https://en.wikipedia.org/wiki/Transmission_Control_Protocol#Flow_control>`_,
44+
and prevent data from arriving too fast. However, you MUST NOT perform a blocking
45+
network operation from within the callback or you risk deadlock (see :ref:`authoring-callbacks`).
46+
47+
The second, more complex, way requires you to manage the size of the read window.
48+
Do this if you are processing the data asynchronously
49+
(i.e. sending the data along on another network connection).
50+
Create the WebSocket with `manage_read_window` set true,
51+
and set `initial_read_window` to the number of bytes you are ready to receive right away.
52+
Whenever the read window reaches 0, you will stop receiving anything.
53+
The read window shrinks as you receive the payload from "data" frames (TEXT, BINARY, CONTINUATION).
54+
Call :meth:`WebSocket.increment_read_window()` to increase the window again keep frames flowing in.
55+
You only need to worry about the payload from "data" frames.
56+
The WebSocket automatically increments its window to account for any
57+
other incoming bytes, including other parts of a frame (opcode, payload-length, etc)
58+
and the payload of other frame types (PING, PONG, CLOSE).
59+
You'll probably want to do it like this:
60+
Pick the max amount of memory to buffer, and set this as the `initial_read_window`.
61+
When data arrives, the window has shrunk by that amount.
62+
Send this data along on the other network connection.
63+
When that data is done sending, call `increment_read_window()`
64+
by the amount you just finished sending.
65+
If you don't want to receive any data at first, set the `initial_read_window` to 0,
66+
and `increment_read_window()` when you're ready.
67+
Maintaining a larger window is better for overall throughput.
68+
69+
.. _flow-control-writing:
70+
71+
Flow Control (writing)
72+
----------------------
73+
You must also ensure that you do not continually send frames faster than the other
74+
side can read them, or memory usage could balloon until your application explodes.
75+
76+
The simplest approach is to only send 1 frame at a time.
77+
Use the :meth:`WebSocket.send_frame()` `on_complete` callback to know when the send is complete.
78+
Then you can try and send another.
79+
80+
A more complex, but higher throughput, way is to let multiple frames be in flight
81+
but have a cap. If the number of frames in flight, or bytes in flight, reaches
82+
your cap then wait until some frames complete before trying to send more.
83+
84+
.. _api:
85+
86+
API
87+
---
2388
"""
2489

2590
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
@@ -84,6 +149,17 @@ class Opcode(IntEnum):
84149
See `RFC 6455 section 5.5.3 <https://www.rfc-editor.org/rfc/rfc6455#section-5.5.3>`_.
85150
"""
86151

152+
def is_data_frame(self):
153+
"""True if this is a "data frame" opcode.
154+
155+
TEXT, BINARY, and CONTINUATION are "data frames". The rest are "control" frames.
156+
157+
If the WebSocket was created with `manage_read_window`,
158+
then the read window shrinks as "data frames" are received.
159+
See :ref:`flow-control-reading` for a thorough explanation.
160+
"""
161+
return self.value in (Opcode.TEXT, Opcode.BINARY, Opcode.CONTINUATION)
162+
87163

88164
MAX_PAYLOAD_LENGTH = 0x7FFFFFFFFFFFFFFF
89165
"""The maximum frame payload length allowed by RFC 6455"""
@@ -164,6 +240,17 @@ class IncomingFrame:
164240
165241
See `RFC 6455 section 5.4 - Fragmentation <https://www.rfc-editor.org/rfc/rfc6455#section-5.4>`_"""
166242

243+
def is_data_frame(self):
244+
"""True if this is a "data frame".
245+
246+
TEXT, BINARY, and CONTINUATION are "data frames". The rest are "control frames".
247+
248+
If the WebSocket was created with `manage_read_window`,
249+
then the read window shrinks as "data frames" are received.
250+
See :ref:`flow-control-reading` for a thorough explanation.
251+
"""
252+
return self.opcode.is_data_frame()
253+
167254

168255
@dataclass
169256
class OnIncomingFrameBeginData:
@@ -186,6 +273,11 @@ class OnIncomingFramePayloadData:
186273
Once all `frame.payload_length` bytes have been received
187274
(or the network connection is lost), the `on_incoming_frame_complete`
188275
callback will be invoked.
276+
277+
If the WebSocket was created with `manage_read_window`,
278+
and this is a "data frame" (TEXT, BINARY, CONTINUATION),
279+
then the read window shrinks by `len(data)`.
280+
See :ref:`flow-control-reading` for a thorough explanation.
189281
"""
190282

191283
frame: IncomingFrame
@@ -261,8 +353,8 @@ def send_frame(
261353
If you are not an expert, stick to sending :attr:`Opcode.TEXT` or :attr:`Opcode.BINARY` frames,
262354
and don't touch the FIN bit.
263355
264-
If you want to limit the amount of unsent data buffered in memory,
265-
wait until one frame completes before sending another.
356+
See :ref:`flow-control-writing` to learn about limiting the amount of
357+
unsent data buffered in memory.
266358
267359
Args:
268360
opcode: :class:`Opcode` for this frame.
@@ -286,7 +378,7 @@ def send_frame(
286378
or even guarantee that the data has left the machine yet,
287379
but it's on track to get there).
288380
289-
Read the :mod:`page notes<awscrt.websocket>` before authoring any callbacks.
381+
Be sure to read about :ref:`authoring-callbacks`.
290382
"""
291383
def _on_complete(error_code):
292384
cbdata = OnSendFrameCompleteData()
@@ -309,6 +401,21 @@ def _on_complete(error_code):
309401
fin,
310402
_on_complete)
311403

404+
def increment_read_window(self, size: int):
405+
"""Manually increment the read window by this many bytes, to continue receiving frames.
406+
407+
See :ref:`flow-control-reading` for a thorough explanation.
408+
If the WebSocket was created without `manage_read_window`, this function does nothing.
409+
This function may be called from any thread.
410+
411+
Args:
412+
size: in bytes
413+
"""
414+
if size < 0:
415+
raise ValueError("Increment size cannot be negative")
416+
417+
_awscrt.websocket_increment_read_window(self._binding, size)
418+
312419

313420
class _WebSocketCore(NativeResource):
314421
# Private class that handles wrangling callback data from C -> Python.
@@ -431,13 +538,13 @@ def connect(
431538
socket_options: Optional[SocketOptions] = None,
432539
tls_connection_options: Optional[TlsConnectionOptions] = None,
433540
proxy_options: Optional[HttpProxyOptions] = None,
541+
manage_read_window: bool = False,
542+
initial_read_window: Optional[int] = None,
434543
on_connection_setup: Callable[[OnConnectionSetupData], None],
435544
on_connection_shutdown: Optional[Callable[[OnConnectionShutdownData], None]] = None,
436545
on_incoming_frame_begin: Optional[Callable[[OnIncomingFrameBeginData], None]] = None,
437546
on_incoming_frame_payload: Optional[Callable[[OnIncomingFramePayloadData], None]] = None,
438547
on_incoming_frame_complete: Optional[Callable[[OnIncomingFrameCompleteData], None]] = None,
439-
enable_read_backpressure: bool = False,
440-
initial_read_window: Optional[int] = None,
441548
):
442549
"""Asynchronously establish a client WebSocket connection.
443550
@@ -459,7 +566,7 @@ def connect(
459566
done with a healthy WebSocket, to ensure that it shuts down and cleans up.
460567
It is very easy to accidentally keep a reference around without realizing it.
461568
462-
Read the :mod:`page notes<awscrt.websocket>` before authoring your callbacks.
569+
Be sure to read about :ref:`authoring-callbacks`.
463570
464571
Args:
465572
host: Hostname to connect to.
@@ -491,6 +598,17 @@ def connect(
491598
proxy_options: HTTP Proxy options.
492599
If not specified, no proxy is used.
493600
601+
manage_read_window: Set true to manually manage the flow-control read window.
602+
If false (the default), data arrives as fast as possible.
603+
See :ref:`flow-control-reading` for a thorough explanation.
604+
605+
initial_read_window: The initial size of the read window, in bytes.
606+
This must be set if `manage_read_window` is true,
607+
otherwise it is ignored.
608+
See :ref:`flow-control-reading` for a thorough explanation.
609+
An initial size of 0 will prevent any frames from arriving
610+
until :meth:`WebSocket.increment_read_window()` is called.
611+
494612
on_connection_setup: Callback invoked when the connect completes.
495613
Takes a single :class:`OnConnectionSetupData` argument.
496614
@@ -526,6 +644,10 @@ def connect(
526644
on_incoming_frame_payload: Optional callback, invoked 0+ times as payload data arrives.
527645
Takes a single :class:`OnIncomingFramePayloadData` argument.
528646
647+
If `manage_read_window` is on, and this is a "data frame",
648+
then the read window shrinks accordingly.
649+
See :ref:`flow-control-reading` for a thorough explanation.
650+
529651
If this callback raises an exception, the connection will shut down.
530652
531653
on_incoming_frame_complete: Optional callback, invoked when the WebSocket
@@ -538,12 +660,11 @@ def connect(
538660
539661
If this callback raises an exception, the connection will shut down.
540662
"""
541-
# TODO: document backpressure
542-
if enable_read_backpressure:
663+
if manage_read_window:
543664
if initial_read_window is None:
544-
raise ValueError("'initial_read_window' must be set if 'enable_read_backpressure' is enabled")
665+
raise ValueError("'initial_read_window' must be set if 'manage_read_window' is enabled")
545666
else:
546-
initial_read_window = 0x7FFFFFFF # TODO: fix how this works in C
667+
initial_read_window = 0 # value is ignored anyway
547668

548669
if initial_read_window < 0:
549670
raise ValueError("'initial_read_window' cannot be negative")
@@ -572,7 +693,7 @@ def connect(
572693
socket_options,
573694
tls_connection_options,
574695
proxy_options,
575-
enable_read_backpressure,
696+
manage_read_window,
576697
initial_read_window,
577698
core)
578699

source/websocket.c

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ PyObject *aws_py_websocket_client_connect(PyObject *self, PyObject *args) {
6262
PyObject *socket_options_py; /* O */
6363
PyObject *tls_options_py; /* O */
6464
PyObject *proxy_options_py; /* O */
65-
int enable_read_backpressure; /* p - boolean predicate */
65+
int manage_read_window; /* p - boolean predicate */
6666
Py_ssize_t initial_read_window; /* n */
6767
PyObject *websocket_core_py; /* O */
6868

@@ -77,7 +77,7 @@ PyObject *aws_py_websocket_client_connect(PyObject *self, PyObject *args) {
7777
&socket_options_py,
7878
&tls_options_py,
7979
&proxy_options_py,
80-
&enable_read_backpressure,
80+
&manage_read_window,
8181
&initial_read_window,
8282
&websocket_core_py)) {
8383
return NULL;
@@ -142,7 +142,7 @@ PyObject *aws_py_websocket_client_connect(PyObject *self, PyObject *args) {
142142
.on_incoming_frame_begin = s_websocket_on_incoming_frame_begin,
143143
.on_incoming_frame_payload = s_websocket_on_incoming_frame_payload,
144144
.on_incoming_frame_complete = s_websocket_on_incoming_frame_complete,
145-
.manual_window_management = enable_read_backpressure != 0,
145+
.manual_window_management = manage_read_window != 0,
146146
};
147147
if (aws_websocket_client_connect(&options) != AWS_OP_SUCCESS) {
148148
PyErr_SetAwsLastError();
@@ -516,10 +516,23 @@ PyObject *aws_py_websocket_send_frame(PyObject *self, PyObject *args) {
516516
}
517517

518518
PyObject *aws_py_websocket_increment_read_window(PyObject *self, PyObject *args) {
519-
/* TODO implement */
520519
(void)self;
521-
(void)args;
522-
return NULL;
520+
521+
PyObject *binding_py; /* O */
522+
Py_ssize_t size; /* n */
523+
524+
if (!PyArg_ParseTuple(args, "On", &binding_py, &size)) {
525+
return NULL;
526+
}
527+
528+
struct aws_websocket *websocket = PyCapsule_GetPointer(binding_py, s_websocket_capsule_name);
529+
if (!websocket) {
530+
return NULL;
531+
}
532+
533+
/* already checked that size was non-negative out in python */
534+
aws_websocket_increment_read_window(websocket, (size_t)size);
535+
Py_RETURN_NONE;
523536
}
524537

525538
PyObject *aws_py_websocket_create_handshake_request(PyObject *self, PyObject *args) {

test/test_mqtt5.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -539,7 +539,7 @@ def test_connect_with_invalid_port(self):
539539
def test_connect_with_invalid_port_for_websocket_connection(self):
540540
client_options = mqtt5.ClientOptions("will be set by _create_client", 1883)
541541
client, callbacks = self._test_connect_fail(
542-
auth_type=AuthType.WS_BAD_PORT, client_options=client_options, expected_error_code=46)
542+
auth_type=AuthType.WS_BAD_PORT, client_options=client_options)
543543
client.stop()
544544
callbacks.future_stopped.result(TIMEOUT)
545545

0 commit comments

Comments
 (0)