diff --git a/yaqc/yaqc/_socket.py b/yaqc/yaqc/_socket.py index 28a8375..a06bcad 100644 --- a/yaqc/yaqc/_socket.py +++ b/yaqc/yaqc/_socket.py @@ -65,6 +65,7 @@ def handshake( self._write(request) self._write_metadata() self._write_method_name("") + self._write_terminator() # read response response = self._read(handshake_response) self._read({"type": "map", "values": "bytes"}) diff --git a/yaqd-core/yaqd_core/avrorpc/unpacker.py b/yaqd-core/yaqd_core/avrorpc/unpacker.py index 1d507f7..91914a4 100644 --- a/yaqd-core/yaqd_core/avrorpc/unpacker.py +++ b/yaqd-core/yaqd_core/avrorpc/unpacker.py @@ -10,72 +10,79 @@ class Unpacker: - def __init__(self, protocol, file_like=None): + def __init__(self, protocol): self.protocol = protocol - if file_like is None: - self._file = io.BytesIO() - else: - self._file = file_like - self.buf = io.BytesIO() + + self._msg_bufs = asyncio.Queue() + + self._buf = io.BytesIO() + self._remaining = 0 + self.handshake_complete = False - self.handshake_response = None - self.meta = None - self.message_name = None - self.parameters = None - self.remaining = 0 + self._handshake_response = None self.named_types = {t["name"]: t for t in self.protocol.get("types", [])} - self.new_data = asyncio.Event() def __aiter__(self): return self async def __anext__(self): - while True: - try: - self.new_data.clear() - if not self.handshake_complete and self.handshake_response is None: - handshake_request = await self._read_object( - handshake_request_schema - ) - self.handshake_response = handle_handshake( - handshake_request, self.protocol - ) - if self.handshake_response.match == "BOTH": - self.handshake_complete = True - if self.meta is None: - self.meta = await self._read_object( - {"type": "map", "values": "bytes"} - ) - if self.message_name is None: - self.message_name = await self._read_object("string") - if self.message_name != "" and self.protocol["messages"][ - self.message_name - ].get("request", []): - await self._read_parameters(self.message_name) + msg = await self._msg_bufs.get() + print("Processing a message") + parameters = None + meta = self._read_object( + {"type": "map", "values": "bytes"}, + msg, + ) + message_name = self._read_object("string", msg) + if message_name != "" and self.protocol["messages"][message_name].get( + "request", [] + ): + parameters = self._read_parameters(message_name, msg) - ret = ( - self.handshake_response, - self.meta, - self.message_name, - self.parameters, - ) - self.handshake_response = None - self.meta = None - self.message_name = None - self.parameters = None - return ret - except (ValueError, struct.error): - await self.new_data.wait() + print(f"Identifieed {message_name=} with {parameters=}") + hs = None + if self._handshake_response: + hs = self._handshake_response + self._handshake_response = None + return ( + hs, + meta, + message_name, + parameters, + ) def feed(self, data: bytes): - # Must support random access, if it does not, must be fed externally (e.g. TCP) - pos = self._file.tell() - self._file.seek(0, 2) - self._file.write(data) - self._file.seek(pos) - self.new_data.set() + self._buf.seek(0, 2) + while data: + print(data, self._remaining, len(data)) + if self._remaining: + written = self._buf.write(data[: self._remaining]) + data = data[written:] + self._remaining -= written + if data: + self._remaining = struct.unpack_from(">L", data[:4])[0] + data = data[4:] + if self._remaining == 0: + self._buf.seek(0) + if not self.handshake_complete: + print("Doing handshake") + handshake_request = self._read_object( + handshake_request_schema, + self._buf, + ) + print("Read handshake request", handshake_request) + self._handshake_response = handle_handshake( + handshake_request, self.protocol + ) + print(self._handshake_response.match) + if self._handshake_response.match == "BOTH": + print("Handshake complete") + self.handshake_complete = True + + self._msg_bufs.put_nowait(self._buf) + self._buf = io.BytesIO() - async def _read_object(self, schema): + def _read_object(self, schema, buf): schema = fastavro.parse_schema( schema, expand=True, named_schemas=self.named_types ) @@ -86,26 +93,11 @@ async def _read_object(self, schema): ) except fastavro.schema.SchemaParseException: pass # Must not have needed the second pass... - while True: - try: - self.buf.seek(0) - obj = fastavro.schemaless_reader(self.buf, schema) - self.buf = io.BytesIO() - return obj - except Exception: - self.buf.seek(0) - if not self.remaining: - self.remaining = struct.unpack_from(">L", self._file.read(4))[0] - - self.buf.seek(0, 2) - num_read = self.buf.write(self._file.read(self.remaining)) - self.remaining -= num_read - await asyncio.sleep(0) + obj = fastavro.schemaless_reader(buf, schema) + return obj - async def _read_parameters(self, name): - if self.parameters is None: - self.parameters = [] - for param_schema in self.protocol["messages"][name]["request"][ - len(self.parameters) : - ]: - self.parameters.append(await self._read_object(param_schema["type"])) + def _read_parameters(self, name, buf): + parameters = [] + for param_schema in self.protocol["messages"][name]["request"]: + parameters.append(self._read_object(param_schema["type"], buf)) + return parameters