From 5fa2e32c78c2d09336b0e817897f651cee05ea61 Mon Sep 17 00:00:00 2001 From: Daniel Kohler <11864045+ddkohler@users.noreply.github.com> Date: Mon, 24 Nov 2025 14:59:17 -0600 Subject: [PATCH 1/7] server handle disconnects --- yaqd-core/yaqd_core/_protocol.py | 125 +++++++++++++----------- yaqd-core/yaqd_core/avrorpc/unpacker.py | 19 ++-- 2 files changed, 80 insertions(+), 64 deletions(-) diff --git a/yaqd-core/yaqd_core/_protocol.py b/yaqd-core/yaqd_core/_protocol.py index 6644a08..4ff9f19 100644 --- a/yaqd-core/yaqd_core/_protocol.py +++ b/yaqd-core/yaqd_core/_protocol.py @@ -17,18 +17,20 @@ def __init__(self, daemon, *args, **kwargs): def connection_lost(self, exc): peername = self.transport.get_extra_info("peername") - self.logger.info(f"Connection lost from {peername} to {self._daemon.name}") + self.logger.info(f"Connection lost from {peername}") self.task.cancel() self._daemon._connection_lost(peername) def connection_made(self, transport): """Process an incomming connection.""" peername = transport.get_extra_info("peername") - self.logger.info(f"Connection made from {peername} to {self._daemon.name}") + self.logger.info(f"Connection made from {peername}") self.transport = transport self.unpacker = avrorpc.Unpacker(self._avro_protocol) self._daemon._connection_made(peername) - self.task = asyncio.get_event_loop().create_task(self.process_requests()) + self.task = self._daemon._loop.create_task(self.process_requests()) + self._daemon._tasks.append(self.task) + self.task.add_done_callback(self._daemon._tasks.remove) def data_received(self, data): """Process an incomming request.""" @@ -38,61 +40,68 @@ def data_received(self, data): self.unpacker.feed(data) async def process_requests(self): - async for hs, meta, name, params in self.unpacker: - if hs is not None: - out = bytes(hs) - out = struct.pack(">L", len(out)) + out - self.transport.write(out) - if hs.match == "NONE": - name = "" + try: + async for hs, meta, name, params in self.unpacker: + if hs is not None: + out = bytes(hs) + out = struct.pack(">L", len(out)) + out + self.transport.write(out) + if hs.match == "NONE": + name = "" - out_meta = io.BytesIO() - fastavro.schemaless_writer( - out_meta, {"type": "map", "values": "bytes"}, meta - ) - length = out_meta.tell() - self.transport.write(struct.pack(">L", length) + out_meta.getvalue()) - self.logger.debug(f"Wrote meta, {meta}, {out_meta.getvalue()}") - try: - response_out = io.BytesIO() - response = None - response_schema = "null" - if name: - fun = getattr(self._daemon, name) - if params is None: - params = [] - response = fun(*params) - response_schema = fastavro.parse_schema( - self._avro_protocol["messages"][name].get("response", "null"), - expand=True, - named_schemas=self._named_types, + out_meta = io.BytesIO() + fastavro.schemaless_writer( + out_meta, {"type": "map", "values": "bytes"}, meta + ) + length = out_meta.tell() + self.transport.write(struct.pack(">L", length) + out_meta.getvalue()) + self.logger.debug(f"Wrote meta, {meta}, {out_meta.getvalue()}") + try: + response_out = io.BytesIO() + response = None + response_schema = "null" + if name: + fun = getattr(self._daemon, name) + if params is None: + params = [] + response = fun(*params) + response_schema = fastavro.parse_schema( + self._avro_protocol["messages"][name].get("response", "null"), + expand=True, + named_schemas=self._named_types, + ) + # Needed twice for nested types... Probably can be fixed upstream + response_schema = fastavro.parse_schema( + response_schema, + expand=True, + named_schemas=self._named_types, + ) + fastavro.schemaless_writer(response_out, response_schema, response) + except Exception as e: + self.logger.error(f"Caught exception: {type(e)} in message {name}") + self.logger.debug(traceback.format_exc()) + self.transport.write(struct.pack(">L", 1) + b"\1") + error_out = io.BytesIO() + fastavro.schemaless_writer(error_out, ["string"], repr(e)) + length = error_out.tell() + self.transport.write(struct.pack(">L", length) + error_out.getvalue()) + else: + self.transport.write(struct.pack(">L", 1) + b"\0") + self.logger.debug(f"Wrote non-error flag") + length = response_out.tell() + self.transport.write( + struct.pack(">L", length) + response_out.getvalue() ) - # Needed twice for nested types... Probably can be fixed upstream - response_schema = fastavro.parse_schema( - response_schema, - expand=True, - named_schemas=self._named_types, + self.logger.debug( + f"Wrote response {response}, {response_out.getvalue()}" ) - fastavro.schemaless_writer(response_out, response_schema, response) - except Exception as e: - self.logger.error(f"Caught exception: {type(e)} in message {name}") - self.logger.debug(traceback.format_exc()) - self.transport.write(struct.pack(">L", 1) + b"\1") - error_out = io.BytesIO() - fastavro.schemaless_writer(error_out, ["string"], repr(e)) - length = error_out.tell() - self.transport.write(struct.pack(">L", length) + error_out.getvalue()) - else: - self.transport.write(struct.pack(">L", 1) + b"\0") - self.logger.debug(f"Wrote non-error flag") - length = response_out.tell() - self.transport.write( - struct.pack(">L", length) + response_out.getvalue() - ) - self.logger.debug( - f"Wrote response {response}, {response_out.getvalue()}" - ) - self.transport.write(struct.pack(">L", 0)) - if name == "shutdown": - self.logger.debug("Closing transport") - self.transport.close() + self.transport.write(struct.pack(">L", 0)) + if name == "shutdown": + self.logger.debug("Closing transport") + self.transport.close() + except asyncio.CancelledError as e: + self.logger.debug("task cancellation caught") + await self.unpacker.__aexit__(None, None, None) + self.transport.close() + self.logger.debug(f"file closed? {self.unpacker._file.closed}") + raise e diff --git a/yaqd-core/yaqd_core/avrorpc/unpacker.py b/yaqd-core/yaqd_core/avrorpc/unpacker.py index 1d507f7..1c2207f 100644 --- a/yaqd-core/yaqd_core/avrorpc/unpacker.py +++ b/yaqd-core/yaqd_core/avrorpc/unpacker.py @@ -67,13 +67,20 @@ async def __anext__(self): except (ValueError, struct.error): await self.new_data.wait() + async def __aexit__(self, exc_type, exc_val, exc_tb): + logger.info("closing") + await asyncio.sleep(0) + self._file.close() + self.buf.close() + def feed(self, data: bytes): - # Must support random access, if it does not, must be fed externally (e.g. TCP) - pos = self._file.tell() - self._file.seek(0, 2) - self._file.write(data) - self._file.seek(pos) - self.new_data.set() + if not self._file.closed: + # Must support random access, if it does not, must be fed externally (e.g. TCP) + pos = self._file.tell() + self._file.seek(0, 2) + self._file.write(data) + self._file.seek(pos) + self.new_data.set() async def _read_object(self, schema): schema = fastavro.parse_schema( From 6bb62680c0eeed21a5f323e1f1a2006942bc980b Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 24 Nov 2025 21:12:43 +0000 Subject: [PATCH 2/7] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- yaqd-core/yaqd_core/_protocol.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/yaqd-core/yaqd_core/_protocol.py b/yaqd-core/yaqd_core/_protocol.py index 4ff9f19..4609714 100644 --- a/yaqd-core/yaqd_core/_protocol.py +++ b/yaqd-core/yaqd_core/_protocol.py @@ -66,7 +66,9 @@ async def process_requests(self): params = [] response = fun(*params) response_schema = fastavro.parse_schema( - self._avro_protocol["messages"][name].get("response", "null"), + self._avro_protocol["messages"][name].get( + "response", "null" + ), expand=True, named_schemas=self._named_types, ) @@ -84,7 +86,9 @@ async def process_requests(self): error_out = io.BytesIO() fastavro.schemaless_writer(error_out, ["string"], repr(e)) length = error_out.tell() - self.transport.write(struct.pack(">L", length) + error_out.getvalue()) + self.transport.write( + struct.pack(">L", length) + error_out.getvalue() + ) else: self.transport.write(struct.pack(">L", 1) + b"\0") self.logger.debug(f"Wrote non-error flag") @@ -102,6 +106,6 @@ async def process_requests(self): except asyncio.CancelledError as e: self.logger.debug("task cancellation caught") await self.unpacker.__aexit__(None, None, None) - self.transport.close() + self.transport.close() self.logger.debug(f"file closed? {self.unpacker._file.closed}") raise e From 96df83396c6ab5af9f9f76b21cbb97b4cbcacdff Mon Sep 17 00:00:00 2001 From: Daniel Kohler <11864045+ddkohler@users.noreply.github.com> Date: Mon, 24 Nov 2025 15:24:20 -0600 Subject: [PATCH 3/7] remove logger from unpacker --- tests/connection/connection.py | 4 ++-- yaqd-core/yaqd_core/avrorpc/unpacker.py | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/connection/connection.py b/tests/connection/connection.py index 50e1e02..bfc44ea 100644 --- a/tests/connection/connection.py +++ b/tests/connection/connection.py @@ -1,7 +1,7 @@ -from yaqd_core import Base +from yaqd_core import IsDaemon -class ConnectionTest(Base): +class ConnectionTest(IsDaemon): _kind = "connection-test" def echo(self, s: str): diff --git a/yaqd-core/yaqd_core/avrorpc/unpacker.py b/yaqd-core/yaqd_core/avrorpc/unpacker.py index 1c2207f..60825fb 100644 --- a/yaqd-core/yaqd_core/avrorpc/unpacker.py +++ b/yaqd-core/yaqd_core/avrorpc/unpacker.py @@ -68,7 +68,6 @@ async def __anext__(self): await self.new_data.wait() async def __aexit__(self, exc_type, exc_val, exc_tb): - logger.info("closing") await asyncio.sleep(0) self._file.close() self.buf.close() From f7600c16aec8dcf1b5672a7b26b3a5b3b105e23e Mon Sep 17 00:00:00 2001 From: Daniel Kohler <11864045+ddkohler@users.noreply.github.com> Date: Mon, 24 Nov 2025 15:31:38 -0600 Subject: [PATCH 4/7] Update CHANGELOG.md --- yaqd-core/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/yaqd-core/CHANGELOG.md b/yaqd-core/CHANGELOG.md index 5f8d640..0c16604 100644 --- a/yaqd-core/CHANGELOG.md +++ b/yaqd-core/CHANGELOG.md @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/). ## [Unreleased] ### Fixed +- handle cleanup when a client connection is broken - type hints for IsSensor attributes are appropriate for _n_-dimensional data ## [2023.11.0] From facb11acd477bbca16ff1c294cbb8d68c8ec58e0 Mon Sep 17 00:00:00 2001 From: Daniel Kohler <11864045+ddkohler@users.noreply.github.com> Date: Mon, 1 Dec 2025 14:12:03 -0600 Subject: [PATCH 5/7] refactor includes BytesIO close statements --- yaqd-core/yaqd_core/_protocol.py | 135 ++++++++++++++++--------------- 1 file changed, 71 insertions(+), 64 deletions(-) diff --git a/yaqd-core/yaqd_core/_protocol.py b/yaqd-core/yaqd_core/_protocol.py index 4609714..4304dc9 100644 --- a/yaqd-core/yaqd_core/_protocol.py +++ b/yaqd-core/yaqd_core/_protocol.py @@ -41,71 +41,78 @@ def data_received(self, data): async def process_requests(self): try: - async for hs, meta, name, params in self.unpacker: - if hs is not None: - out = bytes(hs) - out = struct.pack(">L", len(out)) + out - self.transport.write(out) - if hs.match == "NONE": - name = "" - - out_meta = io.BytesIO() - fastavro.schemaless_writer( - out_meta, {"type": "map", "values": "bytes"}, meta - ) - length = out_meta.tell() - self.transport.write(struct.pack(">L", length) + out_meta.getvalue()) - self.logger.debug(f"Wrote meta, {meta}, {out_meta.getvalue()}") - try: - response_out = io.BytesIO() - response = None - response_schema = "null" - if name: - fun = getattr(self._daemon, name) - if params is None: - params = [] - response = fun(*params) - response_schema = fastavro.parse_schema( - self._avro_protocol["messages"][name].get( - "response", "null" - ), - expand=True, - named_schemas=self._named_types, - ) - # Needed twice for nested types... Probably can be fixed upstream - response_schema = fastavro.parse_schema( - response_schema, - expand=True, - named_schemas=self._named_types, - ) - fastavro.schemaless_writer(response_out, response_schema, response) - except Exception as e: - self.logger.error(f"Caught exception: {type(e)} in message {name}") - self.logger.debug(traceback.format_exc()) - self.transport.write(struct.pack(">L", 1) + b"\1") - error_out = io.BytesIO() - fastavro.schemaless_writer(error_out, ["string"], repr(e)) - length = error_out.tell() - self.transport.write( - struct.pack(">L", length) + error_out.getvalue() - ) - else: - self.transport.write(struct.pack(">L", 1) + b"\0") - self.logger.debug(f"Wrote non-error flag") - length = response_out.tell() - self.transport.write( - struct.pack(">L", length) + response_out.getvalue() - ) - self.logger.debug( - f"Wrote response {response}, {response_out.getvalue()}" - ) - self.transport.write(struct.pack(">L", 0)) - if name == "shutdown": - self.logger.debug("Closing transport") - self.transport.close() + await self._process_requests() except asyncio.CancelledError as e: - self.logger.debug("task cancellation caught") + self.logger.debug("cancelling process_requests") await self.unpacker.__aexit__(None, None, None) self.transport.close() - self.logger.debug(f"file closed? {self.unpacker._file.closed}") raise e + + async def _process_requests(self): + async for hs, meta, name, params in self.unpacker: + if hs is not None: + out = bytes(hs) + out = struct.pack(">L", len(out)) + out + self.transport.write(out) + if hs.match == "NONE": + name = "" + + meta_out = io.BytesIO() + fastavro.schemaless_writer( + meta_out, {"type": "map", "values": "bytes"}, meta + ) + length = meta_out.tell() + self.transport.write(struct.pack(">L", length) + meta_out.getvalue()) + self.logger.debug(f"Wrote meta, {meta}, {meta_out.getvalue()}") + try: + response_out = io.BytesIO() + response = None + response_schema = "null" + if name: + fun = getattr(self._daemon, name) + if params is None: + params = [] + response = fun(*params) + response_schema = fastavro.parse_schema( + self._avro_protocol["messages"][name].get( + "response", "null" + ), + expand=True, + named_schemas=self._named_types, + ) + # Needed twice for nested types... Probably can be fixed upstream + response_schema = fastavro.parse_schema( + response_schema, + expand=True, + named_schemas=self._named_types, + ) + fastavro.schemaless_writer(response_out, response_schema, response) + except Exception as e: + self.logger.error(f"Caught exception: {type(e)} in message {name}") + self.logger.debug(traceback.format_exc()) + self.transport.write(struct.pack(">L", 1) + b"\1") + error_out = io.BytesIO() + fastavro.schemaless_writer(error_out, ["string"], repr(e)) + length = error_out.tell() + self.transport.write( + struct.pack(">L", length) + error_out.getvalue() + ) + error_out.close() + else: + self.transport.write(struct.pack(">L", 1) + b"\0") + self.logger.debug(f"Wrote non-error flag") + length = response_out.tell() + self.transport.write( + struct.pack(">L", length) + response_out.getvalue() + ) + self.logger.debug( + f"Wrote response {response}, {response_out.getvalue()}" + ) + finally: + response_out.close() + meta_out.close() + self.transport.write(struct.pack(">L", 0)) + self.unpacker._file = io.BytesIO() + if name == "shutdown": + self.logger.debug("Closing transport") + self.transport.close() From 411abfecc13e6d63a821d59778b129b5f965814a Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 1 Dec 2025 20:12:17 +0000 Subject: [PATCH 6/7] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- yaqd-core/yaqd_core/_protocol.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/yaqd-core/yaqd_core/_protocol.py b/yaqd-core/yaqd_core/_protocol.py index 4304dc9..ce5eff9 100644 --- a/yaqd-core/yaqd_core/_protocol.py +++ b/yaqd-core/yaqd_core/_protocol.py @@ -74,9 +74,7 @@ async def _process_requests(self): params = [] response = fun(*params) response_schema = fastavro.parse_schema( - self._avro_protocol["messages"][name].get( - "response", "null" - ), + self._avro_protocol["messages"][name].get("response", "null"), expand=True, named_schemas=self._named_types, ) @@ -94,9 +92,7 @@ async def _process_requests(self): error_out = io.BytesIO() fastavro.schemaless_writer(error_out, ["string"], repr(e)) length = error_out.tell() - self.transport.write( - struct.pack(">L", length) + error_out.getvalue() - ) + self.transport.write(struct.pack(">L", length) + error_out.getvalue()) error_out.close() else: self.transport.write(struct.pack(">L", 1) + b"\0") @@ -110,7 +106,7 @@ async def _process_requests(self): ) finally: response_out.close() - meta_out.close() + meta_out.close() self.transport.write(struct.pack(">L", 0)) self.unpacker._file = io.BytesIO() if name == "shutdown": From a7aa0cc6c44d6209255cbd8b2f98a1b1ee108edb Mon Sep 17 00:00:00 2001 From: Daniel Kohler <11864045+ddkohler@users.noreply.github.com> Date: Mon, 1 Dec 2025 14:29:52 -0600 Subject: [PATCH 7/7] revert out-of-scope changes --- tests/connection/connection.py | 4 ++-- yaqd-core/yaqd_core/_protocol.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/connection/connection.py b/tests/connection/connection.py index bfc44ea..50e1e02 100644 --- a/tests/connection/connection.py +++ b/tests/connection/connection.py @@ -1,7 +1,7 @@ -from yaqd_core import IsDaemon +from yaqd_core import Base -class ConnectionTest(IsDaemon): +class ConnectionTest(Base): _kind = "connection-test" def echo(self, s: str): diff --git a/yaqd-core/yaqd_core/_protocol.py b/yaqd-core/yaqd_core/_protocol.py index ce5eff9..2ca4a16 100644 --- a/yaqd-core/yaqd_core/_protocol.py +++ b/yaqd-core/yaqd_core/_protocol.py @@ -17,14 +17,14 @@ def __init__(self, daemon, *args, **kwargs): def connection_lost(self, exc): peername = self.transport.get_extra_info("peername") - self.logger.info(f"Connection lost from {peername}") + self.logger.info(f"Connection lost from {peername} to {self._daemon.name}") self.task.cancel() self._daemon._connection_lost(peername) def connection_made(self, transport): """Process an incomming connection.""" peername = transport.get_extra_info("peername") - self.logger.info(f"Connection made from {peername}") + self.logger.info(f"Connection made from {peername} to {self._daemon.name}") self.transport = transport self.unpacker = avrorpc.Unpacker(self._avro_protocol) self._daemon._connection_made(peername)