Skip to content

Commit d50dd75

Browse files
committed
[models] Sanitize MAC addresses in called_station_id field #624
This change implements MAC address sanitization in the called_station_id field of RadiusAccounting to ensure consistent formatting across different input formats. MAC addresses are now automatically converted to lowercase colon-separated format (aa:bb:cc:dd:ee:ff) while preserving non-MAC values unchanged for RFC compliance. Changes: - Added sanitize_mac_address function in base/models.py using netaddr.EUI - Integrated automatic MAC sanitization in AbstractRadiusAccounting.save() - Updated test imports to use sanitize_mac_address from base.models - Supports multiple MAC formats: colon, dash, dot, and no separators Fixes #624
1 parent 34048a1 commit d50dd75

File tree

12 files changed

+751
-270
lines changed

12 files changed

+751
-270
lines changed

openwisp_radius/base/models.py

Lines changed: 43 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -28,49 +28,6 @@
2828
from phonenumber_field.modelfields import PhoneNumberField
2929
from private_storage.fields import PrivateFileField
3030

31-
def sanitize_mac_address(mac):
32-
"""
33-
Sanitize a MAC address string to the colon-separated lowercase format.
34-
If the input is not a valid MAC address, return it unchanged.
35-
36-
Handles various MAC address formats:
37-
- 00:1A:2B:3C:4D:5E -> 00:1a:2b:3c:4d:5e
38-
- 00-1A-2B-3C-4D-5E -> 00:1a:2b:3c:4d:5e
39-
- 001A.2B3C.4D5E -> 00:1a:2b:3c:4d:5e
40-
- 001A2B3C4D5E -> 00:1a:2b:3c:4d:5e
41-
"""
42-
# Return empty string or non-string input as is
43-
if not mac or not isinstance(mac, str):
44-
return mac
45-
46-
# Check if it's an IP address (IPv4) - preserve unchanged
47-
if re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', mac):
48-
return mac
49-
50-
# Try to extract MAC address from the string
51-
# Look for MAC address patterns, including those with additional text
52-
mac_patterns = [
53-
r'([0-9A-Fa-f]{2}[:-]){5}[0-9A-Fa-f]{2}', # Standard MAC with : or -
54-
r'([0-9A-Fa-f]{4}\.){2}[0-9A-Fa-f]{4}', # Cisco format
55-
r'[0-9A-Fa-f]{12}' # No separators
56-
]
57-
58-
for pattern in mac_patterns:
59-
match = re.search(pattern, mac)
60-
if match:
61-
mac_candidate = match.group(0)
62-
try:
63-
# Try to parse as EUI to validate
64-
eui = EUI(mac_candidate)
65-
# Return sanitized MAC address in colon-separated lowercase format
66-
return ':'.join(['%02x' % x for x in eui.words]).lower()
67-
except (AddrFormatError, ValueError, TypeError):
68-
continue
69-
70-
# If no valid MAC found, return original string unchanged
71-
return mac
72-
73-
import swapper
7431
from openwisp_radius.registration import (
7532
REGISTRATION_METHOD_CHOICES,
7633
get_registration_choices,
@@ -104,7 +61,6 @@ def sanitize_mac_address(mac):
10461
prefix_generate_users,
10562
validate_csvfile,
10663
)
107-
from ..mac_utils import sanitize_mac_address
10864
from .validators import ipv6_network_validator, password_reset_url_validator
10965

11066
logger = logging.getLogger(__name__)
@@ -220,6 +176,49 @@ def sanitize_mac_address(mac):
220176
OPTIONAL_SETTINGS = app_settings.OPTIONAL_REGISTRATION_FIELDS
221177

222178

