Skip to content

Commit 7251a53

Browse files
authored
Enhance duplicate message handling (#221)
* Add hop check to duplicate message handling * Store all message to prev msg list
1 parent c372d51 commit 7251a53

File tree

1 file changed

+114
-48
lines changed

1 file changed

+114
-48
lines changed

insteonplm/devices/__init__.py

Lines changed: 114 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -578,9 +578,8 @@ def _add_device_from_prod_data(self, address, cat, subcat, product_key):
578578
if device:
579579
if not self._plm.devices[device.id]:
580580
self._plm.devices[device.id] = device
581-
_LOGGER.debug("Device with id %s added to device list.", device.id)
582-
583-
_LOGGER.info("Total Insteon devices found: %d", len(self._plm.devices))
581+
_LOGGER.info("Device with id %s added to device list.", device.id)
582+
_LOGGER.debug("Total Insteon devices found: %d", len(self._plm.devices))
584583
self._plm.aldb_device_handled(self._address)
585584
self._plm.devices.save_device_info()
586585
else:
@@ -739,38 +738,39 @@ def _register_messages(self):
739738
# Send / Receive message processing
740739
def receive_message(self, msg):
741740
"""Receive a messages sent to this device."""
742-
_LOGGER.debug('Starting Device.receive_message for %s',
743-
msg.address.human)
744-
if hasattr(msg, 'isack') and msg.isack:
745-
_LOGGER.debug('Got Message ACK %s', id(msg))
746-
if self._sent_msg_wait_for_directACK.get('callback') is not None:
747-
_LOGGER.debug('Look for direct ACK')
748-
asyncio.ensure_future(self._wait_for_direct_ACK(),
749-
loop=self._plm.loop)
741+
_LOGGER.debug("Starting Device.receive_message for %s", msg.address.human)
742+
if hasattr(msg, "isack") and msg.isack:
743+
_LOGGER.debug("Got Message ACK %s", id(msg))
744+
if self._sent_msg_wait_for_directACK.get("callback") is not None:
745+
_LOGGER.debug("Look for direct ACK")
746+
asyncio.ensure_future(self._wait_for_direct_ACK(), loop=self._plm.loop)
750747
else:
751-
_LOGGER.debug('DA queue: %s',
752-
self._sent_msg_wait_for_directACK)
753-
_LOGGER.debug('Message ACK with no callback')
748+
_LOGGER.debug("MSG queue: %s", self._sent_msg_wait_for_directACK)
749+
_LOGGER.debug("Message ACK with no callback")
754750

755751
if not self._is_duplicate(msg):
756-
if (hasattr(msg, 'flags') and
757-
hasattr(msg.flags, 'isDirectACK') and
758-
msg.flags.isDirectACK):
759-
_LOGGER.debug('Got Direct ACK message. Already in queue: %d, '
760-
'Queueing %s:%s',
761-
self._directACK_received_queue.qsize(),
762-
id(msg), msg)
752+
if (
753+
hasattr(msg, "flags")
754+
and hasattr(msg.flags, "isDirectACK")
755+
and msg.flags.isDirectACK
756+
):
757+
_LOGGER.debug(
758+
"Got Direct ACK message. Already in queue: %d, Queueing %s:%s",
759+
self._directACK_received_queue.qsize(),
760+
id(msg),
761+
msg,
762+
)
763763
if self._send_msg_lock.locked():
764764
self._directACK_received_queue.put_nowait(msg)
765765
else:
766-
_LOGGER.debug('But Direct ACK not expected')
766+
_LOGGER.debug("But Direct ACK not expected")
767767

768768
callbacks = self._message_callbacks.get_callbacks_from_message(msg)
769769
for callback in callbacks:
770770
_LOGGER.debug("Scheduling msg callback: %s", callback)
771771
self._plm.loop.call_soon(callback, msg)
772772
else:
773-
_LOGGER.debug('msg is duplicate: %s', id(msg))
773+
_LOGGER.debug("msg is duplicate: %s", id(msg))
774774
self._last_communication_received = datetime.datetime.now()
775775
_LOGGER.debug("Ending Device.receive_message")
776776

@@ -779,35 +779,98 @@ def _is_duplicate(self, msg):
779779
MESSAGE_STANDARD_MESSAGE_RECEIVED_0X50,
780780
MESSAGE_EXTENDED_MESSAGE_RECEIVED_0X51,
781781
]:
782+
self._save_recent_message(msg)
782783
return False
783784

