Skip to content

Commit 082ebfe

Browse files
Mqtt311 operation statistics support (#437)
MQTT311 operation statistics support
1 parent 755ceac commit 082ebfe

File tree

7 files changed

+191
-1
lines changed

7 files changed

+191
-1
lines changed

awscrt/mqtt.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import awscrt.exceptions
1515
from awscrt.http import HttpProxyOptions, HttpRequest
1616
from awscrt.io import ClientBootstrap, ClientTlsContext, SocketOptions
17+
from dataclasses import dataclass
1718

1819

1920
class QoS(IntEnum):
@@ -150,6 +151,22 @@ def __init__(self, bootstrap=None, tls_ctx=None):
150151
self._binding = _awscrt.mqtt_client_new(bootstrap, tls_ctx)
151152

152153

154+
@dataclass
155+
class OperationStatisticsData:
156+
"""Dataclass containing some simple statistics about the current state of the connection's queue of operations
157+
158+
Args:
159+
incomplete_operation_count (int): total number of operations submitted to the connection that have not yet been completed. Unacked operations are a subset of this.
160+
incomplete_operation_size (int): total packet size of operations submitted to the connection that have not yet been completed. Unacked operations are a subset of this.
161+
unacked_operation_count (int): total number of operations that have been sent to the server and are waiting for a corresponding ACK before they can be completed.
162+
unacked_operation_size (int): total packet size of operations that have been sent to the server and are waiting for a corresponding ACK before they can be completed.
163+
"""
164+
incomplete_operation_count: int = 0
165+
incomplete_operation_size: int = 0
166+
unacked_operation_count: int = 0
167+
unacked_operation_size: int = 0
168+
169+
153170
class Connection(NativeResource):
154171
"""MQTT client connection.
155172
@@ -703,6 +720,30 @@ def puback(packet_id, error_code):
703720

704721
return future, packet_id
705722

723+
def get_stats(self):
724+
"""Queries the connection's internal statistics for incomplete operations.
725+
726+
Returns:
727+
A future with a (:class:`OperationStatisticsData`)
728+
"""
729+
730+
future = Future()
731+
732+
def get_stats_result(
733+
incomplete_operation_count,
734+
incomplete_operation_size,
735+
unacked_operation_count,
736+
unacked_operation_size):
737+
operation_statistics_data = OperationStatisticsData(
738+
incomplete_operation_count,
739+
incomplete_operation_size,
740+
unacked_operation_count,
741+
unacked_operation_size)
742+
future.set_result(operation_statistics_data)
743+
744+
_awscrt.mqtt_client_connection_get_stats(self._binding, get_stats_result)
745+
return future
746+
706747

707748
class WebsocketHandshakeTransformArgs:
708749
"""

crt/aws-c-mqtt

source/module.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -706,6 +706,7 @@ static PyMethodDef s_module_methods[] = {
706706
AWS_PY_METHOD_DEF(mqtt_client_connection_unsubscribe, METH_VARARGS),
707707
AWS_PY_METHOD_DEF(mqtt_client_connection_disconnect, METH_VARARGS),
708708
AWS_PY_METHOD_DEF(mqtt_ws_handshake_transform_complete, METH_VARARGS),
709+
AWS_PY_METHOD_DEF(mqtt_client_connection_get_stats, METH_VARARGS),
709710

710711
/* MQTT5 Client */
711712
AWS_PY_METHOD_DEF(mqtt5_client_new, METH_VARARGS),

source/mqtt_client_connection.c

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1172,3 +1172,52 @@ PyObject *aws_py_mqtt_client_connection_disconnect(PyObject *self, PyObject *arg
11721172

11731173
Py_RETURN_NONE;
11741174
}
1175+
1176+
PyObject *aws_py_mqtt_client_connection_get_stats(PyObject *self, PyObject *args) {
1177+
(void)self;
1178+
bool success = false;
1179+
1180+
PyObject *impl_capsule;
1181+
PyObject *get_stats_callback_fn_py;
1182+
1183+
if (!PyArg_ParseTuple(args, "OO", &impl_capsule, &get_stats_callback_fn_py)) {
1184+
return NULL;
1185+
}
1186+
1187+
struct mqtt_connection_binding *connection =
1188+
PyCapsule_GetPointer(impl_capsule, s_capsule_name_mqtt_client_connection);
1189+
if (!connection) {
1190+
return NULL;
1191+
}
1192+
1193+
/* These must be DECREF'd when function ends */
1194+
PyObject *result = NULL;
1195+
1196+
struct aws_mqtt_connection_operation_statistics stats;
1197+
AWS_ZERO_STRUCT(stats);
1198+
1199+
aws_mqtt_client_connection_get_stats(connection->native, &stats);
1200+
1201+
result = PyObject_CallFunction(
1202+
get_stats_callback_fn_py,
1203+
"(KKKK)",
1204+
/* K */ (unsigned long long)stats.incomplete_operation_count,
1205+
/* K */ (unsigned long long)stats.incomplete_operation_size,
1206+
/* K */ (unsigned long long)stats.unacked_operation_count,
1207+
/* K */ (unsigned long long)stats.unacked_operation_size);
1208+
if (!result) {
1209+
PyErr_WriteUnraisable(PyErr_Occurred());
1210+
goto done;
1211+
}
1212+
1213+
success = true;
1214+
1215+
done:
1216+
1217+
Py_XDECREF(result);
1218+
1219+
if (success) {
1220+
Py_RETURN_NONE;
1221+
}
1222+
return NULL;
1223+
}

source/mqtt_client_connection.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ PyObject *aws_py_mqtt_client_connection_on_message(PyObject *self, PyObject *arg
2020
PyObject *aws_py_mqtt_client_connection_unsubscribe(PyObject *self, PyObject *args);
2121
PyObject *aws_py_mqtt_client_connection_resubscribe_existing_topics(PyObject *self, PyObject *args);
2222
PyObject *aws_py_mqtt_client_connection_disconnect(PyObject *self, PyObject *args);
23+
PyObject *aws_py_mqtt_client_connection_get_stats(PyObject *self, PyObject *args);
2324

2425
PyObject *aws_py_mqtt_ws_handshake_transform_complete(PyObject *self, PyObject *args);
2526

test/test_mqtt.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,64 @@ def test_connect_disconnect_with_default_singletons(self):
237237
EventLoopGroup.release_static_default()
238238
DefaultHostResolver.release_static_default()
239239

240+
def test_connect_publish_wait_statistics_disconnect(self):
241+
connection = self._create_connection()
242+
connection.connect().result(TIMEOUT)
243+
244+
# check operation statistics
245+
statistics = connection.get_stats().result(TIMEOUT)
246+
self.assertEqual(statistics.incomplete_operation_count, 0)
247+
self.assertEqual(statistics.incomplete_operation_size, 0)
248+
self.assertEqual(statistics.unacked_operation_count, 0)
249+
self.assertEqual(statistics.unacked_operation_size, 0)
250+
251+
# publish
252+
published, packet_id = connection.publish(self.TEST_TOPIC, self.TEST_MSG, QoS.AT_LEAST_ONCE)
253+
puback = published.result(TIMEOUT)
254+
self.assertEqual(packet_id, puback['packet_id'])
255+
256+
# check operation statistics
257+
statistics = connection.get_stats().result(TIMEOUT)
258+
self.assertEqual(statistics.incomplete_operation_count, 0)
259+
self.assertEqual(statistics.incomplete_operation_size, 0)
260+
self.assertEqual(statistics.unacked_operation_count, 0)
261+
self.assertEqual(statistics.unacked_operation_size, 0)
262+
263+
# disconnect
264+
connection.disconnect().result(TIMEOUT)
265+
266+
def test_connect_publish_statistics_wait_disconnect(self):
267+
connection = self._create_connection()
268+
connection.connect().result(TIMEOUT)
269+
270+
# publish
271+
published, packet_id = connection.publish(self.TEST_TOPIC, self.TEST_MSG, QoS.AT_LEAST_ONCE)
272+
# Per packet: (The size of the topic, the size of the payload, 2 for the header and 2 for the packet ID)
273+
expected_size = len(self.TEST_TOPIC) + len(self.TEST_MSG) + 4
274+
275+
# check operation statistics
276+
statistics = connection.get_stats().result(TIMEOUT)
277+
self.assertEqual(statistics.incomplete_operation_count, 1)
278+
self.assertEqual(statistics.incomplete_operation_size, expected_size)
279+
# NOTE: Unacked will be zero because we have not invoked the future yet
280+
# and so it has not had time to move to the socket
281+
self.assertEqual(statistics.unacked_operation_count, 0)
282+
self.assertEqual(statistics.unacked_operation_size, 0)
283+
284+
# wait for PubAck
285+
puback = published.result(TIMEOUT)
286+
self.assertEqual(packet_id, puback['packet_id'])
287+
288+
# check operation statistics
289+
statistics = connection.get_stats().result(TIMEOUT)
290+
self.assertEqual(statistics.incomplete_operation_count, 0)
291+
self.assertEqual(statistics.incomplete_operation_size, 0)
292+
self.assertEqual(statistics.unacked_operation_count, 0)
293+
self.assertEqual(statistics.unacked_operation_size, 0)
294+
295+
# disconnect
296+
connection.disconnect().result(TIMEOUT)
297+
240298

241299
if __name__ == 'main':
242300
unittest.main()

test/test_mqtt5.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1226,6 +1226,46 @@ def test_interruption_qos1_publish(self):
12261226

12271227
callbacks.future_stopped.result(TIMEOUT)
12281228

1229+
# ==============================================================
1230+
# MISC TEST CASES
1231+
# ==============================================================
1232+
1233+
def test_operation_statistics_uc1(self):
1234+
client_id_publisher = create_client_id()
1235+
payload = "HELLO WORLD"
1236+
topic_filter = "test/MQTT5_Binding_Python_" + client_id_publisher
1237+
1238+
client_options = mqtt5.ClientOptions("will be replaced", 0)
1239+
client_options.connect_options = mqtt5.ConnectPacket(client_id=client_id_publisher)
1240+
client1, callbacks = self._test_connect(auth_type=AuthType.DIRECT_MUTUAL_TLS, client_options=client_options)
1241+
1242+
# Make sure the operation statistics are empty
1243+
statistics = client1.get_stats().result(TIMEOUT)
1244+
self.assertEqual(statistics.incomplete_operation_count, 0)
1245+
self.assertEqual(statistics.incomplete_operation_size, 0)
1246+
self.assertEqual(statistics.unacked_operation_count, 0)
1247+
self.assertEqual(statistics.unacked_operation_size, 0)
1248+
1249+
publish_packet = mqtt5.PublishPacket(
1250+
payload=payload,
1251+
topic=topic_filter,
1252+
qos=mqtt5.QoS.AT_LEAST_ONCE)
1253+
1254+
publishes = 10
1255+
for x in range(publishes):
1256+
publish_future = client1.publish(publish_packet)
1257+
publish_future.result(TIMEOUT)
1258+
1259+
# Make sure the operation statistics are empty
1260+
statistics = client1.get_stats().result(TIMEOUT)
1261+
self.assertEqual(statistics.incomplete_operation_count, 0)
1262+
self.assertEqual(statistics.incomplete_operation_size, 0)
1263+
self.assertEqual(statistics.unacked_operation_count, 0)
1264+
self.assertEqual(statistics.unacked_operation_size, 0)
1265+
1266+
client1.stop()
1267+
callbacks.future_stopped.result(TIMEOUT)
1268+
12291269

12301270
if __name__ == 'main':
12311271
unittest.main()

0 commit comments

Comments
 (0)