diff --git a/pyproject.toml b/pyproject.toml index 60fde23..c62d859 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,8 @@ dependencies = [ "langgraph>=0.4.3", "langchain-aws>=0.2.23", "pytz>=2025.2", + "aiohttp>=3.9.0", + "aiofiles>=23.0.0", ] [tool.setuptools] diff --git a/registry/__init__.py b/registry/__init__.py new file mode 100644 index 0000000..a73acb5 --- /dev/null +++ b/registry/__init__.py @@ -0,0 +1,30 @@ +""" +MCP Registry Package + +This package provides registry functionality for MCP (Model Context Protocol) servers, +including health monitoring, failover capabilities, and server discovery. +""" + +from .registry_health_monitor import ( + RegistryHealthMonitor, + RegistryConfig, + RegistryStatus, + RegistryHealthMetrics, + HealthCheckResult +) + +from .registry_failover_client import ( + RegistryFailoverClient, + FailoverResult +) + +__version__ = "0.1.0" +__all__ = [ + "RegistryHealthMonitor", + "RegistryConfig", + "RegistryStatus", + "RegistryHealthMetrics", + "HealthCheckResult", + "RegistryFailoverClient", + "FailoverResult" +] \ No newline at end of file diff --git a/registry/main.py b/registry/main.py index 5075c9e..2edb00a 100644 --- a/registry/main.py +++ b/registry/main.py @@ -40,6 +40,16 @@ from mcp.client.sse import sse_client # --- MCP Client Imports --- END +# --- Registry Health Monitoring Imports --- START +from .registry_health_monitor import ( + RegistryHealthMonitor, + RegistryConfig, + RegistryStatus, + RegistryHealthMetrics +) +from .registry_failover_client import RegistryFailoverClient, FailoverResult +# --- Registry Health Monitoring Imports --- END + # --- Define paths based on container structure --- START CONTAINER_APP_DIR = Path("/app") CONTAINER_REGISTRY_DIR = CONTAINER_APP_DIR / "registry" @@ -131,6 +141,12 @@ # --- WebSocket Connection Management --- active_connections: Set[WebSocket] = set() +# --- Registry Health Monitoring Global Variables --- START +REGISTRY_CONFIG_PATH = SERVERS_DIR / "external_registries.json" +registry_health_monitor: Optional[RegistryHealthMonitor] = None +registry_failover_client: Optional[RegistryFailoverClient] = None +# --- Registry Health Monitoring Global Variables --- END + # --- FAISS Helper Functions --- START def _get_text_for_embedding(server_info: dict) -> str: @@ -1074,7 +1090,21 @@ async def lifespan(app: FastAPI): logger.info("Generating initial Nginx configuration...") regenerate_nginx_config() # Generate config based on initial health status - # 4. Start the background periodic health check task + # 4. Initialize Registry Health Monitoring System + logger.info("Initializing registry health monitoring system...") + global registry_health_monitor, registry_failover_client + + registry_health_monitor = RegistryHealthMonitor(REGISTRY_CONFIG_PATH) + await registry_health_monitor.initialize() + + registry_failover_client = RegistryFailoverClient(registry_health_monitor) + await registry_failover_client.initialize() + + # Start health monitoring + await registry_health_monitor.start_monitoring() + logger.info("Registry health monitoring system started") + + # 5. Start the background periodic health check task logger.info("Starting background health check task...") health_check_task = asyncio.create_task(run_health_checks()) @@ -1084,6 +1114,17 @@ async def lifespan(app: FastAPI): # --- Shutdown tasks --- START logger.info("Running shutdown tasks...") + + # Stop registry health monitoring + if registry_health_monitor: + logger.info("Stopping registry health monitoring...") + await registry_health_monitor.stop_monitoring() + await registry_health_monitor.cleanup() + + if registry_failover_client: + logger.info("Cleaning up registry failover client...") + await registry_failover_client.cleanup() + logger.info("Cancelling background health check task...") health_check_task.cancel() try: @@ -1154,6 +1195,279 @@ def api_auth( app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static") templates = Jinja2Templates(directory=TEMPLATES_DIR) +# --- Registry Health Monitoring API Routes --- + +@app.post("/api/registries/add") +async def add_external_registry( + name: Annotated[str, Form()], + url: Annotated[str, Form()], + priority: Annotated[int, Form()] = 1, + timeout_seconds: Annotated[int, Form()] = 10, + retry_attempts: Annotated[int, Form()] = 3, + health_check_interval_seconds: Annotated[int, Form()] = 30, + api_key: Annotated[str, Form()] = None, + username: Annotated[str, Depends(api_auth)] = None, +): + """Add a new external registry for monitoring""" + if not registry_health_monitor: + raise HTTPException(status_code=503, detail="Registry health monitoring not initialized") + + # Validate URL format + try: + from urllib.parse import urlparse + parsed_url = urlparse(url) + if not parsed_url.scheme or not parsed_url.netloc: + raise ValueError("Invalid URL format") + except Exception: + raise HTTPException(status_code=400, detail="Invalid URL format") + + # Check if registry already exists + if name in registry_health_monitor.registries: + raise HTTPException(status_code=400, detail=f"Registry '{name}' already exists") + + # Create registry configuration + config = RegistryConfig( + name=name, + url=url, + priority=priority, + timeout_seconds=timeout_seconds, + retry_attempts=retry_attempts, + health_check_interval_seconds=health_check_interval_seconds, + api_key=api_key if api_key else None + ) + + try: + await registry_health_monitor.add_registry(config) + logger.info(f"Added external registry '{name}' by user '{username}'") + return JSONResponse( + status_code=201, + content={ + "message": f"Registry '{name}' added successfully", + "registry": { + "name": name, + "url": url, + "priority": priority, + "status": "checking" + } + } + ) + except Exception as e: + logger.error(f"Failed to add registry '{name}': {e}") + raise HTTPException(status_code=500, detail=f"Failed to add registry: {str(e)}") + + +@app.delete("/api/registries/{registry_name}") +async def remove_external_registry( + registry_name: str, + username: Annotated[str, Depends(api_auth)] = None, +): + """Remove an external registry""" + if not registry_health_monitor: + raise HTTPException(status_code=503, detail="Registry health monitoring not initialized") + + if registry_name not in registry_health_monitor.registries: + raise HTTPException(status_code=404, detail=f"Registry '{registry_name}' not found") + + try: + await registry_health_monitor.remove_registry(registry_name) + logger.info(f"Removed external registry '{registry_name}' by user '{username}'") + return JSONResponse( + status_code=200, + content={"message": f"Registry '{registry_name}' removed successfully"} + ) + except Exception as e: + logger.error(f"Failed to remove registry '{registry_name}': {e}") + raise HTTPException(status_code=500, detail=f"Failed to remove registry: {str(e)}") + + +@app.get("/api/registries") +async def list_external_registries( + username: Annotated[str, Depends(api_auth)] = None, +): + """List all external registries and their health status""" + if not registry_health_monitor: + raise HTTPException(status_code=503, detail="Registry health monitoring not initialized") + + registries_info = [] + for name, config in registry_health_monitor.registries.items(): + metrics = registry_health_monitor.health_metrics.get(name) + registries_info.append({ + "name": name, + "url": config.url, + "priority": config.priority, + "enabled": config.enabled, + "status": metrics.status.value if metrics else "unknown", + "last_check": metrics.last_check_time.isoformat() if metrics and metrics.last_check_time else None, + "response_time_ms": metrics.response_time_ms if metrics else None, + "success_rate": metrics.success_rate if metrics else 0, + "uptime_percentage": metrics.uptime_percentage if metrics else 0, + "consecutive_failures": metrics.consecutive_failures if metrics else 0, + "last_error": metrics.last_error if metrics else None + }) + + return { + "registries": registries_info, + "system_health": registry_health_monitor.get_system_health_summary() + } + + +@app.get("/api/registries/{registry_name}/health") +async def get_registry_health( + registry_name: str, + username: Annotated[str, Depends(api_auth)] = None, +): + """Get detailed health information for a specific registry""" + if not registry_health_monitor: + raise HTTPException(status_code=503, detail="Registry health monitoring not initialized") + + if registry_name not in registry_health_monitor.registries: + raise HTTPException(status_code=404, detail=f"Registry '{registry_name}' not found") + + config = registry_health_monitor.registries[registry_name] + metrics = registry_health_monitor.health_metrics.get(registry_name) + history = registry_health_monitor.get_registry_history(registry_name, limit=50) + + return { + "registry_name": registry_name, + "config": { + "url": config.url, + "priority": config.priority, + "timeout_seconds": config.timeout_seconds, + "enabled": config.enabled, + "health_check_interval_seconds": config.health_check_interval_seconds + }, + "metrics": { + "status": metrics.status.value if metrics else "unknown", + "last_check": metrics.last_check_time.isoformat() if metrics and metrics.last_check_time else None, + "response_time_ms": metrics.response_time_ms if metrics else None, + "success_rate": metrics.success_rate if metrics else 0, + "uptime_percentage": metrics.uptime_percentage if metrics else 0, + "consecutive_failures": metrics.consecutive_failures if metrics else 0, + "total_requests": metrics.total_requests if metrics else 0, + "successful_requests": metrics.successful_requests if metrics else 0, + "last_error": metrics.last_error if metrics else None + }, + "recent_history": [ + { + "timestamp": check.timestamp.isoformat(), + "status": check.status.value, + "response_time_ms": check.response_time_ms, + "error": check.error + } + for check in history + ] + } + + +@app.post("/api/registries/{registry_name}/health-check") +async def force_registry_health_check( + registry_name: str, + username: Annotated[str, Depends(api_auth)] = None, +): + """Force immediate health check for a specific registry""" + if not registry_health_monitor: + raise HTTPException(status_code=503, detail="Registry health monitoring not initialized") + + if registry_name not in registry_health_monitor.registries: + raise HTTPException(status_code=404, detail=f"Registry '{registry_name}' not found") + + try: + result = await registry_health_monitor.check_registry_health(registry_name) + logger.info(f"Manual health check triggered for registry '{registry_name}' by user '{username}'") + + return { + "registry_name": registry_name, + "status": result.status.value, + "response_time_ms": result.response_time_ms, + "error": result.error, + "timestamp": result.timestamp.isoformat() + } + except Exception as e: + logger.error(f"Failed to perform health check for registry '{registry_name}': {e}") + raise HTTPException(status_code=500, detail=f"Health check failed: {str(e)}") + + +@app.post("/api/registries/health-check-all") +async def force_all_registries_health_check( + username: Annotated[str, Depends(api_auth)] = None, +): + """Force immediate health check for all registries""" + if not registry_health_monitor: + raise HTTPException(status_code=503, detail="Registry health monitoring not initialized") + + try: + await registry_health_monitor.force_health_check() + logger.info(f"Manual health check triggered for all registries by user '{username}'") + + # Return current status of all registries + return await list_external_registries(username) + except Exception as e: + logger.error(f"Failed to perform health check for all registries: {e}") + raise HTTPException(status_code=500, detail=f"Health check failed: {str(e)}") + + +@app.get("/api/registries/failover-status") +async def get_failover_status( + username: Annotated[str, Depends(api_auth)] = None, +): + """Get current failover status and statistics""" + if not registry_failover_client: + raise HTTPException(status_code=503, detail="Registry failover client not initialized") + + stats = registry_failover_client.get_failover_stats() + primary = registry_health_monitor.get_primary_registry() if registry_health_monitor else None + backups = registry_health_monitor.get_backup_registries() if registry_health_monitor else [] + + return { + "failover_stats": stats, + "primary_registry": primary, + "backup_registries": backups, + "failover_available": len(backups) > 0 + } + + +@app.post("/api/registries/{registry_name}/enable") +async def enable_registry( + registry_name: str, + username: Annotated[str, Depends(api_auth)] = None, +): + """Enable a registry""" + if not registry_health_monitor: + raise HTTPException(status_code=503, detail="Registry health monitoring not initialized") + + if registry_name not in registry_health_monitor.registries: + raise HTTPException(status_code=404, detail=f"Registry '{registry_name}' not found") + + try: + await registry_health_monitor.update_registry_config(registry_name, {"enabled": True}) + logger.info(f"Registry '{registry_name}' enabled by user '{username}'") + return {"message": f"Registry '{registry_name}' enabled successfully"} + except Exception as e: + logger.error(f"Failed to enable registry '{registry_name}': {e}") + raise HTTPException(status_code=500, detail=f"Failed to enable registry: {str(e)}") + + +@app.post("/api/registries/{registry_name}/disable") +async def disable_registry( + registry_name: str, + username: Annotated[str, Depends(api_auth)] = None, +): + """Disable a registry""" + if not registry_health_monitor: + raise HTTPException(status_code=503, detail="Registry health monitoring not initialized") + + if registry_name not in registry_health_monitor.registries: + raise HTTPException(status_code=404, detail=f"Registry '{registry_name}' not found") + + try: + await registry_health_monitor.update_registry_config(registry_name, {"enabled": False}) + logger.info(f"Registry '{registry_name}' disabled by user '{username}'") + return {"message": f"Registry '{registry_name}' disabled successfully"} + except Exception as e: + logger.error(f"Failed to disable registry '{registry_name}': {e}") + raise HTTPException(status_code=500, detail=f"Failed to disable registry: {str(e)}") + + # --- Routes --- diff --git a/registry/registry_failover_client.py b/registry/registry_failover_client.py new file mode 100644 index 0000000..b70d07b --- /dev/null +++ b/registry/registry_failover_client.py @@ -0,0 +1,425 @@ +""" +MCP Registry Failover Client + +This module provides automatic failover capabilities when interacting with +external MCP registries, ensuring high availability and resilience. +""" + +import asyncio +import json +import logging +from datetime import datetime, timezone +from typing import Dict, List, Optional, Any, Tuple +from dataclasses import dataclass + +import aiohttp + +from .registry_health_monitor import ( + RegistryHealthMonitor, + RegistryConfig, + RegistryStatus, + HealthCheckResult +) + +logger = logging.getLogger(__name__) + + +@dataclass +class FailoverResult: + """Result of a failover operation""" + success: bool + registry_used: Optional[str] + response_data: Optional[Any] + error: Optional[str] + attempts: List[Tuple[str, str]] # [(registry_name, error), ...] + response_time_ms: Optional[float] + + +class RegistryFailoverClient: + """ + Client that automatically handles failover between multiple MCP registries + """ + + def __init__(self, health_monitor: RegistryHealthMonitor): + self.health_monitor = health_monitor + self.session: Optional[aiohttp.ClientSession] = None + self.default_timeout = 30 + self.max_retries_per_registry = 2 + self.backoff_delay = 1.0 # seconds + + # Circuit breaker settings + self.circuit_breaker_timeout = 60 # seconds to wait before retrying failed registry + self.circuit_breaker_state: Dict[str, datetime] = {} + + async def initialize(self): + """Initialize the failover client""" + self.session = aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=self.default_timeout) + ) + logger.info("Registry failover client initialized") + + async def cleanup(self): + """Clean up resources""" + if self.session: + await self.session.close() + logger.info("Registry failover client cleaned up") + + def _is_circuit_open(self, registry_name: str) -> bool: + """Check if circuit breaker is open for a registry""" + if registry_name not in self.circuit_breaker_state: + return False + + circuit_time = self.circuit_breaker_state[registry_name] + now = datetime.now(timezone.utc) + return (now - circuit_time).total_seconds() < self.circuit_breaker_timeout + + def _open_circuit(self, registry_name: str): + """Open circuit breaker for a registry""" + self.circuit_breaker_state[registry_name] = datetime.now(timezone.utc) + logger.warning(f"Opened circuit breaker for registry '{registry_name}'") + + def _close_circuit(self, registry_name: str): + """Close circuit breaker for a registry""" + if registry_name in self.circuit_breaker_state: + del self.circuit_breaker_state[registry_name] + logger.info(f"Closed circuit breaker for registry '{registry_name}'") + + async def _make_request( + self, + registry_name: str, + method: str, + path: str, + **kwargs + ) -> Tuple[bool, Any, Optional[str]]: + """ + Make a request to a specific registry + + Returns: + (success, response_data, error_message) + """ + config = self.health_monitor.registries.get(registry_name) + if not config: + return False, None, f"Registry '{registry_name}' not configured" + + if not config.enabled: + return False, None, f"Registry '{registry_name}' is disabled" + + # Check circuit breaker + if self._is_circuit_open(registry_name): + return False, None, f"Circuit breaker open for '{registry_name}'" + + url = f"{config.url.rstrip('/')}/{path.lstrip('/')}" + + # Prepare headers + headers = config.headers.copy() if config.headers else {} + if config.api_key: + headers['Authorization'] = f'Bearer {config.api_key}' + + # Add any additional headers from kwargs + if 'headers' in kwargs: + headers.update(kwargs.pop('headers')) + + # Set timeout + timeout = aiohttp.ClientTimeout(total=config.timeout_seconds) + + try: + async with self.session.request( + method, + url, + headers=headers, + timeout=timeout, + **kwargs + ) as response: + + if response.status >= 200 and response.status < 300: + try: + data = await response.json() + self._close_circuit(registry_name) # Success, close circuit if open + return True, data, None + except json.JSONDecodeError: + # If not JSON, return text + text = await response.text() + self._close_circuit(registry_name) + return True, text, None + else: + error_msg = f"HTTP {response.status}" + try: + error_text = await response.text() + if error_text: + error_msg += f": {error_text}" + except: + pass + + # Open circuit on server errors + if response.status >= 500: + self._open_circuit(registry_name) + + return False, None, error_msg + + except asyncio.TimeoutError: + self._open_circuit(registry_name) + return False, None, "Request timeout" + except aiohttp.ClientError as e: + self._open_circuit(registry_name) + return False, None, f"Connection error: {str(e)}" + except Exception as e: + self._open_circuit(registry_name) + return False, None, f"Unexpected error: {str(e)}" + + async def make_request_with_failover( + self, + method: str, + path: str, + use_primary_only: bool = False, + **kwargs + ) -> FailoverResult: + """ + Make a request with automatic failover to backup registries + + Args: + method: HTTP method (GET, POST, etc.) + path: API path to request + use_primary_only: If True, only try the primary registry + **kwargs: Additional arguments for the request + + Returns: + FailoverResult with success status and details + """ + start_time = asyncio.get_event_loop().time() + attempts = [] + + # Get registries to try + if use_primary_only: + primary = self.health_monitor.get_primary_registry() + registries_to_try = [primary] if primary else [] + else: + registries_to_try = self.health_monitor.get_healthy_registries() + + if not registries_to_try: + return FailoverResult( + success=False, + registry_used=None, + response_data=None, + error="No healthy registries available", + attempts=[], + response_time_ms=None + ) + + # Try each registry with retries + for registry_name in registries_to_try: + for retry in range(self.max_retries_per_registry): + success, response_data, error = await self._make_request( + registry_name, method, path, **kwargs + ) + + if success: + end_time = asyncio.get_event_loop().time() + response_time_ms = (end_time - start_time) * 1000 + + logger.info(f"Request successful using registry '{registry_name}' " + f"(attempt {retry + 1}/{self.max_retries_per_registry})") + + return FailoverResult( + success=True, + registry_used=registry_name, + response_data=response_data, + error=None, + attempts=attempts + [(registry_name, "success")], + response_time_ms=response_time_ms + ) + else: + attempts.append((registry_name, error)) + logger.warning(f"Request failed for registry '{registry_name}' " + f"(attempt {retry + 1}/{self.max_retries_per_registry}): {error}") + + # Wait before retry (except on last attempt) + if retry < self.max_retries_per_registry - 1: + await asyncio.sleep(self.backoff_delay * (retry + 1)) + + # All registries failed + return FailoverResult( + success=False, + registry_used=None, + response_data=None, + error="All registries failed", + attempts=attempts, + response_time_ms=None + ) + + async def get_servers( + self, + query: Optional[str] = None, + tags: Optional[List[str]] = None, + limit: Optional[int] = None + ) -> FailoverResult: + """Get servers from registries with failover""" + params = {} + if query: + params['q'] = query + if tags: + params['tags'] = ','.join(tags) + if limit: + params['limit'] = limit + + return await self.make_request_with_failover( + 'GET', + '/api/servers', + params=params + ) + + async def get_server_details(self, server_id: str) -> FailoverResult: + """Get server details with failover""" + return await self.make_request_with_failover( + 'GET', + f'/api/servers/{server_id}' + ) + + async def search_tools( + self, + query: str, + limit: Optional[int] = None + ) -> FailoverResult: + """Search tools across registries with failover""" + params = {'q': query} + if limit: + params['limit'] = limit + + return await self.make_request_with_failover( + 'GET', + '/api/tools/search', + params=params + ) + + async def get_registry_status(self) -> FailoverResult: + """Get registry status information""" + return await self.make_request_with_failover( + 'GET', + '/api/status' + ) + + async def register_server(self, server_data: Dict[str, Any]) -> FailoverResult: + """Register a server with failover (tries primary registry only)""" + return await self.make_request_with_failover( + 'POST', + '/api/servers', + json=server_data, + use_primary_only=True # Only use primary for write operations + ) + + async def update_server(self, server_id: str, server_data: Dict[str, Any]) -> FailoverResult: + """Update a server with failover (tries primary registry only)""" + return await self.make_request_with_failover( + 'PUT', + f'/api/servers/{server_id}', + json=server_data, + use_primary_only=True + ) + + async def delete_server(self, server_id: str) -> FailoverResult: + """Delete a server with failover (tries primary registry only)""" + return await self.make_request_with_failover( + 'DELETE', + f'/api/servers/{server_id}', + use_primary_only=True + ) + + async def bulk_import_servers( + self, + source_registry: Optional[str] = None + ) -> FailoverResult: + """ + Bulk import servers from a backup registry to primary + + This is useful for disaster recovery scenarios + """ + primary = self.health_monitor.get_primary_registry() + if not primary: + return FailoverResult( + success=False, + registry_used=None, + response_data=None, + error="No primary registry available for import", + attempts=[], + response_time_ms=None + ) + + # Get source registry (use first backup if not specified) + if not source_registry: + backups = self.health_monitor.get_backup_registries() + if not backups: + return FailoverResult( + success=False, + registry_used=None, + response_data=None, + error="No backup registries available for import", + attempts=[], + response_time_ms=None + ) + source_registry = backups[0] + + # Get servers from source registry + source_result = await self.make_request_with_failover( + 'GET', '/api/servers' + ) + + if not source_result.success: + return FailoverResult( + success=False, + registry_used=None, + response_data=None, + error=f"Failed to get servers from source registry: {source_result.error}", + attempts=source_result.attempts, + response_time_ms=None + ) + + servers = source_result.response_data.get('servers', []) + imported_count = 0 + failed_count = 0 + + # Import each server to primary registry + for server in servers: + import_result = await self.register_server(server) + if import_result.success: + imported_count += 1 + else: + failed_count += 1 + logger.warning(f"Failed to import server {server.get('name', 'unknown')}: {import_result.error}") + + return FailoverResult( + success=True, + registry_used=primary, + response_data={ + "imported_count": imported_count, + "failed_count": failed_count, + "total_servers": len(servers), + "source_registry": source_registry + }, + error=None, + attempts=[], + response_time_ms=None + ) + + def get_failover_stats(self) -> Dict[str, Any]: + """Get failover statistics and current state""" + healthy_registries = self.health_monitor.get_healthy_registries() + primary = self.health_monitor.get_primary_registry() + backup_count = len(self.health_monitor.get_backup_registries()) + + circuit_breaker_info = {} + now = datetime.now(timezone.utc) + for registry_name, circuit_time in self.circuit_breaker_state.items(): + remaining_seconds = self.circuit_breaker_timeout - (now - circuit_time).total_seconds() + circuit_breaker_info[registry_name] = { + "open_since": circuit_time.isoformat(), + "remaining_seconds": max(0, remaining_seconds) + } + + return { + "healthy_registries_count": len(healthy_registries), + "primary_registry": primary, + "backup_registries_count": backup_count, + "circuit_breakers_open": len(circuit_breaker_info), + "circuit_breaker_details": circuit_breaker_info, + "failover_available": backup_count > 0, + "system_status": "healthy" if primary else "degraded" + } \ No newline at end of file diff --git a/registry/registry_health_monitor.py b/registry/registry_health_monitor.py new file mode 100644 index 0000000..66bece9 --- /dev/null +++ b/registry/registry_health_monitor.py @@ -0,0 +1,478 @@ +""" +MCP Registry Health Monitoring and Failover System + +This module provides comprehensive health monitoring and failover capabilities +for external MCP registries, ensuring high availability and resilience. +""" + +import asyncio +import json +import logging +import time +from datetime import datetime, timezone, timedelta +from enum import Enum +from pathlib import Path +from typing import Dict, List, Optional, Set, Tuple, Any +from dataclasses import dataclass, asdict +from urllib.parse import urlparse + +import aiohttp +import aiofiles + +logger = logging.getLogger(__name__) + + +class RegistryStatus(Enum): + """Registry health status enumeration""" + HEALTHY = "healthy" + DEGRADED = "degraded" + UNHEALTHY = "unhealthy" + UNKNOWN = "unknown" + MAINTENANCE = "maintenance" + + +@dataclass +class RegistryConfig: + """Configuration for an external MCP registry""" + name: str + url: str + priority: int = 1 # Lower number = higher priority + timeout_seconds: int = 10 + retry_attempts: int = 3 + retry_delay_seconds: int = 5 + health_check_interval_seconds: int = 30 + enabled: bool = True + api_key: Optional[str] = None + headers: Optional[Dict[str, str]] = None + + +@dataclass +class RegistryHealthMetrics: + """Health metrics for a registry""" + status: RegistryStatus + last_check_time: datetime + response_time_ms: Optional[float] + success_rate: float # Percentage over last N checks + consecutive_failures: int + last_error: Optional[str] + uptime_percentage: float # Over last 24 hours + total_requests: int + successful_requests: int + + +@dataclass +class HealthCheckResult: + """Result of a single health check""" + registry_name: str + status: RegistryStatus + response_time_ms: Optional[float] + error: Optional[str] + timestamp: datetime + details: Optional[Dict[str, Any]] = None + + +class RegistryHealthMonitor: + """ + Monitors health of multiple MCP registries and provides failover capabilities + """ + + def __init__(self, config_file_path: Optional[Path] = None): + self.registries: Dict[str, RegistryConfig] = {} + self.health_metrics: Dict[str, RegistryHealthMetrics] = {} + self.health_history: Dict[str, List[HealthCheckResult]] = {} + self.config_file_path = config_file_path + self.monitoring_task: Optional[asyncio.Task] = None + self.is_monitoring = False + self.session: Optional[aiohttp.ClientSession] = None + + # Failover configuration + self.max_history_size = 1000 + self.health_check_timeout = 30 + self.circuit_breaker_threshold = 5 # Consecutive failures before marking unhealthy + + # Callbacks for status changes + self.status_change_callbacks: List[callable] = [] + + async def initialize(self): + """Initialize the health monitor""" + self.session = aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=self.health_check_timeout) + ) + + if self.config_file_path and self.config_file_path.exists(): + await self.load_config() + + logger.info("Registry health monitor initialized") + + async def cleanup(self): + """Clean up resources""" + if self.monitoring_task: + self.monitoring_task.cancel() + try: + await self.monitoring_task + except asyncio.CancelledError: + pass + + if self.session: + await self.session.close() + + logger.info("Registry health monitor cleaned up") + + async def load_config(self): + """Load registry configurations from file""" + try: + async with aiofiles.open(self.config_file_path, 'r') as f: + content = await f.read() + config_data = json.loads(content) + + for registry_data in config_data.get("registries", []): + config = RegistryConfig(**registry_data) + await self.add_registry(config) + + logger.info(f"Loaded {len(self.registries)} registry configurations") + except Exception as e: + logger.error(f"Failed to load registry config: {e}") + + async def save_config(self): + """Save current registry configurations to file""" + if not self.config_file_path: + return + + try: + config_data = { + "registries": [asdict(config) for config in self.registries.values()], + "updated_at": datetime.now(timezone.utc).isoformat() + } + + # Ensure directory exists + self.config_file_path.parent.mkdir(parents=True, exist_ok=True) + + async with aiofiles.open(self.config_file_path, 'w') as f: + await f.write(json.dumps(config_data, indent=2)) + + logger.info("Registry configuration saved") + except Exception as e: + logger.error(f"Failed to save registry config: {e}") + + async def add_registry(self, config: RegistryConfig): + """Add a new registry to monitor""" + self.registries[config.name] = config + + # Initialize health metrics + self.health_metrics[config.name] = RegistryHealthMetrics( + status=RegistryStatus.UNKNOWN, + last_check_time=datetime.now(timezone.utc), + response_time_ms=None, + success_rate=0.0, + consecutive_failures=0, + last_error=None, + uptime_percentage=0.0, + total_requests=0, + successful_requests=0 + ) + + self.health_history[config.name] = [] + + # Perform initial health check + await self.check_registry_health(config.name) + + # Save configuration + await self.save_config() + + logger.info(f"Added registry '{config.name}' for monitoring") + + async def remove_registry(self, registry_name: str): + """Remove a registry from monitoring""" + if registry_name in self.registries: + del self.registries[registry_name] + del self.health_metrics[registry_name] + del self.health_history[registry_name] + + await self.save_config() + logger.info(f"Removed registry '{registry_name}' from monitoring") + + async def start_monitoring(self): + """Start continuous health monitoring""" + if self.is_monitoring: + return + + self.is_monitoring = True + self.monitoring_task = asyncio.create_task(self._monitoring_loop()) + logger.info("Started registry health monitoring") + + async def stop_monitoring(self): + """Stop health monitoring""" + self.is_monitoring = False + if self.monitoring_task: + self.monitoring_task.cancel() + try: + await self.monitoring_task + except asyncio.CancelledError: + pass + logger.info("Stopped registry health monitoring") + + async def _monitoring_loop(self): + """Main monitoring loop""" + while self.is_monitoring: + try: + # Check all enabled registries + check_tasks = [] + for name, config in self.registries.items(): + if config.enabled: + check_tasks.append(self.check_registry_health(name)) + + if check_tasks: + await asyncio.gather(*check_tasks, return_exceptions=True) + + # Wait for the shortest interval among all registries + min_interval = min( + config.health_check_interval_seconds + for config in self.registries.values() + if config.enabled + ) if self.registries else 60 + + await asyncio.sleep(min_interval) + + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"Error in monitoring loop: {e}") + await asyncio.sleep(10) # Back off on error + + async def check_registry_health(self, registry_name: str) -> HealthCheckResult: + """Perform health check on a specific registry""" + config = self.registries.get(registry_name) + if not config: + raise ValueError(f"Registry '{registry_name}' not found") + + start_time = time.time() + result = HealthCheckResult( + registry_name=registry_name, + status=RegistryStatus.UNKNOWN, + response_time_ms=None, + error=None, + timestamp=datetime.now(timezone.utc) + ) + + try: + # Prepare headers + headers = config.headers.copy() if config.headers else {} + if config.api_key: + headers['Authorization'] = f'Bearer {config.api_key}' + + # Perform health check request + health_url = f"{config.url.rstrip('/')}/health" + + timeout = aiohttp.ClientTimeout(total=config.timeout_seconds) + async with self.session.get(health_url, headers=headers, timeout=timeout) as response: + response_time = (time.time() - start_time) * 1000 + result.response_time_ms = response_time + + if response.status == 200: + result.status = RegistryStatus.HEALTHY + + # Try to parse response for additional details + try: + data = await response.json() + result.details = data + except: + pass + + elif response.status in [502, 503, 504]: + result.status = RegistryStatus.DEGRADED + result.error = f"HTTP {response.status}" + else: + result.status = RegistryStatus.UNHEALTHY + result.error = f"HTTP {response.status}" + + except asyncio.TimeoutError: + result.status = RegistryStatus.UNHEALTHY + result.error = "Timeout" + except aiohttp.ClientError as e: + result.status = RegistryStatus.UNHEALTHY + result.error = f"Connection error: {str(e)}" + except Exception as e: + result.status = RegistryStatus.UNHEALTHY + result.error = f"Unexpected error: {str(e)}" + + # Update metrics + await self._update_health_metrics(result) + + return result + + async def _update_health_metrics(self, result: HealthCheckResult): + """Update health metrics based on check result""" + metrics = self.health_metrics[result.registry_name] + + # Update basic metrics + metrics.last_check_time = result.timestamp + metrics.response_time_ms = result.response_time_ms + metrics.last_error = result.error + metrics.total_requests += 1 + + # Update success tracking + if result.status == RegistryStatus.HEALTHY: + metrics.successful_requests += 1 + metrics.consecutive_failures = 0 + else: + metrics.consecutive_failures += 1 + + # Calculate success rate + metrics.success_rate = (metrics.successful_requests / metrics.total_requests) * 100 + + # Update status with circuit breaker logic + old_status = metrics.status + if metrics.consecutive_failures >= self.circuit_breaker_threshold: + metrics.status = RegistryStatus.UNHEALTHY + else: + metrics.status = result.status + + # Calculate uptime percentage (last 24 hours) + await self._calculate_uptime(result.registry_name) + + # Store in history + self.health_history[result.registry_name].append(result) + + # Trim history to max size + if len(self.health_history[result.registry_name]) > self.max_history_size: + self.health_history[result.registry_name] = \ + self.health_history[result.registry_name][-self.max_history_size:] + + # Notify status change + if old_status != metrics.status: + await self._notify_status_change(result.registry_name, old_status, metrics.status) + + async def _calculate_uptime(self, registry_name: str): + """Calculate uptime percentage for the last 24 hours""" + now = datetime.now(timezone.utc) + since = now - timedelta(hours=24) + + history = self.health_history[registry_name] + recent_checks = [ + check for check in history + if check.timestamp >= since + ] + + if not recent_checks: + return + + healthy_checks = len([ + check for check in recent_checks + if check.status == RegistryStatus.HEALTHY + ]) + + uptime_percentage = (healthy_checks / len(recent_checks)) * 100 + self.health_metrics[registry_name].uptime_percentage = uptime_percentage + + async def _notify_status_change(self, registry_name: str, old_status: RegistryStatus, new_status: RegistryStatus): + """Notify registered callbacks about status changes""" + logger.info(f"Registry '{registry_name}' status changed: {old_status.value} -> {new_status.value}") + + for callback in self.status_change_callbacks: + try: + if asyncio.iscoroutinefunction(callback): + await callback(registry_name, old_status, new_status) + else: + callback(registry_name, old_status, new_status) + except Exception as e: + logger.error(f"Error in status change callback: {e}") + + def add_status_change_callback(self, callback: callable): + """Add a callback for status change notifications""" + self.status_change_callbacks.append(callback) + + def remove_status_change_callback(self, callback: callable): + """Remove a status change callback""" + if callback in self.status_change_callbacks: + self.status_change_callbacks.remove(callback) + + def get_healthy_registries(self) -> List[str]: + """Get list of currently healthy registries, ordered by priority""" + healthy = [] + for name, metrics in self.health_metrics.items(): + if (metrics.status == RegistryStatus.HEALTHY and + self.registries[name].enabled): + healthy.append(name) + + # Sort by priority (lower number = higher priority) + healthy.sort(key=lambda name: self.registries[name].priority) + return healthy + + def get_primary_registry(self) -> Optional[str]: + """Get the primary (highest priority, healthy) registry""" + healthy = self.get_healthy_registries() + return healthy[0] if healthy else None + + def get_backup_registries(self) -> List[str]: + """Get backup registries (healthy but not primary)""" + healthy = self.get_healthy_registries() + return healthy[1:] if len(healthy) > 1 else [] + + def get_registry_metrics(self, registry_name: Optional[str] = None) -> Dict[str, RegistryHealthMetrics]: + """Get health metrics for specific registry or all registries""" + if registry_name: + return {registry_name: self.health_metrics.get(registry_name)} + return self.health_metrics.copy() + + def get_registry_history(self, registry_name: str, limit: Optional[int] = None) -> List[HealthCheckResult]: + """Get health check history for a registry""" + history = self.health_history.get(registry_name, []) + if limit: + return history[-limit:] + return history.copy() + + async def force_health_check(self, registry_name: Optional[str] = None): + """Force immediate health check for one or all registries""" + if registry_name: + if registry_name in self.registries: + await self.check_registry_health(registry_name) + else: + check_tasks = [ + self.check_registry_health(name) + for name in self.registries.keys() + ] + await asyncio.gather(*check_tasks, return_exceptions=True) + + async def update_registry_config(self, registry_name: str, updates: Dict[str, Any]): + """Update configuration for a registry""" + if registry_name not in self.registries: + raise ValueError(f"Registry '{registry_name}' not found") + + config = self.registries[registry_name] + + # Update configuration + for key, value in updates.items(): + if hasattr(config, key): + setattr(config, key, value) + + await self.save_config() + logger.info(f"Updated configuration for registry '{registry_name}'") + + def get_system_health_summary(self) -> Dict[str, Any]: + """Get overall system health summary""" + total_registries = len(self.registries) + enabled_registries = len([r for r in self.registries.values() if r.enabled]) + healthy_registries = len([ + name for name, metrics in self.health_metrics.items() + if metrics.status == RegistryStatus.HEALTHY and self.registries[name].enabled + ]) + + primary_registry = self.get_primary_registry() + backup_count = len(self.get_backup_registries()) + + overall_status = "healthy" + if healthy_registries == 0: + overall_status = "critical" + elif healthy_registries < enabled_registries * 0.5: + overall_status = "degraded" + + return { + "overall_status": overall_status, + "total_registries": total_registries, + "enabled_registries": enabled_registries, + "healthy_registries": healthy_registries, + "primary_registry": primary_registry, + "backup_registries_count": backup_count, + "monitoring_active": self.is_monitoring, + "last_update": datetime.now(timezone.utc).isoformat() + } \ No newline at end of file diff --git a/registry/servers/external_registries.json b/registry/servers/external_registries.json new file mode 100644 index 0000000..7215d39 --- /dev/null +++ b/registry/servers/external_registries.json @@ -0,0 +1,33 @@ +{ + "registries": [ + { + "name": "mcp-run-registry", + "url": "https://api.mcp.run", + "priority": 1, + "timeout_seconds": 15, + "retry_attempts": 3, + "retry_delay_seconds": 5, + "health_check_interval_seconds": 60, + "enabled": false, + "api_key": null, + "headers": { + "User-Agent": "MCP-Gateway/1.0" + } + }, + { + "name": "mkinf-registry", + "url": "https://api.mkinf.com", + "priority": 2, + "timeout_seconds": 10, + "retry_attempts": 2, + "retry_delay_seconds": 3, + "health_check_interval_seconds": 120, + "enabled": false, + "api_key": null, + "headers": { + "User-Agent": "MCP-Gateway/1.0" + } + } + ], + "updated_at": "2024-01-15T10:00:00Z" +} \ No newline at end of file