784785
recent_messages = []
785786
while not self._recent_messages.empty():
786787
recent_message = self._recent_messages.get_nowait()
787788
if recent_message:
788-
msg_received = recent_message.get("received")
789+
msg_received = recent_message["received"]
789790
if msg_received >= (
790791
datetime.datetime.now() - datetime.timedelta(0, 0, 500000)
791792
):
792793
recent_messages.append(recent_message)
793794

794-
ret_val = False
795+
# Save remaining messages for next call
795796
for recent_message in recent_messages:
796-
prev_msg = recent_message.get("msg")
797797
self._recent_messages.put_nowait(recent_message)
798-
if msg.matches_pattern(prev_msg):
799-
ret_val = True
798+
self._save_recent_message(msg)
799+
# Create a template to compare current message to (ignore hops)
800+
if msg.flags.isExtended:
801+
template = ExtendedReceive.template(
802+
address=msg.address,
803+
target=msg.target,
804+
commandtuple={"cmd1": msg.cmd1, "cmd2": msg.cmd2},
805+
flags=MessageFlags.template(
806+
msg.flags.messageType, msg.flags.extended
807+
),
808+
userdata=msg.userdata
809+
)
810+
else:
811+
template = StandardReceive.template(
812+
address=msg.address,
813+
target=msg.target,
814+
commandtuple={"cmd1": msg.cmd1, "cmd2": msg.cmd2},
815+
flags=MessageFlags.template(
816+
msg.flags.messageType, msg.flags.extended
817+
),
818+
)
800819

801-
self._recent_messages.put_nowait(
802-
{"msg": msg, "received": datetime.datetime.now()})
803-
return ret_val
820+
for recent_message in recent_messages:
821+
prev_msg = recent_message["msg"]
822+
if prev_msg.matches_pattern(template):
823+
_LOGGER.debug("Duplicate matches pattern")
824+
_LOGGER.debug("TEMP: %s", template)
825+
_LOGGER.debug("Prev: %s", prev_msg)
826+
return True
827+
828+
# Check if this is a cleanup message from a broadcast
829+
if prev_msg.flags.isAllLinkBroadcast:
830+
prev_group = prev_msg.target.bytes[2]
831+
elif prev_msg.flags.isAllLinkCleanup:
832+
prev_group = prev_msg.cmd2
833+
834+
if msg.flags.isAllLinkCleanup or msg.flags.isAllLinkBroadcast:
835+
cmd1 = msg.cmd1
836+
if msg.flags.isAllLinkBroadcast:
837+
group = msg.target.bytes[2]
838+
else:
839+
group = msg.cmd2
840+
if prev_msg.cmd1 == cmd1 and prev_group == group:
841+
return True
842+
843+
# Address an edge case where two directACKs arrive back to back
844+
# Keep the first one (see insteonplm issue # 215)
845+
recent = self._get_most_recent_message(recent_messages)
846+
for recent_msg in recent_messages:
847+
_LOGGER.debug("RCT: %s", recent_msg['msg'])
848+
if recent and msg.flags.isDirectACK and recent.flags.isDirectACK:
849+
_LOGGER.debug("Duplicate direct ACK")
850+
_LOGGER.debug("TEMP: %s", msg)
851+
_LOGGER.debug("Prev: %s", prev_msg)
852+
return True
853+
854+
return False
855+
856+
def _save_recent_message(self, msg):
857+
recent_message = {"msg": msg, "received": datetime.datetime.now()}
858+
self._recent_messages.put_nowait(recent_message)
859+
860+
def _get_most_recent_message(self, recent_messages):
861+
if not recent_messages:
862+
return None
863+
most_recent = recent_messages[0]
864+
for recent in recent_messages[1:]:
865+
if recent["received"] > most_recent["received"]:
866+
most_recent = recent
867+
return most_recent["msg"]
804868

