From 17df7b205f12d5367722ee1d77067f7349213df1 Mon Sep 17 00:00:00 2001 From: Svein Seldal Date: Sat, 26 Apr 2025 18:00:04 +0200 Subject: [PATCH 1/4] Fix issue providing a custom timestamp --- canopen/timestamp.py | 2 +- test/test_time.py | 39 +++++++++++++++++++++++++++++++++------ 2 files changed, 34 insertions(+), 7 deletions(-) diff --git a/canopen/timestamp.py b/canopen/timestamp.py index 21dcc636..3ef306a8 100644 --- a/canopen/timestamp.py +++ b/canopen/timestamp.py @@ -26,7 +26,7 @@ def transmit(self, timestamp: Optional[float] = None): :param float timestamp: Optional Unix timestamp to use, otherwise the current time is used. """ - delta = timestamp or time.time() - OFFSET + delta = (timestamp or time.time()) - OFFSET days, seconds = divmod(delta, ONE_DAY) data = TIME_OF_DAY_STRUCT.pack(int(seconds * 1000), int(days)) self.network.send_message(self.cob_id, data) diff --git a/test/test_time.py b/test/test_time.py index acd2b490..8a4e7095 100644 --- a/test/test_time.py +++ b/test/test_time.py @@ -1,6 +1,10 @@ import unittest +from unittest.mock import patch +import time +from datetime import datetime import canopen +import canopen.timestamp class TestTime(unittest.TestCase): @@ -10,13 +14,36 @@ def test_time_producer(self): network.NOTIFIER_SHUTDOWN_TIMEOUT = 0.0 network.connect(interface="virtual", receive_own_messages=True) producer = canopen.timestamp.TimeProducer(network) - producer.transmit(1486236238) - msg = network.bus.recv(1) - network.disconnect() - self.assertEqual(msg.arbitration_id, 0x100) - self.assertEqual(msg.dlc, 6) - self.assertEqual(msg.data, b"\xb0\xa4\x29\x04\x31\x43") + # Test that the epoch is correct + epoch = datetime.strptime("1984-01-01 00:00:00 +0000", "%Y-%m-%d %H:%M:%S %z").timestamp() + self.assertEqual(int(epoch), canopen.timestamp.OFFSET) + + current = time.time() + with patch("canopen.timestamp.time.time", return_value=current): + current_in_epoch = current - epoch + + # Test it looking up the current time + producer.transmit() + msg = network.bus.recv(1) + self.assertEqual(msg.arbitration_id, 0x100) + self.assertEqual(msg.dlc, 6) + ms, days = canopen.timestamp.TIME_OF_DAY_STRUCT.unpack(msg.data) + self.assertEqual(days, int(current_in_epoch) // canopen.timestamp.ONE_DAY) + self.assertEqual(ms, int((current_in_epoch % canopen.timestamp.ONE_DAY) * 1000)) + + # Test providing a timestamp + faketime = 1_927_999_438 # 2031-02-04 20:23:58 + producer.transmit(faketime) + msg = network.bus.recv(1) + self.assertEqual(msg.arbitration_id, 0x100) + self.assertEqual(msg.dlc, 6) + ms, days = canopen.timestamp.TIME_OF_DAY_STRUCT.unpack(msg.data) + current_in_epoch = faketime - epoch + self.assertEqual(days, int(current_in_epoch) // canopen.timestamp.ONE_DAY) + self.assertEqual(ms, int((current_in_epoch % canopen.timestamp.ONE_DAY) * 1000)) + + network.disconnect() if __name__ == "__main__": unittest.main() From 4531c9b9cf492743f80844a3f55a5ab447279ba5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Colomb?= Date: Tue, 10 Jun 2025 00:04:12 +0200 Subject: [PATCH 2/4] Fix import ordering and whitespace issues. Line-length set to maximum 96. --- test/test_time.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/test/test_time.py b/test/test_time.py index 8a4e7095..9f713749 100644 --- a/test/test_time.py +++ b/test/test_time.py @@ -1,7 +1,7 @@ -import unittest -from unittest.mock import patch import time +import unittest from datetime import datetime +from unittest.mock import patch import canopen import canopen.timestamp @@ -16,7 +16,9 @@ def test_time_producer(self): producer = canopen.timestamp.TimeProducer(network) # Test that the epoch is correct - epoch = datetime.strptime("1984-01-01 00:00:00 +0000", "%Y-%m-%d %H:%M:%S %z").timestamp() + epoch = datetime.strptime( + "1984-01-01 00:00:00 +0000", "%Y-%m-%d %H:%M:%S %z" + ).timestamp() self.assertEqual(int(epoch), canopen.timestamp.OFFSET) current = time.time() @@ -45,5 +47,6 @@ def test_time_producer(self): network.disconnect() + if __name__ == "__main__": unittest.main() From f9df77e6724d2ad704328c6fabd445a959c61293 Mon Sep 17 00:00:00 2001 From: Svein Seldal Date: Tue, 10 Jun 2025 18:00:42 +0200 Subject: [PATCH 3/4] Validate the encoding directly --- test/test_time.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/test/test_time.py b/test/test_time.py index 9f713749..3575b51d 100644 --- a/test/test_time.py +++ b/test/test_time.py @@ -34,16 +34,13 @@ def test_time_producer(self): self.assertEqual(days, int(current_in_epoch) // canopen.timestamp.ONE_DAY) self.assertEqual(ms, int((current_in_epoch % canopen.timestamp.ONE_DAY) * 1000)) - # Test providing a timestamp - faketime = 1_927_999_438 # 2031-02-04 20:23:58 + # Provide a specific time to verify the proper encoding + faketime = 1_927_999_438 # 2031-02-04 19:23:58 producer.transmit(faketime) msg = network.bus.recv(1) self.assertEqual(msg.arbitration_id, 0x100) self.assertEqual(msg.dlc, 6) - ms, days = canopen.timestamp.TIME_OF_DAY_STRUCT.unpack(msg.data) - current_in_epoch = faketime - epoch - self.assertEqual(days, int(current_in_epoch) // canopen.timestamp.ONE_DAY) - self.assertEqual(ms, int((current_in_epoch % canopen.timestamp.ONE_DAY) * 1000)) + self.assertEqual(msg.data, b"\xb0\xa4\x29\x04\x31\x43") network.disconnect() From cc0b3f350d844ed423339b262287f1e1a7f277d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Colomb?= Date: Wed, 11 Jun 2025 00:07:09 +0200 Subject: [PATCH 4/4] Suggested restructuring from review. * Independent test_epoch() method. * Reduce diff for constant test vector, remove unpacking. * Avoid use of internal constants (except the OFFSET one tested above) in dynamic unpacking test vector. --- test/test_time.py | 38 +++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/test/test_time.py b/test/test_time.py index 3575b51d..fa45a444 100644 --- a/test/test_time.py +++ b/test/test_time.py @@ -1,3 +1,4 @@ +import struct import time import unittest from datetime import datetime @@ -9,38 +10,37 @@ class TestTime(unittest.TestCase): + def test_epoch(self): + """Verify that the epoch matches the standard definition.""" + epoch = datetime.strptime( + "1984-01-01 00:00:00 +0000", "%Y-%m-%d %H:%M:%S %z" + ).timestamp() + self.assertEqual(int(epoch), canopen.timestamp.OFFSET) + def test_time_producer(self): network = canopen.Network() network.NOTIFIER_SHUTDOWN_TIMEOUT = 0.0 network.connect(interface="virtual", receive_own_messages=True) producer = canopen.timestamp.TimeProducer(network) - # Test that the epoch is correct - epoch = datetime.strptime( - "1984-01-01 00:00:00 +0000", "%Y-%m-%d %H:%M:%S %z" - ).timestamp() - self.assertEqual(int(epoch), canopen.timestamp.OFFSET) + # Provide a specific time to verify the proper encoding + producer.transmit(1_927_999_438) # 2031-02-04T19:23:58+00:00 + msg = network.bus.recv(1) + self.assertEqual(msg.arbitration_id, 0x100) + self.assertEqual(msg.dlc, 6) + self.assertEqual(msg.data, b"\xb0\xa4\x29\x04\x31\x43") + # Test again with the current time as implicit timestamp current = time.time() with patch("canopen.timestamp.time.time", return_value=current): - current_in_epoch = current - epoch - - # Test it looking up the current time + current_from_epoch = current - canopen.timestamp.OFFSET producer.transmit() msg = network.bus.recv(1) self.assertEqual(msg.arbitration_id, 0x100) self.assertEqual(msg.dlc, 6) - ms, days = canopen.timestamp.TIME_OF_DAY_STRUCT.unpack(msg.data) - self.assertEqual(days, int(current_in_epoch) // canopen.timestamp.ONE_DAY) - self.assertEqual(ms, int((current_in_epoch % canopen.timestamp.ONE_DAY) * 1000)) - - # Provide a specific time to verify the proper encoding - faketime = 1_927_999_438 # 2031-02-04 19:23:58 - producer.transmit(faketime) - msg = network.bus.recv(1) - self.assertEqual(msg.arbitration_id, 0x100) - self.assertEqual(msg.dlc, 6) - self.assertEqual(msg.data, b"\xb0\xa4\x29\x04\x31\x43") + ms, days = struct.unpack("