Skip to content
Draft
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 180 additions & 0 deletions meshtastic/connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
import asyncio
import io
import logging
from abc import ABC, abstractmethod
from typing import *

import serial
import serial_asyncio

from meshtastic.protobuf.mesh_pb2 import FromRadio, ToRadio


# magic number used in streaming client headers
HEADER_MAGIC: bytes = b"\x94\xc3"


class ConnectionError(Exception):
"""Base class for MeshConnection-related errors."""


class BadPayloadError(ConnectionError):
def __init__(self, payload, reason: str):
self.payload = payload
super().__init__(reason)


class MeshConnection(ABC):
"""A client API connection to a meshtastic radio."""

def __init__(self, name: str):
self.name: str = name
self.on_disconnect: asyncio.Event = asyncio.Event()
self._is_ready: bool = False
self._send_lock: asyncio.Lock = asyncio.Lock()
self._recv_lock: asyncio.Lock = asyncio.Lock()
self._init_task: asyncio.Task = asyncio.create_task(self._initialize())
self._init_task.add_done_callback(self._after_initialize)

@abstractmethod
async def _initialize(self):
"""Perform any connection initialization that must be performed async
(and therefore not from the constructor)."""

@abstractmethod
async def _send_bytes(self, msg: buffer):
"""Send bytes to the mesh device."""
pass

@abstractmethod
async def _recv_bytes(self) -> buffer:
"""Recieve bytes from the mesh device."""
pass

@staticmethod
@abstractmethod
async def get_available() -> AsyncGenerator[Any]:
"""Enumerate any mesh devices that can be connected to.

Generates values that can be passed to the concrete connection class's
constructor."""
pass

def ready(self):
return self._is_ready

def _after_initialize(self):
self._is_ready = True
del self._init_task

async def send(self, message: ToRadio):
"""Send something to the connected device."""
async with self._send_lock:
msg_str: str = message.SerializeToString()
await self._send_bytes(bytes(msg_str))

async def recv(self) -> FromRadio:
"""Recieve something from the connected device."""
async with self._recv_lock:
msg_bytes: buffer = await self._recv_bytes()
return FromRadio.FromString(str(msg_bytes, errors="ignore"))

async def listen(self) -> AsyncGenerator[FromRadio]:
while not self.on_disconnect.is_set():
yield await self.recv()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you anticipate this being used in the interface class that would wrap this? Obviously this plus send is basically the core API here. In the existing API, we're of course not really expecting callers to set up threads or such themselves, just set up listeners and make an interface, and otherwise go about their business. Part of why I ask is that we also do various management like updating the in-memory node data, and presumably we'd still want that to happen. Would this then only really be async-iterated-over by the interface class, or would it be used by clients directly?

Either way we should probably document it, anyway.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I figured the listen function would be used by the MeshInterface to grab every mesh message, and dispatch pubsub events and such appropriately. I haven't quite worked out the specifics but you're right that it would be problematic to have more then one listen call per connection instance, as implemented.

I think what i'm gonna do is just put a lock on the listen method. Clients using the MeshInterface API won't need to even touch Connection instances directly anyways. I can also document everything before everything's ready to be merged.


def close(self):
"""Close the connection.
Overloaders should remember to call supermethod"""
if not self.ready():
self._init_task.cancel()

self.on_disconnect.set()

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, trace):
self.close()


class StreamConnection(MeshConnection):
"""Base class for connections using the aio stream API"""
def __init__(self, name: str):
self._reader: Optional[asyncio.StreamReader] = None
self._writer: Optional[asyncio.StreamWriter] = None
self.stream_debug_out: io.StringIO = io.StringIO()
super().__init__(name)

def _handle_debug(self, debug_out: bytes):
self.stream_debug_out.write(str(debug_out))
self.stream_debug_out.flush()

async def _send_bytes(self, msg: buffer):
length: int = len(msg)
if length > 512:
raise BadPayloadError(msg, "Cannot send client API messages over 512 bytes")

self._writer.write(HEADER_MAGIC)
self._writer.write(length.to_bytes(2, "big"))
self._writer.write(msg)
await self._writer.drain()

async def _find_stream_header(self):
"""Consumes and logs debug out bytes until a valid header is detected"""
try:
while True:
from_stream: bytes = await self._reader.readuntil((b'\n', HEADER_MAGIC))
if from_stream.endswith(HEADER_MAGIC):
self._handle_debug(from_stream[:-2])
return
else:
self._handle_debug(from_stream)

except asyncio.IncompleteReadError as err:
if len(err.partial) > 0:
self._handle_debug(err.partial)
raise

async def _recv_bytes(self) -> buffer:
try:
while True:
await self._find_stream_header()
size_bytes: bytes = await self._reader.readexactly(2)
size: int = int.from_bytes(size_bytes, "big")
if 0 < size <= 512:
return await self._reader.readexactly(size)

self._handle_debug(size_bytes)

except asyncio.LimitOverrunError as err:
raise ConnectionError(
"Read buffer overrun while reading stream") from err

except asyncio.IncompleteReadError:
logging.error(f"Connection to {self.name} terminated: stream EOF reached")
self.close()

def close(self):
super().close()
self._writer.close()
self.stream_debug_out.close()
asyncio.as_completed((self._writer.wait_closed(),))


class SerialConnection(StreamConnection):
def __init__(self, portaddr: str, baudrate: int=115200):
self.port: str = portaddr
self.baudrate: int = baudrate
super().__init__(portaddr)

async def _initialize(self):
self._reader, self._writer = await serial_asyncio.open_serial_connectio(
port=self._port, baudrate=self._baudrate,
)

@staticmethod
async def get_available() -> AsyncGenerator[str]:
for port in serial.tools.list_ports.comports():
if port.hwid != "n/a":
yield port.device
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

presuming this is planned to eventually flesh out to something similar to what we do in meshtastic.util.findPorts and this is effectively a stub for now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically, yes. I think it's more sensible to define this behavior alongside the rest of the connection code. I also intend to use this method to implement bluetooth scanning when i get to the BLE connection impl

Loading