-
Notifications
You must be signed in to change notification settings - Fork 298
Async rework WIP #803
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Async rework WIP #803
Changes from 4 commits
28b0409
2b1114c
a8c271c
2e3719e
743474f
4b8ba4a
05e52e3
ad37c21
664c19a
5f48ed6
0e320cd
773d1f9
b0b8be8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,180 @@ | ||
| import asyncio | ||
| import io | ||
| import logging | ||
| from abc import ABC, abstractmethod | ||
| from typing import * | ||
|
|
||
| import serial | ||
| import serial_asyncio | ||
|
|
||
| from meshtastic.protobuf.mesh_pb2 import FromRadio, ToRadio | ||
|
|
||
|
|
||
| # magic number used in streaming client headers | ||
| HEADER_MAGIC: bytes = b"\x94\xc3" | ||
|
|
||
|
|
||
| class ConnectionError(Exception): | ||
| """Base class for MeshConnection-related errors.""" | ||
|
|
||
|
|
||
| class BadPayloadError(ConnectionError): | ||
| def __init__(self, payload, reason: str): | ||
| self.payload = payload | ||
| super().__init__(reason) | ||
|
|
||
|
|
||
| class MeshConnection(ABC): | ||
| """A client API connection to a meshtastic radio.""" | ||
|
|
||
| def __init__(self, name: str): | ||
| self.name: str = name | ||
| self.on_disconnect: asyncio.Event = asyncio.Event() | ||
| self._is_ready: bool = False | ||
| self._send_lock: asyncio.Lock = asyncio.Lock() | ||
| self._recv_lock: asyncio.Lock = asyncio.Lock() | ||
| self._init_task: asyncio.Task = asyncio.create_task(self._initialize()) | ||
| self._init_task.add_done_callback(self._after_initialize) | ||
|
|
||
| @abstractmethod | ||
| async def _initialize(self): | ||
| """Perform any connection initialization that must be performed async | ||
| (and therefore not from the constructor).""" | ||
|
|
||
| @abstractmethod | ||
| async def _send_bytes(self, msg: buffer): | ||
| """Send bytes to the mesh device.""" | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| async def _recv_bytes(self) -> buffer: | ||
| """Recieve bytes from the mesh device.""" | ||
| pass | ||
|
|
||
| @staticmethod | ||
| @abstractmethod | ||
| async def get_available() -> AsyncGenerator[Any]: | ||
| """Enumerate any mesh devices that can be connected to. | ||
|
|
||
| Generates values that can be passed to the concrete connection class's | ||
| constructor.""" | ||
| pass | ||
|
|
||
| def ready(self): | ||
| return self._is_ready | ||
|
|
||
| def _after_initialize(self): | ||
| self._is_ready = True | ||
| del self._init_task | ||
|
|
||
| async def send(self, message: ToRadio): | ||
| """Send something to the connected device.""" | ||
| async with self._send_lock: | ||
| msg_str: str = message.SerializeToString() | ||
| await self._send_bytes(bytes(msg_str)) | ||
|
|
||
| async def recv(self) -> FromRadio: | ||
| """Recieve something from the connected device.""" | ||
| async with self._recv_lock: | ||
| msg_bytes: buffer = await self._recv_bytes() | ||
| return FromRadio.FromString(str(msg_bytes, errors="ignore")) | ||
|
|
||
| async def listen(self) -> AsyncGenerator[FromRadio]: | ||
| while not self.on_disconnect.is_set(): | ||
| yield await self.recv() | ||
|
|
||
| def close(self): | ||
| """Close the connection. | ||
| Overloaders should remember to call supermethod""" | ||
| if not self.ready(): | ||
| self._init_task.cancel() | ||
|
|
||
| self.on_disconnect.set() | ||
|
|
||
| def __enter__(self): | ||
| return self | ||
|
|
||
| def __exit__(self, exc_type, exc_value, trace): | ||
| self.close() | ||
|
|
||
|
|
||
| class StreamConnection(MeshConnection): | ||
| """Base class for connections using the aio stream API""" | ||
| def __init__(self, name: str): | ||
| self._reader: Optional[asyncio.StreamReader] = None | ||
| self._writer: Optional[asyncio.StreamWriter] = None | ||
| self.stream_debug_out: io.StringIO = io.StringIO() | ||
| super().__init__(name) | ||
|
|
||
| def _handle_debug(self, debug_out: bytes): | ||
| self.stream_debug_out.write(str(debug_out)) | ||
| self.stream_debug_out.flush() | ||
|
|
||
| async def _send_bytes(self, msg: buffer): | ||
| length: int = len(msg) | ||
| if length > 512: | ||
| raise BadPayloadError(msg, "Cannot send client API messages over 512 bytes") | ||
|
|
||
| self._writer.write(HEADER_MAGIC) | ||
| self._writer.write(length.to_bytes(2, "big")) | ||
| self._writer.write(msg) | ||
| await self._writer.drain() | ||
|
|
||
| async def _find_stream_header(self): | ||
| """Consumes and logs debug out bytes until a valid header is detected""" | ||
| try: | ||
| while True: | ||
| from_stream: bytes = await self._reader.readuntil((b'\n', HEADER_MAGIC)) | ||
| if from_stream.endswith(HEADER_MAGIC): | ||
| self._handle_debug(from_stream[:-2]) | ||
| return | ||
| else: | ||
| self._handle_debug(from_stream) | ||
|
|
||
| except asyncio.IncompleteReadError as err: | ||
| if len(err.partial) > 0: | ||
| self._handle_debug(err.partial) | ||
| raise | ||
|
|
||
| async def _recv_bytes(self) -> buffer: | ||
| try: | ||
| while True: | ||
| await self._find_stream_header() | ||
| size_bytes: bytes = await self._reader.readexactly(2) | ||
| size: int = int.from_bytes(size_bytes, "big") | ||
| if 0 < size <= 512: | ||
| return await self._reader.readexactly(size) | ||
|
|
||
| self._handle_debug(size_bytes) | ||
|
|
||
| except asyncio.LimitOverrunError as err: | ||
| raise ConnectionError( | ||
| "Read buffer overrun while reading stream") from err | ||
|
|
||
| except asyncio.IncompleteReadError: | ||
| logging.error(f"Connection to {self.name} terminated: stream EOF reached") | ||
| self.close() | ||
|
|
||
| def close(self): | ||
| super().close() | ||
| self._writer.close() | ||
| self.stream_debug_out.close() | ||
| asyncio.as_completed((self._writer.wait_closed(),)) | ||
|
|
||
|
|
||
| class SerialConnection(StreamConnection): | ||
| def __init__(self, portaddr: str, baudrate: int=115200): | ||
| self.port: str = portaddr | ||
| self.baudrate: int = baudrate | ||
| super().__init__(portaddr) | ||
|
|
||
| async def _initialize(self): | ||
| self._reader, self._writer = await serial_asyncio.open_serial_connectio( | ||
dangerdyke marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| port=self._port, baudrate=self._baudrate, | ||
| ) | ||
|
|
||
| @staticmethod | ||
| async def get_available() -> AsyncGenerator[str]: | ||
| for port in serial.tools.list_ports.comports(): | ||
| if port.hwid != "n/a": | ||
| yield port.device | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. presuming this is planned to eventually flesh out to something similar to what we do in
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Basically, yes. I think it's more sensible to define this behavior alongside the rest of the connection code. I also intend to use this method to implement bluetooth scanning when i get to the BLE connection impl |
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do you anticipate this being used in the interface class that would wrap this? Obviously this plus
sendis basically the core API here. In the existing API, we're of course not really expecting callers to set up threads or such themselves, just set up listeners and make an interface, and otherwise go about their business. Part of why I ask is that we also do various management like updating the in-memory node data, and presumably we'd still want that to happen. Would this then only really be async-iterated-over by the interface class, or would it be used by clients directly?Either way we should probably document it, anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I figured the listen function would be used by the MeshInterface to grab every mesh message, and dispatch pubsub events and such appropriately. I haven't quite worked out the specifics but you're right that it would be problematic to have more then one listen call per connection instance, as implemented.
I think what i'm gonna do is just put a lock on the listen method. Clients using the MeshInterface API won't need to even touch Connection instances directly anyways. I can also document everything before everything's ready to be merged.