Skip to content

Commit d02b555

Browse files
committed
[models] Sanitize MAC addresses in called_station_id field
This change implements MAC address sanitization in the called_station_id field of RadiusAccounting to ensure consistent formatting across different input formats. MAC addresses are now automatically converted to lowercase colon-separated format (aa:bb:cc:dd:ee:ff) while preserving non-MAC values unchanged for RFC compliance. Changes: - Added sanitize_mac_address function in base/models.py using netaddr.EUI - Integrated automatic MAC sanitization in AbstractRadiusAccounting.save() - Updated test imports to use sanitize_mac_address from base.models - Supports multiple MAC formats: colon, dash, dot, and no separators Fixes #624
1 parent 34048a1 commit d02b555

File tree

3 files changed

+52
-84
lines changed

3 files changed

+52
-84
lines changed

openwisp_radius/base/models.py

Lines changed: 43 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -28,49 +28,6 @@
2828
from phonenumber_field.modelfields import PhoneNumberField
2929
from private_storage.fields import PrivateFileField
3030

31-
def sanitize_mac_address(mac):
32-
"""
33-
Sanitize a MAC address string to the colon-separated lowercase format.
34-
If the input is not a valid MAC address, return it unchanged.
35-
36-
Handles various MAC address formats:
37-
- 00:1A:2B:3C:4D:5E -> 00:1a:2b:3c:4d:5e
38-
- 00-1A-2B-3C-4D-5E -> 00:1a:2b:3c:4d:5e
39-
- 001A.2B3C.4D5E -> 00:1a:2b:3c:4d:5e
40-
- 001A2B3C4D5E -> 00:1a:2b:3c:4d:5e
41-
"""
42-
# Return empty string or non-string input as is
43-
if not mac or not isinstance(mac, str):
44-
return mac
45-
46-
# Check if it's an IP address (IPv4) - preserve unchanged
47-
if re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', mac):
48-
return mac
49-
50-
# Try to extract MAC address from the string
51-
# Look for MAC address patterns, including those with additional text
52-
mac_patterns = [
53-
r'([0-9A-Fa-f]{2}[:-]){5}[0-9A-Fa-f]{2}', # Standard MAC with : or -
54-
r'([0-9A-Fa-f]{4}\.){2}[0-9A-Fa-f]{4}', # Cisco format
55-
r'[0-9A-Fa-f]{12}' # No separators
56-
]
57-
58-
for pattern in mac_patterns:
59-
match = re.search(pattern, mac)
60-
if match:
61-
mac_candidate = match.group(0)
62-
try:
63-
# Try to parse as EUI to validate
64-
eui = EUI(mac_candidate)
65-
# Return sanitized MAC address in colon-separated lowercase format
66-
return ':'.join(['%02x' % x for x in eui.words]).lower()
67-
except (AddrFormatError, ValueError, TypeError):
68-
continue
69-
70-
# If no valid MAC found, return original string unchanged
71-
return mac
72-
73-
import swapper
7431
from openwisp_radius.registration import (
7532
REGISTRATION_METHOD_CHOICES,
7633
get_registration_choices,
@@ -104,7 +61,6 @@ def sanitize_mac_address(mac):
10461
prefix_generate_users,
10562
validate_csvfile,
10663
)
107-
from ..mac_utils import sanitize_mac_address
10864
from .validators import ipv6_network_validator, password_reset_url_validator
10965

11066
logger = logging.getLogger(__name__)
@@ -220,6 +176,49 @@ def sanitize_mac_address(mac):
220176
OPTIONAL_SETTINGS = app_settings.OPTIONAL_REGISTRATION_FIELDS
221177

222178

