|
| 1 | +from sys import implementation as _implementation |
| 2 | +if _implementation.name != 'circuitpython': |
| 3 | + raise NotImplementedError("Woah there, Bud! You can only do IR stuff from CircuitPython!") |
| 4 | + |
| 5 | +import pulseio |
| 6 | +import board |
| 7 | +import digitalio |
| 8 | +import adafruit_irremote |
| 9 | +#irPin = digitalio.DigitalInOut(board.A0) |
| 10 | +#irPin.direction = digitalio.Direction.INPUT |
| 11 | +DEFAULT_PULSEIN = pulseio.PulseIn(board.A0, maxlen=30000, idle_state=True) |
| 12 | +DEFAULT_DECODER = adafruit_irremote.GenericDecode() |
| 13 | + |
| 14 | +def decode_chunk(decoder, pulsein) -> bytes: |
| 15 | + pulses = decoder.read_pulses(pulsein) |
| 16 | + try: |
| 17 | + buf = bytes(decoder.decode_bits(pulses)) |
| 18 | + print(".", end="") |
| 19 | + return buf |
| 20 | + except adafruit_irremote.IRNECRepeatException: |
| 21 | + return b'' |
| 22 | + except adafruit_irremote.IRDecodeException: |
| 23 | + return b'' |
| 24 | + except adafruit_irremote.FailedToDecode: |
| 25 | + return b'' |
| 26 | + |
| 27 | +def decode_line(decoder, pulsein) -> bytes: |
| 28 | + buf = b'' |
| 29 | + for i in range(10): |
| 30 | + buf += decode_chunk(decoder, pulsein) |
| 31 | + if buf.endswith(b'\r\n'): |
| 32 | + break |
| 33 | + return buf |
| 34 | + |
| 35 | +class IRMsg: |
| 36 | + def __init__(self, headers, body): |
| 37 | + self.headers = headers |
| 38 | + self.body = body |
| 39 | + |
| 40 | + @property |
| 41 | + def division(self): |
| 42 | + return self.headers[b'Division'].decode() |
| 43 | + |
| 44 | + @property |
| 45 | + def text(self): |
| 46 | + return self.body.decode() |
| 47 | + |
| 48 | +def receive(decoder = DEFAULT_DECODER, pulsein = DEFAULT_PULSEIN): |
| 49 | + # Read until BELL |
| 50 | + print("*", end="") |
| 51 | + while decode_chunk(decoder, pulsein) != b'\x07': |
| 52 | + pass |
| 53 | + buf = b'\x07' |
| 54 | + # Read until no BELL |
| 55 | + print("*", end="") |
| 56 | + while buf == b'\x07': |
| 57 | + buf = decode_chunk(decoder, pulsein) |
| 58 | + # Figure out datagram length |
| 59 | + print("*", end="") |
| 60 | + total_length = int.from_bytes(buf, 'big') |
| 61 | + read_length = 0 |
| 62 | + if total_length < 8: |
| 63 | + return b'', b'', b'' |
| 64 | + # Start receiving the datagram |
| 65 | + print("*", end="") |
| 66 | + protocol = decode_line(decoder, pulsein) |
| 67 | + read_length += len(protocol) |
| 68 | + if protocol != b"CSMSG/1.0\r\n": |
| 69 | + print("Potentially unsupported protocol version. Try getting the latest servercom library?") |
| 70 | + headers = {} |
| 71 | + buf = b': ' |
| 72 | + while buf.find(b': ') >= 0 or buf == b'\r\n': |
| 73 | + print("*", end="") |
| 74 | + buf = decode_line(decoder, pulsein) |
| 75 | + read_length += len(buf) |
| 76 | + if buf == b'\r\n': |
| 77 | + break |
| 78 | + split_buf = buf.strip(b'\r\n').split(b': ') |
| 79 | + headers.update({split_buf[0]: split_buf[1]}) |
| 80 | + body = b'' |
| 81 | + content_length = int(headers[b'Content-Length']) |
| 82 | + while len(body) < content_length: |
| 83 | + print("*", end="") |
| 84 | + body += decode_chunk(decoder, pulsein) |
| 85 | + return IRMsg(headers, body) |
| 86 | + |
| 87 | +#dude = receive() |
| 88 | +#print() |
| 89 | +#print(dude.division) |
| 90 | +#print(dude.text) |
0 commit comments