Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions openwisp_radius/base/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import json
import logging
import os
import re
import string
from datetime import timedelta
from io import StringIO
Expand All @@ -23,6 +24,7 @@
from django.utils.translation import gettext_lazy as _
from jsonfield import JSONField
from model_utils.fields import AutoLastModifiedField
from netaddr import EUI, AddrFormatError
from phonenumber_field.modelfields import PhoneNumberField
from private_storage.fields import PrivateFileField

Expand Down Expand Up @@ -174,6 +176,49 @@
OPTIONAL_SETTINGS = app_settings.OPTIONAL_REGISTRATION_FIELDS


def sanitize_mac_address(mac):
"""
Sanitize a MAC address string to the colon-separated lowercase format.
If the input is not a valid MAC address, return it unchanged.

Handles various MAC address formats:
- 00:1A:2B:3C:4D:5E -> 00:1a:2b:3c:4d:5e
- 00-1A-2B-3C-4D-5E -> 00:1a:2b:3c:4d:5e
- 001A.2B3C.4D5E -> 00:1a:2b:3c:4d:5e
- 001A2B3C4D5E -> 00:1a:2b:3c:4d:5e
"""
# Return empty string or non-string input as is
if not mac or not isinstance(mac, str):
return mac

# Check if it's an IP address (IPv4) - preserve unchanged
if re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", mac):
return mac

# Try to extract MAC address from the string
# Look for MAC address patterns, including those with additional text
mac_patterns = [
r"([0-9A-Fa-f]{2}[:-]){5}[0-9A-Fa-f]{2}", # Standard MAC with : or -
r"([0-9A-Fa-f]{4}\.){2}[0-9A-Fa-f]{4}", # Cisco format
r"[0-9A-Fa-f]{12}", # No separators
]

for pattern in mac_patterns:
match = re.search(pattern, mac)
if match:
mac_candidate = match.group(0)
try:
# Try to parse as EUI to validate
eui = EUI(mac_candidate)
# Return sanitized MAC address in colon-separated lowercase format
return ":".join(["%02x" % x for x in eui.words]).lower()
except (AddrFormatError, ValueError, TypeError):
continue

# If no valid MAC found, return original string unchanged
return mac