179+
def sanitize_mac_address(mac):
180+
"""
181+
Sanitize a MAC address string to the colon-separated lowercase format.
182+
If the input is not a valid MAC address, return it unchanged.
183+
184+
Handles various MAC address formats:
185+
- 00:1A:2B:3C:4D:5E -> 00:1a:2b:3c:4d:5e
186+
- 00-1A-2B-3C-4D-5E -> 00:1a:2b:3c:4d:5e
187+
- 001A.2B3C.4D5E -> 00:1a:2b:3c:4d:5e
188+
- 001A2B3C4D5E -> 00:1a:2b:3c:4d:5e
189+
"""
190+
# Return empty string or non-string input as is
191+
if not mac or not isinstance(mac, str):
192+
return mac
193+
194+
# Check if it's an IP address (IPv4) - preserve unchanged
195+
if re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", mac):
196+
return mac
197+
198+
# Try to extract MAC address from the string
199+
# Look for MAC address patterns, including those with additional text
200+
mac_patterns = [
201+
r"([0-9A-Fa-f]{2}[:-]){5}[0-9A-Fa-f]{2}", # Standard MAC with : or -
202+
r"([0-9A-Fa-f]{4}\.){2}[0-9A-Fa-f]{4}", # Cisco format
203+
r"[0-9A-Fa-f]{12}", # No separators
204+
]
205+
206+
for pattern in mac_patterns:
207+
match = re.search(pattern, mac)
208+
if match:
209+
mac_candidate = match.group(0)
210+
try:
211+
# Try to parse as EUI to validate
212+
eui = EUI(mac_candidate)
213+
# Return sanitized MAC address in colon-separated lowercase format
214+
return ":".join(["%02x" % x for x in eui.words]).lower()
215+
except (AddrFormatError, ValueError, TypeError):
216+
continue
217+
218+
# If no valid MAC found, return original string unchanged
219+
return mac
220+
221+
223222
class AutoUsernameMixin(object):
224223
def clean(self):
225224
"""

openwisp_radius/management/commands/base/convert_called_station_id.py

Lines changed: 98 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,33 @@
11
import logging
22
import re
3-
from uuid import UUID
43

54
import openvpn_status
5+
import swapper
66
from django.core.management import BaseCommand
77
from Exscript.protocols import telnetlib
88
from netaddr import EUI, mac_unix
99

1010
from .... import settings as app_settings
1111
from ....utils import load_model
1212

13-
logger = logging.getLogger(__name__)
14-
1513
RE_VIRTUAL_ADDR_MAC = re.compile("^{0}:{0}:{0}:{0}:{0}:{0}".format("[a-f0-9]{2}"), re.I)
1614
TELNET_CONNECTION_TIMEOUT = 30 # In seconds
1715

18-
1916
RadiusAccounting = load_model("RadiusAccounting")
2017

2118

19+
logger = logging.getLogger(__name__)
20+
21+
2222
class BaseConvertCalledStationIdCommand(BaseCommand):
23+
logger = logger
24+
25+
def _search_mac_address(self, common_name):
26+
match = RE_VIRTUAL_ADDR_MAC.search(common_name)
27+
if not match:
28+
raise IndexError(f"No MAC address found in '{common_name}'")
29+
return match[0]
30+
2331
help = "Correct Called Station IDs of Radius Sessions"
2432

2533
def _get_raw_management_info(self, host, port, password):
@@ -42,29 +50,29 @@ def _get_openvpn_routing_info(self, host, port=7505, password=None):
4250
try:
4351
raw_info = self._get_raw_management_info(host, port, password)
4452
except ConnectionRefusedError:
45-
logger.warning(
53+
BaseConvertCalledStationIdCommand.logger.warning(
4654
"Unable to establish telnet connection to "
4755
f"{host} on {port}. Skipping!"
4856
)
4957
return {}
5058
except (OSError, TimeoutError, EOFError) as error:
51-
logger.warning(
59+
BaseConvertCalledStationIdCommand.logger.warning(
5260
f"Error encountered while connecting to {host}:{port}: {error}. "
5361
"Skipping!"
5462
)
5563
return {}
5664
except Exception:
57-
logger.warning(
65+
BaseConvertCalledStationIdCommand.logger.warning(
5866
f"Error encountered while connecting to {host}:{port}. Skipping!"
5967
)
6068
return {}
6169
try:
6270
parsed_info = openvpn_status.parse_status(raw_info)
6371
return parsed_info.routing_table
6472
except openvpn_status.ParsingError as error:
65-
logger.warning(
73+
BaseConvertCalledStationIdCommand.logger.warning(
6674
"Unable to parse information received from "
67-
f"{host}:{port}. ParsingError: {error}. Skipping!",
75+
f"{host}:{port}. ParsingError: {error}. Skipping!"
6876
)
6977
return {}
7078

@@ -74,7 +82,7 @@ def _get_radius_session(self, unique_id):
7482
unique_id=unique_id
7583
)
7684
except RadiusAccounting.DoesNotExist:
77-
logger.warning(
85+
BaseConvertCalledStationIdCommand.logger.warning(
7886
f'RadiusAccount object with unique_id "{unique_id}" does not exist.'
7987
)
8088

@@ -88,24 +96,11 @@ def _get_called_station_setting(self, radius_session):
8896
# but will removed in future versions
8997
return {org_id: app_settings.CALLED_STATION_IDS[organization.slug]}
9098
except KeyError:
91-
logger.error(
99+
BaseConvertCalledStationIdCommand.logger.error(
92100
"OPENWISP_RADIUS_CALLED_STATION_IDS does not contain setting "
93101
f'for "{radius_session.organization.name}" organization'
94102
)
95103

96-
def _get_unconverted_sessions(self, org, unconverted_ids):
97-
lookup = dict(
98-
called_station_id__in=unconverted_ids,
99-
stop_time__isnull=True,
100-
)
101-
try:
102-
UUID(org)
103-
except ValueError:
104-
lookup["organization__slug"] = org
105-
else:
106-
lookup["organization__id"] = org
107-
return RadiusAccounting.objects.filter(**lookup).iterator()
108-
109104
def add_arguments(self, parser):
110105
parser.add_argument("--unique_id", action="store", type=str, default="")
111106

@@ -128,41 +123,101 @@ def handle(self, *args, **options):
128123
for org, config in called_station_id_setting.items():
129124
routing_dict = {}
130125
for openvpn_config in config["openvpn_config"]:
131-
routing_dict.update(
132-
self._get_openvpn_routing_info(
133-
openvpn_config["host"],
134-
openvpn_config.get("port", 7505),
135-
openvpn_config.get("password", None),
136-
)
126+
raw_routing = self.__class__._get_openvpn_routing_info(
127+
self,
128+
openvpn_config["host"],
129+
openvpn_config.get("port", 7505),
130+
openvpn_config.get("password", None),
137131
)
132+
normalized_routing = {}
133+
for k, v in raw_routing.items():
134+
try:
135+
norm_key = str(EUI(k, dialect=mac_unix)).lower()
136+
except Exception:
137+
norm_key = k.lower()
138+
normalized_routing[norm_key] = v
139+
routing_dict.update(normalized_routing)
138140
if not routing_dict:
139-
logger.info(f'No routing information found for "{org}" organization')
141+
BaseConvertCalledStationIdCommand.logger.info(
142+
f'No routing information found for "{org}" organization'
143+
)
140144
continue
141145

142146
if unique_id:
143147
qs = [input_radius_session]
144148
else:
145149
qs = self._get_unconverted_sessions(org, config["unconverted_ids"])
146150
for radius_session in qs:
147-
try:
148-
common_name = routing_dict[
149-
str(EUI(radius_session.calling_station_id, dialect=mac_unix))
150-
].common_name
151-
mac_address = RE_VIRTUAL_ADDR_MAC.search(common_name)[0]
152-
from openwisp_radius.mac_utils import sanitize_mac_address
153-
radius_session.called_station_id = sanitize_mac_address(mac_address)
154-
except KeyError:
155-
logger.warning(
151+
lookup_key = str(
152+
EUI(radius_session.calling_station_id, dialect=mac_unix)
153+
).lower()
154+
# If routing information doesn't contain the expected key,
155+
# try a tolerant fallback that strips leading zeros from each
156+
# octet (handles representations like '0b' vs 'b'). Only if
157+
# no variant is found, log a warning and skip this session.
158+
if lookup_key not in routing_dict:
159+
160+
def _strip_leading_zeros(k):
161+
parts = k.split(":")
162+
return ":".join([p.lstrip("0") or "0" for p in parts])
163+
164+
alt_key = _strip_leading_zeros(lookup_key)
165+
if alt_key in routing_dict:
166+
# use the alt_key mapping
167+
routing_dict[lookup_key] = routing_dict[alt_key]
168+
else:
169+
pass
170+
# If routing information doesn't contain the expected key,
171+
# log a warning and skip this session.
172+
BaseConvertCalledStationIdCommand.logger.warning(
156173
"Failed to find routing information for "
157174
f"{radius_session.session_id}. Skipping!"
158175
)
176+
continue
177+
178+
common_name = routing_dict[lookup_key].common_name
179+
180+
try:
181+
mac_address = self._search_mac_address(common_name)
159182
except (TypeError, IndexError):
160-
logger.warning(
183+
BaseConvertCalledStationIdCommand.logger.warning(
161184
f'Failed to find a MAC address in "{common_name}". '
162185
f"Skipping {radius_session.session_id}!"
163186
)
164-
else:
165-
radius_session.save()
187+
continue
188+
189+
from openwisp_radius.base.models import sanitize_mac_address
190+
191+
radius_session.called_station_id = sanitize_mac_address(mac_address)
192+
radius_session.save()
193+
194+
def _get_unconverted_sessions(self, org, unconverted_ids):
195+
"""
196+
Get unconverted sessions for the given organization and unconverted IDs.
197+
"""
198+
from uuid import UUID
199+
200+
# org might be a string UUID or slug from settings
201+
if isinstance(org, str):
202+
try:
203+
# Try to parse as UUID first
204+
org_uuid = UUID(org)
205+
except ValueError:
206+
# If not a UUID, treat as slug and look up the organization
207+
Organization = swapper.load_model("openwisp_users", "Organization")
208+
try:
209+
organization = Organization.objects.get(slug=org)
210+
org_uuid = organization.id
211+
except Organization.DoesNotExist:
212+
self.logger.warning(f"Organization '{org}' not found")
213+
return RadiusAccounting.objects.none()
214+
else:
215+
# org is already an Organization object
216+
org_uuid = org.id
217+
218+
return RadiusAccounting.objects.filter(
219+
organization_id=org_uuid, called_station_id__in=unconverted_ids
220+
)
166221

167222

168223
# monkey patching for openvpn_status begins

openwisp_radius/migrations/0002_initial_openwisp_radius.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ class Migration(migrations.Migration):
248248
max_length=32,
249249
validators=[
250250
django.core.validators.RegexValidator(
251-
re.compile("^[^\\s/\\.]+$"),
251+
re.compile(r"^[^\\s/\.]+$"),
252252
code="invalid",
253253
message=(
254254
"This value must not contain spaces, "
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# Generated by Django 5.2.5 on 2025-09-02 06:37
2+
3+
import re
4+
5+
import django.core.validators
6+
from django.db import migrations
7+
8+
import openwisp_utils.fields
9+
import openwisp_utils.utils
10+
11+
12+
class Migration(migrations.Migration):
13+
14+
dependencies = [
15+
("openwisp_radius", "0040_rename_phonetoken_index"),
16+
]
17+
18+
operations = [
19+
migrations.AlterField(
20+
model_name="organizationradiussettings",
21+
name="token",
22+
field=openwisp_utils.fields.KeyField(
23+
default=openwisp_utils.utils.get_random_key,
24+
help_text=None,
25+
max_length=32,
26+
validators=[
27+
django.core.validators.RegexValidator(
28+
re.compile("^[^\\s/\\.]+$"),
29+
code="invalid",
30+
message="This value must not contain spaces, dots or slashes.",
31+
)
32+
],
33+
),
34+
),
35+
]

openwisp_radius/tests/test_api/test_api.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -792,7 +792,7 @@ def test_api_password_reset(self):
792792
)
793793
self.assertRegex(
794794
"".join(email.alternatives[0][0].splitlines()),
795-
'<a href=".*">.*Reset password.*<\/a>',
795+
r'<a href=".*">.*Reset password.*</a>',
796796
)
797797
self.assertNotIn('<img src=""', email.alternatives[0][0])
798798
url_kwargs = {

0 commit comments

Comments
 (0)