diff --git a/canopen/network.py b/canopen/network.py index 02bec899..6f2777c8 100644 --- a/canopen/network.py +++ b/canopen/network.py @@ -75,10 +75,10 @@ def unsubscribe(self, can_id, callback=None) -> None: If given, remove only this callback. Otherwise all callbacks for the CAN ID. """ - if callback is None: - del self.subscribers[can_id] - else: + if callback is not None: self.subscribers[can_id].remove(callback) + if not self.subscribers[can_id] or callback is None: + del self.subscribers[can_id] def connect(self, *args, **kwargs) -> Network: """Connect to CAN bus using python-can. diff --git a/canopen/node/local.py b/canopen/node/local.py index 8f2493d9..34bba899 100644 --- a/canopen/node/local.py +++ b/canopen/node/local.py @@ -39,6 +39,8 @@ def __init__( self.emcy = EmcyProducer(0x80 + self.id) def associate_network(self, network: canopen.network.Network): + if self.has_network(): + raise RuntimeError("Node is already associated with a network") self.network = network self.sdo.network = network self.tpdo.network = network @@ -49,6 +51,8 @@ def associate_network(self, network: canopen.network.Network): network.subscribe(0, self.nmt.on_command) def remove_network(self) -> None: + if not self.has_network(): + return self.network.unsubscribe(self.sdo.rx_cobid, self.sdo.on_request) self.network.unsubscribe(0, self.nmt.on_command) self.network = canopen.network._UNINITIALIZED_NETWORK diff --git a/canopen/node/remote.py b/canopen/node/remote.py index 226c0c0f..371f784c 100644 --- a/canopen/node/remote.py +++ b/canopen/node/remote.py @@ -51,6 +51,8 @@ def __init__( self.load_configuration() def associate_network(self, network: canopen.network.Network): + if self.has_network(): + raise RuntimeError("Node is already associated with a network") self.network = network self.sdo.network = network self.pdo.network = network @@ -64,6 +66,8 @@ def associate_network(self, network: canopen.network.Network): network.subscribe(0, self.nmt.on_command) def remove_network(self) -> None: + if not self.has_network(): + return for sdo in self.sdo_channels: self.network.unsubscribe(sdo.tx_cobid, sdo.on_response) self.network.unsubscribe(0x700 + self.id, self.nmt.on_heartbeat) diff --git a/test/test_node.py b/test/test_node.py new file mode 100644 index 00000000..43373a2a --- /dev/null +++ b/test/test_node.py @@ -0,0 +1,104 @@ +import unittest + +import canopen + + +def count_subscribers(network: canopen.Network) -> int: + """Count the number of subscribers in the network.""" + return sum(len(n) for n in network.subscribers.values()) + + +class TestLocalNode(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.network = canopen.Network() + cls.network.NOTIFIER_SHUTDOWN_TIMEOUT = 0.0 + cls.network.connect(interface="virtual") + + cls.node = canopen.LocalNode(2, canopen.objectdictionary.ObjectDictionary()) + + @classmethod + def tearDownClass(cls): + cls.network.disconnect() + + def test_associate_network(self): + # Need to store the number of subscribers before associating because the + # network implementation automatically adds subscribers to the list + n_subscribers = count_subscribers(self.network) + + # Associating the network with the local node + self.node.associate_network(self.network) + self.assertIs(self.node.network, self.network) + self.assertIs(self.node.sdo.network, self.network) + self.assertIs(self.node.tpdo.network, self.network) + self.assertIs(self.node.rpdo.network, self.network) + self.assertIs(self.node.nmt.network, self.network) + self.assertIs(self.node.emcy.network, self.network) + + # Test that its not possible to associate the network multiple times + with self.assertRaises(RuntimeError) as cm: + self.node.associate_network(self.network) + self.assertIn("already associated with a network", str(cm.exception)) + + # Test removal of the network. The count of subscribers should + # be the same as before the association + self.node.remove_network() + uninitalized = canopen.network._UNINITIALIZED_NETWORK + self.assertIs(self.node.network, uninitalized) + self.assertIs(self.node.sdo.network, uninitalized) + self.assertIs(self.node.tpdo.network, uninitalized) + self.assertIs(self.node.rpdo.network, uninitalized) + self.assertIs(self.node.nmt.network, uninitalized) + self.assertIs(self.node.emcy.network, uninitalized) + self.assertEqual(count_subscribers(self.network), n_subscribers) + + # Test that its possible to deassociate the network multiple times + self.node.remove_network() + + +class TestRemoteNode(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.network = canopen.Network() + cls.network.NOTIFIER_SHUTDOWN_TIMEOUT = 0.0 + cls.network.connect(interface="virtual") + + cls.node = canopen.RemoteNode(2, canopen.objectdictionary.ObjectDictionary()) + + @classmethod + def tearDownClass(cls): + cls.network.disconnect() + + def test_associate_network(self): + # Need to store the number of subscribers before associating because the + # network implementation automatically adds subscribers to the list + n_subscribers = count_subscribers(self.network) + + # Associating the network with the local node + self.node.associate_network(self.network) + self.assertIs(self.node.network, self.network) + self.assertIs(self.node.sdo.network, self.network) + self.assertIs(self.node.tpdo.network, self.network) + self.assertIs(self.node.rpdo.network, self.network) + self.assertIs(self.node.nmt.network, self.network) + + # Test that its not possible to associate the network multiple times + with self.assertRaises(RuntimeError) as cm: + self.node.associate_network(self.network) + self.assertIn("already associated with a network", str(cm.exception)) + + # Test removal of the network. The count of subscribers should + # be the same as before the association + self.node.remove_network() + uninitalized = canopen.network._UNINITIALIZED_NETWORK + self.assertIs(self.node.network, uninitalized) + self.assertIs(self.node.sdo.network, uninitalized) + self.assertIs(self.node.tpdo.network, uninitalized) + self.assertIs(self.node.rpdo.network, uninitalized) + self.assertIs(self.node.nmt.network, uninitalized) + self.assertEqual(count_subscribers(self.network), n_subscribers) + + # Test that its possible to deassociate the network multiple times + self.node.remove_network()