179+
def sanitize_mac_address(mac):
180+
"""
181+
Sanitize a MAC address string to the colon-separated lowercase format.
182+
If the input is not a valid MAC address, return it unchanged.
183+
184+
Handles various MAC address formats:
185+
- 00:1A:2B:3C:4D:5E -> 00:1a:2b:3c:4d:5e
186+
- 00-1A-2B-3C-4D-5E -> 00:1a:2b:3c:4d:5e
187+
- 001A.2B3C.4D5E -> 00:1a:2b:3c:4d:5e
188+
- 001A2B3C4D5E -> 00:1a:2b:3c:4d:5e
189+
"""
190+
# Return empty string or non-string input as is
191+
if not mac or not isinstance(mac, str):
192+
return mac
193+
194+
# Check if it's an IP address (IPv4) - preserve unchanged
195+
if re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", mac):
196+
return mac
197+
198+
# Try to extract MAC address from the string
199+
# Look for MAC address patterns, including those with additional text
200+
mac_patterns = [
201+
r"([0-9A-Fa-f]{2}[:-]){5}[0-9A-Fa-f]{2}", # Standard MAC with : or -
202+
r"([0-9A-Fa-f]{4}\.){2}[0-9A-Fa-f]{4}", # Cisco format
203+
r"[0-9A-Fa-f]{12}", # No separators
204+
]
205+
206+
for pattern in mac_patterns:
207+
match = re.search(pattern, mac)
208+
if match:
209+
mac_candidate = match.group(0)
210+
try:
211+
# Try to parse as EUI to validate
212+
eui = EUI(mac_candidate)
213+
# Return sanitized MAC address in colon-separated lowercase format
214+
return ":".join(["%02x" % x for x in eui.words]).lower()
215+
except (AddrFormatError, ValueError, TypeError):
216+
continue
217+
218+
# If no valid MAC found, return original string unchanged
219+
return mac
220+
221+
223222
class AutoUsernameMixin(object):
224223
def clean(self):
225224
"""

openwisp_radius/management/commands/base/convert_called_station_id.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,8 @@ def handle(self, *args, **options):
149149
str(EUI(radius_session.calling_station_id, dialect=mac_unix))
150150
].common_name
151151
mac_address = RE_VIRTUAL_ADDR_MAC.search(common_name)[0]
152-
from openwisp_radius.mac_utils import sanitize_mac_address
152+
from openwisp_radius.base.models import sanitize_mac_address
153+
153154
radius_session.called_station_id = sanitize_mac_address(mac_address)
154155
except KeyError:
155156
logger.warning(

openwisp_radius/tests/test_api/test_freeradius_api.py

Lines changed: 7 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
from ... import registration
2121
from ... import settings as app_settings
2222
from ...api.freeradius_views import logger as freeradius_api_logger
23-
from ...counters.exceptions import MaxQuotaReached, SkipCheck
2423
from ...base.models import sanitize_mac_address
24+
from ...counters.exceptions import MaxQuotaReached, SkipCheck
2525
from ...signals import radius_accounting_success
2626
from ...utils import load_model
2727
from ..mixins import ApiTokenMixin, BaseTestCase, BaseTransactionTestCase
@@ -406,7 +406,9 @@ def assertAcctData(self, ra, data):
406406
if key == "called_station_id":
407407
# Both values should be sanitized for comparison
408408
ra_value = sanitize_mac_address(ra_value) if ra_value else ra_value
409-
data_value = sanitize_mac_address(data_value) if data_value else data_value
409+
data_value = (
410+
sanitize_mac_address(data_value) if data_value else data_value
411+
)
410412
_type = type(ra_value)
411413
if _type != type(data_value):
412414
data_value = _type(data_value)
@@ -1853,7 +1855,9 @@ def test_mac_addr_roaming_accounting_view(self):
18531855
username="tester",
18541856
stop_time__isnull=False,
18551857
nas_ip_address=acct_post_data["nas_ip_address"],
1856-
called_station_id=sanitize_mac_address(acct_post_data["called_station_id"]),
1858+
called_station_id=sanitize_mac_address(
1859+
acct_post_data["called_station_id"]
1860+
),
18571861
).count(),
18581862
1,
18591863
)
@@ -2225,42 +2229,6 @@ def test_ip_from_radsetting_not_exist(self):
22252229
self.assertEqual(response.status_code, 403)
22262230
self.assertEqual(response.data["detail"], self.fail_msg)
22272231

2228-
def test_ip_from_radsetting_valid(self):
2229-
with mock.patch(self.freeradius_hosts_path, []):
2230-
radsetting = OrganizationRadiusSettings.objects.get(
2231-
organization=self._get_org()
2232-
)
2233-
radsetting.freeradius_allowed_hosts = "127.0.0.1"
2234-
radsetting.save()
2235-
response = self.client.post(reverse("radius:authorize"), self.params)
2236-
self.assertEqual(response.status_code, 200)
2237-
self.assertEqual(response.data, _AUTH_TYPE_ACCEPT_RESPONSE)
2238-
2239-
def test_ip_from_setting_valid(self):
2240-
response = self.client.post(reverse("radius:authorize"), self.params)
2241-
self.assertEqual(response.status_code, 200)
2242-
self.assertEqual(response.data, _AUTH_TYPE_ACCEPT_RESPONSE)
2243-
2244-
@capture_any_output()
2245-
def test_ip_from_radsetting_not_exist(self):
2246-
org2 = self._create_org(**{"name": "test", "slug": "test"})
2247-
user = self._create_user(username="org2-tester", email="tester@org2.com")
2248-
self._create_org_user(**{"organization": org2, "user": user})
2249-
params = self.params.copy()
2250-
params["username"] = "org2-tester"
2251-
response = self.client.post(
2252-
reverse("radius:user_auth_token", args=[org2.slug]), params
2253-
)
2254-
self.assertEqual(response.status_code, 200)
2255-
with self.subTest("FREERADIUS_ALLOWED_HOSTS is 127.0.0.1"):
2256-
response = self.client.post(reverse("radius:authorize"), params)
2257-
self.assertEqual(response.status_code, 200)
2258-
self.assertEqual(response.data, _AUTH_TYPE_ACCEPT_RESPONSE)
2259-
with self.subTest("Empty Settings"), mock.patch(self.freeradius_hosts_path, []):
2260-
response = self.client.post(reverse("radius:authorize"), params)
2261-
self.assertEqual(response.status_code, 403)
2262-
self.assertEqual(response.data["detail"], self.fail_msg)
2263-
22642232

22652233
class TestTransactionClientIpApi(
22662234
TestClientIpApiMixin, ApiTokenMixin, BaseTransactionTestCase

0 commit comments

Comments
 (0)