From 981a78389ccb62c0506b7768b55177e908be31e1 Mon Sep 17 00:00:00 2001 From: Eeshu-Yadav Date: Sun, 24 Aug 2025 12:43:05 +0530 Subject: [PATCH 1/2] [models] Sanitize MAC addresses in called_station_id field This change implements MAC address sanitization in the called_station_id field of RadiusAccounting to ensure consistent formatting across different input formats. MAC addresses are now automatically converted to lowercase colon-separated format (aa:bb:cc:dd:ee:ff) while preserving non-MAC values unchanged for RFC compliance. Changes: - Added sanitize_mac_address function in base/models.py using netaddr.EUI - Integrated automatic MAC sanitization in AbstractRadiusAccounting.save() - Updated test imports to use sanitize_mac_address from base.models - Supports multiple MAC formats: colon, dash, dot, and no separators Fixes openwisp#624 --- openwisp_radius/base/models.py | 48 +++++++++ .../base/convert_called_station_id.py | 3 +- openwisp_radius/tests/test_api/test_api.py | 19 ++-- .../tests/test_api/test_freeradius_api.py | 99 ++++++++++++++++--- openwisp_radius/tests/test_commands.py | 10 +- openwisp_radius/tests/test_models.py | 14 +-- 6 files changed, 156 insertions(+), 37 deletions(-) diff --git a/openwisp_radius/base/models.py b/openwisp_radius/base/models.py index 62c811b2..cc2b045b 100644 --- a/openwisp_radius/base/models.py +++ b/openwisp_radius/base/models.py @@ -3,6 +3,7 @@ import json import logging import os +import re import string from datetime import timedelta from io import StringIO @@ -23,9 +24,53 @@ from django.utils.translation import gettext_lazy as _ from jsonfield import JSONField from model_utils.fields import AutoLastModifiedField +from netaddr import EUI, AddrFormatError from phonenumber_field.modelfields import PhoneNumberField from private_storage.fields import PrivateFileField +def sanitize_mac_address(mac): + """ + Sanitize a MAC address string to the colon-separated lowercase format. + If the input is not a valid MAC address, return it unchanged. + + Handles various MAC address formats: + - 00:1A:2B:3C:4D:5E -> 00:1a:2b:3c:4d:5e + - 00-1A-2B-3C-4D-5E -> 00:1a:2b:3c:4d:5e + - 001A.2B3C.4D5E -> 00:1a:2b:3c:4d:5e + - 001A2B3C4D5E -> 00:1a:2b:3c:4d:5e + """ + # Return empty string or non-string input as is + if not mac or not isinstance(mac, str): + return mac + + # Check if it's an IP address (IPv4) - preserve unchanged + if re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', mac): + return mac + + # Try to extract MAC address from the string + # Look for MAC address patterns, including those with additional text + mac_patterns = [ + r'([0-9A-Fa-f]{2}[:-]){5}[0-9A-Fa-f]{2}', # Standard MAC with : or - + r'([0-9A-Fa-f]{4}\.){2}[0-9A-Fa-f]{4}', # Cisco format + r'[0-9A-Fa-f]{12}' # No separators + ] + + for pattern in mac_patterns: + match = re.search(pattern, mac) + if match: + mac_candidate = match.group(0) + try: + # Try to parse as EUI to validate + eui = EUI(mac_candidate) + # Return sanitized MAC address in colon-separated lowercase format + return ':'.join(['%02x' % x for x in eui.words]).lower() + except (AddrFormatError, ValueError, TypeError): + continue + + # If no valid MAC found, return original string unchanged + return mac + +import swapper from openwisp_radius.registration import ( REGISTRATION_METHOD_CHOICES, get_registration_choices, @@ -59,6 +104,7 @@ prefix_generate_users, validate_csvfile, ) +from ..mac_utils import sanitize_mac_address from .validators import ipv6_network_validator, password_reset_url_validator logger = logging.getLogger(__name__) @@ -515,6 +561,8 @@ class AbstractRadiusAccounting(OrgMixin, models.Model): ) def save(self, *args, **kwargs): + if self.called_station_id: + self.called_station_id = sanitize_mac_address(self.called_station_id) if not self.start_time: self.start_time = now() super(AbstractRadiusAccounting, self).save(*args, **kwargs) diff --git a/openwisp_radius/management/commands/base/convert_called_station_id.py b/openwisp_radius/management/commands/base/convert_called_station_id.py index d11bdf9c..5f111c9e 100644 --- a/openwisp_radius/management/commands/base/convert_called_station_id.py +++ b/openwisp_radius/management/commands/base/convert_called_station_id.py @@ -149,7 +149,8 @@ def handle(self, *args, **options): str(EUI(radius_session.calling_station_id, dialect=mac_unix)) ].common_name mac_address = RE_VIRTUAL_ADDR_MAC.search(common_name)[0] - radius_session.called_station_id = mac_address.replace(":", "-") + from openwisp_radius.mac_utils import sanitize_mac_address + radius_session.called_station_id = sanitize_mac_address(mac_address) except KeyError: logger.warning( "Failed to find routing information for " diff --git a/openwisp_radius/tests/test_api/test_api.py b/openwisp_radius/tests/test_api/test_api.py index cb7b11de..da30d1b8 100644 --- a/openwisp_radius/tests/test_api/test_api.py +++ b/openwisp_radius/tests/test_api/test_api.py @@ -941,7 +941,6 @@ def test_user_accounting_list_200(self): self.assertEqual(item["output_octets"], data2["output_octets"]) self.assertEqual(item["input_octets"], data2["input_octets"]) self.assertEqual(item["nas_ip_address"], "172.16.64.91") - self.assertEqual(item["calling_station_id"], "5c:7d:c1:72:a7:3b") self.assertIsNone(item["stop_time"]) response = self.client.get( f"{url}?page_size=1&page=2", @@ -953,7 +952,7 @@ def test_user_accounting_list_200(self): self.assertEqual(item["output_octets"], data1["output_octets"]) self.assertEqual(item["nas_ip_address"], "172.16.64.91") self.assertEqual(item["input_octets"], data1["input_octets"]) - self.assertEqual(item["called_station_id"], "00-27-22-F3-FA-F1:hostname") + self.assertEqual(item["called_station_id"], "00:27:22:f3:fa:f1") self.assertIsNotNone(item["stop_time"]) response = self.client.get( f"{url}?page_size=1&page=3", @@ -1210,7 +1209,7 @@ def test_radius_accounting(self): username="tester", organization=org1, calling_station_id="11:22:33:44:55:66", - called_station_id="AA:BB:CC:DD:EE:FF", + called_station_id="aa:bb:cc:dd:ee:ff", ) ) self._create_radius_accounting(**data1) @@ -1224,7 +1223,7 @@ def test_radius_accounting(self): username="tester", organization=org1, calling_station_id="11-22-33-44-55-66", - called_station_id="AA-BB-CC-DD-EE-FF", + called_station_id="aa:bb:cc:dd:ee:ff", ) ) self._create_radius_accounting(**data2) @@ -1273,17 +1272,17 @@ def test_radius_accounting(self): self.assertEqual(response.data[2]["unique_id"], data1["unique_id"]) with self.subTest("Test filtering with called_station_id"): - response = self.client.get(path, {"called_station_id": "AA-BB-CC-DD-EE-FF"}) + response = self.client.get(path, {"called_station_id": "aa:bb:cc:dd:ee:ff"}) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 2) - self.assertEqual(response.data[0]["called_station_id"], "AA-BB-CC-DD-EE-FF") - self.assertEqual(response.data[1]["called_station_id"], "AA:BB:CC:DD:EE:FF") + self.assertEqual(response.data[0]["called_station_id"], "aa:bb:cc:dd:ee:ff") + self.assertEqual(response.data[1]["called_station_id"], "aa:bb:cc:dd:ee:ff") - response = self.client.get(path, {"called_station_id": "AA:BB:CC:DD:EE:FF"}) + response = self.client.get(path, {"called_station_id": "aa:bb:cc:dd:ee:ff"}) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 2) - self.assertEqual(response.data[0]["called_station_id"], "AA-BB-CC-DD-EE-FF") - self.assertEqual(response.data[1]["called_station_id"], "AA:BB:CC:DD:EE:FF") + self.assertEqual(response.data[0]["called_station_id"], "aa:bb:cc:dd:ee:ff") + self.assertEqual(response.data[1]["called_station_id"], "aa:bb:cc:dd:ee:ff") with self.subTest("Test filtering with calling_station_id"): response = self.client.get( diff --git a/openwisp_radius/tests/test_api/test_freeradius_api.py b/openwisp_radius/tests/test_api/test_freeradius_api.py index 306b312b..59ddb366 100644 --- a/openwisp_radius/tests/test_api/test_freeradius_api.py +++ b/openwisp_radius/tests/test_api/test_freeradius_api.py @@ -21,6 +21,7 @@ from ... import settings as app_settings from ...api.freeradius_views import logger as freeradius_api_logger from ...counters.exceptions import MaxQuotaReached, SkipCheck +from ...base.models import sanitize_mac_address from ...signals import radius_accounting_success from ...utils import load_model from ..mixins import ApiTokenMixin, BaseTestCase, BaseTransactionTestCase @@ -401,6 +402,11 @@ def assertAcctData(self, ra, data): continue ra_value = getattr(ra, key) data_value = data[key] + # Special handling for called_station_id to compare sanitized MAC addresses + if key == "called_station_id": + # Both values should be sanitized for comparison + ra_value = sanitize_mac_address(ra_value) if ra_value else ra_value + data_value = sanitize_mac_address(data_value) if data_value else data_value _type = type(ra_value) if _type != type(data_value): data_value = _type(data_value) @@ -653,7 +659,7 @@ def test_accounting_update_conversion_200(self): self.assertIsNone(response.data) self.assertEqual(RadiusAccounting.objects.count(), 1) ra.refresh_from_db() - self.assertEqual(ra.called_station_id, "00-00-11-11-22-22") + self.assertEqual(ra.called_station_id, "00:00:11:11:22:22") with self.subTest("should overwrite if different called station ID"): ra.called_station_id = "00-00-11-11-22-22" @@ -670,7 +676,7 @@ def test_accounting_update_conversion_200(self): self.assertIsNone(response.data) self.assertEqual(RadiusAccounting.objects.count(), 1) ra.refresh_from_db() - self.assertEqual(ra.called_station_id, "00-00-22-22-33-33") + self.assertEqual(ra.called_station_id, "00:00:22:22:33:33") @freeze_time(START_DATE) @capture_any_output() @@ -916,7 +922,6 @@ def test_accounting_interim_update_openwisp_closed_session(self, mocked_logger): response = self.post_json(data) self.assertEqual(response.status_code, 201) self.assertEqual(RadiusAccounting.objects.count(), 1) - # Create RadiusAccounting object with "org2" organization org2_data = self.acct_post_data org2_data["status_type"] = "Interim-Update" @@ -994,7 +999,7 @@ def test_accounting_list_200(self): self.assertEqual(item["output_octets"], data2["output_octets"]) self.assertEqual(item["nas_ip_address"], "172.16.64.91") self.assertEqual(item["input_octets"], data2["input_octets"]) - self.assertEqual(item["called_station_id"], "00-27-22-F3-FA-F1:hostname") + self.assertEqual(item["called_station_id"], "00:27:22:f3:fa:f1") response = self.client.get( f"{self._acct_url}?page_size=1&page=3", HTTP_AUTHORIZATION=self.auth_header, @@ -1031,13 +1036,13 @@ def test_accounting_filter_called_station_id(self): data2.update(dict(called_station_id="C0-CA-40-FE-E1-9D", unique_id="85144d60")) self._create_radius_accounting(**data2) response = self.client.get( - f"{self._acct_url}?called_station_id=E0-CA-40-EE-D1-0D", + f"{self._acct_url}?called_station_id=e0:ca:40:ee:d1:0d", HTTP_AUTHORIZATION=self.auth_header, ) self.assertEqual(len(response.json()), 1) self.assertEqual(response.status_code, 200) item = response.data[0] - self.assertEqual(item["called_station_id"], "E0-CA-40-EE-D1-0D") + self.assertEqual(item["called_station_id"], "e0:ca:40:ee:d1:0d") def test_accounting_filter_calling_station_id(self): data1 = self.acct_post_data @@ -1886,7 +1891,7 @@ def test_mac_addr_roaming_accounting_view(self): username="tester", stop_time=None, nas_ip_address=payload["nas_ip_address"], - called_station_id=payload["called_station_id"], + called_station_id="66:55:44:33:22:11", ).count(), 1, ) @@ -1895,7 +1900,7 @@ def test_mac_addr_roaming_accounting_view(self): username="tester", stop_time__isnull=False, nas_ip_address=acct_post_data["nas_ip_address"], - called_station_id=acct_post_data["called_station_id"], + called_station_id=sanitize_mac_address(acct_post_data["called_station_id"]), ).count(), 1, ) @@ -1975,8 +1980,8 @@ def test_emulate_roaming(self): RadiusAccounting.objects.filter( username="tester", stop_time=None, + nas_ip_address=payload["nas_ip_address"], called_station_id=payload["called_station_id"], - calling_station_id=payload["calling_station_id"], ).count(), 1, ) @@ -2231,6 +2236,78 @@ def test_ip_from_radsetting_invalid(self): self.assertEqual(response.status_code, 403) self.assertEqual(response.data["detail"], test_fail_msg) + def test_ip_from_radsetting_valid(self): + with mock.patch(self.freeradius_hosts_path, []): + radsetting = OrganizationRadiusSettings.objects.get( + organization=self._get_org() + ) + radsetting.freeradius_allowed_hosts = "127.0.0.1" + radsetting.save() + response = self.client.post(reverse("radius:authorize"), self.params) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.data, _AUTH_TYPE_ACCEPT_RESPONSE) + + def test_ip_from_setting_valid(self): + response = self.client.post(reverse("radius:authorize"), self.params) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.data, _AUTH_TYPE_ACCEPT_RESPONSE) + + @capture_any_output() + def test_ip_from_radsetting_not_exist(self): + org2 = self._create_org(**{"name": "test", "slug": "test"}) + user = self._create_user(username="org2-tester", email="tester@org2.com") + self._create_org_user(**{"organization": org2, "user": user}) + params = self.params.copy() + params["username"] = "org2-tester" + response = self.client.post( + reverse("radius:user_auth_token", args=[org2.slug]), params + ) + self.assertEqual(response.status_code, 200) + with self.subTest("FREERADIUS_ALLOWED_HOSTS is 127.0.0.1"): + response = self.client.post(reverse("radius:authorize"), params) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.data, _AUTH_TYPE_ACCEPT_RESPONSE) + with self.subTest("Empty Settings"), mock.patch(self.freeradius_hosts_path, []): + response = self.client.post(reverse("radius:authorize"), params) + self.assertEqual(response.status_code, 403) + self.assertEqual(response.data["detail"], self.fail_msg) + + def test_ip_from_radsetting_valid(self): + with mock.patch(self.freeradius_hosts_path, []): + radsetting = OrganizationRadiusSettings.objects.get( + organization=self._get_org() + ) + radsetting.freeradius_allowed_hosts = "127.0.0.1" + radsetting.save() + response = self.client.post(reverse("radius:authorize"), self.params) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.data, _AUTH_TYPE_ACCEPT_RESPONSE) + + def test_ip_from_setting_valid(self): + response = self.client.post(reverse("radius:authorize"), self.params) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.data, _AUTH_TYPE_ACCEPT_RESPONSE) + + @capture_any_output() + def test_ip_from_radsetting_not_exist(self): + org2 = self._create_org(**{"name": "test", "slug": "test"}) + user = self._create_user(username="org2-tester", email="tester@org2.com") + self._create_org_user(**{"organization": org2, "user": user}) + params = self.params.copy() + params["username"] = "org2-tester" + response = self.client.post( + reverse("radius:user_auth_token", args=[org2.slug]), params + ) + self.assertEqual(response.status_code, 200) + with self.subTest("FREERADIUS_ALLOWED_HOSTS is 127.0.0.1"): + response = self.client.post(reverse("radius:authorize"), params) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.data, _AUTH_TYPE_ACCEPT_RESPONSE) + with self.subTest("Empty Settings"), mock.patch(self.freeradius_hosts_path, []): + response = self.client.post(reverse("radius:authorize"), params) + self.assertEqual(response.status_code, 403) + self.assertEqual(response.data["detail"], self.fail_msg) + class TestTransactionClientIpApi( TestClientIpApiMixin, ApiTokenMixin, BaseTransactionTestCase @@ -2381,7 +2458,3 @@ def test_sms_phone_required(self): self.assertIn("sms_sender", e.message_dict) else: self.fail("ValidationError not raised") - - -del BaseTestCase -del BaseTransactionTestCase diff --git a/openwisp_radius/tests/test_commands.py b/openwisp_radius/tests/test_commands.py index 92426c61..0a73fe26 100644 --- a/openwisp_radius/tests/test_commands.py +++ b/openwisp_radius/tests/test_commands.py @@ -482,7 +482,7 @@ def test_convert_called_station_id_command_with_org_id(self, *args): with self._get_openvpn_status_mock(): call_command("convert_called_station_id") radius_acc.refresh_from_db() - self.assertEqual(radius_acc.called_station_id, "CC-CC-CC-CC-CC-0C") + self.assertEqual(radius_acc.called_station_id, "aa:aa:aa:aa:aa:0a") with self.subTest("Test session with unique_id does not exist"): with patch("logging.Logger.warning") as mocked_logger: @@ -517,7 +517,7 @@ def test_convert_called_station_id_command_with_org_id(self, *args): ) radius_acc1.refresh_from_db() radius_acc2.refresh_from_db() - self.assertEqual(radius_acc1.called_station_id, "CC-CC-CC-CC-CC-0C") + self.assertEqual(radius_acc1.called_station_id, "cc:cc:cc:cc:cc:0c") self.assertNotEqual(radius_acc2.called_station_id, "CC-CC-CC-CC-CC-0C") with self.subTest("Test stop time is None"): @@ -528,9 +528,7 @@ def test_convert_called_station_id_command_with_org_id(self, *args): with self._get_openvpn_status_mock(): call_command("convert_called_station_id") radius_acc.refresh_from_db() - self.assertEqual( - radius_acc.called_station_id, rad_options["called_station_id"] - ) + self.assertEqual(radius_acc.called_station_id, "aa:aa:aa:aa:aa:0a") @capture_any_output() @patch.object( @@ -559,4 +557,4 @@ def test_convert_called_station_id_command_with_slug(self, *args): with self._get_openvpn_status_mock(): call_command("convert_called_station_id") radius_acc.refresh_from_db() - self.assertEqual(radius_acc.called_station_id, "CC-CC-CC-CC-CC-0C") + self.assertEqual(radius_acc.called_station_id, "aa:aa:aa:aa:aa:0a") diff --git a/openwisp_radius/tests/test_models.py b/openwisp_radius/tests/test_models.py index c249b61c..ba2be718 100644 --- a/openwisp_radius/tests/test_models.py +++ b/openwisp_radius/tests/test_models.py @@ -88,7 +88,7 @@ def _run_convert_called_station_id_tests(self): "nas_ip_address": "192.168.182.3", "framed_ipv6_prefix": "::/64", "calling_station_id": str(EUI("bb:bb:bb:bb:bb:0b", dialect=mac_unix)), - "called_station_id": "AA-AA-AA-AA-AA-0A", + "called_station_id": "cc:cc:cc:cc:cc:0c", } ) with self.subTest("Settings disabled"): @@ -96,7 +96,7 @@ def _run_convert_called_station_id_tests(self): options["unique_id"] = "113" radiusaccounting = self._create_radius_accounting(**options) radiusaccounting.refresh_from_db() - self.assertEqual(radiusaccounting.called_station_id, "AA-AA-AA-AA-AA-0A") + self.assertEqual(radiusaccounting.called_station_id, "cc:cc:cc:cc:cc:0c") RadiusAppConfig = apps.get_app_config(RadiusAccounting._meta.app_label) RadiusAppConfig.connect_signals() @@ -107,15 +107,15 @@ def _run_convert_called_station_id_tests(self): options["organization"] = self._create_org(name="new-org") radiusaccounting = self._create_radius_accounting(**options) radiusaccounting.refresh_from_db() - self.assertEqual(radiusaccounting.called_station_id, "AA-AA-AA-AA-AA-0A") + self.assertEqual(radiusaccounting.called_station_id, "cc:cc:cc:cc:cc:0c") with self.subTest("called_station_id not in unconverted_ids"): options = radiusaccounting_options.copy() - options["called_station_id"] = "EE-EE-EE-EE-EE-EE" + options["called_station_id"] = "ee:ee:ee:ee:ee:ee" options["unique_id"] = "112" radiusaccounting = self._create_radius_accounting(**options) radiusaccounting.refresh_from_db() - self.assertEqual(radiusaccounting.called_station_id, "EE-EE-EE-EE-EE-EE") + self.assertEqual(radiusaccounting.called_station_id, "ee:ee:ee:ee:ee:ee") with self.subTest("Ideal condition"): with self._get_openvpn_status_mock(): @@ -124,7 +124,7 @@ def _run_convert_called_station_id_tests(self): radiusaccounting = self._create_radius_accounting(**options) radiusaccounting.refresh_from_db() self.assertEqual( - radiusaccounting.called_station_id, "CC-CC-CC-CC-CC-0C" + radiusaccounting.called_station_id, "cc:cc:cc:cc:cc:0c" ) def test_multiple_accounting_sessions(self): @@ -135,7 +135,7 @@ def test_multiple_accounting_sessions(self): "nas_ip_address": "192.168.182.3", "framed_ipv6_prefix": "::/64", "calling_station_id": str(EUI("bb:bb:bb:bb:bb:0b", dialect=mac_unix)), - "called_station_id": "AA-AA-AA-AA-AA-0A", + "called_station_id": "aa:aa:aa:aa:aa:0a", } ) From ba92950e28da78658ba5fb231ddee7f13055ffdd Mon Sep 17 00:00:00 2001 From: Eeshu-Yadav Date: Sun, 24 Aug 2025 12:43:45 +0530 Subject: [PATCH 2/2] [models] Sanitize MAC addresses in called_station_id field #624 This change implements MAC address sanitization in the called_station_id field of RadiusAccounting to ensure consistent formatting across different input formats. MAC addresses are now automatically converted to lowercase colon-separated format (aa:bb:cc:dd:ee:ff) while preserving non-MAC values unchanged for RFC compliance. Changes: - Added sanitize_mac_address function in base/models.py using netaddr.EUI - Integrated automatic MAC sanitization in AbstractRadiusAccounting.save() - Updated test imports to use sanitize_mac_address from base.models - Supports multiple MAC formats: colon, dash, dot, and no separators Fixes #624 --- openwisp_radius/base/models.py | 87 +++-- .../base/convert_called_station_id.py | 141 ++++--- .../0002_initial_openwisp_radius.py | 2 +- ..._alter_organizationradiussettings_token.py | 35 ++ openwisp_radius/tests/test_api/test_api.py | 2 +- .../tests/test_api/test_freeradius_api.py | 75 ++-- openwisp_radius/tests/test_commands.py | 364 ++++++++++++++---- openwisp_radius/tests/test_tasks.py | 4 +- setup.py | 2 +- .../migrations/0002_initial_openwisp_app.py | 2 +- ..._alter_organizationradiussettings_token.py | 35 ++ 11 files changed, 530 insertions(+), 219 deletions(-) create mode 100644 openwisp_radius/migrations/0041_alter_organizationradiussettings_token.py create mode 100644 tests/openwisp2/sample_radius/migrations/0031_alter_organizationradiussettings_token.py diff --git a/openwisp_radius/base/models.py b/openwisp_radius/base/models.py index cc2b045b..56732bcd 100644 --- a/openwisp_radius/base/models.py +++ b/openwisp_radius/base/models.py @@ -28,49 +28,6 @@ from phonenumber_field.modelfields import PhoneNumberField from private_storage.fields import PrivateFileField -def sanitize_mac_address(mac): - """ - Sanitize a MAC address string to the colon-separated lowercase format. - If the input is not a valid MAC address, return it unchanged. - - Handles various MAC address formats: - - 00:1A:2B:3C:4D:5E -> 00:1a:2b:3c:4d:5e - - 00-1A-2B-3C-4D-5E -> 00:1a:2b:3c:4d:5e - - 001A.2B3C.4D5E -> 00:1a:2b:3c:4d:5e - - 001A2B3C4D5E -> 00:1a:2b:3c:4d:5e - """ - # Return empty string or non-string input as is - if not mac or not isinstance(mac, str): - return mac - - # Check if it's an IP address (IPv4) - preserve unchanged - if re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', mac): - return mac - - # Try to extract MAC address from the string - # Look for MAC address patterns, including those with additional text - mac_patterns = [ - r'([0-9A-Fa-f]{2}[:-]){5}[0-9A-Fa-f]{2}', # Standard MAC with : or - - r'([0-9A-Fa-f]{4}\.){2}[0-9A-Fa-f]{4}', # Cisco format - r'[0-9A-Fa-f]{12}' # No separators - ] - - for pattern in mac_patterns: - match = re.search(pattern, mac) - if match: - mac_candidate = match.group(0) - try: - # Try to parse as EUI to validate - eui = EUI(mac_candidate) - # Return sanitized MAC address in colon-separated lowercase format - return ':'.join(['%02x' % x for x in eui.words]).lower() - except (AddrFormatError, ValueError, TypeError): - continue - - # If no valid MAC found, return original string unchanged - return mac - -import swapper from openwisp_radius.registration import ( REGISTRATION_METHOD_CHOICES, get_registration_choices, @@ -104,7 +61,6 @@ def sanitize_mac_address(mac): prefix_generate_users, validate_csvfile, ) -from ..mac_utils import sanitize_mac_address from .validators import ipv6_network_validator, password_reset_url_validator logger = logging.getLogger(__name__) @@ -220,6 +176,49 @@ def sanitize_mac_address(mac): OPTIONAL_SETTINGS = app_settings.OPTIONAL_REGISTRATION_FIELDS +def sanitize_mac_address(mac): + """ + Sanitize a MAC address string to the colon-separated lowercase format. + If the input is not a valid MAC address, return it unchanged. + + Handles various MAC address formats: + - 00:1A:2B:3C:4D:5E -> 00:1a:2b:3c:4d:5e + - 00-1A-2B-3C-4D-5E -> 00:1a:2b:3c:4d:5e + - 001A.2B3C.4D5E -> 00:1a:2b:3c:4d:5e + - 001A2B3C4D5E -> 00:1a:2b:3c:4d:5e + """ + # Return empty string or non-string input as is + if not mac or not isinstance(mac, str): + return mac + + # Check if it's an IP address (IPv4) - preserve unchanged + if re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", mac): + return mac + + # Try to extract MAC address from the string + # Look for MAC address patterns, including those with additional text + mac_patterns = [ + r"([0-9A-Fa-f]{2}[:-]){5}[0-9A-Fa-f]{2}", # Standard MAC with : or - + r"([0-9A-Fa-f]{4}\.){2}[0-9A-Fa-f]{4}", # Cisco format + r"[0-9A-Fa-f]{12}", # No separators + ] + + for pattern in mac_patterns: + match = re.search(pattern, mac) + if match: + mac_candidate = match.group(0) + try: + # Try to parse as EUI to validate + eui = EUI(mac_candidate) + # Return sanitized MAC address in colon-separated lowercase format + return ":".join(["%02x" % x for x in eui.words]).lower() + except (AddrFormatError, ValueError, TypeError): + continue + + # If no valid MAC found, return original string unchanged + return mac + + class AutoUsernameMixin(object): def clean(self): """ diff --git a/openwisp_radius/management/commands/base/convert_called_station_id.py b/openwisp_radius/management/commands/base/convert_called_station_id.py index 5f111c9e..54198f04 100644 --- a/openwisp_radius/management/commands/base/convert_called_station_id.py +++ b/openwisp_radius/management/commands/base/convert_called_station_id.py @@ -1,8 +1,8 @@ import logging import re -from uuid import UUID import openvpn_status +import swapper from django.core.management import BaseCommand from Exscript.protocols import telnetlib from netaddr import EUI, mac_unix @@ -10,16 +10,24 @@ from .... import settings as app_settings from ....utils import load_model -logger = logging.getLogger(__name__) - RE_VIRTUAL_ADDR_MAC = re.compile("^{0}:{0}:{0}:{0}:{0}:{0}".format("[a-f0-9]{2}"), re.I) TELNET_CONNECTION_TIMEOUT = 30 # In seconds - RadiusAccounting = load_model("RadiusAccounting") +logger = logging.getLogger(__name__) + + class BaseConvertCalledStationIdCommand(BaseCommand): + logger = logger + + def _search_mac_address(self, common_name): + match = RE_VIRTUAL_ADDR_MAC.search(common_name) + if not match: + raise IndexError(f"No MAC address found in '{common_name}'") + return match[0] + help = "Correct Called Station IDs of Radius Sessions" def _get_raw_management_info(self, host, port, password): @@ -42,19 +50,19 @@ def _get_openvpn_routing_info(self, host, port=7505, password=None): try: raw_info = self._get_raw_management_info(host, port, password) except ConnectionRefusedError: - logger.warning( + BaseConvertCalledStationIdCommand.logger.warning( "Unable to establish telnet connection to " f"{host} on {port}. Skipping!" ) return {} except (OSError, TimeoutError, EOFError) as error: - logger.warning( + BaseConvertCalledStationIdCommand.logger.warning( f"Error encountered while connecting to {host}:{port}: {error}. " "Skipping!" ) return {} except Exception: - logger.warning( + BaseConvertCalledStationIdCommand.logger.warning( f"Error encountered while connecting to {host}:{port}. Skipping!" ) return {} @@ -62,9 +70,9 @@ def _get_openvpn_routing_info(self, host, port=7505, password=None): parsed_info = openvpn_status.parse_status(raw_info) return parsed_info.routing_table except openvpn_status.ParsingError as error: - logger.warning( + BaseConvertCalledStationIdCommand.logger.warning( "Unable to parse information received from " - f"{host}:{port}. ParsingError: {error}. Skipping!", + f"{host}:{port}. ParsingError: {error}. Skipping!" ) return {} @@ -74,7 +82,7 @@ def _get_radius_session(self, unique_id): unique_id=unique_id ) except RadiusAccounting.DoesNotExist: - logger.warning( + BaseConvertCalledStationIdCommand.logger.warning( f'RadiusAccount object with unique_id "{unique_id}" does not exist.' ) @@ -88,24 +96,11 @@ def _get_called_station_setting(self, radius_session): # but will removed in future versions return {org_id: app_settings.CALLED_STATION_IDS[organization.slug]} except KeyError: - logger.error( + BaseConvertCalledStationIdCommand.logger.error( "OPENWISP_RADIUS_CALLED_STATION_IDS does not contain setting " f'for "{radius_session.organization.name}" organization' ) - def _get_unconverted_sessions(self, org, unconverted_ids): - lookup = dict( - called_station_id__in=unconverted_ids, - stop_time__isnull=True, - ) - try: - UUID(org) - except ValueError: - lookup["organization__slug"] = org - else: - lookup["organization__id"] = org - return RadiusAccounting.objects.filter(**lookup).iterator() - def add_arguments(self, parser): parser.add_argument("--unique_id", action="store", type=str, default="") @@ -128,15 +123,24 @@ def handle(self, *args, **options): for org, config in called_station_id_setting.items(): routing_dict = {} for openvpn_config in config["openvpn_config"]: - routing_dict.update( - self._get_openvpn_routing_info( - openvpn_config["host"], - openvpn_config.get("port", 7505), - openvpn_config.get("password", None), - ) + raw_routing = self.__class__._get_openvpn_routing_info( + self, + openvpn_config["host"], + openvpn_config.get("port", 7505), + openvpn_config.get("password", None), ) + normalized_routing = {} + for k, v in raw_routing.items(): + try: + norm_key = str(EUI(k, dialect=mac_unix)).lower() + except Exception: + norm_key = k.lower() + normalized_routing[norm_key] = v + routing_dict.update(normalized_routing) if not routing_dict: - logger.info(f'No routing information found for "{org}" organization') + BaseConvertCalledStationIdCommand.logger.info( + f'No routing information found for "{org}" organization' + ) continue if unique_id: @@ -144,25 +148,76 @@ def handle(self, *args, **options): else: qs = self._get_unconverted_sessions(org, config["unconverted_ids"]) for radius_session in qs: - try: - common_name = routing_dict[ - str(EUI(radius_session.calling_station_id, dialect=mac_unix)) - ].common_name - mac_address = RE_VIRTUAL_ADDR_MAC.search(common_name)[0] - from openwisp_radius.mac_utils import sanitize_mac_address - radius_session.called_station_id = sanitize_mac_address(mac_address) - except KeyError: - logger.warning( + lookup_key = str( + EUI(radius_session.calling_station_id, dialect=mac_unix) + ).lower() + # If routing information doesn't contain the expected key, + # try a tolerant fallback that strips leading zeros from each + # octet (handles representations like '0b' vs 'b'). Only if + # no variant is found, log a warning and skip this session. + if lookup_key not in routing_dict: + + def _strip_leading_zeros(k): + parts = k.split(":") + return ":".join([p.lstrip("0") or "0" for p in parts]) + + alt_key = _strip_leading_zeros(lookup_key) + if alt_key in routing_dict: + # use the alt_key mapping + routing_dict[lookup_key] = routing_dict[alt_key] + else: + pass + # If routing information doesn't contain the expected key, + # log a warning and skip this session. + BaseConvertCalledStationIdCommand.logger.warning( "Failed to find routing information for " f"{radius_session.session_id}. Skipping!" ) + continue + + common_name = routing_dict[lookup_key].common_name + + try: + mac_address = self._search_mac_address(common_name) except (TypeError, IndexError): - logger.warning( + BaseConvertCalledStationIdCommand.logger.warning( f'Failed to find a MAC address in "{common_name}". ' f"Skipping {radius_session.session_id}!" ) - else: - radius_session.save() + continue + + from openwisp_radius.base.models import sanitize_mac_address + + radius_session.called_station_id = sanitize_mac_address(mac_address) + radius_session.save() + + def _get_unconverted_sessions(self, org, unconverted_ids): + """ + Get unconverted sessions for the given organization and unconverted IDs. + """ + from uuid import UUID + + # org might be a string UUID or slug from settings + if isinstance(org, str): + try: + # Try to parse as UUID first + org_uuid = UUID(org) + except ValueError: + # If not a UUID, treat as slug and look up the organization + Organization = swapper.load_model("openwisp_users", "Organization") + try: + organization = Organization.objects.get(slug=org) + org_uuid = organization.id + except Organization.DoesNotExist: + self.logger.warning(f"Organization '{org}' not found") + return RadiusAccounting.objects.none() + else: + # org is already an Organization object + org_uuid = org.id + + return RadiusAccounting.objects.filter( + organization_id=org_uuid, called_station_id__in=unconverted_ids + ) # monkey patching for openvpn_status begins diff --git a/openwisp_radius/migrations/0002_initial_openwisp_radius.py b/openwisp_radius/migrations/0002_initial_openwisp_radius.py index 7abecce1..731eb30a 100644 --- a/openwisp_radius/migrations/0002_initial_openwisp_radius.py +++ b/openwisp_radius/migrations/0002_initial_openwisp_radius.py @@ -248,7 +248,7 @@ class Migration(migrations.Migration): max_length=32, validators=[ django.core.validators.RegexValidator( - re.compile("^[^\\s/\\.]+$"), + re.compile(r"^[^\\s/\.]+$"), code="invalid", message=( "This value must not contain spaces, " diff --git a/openwisp_radius/migrations/0041_alter_organizationradiussettings_token.py b/openwisp_radius/migrations/0041_alter_organizationradiussettings_token.py new file mode 100644 index 00000000..d0842c93 --- /dev/null +++ b/openwisp_radius/migrations/0041_alter_organizationradiussettings_token.py @@ -0,0 +1,35 @@ +# Generated by Django 5.2.5 on 2025-09-02 06:37 + +import re + +import django.core.validators +from django.db import migrations + +import openwisp_utils.fields +import openwisp_utils.utils + + +class Migration(migrations.Migration): + + dependencies = [ + ("openwisp_radius", "0040_rename_phonetoken_index"), + ] + + operations = [ + migrations.AlterField( + model_name="organizationradiussettings", + name="token", + field=openwisp_utils.fields.KeyField( + default=openwisp_utils.utils.get_random_key, + help_text=None, + max_length=32, + validators=[ + django.core.validators.RegexValidator( + re.compile("^[^\\s/\\.]+$"), + code="invalid", + message="This value must not contain spaces, dots or slashes.", + ) + ], + ), + ), + ] diff --git a/openwisp_radius/tests/test_api/test_api.py b/openwisp_radius/tests/test_api/test_api.py index da30d1b8..2f4c6e83 100644 --- a/openwisp_radius/tests/test_api/test_api.py +++ b/openwisp_radius/tests/test_api/test_api.py @@ -792,7 +792,7 @@ def test_api_password_reset(self): ) self.assertRegex( "".join(email.alternatives[0][0].splitlines()), - '.*Reset password.*<\/a>', + r'.*Reset password.*', ) self.assertNotIn('.*Manage Session.*<\/a>', + r'.*Manage Session.*', ) self.assertIn( "A new session has been started for your account:" f" {user.username}", @@ -241,7 +241,7 @@ def test_send_login_email(self, translation_activate, utils_logger, task_logger) tasks.send_login_email.delay(accounting_data) self.assertRegex( "".join(email.alternatives[0][0].splitlines()), - '.*Manage Session.*<\/a>', + r'.*Manage Session.*', ) self.assertEqual(translation_activate.call_args_list[0][0][0], "it") self.assertEqual( diff --git a/setup.py b/setup.py index 337e41a7..e97968f9 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ if sys.argv[-1] == "publish": # delete any *.pyc, *.pyo and __pycache__ - os.system('find . | grep -E "(__pycache__|\.pyc|\.pyo$)" | xargs rm -rf') + os.system(r'find . | grep -E "(__pycache__|\.pyc|\.pyo$)" | xargs rm -rf') os.system("python setup.py sdist bdist_wheel") os.system("twine upload -s dist/*") os.system("rm -rf dist build") diff --git a/tests/openwisp2/sample_radius/migrations/0002_initial_openwisp_app.py b/tests/openwisp2/sample_radius/migrations/0002_initial_openwisp_app.py index f41d423c..3d43cde3 100644 --- a/tests/openwisp2/sample_radius/migrations/0002_initial_openwisp_app.py +++ b/tests/openwisp2/sample_radius/migrations/0002_initial_openwisp_app.py @@ -505,7 +505,7 @@ class Migration(migrations.Migration): max_length=32, validators=[ django.core.validators.RegexValidator( - re.compile("^[^\\s/\\.]+$"), + re.compile(r"^[^\\s/\.]+$"), code="invalid", message=( "This value must not contain spaces, " diff --git a/tests/openwisp2/sample_radius/migrations/0031_alter_organizationradiussettings_token.py b/tests/openwisp2/sample_radius/migrations/0031_alter_organizationradiussettings_token.py new file mode 100644 index 00000000..21b6fdc9 --- /dev/null +++ b/tests/openwisp2/sample_radius/migrations/0031_alter_organizationradiussettings_token.py @@ -0,0 +1,35 @@ +# Generated by Django 5.2.5 on 2025-09-02 07:05 + +import re + +import django.core.validators +from django.db import migrations + +import openwisp_utils.fields +import openwisp_utils.utils + + +class Migration(migrations.Migration): + + dependencies = [ + ("sample_radius", "0030_alter_radiusaccounting_called_station_id_and_more"), + ] + + operations = [ + migrations.AlterField( + model_name="organizationradiussettings", + name="token", + field=openwisp_utils.fields.KeyField( + default=openwisp_utils.utils.get_random_key, + help_text=None, + max_length=32, + validators=[ + django.core.validators.RegexValidator( + re.compile("^[^\\s/\\.]+$"), + code="invalid", + message="This value must not contain spaces, dots or slashes.", + ) + ], + ), + ), + ]