diff --git a/README.md b/README.md index 094eefccc..ac8cc9658 100644 --- a/README.md +++ b/README.md @@ -862,6 +862,26 @@ Service exposes metrics in Prometheus format on `/metrics` endpoint. Scraping th curl 'http://127.0.0.1:8080/metrics' ``` +#### Available Metrics + +The service exports several types of metrics: + +**API and LLM Metrics:** +- `ols_rest_api_calls_total` - REST API calls counter +- `ols_response_duration_seconds` - Response durations +- `ols_llm_calls_total` - LLM calls counter +- `ols_llm_token_sent_total` / `ols_llm_token_received_total` - Token usage counters + +**Quota Metrics** (when quota handlers are configured): +- `ols_quota_limit_total{subject_type, subject_id}` - Total quota allocated per subject +- `ols_quota_available_total{subject_type, subject_id}` - Available quota remaining +- `ols_quota_utilization_percent{subject_type, subject_id}` - Quota utilization percentage +- `ols_token_usage_total{user_id, provider, model, token_type}` - Cumulative token consumption +- `ols_quota_warning_subjects_total{subject_type}` - Number of subjects whose quota utilization is above 80% but not above 100% +- `ols_quota_exceeded_subjects_total{subject_type}` - Number of subjects whose quota utilization is above 100% + +Quota metrics are automatically updated when the `/metrics` endpoint is accessed and periodically in the background (every 5 minutes by default, or at the quota handlers' scheduler period when one is configured). These metrics provide insight into token usage patterns and quota utilization, and they help with capacity planning and cost management. + ### Gradio UI There is a minimal Gradio UI you can use when running the OLS server locally. 
To use it, it is needed to enable UI in `olsconfig.yaml` file: diff --git a/ols/app/metrics/metrics.py b/ols/app/metrics/metrics.py index 1e407565c..668fed2e1 100644 --- a/ols/app/metrics/metrics.py +++ b/ols/app/metrics/metrics.py @@ -1,6 +1,7 @@ """Prometheus metrics that are exposed by REST API.""" -from typing import Annotated, Any +import logging +from typing import Annotated, Any, Optional from fastapi import APIRouter, Depends from fastapi.responses import PlainTextResponse @@ -14,9 +15,15 @@ ) from ols import config +from ols.app.metrics.quota_metrics_service import ( + get_quota_metrics_collector, + update_quota_metrics_on_request, +) from ols.src.auth.auth import get_auth_dependency from ols.utils.config import AppConfig +logger = logging.getLogger(__name__) + router = APIRouter(tags=["metrics"]) auth_dependency = get_auth_dependency( config.ols_config, virtual_path="/ols-metrics-access" @@ -56,16 +63,46 @@ ) +def get_quota_metrics_dependency() -> Optional[Any]: + """FastAPI dependency to provide quota metrics collector. 
+ + Returns: + QuotaMetricsCollector instance or None if not configured or failed to initialize + """ + try: + # Check if quota handlers are configured + if ( + config.ols_config.quota_handlers is None + or config.ols_config.quota_handlers.storage is None + ): + logger.debug("Quota handlers not configured, skipping quota metrics") + return None + + return get_quota_metrics_collector(config.ols_config.quota_handlers.storage) + + except Exception as e: + logger.error("Failed to initialize quota metrics collector: %s", e) + # Return None to gracefully degrade - metrics endpoint should still work + return None + + @router.get("/metrics", response_class=PlainTextResponse) -def get_metrics(auth: Annotated[Any, Depends(auth_dependency)]) -> PlainTextResponse: +def get_metrics( + auth: Annotated[Any, Depends(auth_dependency)], + quota_collector: Annotated[Optional[Any], Depends(get_quota_metrics_dependency)], +) -> PlainTextResponse: """Metrics Endpoint. Args: auth: The Authentication handler (FastAPI Depends) that will handle authentication Logic. + quota_collector: The quota metrics collector dependency (optional) Returns: Response containing the latest metrics. 
""" + # Update quota metrics if collector is available + update_quota_metrics_on_request(quota_collector) + return PlainTextResponse(content=generate_latest(), media_type=CONTENT_TYPE_LATEST) diff --git a/ols/app/metrics/quota_metrics_collector.py b/ols/app/metrics/quota_metrics_collector.py new file mode 100644 index 000000000..efd5b1f54 --- /dev/null +++ b/ols/app/metrics/quota_metrics_collector.py @@ -0,0 +1,168 @@ +"""Prometheus metrics collector for quota utilization statistics.""" + +import logging +from typing import Dict, Set + +from prometheus_client import Gauge + +from ols.app.metrics.quota_metrics_repository import QuotaMetricsRepository + +logger = logging.getLogger(__name__) + + +class QuotaMetricsCollector: + """Collector for quota-related Prometheus metrics.""" + + def __init__(self, repository: QuotaMetricsRepository) -> None: + """Initialize the quota metrics collector.""" + self.repository = repository + + # Initialize Prometheus metrics + self.quota_limit_total = Gauge( + "ols_quota_limit_total", + "Total quota limit allocated", + ["subject_type", "subject_id"], + ) + + self.quota_available_total = Gauge( + "ols_quota_available_total", + "Available quota remaining", + ["subject_type", "subject_id"], + ) + + self.quota_utilization_percent = Gauge( + "ols_quota_utilization_percent", + "Quota utilization as percentage", + ["subject_type", "subject_id"], + ) + + self.token_usage_total = Gauge( + "ols_token_usage_total", + "Total tokens consumed", + ["user_id", "provider", "model", "token_type"], + ) + + self.quota_warning_subjects_total = Gauge( + "ols_quota_warning_subjects_total", + "Number of subjects with >80% quota usage", + ["subject_type"], + ) + + self.quota_exceeded_subjects_total = Gauge( + "ols_quota_exceeded_subjects_total", + "Number of subjects that exceeded quota", + ["subject_type"], + ) + + logger.info("QuotaMetricsCollector initialized") + + def update_quota_metrics(self) -> None: + """Update quota-related Prometheus metrics.""" + 
try: + # Check database health first + if not self.repository.health_check(): + logger.warning( + "Database health check failed, skipping quota metrics update" + ) + return + + logger.debug("Updating quota metrics") + quota_records = self.repository.get_quota_records() + + # Track seen metrics to clear stale ones + seen_quota_metrics: Set[tuple] = set() + + # Counters for warning and exceeded subjects + warning_counts: Dict[str, int] = {} + exceeded_counts: Dict[str, int] = {} + + for record in quota_records: + subject_type = "user" if record.subject == "u" else "cluster" + subject_id = record.id if record.id else "cluster" + + labels = (subject_type, subject_id) + seen_quota_metrics.add(labels) + + # Update basic quota metrics + self.quota_limit_total.labels(*labels).set(record.quota_limit) + self.quota_available_total.labels(*labels).set(record.available) + self.quota_utilization_percent.labels(*labels).set( + record.utilization_percent + ) + + # Track warning and exceeded thresholds + if record.utilization_percent > 100: + exceeded_counts[subject_type] = ( + exceeded_counts.get(subject_type, 0) + 1 + ) + elif record.utilization_percent > 80: + warning_counts[subject_type] = ( + warning_counts.get(subject_type, 0) + 1 + ) + + # Update threshold metrics + for subject_type in ["user", "cluster"]: + self.quota_warning_subjects_total.labels(subject_type).set( + warning_counts.get(subject_type, 0) + ) + self.quota_exceeded_subjects_total.labels(subject_type).set( + exceeded_counts.get(subject_type, 0) + ) + + logger.debug("Updated %d quota records", len(quota_records)) + + except Exception as e: + logger.error("Error updating quota metrics: %s", e) + + def update_token_usage_metrics(self) -> None: + """Update token usage Prometheus metrics.""" + try: + # Check database health first + if not self.repository.health_check(): + logger.warning( + "Database health check failed, skipping token usage metrics update" + ) + return + + logger.debug("Updating token usage 
metrics") + token_records = self.repository.get_token_usage_records() + + # Track seen metrics to clear stale ones + seen_token_metrics: Set[tuple] = set() + + for record in token_records: + # Update input token metrics + input_labels = (record.user_id, record.provider, record.model, "input") + seen_token_metrics.add(input_labels) + self.token_usage_total.labels(*input_labels).set(record.input_tokens) + + # Update output token metrics + output_labels = ( + record.user_id, + record.provider, + record.model, + "output", + ) + seen_token_metrics.add(output_labels) + self.token_usage_total.labels(*output_labels).set(record.output_tokens) + + logger.debug("Updated %d token usage records", len(token_records)) + + except Exception as e: + logger.error("Error updating token usage metrics: %s", e) + + def update_all_metrics(self) -> None: + """Update all quota-related metrics.""" + logger.debug("Starting comprehensive quota metrics update") + + try: + self.update_quota_metrics() + except Exception as e: + logger.error("Failed to update quota metrics: %s", e) + + try: + self.update_token_usage_metrics() + except Exception as e: + logger.error("Failed to update token usage metrics: %s", e) + + logger.debug("Completed quota metrics update") diff --git a/ols/app/metrics/quota_metrics_repository.py b/ols/app/metrics/quota_metrics_repository.py new file mode 100644 index 000000000..8d4ad6bf7 --- /dev/null +++ b/ols/app/metrics/quota_metrics_repository.py @@ -0,0 +1,178 @@ +"""Repository interface and implementation for quota metrics data access.""" + +import logging +from abc import ABC, abstractmethod +from dataclasses import dataclass +from datetime import datetime +from typing import List, Optional + +import psycopg2 + +from ols.app.models.config import PostgresConfig + +logger = logging.getLogger(__name__) + + +@dataclass +class QuotaRecord: + """Data class representing a quota record from the database.""" + + id: str + subject: str + quota_limit: int + available: int + 
updated_at: datetime + + @property + def utilization_percent(self) -> float: + """Calculate quota utilization as a percentage.""" + if self.quota_limit == 0: + return 0.0 + used = self.quota_limit - self.available + return (used / self.quota_limit) * 100.0 + + +@dataclass +class TokenUsageRecord: + """Data class representing a token usage record from the database.""" + + user_id: str + provider: str + model: str + input_tokens: int + output_tokens: int + updated_at: datetime + + @property + def total_tokens(self) -> int: + """Calculate total tokens (input + output).""" + return self.input_tokens + self.output_tokens + + +class QuotaMetricsRepository(ABC): + """Abstract repository interface for quota metrics data access.""" + + @abstractmethod + def get_quota_records(self) -> List[QuotaRecord]: + """Retrieve all quota records from the database.""" + raise NotImplementedError + + @abstractmethod + def get_token_usage_records(self) -> List[TokenUsageRecord]: + """Retrieve all token usage records from the database.""" + raise NotImplementedError + + @abstractmethod + def health_check(self) -> bool: + """Check if the database connection is healthy.""" + raise NotImplementedError + + +class PostgresQuotaMetricsRepository(QuotaMetricsRepository): + """PostgreSQL implementation of the quota metrics repository.""" + + SELECT_QUOTA_RECORDS = """ + SELECT id, subject, quota_limit, available, updated_at + FROM quota_limits + WHERE revoked_at IS NULL + ORDER BY subject, id + """ + + SELECT_USAGE_RECORDS = """ + SELECT user_id, provider, model, input_tokens, output_tokens, updated_at + FROM token_usage + ORDER BY user_id, provider, model + """ + + def __init__(self, config: PostgresConfig) -> None: + """Initialize the repository with database connection configuration.""" + self.connection_config = config + self.connection: Optional[psycopg2.extensions.connection] = None + self._connect() + + def _connect(self) -> None: + """Establish connection to the PostgreSQL database.""" + 
logger.info("Establishing connection to PostgreSQL for quota metrics") + + try: + self.connection = psycopg2.connect( + host=self.connection_config.host, + port=self.connection_config.port, + user=self.connection_config.user, + password=self.connection_config.password, + dbname=self.connection_config.dbname, + sslmode=self.connection_config.ssl_mode, + gssencmode=self.connection_config.gss_encmode, + ) + self.connection.autocommit = True + logger.info("Successfully connected to PostgreSQL for quota metrics") + except Exception as e: + if self.connection: + self.connection.close() + logger.exception("Error connecting to PostgreSQL for quota metrics: %s", e) + raise + + def _ensure_connected(self) -> None: + """Ensure database connection is established.""" + if self.connection is None or self.connection.closed: + logger.warning("Database connection lost, reconnecting...") + self._connect() + + def get_quota_records(self) -> List[QuotaRecord]: + """Retrieve all quota records from the database.""" + self._ensure_connected() + + try: + with self.connection.cursor() as cursor: + cursor.execute(self.SELECT_QUOTA_RECORDS) + rows = cursor.fetchall() + + return [ + QuotaRecord( + id=row[0], + subject=row[1], + quota_limit=row[2], + available=row[3], + updated_at=row[4], + ) + for row in rows + ] + except Exception as e: + logger.error("Error retrieving quota records: %s", e) + raise + + def get_token_usage_records(self) -> List[TokenUsageRecord]: + """Retrieve all token usage records from the database.""" + self._ensure_connected() + + try: + with self.connection.cursor() as cursor: + cursor.execute(self.SELECT_USAGE_RECORDS) + rows = cursor.fetchall() + + return [ + TokenUsageRecord( + user_id=row[0], + provider=row[1], + model=row[2], + input_tokens=row[3], + output_tokens=row[4], + updated_at=row[5], + ) + for row in rows + ] + except Exception as e: + logger.error("Error retrieving token usage records: %s", e) + raise + + def health_check(self) -> bool: + """Check if the 
database connection is healthy.""" + try: + self._ensure_connected() + with self.connection.cursor() as cursor: + cursor.execute("SELECT 1") + logger.debug("Database health check passed") + return True + except Exception as e: + logger.error("Database health check failed: %s", e) + return False diff --git a/ols/app/metrics/quota_metrics_scheduler.py b/ols/app/metrics/quota_metrics_scheduler.py new file mode 100644 index 000000000..8265e1f0f --- /dev/null +++ b/ols/app/metrics/quota_metrics_scheduler.py @@ -0,0 +1,97 @@ +"""Background scheduler for quota metrics collection.""" + +import logging +import time +from threading import Thread +from typing import Optional + +from ols.app.metrics.quota_metrics_service import get_quota_metrics_collector +from ols.app.models.config import QuotaHandlersConfig +from ols.utils.config import AppConfig + +logger = logging.getLogger(__name__) + +# Default update interval in seconds (5 minutes) +DEFAULT_UPDATE_INTERVAL = 300 + + +def quota_metrics_scheduler(config: Optional[QuotaHandlersConfig]) -> bool: + """Background task to periodically update quota metrics. + + This function runs in an infinite loop, updating quota metrics at regular + intervals. It's designed to be run in a separate daemon thread. 
+ + Args: + config: Quota handlers configuration containing storage and scheduler settings + + Returns: + False if configuration is invalid or initialization fails, + otherwise runs indefinitely + """ + if config is None: + logger.warning( + "Quota handlers not configured, skipping quota metrics scheduler" + ) + return False + + if config.storage is None: + logger.warning( + "Storage for quota handlers not configured, skipping quota metrics scheduler" + ) + return False + + # Determine update interval + update_interval = DEFAULT_UPDATE_INTERVAL + if config.scheduler is not None and config.scheduler.period is not None: + update_interval = config.scheduler.period + + logger.info( + "Starting quota metrics scheduler with %d second interval", update_interval + ) + + # Initialize quota metrics collector + try: + collector = get_quota_metrics_collector(config.storage) + if collector is None: + logger.error("Failed to initialize quota metrics collector") + return False + except Exception as e: + logger.error("Error initializing quota metrics collector: %s", e) + return False + + # Main scheduler loop + while True: + try: + logger.debug("Updating quota metrics") + collector.update_all_metrics() + logger.debug("Quota metrics updated successfully") + + except Exception as e: + logger.error("Error updating quota metrics: %s", e) + # Continue running even if update fails + + try: + time.sleep(update_interval) + except KeyboardInterrupt: + logger.info("Quota metrics scheduler interrupted, stopping") + break + + return True + + +def start_quota_metrics_scheduler(config: AppConfig) -> None: + """Start quota metrics scheduler in a separate daemon thread. 
+ + Args: + config: Application configuration containing quota handler settings + """ + logger.info("Starting quota metrics scheduler thread") + + thread = Thread( + target=quota_metrics_scheduler, + daemon=True, + args=(config.ols_config.quota_handlers,), + ) + thread.start() + + logger.info("Quota metrics scheduler thread started") diff --git a/ols/app/metrics/quota_metrics_service.py b/ols/app/metrics/quota_metrics_service.py new file mode 100644 index 000000000..e6b4959a3 --- /dev/null +++ b/ols/app/metrics/quota_metrics_service.py @@ -0,0 +1,109 @@ +"""FastAPI dependency injection service for quota metrics.""" + +import logging +from typing import Optional + +from ols.app.metrics.quota_metrics_collector import QuotaMetricsCollector +from ols.app.metrics.quota_metrics_repository import PostgresQuotaMetricsRepository +from ols.app.models.config import PostgresConfig + +logger = logging.getLogger(__name__) + + +class QuotaMetricsService: + """Singleton service for managing quota metrics collector instances.""" + + _instance: Optional["QuotaMetricsService"] = None + _collector: Optional[QuotaMetricsCollector] = None + + def __new__(cls) -> "QuotaMetricsService": + """Create a new instance using singleton pattern.""" + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def get_collector( + self, postgres_config: PostgresConfig + ) -> Optional[QuotaMetricsCollector]: + """Get or create a quota metrics collector instance. 
+ + Args: + postgres_config: PostgreSQL configuration for database connection + + Returns: + QuotaMetricsCollector instance or None if initialization fails + """ + if self._collector is not None: + logger.debug("Returning cached quota metrics collector") + return self._collector + + try: + logger.info("Initializing quota metrics collector") + repository = PostgresQuotaMetricsRepository(postgres_config) + self._collector = QuotaMetricsCollector(repository) + logger.info("Quota metrics collector initialized successfully") + return self._collector + + except Exception as e: + logger.error("Failed to initialize quota metrics collector: %s", e) + # Re-raise to ensure the dependency system knows about the failure + raise + + def reset(self) -> None: + """Reset the collector instance.""" + logger.debug("Resetting quota metrics collector") + self._collector = None + + +# Module-level service instance +_service = QuotaMetricsService() + + +def get_quota_metrics_collector( + postgres_config: PostgresConfig, +) -> Optional[QuotaMetricsCollector]: + """Get or create a quota metrics collector instance. + + This function serves as a FastAPI dependency that provides a quota metrics + collector instance. It implements singleton pattern to avoid creating + multiple database connections. + + Args: + postgres_config: PostgreSQL configuration for database connection + + Returns: + QuotaMetricsCollector instance or None if initialization fails + """ + return _service.get_collector(postgres_config) + + +def update_quota_metrics_on_request(collector: Optional[QuotaMetricsCollector]) -> None: + """Update quota metrics when metrics endpoint is requested. + + This function can be called before serving metrics to ensure + the latest quota data is included. 
+ + Args: + collector: The quota metrics collector instance + """ + if collector is None: + logger.warning("No quota metrics collector available, skipping update") + return + + try: + logger.debug("Updating quota metrics for endpoint request") + collector.update_all_metrics() + logger.debug("Quota metrics updated successfully") + + except Exception as e: + logger.error("Failed to update quota metrics: %s", e) + # Don't re-raise here to avoid breaking the metrics endpoint + + +def reset_quota_metrics_collector() -> None: + """Reset the quota metrics collector instance. + + This is primarily useful for testing to ensure clean state + between test runs. + """ + _service.reset() diff --git a/tests/unit/app/metrics/test_quota_metrics_collector.py b/tests/unit/app/metrics/test_quota_metrics_collector.py new file mode 100644 index 000000000..a08c3cf1e --- /dev/null +++ b/tests/unit/app/metrics/test_quota_metrics_collector.py @@ -0,0 +1,303 @@ +"""Unit tests for QuotaMetricsCollector.""" + +import datetime +from unittest.mock import MagicMock + +from prometheus_client import REGISTRY + +from ols import config +from ols.app.metrics.quota_metrics_collector import QuotaMetricsCollector +from ols.app.metrics.quota_metrics_repository import QuotaRecord, TokenUsageRecord + +# needs to be setup before imports that use authentication +config.ols_config.authentication_config.module = "k8s" + + +class TestQuotaMetricsCollector: + """Test QuotaMetricsCollector implementation.""" + + def setup_method(self): + """Set up test environment.""" + # Clear any existing metrics to avoid conflicts + to_remove = [ + collector + for collector in list(REGISTRY._collector_to_names.keys()) + if hasattr(collector, "_name") + and ("ols_quota" in collector._name or "ols_token" in collector._name) + ] + + for collector in to_remove: + try: + REGISTRY.unregister(collector) + except KeyError: + pass # Already removed + + def teardown_method(self): + """Clean up test environment.""" + # Clear any metrics 
created during tests + to_remove = [ + collector + for collector in list(REGISTRY._collector_to_names.keys()) + if hasattr(collector, "_name") + and ("ols_quota" in collector._name or "ols_token" in collector._name) + ] + + for collector in to_remove: + try: + REGISTRY.unregister(collector) + except KeyError: + pass # Already removed + + def test_init_creates_prometheus_metrics(self): + """Test that initialization creates Prometheus metrics.""" + mock_repo = MagicMock() + collector = QuotaMetricsCollector(mock_repo) + + assert collector.quota_limit_total is not None + assert collector.quota_available_total is not None + assert collector.quota_utilization_percent is not None + assert collector.token_usage_total is not None + assert collector.quota_warning_subjects_total is not None + assert collector.quota_exceeded_subjects_total is not None + + def test_update_quota_metrics_with_data(self): + """Test updating quota metrics with sample data.""" + mock_repo = MagicMock() + + quota_records = [ + QuotaRecord( + id="user1", + subject="u", + quota_limit=1000, + available=750, + updated_at=datetime.datetime(2024, 1, 1, 12, 0, 0), + ), + QuotaRecord( + id="user2", + subject="u", + quota_limit=500, + available=100, + updated_at=datetime.datetime(2024, 1, 1, 13, 0, 0), + ), + QuotaRecord( + id="", + subject="c", + quota_limit=10000, + available=8000, + updated_at=datetime.datetime(2024, 1, 1, 14, 0, 0), + ), + ] + + mock_repo.get_quota_records.return_value = quota_records + mock_repo.health_check.return_value = True + + collector = QuotaMetricsCollector(mock_repo) + collector.update_quota_metrics() + + # Verify repository was called + mock_repo.get_quota_records.assert_called_once() + + # Verify metrics were set (this would require inspecting the prometheus metrics) + # For now, just verify no exceptions were raised + + def test_update_quota_metrics_empty_data(self): + """Test updating quota metrics with empty data.""" + mock_repo = MagicMock() + 
mock_repo.get_quota_records.return_value = [] + mock_repo.health_check.return_value = True + + collector = QuotaMetricsCollector(mock_repo) + collector.update_quota_metrics() + + mock_repo.get_quota_records.assert_called_once() + + def test_update_quota_metrics_database_error(self): + """Test handling of database errors during quota metrics update.""" + mock_repo = MagicMock() + mock_repo.get_quota_records.side_effect = Exception("Database connection error") + mock_repo.health_check.return_value = ( + True # Health check passes, but query fails + ) + + collector = QuotaMetricsCollector(mock_repo) + + # Should not raise exception, but should handle gracefully + collector.update_quota_metrics() + + mock_repo.get_quota_records.assert_called_once() + + def test_update_token_usage_metrics_with_data(self): + """Test updating token usage metrics with sample data.""" + mock_repo = MagicMock() + + token_records = [ + TokenUsageRecord( + user_id="user1", + provider="openai", + model="gpt-4", + input_tokens=500, + output_tokens=200, + updated_at=datetime.datetime(2024, 1, 1, 12, 0, 0), + ), + TokenUsageRecord( + user_id="user2", + provider="openai", + model="gpt-3.5", + input_tokens=300, + output_tokens=150, + updated_at=datetime.datetime(2024, 1, 1, 13, 0, 0), + ), + ] + + mock_repo.get_token_usage_records.return_value = token_records + mock_repo.health_check.return_value = True + + collector = QuotaMetricsCollector(mock_repo) + collector.update_token_usage_metrics() + + mock_repo.get_token_usage_records.assert_called_once() + + def test_update_token_usage_metrics_empty_data(self): + """Test updating token usage metrics with empty data.""" + mock_repo = MagicMock() + mock_repo.get_token_usage_records.return_value = [] + mock_repo.health_check.return_value = True + + collector = QuotaMetricsCollector(mock_repo) + collector.update_token_usage_metrics() + + mock_repo.get_token_usage_records.assert_called_once() + + def test_update_token_usage_metrics_database_error(self): + 
"""Test handling of database errors during token usage metrics update.""" + mock_repo = MagicMock() + mock_repo.get_token_usage_records.side_effect = Exception( + "Database connection error" + ) + mock_repo.health_check.return_value = ( + True # Health check passes, but query fails + ) + + collector = QuotaMetricsCollector(mock_repo) + + # Should not raise exception, but should handle gracefully + collector.update_token_usage_metrics() + + mock_repo.get_token_usage_records.assert_called_once() + + def test_update_all_metrics(self): + """Test updating all metrics at once.""" + mock_repo = MagicMock() + mock_repo.get_quota_records.return_value = [] + mock_repo.get_token_usage_records.return_value = [] + mock_repo.health_check.return_value = True + + collector = QuotaMetricsCollector(mock_repo) + collector.update_all_metrics() + + mock_repo.get_quota_records.assert_called_once() + mock_repo.get_token_usage_records.assert_called_once() + + def test_quota_threshold_calculations(self): + """Test quota threshold calculations for warnings and exceeded.""" + mock_repo = MagicMock() + + quota_records = [ + # User with 90% utilization (warning) + QuotaRecord( + id="user1", + subject="u", + quota_limit=1000, + available=100, # 90% used + updated_at=datetime.datetime(2024, 1, 1, 12, 0, 0), + ), + # User with 110% utilization (exceeded) + QuotaRecord( + id="user2", + subject="u", + quota_limit=500, + available=-50, # 110% used + updated_at=datetime.datetime(2024, 1, 1, 13, 0, 0), + ), + # User with 50% utilization (normal) + QuotaRecord( + id="user3", + subject="u", + quota_limit=1000, + available=500, # 50% used + updated_at=datetime.datetime(2024, 1, 1, 14, 0, 0), + ), + ] + + mock_repo.get_quota_records.return_value = quota_records + mock_repo.health_check.return_value = True + + collector = QuotaMetricsCollector(mock_repo) + collector.update_quota_metrics() + + mock_repo.get_quota_records.assert_called_once() + + def test_health_check_integration(self): + """Test that health 
check affects metric collection behavior.""" + mock_repo = MagicMock() + mock_repo.health_check.return_value = False + mock_repo.get_quota_records.side_effect = Exception("DB unavailable") + + collector = QuotaMetricsCollector(mock_repo) + + # Should handle unhealthy database gracefully + collector.update_quota_metrics() + + # Verify health check was performed + mock_repo.health_check.assert_called() + + def test_metric_labels_and_values(self): + """Test that metrics contain correct labels and values.""" + mock_repo = MagicMock() + + quota_record = QuotaRecord( + id="user123", + subject="u", + quota_limit=1000, + available=750, + updated_at=datetime.datetime(2024, 1, 1, 12, 0, 0), + ) + + token_record = TokenUsageRecord( + user_id="user123", + provider="openai", + model="gpt-4", + input_tokens=500, + output_tokens=200, + updated_at=datetime.datetime(2024, 1, 1, 12, 0, 0), + ) + + mock_repo.get_quota_records.return_value = [quota_record] + mock_repo.get_token_usage_records.return_value = [token_record] + mock_repo.health_check.return_value = True + + collector = QuotaMetricsCollector(mock_repo) + collector.update_all_metrics() + + # Verify method calls + mock_repo.get_quota_records.assert_called_once() + mock_repo.get_token_usage_records.assert_called_once() + mock_repo.health_check.assert_called() + + def test_error_resilience_during_metric_update(self): + """Test that errors in one metric update don't affect others.""" + mock_repo = MagicMock() + + # Make quota records fail but token usage succeed + mock_repo.get_quota_records.side_effect = Exception("Quota DB error") + mock_repo.get_token_usage_records.return_value = [] + mock_repo.health_check.return_value = True + + collector = QuotaMetricsCollector(mock_repo) + + # Should not raise exception for the entire update + collector.update_all_metrics() + + # Both methods should have been attempted + mock_repo.get_quota_records.assert_called_once() + mock_repo.get_token_usage_records.assert_called_once() diff --git 
a/tests/unit/app/metrics/test_quota_metrics_integration.py b/tests/unit/app/metrics/test_quota_metrics_integration.py new file mode 100644 index 000000000..83c55c7c0 --- /dev/null +++ b/tests/unit/app/metrics/test_quota_metrics_integration.py @@ -0,0 +1,187 @@ +"""Integration tests for quota metrics with FastAPI dependencies.""" + +import datetime +from unittest.mock import MagicMock, patch + +import pytest + +from ols import config +from ols.app.metrics.quota_metrics_repository import QuotaRecord, TokenUsageRecord +from ols.app.metrics.quota_metrics_service import get_quota_metrics_collector + +# needs to be setup before imports that use authentication +config.ols_config.authentication_config.module = "k8s" + + +class TestQuotaMetricsIntegration: + """Test integration of quota metrics with FastAPI dependencies.""" + + def setup_method(self): + """Set up test environment.""" + from prometheus_client import REGISTRY + + from ols.app.metrics.quota_metrics_service import reset_quota_metrics_collector + + # Reset quota metrics service + reset_quota_metrics_collector() + + # Clear any existing quota metrics from the registry + to_remove = [ + collector + for collector in list(REGISTRY._collector_to_names.keys()) + if hasattr(collector, "_name") + and ("ols_quota" in collector._name or "ols_token" in collector._name) + ] + + for collector in to_remove: + try: + REGISTRY.unregister(collector) + except KeyError: + pass # Already removed + + def teardown_method(self): + """Clean up test environment.""" + from prometheus_client import REGISTRY + + from ols.app.metrics.quota_metrics_service import reset_quota_metrics_collector + + # Reset quota metrics service + reset_quota_metrics_collector() + + # Clear any metrics created during tests + to_remove = [ + collector + for collector in list(REGISTRY._collector_to_names.keys()) + if hasattr(collector, "_name") + and ("ols_quota" in collector._name or "ols_token" in collector._name) + ] + + for collector in to_remove: + try: + 
REGISTRY.unregister(collector) + except KeyError: + pass # Already removed + + def test_quota_metrics_dependency_injection(self): + """Test that quota metrics collector can be injected as a dependency.""" + from ols.app.models.config import PostgresConfig + + with patch( + "ols.app.metrics.quota_metrics_service.PostgresQuotaMetricsRepository" + ) as mock_repo_class: + mock_repo = MagicMock() + mock_repo_class.return_value = mock_repo + + config_obj = PostgresConfig() + collector = get_quota_metrics_collector(config_obj) + + assert collector is not None + assert hasattr(collector, "update_all_metrics") + + def test_metrics_endpoint_includes_quota_metrics(self): + """Test that quota metrics are included in Prometheus output.""" + from prometheus_client import generate_latest + + # Mock the quota repository and collector + mock_repo = MagicMock() + mock_repo.get_quota_records.return_value = [ + QuotaRecord( + id="user1", + subject="u", + quota_limit=1000, + available=750, + updated_at=datetime.datetime(2024, 1, 1, 12, 0, 0), + ) + ] + mock_repo.get_token_usage_records.return_value = [ + TokenUsageRecord( + user_id="user1", + provider="openai", + model="gpt-4", + input_tokens=500, + output_tokens=200, + updated_at=datetime.datetime(2024, 1, 1, 12, 0, 0), + ) + ] + mock_repo.health_check.return_value = True + + with patch( + "ols.app.metrics.quota_metrics_service.PostgresQuotaMetricsRepository" + ) as mock_repo_class: + mock_repo_class.return_value = mock_repo + + from ols.app.models.config import PostgresConfig + + # Create collector and update metrics + config_obj = PostgresConfig() + collector = get_quota_metrics_collector(config_obj) + collector.update_all_metrics() + + # Generate Prometheus output + metrics_output = generate_latest().decode("utf-8") + + # Check that quota metrics are included in the response + assert "ols_quota_limit_total" in metrics_output + assert "ols_quota_available_total" in metrics_output + assert "ols_quota_utilization_percent" in 
metrics_output + assert "ols_token_usage_total" in metrics_output + + def test_quota_metrics_update_on_endpoint_call(self): + """Test that quota metrics are updated when metrics endpoint is called.""" + from ols.app.models.config import PostgresConfig + + mock_repo = MagicMock() + mock_repo.get_quota_records.return_value = [] + mock_repo.get_token_usage_records.return_value = [] + mock_repo.health_check.return_value = True + + with patch( + "ols.app.metrics.quota_metrics_service.PostgresQuotaMetricsRepository" + ) as mock_repo_class: + mock_repo_class.return_value = mock_repo + + config_obj = PostgresConfig() + collector = get_quota_metrics_collector(config_obj) + + # Verify that the collector can update metrics + collector.update_all_metrics() + + mock_repo.get_quota_records.assert_called_once() + mock_repo.get_token_usage_records.assert_called_once() + + def test_quota_metrics_error_handling_in_dependency(self): + """Test error handling when quota metrics collection fails.""" + from ols.app.models.config import PostgresConfig + + with patch( + "ols.app.metrics.quota_metrics_service.PostgresQuotaMetricsRepository" + ) as mock_repo_class: + # Make repository initialization fail + mock_repo_class.side_effect = Exception("Database connection failed") + + config_obj = PostgresConfig() + + # The initialization error is not swallowed: it is expected to propagate to the caller + with pytest.raises(Exception): + get_quota_metrics_collector(config_obj) + + def test_quota_metrics_caching_in_dependency(self): + """Test that quota metrics collector is cached properly.""" + from ols.app.models.config import PostgresConfig + + with patch( + "ols.app.metrics.quota_metrics_service.PostgresQuotaMetricsRepository" + ) as mock_repo_class: + mock_repo = MagicMock() + mock_repo_class.return_value = mock_repo + + config_obj = PostgresConfig() + + # Call the dependency function multiple times + collector1 = get_quota_metrics_collector(config_obj) + collector2 =
get_quota_metrics_collector(config_obj) + + # Should return the same instance (if cached) + assert collector1 is not None + assert collector2 is not None + assert collector1 is collector2 # Same instance due to caching diff --git a/tests/unit/app/metrics/test_quota_metrics_repository.py b/tests/unit/app/metrics/test_quota_metrics_repository.py new file mode 100644 index 000000000..e9f4db1ff --- /dev/null +++ b/tests/unit/app/metrics/test_quota_metrics_repository.py @@ -0,0 +1,306 @@ +"""Unit tests for QuotaMetricsRepository.""" + +import datetime +from unittest.mock import MagicMock, patch + +import pytest + +from ols import config +from ols.app.metrics.quota_metrics_repository import ( + PostgresQuotaMetricsRepository, + QuotaRecord, + TokenUsageRecord, +) +from ols.app.models.config import PostgresConfig + +# needs to be setup before imports that use authentication +config.ols_config.authentication_config.module = "k8s" + + +class TestQuotaRecord: + """Test QuotaRecord data class.""" + + def test_quota_record_creation(self): + """Test QuotaRecord creation with all fields.""" + record = QuotaRecord( + id="user123", + subject="u", + quota_limit=1000, + available=750, + updated_at=datetime.datetime(2024, 1, 1, 12, 0, 0), + ) + assert record.id == "user123" + assert record.subject == "u" + assert record.quota_limit == 1000 + assert record.available == 750 + assert record.updated_at == datetime.datetime(2024, 1, 1, 12, 0, 0) + + def test_quota_utilization_percent(self): + """Test quota utilization percentage calculation.""" + record = QuotaRecord( + id="user123", + subject="u", + quota_limit=1000, + available=250, + updated_at=datetime.datetime(2024, 1, 1, 12, 0, 0), + ) + assert record.utilization_percent == 75.0 + + def test_quota_utilization_percent_zero_limit(self): + """Test quota utilization when limit is zero.""" + record = QuotaRecord( + id="user123", + subject="u", + quota_limit=0, + available=0, + updated_at=datetime.datetime(2024, 1, 1, 12, 0, 0), + ) + 
assert record.utilization_percent == 0.0 + + def test_quota_utilization_percent_negative_available(self): + """Test quota utilization when available is negative (over quota).""" + record = QuotaRecord( + id="user123", + subject="u", + quota_limit=1000, + available=-100, + updated_at=datetime.datetime(2024, 1, 1, 12, 0, 0), + ) + assert abs(record.utilization_percent - 110.0) < 0.001 + + +class TestTokenUsageRecord: + """Test TokenUsageRecord data class.""" + + def test_token_usage_record_creation(self): + """Test TokenUsageRecord creation with all fields.""" + record = TokenUsageRecord( + user_id="user123", + provider="openai", + model="gpt-4", + input_tokens=500, + output_tokens=200, + updated_at=datetime.datetime(2024, 1, 1, 12, 0, 0), + ) + assert record.user_id == "user123" + assert record.provider == "openai" + assert record.model == "gpt-4" + assert record.input_tokens == 500 + assert record.output_tokens == 200 + assert record.updated_at == datetime.datetime(2024, 1, 1, 12, 0, 0) + + def test_total_tokens(self): + """Test total tokens calculation.""" + record = TokenUsageRecord( + user_id="user123", + provider="openai", + model="gpt-4", + input_tokens=500, + output_tokens=200, + updated_at=datetime.datetime(2024, 1, 1, 12, 0, 0), + ) + assert record.total_tokens == 700 + + +class TestPostgresQuotaMetricsRepository: + """Test PostgresQuotaMetricsRepository implementation.""" + + def test_init_storage_failure_detection(self): + """Test exception handling for storage initialization.""" + exception_message = "Exception during PostgreSQL storage." 
+ + with patch("psycopg2.connect") as mock_connect: + mock_connect.side_effect = Exception(exception_message) + + config = PostgresConfig() + with pytest.raises(Exception, match=exception_message): + PostgresQuotaMetricsRepository(config) + + def test_get_quota_records_success(self): + """Test successful retrieval of quota records.""" + quota_data = [ + ("user1", "u", 1000, 750, datetime.datetime(2024, 1, 1, 12, 0, 0)), + ("user2", "u", 500, 200, datetime.datetime(2024, 1, 1, 13, 0, 0)), + ("", "c", 10000, 8000, datetime.datetime(2024, 1, 1, 14, 0, 0)), + ] + + mock_cursor = MagicMock() + mock_cursor.fetchall.return_value = quota_data + + with patch("psycopg2.connect") as mock_connect: + mock_connect.return_value.cursor.return_value.__enter__.return_value = ( + mock_cursor + ) + + config = PostgresConfig() + repo = PostgresQuotaMetricsRepository(config) + records = repo.get_quota_records() + + assert len(records) == 3 + assert records[0].id == "user1" + assert records[0].subject == "u" + assert records[0].quota_limit == 1000 + assert records[0].available == 750 + assert records[1].id == "user2" + assert records[2].id == "" + assert records[2].subject == "c" + + def test_get_quota_records_empty_result(self): + """Test handling of empty quota records result.""" + mock_cursor = MagicMock() + mock_cursor.fetchall.return_value = [] + + with patch("psycopg2.connect") as mock_connect: + mock_connect.return_value.cursor.return_value.__enter__.return_value = ( + mock_cursor + ) + + config = PostgresConfig() + repo = PostgresQuotaMetricsRepository(config) + records = repo.get_quota_records() + + assert len(records) == 0 + + def test_get_quota_records_database_error(self): + """Test handling of database errors during quota record retrieval.""" + mock_cursor = MagicMock() + mock_cursor.execute.side_effect = Exception("Database connection error") + + with patch("psycopg2.connect") as mock_connect: + mock_connect.return_value.cursor.return_value.__enter__.return_value = ( + 
mock_cursor + ) + + config = PostgresConfig() + repo = PostgresQuotaMetricsRepository(config) + + with pytest.raises(Exception, match="Database connection error"): + repo.get_quota_records() + + def test_get_token_usage_records_success(self): + """Test successful retrieval of token usage records.""" + token_data = [ + ( + "user1", + "openai", + "gpt-4", + 500, + 200, + datetime.datetime(2024, 1, 1, 12, 0, 0), + ), + ( + "user2", + "openai", + "gpt-3.5", + 300, + 150, + datetime.datetime(2024, 1, 1, 13, 0, 0), + ), + ] + + mock_cursor = MagicMock() + mock_cursor.fetchall.return_value = token_data + + with patch("psycopg2.connect") as mock_connect: + mock_connect.return_value.cursor.return_value.__enter__.return_value = ( + mock_cursor + ) + + config = PostgresConfig() + repo = PostgresQuotaMetricsRepository(config) + records = repo.get_token_usage_records() + + assert len(records) == 2 + assert records[0].user_id == "user1" + assert records[0].provider == "openai" + assert records[0].model == "gpt-4" + assert records[0].input_tokens == 500 + assert records[0].output_tokens == 200 + assert records[1].user_id == "user2" + + def test_get_token_usage_records_empty_result(self): + """Test handling of empty token usage records result.""" + mock_cursor = MagicMock() + mock_cursor.fetchall.return_value = [] + + with patch("psycopg2.connect") as mock_connect: + mock_connect.return_value.cursor.return_value.__enter__.return_value = ( + mock_cursor + ) + + config = PostgresConfig() + repo = PostgresQuotaMetricsRepository(config) + records = repo.get_token_usage_records() + + assert len(records) == 0 + + def test_get_token_usage_records_database_error(self): + """Test handling of database errors during token usage record retrieval.""" + mock_cursor = MagicMock() + mock_cursor.execute.side_effect = Exception("Database connection error") + + with patch("psycopg2.connect") as mock_connect: + mock_connect.return_value.cursor.return_value.__enter__.return_value = ( + mock_cursor + ) 
+ + config = PostgresConfig() + repo = PostgresQuotaMetricsRepository(config) + + with pytest.raises(Exception, match="Database connection error"): + repo.get_token_usage_records() + + def test_health_check_success(self): + """Test successful health check.""" + mock_cursor = MagicMock() + + with patch("psycopg2.connect") as mock_connect: + mock_connect.return_value.cursor.return_value.__enter__.return_value = ( + mock_cursor + ) + + config = PostgresConfig() + repo = PostgresQuotaMetricsRepository(config) + is_healthy = repo.health_check() + + assert is_healthy is True + mock_cursor.execute.assert_called_with("SELECT 1") + + def test_health_check_failure(self): + """Test health check failure.""" + mock_cursor = MagicMock() + mock_cursor.execute.side_effect = Exception("Connection failed") + + with patch("psycopg2.connect") as mock_connect: + mock_connect.return_value.cursor.return_value.__enter__.return_value = ( + mock_cursor + ) + + config = PostgresConfig() + repo = PostgresQuotaMetricsRepository(config) + is_healthy = repo.health_check() + + assert is_healthy is False + + def test_disconnected_repository_reconnects(self): + """Test that disconnected repository reconnects automatically.""" + mock_cursor = MagicMock() + mock_cursor.fetchall.return_value = [] + + with patch("psycopg2.connect") as mock_connect: + mock_connect.return_value.cursor.return_value.__enter__.return_value = ( + mock_cursor + ) + + config = PostgresConfig() + repo = PostgresQuotaMetricsRepository(config) + + # Simulate disconnection + repo.connection = None + + # This should trigger reconnection + records = repo.get_quota_records() + + assert len(records) == 0 + # Verify reconnection was called + assert mock_connect.call_count >= 2 diff --git a/tests/unit/app/metrics/test_quota_metrics_scheduler.py b/tests/unit/app/metrics/test_quota_metrics_scheduler.py new file mode 100644 index 000000000..bda142260 --- /dev/null +++ b/tests/unit/app/metrics/test_quota_metrics_scheduler.py @@ -0,0 +1,211 
@@ +"""Unit tests for quota metrics scheduler.""" + +import time +from threading import Thread +from unittest.mock import MagicMock, patch + +from ols import config +from ols.app.metrics.quota_metrics_scheduler import ( + quota_metrics_scheduler, + start_quota_metrics_scheduler, +) +from ols.utils.config import AppConfig + +# needs to be setup before imports that use authentication +config.ols_config.authentication_config.module = "k8s" + + +class TestQuotaMetricsScheduler: + """Test quota metrics scheduler functionality.""" + + def test_quota_metrics_scheduler_with_no_config(self): + """Test scheduler behavior when quota handlers are not configured.""" + result = quota_metrics_scheduler(None) + assert result is False + + def test_quota_metrics_scheduler_with_no_storage(self): + """Test scheduler behavior when storage is not configured.""" + from ols.app.models.config import QuotaHandlersConfig + + config_obj = QuotaHandlersConfig() + config_obj.storage = None + + result = quota_metrics_scheduler(config_obj) + assert result is False + + def test_quota_metrics_scheduler_with_valid_config(self): + """Test scheduler with valid configuration.""" + from ols.app.models.config import PostgresConfig, QuotaHandlersConfig + + config_obj = QuotaHandlersConfig() + config_obj.storage = PostgresConfig() + + with patch( + "ols.app.metrics.quota_metrics_scheduler.get_quota_metrics_collector" + ) as mock_get_collector: + mock_collector = MagicMock() + mock_get_collector.return_value = mock_collector + + with patch("time.sleep") as mock_sleep: + # Make sleep raise an exception to exit the loop + mock_sleep.side_effect = KeyboardInterrupt("Test exit") + + try: + quota_metrics_scheduler(config_obj) + except KeyboardInterrupt: + pass # Expected to exit this way + + # Verify collector was called + mock_get_collector.assert_called_once() + mock_collector.update_all_metrics.assert_called() + + def test_quota_metrics_scheduler_with_collector_error(self): + """Test scheduler behavior when 
collector initialization fails.""" + from ols.app.models.config import PostgresConfig, QuotaHandlersConfig + + config_obj = QuotaHandlersConfig() + config_obj.storage = PostgresConfig() + + with patch( + "ols.app.metrics.quota_metrics_scheduler.get_quota_metrics_collector" + ) as mock_get_collector: + mock_get_collector.side_effect = Exception("Database connection failed") + + result = quota_metrics_scheduler(config_obj) + assert result is False + + def test_quota_metrics_scheduler_continues_on_update_error(self): + """Test that scheduler continues running even if update fails.""" + from ols.app.models.config import PostgresConfig, QuotaHandlersConfig + + config_obj = QuotaHandlersConfig() + config_obj.storage = PostgresConfig() + + with patch( + "ols.app.metrics.quota_metrics_scheduler.get_quota_metrics_collector" + ) as mock_get_collector: + mock_collector = MagicMock() + mock_collector.update_all_metrics.side_effect = [ + Exception("First update failed"), + None, # Second update succeeds + ] + mock_get_collector.return_value = mock_collector + + with patch("time.sleep") as mock_sleep: + # Make sleep raise an exception to exit the loop after second iteration + mock_sleep.side_effect = [None, KeyboardInterrupt("Test exit")] + + try: + quota_metrics_scheduler(config_obj) + except KeyboardInterrupt: + pass # Expected to exit this way + + # Verify collector was called twice + assert mock_collector.update_all_metrics.call_count == 2 + + def test_quota_metrics_scheduler_sleep_interval(self): + """Test that scheduler respects the configured sleep interval.""" + from ols.app.models.config import ( + PostgresConfig, + QuotaHandlersConfig, + SchedulerConfig, + ) + + config_obj = QuotaHandlersConfig() + config_obj.storage = PostgresConfig() + config_obj.scheduler = SchedulerConfig(period=42) # Custom period + + with patch( + "ols.app.metrics.quota_metrics_scheduler.get_quota_metrics_collector" + ) as mock_get_collector: + mock_collector = MagicMock() + 
mock_get_collector.return_value = mock_collector + + with patch("time.sleep") as mock_sleep: + mock_sleep.side_effect = KeyboardInterrupt("Test exit") + + try: + quota_metrics_scheduler(config_obj) + except KeyboardInterrupt: + pass # Expected to exit this way + + # Verify sleep was called with correct interval + mock_sleep.assert_called_with(42) + + def test_start_quota_metrics_scheduler(self): + """Test starting the quota metrics scheduler in a background thread.""" + from ols.app.models.config import PostgresConfig, QuotaHandlersConfig + + config_obj = AppConfig() + # Set up quota handlers config + config_obj.ols_config.quota_handlers = QuotaHandlersConfig() + config_obj.ols_config.quota_handlers.storage = PostgresConfig() + + with patch( + "ols.app.metrics.quota_metrics_scheduler.Thread" + ) as mock_thread_class: + mock_thread = MagicMock() + mock_thread_class.return_value = mock_thread + + start_quota_metrics_scheduler(config_obj) + + # Verify thread was created and started + mock_thread_class.assert_called_once() + call_args = mock_thread_class.call_args + assert call_args[1]["daemon"] is True + assert call_args[1]["target"] == quota_metrics_scheduler + assert call_args[1]["args"] == (config_obj.ols_config.quota_handlers,) + + mock_thread.start.assert_called_once() + + def test_quota_metrics_scheduler_default_interval(self): + """Test scheduler uses default interval when not configured.""" + from ols.app.models.config import PostgresConfig, QuotaHandlersConfig + + config_obj = QuotaHandlersConfig() + config_obj.storage = PostgresConfig() + config_obj.scheduler = None # No scheduler config + + with patch( + "ols.app.metrics.quota_metrics_scheduler.get_quota_metrics_collector" + ) as mock_get_collector: + mock_collector = MagicMock() + mock_get_collector.return_value = mock_collector + + with patch("time.sleep") as mock_sleep: + mock_sleep.side_effect = KeyboardInterrupt("Test exit") + + try: + quota_metrics_scheduler(config_obj) + except KeyboardInterrupt: + 
pass # Expected to exit this way + + # Verify sleep was called with default interval (300 seconds) + mock_sleep.assert_called_with(300) + + def test_quota_metrics_scheduler_thread_safety(self): + """Test that scheduler can be safely interrupted.""" + from ols.app.models.config import PostgresConfig, QuotaHandlersConfig + + config_obj = QuotaHandlersConfig() + config_obj.storage = PostgresConfig() + + with patch( + "ols.app.metrics.quota_metrics_scheduler.get_quota_metrics_collector" + ) as mock_get_collector: + mock_collector = MagicMock() + mock_get_collector.return_value = mock_collector + + # Start scheduler in a real thread + thread = Thread( + target=quota_metrics_scheduler, daemon=True, args=(config_obj,) + ) + thread.start() + + # Let it run briefly + time.sleep(0.1) + + # Thread should be running + assert thread.is_alive() + + # NOTE: the thread is never stopped or joined here; as a daemon it only ends when the interpreter exits