class AutoUsernameMixin(object):
def clean(self):
"""
Expand Down Expand Up @@ -515,6 +560,8 @@ class AbstractRadiusAccounting(OrgMixin, models.Model):
)

def save(self, *args, **kwargs):
if self.called_station_id:
self.called_station_id = sanitize_mac_address(self.called_station_id)
if not self.start_time:
self.start_time = now()
super(AbstractRadiusAccounting, self).save(*args, **kwargs)
Expand Down
140 changes: 98 additions & 42 deletions openwisp_radius/management/commands/base/convert_called_station_id.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,33 @@
import logging
import re
from uuid import UUID

import openvpn_status
import swapper
from django.core.management import BaseCommand
from Exscript.protocols import telnetlib
from netaddr import EUI, mac_unix

from .... import settings as app_settings
from ....utils import load_model

logger = logging.getLogger(__name__)

RE_VIRTUAL_ADDR_MAC = re.compile("^{0}:{0}:{0}:{0}:{0}:{0}".format("[a-f0-9]{2}"), re.I)
TELNET_CONNECTION_TIMEOUT = 30 # In seconds


RadiusAccounting = load_model("RadiusAccounting")


logger = logging.getLogger(__name__)


class BaseConvertCalledStationIdCommand(BaseCommand):
logger = logger

def _search_mac_address(self, common_name):
match = RE_VIRTUAL_ADDR_MAC.search(common_name)
if not match:
raise IndexError(f"No MAC address found in '{common_name}'")
return match[0]

help = "Correct Called Station IDs of Radius Sessions"

def _get_raw_management_info(self, host, port, password):
Expand All @@ -42,29 +50,29 @@ def _get_openvpn_routing_info(self, host, port=7505, password=None):
try:
raw_info = self._get_raw_management_info(host, port, password)
except ConnectionRefusedError:
logger.warning(
BaseConvertCalledStationIdCommand.logger.warning(
"Unable to establish telnet connection to "
f"{host} on {port}. Skipping!"
)
return {}
except (OSError, TimeoutError, EOFError) as error:
logger.warning(
BaseConvertCalledStationIdCommand.logger.warning(
f"Error encountered while connecting to {host}:{port}: {error}. "
"Skipping!"
)
return {}
except Exception:
logger.warning(
BaseConvertCalledStationIdCommand.logger.warning(
f"Error encountered while connecting to {host}:{port}. Skipping!"
)
return {}
try:
parsed_info = openvpn_status.parse_status(raw_info)
return parsed_info.routing_table
except openvpn_status.ParsingError as error:
logger.warning(
BaseConvertCalledStationIdCommand.logger.warning(
"Unable to parse information received from "
f"{host}:{port}. ParsingError: {error}. Skipping!",
f"{host}:{port}. ParsingError: {error}. Skipping!"
)
return {}

Expand All @@ -74,7 +82,7 @@ def _get_radius_session(self, unique_id):
unique_id=unique_id
)
except RadiusAccounting.DoesNotExist:
logger.warning(
BaseConvertCalledStationIdCommand.logger.warning(
f'RadiusAccount object with unique_id "{unique_id}" does not exist.'
)

Expand All @@ -88,24 +96,11 @@ def _get_called_station_setting(self, radius_session):
# but will removed in future versions
return {org_id: app_settings.CALLED_STATION_IDS[organization.slug]}
except KeyError:
logger.error(
BaseConvertCalledStationIdCommand.logger.error(
"OPENWISP_RADIUS_CALLED_STATION_IDS does not contain setting "
f'for "{radius_session.organization.name}" organization'
)

def _get_unconverted_sessions(self, org, unconverted_ids):
lookup = dict(
called_station_id__in=unconverted_ids,
stop_time__isnull=True,
)
try:
UUID(org)
except ValueError:
lookup["organization__slug"] = org
else:
lookup["organization__id"] = org
return RadiusAccounting.objects.filter(**lookup).iterator()

def add_arguments(self, parser):
parser.add_argument("--unique_id", action="store", type=str, default="")

Expand All @@ -128,40 +123,101 @@ def handle(self, *args, **options):
for org, config in called_station_id_setting.items():
routing_dict = {}
for openvpn_config in config["openvpn_config"]:
routing_dict.update(
self._get_openvpn_routing_info(
openvpn_config["host"],
openvpn_config.get("port", 7505),
openvpn_config.get("password", None),
)
raw_routing = self.__class__._get_openvpn_routing_info(
self,
openvpn_config["host"],
openvpn_config.get("port", 7505),
openvpn_config.get("password", None),
)
normalized_routing = {}
for k, v in raw_routing.items():
try:
norm_key = str(EUI(k, dialect=mac_unix)).lower()
except Exception:
norm_key = k.lower()
normalized_routing[norm_key] = v
routing_dict.update(normalized_routing)
if not routing_dict:
logger.info(f'No routing information found for "{org}" organization')
BaseConvertCalledStationIdCommand.logger.info(
f'No routing information found for "{org}" organization'
)
continue

if unique_id:
qs = [input_radius_session]
else:
qs = self._get_unconverted_sessions(org, config["unconverted_ids"])
for radius_session in qs:
try:
common_name = routing_dict[
str(EUI(radius_session.calling_station_id, dialect=mac_unix))
].common_name
mac_address = RE_VIRTUAL_ADDR_MAC.search(common_name)[0]
radius_session.called_station_id = mac_address.replace(":", "-")
except KeyError:
logger.warning(
lookup_key = str(
EUI(radius_session.calling_station_id, dialect=mac_unix)
).lower()
# If routing information doesn't contain the expected key,
# try a tolerant fallback that strips leading zeros from each
# octet (handles representations like '0b' vs 'b'). Only if
# no variant is found, log a warning and skip this session.
if lookup_key not in routing_dict:

def _strip_leading_zeros(k):
parts = k.split(":")
return ":".join([p.lstrip("0") or "0" for p in parts])

alt_key = _strip_leading_zeros(lookup_key)
if alt_key in routing_dict:
# use the alt_key mapping
routing_dict[lookup_key] = routing_dict[alt_key]
else:
pass
# If routing information doesn't contain the expected key,
# log a warning and skip this session.
BaseConvertCalledStationIdCommand.logger.warning(
"Failed to find routing information for "
f"{radius_session.session_id}. Skipping!"
)
continue

common_name = routing_dict[lookup_key].common_name

try:
mac_address = self._search_mac_address(common_name)
except (TypeError, IndexError):
logger.warning(
BaseConvertCalledStationIdCommand.logger.warning(
f'Failed to find a MAC address in "{common_name}". '
f"Skipping {radius_session.session_id}!"
)
else:
radius_session.save()
continue

from openwisp_radius.base.models import sanitize_mac_address

radius_session.called_station_id = sanitize_mac_address(mac_address)
radius_session.save()

def _get_unconverted_sessions(self, org, unconverted_ids):
"""
Get unconverted sessions for the given organization and unconverted IDs.
"""
from uuid import UUID

# org might be a string UUID or slug from settings
if isinstance(org, str):
try:
# Try to parse as UUID first
org_uuid = UUID(org)
except ValueError:
# If not a UUID, treat as slug and look up the organization
Organization = swapper.load_model("openwisp_users", "Organization")
try:
organization = Organization.objects.get(slug=org)
org_uuid = organization.id
except Organization.DoesNotExist:
self.logger.warning(f"Organization '{org}' not found")
return RadiusAccounting.objects.none()
else:
# org is already an Organization object
org_uuid = org.id

return RadiusAccounting.objects.filter(
organization_id=org_uuid, called_station_id__in=unconverted_ids
)


# monkey patching for openvpn_status begins
Expand Down
2 changes: 1 addition & 1 deletion openwisp_radius/migrations/0002_initial_openwisp_radius.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ class Migration(migrations.Migration):
max_length=32,
validators=[
django.core.validators.RegexValidator(
re.compile("^[^\\s/\\.]+$"),
re.compile(r"^[^\\s/\.]+$"),
code="invalid",
message=(
"This value must not contain spaces, "
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Generated by Django 5.2.5 on 2025-09-02 06:37

import re

import django.core.validators
from django.db import migrations

import openwisp_utils.fields
import openwisp_utils.utils


class Migration(migrations.Migration):

dependencies = [
("openwisp_radius", "0040_rename_phonetoken_index"),
]

operations = [
migrations.AlterField(
model_name="organizationradiussettings",
name="token",
field=openwisp_utils.fields.KeyField(
default=openwisp_utils.utils.get_random_key,
help_text=None,
max_length=32,
validators=[
django.core.validators.RegexValidator(
re.compile("^[^\\s/\\.]+$"),
code="invalid",
message="This value must not contain spaces, dots or slashes.",
)
],
),
),
]
Loading
Loading