805869
def _send_msg(self, msg, callback=None, on_timeout=False):
806-
_LOGGER.debug('Starting %s Device._send_msg: Queuing message',
807-
self.address.human)
808-
msg_info = {'msg': msg,
809-
'callback': callback,
810-
'on_timeout': on_timeout}
870+
_LOGGER.debug(
871+
"Starting %s Device._send_msg: Queuing message", self.address.human
872+
)
873+
msg_info = {"msg": msg, "callback": callback, "on_timeout": on_timeout}
811874
self._send_msg_queue.put_nowait(msg_info)
812875
asyncio.ensure_future(self._process_send_queue(), loop=self._plm.loop)
813876
# _LOGGER.debug('Ending Device._send_msg')
@@ -837,9 +900,12 @@ async def _wait_for_direct_ACK(self):
837900
try:
838901
with async_timeout.timeout(DIRECT_ACK_WAIT_TIMEOUT):
839902
msg = await self._directACK_received_queue.get()
840-
_LOGGER.debug('Remaining queue %d, Direct ACK: %s:%s',
841-
self._directACK_received_queue.qsize(),
842-
id(msg), msg)
903+
_LOGGER.debug(
904+
"Remaining queue %d, Direct ACK: %s:%s",
905+
self._directACK_received_queue.qsize(),
906+
id(msg),
907+
msg,
908+
)
843909
break
844910
except asyncio.TimeoutError:
845911
_LOGGER.debug("No direct ACK messages received.")
@@ -856,8 +922,7 @@ async def _wait_for_direct_ACK(self):
856922
if msg or self._sent_msg_wait_for_directACK.get("on_timeout"):
857923
callback = self._sent_msg_wait_for_directACK.get("callback", None)
858924
if callback is not None:
859-
_LOGGER.debug('Scheduling msg directACK callback: %s',
860-
callback)
925+
_LOGGER.debug("Scheduling msg directACK callback: %s", callback)
861926
callback(msg)
862927
self._sent_msg_wait_for_directACK = {}
863928
_LOGGER.debug("Ending Device._wait_for_direct_ACK")
@@ -926,20 +991,21 @@ def receive_message(self, msg):
926991
_LOGGER.debug("Scheduling msg callback: %s", callback)
927992
self._plm.loop.call_soon(callback, msg)
928993
self._last_communication_received = datetime.datetime.now()
929-
_LOGGER.debug('Ending x10Device.receive_message')
994+
_LOGGER.debug("Ending x10Device.receive_message")
930995

931996
async def close(self):
932997
"""Close the writer for a clean shutdown."""
933998
# Nothing actually needed here.
934999

9351000
def _send_msg(self, msg, wait_ack=True):
936-
_LOGGER.debug('Starting X10Device._send_msg')
937-
asyncio.ensure_future(self._process_send_queue(msg, wait_ack),
938-
loop=self._plm.loop)
939-
_LOGGER.debug('Ending x10Device._send_msg')
1001+
_LOGGER.debug("Starting X10Device._send_msg")
1002+
asyncio.ensure_future(
1003+
self._process_send_queue(msg, wait_ack), loop=self._plm.loop
1004+
)
1005+
_LOGGER.debug("Ending x10Device._send_msg")
9401006

9411007
async def _process_send_queue(self, msg, wait_ack):
942-
_LOGGER.debug('Starting x10Device._process_send_queue')
1008+
_LOGGER.debug("Starting x10Device._process_send_queue")
9431009
await self._send_msg_lock
9441010
if self._send_msg_lock.locked():
9451011
_LOGGER.debug("Lock is locked from yield from")
@@ -949,7 +1015,7 @@ async def _process_send_queue(self, msg, wait_ack):
9491015
_LOGGER.debug("No directACK wait")
9501016
_LOGGER.debug("Releasing lock")
9511017
self._send_msg_lock.release()
952-
_LOGGER.debug('Ending x10Device._process_send_queue')
1018+
_LOGGER.debug("Ending x10Device._process_send_queue")
9531019

9541020

9551021
class StateList:

0 commit comments

Comments
 (0)