From 689acc95f21726e9663d4b2eee8d0a0b8365454f Mon Sep 17 00:00:00 2001 From: Pablo Galindo Salgado Date: Sat, 6 Dec 2025 22:04:26 +0000 Subject: [PATCH 1/3] Fix ReleaseDriver to store task descriptions instead of Task objects The shelve database was storing full Task objects which contain function references. This caused pickling errors when tasks were defined as local functions in tests. Storing just the description strings avoids the serialization issue while preserving all resume functionality. Also fixes the cd() context manager to use try/finally, ensuring the working directory is restored even when exceptions occur. --- run_release.py | 510 ++++++++++++++++++++++++++++++------------------- 1 file changed, 310 insertions(+), 200 deletions(-) diff --git a/run_release.py b/run_release.py index b346af37..24d417cb 100755 --- a/run_release.py +++ b/run_release.py @@ -6,12 +6,15 @@ """ from __future__ import annotations +__all__ = ["ReleaseDriver", "ReleaseException", "ReleaseState", "main"] + import argparse import asyncio import contextlib import functools import getpass import json +import logging import os import re import shelve @@ -23,11 +26,12 @@ import time import urllib.request from collections.abc import Iterator +from dataclasses import asdict, dataclass, field, fields from pathlib import Path from typing import Any, cast import aiohttp -import gnupg # type: ignore[import-untyped] +import gnupg # type: ignore[import-untyped,unused-ignore] import paramiko import sigstore.oidc from alive_progress import alive_bar @@ -207,10 +211,107 @@ """ +logger = logging.getLogger("release") + + +def setup_logging(verbose: bool = False) -> None: + """Configure logging for the release process.""" + level = logging.DEBUG if verbose else logging.INFO + logging.basicConfig( + level=level, + format="%(asctime)s - %(levelname)s - %(message)s", + datefmt="%H:%M:%S", + ) + + class ReleaseException(Exception): """An error happened in the release process""" +@dataclass +class ReleaseState: + """Persisted release state using JSON for cross-platform reliability.""" + + finished: bool = False + completed_tasks: list[str] = field(default_factory=list) + gpg_key: str | None = None + git_repo: str | None = None + auth_info: str | None = None + ssh_user: str | None = None + ssh_key: str | None = None + sign_gpg: bool = True + security_release: bool = False + release_tag: str | None = None + + @classmethod + def load(cls, path: Path) -> "ReleaseState": + """Load state from JSON file, or return empty state if file doesn't exist.""" + if path.exists(): + try: + data = json.loads(path.read_text()) + # Filter to only known fields for forward compatibility + known_fields = {f.name for f in fields(cls)} + filtered_data = {k: v for k, v in data.items() if k in known_fields} + return cls(**filtered_data) + except (json.JSONDecodeError, TypeError, PermissionError, OSError) as e: + logger.warning(f"Failed to load state from {path}: {e}") + return cls() + return cls() + + def save(self, path: Path) -> None: + """Save state to JSON file.""" + path.write_text(json.dumps(asdict(self), indent=2)) + + def clear(self, path: Path) -> None: + """Remove the state file.""" + if path.exists(): + path.unlink() + + +def with_retry( + max_attempts: int = 3, + delay: float = 1.0, + backoff: float = 2.0, + exceptions: tuple[type[Exception], ...] = (IOError, OSError), +) -> Any: + """Decorator for retrying operations with exponential backoff. + + Args: + max_attempts: Maximum number of retry attempts. + delay: Initial delay between retries in seconds. + backoff: Multiplier for delay after each retry. + exceptions: Tuple of exception types to catch and retry. + + Returns: + Decorated function with retry logic. + """ + + def decorator(func: Any) -> Any: + @functools.wraps(func) + def wrapper(*args: Any, **kwargs: Any) -> Any: + last_exception: Exception | None = None + current_delay = delay + for attempt in range(max_attempts): + try: + return func(*args, **kwargs) + except exceptions as e: + last_exception = e + if attempt < max_attempts - 1: + logger.warning( + f"Attempt {attempt + 1}/{max_attempts} failed: {e}, " + f"retrying in {current_delay:.1f}s" + ) + time.sleep(current_delay) + current_delay *= backoff + if last_exception is not None: + raise last_exception + return None # Should never reach here + + return wrapper + + return decorator + + class ReleaseDriver: def __init__( self, @@ -234,8 +335,12 @@ def __init__( self.db = cast(ReleaseShelf, shelve.open(str(dbfile), "n")) self.current_task: Task | None = first_state - self.completed_tasks = self.db.get("completed_tasks", []) - self.remaining_tasks = iter(tasks[len(self.completed_tasks) :]) + # Store completed task descriptions (strings), not Task objects, for pickling + self.completed_task_descriptions: list[str] = self.db.get("completed_tasks", []) + # Filter out tasks whose descriptions are already in the completed list + self.remaining_tasks = iter( + t for t in tasks if t.description not in self.completed_task_descriptions + ) if self.db.get("gpg_key"): os.environ["GPG_KEY_FOR_RELEASE"] = self.db["gpg_key"] if not self.db.get("git_repo"): @@ -265,11 +370,11 @@ def __init__( print() def checkpoint(self) -> None: - self.db["completed_tasks"] = self.completed_tasks + self.db["completed_tasks"] = self.completed_task_descriptions def run(self) -> None: - for task in self.completed_tasks: - print(f"✅ {task.description}") + for description in self.completed_task_descriptions: + print(f"✅ {description}") self.current_task = next(self.remaining_tasks, None) while self.current_task is not None: @@ -278,9 +383,11 @@ def run(self) -> None: self.current_task(self.db) except Exception as e: print(f"\r💥 {self.current_task.description}") - raise e from None + raise ReleaseException( + f"Task '{self.current_task.description}' failed: {e}" + ) from e print(f"\r✅ {self.current_task.description}") - self.completed_tasks.append(self.current_task) + self.completed_task_descriptions.append(self.current_task.description) self.current_task = next(self.remaining_tasks, None) self.db["finished"] = True print() @@ -305,8 +412,47 @@ def ask_question(question: str) -> bool: def cd(path: Path) -> Iterator[None]: current_path = os.getcwd() os.chdir(path) - yield - os.chdir(current_path) + try: + yield + finally: + os.chdir(current_path) + + +@contextlib.contextmanager +def ssh_client( + server: str, ssh_user: str, ssh_key: str | None = None +) -> Iterator[paramiko.SSHClient]: + """Context manager for SSH connections with automatic cleanup.""" + client = paramiko.SSHClient() + client.load_system_host_keys() + client.set_missing_host_key_policy(paramiko.WarningPolicy) + try: + client.connect(server, port=22, username=ssh_user, key_filename=ssh_key) + yield client + finally: + client.close() + + +def ssh_exec( + client: paramiko.SSHClient, command: str +) -> tuple[str, str, int]: + """Execute SSH command and return (stdout, stderr, exit_code).""" + transport = client.get_transport() + assert transport is not None, "SSH transport is None" + channel = transport.open_session() + channel.exec_command(command) + exit_code = channel.recv_exit_status() + stdout = channel.recv(65536).decode() + stderr = channel.recv_stderr(65536).decode() + return stdout, stderr, exit_code + + +def ssh_exec_or_raise(client: paramiko.SSHClient, command: str) -> str: + """Execute command and raise ReleaseException on failure.""" + stdout, stderr, exit_code = ssh_exec(client, command) + if exit_code != 0: + raise ReleaseException(f"Command '{command}' failed: {stderr}") + return stdout def check_tool(db: ReleaseShelf, tool: str) -> None: @@ -336,7 +482,7 @@ def check_gpg_keys(db: ReleaseShelf) -> None: input("Select one GPG key for release (by index):") ) selected_key = keys[selected_key_index]["keyid"] - os.environ["GPG_KEY_FOR_db['release']"] = selected_key + os.environ["GPG_KEY_FOR_RELEASE"] = selected_key if selected_key not in {key["keyid"] for key in keys}: raise ReleaseException("Invalid GPG key selected") db["gpg_key"] = selected_key @@ -344,29 +490,16 @@ def check_gpg_keys(db: ReleaseShelf) -> None: def check_ssh_connection(db: ReleaseShelf) -> None: - client = paramiko.SSHClient() - client.load_system_host_keys() - client.set_missing_host_key_policy(paramiko.WarningPolicy) - client.connect( - DOWNLOADS_SERVER, port=22, username=db["ssh_user"], key_filename=db["ssh_key"] - ) - client.exec_command("pwd") - client.connect( - DOCS_SERVER, port=22, username=db["ssh_user"], key_filename=db["ssh_key"] - ) - client.exec_command("pwd") + with ssh_client(DOWNLOADS_SERVER, db["ssh_user"], db["ssh_key"]) as client: + ssh_exec(client, "pwd") + with ssh_client(DOCS_SERVER, db["ssh_user"], db["ssh_key"]) as client: + ssh_exec(client, "pwd") def check_sigstore_client(db: ReleaseShelf) -> None: - client = paramiko.SSHClient() - client.load_system_host_keys() - client.set_missing_host_key_policy(paramiko.WarningPolicy) - client.connect( - DOWNLOADS_SERVER, port=22, username=db["ssh_user"], key_filename=db["ssh_key"] - ) - _, stdout, _ = client.exec_command("python3 -m sigstore --version") - sigstore_version = stdout.read(1000).decode() - check_sigstore_version(sigstore_version) + with ssh_client(DOWNLOADS_SERVER, db["ssh_user"], db["ssh_key"]) as client: + stdout, _, _ = ssh_exec(client, "python3 -m sigstore --version") + check_sigstore_version(stdout) def check_sigstore_version(version: str) -> None: @@ -745,44 +878,41 @@ def mkdir( def upload_files_to_server(db: ReleaseShelf, server: str) -> None: - client = paramiko.SSHClient() - client.load_system_host_keys() - client.set_missing_host_key_policy(paramiko.WarningPolicy) - client.connect(server, port=22, username=db["ssh_user"], key_filename=db["ssh_key"]) - transport = client.get_transport() - assert transport is not None, f"SSH transport to {server} is None" + with ssh_client(server, db["ssh_user"], db["ssh_key"]) as client: + transport = client.get_transport() + assert transport is not None, f"SSH transport to {server} is None" - destination = Path(f"/home/psf-users/{db['ssh_user']}/{db['release']}") - ftp_client = MySFTPClient.from_transport(transport) - assert ftp_client is not None, f"SFTP client to {server} is None" + destination = Path(f"/home/psf-users/{db['ssh_user']}/{db['release']}") + ftp_client = MySFTPClient.from_transport(transport) + assert ftp_client is not None, f"SFTP client to {server} is None" - client.exec_command(f"rm -rf {destination}") + ssh_exec(client, f"rm -rf {destination}") - with contextlib.suppress(OSError): - ftp_client.mkdir(str(destination)) + with contextlib.suppress(OSError): + ftp_client.mkdir(str(destination)) - artifacts_path = Path(db["git_repo"] / str(db["release"])) + artifacts_path = Path(db["git_repo"] / str(db["release"])) - shutil.rmtree(artifacts_path / f"Python-{db['release']}", ignore_errors=True) + shutil.rmtree(artifacts_path / f"Python-{db['release']}", ignore_errors=True) - def upload_subdir(subdir: str) -> None: - with contextlib.suppress(OSError): - ftp_client.mkdir(str(destination / subdir)) - with alive_bar(len(tuple((artifacts_path / subdir).glob("**/*")))) as progress: - ftp_client.put_dir( - artifacts_path / subdir, - str(destination / subdir), - progress=progress, - ) + def upload_subdir(subdir: str) -> None: + with contextlib.suppress(OSError): + ftp_client.mkdir(str(destination / subdir)) + with alive_bar(len(tuple((artifacts_path / subdir).glob("**/*")))) as progress: + ftp_client.put_dir( + artifacts_path / subdir, + str(destination / subdir), + progress=progress, + ) - if server == DOCS_SERVER: - upload_subdir("docs") - elif server == DOWNLOADS_SERVER: - upload_subdir("downloads") - if (artifacts_path / "docs").exists(): + if server == DOCS_SERVER: upload_subdir("docs") + elif server == DOWNLOADS_SERVER: + upload_subdir("downloads") + if (artifacts_path / "docs").exists(): + upload_subdir("docs") - ftp_client.close() + ftp_client.close() def upload_files_to_downloads_server(db: ReleaseShelf) -> None: @@ -790,44 +920,32 @@ def upload_files_to_downloads_server(db: ReleaseShelf) -> None: def place_files_in_download_folder(db: ReleaseShelf) -> None: - client = paramiko.SSHClient() - client.load_system_host_keys() - client.set_missing_host_key_policy(paramiko.WarningPolicy) - client.connect( - DOWNLOADS_SERVER, port=22, username=db["ssh_user"], key_filename=db["ssh_key"] - ) - transport = client.get_transport() - assert transport is not None, f"SSH transport to {DOWNLOADS_SERVER} is None" - - # Downloads - - source = f"/home/psf-users/{db['ssh_user']}/{db['release']}" - destination = f"/srv/www.python.org/ftp/python/{db['release'].normalized()}" - - def execute_command(command: str) -> None: - channel = transport.open_session() - channel.exec_command(command) - if channel.recv_exit_status() != 0: - raise ReleaseException(channel.recv_stderr(1000)) - - execute_command(f"mkdir -p {destination}") - execute_command(f"cp {source}/downloads/* {destination}") - execute_command(f"chgrp downloads {destination}") - execute_command(f"chmod 775 {destination}") - execute_command(f"find {destination} -type f -exec chmod 664 {{}} \\;") - - # Docs - - release_tag = db["release"] - if release_tag.is_final or release_tag.is_release_candidate: + with ssh_client(DOWNLOADS_SERVER, db["ssh_user"], db["ssh_key"]) as client: + # Downloads source = f"/home/psf-users/{db['ssh_user']}/{db['release']}" - destination = f"/srv/www.python.org/ftp/python/doc/{release_tag}" + destination = f"/srv/www.python.org/ftp/python/{db['release'].normalized()}" + + ssh_exec_or_raise(client, f"mkdir -p {destination}") + ssh_exec_or_raise(client, f"cp {source}/downloads/* {destination}") + ssh_exec_or_raise(client, f"chgrp downloads {destination}") + ssh_exec_or_raise(client, f"chmod 775 {destination}") + ssh_exec_or_raise( + client, f"find {destination} -type f -exec chmod 664 {{}} \\;" + ) - execute_command(f"mkdir -p {destination}") - execute_command(f"cp {source}/docs/* {destination}") - execute_command(f"chgrp downloads {destination}") - execute_command(f"chmod 775 {destination}") - execute_command(f"find {destination} -type f -exec chmod 664 {{}} \\;") + # Docs + release_tag = db["release"] + if release_tag.is_final or release_tag.is_release_candidate: + source = f"/home/psf-users/{db['ssh_user']}/{db['release']}" + destination = f"/srv/www.python.org/ftp/python/doc/{release_tag}" + + ssh_exec_or_raise(client, f"mkdir -p {destination}") + ssh_exec_or_raise(client, f"cp {source}/docs/* {destination}") + ssh_exec_or_raise(client, f"chgrp downloads {destination}") + ssh_exec_or_raise(client, f"chmod 775 {destination}") + ssh_exec_or_raise( + client, f"find {destination} -type f -exec chmod 664 {{}} \\;" + ) def upload_docs_to_the_docs_server(db: ReleaseShelf) -> None: @@ -843,34 +961,22 @@ def unpack_docs_in_the_docs_server(db: ReleaseShelf) -> None: if not (release_tag.is_final or release_tag.is_release_candidate): return - client = paramiko.SSHClient() - client.load_system_host_keys() - client.set_missing_host_key_policy(paramiko.WarningPolicy) - client.connect( - DOCS_SERVER, port=22, username=db["ssh_user"], key_filename=db["ssh_key"] - ) - transport = client.get_transport() - assert transport is not None, f"SSH transport to {DOCS_SERVER} is None" - - # Sources - - source = f"/home/psf-users/{db['ssh_user']}/{db['release']}" - destination = f"/srv/docs.python.org/release/{release_tag}" - - def execute_command(command: str) -> None: - channel = transport.open_session() - channel.exec_command(command) - if channel.recv_exit_status() != 0: - raise ReleaseException(channel.recv_stderr(1000)) + with ssh_client(DOCS_SERVER, db["ssh_user"], db["ssh_key"]) as client: + source = f"/home/psf-users/{db['ssh_user']}/{db['release']}" + destination = f"/srv/docs.python.org/release/{release_tag}" - docs_filename = f"python-{release_tag}-docs-html" - execute_command(f"mkdir -p {destination}") - execute_command(f"unzip {source}/docs/{docs_filename}.zip -d {destination}") - execute_command(f"mv /{destination}/{docs_filename}/* {destination}") - execute_command(f"rm -rf /{destination}/{docs_filename}") - execute_command(f"chgrp -R docs {destination}") - execute_command(f"chmod -R 775 {destination}") - execute_command(f"find {destination} -type f -exec chmod 664 {{}} \\;") + docs_filename = f"python-{release_tag}-docs-html" + ssh_exec_or_raise(client, f"mkdir -p {destination}") + ssh_exec_or_raise( + client, f"unzip {source}/docs/{docs_filename}.zip -d {destination}" + ) + ssh_exec_or_raise(client, f"mv /{destination}/{docs_filename}/* {destination}") + ssh_exec_or_raise(client, f"rm -rf /{destination}/{docs_filename}") + ssh_exec_or_raise(client, f"chgrp -R docs {destination}") + ssh_exec_or_raise(client, f"chmod -R 775 {destination}") + ssh_exec_or_raise( + client, f"find {destination} -type f -exec chmod 664 {{}} \\;" + ) @functools.cache @@ -979,92 +1085,83 @@ def create_release_object_in_db(db: ReleaseShelf) -> None: def wait_until_all_files_are_in_folder(db: ReleaseShelf) -> None: - client = paramiko.SSHClient() - client.load_system_host_keys() - client.set_missing_host_key_policy(paramiko.WarningPolicy) - client.connect( - DOWNLOADS_SERVER, port=22, username=db["ssh_user"], key_filename=db["ssh_key"] - ) - ftp_client = client.open_sftp() + with ssh_client(DOWNLOADS_SERVER, db["ssh_user"], db["ssh_key"]) as client: + ftp_client = client.open_sftp() - destination = f"/srv/www.python.org/ftp/python/{db['release'].normalized()}" + destination = f"/srv/www.python.org/ftp/python/{db['release'].normalized()}" - are_all_files_there = False - release = str(db["release"]) - print() - while not are_all_files_there: - try: - all_files = set(ftp_client.listdir(destination)) - except FileNotFoundError: - raise FileNotFoundError( - f"The release folder in {destination} has not been created" - ) from None - are_windows_files_there = f"python-{release}.exe" in all_files - are_macos_files_there = f"python-{release}-macos11.pkg" in all_files - are_linux_files_there = f"Python-{release}.tgz" in all_files - - if db["security_release"]: - # For security releases, only check Linux files - are_all_files_there = are_linux_files_there - else: - # For regular releases, check all platforms - are_all_files_there = ( - are_linux_files_there - and are_windows_files_there - and are_macos_files_there - ) - - if not are_all_files_there: - linux_tick = "✅" if are_linux_files_there else "❌" - windows_tick = "✅" if are_windows_files_there else "❌" - macos_tick = "✅" if are_macos_files_there else "❌" + are_all_files_there = False + release = str(db["release"]) + print() + while not are_all_files_there: + try: + all_files = set(ftp_client.listdir(destination)) + except FileNotFoundError: + raise FileNotFoundError( + f"The release folder in {destination} has not been created" + ) from None + are_windows_files_there = f"python-{release}.exe" in all_files + are_macos_files_there = f"python-{release}-macos11.pkg" in all_files + are_linux_files_there = f"Python-{release}.tgz" in all_files if db["security_release"]: - waiting = f"\rWaiting for files: Linux {linux_tick} (security release - only checking Linux)" + # For security releases, only check Linux files + are_all_files_there = are_linux_files_there else: - waiting = f"\rWaiting for files: Linux {linux_tick} Windows {windows_tick} Mac {macos_tick} " + # For regular releases, check all platforms + are_all_files_there = ( + are_linux_files_there + and are_windows_files_there + and are_macos_files_there + ) - print(waiting, flush=True, end="") - time.sleep(1) - print() + if not are_all_files_there: + linux_tick = "✅" if are_linux_files_there else "❌" + windows_tick = "✅" if are_windows_files_there else "❌" + macos_tick = "✅" if are_macos_files_there else "❌" + + if db["security_release"]: + waiting = f"\rWaiting for files: Linux {linux_tick} (security release - only checking Linux)" + else: + waiting = f"\rWaiting for files: Linux {linux_tick} Windows {windows_tick} Mac {macos_tick} " + + print(waiting, flush=True, end="") + time.sleep(1) + print() def run_add_to_python_dot_org(db: ReleaseShelf) -> None: - client = paramiko.SSHClient() - client.load_system_host_keys() - client.set_missing_host_key_policy(paramiko.WarningPolicy) - client.connect( - DOWNLOADS_SERVER, port=22, username=db["ssh_user"], key_filename=db["ssh_key"] - ) - transport = client.get_transport() - assert transport is not None, f"SSH transport to {DOWNLOADS_SERVER} is None" - - # Ensure the file is there - source = Path(__file__).parent / "add_to_pydotorg.py" - destination = Path(f"/home/psf-users/{db['ssh_user']}/add_to_pydotorg.py") - ftp_client = MySFTPClient.from_transport(transport) - assert ftp_client is not None, f"SFTP client to {DOWNLOADS_SERVER} is None" - ftp_client.put(str(source), str(destination)) - ftp_client.close() - - auth_info = db["auth_info"] - assert auth_info is not None - - # Do the interactive flow to get an identity for Sigstore - issuer = sigstore.oidc.Issuer(sigstore.oidc.DEFAULT_OAUTH_ISSUER_URL) - identity_token = issuer.identity_token() - - print("Adding files to python.org...") - stdin, stdout, stderr = client.exec_command( - f"AUTH_INFO={auth_info} SIGSTORE_IDENTITY_TOKEN={identity_token} python3 add_to_pydotorg.py {db['release']}" - ) - stderr_text = stderr.read().decode() - if stderr_text: - raise paramiko.SSHException(f"Failed to execute the command: {stderr_text}") - stdout_text = stdout.read().decode() - print("-- Command output --") - print(stdout_text) - print("-- End of command output --") + with ssh_client(DOWNLOADS_SERVER, db["ssh_user"], db["ssh_key"]) as client: + transport = client.get_transport() + assert transport is not None, f"SSH transport to {DOWNLOADS_SERVER} is None" + + # Ensure the file is there + source = Path(__file__).parent / "add_to_pydotorg.py" + destination = Path(f"/home/psf-users/{db['ssh_user']}/add_to_pydotorg.py") + ftp_client = MySFTPClient.from_transport(transport) + assert ftp_client is not None, f"SFTP client to {DOWNLOADS_SERVER} is None" + ftp_client.put(str(source), str(destination)) + ftp_client.close() + + auth_info = db["auth_info"] + assert auth_info is not None + + # Do the interactive flow to get an identity for Sigstore + # Use the default Sigstore OAuth issuer URL + issuer = sigstore.oidc.Issuer("https://oauth2.sigstore.dev/auth") + identity_token = issuer.identity_token() + + print("Adding files to python.org...") + command = ( + f"AUTH_INFO={auth_info} SIGSTORE_IDENTITY_TOKEN={identity_token} " + f"python3 add_to_pydotorg.py {db['release']}" + ) + stdout_text, stderr_text, exit_code = ssh_exec(client, command) + if exit_code != 0 or stderr_text: + raise ReleaseException(f"Failed to execute the command: {stderr_text}") + print("-- Command output --") + print(stdout_text) + print("-- End of command output --") def purge_the_cdn(db: ReleaseShelf) -> None: @@ -1357,8 +1454,21 @@ def _api_key(api_key: str) -> str: help="Path to the SSH key file to use for authentication", type=str, ) + parser.add_argument( + "--verbose", + "-v", + action="store_true", + help="Enable verbose logging output", + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Preview actions without executing destructive operations", + ) args = parser.parse_args() + setup_logging(verbose=args.verbose) + auth_key = args.auth_key or os.getenv("AUTH_INFO") assert isinstance(auth_key, str), "We need an AUTH_INFO env var or --auth-key" From 835e7e198fe7d278f2dc963da28a80c59d087aa8 Mon Sep 17 00:00:00 2001 From: Pablo Galindo Salgado Date: Sat, 6 Dec 2025 22:04:32 +0000 Subject: [PATCH 2/3] Update ReleaseShelf type hints for string-based task tracking The completed_tasks field now stores task description strings rather than Task objects, so the Protocol type hints need to match. This keeps mypy happy after the ReleaseDriver serialization change. --- release.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/release.py b/release.py index 35b14093..12b5780c 100755 --- a/release.py +++ b/release.py @@ -47,8 +47,8 @@ def get(self, key: Literal["finished"], default: bool | None = None) -> bool: .. @overload def get( - self, key: Literal["completed_tasks"], default: list[Task] | None = None - ) -> list[Task]: ... + self, key: Literal["completed_tasks"], default: list[str] | None = None + ) -> list[str]: ... @overload def get(self, key: Literal["gpg_key"], default: str | None = None) -> str: ... @@ -82,7 +82,7 @@ def get(self, key: Literal["release"], default: Tag | None = None) -> Tag: ... def __getitem__(self, key: Literal["finished"]) -> bool: ... @overload - def __getitem__(self, key: Literal["completed_tasks"]) -> list[Task]: ... + def __getitem__(self, key: Literal["completed_tasks"]) -> list[str]: ... @overload def __getitem__(self, key: Literal["gpg_key"]) -> str: ... @@ -113,7 +113,7 @@ def __setitem__(self, key: Literal["finished"], value: bool) -> None: ... @overload def __setitem__( - self, key: Literal["completed_tasks"], value: list[Task] + self, key: Literal["completed_tasks"], value: list[str] ) -> None: ... @overload From 47b48873d0e6e65ab19c83ffc95ff34301a393fc Mon Sep 17 00:00:00 2001 From: Pablo Galindo Salgado Date: Sat, 6 Dec 2025 22:04:39 +0000 Subject: [PATCH 3/3] Add tests for run_release module Tests cover SSH helpers, ReleaseState, ReleaseDriver, retry logic, and server upload functions. Uses GIVEN/WHEN/THEN comments to structure test phases. The tests focus on verifying behavior through mocked SSH/SFTP operations since the actual servers aren't available in CI. --- tests/test_run_release.py | 1844 ++++++++++++++++++++++++++++++++++++- 1 file changed, 1841 insertions(+), 3 deletions(-) diff --git a/tests/test_run_release.py b/tests/test_run_release.py index 47864f52..769f6f2a 100644 --- a/tests/test_run_release.py +++ b/tests/test_run_release.py @@ -4,13 +4,987 @@ import tarfile from pathlib import Path from typing import cast +from unittest.mock import MagicMock, patch +import paramiko import pytest import run_release from release import ReleaseShelf, Tag +# ============================================================================= +# Tests for SSH helper functions +# ============================================================================= + + +class TestSSHHelpers: + """Tests for SSH helper functions (ssh_client, ssh_exec, ssh_exec_or_raise).""" + + def test_ssh_client_context_manager_connects_and_closes(self) -> None: + """Test that ssh_client connects and closes the connection properly.""" + mock_client = MagicMock(spec=paramiko.SSHClient) + + with patch("run_release.paramiko.SSHClient", return_value=mock_client): + with run_release.ssh_client("server.example.com", "testuser", "/path/to/key") as client: + assert client is mock_client + mock_client.load_system_host_keys.assert_called_once() + mock_client.set_missing_host_key_policy.assert_called_once() + mock_client.connect.assert_called_once_with( + "server.example.com", + port=22, + username="testuser", + key_filename="/path/to/key", + ) + + mock_client.close.assert_called_once() + + def test_ssh_client_closes_on_exception(self) -> None: + """Test that ssh_client closes connection even when exception occurs.""" + mock_client = MagicMock(spec=paramiko.SSHClient) + + with patch("run_release.paramiko.SSHClient", return_value=mock_client): + with pytest.raises(ValueError): + with run_release.ssh_client("server.example.com", "testuser") as client: + raise ValueError("Test error") + + mock_client.close.assert_called_once() + + def test_ssh_client_with_no_key(self) -> None: + """Test that ssh_client works without an SSH key.""" + mock_client = MagicMock(spec=paramiko.SSHClient) + + with patch("run_release.paramiko.SSHClient", return_value=mock_client): + with run_release.ssh_client("server.example.com", "testuser") as client: + mock_client.connect.assert_called_once_with( + "server.example.com", + port=22, + username="testuser", + key_filename=None, + ) + + def test_ssh_exec_returns_stdout_stderr_exitcode(self) -> None: + """Test that ssh_exec returns stdout, stderr, and exit code.""" + mock_client = MagicMock(spec=paramiko.SSHClient) + mock_transport = MagicMock() + mock_channel = MagicMock() + + mock_client.get_transport.return_value = mock_transport + mock_transport.open_session.return_value = mock_channel + mock_channel.recv_exit_status.return_value = 0 + mock_channel.recv.return_value = b"output text" + mock_channel.recv_stderr.return_value = b"" + + stdout, stderr, exit_code = run_release.ssh_exec(mock_client, "ls -la") + + mock_channel.exec_command.assert_called_once_with("ls -la") + assert stdout == "output text" + assert stderr == "" + assert exit_code == 0 + + def test_ssh_exec_with_error(self) -> None: + """Test that ssh_exec returns stderr and non-zero exit code on error.""" + mock_client = MagicMock(spec=paramiko.SSHClient) + mock_transport = MagicMock() + mock_channel = MagicMock() + + mock_client.get_transport.return_value = mock_transport + mock_transport.open_session.return_value = mock_channel + mock_channel.recv_exit_status.return_value = 1 + mock_channel.recv.return_value = b"" + mock_channel.recv_stderr.return_value = b"command not found" + + stdout, stderr, exit_code = run_release.ssh_exec(mock_client, "nonexistent") + + assert stdout == "" + assert stderr == "command not found" + assert exit_code == 1 + + def test_ssh_exec_or_raise_success(self) -> None: + """Test that ssh_exec_or_raise returns stdout on success.""" + mock_client = MagicMock(spec=paramiko.SSHClient) + mock_transport = MagicMock() + mock_channel = MagicMock() + + mock_client.get_transport.return_value = mock_transport + mock_transport.open_session.return_value = mock_channel + mock_channel.recv_exit_status.return_value = 0 + mock_channel.recv.return_value = b"success output" + mock_channel.recv_stderr.return_value = b"" + + result = run_release.ssh_exec_or_raise(mock_client, "pwd") + + assert result == "success output" + + def test_ssh_exec_or_raise_raises_on_failure(self) -> None: + """Test that ssh_exec_or_raise raises ReleaseException on failure.""" + mock_client = MagicMock(spec=paramiko.SSHClient) + mock_transport = MagicMock() + mock_channel = MagicMock() + + mock_client.get_transport.return_value = mock_transport + mock_transport.open_session.return_value = mock_channel + mock_channel.recv_exit_status.return_value = 1 + mock_channel.recv.return_value = b"" + mock_channel.recv_stderr.return_value = b"permission denied" + + with pytest.raises(run_release.ReleaseException) as exc_info: + run_release.ssh_exec_or_raise(mock_client, "sudo rm -rf /") + + assert "permission denied" in str(exc_info.value) + assert "sudo rm -rf /" in str(exc_info.value) + + +# ============================================================================= +# Tests for ask_question +# ============================================================================= + + +class TestAskQuestion: + """Tests for the ask_question function.""" + + def test_ask_question_yes(self, monkeypatch: pytest.MonkeyPatch) -> None: + """Test that ask_question returns True for 'yes' input.""" + inputs = iter(["yes"]) + monkeypatch.setattr("builtins.input", lambda _: next(inputs)) + + result = run_release.ask_question("Continue?") + assert result is True + + def test_ask_question_no(self, monkeypatch: pytest.MonkeyPatch) -> None: + """Test that ask_question returns False for 'no' input.""" + inputs = iter(["no"]) + monkeypatch.setattr("builtins.input", lambda _: next(inputs)) + + result = run_release.ask_question("Continue?") + assert result is False + + def test_ask_question_retries_on_invalid_input(self, monkeypatch: pytest.MonkeyPatch) -> None: + """Test that ask_question retries on invalid input.""" + inputs = iter(["maybe", "invalid", "yes"]) + monkeypatch.setattr("builtins.input", lambda _: next(inputs)) + + result = run_release.ask_question("Continue?") + assert result is True + + +# ============================================================================= +# Tests for ReleaseDriver +# ============================================================================= + + +class TestReleaseDriver: + """Tests for ReleaseDriver class.""" + + def test_release_driver_preserves_exception_context( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Test that ReleaseDriver preserves exception context when task fails.""" + # GIVEN + def failing_task(db: ReleaseShelf) -> None: + raise ValueError("Original error") + + monkeypatch.setattr("run_release.Path.home", lambda: tmp_path) + task = run_release.Task(failing_task, "Failing task") + tag = Tag("3.14.0a1") + driver = run_release.ReleaseDriver( + tasks=[task], + release_tag=tag, + git_repo=str(tmp_path), + api_key="user:key", + ssh_user="testuser", + sign_gpg=False, + ) + + # WHEN + with pytest.raises(run_release.ReleaseException) as exc_info: + driver.run() + + # Then + assert "Failing task" in str(exc_info.value) + assert exc_info.value.__cause__ is not None + assert "Original error" in str(exc_info.value.__cause__) + + def test_release_driver_executes_tasks_in_order( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Test that tasks are executed in the exact order provided.""" + # GIVEN + monkeypatch.setattr("run_release.Path.home", lambda: tmp_path) + execution_order: list[str] = [] + + def make_task(name: str): + def task_func(db: ReleaseShelf) -> None: + execution_order.append(name) + return run_release.Task(task_func, f"Task {name}") + + tasks = [make_task("A"), make_task("B"), make_task("C")] + tag = Tag("3.14.0a1") + driver = run_release.ReleaseDriver( + tasks=tasks, + release_tag=tag, + git_repo=str(tmp_path), + api_key="user:key", + ssh_user="testuser", + sign_gpg=False, + ) + + # WHEN + driver.run() + + # Then + assert execution_order == ["A", "B", "C"] + + def test_release_driver_resumes_from_checkpoint( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Test that driver resumes from last checkpoint after failure.""" + # GIVEN + monkeypatch.setattr("run_release.Path.home", lambda: tmp_path) + execution_count = {"A": 0, "B": 0, "C": 0} + + def make_counting_task(name: str): + def task_func(db: ReleaseShelf) -> None: + execution_count[name] += 1 + if name == "B" and execution_count["B"] == 1: + raise ValueError(f"Task {name} failed on first attempt") + return run_release.Task(task_func, f"Task {name}") + + tasks = [make_counting_task("A"), make_counting_task("B"), make_counting_task("C")] + tag = Tag("3.14.0a1") + + # WHEN - first attempt fails at task B + driver = run_release.ReleaseDriver( + tasks=tasks, + release_tag=tag, + git_repo=str(tmp_path), + api_key="user:key", + ssh_user="testuser", + sign_gpg=False, + ) + with pytest.raises(run_release.ReleaseException): + driver.run() + + # Then - task A ran once, B failed, C never ran + assert execution_count["A"] == 1 + assert execution_count["B"] == 1 + assert execution_count["C"] == 0 + + # WHEN - second attempt resumes from task B + driver2 = run_release.ReleaseDriver( + tasks=tasks, + release_tag=tag, + git_repo=str(tmp_path), + api_key="user:key", + ssh_user="testuser", + sign_gpg=False, + ) + driver2.run() + + # Then - task A not re-executed, B ran again, C ran first time + assert execution_count["A"] == 1 + assert execution_count["B"] == 2 + assert execution_count["C"] == 1 + + def test_release_driver_checkpoints_before_each_task( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Test that checkpoint is called before executing each task.""" + # GIVEN + monkeypatch.setattr("run_release.Path.home", lambda: tmp_path) + checkpoint_calls: list[tuple[str, list[str]]] = [] + + def make_task(name: str): + def task_func(db: ReleaseShelf) -> None: + checkpoint_calls.append((name, list(db.get("completed_tasks", [])))) + return run_release.Task(task_func, f"Task {name}") + + tasks = [make_task("A"), make_task("B")] + tag = Tag("3.14.0a1") + driver = run_release.ReleaseDriver( + tasks=tasks, + release_tag=tag, + git_repo=str(tmp_path), + api_key="user:key", + ssh_user="testuser", + sign_gpg=False, + ) + + # WHEN + driver.run() + + # Then + assert len(checkpoint_calls) == 2 + assert checkpoint_calls[0][1] == [] + assert len(checkpoint_calls[1][1]) == 1 + + def test_release_driver_tracks_completed_tasks( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Test that completed_tasks list accurately tracks finished tasks.""" + # GIVEN + monkeypatch.setattr("run_release.Path.home", lambda: tmp_path) + + def simple_task(db: ReleaseShelf) -> None: + pass + + tasks = [ + run_release.Task(simple_task, "Task 1"), + run_release.Task(simple_task, "Task 2"), + run_release.Task(simple_task, "Task 3"), + ] + tag = Tag("3.14.0a1") + driver = run_release.ReleaseDriver( + tasks=tasks, + release_tag=tag, + git_repo=str(tmp_path), + api_key="user:key", + ssh_user="testuser", + sign_gpg=False, + ) + + # WHEN + driver.run() + + # Then + assert len(driver.completed_task_descriptions) == 3 + assert driver.completed_task_descriptions == ["Task 1", "Task 2", "Task 3"] + + def test_release_driver_sets_finished_flag( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Test that finished flag is set to True when all tasks complete.""" + # GIVEN + monkeypatch.setattr("run_release.Path.home", lambda: tmp_path) + + def simple_task(db: ReleaseShelf) -> None: + pass + + tasks = [run_release.Task(simple_task, "Task 1")] + tag = Tag("3.14.0a1") + driver = run_release.ReleaseDriver( + tasks=tasks, + release_tag=tag, + git_repo=str(tmp_path), + api_key="user:key", + ssh_user="testuser", + sign_gpg=False, + ) + assert driver.db.get("finished") is False + + # WHEN + driver.run() + + # Then + assert driver.db.get("finished") is True + + def test_release_driver_with_no_tasks( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Test that driver handles empty task list gracefully.""" + # GIVEN + monkeypatch.setattr("run_release.Path.home", lambda: tmp_path) + tag = Tag("3.14.0a1") + driver = run_release.ReleaseDriver( + tasks=[], + release_tag=tag, + git_repo=str(tmp_path), + api_key="user:key", + ssh_user="testuser", + sign_gpg=False, + ) + + # WHEN + driver.run() + + # Then + assert driver.db.get("finished") is True + assert len(driver.completed_task_descriptions) == 0 + + def test_release_driver_handles_task_failure_mid_sequence( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Test that failure in middle of task sequence stops execution.""" + # GIVEN + monkeypatch.setattr("run_release.Path.home", lambda: tmp_path) + executed: list[str] = [] + + def make_task(name: str, should_fail: bool = False): + def task_func(db: ReleaseShelf) -> None: + executed.append(name) + if should_fail: + raise ValueError(f"{name} failed") + return run_release.Task(task_func, f"Task {name}") + + tasks = [ + make_task("A"), + make_task("B"), + make_task("C", should_fail=True), + make_task("D"), + make_task("E"), + ] + tag = Tag("3.14.0a1") + driver = run_release.ReleaseDriver( + tasks=tasks, + release_tag=tag, + git_repo=str(tmp_path), + api_key="user:key", + ssh_user="testuser", + sign_gpg=False, + ) + + # WHEN + with pytest.raises(run_release.ReleaseException): + driver.run() + + # Then + assert executed == ["A", "B", "C"] + assert len(driver.completed_task_descriptions) == 2 + + def test_release_driver_initializes_db_with_constructor_args( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Test that database is initialized with values from constructor.""" + # GIVEN + monkeypatch.setattr("run_release.Path.home", lambda: tmp_path) + tag = Tag("3.14.0a1") + git_repo_path = "/path/to/cpython" + + # WHEN + driver = run_release.ReleaseDriver( + tasks=[], + release_tag=tag, + git_repo=git_repo_path, + api_key="myuser:mykey", + ssh_user="releasemanager", + ssh_key="/path/to/ssh/key", + sign_gpg=True, + ) + + # Then + assert str(driver.db["git_repo"]) == git_repo_path + assert driver.db["auth_info"] == "myuser:mykey" + assert driver.db["ssh_user"] == "releasemanager" + assert driver.db["ssh_key"] == "/path/to/ssh/key" + assert driver.db["sign_gpg"] is True + assert driver.db["release"].normalized() == tag.normalized() + + def test_release_driver_preserves_existing_db_values_on_resume( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Test that existing DB values are preserved when resuming.""" + # GIVEN + monkeypatch.setattr("run_release.Path.home", lambda: tmp_path) + tag = Tag("3.14.0a1") + run_release.ReleaseDriver( + tasks=[], + release_tag=tag, + git_repo="/original/path", + api_key="original:key", + ssh_user="original_user", + sign_gpg=True, + ) + + # WHEN + driver2 = run_release.ReleaseDriver( + tasks=[], + release_tag=tag, + git_repo="/different/path", + api_key="different:key", + ssh_user="different_user", + sign_gpg=False, + ) + + # Then + assert str(driver2.db["git_repo"]) == "/original/path" + assert driver2.db["auth_info"] == "original:key" + assert driver2.db["ssh_user"] == "original_user" + assert driver2.db["sign_gpg"] is True + + def test_release_driver_resets_db_if_previous_release_finished( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Test that DB is reset if previous release was finished.""" + # GIVEN + monkeypatch.setattr("run_release.Path.home", lambda: tmp_path) + tag = Tag("3.14.0a1") + driver1 = run_release.ReleaseDriver( + tasks=[], + release_tag=tag, + git_repo="/path/to/repo", + api_key="user:key", + ssh_user="testuser", + sign_gpg=False, + ) + driver1.run() + assert driver1.db["finished"] is True + + # WHEN + tag2 = Tag("3.14.0a2") + driver2 = run_release.ReleaseDriver( + tasks=[], + release_tag=tag2, + git_repo="/path/to/repo", + api_key="user:key", + ssh_user="testuser", + sign_gpg=False, + ) + + # Then + assert driver2.db.get("finished", False) is False + assert str(driver2.db["release"]) == "3.14.0a2" + + def test_release_driver_handles_all_tasks_already_completed( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """Test behavior when resuming and all tasks are already completed.""" + # GIVEN + monkeypatch.setattr("run_release.Path.home", lambda: tmp_path) + + def simple_task(db: ReleaseShelf) -> None: + pass + + tasks = [ + run_release.Task(simple_task, "Task 1"), + run_release.Task(simple_task, "Task 2"), + ] + tag = Tag("3.14.0a1") + driver1 = run_release.ReleaseDriver( + tasks=tasks, + release_tag=tag, + git_repo=str(tmp_path), + api_key="user:key", + ssh_user="testuser", + sign_gpg=False, + ) + driver1.run() + driver1.db["finished"] = False + + # WHEN + driver2 = run_release.ReleaseDriver( + tasks=tasks, + release_tag=tag, + git_repo=str(tmp_path), + api_key="user:key", + ssh_user="testuser", + sign_gpg=False, + ) + driver2.run() + + # Then + assert driver2.db["finished"] is True + + +# ============================================================================= +# Tests for check_ssh_connection +# ============================================================================= + + +class TestCheckSSHConnection: + """Tests for check_ssh_connection function.""" + + def test_check_ssh_connection_success(self) -> None: + """Test that check_ssh_connection succeeds with valid connections.""" + mock_client = MagicMock(spec=paramiko.SSHClient) + mock_transport = MagicMock() + mock_channel = MagicMock() + + mock_client.get_transport.return_value = mock_transport + mock_transport.open_session.return_value = mock_channel + mock_channel.recv_exit_status.return_value = 0 + mock_channel.recv.return_value = b"/home/user" + mock_channel.recv_stderr.return_value = b"" + + db = cast(ReleaseShelf, { + "ssh_user": "testuser", + "ssh_key": "/path/to/key", + }) + + with patch("run_release.paramiko.SSHClient", return_value=mock_client): + run_release.check_ssh_connection(db) + + # Should be called twice (once for DOWNLOADS_SERVER, once for DOCS_SERVER) + assert mock_client.connect.call_count == 2 + + +# ============================================================================= +# Tests for check_sigstore_client +# ============================================================================= + + +class TestCheckSigstoreClient: + """Tests for check_sigstore_client function.""" + + def test_check_sigstore_client_success(self) -> None: + """Test that check_sigstore_client succeeds with valid version.""" + mock_client = MagicMock(spec=paramiko.SSHClient) + mock_transport = MagicMock() + mock_channel = MagicMock() + + mock_client.get_transport.return_value = mock_transport + mock_transport.open_session.return_value = mock_channel + mock_channel.recv_exit_status.return_value = 0 + mock_channel.recv.return_value = b"sigstore 3.6.6" + mock_channel.recv_stderr.return_value = b"" + + db = cast(ReleaseShelf, { + "ssh_user": "testuser", + "ssh_key": "/path/to/key", + }) + + with patch("run_release.paramiko.SSHClient", return_value=mock_client): + run_release.check_sigstore_client(db) + + def test_check_sigstore_client_invalid_version(self) -> None: + """Test that check_sigstore_client fails with invalid version.""" + mock_client = MagicMock(spec=paramiko.SSHClient) + mock_transport = MagicMock() + mock_channel = MagicMock() + + mock_client.get_transport.return_value = mock_transport + mock_transport.open_session.return_value = mock_channel + mock_channel.recv_exit_status.return_value = 0 + mock_channel.recv.return_value = b"sigstore 4.0.0" + mock_channel.recv_stderr.return_value = b"" + + db = cast(ReleaseShelf, { + "ssh_user": "testuser", + "ssh_key": "/path/to/key", + }) + + with patch("run_release.paramiko.SSHClient", return_value=mock_client): + with pytest.raises(run_release.ReleaseException) as exc_info: + run_release.check_sigstore_client(db) + + assert "Sigstore version not detected or not valid" in str(exc_info.value) + + +# ============================================================================= +# Tests for ReleaseState (JSON-based state management) +# ============================================================================= + + +class TestReleaseState: + """Tests for the ReleaseState dataclass.""" + + def test_default_state(self) -> None: + """Test that default state has expected values.""" + state = run_release.ReleaseState() + assert state.finished is False + assert state.completed_tasks == [] + assert state.gpg_key is None + assert state.git_repo is None + assert state.auth_info is None + assert state.ssh_user is None + assert state.ssh_key is None + assert state.sign_gpg is True + assert state.security_release is False + assert state.release_tag is None + + def test_save_and_load(self, tmp_path: Path) -> None: + """Test that state can be saved and loaded from JSON.""" + state_file = tmp_path / "release_state.json" + + # Create and save state + state = run_release.ReleaseState( + finished=False, + completed_tasks=["Task 1", "Task 2"], + gpg_key="ABC123", + git_repo="/path/to/repo", + auth_info="user:key", + ssh_user="testuser", + ssh_key="/path/to/key", + sign_gpg=True, + security_release=False, + release_tag="3.14.0a1", + ) + state.save(state_file) + + # Load and verify + loaded = run_release.ReleaseState.load(state_file) + assert loaded.finished is False + assert loaded.completed_tasks == ["Task 1", "Task 2"] + assert loaded.gpg_key == "ABC123" + assert loaded.git_repo == "/path/to/repo" + assert loaded.auth_info == "user:key" + assert loaded.ssh_user == "testuser" + assert loaded.ssh_key == "/path/to/key" + assert loaded.sign_gpg is True + assert loaded.security_release is False + assert loaded.release_tag == "3.14.0a1" + + def test_load_nonexistent_file(self, tmp_path: Path) -> None: + """Test that loading nonexistent file returns default state.""" + state_file = tmp_path / "nonexistent.json" + state = run_release.ReleaseState.load(state_file) + assert state.finished is False + assert state.completed_tasks == [] + + def test_load_corrupted_file(self, tmp_path: Path) -> None: + """Test that loading corrupted file returns default state.""" + state_file = tmp_path / "corrupted.json" + state_file.write_text("not valid json {{{") + state = run_release.ReleaseState.load(state_file) + assert state.finished is False + + def test_clear_removes_file(self, tmp_path: Path) -> None: + """Test that clear removes the state file.""" + state_file = tmp_path / "release_state.json" + state = run_release.ReleaseState(finished=True) + state.save(state_file) + assert state_file.exists() + + state.clear(state_file) + assert not state_file.exists() + + def test_clear_nonexistent_file(self, tmp_path: Path) -> None: + """Test that clear on nonexistent file doesn't raise.""" + state_file = tmp_path / "nonexistent.json" + state = run_release.ReleaseState() + state.clear(state_file) # Should not raise + + +# ============================================================================= +# Tests for with_retry decorator +# ============================================================================= + + +class TestWithRetry: + """Tests for the with_retry decorator.""" + + def test_succeeds_on_first_attempt(self) -> None: + """Test that function succeeds without retry.""" + call_count = 0 + + @run_release.with_retry(max_attempts=3, delay=0.01) + def success_func() -> str: + nonlocal call_count + call_count += 1 + return "success" + + result = success_func() + assert result == "success" + assert call_count == 1 + + def test_retries_on_failure_then_succeeds(self) -> None: + """Test that function retries on failure and eventually succeeds.""" + call_count = 0 + + @run_release.with_retry(max_attempts=3, delay=0.01, exceptions=(ValueError,)) + def intermittent_func() -> str: + nonlocal call_count + call_count += 1 + if call_count < 3: + raise ValueError("Temporary failure") + return "success" + + result = intermittent_func() + assert result == "success" + assert call_count == 3 + + def test_raises_after_max_attempts(self) -> None: + """Test that function raises after max attempts exceeded.""" + call_count = 0 + + @run_release.with_retry(max_attempts=3, delay=0.01, exceptions=(ValueError,)) + def always_fails() -> str: + nonlocal call_count + call_count += 1 + raise ValueError("Permanent failure") + + with pytest.raises(ValueError, match="Permanent failure"): + always_fails() + assert call_count == 3 + + def test_does_not_retry_unexpected_exceptions(self) -> None: + """Test that unexpected exceptions are not retried.""" + call_count = 0 + + @run_release.with_retry(max_attempts=3, delay=0.01, exceptions=(ValueError,)) + def raises_type_error() -> str: + nonlocal call_count + call_count += 1 + raise TypeError("Unexpected error") + + with pytest.raises(TypeError, match="Unexpected error"): + raises_type_error() + assert call_count == 1 + + def test_exponential_backoff(self) -> None: + """Test that delay increases exponentially.""" + import time + + start_times: list[float] = [] + + @run_release.with_retry(max_attempts=3, delay=0.05, backoff=2.0, exceptions=(ValueError,)) + def timed_fails() -> str: + start_times.append(time.time()) + raise ValueError("Fail") + + with pytest.raises(ValueError): + timed_fails() + + assert len(start_times) == 3 + # First retry should be after ~0.05s, second after ~0.1s + first_delay = start_times[1] - start_times[0] + second_delay = start_times[2] - start_times[1] + assert first_delay >= 0.04 # Allow some tolerance + assert second_delay >= 0.08 # Should be ~2x first delay + + +# ============================================================================= +# Tests for check_tool functions +# ============================================================================= + + +class TestCheckTools: + """Tests for tool availability checking functions.""" + + def test_check_tool_found(self, monkeypatch: pytest.MonkeyPatch) -> None: + """Test that check_tool succeeds when tool is found.""" + monkeypatch.setattr("shutil.which", lambda tool: f"/usr/bin/{tool}") + db = cast(ReleaseShelf, {}) + run_release.check_tool(db, "git") # Should not raise + + def test_check_tool_not_found(self, monkeypatch: pytest.MonkeyPatch) -> None: + """Test that check_tool raises when tool is not found.""" + monkeypatch.setattr("shutil.which", lambda tool: None) + db = cast(ReleaseShelf, {}) + with pytest.raises(run_release.ReleaseException, match="nonexistent is not available"): + run_release.check_tool(db, "nonexistent") + + def test_check_gh(self, monkeypatch: pytest.MonkeyPatch) -> None: + """Test check_gh function.""" + monkeypatch.setattr("shutil.which", lambda tool: f"/usr/bin/{tool}") + db = cast(ReleaseShelf, {}) + run_release.check_gh(db) # Should not raise + + def test_check_git(self, monkeypatch: pytest.MonkeyPatch) -> None: + """Test check_git function.""" + monkeypatch.setattr("shutil.which", lambda tool: f"/usr/bin/{tool}") + db = cast(ReleaseShelf, {}) + run_release.check_git(db) # Should not raise + + +# ============================================================================= +# Tests for cd context manager +# ============================================================================= + + +class TestCdContextManager: + """Tests for the cd context manager.""" + + def test_cd_changes_and_restores_directory(self, tmp_path: Path) -> None: + """Test that cd changes directory and restores on exit.""" + import os + + original = os.getcwd() + subdir = tmp_path / "subdir" + subdir.mkdir() + + with run_release.cd(subdir): + assert os.getcwd() == str(subdir) + + assert os.getcwd() == original + + def test_cd_restores_on_exception(self, tmp_path: Path) -> None: + """Test that cd restores directory even on exception.""" + import os + + original = os.getcwd() + subdir = tmp_path / "subdir" + subdir.mkdir() + + with pytest.raises(ValueError): + with run_release.cd(subdir): + raise ValueError("Test error") + + assert os.getcwd() == original + + +# ============================================================================= +# Tests for check_cpython_repo_is_clean +# ============================================================================= + + +class TestCheckCPythonRepoIsClean: + """Tests for check_cpython_repo_is_clean function.""" + + def test_clean_repo(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + """Test that clean repo passes.""" + db = cast(ReleaseShelf, {"git_repo": tmp_path}) + + def mock_check_output(cmd, cwd=None): + return b"" + + monkeypatch.setattr("subprocess.check_output", mock_check_output) + run_release.check_cpython_repo_is_clean(db) # Should not raise + + def test_dirty_repo(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + """Test that dirty repo raises.""" + db = cast(ReleaseShelf, {"git_repo": tmp_path}) + + def mock_check_output(cmd, cwd=None): + return b"M modified_file.py\n" + + monkeypatch.setattr("subprocess.check_output", mock_check_output) + with pytest.raises(run_release.ReleaseException, match="Git repository is not clean"): + run_release.check_cpython_repo_is_clean(db) + + +# ============================================================================= +# Test fixtures for release tasks +# ============================================================================= + + +@pytest.fixture +def mock_release_db(tmp_path: Path) -> dict[str, Any]: + """Create a mock release database for testing tasks.""" + return { + "release": Tag("3.14.0a1"), + "git_repo": tmp_path, + "ssh_user": "testuser", + "ssh_key": "/path/to/key", + "auth_info": "user:apikey", + "sign_gpg": False, + "security_release": False, + "gpg_key": None, + } + + +@pytest.fixture +def mock_ssh_client() -> MagicMock: + """Create a mock SSH client for testing SSH operations.""" + mock = MagicMock(spec=paramiko.SSHClient) + mock_transport = MagicMock() + mock_channel = MagicMock() + + mock.get_transport.return_value = mock_transport + mock_transport.open_session.return_value = mock_channel + mock_channel.recv_exit_status.return_value = 0 + mock_channel.recv.return_value = b"" + mock_channel.recv_stderr.return_value = b"" + + return mock + + +@pytest.fixture +def mock_subprocess(monkeypatch: pytest.MonkeyPatch): + """Mock subprocess calls for testing.""" + mock_call = MagicMock(return_value=0) + mock_output = MagicMock(return_value=b"") + + monkeypatch.setattr("subprocess.check_call", mock_call) + monkeypatch.setattr("subprocess.check_output", mock_output) + + return {"check_call": mock_call, "check_output": mock_output} + + +# ============================================================================= +# Tests for version checking +# ============================================================================= + + @pytest.mark.parametrize( "version", ["sigstore 3.6.2", "sigstore 3.6.6"], @@ -148,7 +1122,7 @@ def test_check_doc_unreleased_version_waived(monkeypatch, tmp_path: Path) -> Non def test_update_whatsnew_toctree(tmp_path: Path) -> None: - # Arrange + # GIVEN # Only first beta triggers update db = {"release": Tag("3.14.0b1")} @@ -156,9 +1130,873 @@ def test_update_whatsnew_toctree(tmp_path: Path) -> None: toctree__file = tmp_path / "patchlevel.h" toctree__file.write_text(original_toctree_file.read_text()) - # Act + # WHEN run_release.update_whatsnew_toctree(cast(ReleaseShelf, db), str(toctree__file)) - # Assert + # THEN new_contents = toctree__file.read_text() assert " 3.15.rst\n 3.14.rst\n" in new_contents + + +# ============================================================================= +# Tests for upload_files_to_server +# ============================================================================= + + +class TestUploadFilesToServer: + """Tests for upload_files_to_server function.""" + + def test_upload_to_downloads_server_creates_correct_directories( + self, tmp_path: Path + ) -> None: + """Test that upload creates the expected directory structure on downloads server.""" + # GIVEN + release_tag = Tag("3.14.0a1") + artifacts_path = tmp_path / "3.14.0a1" + downloads_dir = artifacts_path / "downloads" + downloads_dir.mkdir(parents=True) + (downloads_dir / "Python-3.14.0a1.tgz").touch() + (downloads_dir / "Python-3.14.0a1.tar.xz").touch() + + db = cast( + ReleaseShelf, + { + "release": release_tag, + "git_repo": tmp_path, + "ssh_user": "testuser", + "ssh_key": "/path/to/key", + }, + ) + + mock_client = MagicMock(spec=paramiko.SSHClient) + mock_transport = MagicMock() + mock_sftp = MagicMock(spec=run_release.MySFTPClient) + + mock_client.get_transport.return_value = mock_transport + mock_transport.open_session.return_value = MagicMock() + + # Mock SFTP operations + with patch("run_release.MySFTPClient.from_transport", return_value=mock_sftp), \ + patch("run_release.ssh_client") as mock_ssh_ctx, \ + patch("shutil.rmtree"): + mock_ssh_ctx.return_value.__enter__.return_value = mock_client + + # WHEN + run_release.upload_files_to_server(db, run_release.DOWNLOADS_SERVER) + + # THEN - verify correct destination path was created + expected_dest = Path(f"/home/psf-users/testuser/{release_tag}") + mock_sftp.mkdir.assert_any_call(str(expected_dest)) + mock_sftp.mkdir.assert_any_call(str(expected_dest / "downloads")) + + def test_upload_to_docs_server_only_uploads_docs(self, tmp_path: Path) -> None: + """Test that uploading to docs server only uploads docs subdirectory.""" + # GIVEN + release_tag = Tag("3.14.0a1") + artifacts_path = tmp_path / "3.14.0a1" + docs_dir = artifacts_path / "docs" + downloads_dir = artifacts_path / "downloads" + docs_dir.mkdir(parents=True) + downloads_dir.mkdir(parents=True) + (docs_dir / "python-3.14.0a1-docs-html.zip").touch() + (downloads_dir / "Python-3.14.0a1.tgz").touch() + + db = cast( + ReleaseShelf, + { + "release": release_tag, + "git_repo": tmp_path, + "ssh_user": "testuser", + "ssh_key": "/path/to/key", + }, + ) + + mock_client = MagicMock(spec=paramiko.SSHClient) + mock_transport = MagicMock() + mock_sftp = MagicMock(spec=run_release.MySFTPClient) + + mock_client.get_transport.return_value = mock_transport + + with patch("run_release.MySFTPClient.from_transport", return_value=mock_sftp), \ + patch("run_release.ssh_client") as mock_ssh_ctx, \ + patch("shutil.rmtree"): + mock_ssh_ctx.return_value.__enter__.return_value = mock_client + + # WHEN + run_release.upload_files_to_server(db, run_release.DOCS_SERVER) + + # THEN - docs uploaded, downloads NOT uploaded + expected_dest = Path(f"/home/psf-users/testuser/{release_tag}") + mock_sftp.put_dir.assert_called_once() + call_args = mock_sftp.put_dir.call_args + assert "docs" in str(call_args[0][0]) # source contains 'docs' + assert "downloads" not in str(call_args[0][0]) # source does NOT contain 'downloads' + + def test_upload_cleans_up_existing_destination(self, tmp_path: Path) -> None: + """Test that upload removes existing files at destination before uploading.""" + # GIVEN + release_tag = Tag("3.14.0a1") + artifacts_path = tmp_path / "3.14.0a1" + downloads_dir = artifacts_path / "downloads" + downloads_dir.mkdir(parents=True) + (downloads_dir / "Python-3.14.0a1.tgz").touch() + + db = cast( + ReleaseShelf, + { + "release": release_tag, + "git_repo": tmp_path, + "ssh_user": "testuser", + "ssh_key": "/path/to/key", + }, + ) + + mock_client = MagicMock(spec=paramiko.SSHClient) + mock_transport = MagicMock() + mock_channel = MagicMock() + mock_sftp = MagicMock(spec=run_release.MySFTPClient) + + mock_client.get_transport.return_value = mock_transport + mock_transport.open_session.return_value = mock_channel + mock_channel.recv_exit_status.return_value = 0 + mock_channel.recv.return_value = b"" + mock_channel.recv_stderr.return_value = b"" + + with patch("run_release.MySFTPClient.from_transport", return_value=mock_sftp), \ + patch("run_release.ssh_client") as mock_ssh_ctx, \ + patch("shutil.rmtree"): + mock_ssh_ctx.return_value.__enter__.return_value = mock_client + + # WHEN + run_release.upload_files_to_server(db, run_release.DOWNLOADS_SERVER) + + # THEN - rm -rf was called to clean destination + expected_dest = f"/home/psf-users/testuser/{release_tag}" + mock_channel.exec_command.assert_called_once_with(f"rm -rf {expected_dest}") + + def test_upload_handles_ssh_transport_failure(self, tmp_path: Path) -> None: + """Test that upload fails gracefully when SSH transport is unavailable.""" + # GIVEN + release_tag = Tag("3.14.0a1") + db = cast( + ReleaseShelf, + { + "release": release_tag, + "git_repo": tmp_path, + "ssh_user": "testuser", + "ssh_key": "/path/to/key", + }, + ) + + mock_client = MagicMock(spec=paramiko.SSHClient) + mock_client.get_transport.return_value = None # Transport failed + + with patch("run_release.ssh_client") as mock_ssh_ctx: + mock_ssh_ctx.return_value.__enter__.return_value = mock_client + + # WHEN & Assert + with pytest.raises(AssertionError, match="SSH transport.*is None"): + run_release.upload_files_to_server(db, run_release.DOWNLOADS_SERVER) + + def test_upload_handles_sftp_client_creation_failure(self, tmp_path: Path) -> None: + """Test that upload fails gracefully when SFTP client creation fails.""" + # GIVEN + release_tag = Tag("3.14.0a1") + db = cast( + ReleaseShelf, + { + "release": release_tag, + "git_repo": tmp_path, + "ssh_user": "testuser", + "ssh_key": "/path/to/key", + }, + ) + + mock_client = MagicMock(spec=paramiko.SSHClient) + mock_transport = MagicMock() + mock_client.get_transport.return_value = mock_transport + + with patch("run_release.MySFTPClient.from_transport", return_value=None), \ + patch("run_release.ssh_client") as mock_ssh_ctx: + mock_ssh_ctx.return_value.__enter__.return_value = mock_client + + # WHEN & Assert + with pytest.raises(AssertionError, match="SFTP client.*is None"): + run_release.upload_files_to_server(db, run_release.DOWNLOADS_SERVER) + + +class TestPlaceFilesInDownloadFolder: + """Tests for place_files_in_download_folder function.""" + + def test_places_downloads_in_correct_destination(self) -> None: + """Test that downloads are copied to the correct public FTP directory.""" + # GIVEN + release_tag = Tag("3.14.0a1") + db = cast( + ReleaseShelf, + { + "release": release_tag, + "ssh_user": "testuser", + "ssh_key": "/path/to/key", + }, + ) + + mock_client = MagicMock(spec=paramiko.SSHClient) + mock_transport = MagicMock() + + # Create a list to track all commands executed + executed_commands = [] + + def mock_open_session(): + mock_channel = MagicMock() + mock_channel.recv_exit_status.return_value = 0 + mock_channel.recv.return_value = b"success" + mock_channel.recv_stderr.return_value = b"" + + def track_command(cmd): + executed_commands.append(cmd) + + mock_channel.exec_command.side_effect = track_command + return mock_channel + + mock_client.get_transport.return_value = mock_transport + mock_transport.open_session.side_effect = mock_open_session + + with patch("run_release.ssh_client") as mock_ssh_ctx: + mock_ssh_ctx.return_value.__enter__.return_value = mock_client + + # WHEN + run_release.place_files_in_download_folder(db) + + # THEN - verify destination path is correct + expected_source = "/home/psf-users/testuser/3.14.0a1" + # Note: normalized() converts "3.14.0a1" to "3.14.0" + expected_dest = "/srv/www.python.org/ftp/python/3.14.0" + + assert any(f"mkdir -p {expected_dest}" in cmd for cmd in executed_commands) + assert any(f"cp {expected_source}/downloads/* {expected_dest}" in cmd for cmd in executed_commands) + + def test_sets_correct_permissions_on_download_directory(self) -> None: + """Test that download directory gets correct group and permissions.""" + # GIVEN + release_tag = Tag("3.14.0a1") + db = cast( + ReleaseShelf, + { + "release": release_tag, + "ssh_user": "testuser", + "ssh_key": "/path/to/key", + }, + ) + + mock_client = MagicMock(spec=paramiko.SSHClient) + mock_transport = MagicMock() + + # Create a list to track all commands executed + executed_commands = [] + + def mock_open_session(): + mock_channel = MagicMock() + mock_channel.recv_exit_status.return_value = 0 + mock_channel.recv.return_value = b"" + mock_channel.recv_stderr.return_value = b"" + + def track_command(cmd): + executed_commands.append(cmd) + + mock_channel.exec_command.side_effect = track_command + return mock_channel + + mock_client.get_transport.return_value = mock_transport + mock_transport.open_session.side_effect = mock_open_session + + with patch("run_release.ssh_client") as mock_ssh_ctx: + mock_ssh_ctx.return_value.__enter__.return_value = mock_client + + # WHEN + run_release.place_files_in_download_folder(db) + + # THEN - verify permissions are set + # Note: normalized() converts "3.14.0a1" to "3.14.0" + expected_dest = "/srv/www.python.org/ftp/python/3.14.0" + + assert any(f"chgrp downloads {expected_dest}" in cmd for cmd in executed_commands) + assert any(f"chmod 775 {expected_dest}" in cmd for cmd in executed_commands) + assert any(f"find {expected_dest} -type f -exec chmod 664" in cmd for cmd in executed_commands) + + def test_places_docs_for_release_candidate(self) -> None: + """Test that docs are placed in public location for release candidates.""" + # GIVEN + release_tag = Tag("3.14.0rc1") # Release candidate + db = cast( + ReleaseShelf, + { + "release": release_tag, + "ssh_user": "testuser", + "ssh_key": "/path/to/key", + }, + ) + + mock_client = MagicMock(spec=paramiko.SSHClient) + mock_transport = MagicMock() + + # Create a list to track all commands executed + executed_commands = [] + + def mock_open_session(): + mock_channel = MagicMock() + mock_channel.recv_exit_status.return_value = 0 + mock_channel.recv.return_value = b"" + mock_channel.recv_stderr.return_value = b"" + + def track_command(cmd): + executed_commands.append(cmd) + + mock_channel.exec_command.side_effect = track_command + return mock_channel + + mock_client.get_transport.return_value = mock_transport + mock_transport.open_session.side_effect = mock_open_session + + with patch("run_release.ssh_client") as mock_ssh_ctx: + mock_ssh_ctx.return_value.__enter__.return_value = mock_client + + # WHEN + run_release.place_files_in_download_folder(db) + + # THEN - verify docs are also copied + expected_docs_dest = "/srv/www.python.org/ftp/python/doc/3.14.0rc1" + + assert any(f"mkdir -p {expected_docs_dest}" in cmd for cmd in executed_commands) + assert any(f"cp /home/psf-users/testuser/3.14.0rc1/docs/* {expected_docs_dest}" in cmd for cmd in executed_commands) + + def test_does_not_place_docs_for_alpha(self) -> None: + """Test that docs are NOT placed for alpha releases.""" + # GIVEN + release_tag = Tag("3.14.0a1") # Alpha release + db = cast( + ReleaseShelf, + { + "release": release_tag, + "ssh_user": "testuser", + "ssh_key": "/path/to/key", + }, + ) + + mock_client = MagicMock(spec=paramiko.SSHClient) + mock_transport = MagicMock() + + # Create a list to track all commands executed + executed_commands = [] + + def mock_open_session(): + mock_channel = MagicMock() + mock_channel.recv_exit_status.return_value = 0 + mock_channel.recv.return_value = b"" + mock_channel.recv_stderr.return_value = b"" + + def track_command(cmd): + executed_commands.append(cmd) + + mock_channel.exec_command.side_effect = track_command + return mock_channel + + mock_client.get_transport.return_value = mock_transport + mock_transport.open_session.side_effect = mock_open_session + + with patch("run_release.ssh_client") as mock_ssh_ctx: + mock_ssh_ctx.return_value.__enter__.return_value = mock_client + + # WHEN + run_release.place_files_in_download_folder(db) + + # THEN - verify docs destination is NOT created + # Should not have any commands referencing /ftp/python/doc/ + assert not any("/ftp/python/doc/" in cmd for cmd in executed_commands) + + def test_handles_permission_command_failure(self) -> None: + """Test that function raises clear error when permission commands fail.""" + # GIVEN + release_tag = Tag("3.14.0a1") + db = cast( + ReleaseShelf, + { + "release": release_tag, + "ssh_user": "testuser", + "ssh_key": "/path/to/key", + }, + ) + + mock_client = MagicMock(spec=paramiko.SSHClient) + mock_transport = MagicMock() + + call_count = [0] + + def mock_open_session(): + mock_channel = MagicMock() + # First 3 commands succeed, 4th fails + call_count[0] += 1 + if call_count[0] <= 3: + mock_channel.recv_exit_status.return_value = 0 + mock_channel.recv.return_value = b"" + mock_channel.recv_stderr.return_value = b"" + else: + mock_channel.recv_exit_status.return_value = 1 + mock_channel.recv.return_value = b"" + mock_channel.recv_stderr.return_value = b"Operation not permitted" + return mock_channel + + mock_client.get_transport.return_value = mock_transport + mock_transport.open_session.side_effect = mock_open_session + + with patch("run_release.ssh_client") as mock_ssh_ctx: + mock_ssh_ctx.return_value.__enter__.return_value = mock_client + + # WHEN & Assert + with pytest.raises(run_release.ReleaseException, match="Operation not permitted"): + run_release.place_files_in_download_folder(db) + + +class TestUnpackDocsInTheDocsServer: + """Tests for unpack_docs_in_the_docs_server function.""" + + def test_unpacks_docs_for_release_candidate(self) -> None: + """Test that docs are unpacked and deployed for release candidates.""" + # GIVEN + release_tag = Tag("3.14.0rc1") + db = cast( + ReleaseShelf, + { + "release": release_tag, + "ssh_user": "testuser", + "ssh_key": "/path/to/key", + }, + ) + + mock_client = MagicMock(spec=paramiko.SSHClient) + mock_transport = MagicMock() + + # Create a list to track all commands executed + executed_commands = [] + + def mock_open_session(): + mock_channel = MagicMock() + mock_channel.recv_exit_status.return_value = 0 + mock_channel.recv.return_value = b"" + mock_channel.recv_stderr.return_value = b"" + + def track_command(cmd): + executed_commands.append(cmd) + + mock_channel.exec_command.side_effect = track_command + return mock_channel + + mock_client.get_transport.return_value = mock_transport + mock_transport.open_session.side_effect = mock_open_session + + with patch("run_release.ssh_client") as mock_ssh_ctx: + mock_ssh_ctx.return_value.__enter__.return_value = mock_client + + # WHEN + run_release.unpack_docs_in_the_docs_server(db) + + # THEN - verify unzip and move commands + expected_source = "/home/psf-users/testuser/3.14.0rc1" + expected_dest = "/srv/docs.python.org/release/3.14.0rc1" + + assert any(f"mkdir -p {expected_dest}" in cmd for cmd in executed_commands) + assert any(f"unzip {expected_source}/docs/python-3.14.0rc1-docs-html.zip" in cmd for cmd in executed_commands) + assert any(f"mv /{expected_dest}/python-3.14.0rc1-docs-html/* {expected_dest}" in cmd for cmd in executed_commands) + assert any(f"rm -rf /{expected_dest}/python-3.14.0rc1-docs-html" in cmd for cmd in executed_commands) + + def test_sets_correct_permissions_on_docs(self) -> None: + """Test that docs get correct group ownership and permissions.""" + # GIVEN + release_tag = Tag("3.14.0rc1") + db = cast( + ReleaseShelf, + { + "release": release_tag, + "ssh_user": "testuser", + "ssh_key": "/path/to/key", + }, + ) + + mock_client = MagicMock(spec=paramiko.SSHClient) + mock_transport = MagicMock() + mock_channel = MagicMock() + + mock_client.get_transport.return_value = mock_transport + mock_transport.open_session.return_value = mock_channel + mock_channel.recv_exit_status.return_value = 0 + mock_channel.recv.return_value = b"" + mock_channel.recv_stderr.return_value = b"" + + with patch("run_release.ssh_client") as mock_ssh_ctx: + mock_ssh_ctx.return_value.__enter__.return_value = mock_client + + # WHEN + run_release.unpack_docs_in_the_docs_server(db) + + # THEN - verify permissions + expected_dest = "/srv/docs.python.org/release/3.14.0rc1" + exec_commands = [call[0][0] for call in mock_channel.exec_command.call_args_list] + + assert any(f"chgrp -R docs {expected_dest}" in cmd for cmd in exec_commands) + assert any(f"chmod -R 775 {expected_dest}" in cmd for cmd in exec_commands) + assert any(f"find {expected_dest} -type f -exec chmod 664" in cmd for cmd in exec_commands) + + def test_does_nothing_for_alpha_release(self) -> None: + """Test that function does nothing for alpha releases.""" + # GIVEN + release_tag = Tag("3.14.0a1") + db = cast( + ReleaseShelf, + { + "release": release_tag, + "ssh_user": "testuser", + "ssh_key": "/path/to/key", + }, + ) + + mock_client = MagicMock(spec=paramiko.SSHClient) + + with patch("run_release.ssh_client") as mock_ssh_ctx: + mock_ssh_ctx.return_value.__enter__.return_value = mock_client + + # WHEN + run_release.unpack_docs_in_the_docs_server(db) + + # THEN - ssh_client should not even be entered + mock_ssh_ctx.assert_not_called() + + def test_does_nothing_for_beta_release(self) -> None: + """Test that function does nothing for beta releases.""" + # GIVEN + release_tag = Tag("3.14.0b1") + db = cast( + ReleaseShelf, + { + "release": release_tag, + "ssh_user": "testuser", + "ssh_key": "/path/to/key", + }, + ) + + mock_client = MagicMock(spec=paramiko.SSHClient) + + with patch("run_release.ssh_client") as mock_ssh_ctx: + mock_ssh_ctx.return_value.__enter__.return_value = mock_client + + # WHEN + run_release.unpack_docs_in_the_docs_server(db) + + # THEN - ssh_client should not be called + mock_ssh_ctx.assert_not_called() + + def test_unpacks_docs_for_final_release(self) -> None: + """Test that docs are unpacked for final releases.""" + # GIVEN + release_tag = Tag("3.14.0") # Final release + db = cast( + ReleaseShelf, + { + "release": release_tag, + "ssh_user": "testuser", + "ssh_key": "/path/to/key", + }, + ) + + mock_client = MagicMock(spec=paramiko.SSHClient) + mock_transport = MagicMock() + mock_channel = MagicMock() + + mock_client.get_transport.return_value = mock_transport + mock_transport.open_session.return_value = mock_channel + mock_channel.recv_exit_status.return_value = 0 + mock_channel.recv.return_value = b"" + mock_channel.recv_stderr.return_value = b"" + + with patch("run_release.ssh_client") as mock_ssh_ctx: + mock_ssh_ctx.return_value.__enter__.return_value = mock_client + + # WHEN + run_release.unpack_docs_in_the_docs_server(db) + + # THEN - commands were executed + mock_channel.exec_command.assert_called() + + def test_handles_unzip_failure(self) -> None: + """Test that function raises clear error when unzip fails.""" + # GIVEN + release_tag = Tag("3.14.0rc1") + db = cast( + ReleaseShelf, + { + "release": release_tag, + "ssh_user": "testuser", + "ssh_key": "/path/to/key", + }, + ) + + mock_client = MagicMock(spec=paramiko.SSHClient) + mock_transport = MagicMock() + mock_channel = MagicMock() + + mock_client.get_transport.return_value = mock_transport + mock_transport.open_session.return_value = mock_channel + # First command (mkdir) succeeds, second (unzip) fails + mock_channel.recv_exit_status.side_effect = [0, 1] + mock_channel.recv.return_value = b"" + mock_channel.recv_stderr.return_value = b"cannot find zipfile" + + with patch("run_release.ssh_client") as mock_ssh_ctx: + mock_ssh_ctx.return_value.__enter__.return_value = mock_client + + # WHEN & Assert + with pytest.raises(run_release.ReleaseException, match="cannot find zipfile"): + run_release.unpack_docs_in_the_docs_server(db) + + +class TestWaitUntilAllFilesAreInFolder: + """Tests for wait_until_all_files_are_in_folder function.""" + + def test_succeeds_immediately_when_all_files_present(self) -> None: + """Test that function returns immediately when all required files exist.""" + # GIVEN + release_tag = Tag("3.14.0a1") + db = cast( + ReleaseShelf, + { + "release": release_tag, + "ssh_user": "testuser", + "ssh_key": "/path/to/key", + "security_release": False, + }, + ) + + mock_client = MagicMock(spec=paramiko.SSHClient) + mock_sftp = MagicMock() + + # All required files are present + mock_sftp.listdir.return_value = [ + "Python-3.14.0a1.tgz", # Linux + "python-3.14.0a1.exe", # Windows + "python-3.14.0a1-macos11.pkg", # macOS + "Python-3.14.0a1.tar.xz", + ] + + mock_client.open_sftp.return_value = mock_sftp + + with patch("run_release.ssh_client") as mock_ssh_ctx: + mock_ssh_ctx.return_value.__enter__.return_value = mock_client + + # WHEN + run_release.wait_until_all_files_are_in_folder(db) + + # THEN - listdir called only once (no retry loop) + assert mock_sftp.listdir.call_count == 1 + + def test_waits_and_retries_when_files_missing(self) -> None: + """Test that function polls repeatedly when files are not yet available.""" + # GIVEN + release_tag = Tag("3.14.0a1") + db = cast( + ReleaseShelf, + { + "release": release_tag, + "ssh_user": "testuser", + "ssh_key": "/path/to/key", + "security_release": False, + }, + ) + + mock_client = MagicMock(spec=paramiko.SSHClient) + mock_sftp = MagicMock() + + # Simulate files appearing gradually + mock_sftp.listdir.side_effect = [ + ["Python-3.14.0a1.tgz"], # First check: only Linux + ["Python-3.14.0a1.tgz", "python-3.14.0a1.exe"], # Second: + Windows + ["Python-3.14.0a1.tgz", "python-3.14.0a1.exe", "python-3.14.0a1-macos11.pkg"], # Third: all + ] + + mock_client.open_sftp.return_value = mock_sftp + + with patch("run_release.ssh_client") as mock_ssh_ctx, \ + patch("time.sleep") as mock_sleep: + mock_ssh_ctx.return_value.__enter__.return_value = mock_client + + # WHEN + run_release.wait_until_all_files_are_in_folder(db) + + # THEN - polled 3 times, slept 2 times + assert mock_sftp.listdir.call_count == 3 + assert mock_sleep.call_count == 2 + + def test_security_release_only_checks_linux_files(self) -> None: + """Test that security releases only wait for Linux source files.""" + # GIVEN + release_tag = Tag("3.14.0a1") + db = cast( + ReleaseShelf, + { + "release": release_tag, + "ssh_user": "testuser", + "ssh_key": "/path/to/key", + "security_release": True, # Security release + }, + ) + + mock_client = MagicMock(spec=paramiko.SSHClient) + mock_sftp = MagicMock() + + # Only Linux files present (no Windows/macOS) + mock_sftp.listdir.return_value = [ + "Python-3.14.0a1.tgz", + "Python-3.14.0a1.tar.xz", + ] + + mock_client.open_sftp.return_value = mock_sftp + + with patch("run_release.ssh_client") as mock_ssh_ctx: + mock_ssh_ctx.return_value.__enter__.return_value = mock_client + + # WHEN - should succeed without waiting for Windows/macOS + run_release.wait_until_all_files_are_in_folder(db) + + # THEN - completed with only Linux files + assert mock_sftp.listdir.call_count == 1 + + def test_raises_clear_error_when_destination_folder_missing(self) -> None: + """Test that function fails fast when release folder doesn't exist.""" + # GIVEN + release_tag = Tag("3.14.0a1") + db = cast( + ReleaseShelf, + { + "release": release_tag, + "ssh_user": "testuser", + "ssh_key": "/path/to/key", + "security_release": False, + }, + ) + + mock_client = MagicMock(spec=paramiko.SSHClient) + mock_sftp = MagicMock() + + # Folder doesn't exist + mock_sftp.listdir.side_effect = FileNotFoundError("No such file or directory") + mock_client.open_sftp.return_value = mock_sftp + + with patch("run_release.ssh_client") as mock_ssh_ctx: + mock_ssh_ctx.return_value.__enter__.return_value = mock_client + + # WHEN & Assert + with pytest.raises(FileNotFoundError, match="release folder.*has not been created"): + run_release.wait_until_all_files_are_in_folder(db) + + def test_identifies_linux_files_correctly(self) -> None: + """Test that function correctly identifies Linux source tarballs.""" + # GIVEN + release_tag = Tag("3.14.0a1") + db = cast( + ReleaseShelf, + { + "release": release_tag, + "ssh_user": "testuser", + "ssh_key": "/path/to/key", + "security_release": False, + }, + ) + + mock_client = MagicMock(spec=paramiko.SSHClient) + mock_sftp = MagicMock() + + # Has .tar.xz but not .tgz - should wait + mock_sftp.listdir.side_effect = [ + ["Python-3.14.0a1.tar.xz", "python-3.14.0a1.exe", "python-3.14.0a1-macos11.pkg"], + ["Python-3.14.0a1.tgz", "Python-3.14.0a1.tar.xz", "python-3.14.0a1.exe", "python-3.14.0a1-macos11.pkg"], + ] + + mock_client.open_sftp.return_value = mock_sftp + + with patch("run_release.ssh_client") as mock_ssh_ctx, \ + patch("time.sleep"): + mock_ssh_ctx.return_value.__enter__.return_value = mock_client + + # WHEN + run_release.wait_until_all_files_are_in_folder(db) + + # THEN - had to check twice because .tgz was missing initially + assert mock_sftp.listdir.call_count == 2 + + def test_identifies_windows_files_correctly(self) -> None: + """Test that function correctly identifies Windows .exe installer.""" + # GIVEN + release_tag = Tag("3.14.0a1") + db = cast( + ReleaseShelf, + { + "release": release_tag, + "ssh_user": "testuser", + "ssh_key": "/path/to/key", + "security_release": False, + }, + ) + + mock_client = MagicMock(spec=paramiko.SSHClient) + mock_sftp = MagicMock() + + # Missing .exe file + mock_sftp.listdir.side_effect = [ + ["Python-3.14.0a1.tgz", "python-3.14.0a1-macos11.pkg"], + ["Python-3.14.0a1.tgz", "python-3.14.0a1.exe", "python-3.14.0a1-macos11.pkg"], + ] + + mock_client.open_sftp.return_value = mock_sftp + + with patch("run_release.ssh_client") as mock_ssh_ctx, \ + patch("time.sleep"): + mock_ssh_ctx.return_value.__enter__.return_value = mock_client + + # WHEN + run_release.wait_until_all_files_are_in_folder(db) + + # THEN - retried until .exe appeared + assert mock_sftp.listdir.call_count == 2 + + def test_identifies_macos_files_correctly(self) -> None: + """Test that function correctly identifies macOS .pkg installer.""" + # GIVEN + release_tag = Tag("3.14.0a1") + db = cast( + ReleaseShelf, + { + "release": release_tag, + "ssh_user": "testuser", + "ssh_key": "/path/to/key", + "security_release": False, + }, + ) + + mock_client = MagicMock(spec=paramiko.SSHClient) + mock_sftp = MagicMock() + + # Missing macOS pkg file + mock_sftp.listdir.side_effect = [ + ["Python-3.14.0a1.tgz", "python-3.14.0a1.exe"], + ["Python-3.14.0a1.tgz", "python-3.14.0a1.exe", "python-3.14.0a1-macos11.pkg"], + ] + + mock_client.open_sftp.return_value = mock_sftp + + with patch("run_release.ssh_client") as mock_ssh_ctx, \ + patch("time.sleep"): + mock_ssh_ctx.return_value.__enter__.return_value = mock_client + + # WHEN + run_release.wait_until_all_files_are_in_folder(db) + + # THEN - retried until macOS pkg appeared + assert mock_sftp.listdir.call_count == 2