From 46b0b253bd49ae2ccddfb122bbe44ee2578e3b5b Mon Sep 17 00:00:00 2001 From: Kyle Sunden Date: Fri, 19 Dec 2025 17:29:35 -0600 Subject: [PATCH 1/2] Rework unpacker to use the null termination of messages --- yaqc/yaqc/_socket.py | 1 + yaqd-core/yaqd_core/avrorpc/unpacker.py | 151 +++++++++++------------- 2 files changed, 73 insertions(+), 79 deletions(-) diff --git a/yaqc/yaqc/_socket.py b/yaqc/yaqc/_socket.py index 28a8375..a06bcad 100644 --- a/yaqc/yaqc/_socket.py +++ b/yaqc/yaqc/_socket.py @@ -65,6 +65,7 @@ def handshake( self._write(request) self._write_metadata() self._write_method_name("") + self._write_terminator() # read response response = self._read(handshake_response) self._read({"type": "map", "values": "bytes"}) diff --git a/yaqd-core/yaqd_core/avrorpc/unpacker.py b/yaqd-core/yaqd_core/avrorpc/unpacker.py index 1d507f7..a3d67aa 100644 --- a/yaqd-core/yaqd_core/avrorpc/unpacker.py +++ b/yaqd-core/yaqd_core/avrorpc/unpacker.py @@ -10,72 +10,80 @@ class Unpacker: - def __init__(self, protocol, file_like=None): + def __init__(self, protocol): self.protocol = protocol - if file_like is None: - self._file = io.BytesIO() - else: - self._file = file_like - self.buf = io.BytesIO() + + self._msg_bufs = asyncio.Queue() + + self._buf = io.BytesIO() + self._remaining = 0 + self.handshake_complete = False - self.handshake_response = None - self.meta = None - self.message_name = None - self.parameters = None - self.remaining = 0 + self._handshake_response = None self.named_types = {t["name"]: t for t in self.protocol.get("types", [])} - self.new_data = asyncio.Event() def __aiter__(self): return self async def __anext__(self): - while True: - try: - self.new_data.clear() - if not self.handshake_complete and self.handshake_response is None: - handshake_request = await self._read_object( - handshake_request_schema - ) - self.handshake_response = handle_handshake( - handshake_request, self.protocol - ) - if self.handshake_response.match == "BOTH": - self.handshake_complete = True - if self.meta is None: - self.meta = await self._read_object( - {"type": "map", "values": "bytes"} - ) - if self.message_name is None: - self.message_name = await self._read_object("string") - if self.message_name != "" and self.protocol["messages"][ - self.message_name - ].get("request", []): - await self._read_parameters(self.message_name) - - ret = ( - self.handshake_response, - self.meta, - self.message_name, - self.parameters, - ) - self.handshake_response = None - self.meta = None - self.message_name = None - self.parameters = None - return ret - except (ValueError, struct.error): - await self.new_data.wait() + msg = await self._msg_bufs.get() + print("Processing a message") + parameters = None + meta = self._read_object( + {"type": "map", "values": "bytes"}, + msg, + ) + message_name = self._read_object("string", msg) + if message_name != "" and self.protocol["messages"][ + message_name + ].get("request", []): + parameters = self._read_parameters(message_name, msg) + + print(f"Identifieed {message_name=} with {parameters=}") + hs = None + if self._handshake_response: + hs = self._handshake_response + self._handshake_response = None + return ( + hs, + meta, + message_name, + parameters, + ) def feed(self, data: bytes): - # Must support random access, if it does not, must be fed externally (e.g. TCP) - pos = self._file.tell() - self._file.seek(0, 2) - self._file.write(data) - self._file.seek(pos) - self.new_data.set() - - async def _read_object(self, schema): + self._buf.seek(0, 2) + while data: + print(data, self._remaining, len(data)) + if self._remaining: + written = self._buf.write(data[:self._remaining]) + data = data[written:] + self._remaining -= written + if data: + self._remaining = struct.unpack_from(">L", data[:4])[0] + data = data[4:] + if self._remaining == 0: + self._buf.seek(0) + if not self.handshake_complete: + print("Doing handshake") + handshake_request = self._read_object( + handshake_request_schema, + self._buf, + ) + print("Read handshake request", handshake_request) + self._handshake_response = handle_handshake( + handshake_request, self.protocol + ) + print(self._handshake_response.match) + if self._handshake_response.match == "BOTH": + print("Handshake complete") + self.handshake_complete = True + + self._msg_bufs.put_nowait(self._buf) + self._buf = io.BytesIO() + + + def _read_object(self, schema, buf): schema = fastavro.parse_schema( schema, expand=True, named_schemas=self.named_types ) @@ -86,26 +94,11 @@ async def _read_object(self, schema): ) except fastavro.schema.SchemaParseException: pass # Must not have needed the second pass... - while True: - try: - self.buf.seek(0) - obj = fastavro.schemaless_reader(self.buf, schema) - self.buf = io.BytesIO() - return obj - except Exception: - self.buf.seek(0) - if not self.remaining: - self.remaining = struct.unpack_from(">L", self._file.read(4))[0] - - self.buf.seek(0, 2) - num_read = self.buf.write(self._file.read(self.remaining)) - self.remaining -= num_read - await asyncio.sleep(0) - - async def _read_parameters(self, name): - if self.parameters is None: - self.parameters = [] - for param_schema in self.protocol["messages"][name]["request"][ - len(self.parameters) : - ]: - self.parameters.append(await self._read_object(param_schema["type"])) + obj = fastavro.schemaless_reader(buf, schema) + return obj + + def _read_parameters(self, name, buf): + parameters = [] + for param_schema in self.protocol["messages"][name]["request"]: + parameters.append(self._read_object(param_schema["type"], buf)) + return parameters From 9519d16b8febd83f47fb5e778ffe333f3b770ded Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 19 Dec 2025 23:33:13 +0000 Subject: [PATCH 2/2] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- yaqd-core/yaqd_core/avrorpc/unpacker.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/yaqd-core/yaqd_core/avrorpc/unpacker.py b/yaqd-core/yaqd_core/avrorpc/unpacker.py index a3d67aa..91914a4 100644 --- a/yaqd-core/yaqd_core/avrorpc/unpacker.py +++ b/yaqd-core/yaqd_core/avrorpc/unpacker.py @@ -34,9 +34,9 @@ async def __anext__(self): msg, ) message_name = self._read_object("string", msg) - if message_name != "" and self.protocol["messages"][ - message_name - ].get("request", []): + if message_name != "" and self.protocol["messages"][message_name].get( + "request", [] + ): parameters = self._read_parameters(message_name, msg) print(f"Identifieed {message_name=} with {parameters=}") @@ -56,13 +56,13 @@ def feed(self, data: bytes): while data: print(data, self._remaining, len(data)) if self._remaining: - written = self._buf.write(data[:self._remaining]) + written = self._buf.write(data[: self._remaining]) data = data[written:] self._remaining -= written if data: self._remaining = struct.unpack_from(">L", data[:4])[0] data = data[4:] - if self._remaining == 0: + if self._remaining == 0: self._buf.seek(0) if not self.handshake_complete: print("Doing handshake") @@ -82,7 +82,6 @@ def feed(self, data: bytes): self._msg_bufs.put_nowait(self._buf) self._buf = io.BytesIO() - def _read_object(self, schema, buf): schema = fastavro.parse_schema( schema, expand=True, named_schemas=self.named_types