Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 136 additions & 4 deletions src/anomalib/utils/path.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,136 @@
across different working directories.
"""

import logging
import os
import re
import shutil
import sys
from contextlib import suppress
from pathlib import Path

logger = logging.getLogger(__name__)


def _validate_windows_path(path: Path) -> bool:
"""Validate that a path is safe for use in Windows commands.

Args:
path: Path to validate

Returns:
True if path is safe, False otherwise
"""
path_str = str(path)

# Check for shell metacharacters that could be dangerous
dangerous_chars = {"&", "|", ";", "<", ">", "^", '"', "'", "`", "$", "(", ")", "*", "?", "[", "]", "{", "}"}
# Check for command injection patterns
injection_patterns = ["&&", "||", ";", "&", "|"]

# Perform all validation checks
if (
any(char in path_str for char in dangerous_chars)
or any(pattern in path_str for pattern in injection_patterns)
or "\x00" in path_str
or len(path_str) > 260 # Windows MAX_PATH
):
return False

# Ensure the path exists and is actually a directory (for target)
# or that its parent exists (for tmp)
try:
return path.is_dir() if path.exists() else path.parent.exists()
except (OSError, ValueError):
return False


def _is_windows_junction(p: Path) -> bool:
"""Return True if path is a directory junction."""
if not sys.platform.startswith("win"):
return False

try:
# On Windows, check if it's a directory that's not a symlink
# Junctions appear as directories but resolve to different paths
return p.exists() and p.is_dir() and not p.is_symlink() and p.resolve() != p
except (OSError, RuntimeError):
# Handle cases where path operations fail
return False


def _safe_remove_path(p: Path) -> None:
"""Remove file/dir/symlink/junction at p without following links."""
if not os.path.lexists(str(p)):
return
with suppress(FileNotFoundError):
if p.is_symlink():
p.unlink()
elif _is_windows_junction(p):
# Use rmdir for Windows junctions
p.rmdir()
elif p.is_dir():
shutil.rmtree(p)
else:
p.unlink()


def _make_latest_windows(latest: Path, target: Path) -> None:
# Clean previous latest (symlink/junction/dir/file)
_safe_remove_path(latest)

tmp = latest.with_name(latest.name + "_tmp")
_safe_remove_path(tmp)

# Try creating a directory junction using native Python API
try:
# Use Path.symlink_to with target_is_directory=True for directory junction on Windows
# This creates a junction point that doesn't require admin privileges
tmp.symlink_to(target.resolve(), target_is_directory=True)
except (OSError, NotImplementedError):
# Try using Windows mklink command via subprocess
try:
import subprocess

# Note: Using subprocess with mklink is safe here as we control
# the command and arguments. This is a standard Windows command.
if not _validate_windows_path(tmp) or not _validate_windows_path(target):
logger.warning(
"Warning: Unsafe characters detected in paths. Falling back to text pointer file for 'latest'.",
)
msg = f"Unsafe path detected: {tmp} -> {target}"
raise ValueError(msg)
result = subprocess.run( # noqa: S603
[ # noqa: S607
"cmd",
"/c",
"mklink",
"/J",
str(tmp),
str(target.resolve()),
],
capture_output=True,
text=True,
check=False,
)
Comment on lines +134 to +146
Copy link

Copilot AI Sep 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using subprocess with shell commands poses security risks. While path validation is performed, consider using the Windows API directly through ctypes or a dedicated library for creating junctions to eliminate subprocess usage entirely.

Copilot uses AI. Check for mistakes.

if result.returncode == 0 and tmp.exists():
tmp.replace(latest)
return
except (subprocess.SubprocessError, OSError):
# Subprocess failed, fall through to fallback
pass
else:
# Only reached if symlink creation succeeded
tmp.replace(latest)
return

# Final fallback: create a text file indicating the latest version
# This preserves the intended behavior without breaking the system
latest.mkdir(exist_ok=True)
version_file = latest / ".version_pointer"
version_file.write_text(str(target.resolve()))


def create_versioned_dir(root_dir: str | Path) -> Path:
"""Create a new version directory and update the ``latest`` symbolic link.
Expand Down Expand Up @@ -100,11 +227,16 @@ def create_versioned_dir(root_dir: str | Path) -> Path:

# Update the 'latest' symbolic link to point to the new version directory
latest_link_path = root_dir / "latest"
if latest_link_path.is_symlink() or latest_link_path.exists():
latest_link_path.unlink()
latest_link_path.symlink_to(new_version_dir, target_is_directory=True)
if sys.platform.startswith("win"):
_make_latest_windows(latest_link_path, new_version_dir)
else:
if latest_link_path.is_symlink() or latest_link_path.exists():
latest_link_path.unlink()
latest_link_path.symlink_to(new_version_dir, target_is_directory=True)

return latest_link_path
# Return the versioned directory path, not the latest link
# This ensures training saves to the versioned directory directly
return new_version_dir


def convert_to_snake_case(s: str) -> str:
Expand Down