diff --git a/.claude/CLAUDE.md b/.claude/CLAUDE.md
index 54c6a0d..0217f2e 100644
--- a/.claude/CLAUDE.md
+++ b/.claude/CLAUDE.md
@@ -1 +1 @@
-@../cursor_rules.md
\ No newline at end of file
+Rules for Claude are in @AGENTS.md
\ No newline at end of file
diff --git a/AGENTS.md b/AGENTS.md
new file mode 100644
index 0000000..897a981
--- /dev/null
+++ b/AGENTS.md
@@ -0,0 +1,123 @@
+# Project structure, technologies and architecture conventions
+
+## Technologies used
+
+- **uv** - Fast Python package installer and resolver
+- **Python 3.13** - Modern Python with type hints and performance improvements
+- **FastMCP** - MCP server implementation
+- **Pytest** - Testing framework with fixtures and plugins
+- **Pydantic** - Data validation using Python type annotations
+- **Structlog** - Structured logging for better observability
+
+## Dependencies Management
+
+1. **Use pyproject.toml with uv**
+   - Use `pyproject.toml` for dependency management, not requirements.txt
+   - Works well with `uv` for fast, reliable package management
+   - Properly specify dependencies with version constraints
+   - Use `uv sync` to install dependencies
+
+2. **Example pyproject.toml**
+   ```toml
+   [build-system]
+   requires = ["setuptools>=42", "wheel"]
+   build-backend = "setuptools.build_meta"
+
+   [project]
+   name = "my-mcp-server"
+   version = "0.1.0"
+   description = "My MCP server"
+   requires-python = ">=3.9"
+   dependencies = [
+       "mcp>=0.2.0",
+       "requests>=2.28.0",
+   ]
+   ```
+
+## Server Implementation Guidelines
+
+1. **Do NOT use uvicorn or fastapi with MCP/FastMCP**
+   - MCP has its own server implementation
+   - FastMCP/MCP can run directly using `mcp.run()` with no need for external web servers
+   - Avoid adding uvicorn or fastapi to dependencies
+   - Do not use `uvicorn.run(...)` in code
+
+2. **Use the correct server method**
+   - Use `mcp.run()` to start the server (no additional parameters needed for stdio transport)
+   - Example: `mcp.run()` instead of `uvicorn.run(mcp.app, ...)`
+
+3. **Dependencies**
+   - Only include required dependencies
+   - For basic MCP implementation, only `mcp` or `fastmcp` and possibly `requests` are needed
+   - Do not include web server packages unnecessarily
+
+## Code Organization and Imports
+
+1. **Use `src` as the root code directory**
+   - Ensure all code is placed within the `src` directory
+   - Handle imports accordingly by using the appropriate package path
+   - Example: `from src.gitguardian.your_module import YourClass`
+
+2. **FastMCP imports must use the correct package path**
+   - All imports concerning FastMCP must be done under `mcp.server.fastmcp`
+   - Example: `from mcp.server.fastmcp import FastMCP` instead of direct imports
+
+This guide ensures all MCP implementations follow the project's standards of using native capabilities rather than
+external web servers.
+
+# Lower level Python rules
+
+## General Guidelines
+
+- Follow PEP 8 style guidelines
+- Add docstrings to all public functions and classes
+- Keep imports organized and sorted
+- Don't use lazy/deferred imports, except inside Celery task definitions
+
+## Running tests
+
+To run the tests: `uv run pytest`
+
+## Creating tests
+
+- We use pytest and plain classes; don't use unittest classes
+- Use project fixtures (`test_account`, `owner_client`, `api_client`) instead of creating new instances
+- File naming convention: `test_.py`
+- Don't use `self.assertEqual` (and similar) helpers; use plain `assert`
+- The docstring of every test function must follow this format:
+
+```
+"""
+GIVEN ...
+WHEN ...
+THEN ...
+"""
+```
+
+## Variable names
+
+Use clear variable and function names that indicate their purpose without being overly verbose.
+Follow established conventions where appropriate (e.g., `i` for loop indices, `df` for dataframes),
+but ensure non-standard names are self-explanatory.
+For example, prefer `calculate_user_discount(price, user_tier)` over `calc(p, t)`, but `i` is perfectly fine for a
+simple loop counter.
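[Editor's note: the test rules above (plain classes, bare `assert`, GIVEN/WHEN/THEN docstrings) can be sketched in one self-contained example. `calculate_user_discount` is the hypothetical helper from the naming section, defined inline so the snippet runs on its own; a real test would use the project fixtures instead.]

```python
# Sketch of a test written to the conventions above: a plain class (no
# unittest.TestCase), bare `assert` statements, and GIVEN/WHEN/THEN docstrings.
def calculate_user_discount(price: float, user_tier: str) -> float:
    """Illustrative helper: gold-tier users get a flat 10% discount."""
    return round(price * 0.9, 2) if user_tier == "gold" else price


class TestCalculateUserDiscount:
    def test_gold_tier_gets_discount(self):
        """
        GIVEN a gold-tier user and a base price of 100.0
        WHEN the discount is calculated
        THEN the price is reduced by 10%
        """
        assert calculate_user_discount(100.0, "gold") == 90.0

    def test_unknown_tier_pays_full_price(self):
        """
        GIVEN a user whose tier has no discount
        WHEN the discount is calculated
        THEN the original price is returned unchanged
        """
        assert calculate_user_discount(100.0, "free") == 100.0
```

Pytest discovers the `TestCalculateUserDiscount` class and its `test_*` methods automatically, and its assertion rewriting makes bare `assert` failures as readable as `assertEqual` output.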
+
+## `__all__` Lists
+
+- Add an `__all__` list when using `import *` imports
+- When provided, the `__all__ = []` list must be ordered alphabetically
+- When adding new items to `__all__`, maintain alphabetical order
+- Example:
+  ```python
+  __all__ = [
+      "ClassA",
+      "ClassB",
+      "function_a",
+      "function_b",
+  ]
+  ```
+
+## Typing
+
+- Use typing for function signatures and return values
+- Don't use `from typing import List`; use the built-in `list` instead
\ No newline at end of file
diff --git a/cursor_rules.md b/cursor_rules.md
deleted file mode 100644
index c91a099..0000000
--- a/cursor_rules.md
+++ /dev/null
@@ -1,72 +0,0 @@
-# Cursor Rules for FastMCP/MCP Development
-
-## Server Implementation Guidelines
-
-1. **Do NOT use uvicorn or fastapi with MCP/FastMCP**
-   - MCP has its own server implementation
-   - FastMCP/MCP can run directly using `mcp.run()` with no need for external web servers
-   - Avoid adding uvicorn or fastapi to dependencies
-   - Do not use `uvicorn.run(...)` in code
-
-2. **Use the correct server method**
-   - Use `mcp.run()` to start the server (no additional parameters needed for stdio transport)
-   - Example: `mcp.run()` instead of `uvicorn.run(mcp.app, ...)`
-
-3. **Dependencies**
-   - Only include required dependencies
-   - For basic MCP implementation, only `mcp` or `fastmcp` and possibly `requests` are needed
-   - Do not include web server packages unnecessarily
-
-## Dependencies Management
-
-1. **Use pyproject.toml with uv**
-   - Use `pyproject.toml` for dependency management, not requirements.txt
-   - Works well with `uv` for fast, reliable package management
-   - Properly specify dependencies with version constraints
-   - Use `uv sync` to install dependencies
-
-2. **Example pyproject.toml**
-   ```toml
-   [build-system]
-   requires = ["setuptools>=42", "wheel"]
-   build-backend = "setuptools.build_meta"
-
-   [project]
-   name = "my-mcp-server"
-   version = "0.1.0"
-   description = "My MCP server"
-   requires-python = ">=3.9"
-   dependencies = [
-       "mcp>=0.2.0",
-       "requests>=2.28.0",
-   ]
-   ```
-
-## Code Organization and Imports
-
-1. **Use `src` as the root code directory**
-   - Ensure all code is placed within the `src` directory
-   - Handle imports accordingly by using the appropriate package path
-   - Example: `from src.gitguardian.your_module import YourClass`
-
-2. **FastMCP imports must use the correct package path**
-   - All imports concerning FastMCP must be done under `mcp.server.fastmcp`
-   - Example: `from mcp.server.fastmcp import FastMCP` instead of direct imports
-
-This guide ensures all MCP implementations follow the project's standards of using native capabilities rather than external web servers.
-
-## Example of correct implementation:
-
-```python
-from mcp.server.fastmcp import FastMCP
-
-mcp = FastMCP("MyServer")
-
-@mcp.tool()
-def my_tool():
-    return "Hello world"
-
-if __name__ == "__main__":
-    # Correct way to run an MCP server - no need for uvicorn
-    mcp.run()
-```
\ No newline at end of file
diff --git a/packages/gg_api_core/pyproject.toml b/packages/gg_api_core/pyproject.toml
index cc1acdd..062bcaf 100644
--- a/packages/gg_api_core/pyproject.toml
+++ b/packages/gg_api_core/pyproject.toml
@@ -12,6 +12,7 @@ dependencies = [
     "mcp[cli]>=1.9.0",
     "python-dotenv>=1.0.0",
     "pydantic-settings>=2.0.0",
+    "jinja2>=3.1.0",
 ]
 
 [build-system]
diff --git a/packages/gg_api_core/src/gg_api_core/client.py b/packages/gg_api_core/src/gg_api_core/client.py
index 2eea662..19958d0 100644
--- a/packages/gg_api_core/src/gg_api_core/client.py
+++ b/packages/gg_api_core/src/gg_api_core/client.py
@@ -10,7 +10,6 @@
 import httpx
 
 from gg_api_core.host import is_self_hosted_instance
-from gg_api_core.scopes import DEVELOPER_SCOPES
 from urllib.parse import urlparse
 
 # Setup logger
@@ -49,6 +48,48 @@ class IncidentValidity(str, Enum):
     UNKNOWN = "unknown"
 
 
+class TagNames(str, Enum):
+    REGRESSION = "REGRESSION"  # Issue is a regression
+    HIST = (
+        "HIST"
+    )  # Occurrence is visible and its Kind is history
+    PUBLICLY_EXPOSED = (
+        "PUBLICLY_EXPOSED"
+    )  # Occurrence is visible and source is a public GitHub
+    TEST_FILE = (
+        "TEST_FILE"
+    )  # Occurrence is visible and one of its insights is `test_file`
+    SENSITIVE_FILE = (
+        "SENSITIVE_FILE"
+    )  # Occurrence is visible and one of its insights is `sensitive_filepath`
+    # DEPRECATED: Replaced by CHECK_RUN_SKIP_FALSE_POSITIVE but still there until we
+    # remove it from the public_api
+    DEPRECATED_IGNORED_IN_CHECK_RUN = (
+        "IGNORED_IN_CHECK_RUN"
+    )  # Occurrence is visible and its GitHub check run was ignored
+    CHECK_RUN_SKIP_FALSE_POSITIVE = (
+        "CHECK_RUN_SKIP_FALSE_POSITIVE"
+    )
+    CHECK_RUN_SKIP_LOW_RISK = (
+        "CHECK_RUN_SKIP_LOW_RISK"
+    )
+    CHECK_RUN_SKIP_TEST_CRED = (
+        "CHECK_RUN_SKIP_TEST_CRED"
+    )
+    DEFAULT_BRANCH = (
+        "DEFAULT_BRANCH"
+    )  # Occurrence is on the default branch of the repository
+    PUBLICLY_LEAKED = (
+        "PUBLICLY_LEAKED"
+    )  # Issue's secret is publicly leaked outside the account perimeter
+    FALSE_POSITIVE = (
+        "FALSE_POSITIVE"
+    )
+    REVOCABLE_BY_GG = (
+        "REVOCABLE_BY_GG"
+    )
+
+
 class GitGuardianClient:
     """Client for interacting with the GitGuardian API."""
 
@@ -195,7 +236,7 @@ async def _ensure_oauth_token(self):
             return
 
         logger.warning("Acquired OAuth lock, proceeding with authentication")
-        logger.info(f"  Client API URL: {self.public_api_url}")
+        logger.info(f"  Client API URL: {self.public_api_url}")  # TODO(TIM)
         logger.info(f"  Client Dashboard URL: {self.dashboard_url}")
         logger.info(f"  Client Server Name: {getattr(self, 'server_name', 'None')}")
diff --git a/packages/gg_api_core/src/gg_api_core/oauth.py b/packages/gg_api_core/src/gg_api_core/oauth.py
index 724fadb..5ab2ec6 100644
--- a/packages/gg_api_core/src/gg_api_core/oauth.py
+++ b/packages/gg_api_core/src/gg_api_core/oauth.py
@@ -14,10 +14,35 @@
 
 from mcp.client.auth import TokenStorage
 from mcp.shared.auth import OAuthClientInformationFull, OAuthToken
+from pydantic import BaseModel, Field
 
 # Configure logger
 logger = logging.getLogger(__name__)
 
+
+class APITokenInfo(BaseModel):
+    """Pydantic model representing the /api_tokens/self endpoint response."""
+
+    id: str = Field(description="Unique identifier for the API token")
+    name: str = Field(description="Name of the API token")
+    workspace_id: int = Field(description="ID of the workspace this token belongs to")
+    type: str = Field(description="Type of token (e.g., 'personal_access_token')")
+    status: str = Field(description="Status of the token (e.g., 'active', 'revoked')")
+    created_at: datetime.datetime = Field(description="Timestamp when the token was created")
+    last_used_at: Optional[datetime.datetime] = Field(
+        default=None, description="Timestamp of last usage, or None if never used"
+    )
+    expire_at: Optional[datetime.datetime] = Field(
+        default=None, description="Expiration timestamp, or None if token never expires"
+    )
+    revoked_at: Optional[datetime.datetime] = Field(
+        default=None, description="Timestamp when the token was revoked, or None if active"
+    )
+    member_id: int = Field(description="ID of the member associated with this token")
+    creator_id: int = Field(description="ID of the user who created this token")
+    scopes: list[str] = Field(default_factory=list, description="List of scopes granted to this token")
+
+
 # Port range for callback server
 # Using the same port range as ggshield (29170-29998) to ensure compatibility
 # with self-hosted GitGuardian instances where the ggshield_oauth client
@@ -474,14 +499,10 @@ def _load_saved_token(self):
                 # Set the access token and related info
                 self.access_token = token_data.get("access_token")
                 if self.access_token:
-                    # Store other token information
-                    self.token_info = {
-                        "expires_at": token_data.get("expires_at"),
-                        "scopes": token_data.get("scopes"),
-                        "token_name": token_data.get("token_name"),
-                    }
+                    # Update token name from saved data if available
                     self.token_name = token_data.get("token_name", self.token_name)
                     logger.info(f"Loaded saved token '{self.token_name}' for {self.dashboard_url}")
+                    # Note: self.token_info will be populated when _fetch_token_info() is called
                 else:
                     logger.warning(f"Token data found but no access_token field")
             except Exception as e:
@@ -501,13 +522,20 @@ async def oauth_process(self, login_path: str | None = None) -> str:
             Exception: If authentication fails
         """
         logger.debug(f"oauth_process() called for token '{self.token_name}'")
-
+
         # Check if we already have a valid token loaded
-        if self.access_token and self.token_info:
-            logger.info(f"Using existing token '{self.token_name}' - skipping OAuth flow")
-            return self.access_token
-
-        logger.info(f"No valid token found for '{self.token_name}', starting OAuth authentication flow")
+        if self.access_token:
+            # Try to fetch token info to verify the token is still valid
+            token_info = await self._fetch_token_info()
+            if token_info:
+                self.token_info = token_info
+                logger.info(f"Using existing token '{self.token_name}' - skipping OAuth flow")
+                return self.access_token
+            else:
+                logger.info(f"Saved token for '{self.token_name}' is no longer valid, starting OAuth authentication flow")
+                self.access_token = None
+        else:
+            logger.info(f"No valid token found for '{self.token_name}', starting OAuth authentication flow")
 
         # Handle the base URL correctly
         base_url = self.dashboard_url
@@ -682,7 +710,7 @@ async def redirect_handler(authorization_url: str) -> None:
             # Save the token for future reuse
             if self.access_token and self.token_info:
                 # Get expiry date from token info or set based on configured lifetime
-                expires_at = self.token_info.get("expires_at")
+                expires_at = self.token_info.expire_at.isoformat() if self.token_info.expire_at else None
 
                 # If no expiry date was returned from the API but we have a token lifetime
                 if not expires_at and self.token_lifetime is not None:
@@ -708,7 +736,7 @@ async def redirect_handler(authorization_url: str) -> None:
                     "access_token": self.access_token,
                     "expires_at": expires_at,
                     "token_name": self.token_name,
-                    "scopes": self.token_info.get("scopes", self.scopes),
+                    "scopes": self.token_info.scopes or self.scopes,
                 }
 
                 # Save to file storage
@@ -722,10 +750,14 @@ async def redirect_handler(authorization_url: str) -> None:
             logger.error(f"OAuth authentication failed: {e}")
             raise
 
-    async def _fetch_token_info(self) -> None:
-        """Fetch token information from the GitGuardian API."""
+    async def _fetch_token_info(self) -> APITokenInfo | None:
+        """Fetch token information from the GitGuardian API.
+
+        Returns:
+            APITokenInfo: Pydantic model containing the API token information, or None if failed
+        """
         if not self.access_token:
-            return
+            return None
 
         try:
             import httpx  # Import here to avoid circular imports
@@ -738,16 +770,24 @@
             )
 
             if response.status_code == 200:
-                self.token_info = response.json()
-                logger.info(f"Retrieved token info with scopes: {self.token_info.get('scopes', [])}")
+                token_data = response.json()
+                self.token_info = APITokenInfo(**token_data)
+                logger.info(f"Retrieved token info with scopes: {self.token_info.scopes}")
+                return self.token_info
             else:
                 # Log the error but don't raise an exception
                 logger.warning(f"Failed to retrieve token info: HTTP {response.status_code}")
                 if response.content:
                     logger.debug(f"Response content: {response.text}")
+                return None
 
         except Exception as e:
             logger.warning(f"Failed to retrieve token info: {e}")
+            return None
+
+    def get_token_info(self) -> APITokenInfo | None:
+        """Return the token information.
 
-    def get_token_info(self) -> dict | None:
-        """Return the token information."""
+        Returns:
+            APITokenInfo: The API token information, or None if not available
+        """
         return self.token_info
diff --git a/packages/gg_api_core/src/gg_api_core/tools/find_current_source_id.py b/packages/gg_api_core/src/gg_api_core/tools/find_current_source_id.py
index ba35053..0bd7ae2 100644
--- a/packages/gg_api_core/src/gg_api_core/tools/find_current_source_id.py
+++ b/packages/gg_api_core/src/gg_api_core/tools/find_current_source_id.py
@@ -1,13 +1,42 @@
 from typing import Any
 import logging
 import subprocess
+from pydantic import BaseModel, Field
 
 from gg_api_core.utils import get_client, parse_repo_url
 
 logger = logging.getLogger(__name__)
 
+
+class SourceCandidate(BaseModel):
+    """A candidate source that might match the repository."""
+    id: str | int = Field(description="Source ID")
+    url: str | None = Field(default=None, description="Repository URL")
+    name: str | None = Field(default=None, description="Repository name")
+    monitored: bool | None = Field(default=None, description="Whether source is monitored")
+    deleted_at: str | None = Field(default=None, description="Deletion timestamp if deleted")
 
-async def find_current_source_id() -> dict[str, Any]:
+
+class FindCurrentSourceIdResult(BaseModel):
+    """Successful result from finding source ID."""
+    repository_name: str = Field(description="Detected repository name")
+    source_id: str | int | None = Field(default=None, description="GitGuardian source ID (if exact match)")
+    source: dict[str, Any] | None = Field(default=None, description="Full source information (if exact match)")
+    message: str | None = Field(default=None, description="Status or informational message")
+    suggestion: str | None = Field(default=None, description="Suggestions for next steps")
+    candidates: list[SourceCandidate] | None = Field(default=None,
+                                                     description="List of candidate sources (if no exact match)")
+
+
+class FindCurrentSourceIdError(BaseModel):
+    """Error result from finding source ID."""
+    error: str = Field(description="Error message")
+    repository_name: str | None = Field(default=None, description="Repository name if detected")
+    details: str | None = Field(default=None, description="Additional error details")
+    message: str | None = Field(default=None, description="User-friendly message")
+    suggestion: str | None = Field(default=None, description="Suggestions for resolving the error")
+
+
+async def find_current_source_id() -> FindCurrentSourceIdResult | FindCurrentSourceIdError:
     """
     Find the GitGuardian source_id for the current repository.
@@ -19,12 +48,20 @@ async def find_current_source_id() -> dict[str, Any]:
     5. If no exact match, returns all search results for the model to choose from
 
     Returns:
-        A dictionary containing:
-        - repository_name: The detected repository name
-        - source_id: The GitGuardian source ID (if exact match found)
-        - source: Full source information from GitGuardian (if exact match found)
-        - candidates: List of candidate sources (if no exact match but potential matches found)
-        - error: Error message if something went wrong
+        FindCurrentSourceIdResult: Pydantic model containing:
+            - repository_name: The detected repository name
+            - source_id: The GitGuardian source ID (if exact match found)
+            - source: Full source information from GitGuardian (if exact match found)
+            - message: Status or informational message
+            - suggestion: Suggestions for next steps
+            - candidates: List of SourceCandidate objects (if no exact match but potential matches found)
+
+        FindCurrentSourceIdError: Pydantic model containing:
+            - error: Error message
+            - repository_name: Repository name if detected
+            - details: Additional error details
+            - message: User-friendly message
+            - suggestion: Suggestions for resolving the error
     """
     client = get_client()
     logger.debug("Finding source_id for current repository")
@@ -42,21 +79,21 @@
         remote_url = result.stdout.strip()
         logger.debug(f"Found remote URL: {remote_url}")
     except subprocess.CalledProcessError as e:
-        return {
-            "error": "Not a git repository or no remote 'origin' configured",
-            "details": str(e),
-        }
+        return FindCurrentSourceIdError(
+            error="Not a git repository or no remote 'origin' configured",
+            details=str(e),
+        )
     except subprocess.TimeoutExpired:
-        return {"error": "Git command timed out"}
+        return FindCurrentSourceIdError(error="Git command timed out")
 
     # Parse repository name from remote URL
     repository_name = parse_repo_url(remote_url)
     if not repository_name:
-        return {
-            "error": f"Could not parse repository URL: {remote_url}",
-            "details": "The URL format is not recognized. Supported platforms: GitHub, GitLab (Cloud & Self-hosted), Bitbucket (Cloud & Data Center), Azure DevOps",
-        }
+        return FindCurrentSourceIdError(
+            error=f"Could not parse repository URL: {remote_url}",
+            details="The URL format is not recognized. Supported platforms: GitHub, GitLab (Cloud & Self-hosted), Bitbucket (Cloud & Data Center), Azure DevOps",
+        )
 
     logger.info(f"Detected repository name: {repository_name}")
 
@@ -67,31 +104,31 @@
         if isinstance(result, dict):
             source_id = result.get("id")
             logger.info(f"Found exact match with source_id: {source_id}")
-            return {
-                "repository_name": repository_name,
-                "source_id": source_id,
-                "source": result,
-                "message": f"Successfully found exact match for GitGuardian source: {repository_name}",
-            }
+            return FindCurrentSourceIdResult(
+                repository_name=repository_name,
+                source_id=source_id,
+                source=result,
+                message=f"Successfully found exact match for GitGuardian source: {repository_name}",
+            )
 
         # Handle multiple candidates (list result)
         elif isinstance(result, list) and len(result) > 0:
             logger.info(f"Found {len(result)} candidate sources for repository: {repository_name}")
-            return {
-                "repository_name": repository_name,
-                "message": f"No exact match found for '{repository_name}', but found {len(result)} potential matches.",
-                "suggestion": "Review the candidates below and determine which source best matches the current repository based on the name and URL.",
-                "candidates": [
-                    {
-                        "id": source.get("id"),
-                        "url": source.get("url"),
-                        "name": source.get("full_name") or source.get("name"),
-                        "monitored": source.get("monitored"),
-                        "deleted_at": source.get("deleted_at"),
-                    }
+            return FindCurrentSourceIdResult(
+                repository_name=repository_name,
+                message=f"No exact match found for '{repository_name}', but found {len(result)} potential matches.",
+                suggestion="Review the candidates below and determine which source best matches the current repository based on the name and URL.",
+                candidates=[
+                    SourceCandidate(
+                        id=source.get("id"),
+                        url=source.get("url"),
+                        name=source.get("full_name") or source.get("name"),
+                        monitored=source.get("monitored"),
+                        deleted_at=source.get("deleted_at"),
+                    )
                     for source in result
                 ],
-            }
+            )
 
         # No matches found at all
         else:
@@ -105,39 +142,39 @@
             if isinstance(fallback_result, dict):
                 source_id = fallback_result.get("id")
                 logger.info(f"Found match using repo name only, source_id: {source_id}")
-                return {
-                    "repository_name": repository_name,
-                    "source_id": source_id,
-                    "source": fallback_result,
-                    "message": f"Found match using repository name '{repo_only}' (without organization prefix)",
-                }
+                return FindCurrentSourceIdResult(
+                    repository_name=repository_name,
+                    source_id=source_id,
+                    source=fallback_result,
+                    message=f"Found match using repository name '{repo_only}' (without organization prefix)",
+                )
 
             elif isinstance(fallback_result, list) and len(fallback_result) > 0:
                 logger.info(f"Found {len(fallback_result)} candidates using repo name only")
-                return {
-                    "repository_name": repository_name,
-                    "message": f"No exact match for '{repository_name}', but found {len(fallback_result)} potential matches using repo name '{repo_only}'.",
-                    "suggestion": "Review the candidates below and determine which source best matches the current repository.",
-                    "candidates": [
-                        {
-                            "id": source.get("id"),
-                            "url": source.get("url"),
-                            "name": source.get("full_name") or source.get("name"),
-                            "monitored": source.get("monitored"),
-                            "deleted_at": source.get("deleted_at"),
-                        }
+                return FindCurrentSourceIdResult(
+                    repository_name=repository_name,
+                    message=f"No exact match for '{repository_name}', but found {len(fallback_result)} potential matches using repo name '{repo_only}'.",
+                    suggestion="Review the candidates below and determine which source best matches the current repository.",
+                    candidates=[
+                        SourceCandidate(
+                            id=source.get("id"),
+                            url=source.get("url"),
+                            name=source.get("full_name") or source.get("name"),
+                            monitored=source.get("monitored"),
+                            deleted_at=source.get("deleted_at"),
+                        )
                         for source in fallback_result
                     ],
-                }
+                )
 
             # Absolutely no matches found
             logger.warning(f"No sources found for repository: {repository_name}")
-            return {
-                "repository_name": repository_name,
-                "error": f"Repository '{repository_name}' not found in GitGuardian",
-                "message": "The repository may not be connected to GitGuardian, or you may not have access to it.",
-                "suggestion": "Check that the repository is properly connected to GitGuardian and that your account has access to it.",
-            }
+            return FindCurrentSourceIdError(
+                repository_name=repository_name,
+                error=f"Repository '{repository_name}' not found in GitGuardian",
+                message="The repository may not be connected to GitGuardian, or you may not have access to it.",
+                suggestion="Check that the repository is properly connected to GitGuardian and that your account has access to it.",
+            )
 
     except Exception as e:
         logger.error(f"Error finding source_id: {str(e)}")
-        return {"error": f"Failed to find source_id: {str(e)}"}
+        return FindCurrentSourceIdError(error=f"Failed to find source_id: {str(e)}")
diff --git a/packages/gg_api_core/src/gg_api_core/tools/generate_honey_token.py b/packages/gg_api_core/src/gg_api_core/tools/generate_honey_token.py
index cfc3889..9afdc16 100644
--- a/packages/gg_api_core/src/gg_api_core/tools/generate_honey_token.py
+++ b/packages/gg_api_core/src/gg_api_core/tools/generate_honey_token.py
@@ -16,20 +16,47 @@ class GenerateHoneytokenParams(BaseModel):
     new_token: bool = Field(
         default=False,
         description="If False, retrieves an existing active honeytoken created by you instead of generating a new one. "
-        "If no existing token is found, a new one will be created. "
-        "To generate a new token, set this to True.",
+                    "If no existing token is found, a new one will be created. "
+                    "To generate a new token, set this to True.",
     )
 
 
-async def generate_honeytoken(params: GenerateHoneytokenParams) -> dict[str, Any]:
+class GenerateHoneytokenResult(BaseModel):
+    """Result from generating or retrieving a honeytoken."""
+    model_config = {"extra": "allow"}  # Allow additional fields from API
+
+    id: str | int = Field(description="Honeytoken ID")
+    name: str | None = Field(default=None, description="Honeytoken name")
+    description: str | None = Field(default=None, description="Honeytoken description")
+    type: str | None = Field(default=None, description="Honeytoken type")
+    status: str | None = Field(default=None, description="Honeytoken status")
+    token: dict[str, Any] | None = Field(default=None, description="Token details if show_token=True")
+    injection_recommendations: dict[str, Any] | None = Field(default=None, description="Injection recommendations")
+
+
+async def generate_honeytoken(params: GenerateHoneytokenParams) -> GenerateHoneytokenResult:
     """
     Generate an AWS GitGuardian honeytoken and get injection recommendations.
 
+    If new_token is False, attempts to retrieve an existing active honeytoken created by the current user
+    instead of generating a new one. If no existing token is found, a new one will be created.
+
     Args:
         params: GenerateHoneytokenParams model containing honeytoken configuration
 
     Returns:
-        Honeytoken data and injection recommendations
+        GenerateHoneytokenResult: Pydantic model containing:
+            - id: Honeytoken ID
+            - name: Honeytoken name
+            - description: Honeytoken description
+            - type: Honeytoken type
+            - status: Honeytoken status
+            - token: Token details (if show_token=True was used)
+            - injection_recommendations: Instructions for injecting the honeytoken
+            - Additional fields from the API response
+
+    Raises:
+        ToolError: If the honeytoken generation or retrieval fails
     """
     client = get_client()
     logger.debug(f"Processing honeytoken request with name: {params.name}, new_token: {params.new_token}")
@@ -67,7 +94,7 @@ async def generate_honeytoken(params: GenerateHoneytokenParams) -> dict[str, Any
             if honeytoken_id:
                 detailed_token = await client.get_honeytoken(honeytoken_id, show_token=True)
                 logger.debug(f"Retrieved existing honeytoken with ID: {honeytoken_id}")
-                return detailed_token
+                return GenerateHoneytokenResult(**detailed_token)
 
             logger.debug("No suitable existing honeytokens found, creating a new one")
         else:
@@ -82,7 +109,8 @@ async def generate_honeytoken(params: GenerateHoneytokenParams) -> dict[str, Any
             {"key": "source", "value": "auto-generated"},
             {"key": "type", "value": "aws"},
         ]
-        result = await client.create_honeytoken(name=params.name, description=params.description, custom_tags=custom_tags)
+        result = await client.create_honeytoken(name=params.name, description=params.description,
+                                                custom_tags=custom_tags)
 
         # Validate that we got an ID in the response
         if not result.get("id"):
@@ -95,7 +123,7 @@ async def generate_honeytoken(params: GenerateHoneytokenParams) -> dict[str, Any
             "instructions": "Add the honeytoken to your codebase in configuration files, environment variables, or code comments to detect unauthorized access."
         }
 
-        return result
+        return GenerateHoneytokenResult(**result)
     except Exception as e:
         logger.error(f"Error generating honeytoken: {str(e)}")
-        raise ToolError(f"Failed to generate honeytoken: {str(e)}")
\ No newline at end of file
+        raise ToolError(f"Failed to generate honeytoken: {str(e)}")
diff --git a/packages/gg_api_core/src/gg_api_core/tools/list_honey_tokens.py b/packages/gg_api_core/src/gg_api_core/tools/list_honey_tokens.py
index d96e245..05595e6 100644
--- a/packages/gg_api_core/src/gg_api_core/tools/list_honey_tokens.py
+++ b/packages/gg_api_core/src/gg_api_core/tools/list_honey_tokens.py
@@ -18,22 +18,33 @@ class ListHoneytokensParams(BaseModel):
         default=None, description="Sort field (e.g., 'name', '-name', 'created_at', '-created_at')"
     )
     show_token: bool = Field(default=False, description="Whether to include token details in the response")
-    creator_id: str | None = Field(default=None, description="Filter by creator ID")
-    creator_api_token_id: str | None = Field(default=None, description="Filter by creator API token ID")
+    creator_id: str | int | None = Field(default=None, description="Filter by creator ID")
+    creator_api_token_id: str | int | None = Field(default=None, description="Filter by creator API token ID")
     per_page: int = Field(default=20, description="Number of results per page (default: 20, min: 1, max: 100)")
     get_all: bool = Field(default=False, description="If True, fetch all results using cursor-based pagination")
     mine: bool = Field(default=False, description="If True, fetch honeytokens created by the current user")
 
 
-async def list_honeytokens(params: ListHoneytokensParams) -> list[dict[str, Any]]:
+class ListHoneytokensResult(BaseModel):
+    """Result from listing honeytokens."""
+    honeytokens: list[dict[str, Any]] = Field(description="List of honeytoken objects")
+
+
+async def list_honeytokens(params: ListHoneytokensParams) -> ListHoneytokensResult:
     """
     List honeytokens from the GitGuardian dashboard with filtering options.
 
+    If mine=True, filters honeytokens to show only those created by the current user.
+
     Args:
         params: ListHoneytokensParams model containing all filtering options
 
     Returns:
-        List of honeytokens matching the specified criteria
+        ListHoneytokensResult: Pydantic model containing:
+            - honeytokens: List of honeytoken objects matching the specified criteria
+
+    Raises:
+        ToolError: If the listing operation fails
     """
     client = get_client()
     logger.debug("Listing honeytokens with filters")
@@ -79,7 +90,7 @@ async def list_honeytokens(params: ListHoneytokensParams) -> list[dict[str, Any]
             honeytokens = result
 
         logger.debug(f"Found {len(honeytokens)} honeytokens")
-        return honeytokens
+        return ListHoneytokensResult(honeytokens=honeytokens)
     except Exception as e:
         logger.error(f"Error listing honeytokens: {str(e)}")
         raise ToolError(str(e))
diff --git a/packages/gg_api_core/src/gg_api_core/tools/list_repo_incidents.py b/packages/gg_api_core/src/gg_api_core/tools/list_repo_incidents.py
index d72a274..e458ab9 100644
--- a/packages/gg_api_core/src/gg_api_core/tools/list_repo_incidents.py
+++ b/packages/gg_api_core/src/gg_api_core/tools/list_repo_incidents.py
@@ -3,14 +3,93 @@
 
 from pydantic import BaseModel, Field
 
+from gg_api_core.client import IncidentSeverity, IncidentStatus, IncidentValidity, TagNames
 from gg_api_core.utils import get_client
 
 logger = logging.getLogger(__name__)
 
-DEFAULT_EXCLUDED_TAGS = ["TEST_FILE", "FALSE_POSITIVE", "CHECK_RUN_SKIP_FALSE_POSITIVE", "CHECK_RUN_SKIP_LOW_RISK", "CHECK_RUN_SKIP_TEST_CRED"]
-DEFAULT_SEVERITIES = ["critical", "high", "medium"]
-DEFAULT_STATUSES = ["TRIGGERED", "ASSIGNED", "RESOLVED"]  # We exclude "IGNORED" ones
-DEFAULT_VALIDITIES = ["valid", "failed_to_check", "no_checker", "unknown"]  # We exclude "invalid" ones
+DEFAULT_EXCLUDED_TAGS = [
+    TagNames.TEST_FILE,
+    TagNames.FALSE_POSITIVE,
+    TagNames.CHECK_RUN_SKIP_FALSE_POSITIVE,
+    TagNames.CHECK_RUN_SKIP_LOW_RISK,
+    TagNames.CHECK_RUN_SKIP_TEST_CRED,
+]
+DEFAULT_SEVERITIES = [
+    IncidentSeverity.CRITICAL,
+    IncidentSeverity.HIGH,
+    IncidentSeverity.MEDIUM,
+]
+DEFAULT_STATUSES = [
+    IncidentStatus.TRIGGERED,
+    IncidentStatus.ASSIGNED,
+    IncidentStatus.RESOLVED,
+]  # We exclude "IGNORED" ones
+DEFAULT_VALIDITIES = [
+    IncidentValidity.VALID,
+    IncidentValidity.FAILED_TO_CHECK,
+    IncidentValidity.NO_CHECKER,
+    IncidentValidity.UNKNOWN,
+]  # We exclude "INVALID" ones
+
+
+def _build_filter_info(params: "ListRepoIncidentsParams") -> dict[str, Any]:
+    """Build a dictionary describing the filters applied to the query."""
+    filters = {}
+
+    # Include all active filters
+    if params.from_date:
+        filters["from_date"] = params.from_date
+    if params.to_date:
+        filters["to_date"] = params.to_date
+    if params.presence:
+        filters["presence"] = params.presence
+    if params.tags:
+        filters["tags_include"] = [tag.value if hasattr(tag, 'value') else tag for tag in params.tags]
+    if params.exclude_tags:
+        filters["exclude_tags"] = [tag.value if hasattr(tag, 'value') else tag for tag in params.exclude_tags]
+    if params.status:
+        filters["status"] = [st.value if hasattr(st, 'value') else st for st in params.status]
+    if params.severity:
+        filters["severity"] = [sev.value if hasattr(sev, 'value') else sev for sev in params.severity]
+    if params.validity:
+        filters["validity"] = [v.value if hasattr(v, 'value') else v for v in params.validity]
+    if not params.mine:
+        filters["assigned_to_me"] = False
+
+    return filters
+
+
+def _build_suggestion(params: "ListRepoIncidentsParams", incidents_count: int) -> str:
+    """Build a suggestion message based on applied filters and results."""
+    suggestions = []
+
+    # Explain what's being filtered
+    if params.mine:
+        suggestions.append("Filtering to incidents assigned to current user")
+
+    if params.exclude_tags:
+        excluded_tag_names = [tag.name if hasattr(tag, 'name') else tag for tag in params.exclude_tags]
+        suggestions.append(f"Incidents are filtered to exclude tags: {', '.join(excluded_tag_names)}")
+
+    if params.status:
status_names = [st.name if hasattr(st, 'name') else st for st in params.status] + suggestions.append(f"Filtered by status: {', '.join(status_names)}") + + if params.severity: + sev_names = [sev.name if hasattr(sev, 'name') else sev for sev in params.severity] + suggestions.append(f"Filtered by severity: {', '.join(sev_names)}") + + if params.validity: + val_names = [v.name if hasattr(v, 'name') else v for v in params.validity] + suggestions.append(f"Filtered by validity: {', '.join(val_names)}") + + # If no results, suggest how to get more + if incidents_count == 0 and suggestions: + suggestions.append( + "No incidents matched the applied filters. Try with mine=False, exclude_tags=[], or different status/severity/validity filters to see all incidents.") + + return "\n".join(suggestions) if suggestions else "" class ListRepoIncidentsParams(BaseModel): @@ -19,10 +98,16 @@ class ListRepoIncidentsParams(BaseModel): default=None, description="The full repository name. For example, for https://github.com/GitGuardian/ggmcp.git the full name is GitGuardian/ggmcp. Pass the current repository name if not provided. Not required if source_id is provided." ) - source_id: str | None = Field( + source_id: str | int | None = Field( default=None, description="The GitGuardian source ID to filter by. Can be obtained using find_current_source_id. If provided, repository_name is not required." 
) + ordering: str | None = Field(default=None, description="Sort field (e.g., 'date', '-date' for descending)") + per_page: int = Field(default=20, description="Number of results per page (default: 20, min: 1, max: 100)") + cursor: str | None = Field(default=None, description="Pagination cursor for fetching next page of results") + get_all: bool = Field(default=False, description="If True, fetch all results using cursor-based pagination") + + # Filters from_date: str | None = Field( default=None, description="Filter occurrences created after this date (ISO format: YYYY-MM-DD)" ) @@ -36,19 +121,32 @@ class ListRepoIncidentsParams(BaseModel): description="Exclude incidents with these tag names." ) status: list[str] | None = Field(default=DEFAULT_STATUSES, description="Filter by status (list of status names)") - ordering: str | None = Field(default=None, description="Sort field (e.g., 'date', '-date' for descending)") - per_page: int = Field(default=20, description="Number of results per page (default: 20, min: 1, max: 100)") - cursor: str | None = Field(default=None, description="Pagination cursor for fetching next page of results") - get_all: bool = Field(default=False, description="If True, fetch all results using cursor-based pagination") mine: bool = Field( default=True, description="If True, fetch only incidents assigned to the current user. 
Set to False to get all incidents.", ) - severity: list[str] | None = Field(default=DEFAULT_SEVERITIES, description="Filter by severity (list of severity names)") - validity: list[str] | None = Field(default=DEFAULT_VALIDITIES, description="Filter by validity (list of validity names)") + severity: list[str] | None = Field(default=DEFAULT_SEVERITIES, + description="Filter by severity (list of severity names)") + validity: list[str] | None = Field(default=DEFAULT_VALIDITIES, + description="Filter by validity (list of validity names)") -async def list_repo_incidents(params: ListRepoIncidentsParams) -> dict[str, Any]: +class ListRepoIncidentsResult(BaseModel): + """Result from listing repository incidents.""" + source_id: str | int | None = Field(default=None, description="Source ID of the repository") + incidents: list[dict[str, Any]] = Field(default_factory=list, description="List of incident objects") + total_count: int = Field(description="Total number of incidents") + next_cursor: str | None = Field(default=None, description="Pagination cursor for next page") + applied_filters: dict[str, Any] = Field(default_factory=dict, description="Filters that were applied to the query") + suggestion: str = Field(default="", description="Suggestions for interpreting or modifying the results") + + +class ListRepoIncidentsError(BaseModel): + """Error result from listing repository incidents.""" + error: str = Field(description="Error message") + + +async def list_repo_incidents(params: ListRepoIncidentsParams) -> ListRepoIncidentsResult | ListRepoIncidentsError: """ List secret incidents or occurrences related to a specific repository. @@ -56,16 +154,25 @@ async def list_repo_incidents(params: ListRepoIncidentsParams) -> dict[str, Any] By default, incidents tagged with TEST_FILE or FALSE_POSITIVE are excluded. Pass exclude_tags=[] to disable this filtering. 
Args: - params: ListRepoIncidentsParams model containing all filtering options + params: ListRepoIncidentsParams model containing all filtering options. + Either repository_name or source_id must be provided. Returns: - List of incidents and occurrences matching the specified criteria + ListRepoIncidentsResult: Pydantic model containing: + - source_id: Source ID of the repository + - incidents: List of incident objects + - total_count: Total number of incidents + - next_cursor: Pagination cursor (if applicable) + - applied_filters: Dictionary of filters that were applied + - suggestion: Suggestions for interpreting or modifying results + + ListRepoIncidentsError: Pydantic model with error message if the operation fails """ client = get_client() # Validate that at least one of repository_name or source_id is provided if not params.repository_name and not params.source_id: - return {"error": "Either repository_name or source_id must be provided"} + return ListRepoIncidentsError(error="Either repository_name or source_id must be provided") logger.debug(f"Listing incidents with repository_name={params.repository_name}, source_id={params.source_id}") @@ -84,7 +191,8 @@ async def list_repo_incidents(params: ListRepoIncidentsParams) -> dict[str, Any] if params.tags: api_params["tags"] = ",".join(params.tags) if isinstance(params.tags, list) else params.tags if params.exclude_tags: - api_params["exclude_tags"] = ",".join(params.exclude_tags) if isinstance(params.exclude_tags, list) else params.exclude_tags + api_params["exclude_tags"] = ",".join(params.exclude_tags) if isinstance(params.exclude_tags, + list) else params.exclude_tags if params.per_page: api_params["per_page"] = params.per_page if params.cursor: @@ -94,59 +202,68 @@ async def list_repo_incidents(params: ListRepoIncidentsParams) -> dict[str, Any] if params.mine: api_params["assigned_to_me"] = "true" if params.severity: - api_params["severity"] = ",".join(params.severity) if isinstance(params.severity, list) 
else params.severity + api_params["severity"] = ",".join(params.severity) if isinstance(params.severity, + list) else params.severity if params.status: api_params["status"] = ",".join(params.status) if isinstance(params.status, list) else params.status if params.validity: - api_params["validity"] = ",".join(params.validity) if isinstance(params.validity, list) else params.validity + api_params["validity"] = ",".join(params.validity) if isinstance(params.validity, + list) else params.validity # Get incidents directly using source_id if params.get_all: - incidents_result = await client.paginate_all(f"/sources/{params.source_id}/incidents/secrets", api_params) + incidents_result = await client.paginate_all(f"/sources/{params.source_id}/incidents/secrets", + api_params) if isinstance(incidents_result, list): - return { - "source_id": params.source_id, - "incidents": incidents_result, - "total_count": len(incidents_result), - } + count = len(incidents_result) + return ListRepoIncidentsResult( + source_id=params.source_id, + incidents=incidents_result, + total_count=count, + applied_filters=_build_filter_info(params), + suggestion=_build_suggestion(params, count), + ) elif isinstance(incidents_result, dict): - return { - "source_id": params.source_id, - "incidents": incidents_result.get("data", []), - "total_count": incidents_result.get("total_count", len(incidents_result.get("data", []))), - } + count = incidents_result.get("total_count", len(incidents_result.get("data", []))) + return ListRepoIncidentsResult( + source_id=params.source_id, + incidents=incidents_result.get("data", []), + total_count=count, + applied_filters=_build_filter_info(params), + suggestion=_build_suggestion(params, count), + ) else: # Fallback for unexpected types - return { - "source_id": params.source_id, - "incidents": [], - "total_count": 0, - "error": f"Unexpected response type: {type(incidents_result).__name__}", - } + return ListRepoIncidentsError( + error=f"Unexpected response type: 
{type(incidents_result).__name__}", + ) else: incidents_result = await client.list_source_incidents(params.source_id, **api_params) if isinstance(incidents_result, dict): - return { - "source_id": params.source_id, - "incidents": incidents_result.get("data", []), - "next_cursor": incidents_result.get("next_cursor"), - "total_count": incidents_result.get("total_count", 0), - } + count = incidents_result.get("total_count", 0) + return ListRepoIncidentsResult( + source_id=params.source_id, + incidents=incidents_result.get("data", []), + next_cursor=incidents_result.get("next_cursor"), + total_count=count, + applied_filters=_build_filter_info(params), + suggestion=_build_suggestion(params, count), + ) elif isinstance(incidents_result, list): # Handle case where API returns a list directly - return { - "source_id": params.source_id, - "incidents": incidents_result, - "total_count": len(incidents_result), - } + count = len(incidents_result) + return ListRepoIncidentsResult( + source_id=params.source_id, + incidents=incidents_result, + total_count=count, + applied_filters=_build_filter_info(params), + suggestion=_build_suggestion(params, count), + ) else: # Fallback for unexpected types - return { - "source_id": params.source_id, - "incidents": [], - "total_count": 0, - "error": f"Unexpected response type: {type(incidents_result).__name__}", - } + return ListRepoIncidentsError( + error=f"Unexpected response type: {type(incidents_result).__name__}", + ) else: # Use repository_name lookup (legacy path) result = await client.list_repo_incidents_directly( @@ -162,8 +279,21 @@ async def list_repo_incidents(params: ListRepoIncidentsParams) -> dict[str, Any] get_all=params.get_all, mine=params.mine, ) - return result + + # Enrich result with filter info and convert to Pydantic model + if isinstance(result, dict): + count = result.get("total_count", len(result.get("incidents", []))) + return ListRepoIncidentsResult( + source_id=result.get("source_id"), + 
incidents=result.get("incidents", []), + total_count=count, + next_cursor=result.get("next_cursor"), + applied_filters=_build_filter_info(params), + suggestion=_build_suggestion(params, count), + ) + else: + return ListRepoIncidentsError(error="Unexpected result format from legacy path") except Exception as e: logger.error(f"Error listing repository incidents: {str(e)}") - return {"error": f"Failed to list repository incidents: {str(e)}"} + return ListRepoIncidentsError(error=f"Failed to list repository incidents: {str(e)}") diff --git a/packages/gg_api_core/src/gg_api_core/tools/list_repo_occurrences.py b/packages/gg_api_core/src/gg_api_core/tools/list_repo_occurrences.py index 3b250d7..3a7b7ce 100644 --- a/packages/gg_api_core/src/gg_api_core/tools/list_repo_occurrences.py +++ b/packages/gg_api_core/src/gg_api_core/tools/list_repo_occurrences.py @@ -1,29 +1,41 @@ from typing import Any import logging -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, model_validator +from gg_api_core.client import IncidentSeverity, IncidentStatus, IncidentValidity, TagNames from gg_api_core.utils import get_client logger = logging.getLogger(__name__) +DEFAULT_EXCLUDED_TAGS = [ + TagNames.TEST_FILE, + TagNames.FALSE_POSITIVE, + TagNames.CHECK_RUN_SKIP_FALSE_POSITIVE, + TagNames.CHECK_RUN_SKIP_LOW_RISK, + TagNames.CHECK_RUN_SKIP_TEST_CRED, +] +DEFAULT_SEVERITIES = [ + IncidentSeverity.CRITICAL, + IncidentSeverity.HIGH, + IncidentSeverity.MEDIUM, + IncidentSeverity.UNKNOWN, +] +DEFAULT_STATUSES = [ + IncidentStatus.TRIGGERED, + IncidentStatus.ASSIGNED, + IncidentStatus.RESOLVED, +] # We exclude "IGNORED" ones +DEFAULT_VALIDITIES = [ + IncidentValidity.VALID, + IncidentValidity.FAILED_TO_CHECK, + IncidentValidity.NO_CHECKER, + IncidentValidity.UNKNOWN, +] # We exclude "INVALID" ones -DEFAULT_EXCLUDED_TAGS = ["TEST_FILE", "FALSE_POSITIVE", "CHECK_RUN_SKIP_FALSE_POSITIVE", "CHECK_RUN_SKIP_LOW_RISK", "CHECK_RUN_SKIP_TEST_CRED"] -DEFAULT_SEVERITIES = 
["critical", "high", "medium"] -DEFAULT_STATUSES = ["TRIGGERED", "ASSIGNED", "RESOLVED"] # We exclude "IGNORED" ones -DEFAULT_VALIDITIES = ["valid", "failed_to_check", "no_checker", "unknown"] # We exclude "invalid" ones - -class ListRepoOccurrencesParams(BaseModel): - """Parameters for listing repository occurrences.""" - repository_name: str | None = Field( - default=None, - description="The full repository name. For example, for https://github.com/GitGuardian/ggmcp.git the full name is GitGuardian/ggmcp. Pass the current repository name if not provided. Not required if source_id is provided." - ) - source_id: str | None = Field( - default=None, - description="The GitGuardian source ID to filter by. Can be obtained using find_current_source_id. If provided, repository_name is not required." - ) +class ListRepoOccurrencesFilters(BaseModel): + """Filters for listing repository occurrences.""" from_date: str | None = Field( default=None, description="Filter occurrences created after this date (ISO format: YYYY-MM-DD)" ) @@ -36,16 +48,112 @@ class ListRepoOccurrencesParams(BaseModel): default=DEFAULT_EXCLUDED_TAGS, description="Exclude occurrences with these tag names. Pass empty list to disable filtering." ) + status: list[str] | None = Field(default=DEFAULT_STATUSES, description="Filter by status (list of status names)") + severity: list[str] | None = Field(default=DEFAULT_SEVERITIES, + description="Filter by severity (list of severity names)") + validity: list[str] | None = Field(default=DEFAULT_VALIDITIES, + description="Filter by validity (list of validity names)") + + +class ListRepoOccurrencesBaseParams(BaseModel): + """Parameters for listing repository occurrences.""" + repository_name: str | None = Field( + default=None, + description="The full repository name. For example, for https://github.com/GitGuardian/gg-mcp.git the full name is GitGuardian/gg-mcp. Pass the current repository name if not provided. Not required if source_id is provided." 
+ ) + source_id: str | int | None = Field( + default=None, + description="The GitGuardian source ID to filter by. Can be obtained using find_current_source_id. If provided, repository_name is not required." + ) ordering: str | None = Field(default=None, description="Sort field (e.g., 'date', '-date' for descending)") per_page: int = Field(default=20, description="Number of results per page (default: 20, min: 1, max: 100)") cursor: str | None = Field(default=None, description="Pagination cursor for fetching next page of results") get_all: bool = Field(default=False, description="If True, fetch all results using cursor-based pagination") - status: list[str] | None = Field(default=DEFAULT_STATUSES, description="Filter by status (list of status names)") - severity: list[str] | None = Field(default=DEFAULT_SEVERITIES, description="Filter by severity (list of severity names)") - validity: list[str] | None = Field(default=DEFAULT_VALIDITIES, description="Filter by validity (list of validity names)") + + @model_validator(mode="after") + def validate_source_or_repository(self) -> "ListRepoOccurrencesBaseParams": + """Validate that either source_id or repository_name is provided.""" + if not self.source_id and not self.repository_name: + raise ValueError("Either 'source_id' or 'repository_name' must be provided") + return self + + +class ListRepoOccurrencesParams(ListRepoOccurrencesFilters, ListRepoOccurrencesBaseParams): + pass -async def list_repo_occurrences(params: ListRepoOccurrencesParams) -> dict[str, Any]: +class ListRepoOccurrencesResult(BaseModel): + """Result from listing repository occurrences.""" + repository: str | None = Field(default=None, description="Repository name") + occurrences_count: int = Field(description="Number of occurrences returned") + occurrences: list[dict[str, Any]] = Field(default_factory=list, description="List of occurrence objects") + cursor: str | None = Field(default=None, description="Pagination cursor for next page") + has_more: bool 
= Field(default=False, description="Whether more results are available") + applied_filters: dict[str, Any] = Field(default_factory=dict, description="Filters that were applied to the query") + suggestion: str = Field(default="", description="Suggestions for interpreting or modifying the results") + + +class ListRepoOccurrencesError(BaseModel): + """Error result from listing repository occurrences.""" + error: str = Field(description="Error message") + + +def _build_filter_info(params: ListRepoOccurrencesParams) -> dict[str, Any]: + """Build a dictionary describing the filters applied to the query.""" + filters = {} + + # Include all active filters + if params.from_date: + filters["from_date"] = params.from_date + if params.to_date: + filters["to_date"] = params.to_date + if params.presence: + filters["presence"] = params.presence + if params.tags: + filters["tags"] = [tag.value if hasattr(tag, 'value') else tag for tag in params.tags] + if params.exclude_tags: + filters["exclude_tags"] = [tag.value if hasattr(tag, 'value') else tag for tag in params.exclude_tags] + if params.status: + filters["status"] = [st.value if hasattr(st, 'value') else st for st in params.status] + if params.severity: + filters["severity"] = [sev.value if hasattr(sev, 'value') else sev for sev in params.severity] + if params.validity: + filters["validity"] = [v.value if hasattr(v, 'value') else v for v in params.validity] + + return filters + + +def _build_suggestion(params: ListRepoOccurrencesParams, occurrences_count: int) -> str: + """Build a suggestion message based on applied filters and results.""" + suggestions = [] + + # Explain what's being filtered + if params.exclude_tags: + excluded_tag_names = [tag.name if hasattr(tag, 'name') else tag for tag in params.exclude_tags] + suggestions.append(f"Occurrences were filtered to exclude tags: {', '.join(excluded_tag_names)}") + + if params.status: + status_names = [st.name if hasattr(st, 'name') else st for st in params.status] + 
suggestions.append(f"Filtered by status: {', '.join(status_names)}") + + if params.severity: + sev_names = [sev.name if hasattr(sev, 'name') else sev for sev in params.severity] + suggestions.append(f"Filtered by severity: {', '.join(sev_names)}") + + if params.validity: + val_names = [v.name if hasattr(v, 'name') else v for v in params.validity] + suggestions.append(f"Filtered by validity: {', '.join(val_names)}") + + # If no results, suggest how to get more + if occurrences_count == 0 and suggestions: + suggestions.append( + "No occurrences matched the applied filters. Try with exclude_tags=[] or different status/severity/validity filters to see all occurrences.") + + return "\n".join(suggestions) if suggestions else "" + + +async def list_repo_occurrences( + params: ListRepoOccurrencesParams) -> ListRepoOccurrencesResult | ListRepoOccurrencesError: """ List secret occurrences for a specific repository using the GitGuardian v1/occurrences/secrets API. @@ -67,17 +175,22 @@ async def list_repo_occurrences(params: ListRepoOccurrencesParams) -> dict[str, By default, occurrences tagged with TEST_FILE or FALSE_POSITIVE are excluded. Pass exclude_tags=[] to disable this filtering. Args: - params: ListRepoOccurrencesParams model containing all filtering options + params: ListRepoOccurrencesParams model containing all filtering options. + Either repository_name or source_id must be provided (validated by model). 
Returns: - List of secret occurrences with detailed match information including file locations and indices + ListRepoOccurrencesResult: Pydantic model containing: + - repository: Repository name + - occurrences_count: Number of occurrences returned + - occurrences: List of occurrence objects with exact match locations + - cursor: Pagination cursor (if applicable) + - has_more: Whether more results are available + - applied_filters: Dictionary of filters that were applied + - suggestion: Suggestions for interpreting or modifying results + + ListRepoOccurrencesError: Pydantic model with error message if the operation fails """ client = get_client() - - # Validate that at least one of repository_name or source_id is provided - if not params.repository_name and not params.source_id: - return {"error": "Either repository_name or source_id must be provided"} - logger.debug(f"Listing occurrences with repository_name={params.repository_name}, source_id={params.source_id}") try: @@ -124,27 +237,35 @@ async def list_repo_occurrences(params: ListRepoOccurrencesParams) -> dict[str, # Handle the response format if isinstance(result, dict): occurrences = result.get("occurrences", []) - return { - "repository": params.repository_name, - "occurrences_count": len(occurrences), - "occurrences": occurrences, - "cursor": result.get("cursor"), - "has_more": result.get("has_more", False), - } + count = len(occurrences) + return ListRepoOccurrencesResult( + repository=params.repository_name, + occurrences_count=count, + occurrences=occurrences, + cursor=result.get("cursor"), + has_more=result.get("has_more", False), + applied_filters=_build_filter_info(params), + suggestion=_build_suggestion(params, count), + ) elif isinstance(result, list): # If get_all=True, we get a list directly - return { - "repository": params.repository_name, - "occurrences_count": len(result), - "occurrences": result, - } + count = len(result) + return ListRepoOccurrencesResult( + 
repository=params.repository_name, + occurrences_count=count, + occurrences=result, + applied_filters=_build_filter_info(params), + suggestion=_build_suggestion(params, count), + ) else: - return { - "repository": params.repository_name, - "occurrences_count": 0, - "occurrences": [], - } + return ListRepoOccurrencesResult( + repository=params.repository_name, + occurrences_count=0, + occurrences=[], + applied_filters=_build_filter_info(params), + suggestion=_build_suggestion(params, 0), + ) except Exception as e: logger.error(f"Error listing repository occurrences: {str(e)}") - return {"error": f"Failed to list repository occurrences: {str(e)}"} + return ListRepoOccurrencesError(error=f"Failed to list repository occurrences: {str(e)}") diff --git a/packages/gg_api_core/src/gg_api_core/tools/remediate_secret_incidents.py b/packages/gg_api_core/src/gg_api_core/tools/remediate_secret_incidents.py index daa7162..4865af0 100644 --- a/packages/gg_api_core/src/gg_api_core/tools/remediate_secret_incidents.py +++ b/packages/gg_api_core/src/gg_api_core/tools/remediate_secret_incidents.py @@ -1,267 +1,159 @@ +from pathlib import Path from typing import Any import logging -from pydantic import BaseModel, Field +from jinja2 import Template +from pydantic import BaseModel, Field, model_validator +from gg_api_core.client import TagNames from gg_api_core.utils import get_client -from .list_repo_occurrences import list_repo_occurrences, ListRepoOccurrencesParams +from .list_repo_occurrences import list_repo_occurrences, ListRepoOccurrencesParams, ListRepoOccurrencesFilters from .list_repo_incidents import list_repo_incidents logger = logging.getLogger(__name__) +REMEDIATION_PROMPT_PATH = Path(__file__).parent / "remediation_prompt.md" + + +class ListRepoOccurrencesParamsForRemediate(ListRepoOccurrencesParams): + # Overriding the `tags` field to add a default filter: for remediation, we're more interested in occurrences that + are on the branch the developer is currently on. 
Occurrences tagged DEFAULT_BRANCH are a heuristic for that. + tags: list[str] = Field( + default=[TagNames.DEFAULT_BRANCH.value], + description="List of tags to filter incidents by. Defaults to DEFAULT_BRANCH to avoid requiring a git checkout for the fix", + ) + class RemediateSecretIncidentsParams(BaseModel): """Parameters for remediating secret incidents.""" - repository_name: str = Field( - description="The full repository name. For example, for https://github.com/GitGuardian/ggmcp.git the full name is GitGuardian/ggmcp. Pass the current repository name if not provided." + repository_name: str | None = Field( + default=None, + description="The full repository name. For example, for https://github.com/GitGuardian/ggmcp.git the full name is GitGuardian/ggmcp. Pass the current repository name if not provided.", ) - include_git_commands: bool = Field( + source_id: str | int | None = Field( + default=None, + description="The source ID of the repository. Pass the current repository source ID if not provided.", + ) + get_all: bool = Field(default=True, description="Whether to get all occurrences or just the first page") + mine: bool = Field( + default=False, + description="If True, fetch only incidents assigned to the current user. Set to False to get all incidents.", + ) + + # Behaviour + git_commands: bool = Field( default=True, description="Whether to include git commands to fix incidents in git history" ) create_env_example: bool = Field( - default=True, description="Whether to create a .env.example file with placeholders for detected secrets" - ) - get_all: bool = Field(default=True, description="Whether to get all incidents or just the first page") - mine: bool = Field( default=True, 
Set to False to get all incidents.", + description="Whether to suggest creating a .env.example file with placeholders for detected secrets" + ) + add_to_env: bool = Field( + default=True, description="Whether to suggest adding secrets to .env file" ) + # sub tools + list_repo_occurrences_params: ListRepoOccurrencesParamsForRemediate = Field( + default_factory=ListRepoOccurrencesParamsForRemediate, + description="Parameters for listing repository occurrences", + ) -async def remediate_secret_incidents(params: RemediateSecretIncidentsParams) -> dict[str, Any]: - """ - Find and remediate secret incidents in the current repository using EXACT match locations. - By default, this tool only shows incidents assigned to the current user. Pass mine=False to get all incidents related to this repo. +class RemediateSecretIncidentsResult(BaseModel): + """Result from remediating secret incidents.""" + remediation_instructions: str = Field(default="", description="Instructions for remediating occurrences") + occurrences_count: int = Field(default=0, description="Number of occurrences found") + suggested_occurrences_for_remediation_count: int = Field(default=0, + description="Number of occurrences suggested for remediation") + + sub_tools_results: dict[str, BaseModel] = Field(default_factory=dict, description="Results from sub tools") - This tool now uses the occurrences API to get precise file locations, line numbers, and character indices, - eliminating the need to search for secrets in files. The workflow is: - 1. Fetch secret occurrences with exact match locations (file path, line_start, line_end, index_start, index_end) - 2. Group occurrences by file for efficient remediation - 3. Sort matches from bottom to top to prevent line number shifts during editing - 4. Provide detailed remediation steps with exact locations for each secret - 5. 
IMPORTANT: Make the changes to the codebase using the provided indices: - - Use index_start and index_end to locate the exact secret in the file - - Replace hardcoded secrets with environment variable references - - Ensure all occurrences are removed from the codebase - - IMPORTANT: If the repository uses a package manager (npm, cargo, uv, etc.), use it to install required packages - 6. Optional: Generate git commands to rewrite history and remove secrets from git +class RemediateSecretIncidentsError(BaseModel): + """Error result from remediating secret incidents.""" + error: str = Field(description="Error message") + sub_tools_results: dict[str, Any] = Field(default_factory=dict, description="Results from sub tools") + + +async def remediate_secret_incidents( + params: RemediateSecretIncidentsParams) -> RemediateSecretIncidentsResult | RemediateSecretIncidentsError: + """ + Find and remediate secret incidents in the current repository. - The tool provides: - - Exact file paths and line numbers for each secret - - Character-level indices (index_start, index_end) to locate secrets precisely - - Context lines (pre/post) to understand the surrounding code - - Sorted matches to enable safe sequential removal (bottom-to-top) + This tool uses the occurrences API to find secrets and provides simple remediation suggestions. 
     Args:
         params: RemediateSecretIncidentsParams model containing remediation configuration

     Returns:
-        A dictionary containing:
-        - repository_info: Information about the repository
-        - summary: Overview of occurrences, files affected, and secret types
-        - remediation_steps: Detailed steps with exact locations for each file
-        - env_example_content: Suggested .env.example content (if requested)
-        - git_commands: Git commands to fix history (if requested)
+        RemediateSecretIncidentsResult or RemediateSecretIncidentsError
     """
-    logger.debug(f"Using remediate_secret_incidents with occurrences API for: {params.repository_name}")
+    logger.debug(f"Using remediate_secret_incidents for: {params.repository_name}")

     try:
-        # Get detailed occurrences with exact match locations
-        occurrences_params = ListRepoOccurrencesParams(
-            repository_name=params.repository_name,
-            get_all=params.get_all,
-        )
-        occurrences_result = await list_repo_occurrences(occurrences_params)
-
-        if "error" in occurrences_result:
-            return {"error": occurrences_result["error"]}
+        # Use the list_repo_occurrences_params and update with parent-level repository info
+        occurrences_params = params.list_repo_occurrences_params.model_copy(update={
+            "repository_name": params.repository_name or params.list_repo_occurrences_params.repository_name,
+            "source_id": params.source_id or params.list_repo_occurrences_params.source_id,
+            "get_all": params.get_all,
+        })

-        occurrences = occurrences_result.get("occurrences", [])
+        occurrences_result = await list_repo_occurrences(occurrences_params)
+        if hasattr(occurrences_result, "error") and occurrences_result.error:
+            return RemediateSecretIncidentsError(
+                error=occurrences_result.error,
+                sub_tools_results={"list_repo_occurrences": occurrences_result}
+            )

-        # Filter by assignee if mine=True
+        occurrences = occurrences_result.occurrences
         if params.mine:
-            # Get current user info to filter by assignee
-            client = get_client()
-            try:
-                token_info = await client.get_current_token_info()
-                current_user_id = token_info.get("user_id") if token_info else None
-
-                if current_user_id:
-                    # Filter occurrences assigned to current user
-                    occurrences = [
-                        occ for occ in occurrences
-                        if occ.get("incident", {}).get("assignee_id") == current_user_id
-                    ]
-                    logger.debug(f"Filtered to {len(occurrences)} occurrences assigned to user {current_user_id}")
-            except Exception as e:
-                logger.warning(f"Could not filter by assignee: {str(e)}")
+            occurrences = await filter_mine(occurrences)
+        occurrences_count = len(occurrences)
+        occurrences_result.occurrences = await trim_occurrences_for_remediation(occurrences)

         if not occurrences:
-            return {
-                "repository_info": {"name": params.repository_name},
-                "message": "No secret occurrences found for this repository that match the criteria.",
-                "remediation_steps": [],
-            }
-
-        # Process occurrences for remediation with exact location data
-        logger.debug(f"Processing {len(occurrences)} occurrences with exact locations for remediation")
-        result = await _process_occurrences_for_remediation(
-            occurrences=occurrences,
-            repository_name=params.repository_name,
-            include_git_commands=params.include_git_commands,
-            create_env_example=params.create_env_example,
+            remediation_instructions = ("No secret occurrences found for this repository that match the criteria. "
+                                        "Adjust 'list_repo_occurrences_params' to modify filtering.")
+        else:
+            # Load and render the Jinja2 template
+            template_content = REMEDIATION_PROMPT_PATH.read_text()
+            template = Template(template_content)
+            remediation_instructions = template.render(
+                add_to_env=params.add_to_env,
+                env_example=params.create_env_example,
+                git_commands=params.git_commands,
+            )
+        return RemediateSecretIncidentsResult(
+            remediation_instructions=remediation_instructions,
+            sub_tools_results={"list_repo_occurrences": occurrences_result},
+            occurrences_count=occurrences_count,
+            suggested_occurrences_for_remediation_count=len(occurrences),
         )
-        logger.debug(
-            f"Remediation processing complete, returning result with {len(result.get('remediation_steps', []))} steps"
-        )
-        return result
+
     except Exception as e:
         logger.error(f"Error remediating incidents: {str(e)}")
-        return {"error": f"Failed to remediate incidents: {str(e)}"}
-
-
-async def _process_occurrences_for_remediation(
-    occurrences: list[dict[str, Any]],
-    repository_name: str,
-    include_git_commands: bool = True,
-    create_env_example: bool = True,
-) -> dict[str, Any]:
-    """
-    Process occurrences for remediation using exact match locations.
-
-    This function leverages the detailed location data from occurrences (file paths, line numbers,
-    character indices) to provide precise remediation instructions without needing to search files.
-
-    Args:
-        occurrences: List of occurrences with exact match locations
-        repository_name: Repository name
-        include_git_commands: Whether to include git commands
-        create_env_example: Whether to create .env.example
-
-    Returns:
-        Remediation steps for each occurrence with exact file locations
-    """
-    # Group occurrences by file for efficient remediation
-    occurrences_by_file = {}
-    secret_types = set()
-    affected_files = set()
-
-    for occurrence in occurrences:
-        # Extract location data
-        matches = occurrence.get("matches", [])
-        incident_data = occurrence.get("incident", {})
-        secret_type = incident_data.get("detector", {}).get("name", "Unknown")
-        secret_types.add(secret_type)
+        return RemediateSecretIncidentsError(error=f"Failed to remediate incidents: {str(e)}")

-        for match in matches:
-            file_path = match.get("match", {}).get("filename")
-            if not file_path:
-                continue
-            affected_files.add(file_path)
-
-            if file_path not in occurrences_by_file:
-                occurrences_by_file[file_path] = []
-
-            # Store detailed match information
-            match_info = {
-                "occurrence_id": occurrence.get("id"),
-                "incident_id": incident_data.get("id"),
-                "secret_type": secret_type,
-                "line_start": match.get("match", {}).get("line_start"),
-                "line_end": match.get("match", {}).get("line_end"),
-                "index_start": match.get("match", {}).get("index_start"),
-                "index_end": match.get("match", {}).get("index_end"),
-                "match_type": match.get("type"),
-                "pre_line_start": match.get("pre_line_start"),
-                "pre_line_end": match.get("pre_line_end"),
-                "post_line_start": match.get("post_line_start"),
-                "post_line_end": match.get("post_line_end"),
-            }
-            occurrences_by_file[file_path].append(match_info)
-
-    # Build remediation steps with exact locations
-    remediation_steps = []
-
-    for file_path, matches in occurrences_by_file.items():
-        # Sort matches by line number (descending) so we can remove from bottom to top
-        # This prevents line number shifts when making multiple edits
-        sorted_matches = sorted(matches, key=lambda m: m["line_start"] or 0, reverse=True)
-
-        step = {
-            "file": file_path,
-            "action": "remove_secrets",
-            "matches": sorted_matches,
-            "instructions": [
-                f"File: {file_path}",
-                f"Found {len(sorted_matches)} secret(s) in this file",
-                "Matches are sorted from bottom to top for safe sequential removal",
-                "",
-                "For each match:",
-                "1. Read the file content",
-                f"2. Navigate to line {sorted_matches[0].get('line_start')} (and other match locations)",
-                "3. Use the exact index_start and index_end to locate the secret",
-                "4. Replace the hardcoded secret with an environment variable reference",
-                "5. Ensure the secret is added to .env (gitignored) and .env.example (committed)",
-            ],
-            "recommendations": [
-                "Replace secrets with environment variables (e.g., process.env.API_KEY, os.getenv('API_KEY'))",
-                "Add the real secret to .env file (ensure .env is in .gitignore)",
-                "Add a placeholder to .env.example for documentation",
-                "Use a secrets management solution for production (e.g., AWS Secrets Manager, HashiCorp Vault)",
-            ],
-        }
-        remediation_steps.append(step)
-
-    # Generate .env.example content if requested
-    env_example_content = None
-    if create_env_example:
-        env_vars = []
-        for secret_type in secret_types:
-            # Generate sensible environment variable names from secret types
-            env_var_name = secret_type.upper().replace(" ", "_").replace("-", "_")
-            env_vars.append(f"{env_var_name}=your_{secret_type.lower().replace(' ', '_')}_here")
-
-        if env_vars:
-            env_example_content = "\n".join(env_vars)
-
-    # Generate git commands if requested
-    git_commands = None
-    if include_git_commands:
-        git_commands = {
-            "warning": "⚠️ These commands will rewrite git history. Only use if you understand the implications.",
-            "commands": [
-                "# First, ensure all secrets are removed from working directory",
-                "git add .",
-                'git commit -m "Remove hardcoded secrets"',
-            ],
-        }
-
-    result = {
-        "repository_info": {"name": repository_name},
-        "summary": {
-            "total_occurrences": len(occurrences),
-            "affected_files": len(affected_files),
-            "secret_types": list(secret_types),
-            "files": list(affected_files),
-        },
-        "remediation_steps": remediation_steps,
-    }
-
-    if env_example_content:
-        result["env_example_content"] = env_example_content
-        result["env_example_instructions"] = [
-            "Create or update .env.example in your repository root:",
-            f"```\n{env_example_content}\n```",
-            "",
-            "Ensure .env is in .gitignore:",
-            "```\n.env\n```",
-        ]
+async def filter_mine(occurrences):
+    """Filter occurrences assigned to the current user."""
+    client = get_client()
+    try:
+        token_info = await client.get_current_token_info()
+        current_user_id = token_info.get("user_id") if token_info else None
+
+        if current_user_id:
+            occurrences = [
+                occ for occ in occurrences
+                if occ.get("incident", {}).get("assignee_id") == current_user_id
+            ]
+            logger.debug(f"Filtered to {len(occurrences)} occurrences for user {current_user_id}")
+    except Exception as e:
+        logger.warning(f"Could not filter by assignee: {str(e)}")
+    return occurrences

-    if git_commands:
-        result["git_commands"] = git_commands

-    return result
+async def trim_occurrences_for_remediation(occurrences):
+    """Limit the number of occurrences to be remediated by the agent."""
+    return occurrences[:10]
diff --git a/packages/gg_api_core/src/gg_api_core/tools/remediation_prompt.md b/packages/gg_api_core/src/gg_api_core/tools/remediation_prompt.md
new file mode 100644
index 0000000..60626b9
--- /dev/null
+++ b/packages/gg_api_core/src/gg_api_core/tools/remediation_prompt.md
@@ -0,0 +1,37 @@
+**Prompt: Secret Remediation in Git Repository**
+
+**Context:**
+
+You are provided with the detailed locations of hardcoded secrets in a Git repository.
+This prompt provides remediation instructions for each occurrence.
+
+**Instructions:**
+
+1. **Leverage provided information** from the `list_repo_occurrences` tool: For every occurrence, use the information
+   from `matches`:
+    * Leverage the `filepath` to find the file,
+    * Use the `matches` to read each element of an occurrence: `index_start`, `index_end`, `pre_line_start`,
+      `pre_line_end`, `post_line_start` and `post_line_end` for precise secret locations.
+
+2. **Remediation steps for each secret**:
+
+    * Remove hardcoded secrets from the code.
+    * Replace them with references to environment variables (e.g., `process.env.API_KEY`, `os.getenv('API_KEY')`).
+    * If applicable, run the repository's package manager to install required dependencies.
+    {% if add_to_env %}
+    * Add the real secret to a `.env` file,
+    * ensuring `.env` is in `.gitignore`.
+    {% endif %}
+    {% if env_example %}
+    * Add a placeholder to `.env.example` to document expected environment variables.
+    {% endif %}
+
+3. **Optional follow-up**:
+
+{% if git_commands %}
+
+* Suggest `git` commands to the user for remediation (staging changes, committing, removing secrets from history).
+{% endif %}
+
+
diff --git a/packages/gg_api_core/src/gg_api_core/tools/scan_secret.py b/packages/gg_api_core/src/gg_api_core/tools/scan_secret.py
index 9168f31..7f61acb 100644
--- a/packages/gg_api_core/src/gg_api_core/tools/scan_secret.py
+++ b/packages/gg_api_core/src/gg_api_core/tools/scan_secret.py
@@ -1,3 +1,4 @@
+from typing import Any
 from pydantic import BaseModel, Field

 import logging
@@ -20,18 +21,38 @@ class ScanSecretsParams(BaseModel):
     )


-async def scan_secrets(params: ScanSecretsParams):
+class ScanSecretsResult(BaseModel):
+    """Result from scanning secrets."""
+    model_config = {"extra": "allow"}  # Allow additional fields from API
+
+    scan_results: list[dict[str, Any]] = Field(default_factory=list, description="Scan results for each document")
+
+
+async def scan_secrets(params: ScanSecretsParams) -> ScanSecretsResult:
     """
     Scan multiple content items for secrets and policy breaks.

     This tool allows you to scan multiple files or content strings at once for secrets and policy violations.
     Each document must have a 'document' field and can optionally include a 'filename' field for better context.
+    IMPORTANT:
+    - Only send documents that are part of the codebase
+    - Do not send documents that are in .gitignore
+    - The 'document' field is the file content (string), not the filename
+
     Args:
         params: ScanSecretsParams model containing documents to scan

     Returns:
-        Scan results for all documents, including any detected secrets or policy breaks
+        ScanSecretsResult: Pydantic model containing:
+        - scan_results: List of scan result objects for each document, including:
+            - policy_break_count: Number of policy violations found
+            - policies: List of policies applied
+            - policy_breaks: Detailed information about each policy break/secret detected
+        - Additional fields from the API response
+
+    Raises:
+        Exception: If the scan operation fails or documents are invalid
     """
     try:
         client = get_client()
@@ -61,7 +82,15 @@ async def scan_secrets(params: ScanSecretsParams):
         result = await client.multiple_scan(params.documents)
         logger.debug(f"Scanned {len(params.documents)} documents")

-        return result
+        # Wrap the result in Pydantic model
+        if isinstance(result, list):
+            return ScanSecretsResult(scan_results=result)
+        elif isinstance(result, dict):
+            # If API returns a dict with scan_results key
+            return ScanSecretsResult(**result)
+        else:
+            # Fallback: wrap in scan_results
+            return ScanSecretsResult(scan_results=[result])
     except Exception as e:
         logger.error(f"Error scanning for secrets: {str(e)}")
         raise
\ No newline at end of file
diff --git a/packages/secops_mcp_server/src/secops_mcp_server/server.py b/packages/secops_mcp_server/src/secops_mcp_server/server.py
index 726bee9..ef9821c 100644
--- a/packages/secops_mcp_server/src/secops_mcp_server/server.py
+++ b/packages/secops_mcp_server/src/secops_mcp_server/server.py
@@ -40,7 +40,7 @@ class ListIncidentsParams(BaseModel):
         default=None, description="Filter incidents created before this date (ISO format: YYYY-MM-DD)"
     )
     assignee_email: str | None = Field(default=None, description="Filter incidents assigned to this email")
-    assignee_id: str | None = Field(default=None, description="Filter incidents assigned to this user id")
+    assignee_id: str | int | None = Field(default=None, description="Filter incidents assigned to this user id")
     validity: str | None = Field(
         default=None, description="Filter incidents by validity (valid, invalid, failed_to_check, no_checker, unknown)"
     )
@@ -61,8 +61,8 @@ class ListHoneytokensParams(BaseModel):
         default=None, description="Sort field (e.g., 'name', '-name', 'created_at', '-created_at')"
     )
     show_token: bool = Field(default=False, description="Whether to include token details in the response")
-    creator_id: str | None = Field(default=None, description="Filter by creator ID")
-    creator_api_token_id: str | None = Field(default=None, description="Filter by creator API token ID")
+    creator_id: str | int | None = Field(default=None, description="Filter by creator ID")
+    creator_api_token_id: str | int | None = Field(default=None, description="Filter by creator API token ID")
     per_page: int = Field(default=20, description="Number of results per page (default: 20, min: 1, max: 100)")
     get_all: bool = Field(default=False, description="If True, fetch all results using cursor-based pagination")
     mine: bool = Field(default=False, description="If True, fetch honeytokens created by the current user")
@@ -70,11 +70,11 @@ class ManageIncidentParams(BaseModel):
     """Parameters for managing an incident."""

-    incident_id: str = Field(description="ID of the secret incident to manage")
+    incident_id: str | int = Field(description="ID of the secret incident to manage")
     action: Literal["assign", "unassign", "resolve", "ignore", "reopen"] = Field(
         description="Action to perform on the incident"
     )
-    assignee_id: str | None = Field(
+    assignee_id: str | int | None = Field(
         default=None, description="ID of the member to assign the incident to (required for 'assign' action)"
     )
     ignore_reason: str | None = Field(
@@ -86,20 +86,20 @@ class ManageIncidentParams(BaseModel):

 class UpdateOrCreateIncidentCustomTagsParams(BaseModel):
     """Parameters for updating or creating incident custom tags."""

-    incident_id: str = Field(description="ID of the secret incident")
+    incident_id: str | int = Field(description="ID of the secret incident")
     custom_tags: list[str | dict[str, str]] = Field(description="List of custom tags to apply to the incident")


 class UpdateIncidentStatusParams(BaseModel):
     """Parameters for updating incident status."""

-    incident_id: str = Field(description="ID of the secret incident")
+    incident_id: str | int = Field(description="ID of the secret incident")
     status: str = Field(description="New status (IGNORED, TRIGGERED, ASSIGNED, RESOLVED)")


 class ReadCustomTagsParams(BaseModel):
     """Parameters for reading custom tags."""

     action: Literal["list_tags", "get_tag"] = Field(description="Action to perform related to reading custom tags")
-    tag_id: str | None = Field(
+    tag_id: str | int | None = Field(
         default=None, description="ID of the custom tag to retrieve (used with 'get_tag' action)"
     )
@@ -109,7 +109,7 @@ class WriteCustomTagsParams(BaseModel):
     action: Literal["create_tag", "delete_tag"] = Field(description="Action to perform related to writing custom tags")
     key: str | None = Field(default=None, description="Key for the new tag (used with 'create_tag' action)")
     value: str | None = Field(default=None, description="Value for the new tag (used with 'create_tag' action)")
-    tag_id: str | None = Field(
+    tag_id: str | int | None = Field(
         default=None, description="ID of the custom tag to delete (used with 'delete_tag' action)"
     )
@@ -513,7 +513,6 @@ async def write_custom_tags(params: WriteCustomTagsParams):
         logger.error(f"Traceback: {traceback.format_exc()}")

-
 if __name__ == "__main__":
     logger.info("Starting SecOps MCP server...")
     mcp.run()
diff --git a/scripts/run_tool.py b/scripts/run_tool.py
index 35f136d..ea197ab 100644
--- a/scripts/run_tool.py
+++ b/scripts/run_tool.py
@@ -1,11 +1,63 @@
+from gg_api_core.tools.find_current_source_id import find_current_source_id
+from gg_api_core.tools.list_honey_tokens import ListHoneytokensParams, list_honeytokens
+from gg_api_core.tools.list_repo_incidents import ListRepoIncidentsParams, list_repo_incidents
 from gg_api_core.tools.list_repo_occurrences import list_repo_occurrences, ListRepoOccurrencesParams

 import asyncio


-async def main():
+from gg_api_core.tools.remediate_secret_incidents import RemediateSecretIncidentsParams, remediate_secret_incidents, \
+    ListRepoOccurrencesParamsForRemediate
+from gg_api_core.tools.scan_secret import scan_secrets, ScanSecretsParams
+
+
+async def run_fetch_repo_occurrences():
     result = await list_repo_occurrences(
-        ListRepoOccurrencesParams(source_id="9036019")
+        ListRepoOccurrencesParams(source_id="9036019", get_all=False, status=None,
+                                  severity=["critical", "high", "medium", "low", "info", "unknown"])
+    )
+    print(result)
+
+
+async def run_remediate_secret_incidents():
+    result = await remediate_secret_incidents(
+        RemediateSecretIncidentsParams(source_id="9036019")
     )
     print(result)

+
+async def run_find_current_source_id():
+    result = await find_current_source_id()
+    print(result)
+
+
+async def main():
+    await run_find_current_source_id()
+
+    # Remediate
+    print(await remediate_secret_incidents(
+        RemediateSecretIncidentsParams(
+            list_repo_occurrences_params=ListRepoOccurrencesParamsForRemediate(source_id="9036019")))
+    )
+
+    # Occurrences
+    print(await list_repo_occurrences(
+        ListRepoOccurrencesParams(source_id="9036019", get_all=False, status=None,
+                                  severity=["critical", "high", "medium", "low", "info", "unknown"], tags=["TEST_FILE"])
+    ))
+
+    # Incidents
+    print(await list_repo_incidents(
+        ListRepoIncidentsParams(source_id="9036019", get_all=False, status=None,
+                                severity=["critical", "high", "medium", "low", "info", "unknown"], tags=["TEST_FILE"])))
+
+    print(await list_repo_incidents(ListRepoIncidentsParams(source_id="9036019")))
+
+    # Honey Tokens
+    print(await list_honeytokens(ListHoneytokensParams()))
+
+    # Scan
+    print(await scan_secrets(
+        ScanSecretsParams(documents=[{'document': 'file content', 'filename': 'optional_filename.txt'}, ])))
+
+
 if __name__ == "__main__":
     asyncio.run(main())
diff --git a/tests/tools/test_find_current_source_id.py b/tests/tools/test_find_current_source_id.py
index 004284c..3718851 100644
--- a/tests/tools/test_find_current_source_id.py
+++ b/tests/tools/test_find_current_source_id.py
@@ -51,9 +51,9 @@ async def test_find_current_source_id_exact_match(self, mock_gitguardian_client)
         )

         # Verify response
-        assert result["repository_name"] == "GitGuardian/ggmcp"
-        assert result["source_id"] == "source_123"
-        assert "message" in result
+        assert result.repository_name == "GitGuardian/ggmcp"
+        assert result.source_id == "source_123"
+        assert hasattr(result, "message")

     @pytest.mark.asyncio
     async def test_find_current_source_id_multiple_candidates(
@@ -94,11 +94,11 @@ async def test_find_current_source_id_multiple_candidates(
         result = await find_current_source_id()

         # Verify response
-        assert result["repository_name"] == "GitGuardian/test-repo"
-        assert "candidates" in result
-        assert len(result["candidates"]) == 2
-        assert "message" in result
-        assert "suggestion" in result
+        assert result.repository_name == "GitGuardian/test-repo"
+        assert hasattr(result, "candidates")
+        assert len(result.candidates) == 2
+        assert hasattr(result, "message")
+        assert hasattr(result, "suggestion")

     @pytest.mark.asyncio
     async def test_find_current_source_id_no_match_with_fallback(
@@ -137,8 +137,8 @@ async def mock_get_source(name, return_all_on_no_match=False):
         result = await find_current_source_id()

         # Verify response
-        assert result["repository_name"] == "OrgName/repo-name"
-        assert result["source_id"] == "source_fallback"
+        assert result.repository_name == "OrgName/repo-name"
+        assert result.source_id == "source_fallback"

     @pytest.mark.asyncio
     async def test_find_current_source_id_no_match_at_all(
@@ -163,9 +163,9 @@ async def test_find_current_source_id_no_match_at_all(
         result = await find_current_source_id()

         # Verify response
-        assert result["repository_name"] == "Unknown/repo"
-        assert "error" in result
-        assert "not found in GitGuardian" in result["error"]
+        assert result.repository_name == "Unknown/repo"
+        assert hasattr(result, "error")
+        assert "not found in GitGuardian" in result.error

     @pytest.mark.asyncio
     async def test_find_current_source_id_not_a_git_repo(
@@ -186,8 +186,8 @@ async def test_find_current_source_id_not_a_git_repo(
         result = await find_current_source_id()

         # Verify error response
-        assert "error" in result
-        assert "Not a git repository" in result["error"]
+        assert hasattr(result, "error")
+        assert "Not a git repository" in result.error

     @pytest.mark.asyncio
     async def test_find_current_source_id_git_timeout(self, mock_gitguardian_client):
@@ -204,8 +204,8 @@ async def test_find_current_source_id_git_timeout(self, mock_gitguardian_client)
         result = await find_current_source_id()

         # Verify error response
-        assert "error" in result
-        assert "timed out" in result["error"]
+        assert hasattr(result, "error")
+        assert "timed out" in result.error

     @pytest.mark.asyncio
     async def test_find_current_source_id_invalid_url(self, mock_gitguardian_client):
@@ -225,8 +225,8 @@ async def test_find_current_source_id_invalid_url(self, mock_gitguardian_client)
         result = await find_current_source_id()

         # Verify error response
-        assert "error" in result
-        assert "Could not parse repository URL" in result["error"]
+        assert hasattr(result, "error")
+        assert "Could not parse repository URL" in result.error

     @pytest.mark.asyncio
     async def test_find_current_source_id_gitlab_url(self, mock_gitguardian_client):
@@ -256,8 +256,8 @@ async def test_find_current_source_id_gitlab_url(self, mock_gitguardian_client):
         result = await find_current_source_id()

         # Verify response
-        assert result["repository_name"] == "company/project"
-        assert result["source_id"] == "source_gitlab"
+        assert result.repository_name == "company/project"
+        assert result.source_id == "source_gitlab"

     @pytest.mark.asyncio
     async def test_find_current_source_id_ssh_url(self, mock_gitguardian_client):
@@ -286,8 +286,8 @@ async def test_find_current_source_id_ssh_url(self, mock_gitguardian_client):
         result = await find_current_source_id()

         # Verify response
-        assert result["repository_name"] == "GitGuardian/ggmcp"
-        assert result["source_id"] == "source_ssh"
+        assert result.repository_name == "GitGuardian/ggmcp"
+        assert result.source_id == "source_ssh"

     @pytest.mark.asyncio
     async def test_find_current_source_id_client_error(self, mock_gitguardian_client):
@@ -312,5 +312,5 @@ async def test_find_current_source_id_client_error(self, mock_gitguardian_client
         result = await find_current_source_id()

         # Verify error response
-        assert "error" in result
-        assert "Failed to find source_id" in result["error"]
+        assert hasattr(result, "error")
+        assert "Failed to find source_id" in result.error
diff --git a/tests/tools/test_generate_honey_tokens.py b/tests/tools/test_generate_honey_tokens.py
index 2e2ed79..ab1f9cf 100644
--- a/tests/tools/test_generate_honey_tokens.py
+++ b/tests/tools/test_generate_honey_tokens.py
@@ -38,10 +38,10 @@ async def test_generate_honeytoken_success(self, mock_gitguardian_client):
         )

         # Verify response
-        assert result["id"] == "honeytoken_id"
-        assert result["token"] == "fake_token_value"
-        assert "injection_recommendations" in result
-        assert "instructions" in result["injection_recommendations"]
+        assert result.id == "honeytoken_id"
+        assert result.token == "fake_token_value"
+        assert hasattr(result, "injection_recommendations")
+        assert "instructions" in result.injection_recommendations

     @pytest.mark.asyncio
     async def test_generate_honeytoken_missing_id(self, mock_gitguardian_client):
diff --git a/tests/tools/test_list_honey_tokens.py b/tests/tools/test_list_honey_tokens.py
index dfdd57b..3d5cb11 100644
--- a/tests/tools/test_list_honey_tokens.py
+++ b/tests/tools/test_list_honey_tokens.py
@@ -53,9 +53,9 @@ async def test_list_honeytokens_success(self, mock_gitguardian_client):
         mock_gitguardian_client.list_honeytokens.assert_called_once()

         # Verify response
-        assert len(result) == 2
-        assert result[0]["name"] == "test_token_1"
-        assert result[1]["status"] == "REVOKED"
+        assert len(result.honeytokens) == 2
+        assert result.honeytokens[0]["name"] == "test_token_1"
+        assert result.honeytokens[1]["status"] == "REVOKED"

     @pytest.mark.asyncio
     async def test_list_honeytokens_list_format(self, mock_gitguardian_client):
@@ -90,8 +90,8 @@ async def test_list_honeytokens_list_format(self, mock_gitguardian_client):
         )

         # Verify response
-        assert len(result) == 1
-        assert result[0]["id"] == "honeytoken_1"
+        assert len(result.honeytokens) == 1
+        assert result.honeytokens[0]["id"] == "honeytoken_1"

     @pytest.mark.asyncio
     async def test_list_honeytokens_with_filters(self, mock_gitguardian_client):
@@ -136,8 +136,8 @@ async def test_list_honeytokens_with_filters(self, mock_gitguardian_client):
         assert call_kwargs["per_page"] == 50

         # Verify response
-        assert len(result) == 1
-        assert result[0]["status"] == "ACTIVE"
+        assert len(result.honeytokens) == 1
+        assert result.honeytokens[0]["status"] == "ACTIVE"

     @pytest.mark.asyncio
     async def test_list_honeytokens_mine_true(self, mock_gitguardian_client):
@@ -187,8 +187,8 @@ async def test_list_honeytokens_mine_true(self, mock_gitguardian_client):
         assert call_kwargs["creator_id"] == "user_123"

         # Verify response
-        assert len(result) == 1
-        assert result[0]["creator_id"] == "user_123"
+        assert len(result.honeytokens) == 1
+        assert result.honeytokens[0]["creator_id"] == "user_123"

     @pytest.mark.asyncio
     async def test_list_honeytokens_mine_true_no_user_id(
@@ -264,7 +264,7 @@ async def test_list_honeytokens_get_all(self, mock_gitguardian_client):
         assert call_kwargs["get_all"] is True

         # Verify response
-        assert len(result) == 3
+        assert len(result.honeytokens) == 3

     @pytest.mark.asyncio
     async def test_list_honeytokens_empty_response(self, mock_gitguardian_client):
@@ -293,7 +293,7 @@ async def test_list_honeytokens_empty_response(self, mock_gitguardian_client):
         )

         # Verify response is empty
-        assert len(result) == 0
+        assert len(result.honeytokens) == 0

     @pytest.mark.asyncio
     async def test_list_honeytokens_client_error(self, mock_gitguardian_client):
@@ -425,7 +425,7 @@ async def test_list_honeytokens_show_token_true(self, mock_gitguardian_client):
         assert call_kwargs["show_token"] is True

         # Verify response includes token
-        assert result[0]["token"] == "secret_value"
+        assert result.honeytokens[0]["token"] == "secret_value"

     @pytest.mark.asyncio
     async def test_list_honeytokens_mine_true_token_info_error(
@@ -461,4 +461,4 @@ async def test_list_honeytokens_mine_true_token_info_error(
         )

         # Verify function completes without error
-        assert result == []
+        assert result.honeytokens == []
diff --git a/tests/tools/test_list_repo_incidents.py b/tests/tools/test_list_repo_incidents.py
index 35464eb..b1d8af0 100644
--- a/tests/tools/test_list_repo_incidents.py
+++ b/tests/tools/test_list_repo_incidents.py
@@ -18,7 +18,7 @@ async def test_list_repo_incidents_with_repository_name(
         """
         # Mock the client response
         mock_response = {
-            "data": [
+            "incidents": [
                 {
                     "id": "incident_1",
                     "detector": {"name": "AWS Access Key"},
@@ -56,7 +56,8 @@ async def test_list_repo_incidents_with_repository_name(
         assert call_kwargs["mine"] is True

         # Verify response
-        assert result == mock_response
+        assert result.total_count == mock_response["total_count"]
+        assert len(result.incidents) == len(mock_response["incidents"])

     @pytest.mark.asyncio
     async def test_list_repo_incidents_with_source_id(self, mock_gitguardian_client):
@@ -106,9 +107,9 @@ async def test_list_repo_incidents_with_source_id(self, mock_gitguardian_client)
         assert call_args[1]["with_sources"] == "false"

         # Verify response
-        assert "source_id" in result
-        assert result["source_id"] == "source_123"
-        assert len(result["incidents"]) == 1
+        assert hasattr(result, "source_id")
+        assert result.source_id == "source_123"
+        assert len(result.incidents) == 1

     @pytest.mark.asyncio
     async def test_list_repo_incidents_with_filters(self, mock_gitguardian_client):
@@ -182,8 +183,8 @@ async def test_list_repo_incidents_get_all(self, mock_gitguardian_client):
         mock_gitguardian_client.paginate_all.assert_called_once()

         # Verify response
-        assert result["total_count"] == 3
-        assert len(result["incidents"]) == 3
+        assert result.total_count == 3
+        assert len(result.incidents) == 3

     @pytest.mark.asyncio
     async def test_list_repo_incidents_no_repository_or_source(
@@ -212,8 +213,8 @@ async def test_list_repo_incidents_no_repository_or_source(
         )

         # Verify error response
-        assert "error" in result
-        assert "Either repository_name or source_id must be provided" in result["error"]
+        assert hasattr(result, "error")
+        assert "Either repository_name or source_id must be provided" in result.error

     @pytest.mark.asyncio
     async def test_list_repo_incidents_client_error(self, mock_gitguardian_client):
@@ -246,8 +247,8 @@ async def test_list_repo_incidents_client_error(self, mock_gitguardian_client):
         )

         # Verify error response
-        assert "error" in result
-        assert "Failed to list repository incidents" in result["error"]
+        assert hasattr(result, "error")
+        assert "Failed to list repository incidents" in result.error

     @pytest.mark.asyncio
     async def test_list_repo_incidents_with_cursor(self, mock_gitguardian_client):
@@ -322,9 +323,9 @@ async def test_list_repo_incidents_source_id_list_response(
         )

         # Verify response format
-        assert result["source_id"] == "source_123"
-        assert result["total_count"] == 2
-        assert len(result["incidents"]) == 2
+        assert result.source_id == "source_123"
+        assert result.total_count == 2
+        assert len(result.incidents) == 2

     @pytest.mark.asyncio
     async def test_list_repo_incidents_get_all_dict_response(
@@ -348,6 +349,6 @@ async def test_list_repo_incidents_get_all_dict_response(
         )

         # Verify response
-        assert result["source_id"] == "source_123"
-        assert result["total_count"] == 2
-        assert len(result["incidents"]) == 2
+        assert result.source_id == "source_123"
+        assert result.total_count == 2
+        assert len(result.incidents) == 2
diff --git a/tests/tools/test_list_repo_occurrences.py b/tests/tools/test_list_repo_occurrences.py
index 461aa07..fec9ce7 100644
--- a/tests/tools/test_list_repo_occurrences.py
+++ b/tests/tools/test_list_repo_occurrences.py
@@ -1,6 +1,7 @@
 from unittest.mock import AsyncMock

 import pytest
+from pydantic import ValidationError

 from gg_api_core.tools.list_repo_occurrences import list_repo_occurrences, ListRepoOccurrencesParams
@@ -65,9 +66,11 @@ async def test_list_repo_occurrences_with_repository_name(
         assert call_kwargs["with_sources"] is False

         # Verify response
-        assert result["repository"] == "GitGuardian/test-repo"
-        assert result["occurrences_count"] == 1
-        assert len(result["occurrences"]) == 1
+        assert result.repository == "GitGuardian/test-repo"
+        assert result.occurrences_count == 1
+        assert len(result.occurrences) == 1
+        assert result.applied_filters is not None
+        assert result.suggestion is not None

     @pytest.mark.asyncio
     async def test_list_repo_occurrences_with_source_id(self, mock_gitguardian_client):
@@ -102,7 +105,9 @@ async def test_list_repo_occurrences_with_source_id(self, mock_gitguardian_clien
         assert call_kwargs["with_sources"] is False

         # Verify response
-        assert result["occurrences_count"] == 1
+        assert result.occurrences_count == 1
+        assert result.applied_filters is not None
+        assert result.suggestion is not None

     @pytest.mark.asyncio
     async def test_list_repo_occurrences_with_filters(self, mock_gitguardian_client):
@@ -169,8 +174,10 @@ async def test_list_repo_occurrences_get_all(self, mock_gitguardian_client):
         )

         # Verify response
-        assert result["occurrences_count"] == 2
-        assert len(result["occurrences"]) == 2
+        assert result.occurrences_count == 2
+        assert len(result.occurrences) == 2
+        assert result.applied_filters is not None
+        assert result.suggestion is not None

     @pytest.mark.asyncio
     async def test_list_repo_occurrences_no_repository_or_source(
@@ -178,11 +185,11 @@ async def test_list_repo_occurrences_no_repository_or_source(
     ):
         """
         GIVEN: Neither repository_name nor source_id provided
-        WHEN: Attempting to list occurrences
-        THEN: An error is returned
+        WHEN: Attempting to create params
+        THEN: A ValidationError is raised
         """
-        # Call the function without repository_name or source_id
-        result = await list_repo_occurrences(
+        # Try to create params without repository_name or source_id
+        with pytest.raises(ValidationError) as exc_info:
             ListRepoOccurrencesParams(
                 repository_name=None,
                 source_id=None,
@@ -195,11 +202,11 @@ async def test_list_repo_occurrences_no_repository_or_source(
                 cursor=None,
                 get_all=False,
             )
-        )

-        # Verify error response
-        assert "error" in result
-        assert "Either repository_name or source_id must be provided" in result["error"]
+        # Verify error message
+        errors = exc_info.value.errors()
+        assert len(errors) == 1
+        assert "Either 'source_id' or 'repository_name' must be provided" in str(errors[0])

     @pytest.mark.asyncio
     async def test_list_repo_occurrences_client_error(self, mock_gitguardian_client):
@@ -220,8 +227,8 @@ async def test_list_repo_occurrences_client_error(self, mock_gitguardian_client)
         )

         # Verify error response
-        assert "error" in result
-        assert "Failed to list repository occurrences" in result["error"]
+        assert hasattr(result, "error")
+        assert "Failed to list repository occurrences" in result.error

     @pytest.mark.asyncio
     async def test_list_repo_occurrences_with_cursor(self, mock_gitguardian_client):
@@ -261,8 +268,8 @@ async def test_list_repo_occurrences_with_cursor(self, mock_gitguardian_client):
         assert call_kwargs["source_name"] == "GitGuardian/test-repo"

         # Verify response includes cursor
-        assert result["cursor"] == "next_cursor_123"
-        assert result["has_more"] is True
+        assert result.cursor == "next_cursor_123"
+        assert result.has_more is True

     @pytest.mark.asyncio
     async def test_list_repo_occurrences_empty_response(self, mock_gitguardian_client):
@@ -285,8 +292,10 @@ async def test_list_repo_occurrences_empty_response(self, mock_gitguardian_clien
         )

         # Verify response
-        assert result["occurrences_count"] == 0
-        assert len(result["occurrences"]) == 0
+        assert result.occurrences_count == 0
+        assert len(result.occurrences) == 0
+        assert result.applied_filters is not None
+        assert result.suggestion is not None

     @pytest.mark.asyncio
     async def test_list_repo_occurrences_unexpected_response_type(
@@ -307,5 +316,7 @@ async def test_list_repo_occurrences_unexpected_response_type(
         )

         # Verify response defaults to empty
-        assert result["occurrences_count"] == 0
-        assert result["occurrences"] == []
+        assert result.occurrences_count == 0
+        assert result.occurrences == []
+        assert result.applied_filters is not None
+        assert result.suggestion is not None
diff --git a/tests/tools/test_remediate_secret_incidents.py b/tests/tools/test_remediate_secret_incidents.py
index 94ffb18..95c1e19 100644
--- a/tests/tools/test_remediate_secret_incidents.py
+++ b/tests/tools/test_remediate_secret_incidents.py
@@ -1,11 +1,67 @@
 from unittest.mock import AsyncMock, patch

 import pytest
+from pydantic import ValidationError

 from gg_api_core.tools.remediate_secret_incidents import (
     remediate_secret_incidents,
     _process_occurrences_for_remediation,
     RemediateSecretIncidentsParams,
 )
+from gg_api_core.tools.list_repo_occurrences import ListRepoOccurrencesResult, ListRepoOccurrencesError
+
+
+class TestRemediateSecretIncidentsParams:
+    """Tests for RemediateSecretIncidentsParams validation."""
+
+    def test_params_with_repository_name(self):
+        """
+        GIVEN: RemediateSecretIncidentsParams with repository_name provided
+        WHEN: Creating the params
+        THEN: Validation should pass
+        """
+        params = RemediateSecretIncidentsParams(
+            repository_name="GitGuardian/test-repo"
+        )
+        assert params.repository_name == "GitGuardian/test-repo"
+        assert
params.source_id is None + + def test_params_with_source_id(self): + """ + GIVEN: RemediateSecretIncidentsParams with source_id provided + WHEN: Creating the params + THEN: Validation should pass + """ + params = RemediateSecretIncidentsParams(source_id="source_123") + assert params.source_id == "source_123" + assert params.repository_name is None + + def test_params_with_both_repository_name_and_source_id(self): + """ + GIVEN: RemediateSecretIncidentsParams with both repository_name and source_id provided + WHEN: Creating the params + THEN: Validation should pass + """ + params = RemediateSecretIncidentsParams( + repository_name="GitGuardian/test-repo", source_id="source_123" + ) + assert params.repository_name == "GitGuardian/test-repo" + assert params.source_id == "source_123" + + def test_params_with_neither_repository_name_nor_source_id(self): + """ + GIVEN: RemediateSecretIncidentsParams with neither repository_name nor source_id provided + WHEN: Creating the params + THEN: Validation should fail with ValueError + """ + with pytest.raises(ValidationError) as exc_info: + RemediateSecretIncidentsParams() + + # Verify the error message + errors = exc_info.value.errors() + assert len(errors) == 1 + assert "Either 'source_id' or 'repository_name' must be provided" in str( + errors[0] + ) class TestRemediateSecretIncidents: @@ -19,8 +75,8 @@ async def test_remediate_secret_incidents_success(self, mock_gitguardian_client) THEN: Detailed remediation steps with file locations are returned """ # Mock list_repo_occurrences to return occurrences - mock_occurrences = { - "occurrences": [ + mock_occurrences = ListRepoOccurrencesResult( + occurrences=[ { "id": "occ_1", "matches": [ @@ -42,7 +98,10 @@ async def test_remediate_secret_incidents_success(self, mock_gitguardian_client) }, } ], - } + occurrences_count=1, + applied_filters={}, + suggestion="", + ) # Mock get_current_token_info for filtering by assignee mock_gitguardian_client.get_current_token_info = AsyncMock( @@ 
-51,8 +110,8 @@ async def test_remediate_secret_incidents_success(self, mock_gitguardian_client) # Patch list_repo_occurrences with patch( - "gg_api_core.tools.remediate_secret_incidents.list_repo_occurrences", - AsyncMock(return_value=mock_occurrences), + "gg_api_core.tools.remediate_secret_incidents.list_repo_occurrences", + AsyncMock(return_value=mock_occurrences), ): # Call the function result = await remediate_secret_incidents( @@ -62,25 +121,25 @@ async def test_remediate_secret_incidents_success(self, mock_gitguardian_client) ) # Verify response structure - assert "repository_info" in result - assert "summary" in result - assert "remediation_steps" in result - assert "env_example_content" in result - assert "git_commands" in result + assert result.repository_info is not None + assert result.summary is not None + assert result.remediation_steps is not None + assert result.env_example_content is not None + assert result.git_commands is not None # Verify summary - assert result["summary"]["total_occurrences"] == 1 - assert result["summary"]["affected_files"] == 1 - assert "AWS Access Key" in result["summary"]["secret_types"] + assert result.summary.get("total_occurrences") == 1 + assert result.summary.get("affected_files") == 1 + assert "AWS Access Key" in result.summary.get("secret_types", []) # Verify remediation steps - assert len(result["remediation_steps"]) == 1 - assert result["remediation_steps"][0]["file"] == "config.py" - assert len(result["remediation_steps"][0]["matches"]) == 1 + assert len(result.remediation_steps) == 1 + assert result.remediation_steps[0].get("file") == "config.py" + assert len(result.remediation_steps[0].get("matches", [])) == 1 @pytest.mark.asyncio async def test_remediate_secret_incidents_no_occurrences( - self, mock_gitguardian_client + self, mock_gitguardian_client ): """ GIVEN: No occurrences found for the repository @@ -88,12 +147,17 @@ async def test_remediate_secret_incidents_no_occurrences( THEN: A message indicating no 
occurrences is returned """ # Mock list_repo_occurrences to return empty occurrences - mock_occurrences = {"occurrences": []} + mock_occurrences = ListRepoOccurrencesResult( + occurrences=[], + occurrences_count=0, + applied_filters={"tags_exclude": ["TEST_FILE"]}, + suggestion="No occurrences matched the applied filters.", + ) # Patch list_repo_occurrences with patch( - "gg_api_core.tools.remediate_secret_incidents.list_repo_occurrences", - AsyncMock(return_value=mock_occurrences), + "gg_api_core.tools.remediate_secret_incidents.list_repo_occurrences", + AsyncMock(return_value=mock_occurrences), ): # Call the function result = await remediate_secret_incidents( @@ -103,9 +167,11 @@ async def test_remediate_secret_incidents_no_occurrences( ) # Verify response - assert "message" in result - assert "No secret occurrences found" in result["message"] - assert result["remediation_steps"] == [] + assert result.message is not None + assert "No secret occurrences found" in result.message + assert result.remediation_steps == [] + assert result.applied_filters is not None + assert result.suggestion is not None @pytest.mark.asyncio async def test_remediate_secret_incidents_error(self, mock_gitguardian_client): @@ -115,12 +181,12 @@ async def test_remediate_secret_incidents_error(self, mock_gitguardian_client): THEN: The error is propagated in the response """ # Mock list_repo_occurrences to return error - mock_occurrences = {"error": "API connection failed"} + mock_occurrences = ListRepoOccurrencesError(error="API connection failed") # Patch list_repo_occurrences with patch( - "gg_api_core.tools.remediate_secret_incidents.list_repo_occurrences", - AsyncMock(return_value=mock_occurrences), + "gg_api_core.tools.remediate_secret_incidents.list_repo_occurrences", + AsyncMock(return_value=mock_occurrences), ): # Call the function result = await remediate_secret_incidents( @@ -130,12 +196,12 @@ async def test_remediate_secret_incidents_error(self, mock_gitguardian_client): ) # 
Verify error response - assert "error" in result - assert "API connection failed" in result["error"] + assert hasattr(result, "error") + assert "API connection failed" in result.error @pytest.mark.asyncio async def test_remediate_secret_incidents_mine_false( - self, mock_gitguardian_client + self, mock_gitguardian_client ): """ GIVEN: mine=False flag to include all incidents @@ -143,8 +209,8 @@ async def test_remediate_secret_incidents_mine_false( THEN: All occurrences are included regardless of assignee """ # Mock list_repo_occurrences to return multiple occurrences - mock_occurrences = { - "occurrences": [ + mock_occurrences = ListRepoOccurrencesResult( + occurrences=[ { "id": "occ_1", "matches": [ @@ -166,12 +232,15 @@ async def test_remediate_secret_incidents_mine_false( }, } ], - } + occurrences_count=1, + applied_filters={}, + suggestion="", + ) # Patch list_repo_occurrences with patch( - "gg_api_core.tools.remediate_secret_incidents.list_repo_occurrences", - AsyncMock(return_value=mock_occurrences), + "gg_api_core.tools.remediate_secret_incidents.list_repo_occurrences", + AsyncMock(return_value=mock_occurrences), ): # Call the function with mine=False result = await remediate_secret_incidents( @@ -181,11 +250,11 @@ async def test_remediate_secret_incidents_mine_false( ) # Verify all occurrences are included (not filtered by assignee) - assert result["summary"]["total_occurrences"] == 1 + assert result.summary.get("total_occurrences") == 1 @pytest.mark.asyncio async def test_remediate_secret_incidents_no_git_commands( - self, mock_gitguardian_client + self, mock_gitguardian_client ): """ GIVEN: include_git_commands=False @@ -193,8 +262,8 @@ async def test_remediate_secret_incidents_no_git_commands( THEN: Git commands are not included in the response """ # Mock list_repo_occurrences to return occurrences - mock_occurrences = { - "occurrences": [ + mock_occurrences = ListRepoOccurrencesResult( + occurrences=[ { "id": "occ_1", "matches": [ @@ -215,7 +284,10 @@ 
async def test_remediate_secret_incidents_no_git_commands( }, } ], - } + occurrences_count=1, + applied_filters={}, + suggestion="", + ) # Mock get_current_token_info mock_gitguardian_client.get_current_token_info = AsyncMock( @@ -224,8 +296,8 @@ async def test_remediate_secret_incidents_no_git_commands( # Patch list_repo_occurrences with patch( - "gg_api_core.tools.remediate_secret_incidents.list_repo_occurrences", - AsyncMock(return_value=mock_occurrences), + "gg_api_core.tools.remediate_secret_incidents.list_repo_occurrences", + AsyncMock(return_value=mock_occurrences), ): # Call the function with include_git_commands=False result = await remediate_secret_incidents( @@ -237,11 +309,11 @@ async def test_remediate_secret_incidents_no_git_commands( ) # Verify git commands are not included - assert "git_commands" not in result + assert result.git_commands is None @pytest.mark.asyncio async def test_remediate_secret_incidents_no_env_example( - self, mock_gitguardian_client + self, mock_gitguardian_client ): """ GIVEN: create_env_example=False @@ -249,8 +321,8 @@ async def test_remediate_secret_incidents_no_env_example( THEN: Env example content is not included in the response """ # Mock list_repo_occurrences to return occurrences - mock_occurrences = { - "occurrences": [ + mock_occurrences = ListRepoOccurrencesResult( + occurrences=[ { "id": "occ_1", "matches": [ @@ -271,7 +343,10 @@ async def test_remediate_secret_incidents_no_env_example( }, } ], - } + occurrences_count=1, + applied_filters={}, + suggestion="", + ) # Mock get_current_token_info mock_gitguardian_client.get_current_token_info = AsyncMock( @@ -280,8 +355,8 @@ async def test_remediate_secret_incidents_no_env_example( # Patch list_repo_occurrences with patch( - "gg_api_core.tools.remediate_secret_incidents.list_repo_occurrences", - AsyncMock(return_value=mock_occurrences), + "gg_api_core.tools.remediate_secret_incidents.list_repo_occurrences", + AsyncMock(return_value=mock_occurrences), ): # Call the 
function with create_env_example=False result = await remediate_secret_incidents( @@ -293,11 +368,11 @@ async def test_remediate_secret_incidents_no_env_example( ) # Verify env example is not included - assert "env_example_content" not in result + assert result.env_example_content is None @pytest.mark.asyncio async def test_remediate_secret_incidents_multiple_files( - self, mock_gitguardian_client + self, mock_gitguardian_client ): """ GIVEN: Occurrences across multiple files @@ -305,8 +380,8 @@ async def test_remediate_secret_incidents_multiple_files( THEN: Remediation steps are provided for each file """ # Mock list_repo_occurrences to return occurrences in different files - mock_occurrences = { - "occurrences": [ + mock_occurrences = ListRepoOccurrencesResult( + occurrences=[ { "id": "occ_1", "matches": [ @@ -346,7 +421,10 @@ async def test_remediate_secret_incidents_multiple_files( }, }, ], - } + occurrences_count=2, + applied_filters={}, + suggestion="", + ) # Mock get_current_token_info mock_gitguardian_client.get_current_token_info = AsyncMock( @@ -355,8 +433,8 @@ async def test_remediate_secret_incidents_multiple_files( # Patch list_repo_occurrences with patch( - "gg_api_core.tools.remediate_secret_incidents.list_repo_occurrences", - AsyncMock(return_value=mock_occurrences), + "gg_api_core.tools.remediate_secret_incidents.list_repo_occurrences", + AsyncMock(return_value=mock_occurrences), ): # Call the function result = await remediate_secret_incidents( @@ -366,9 +444,10 @@ async def test_remediate_secret_incidents_multiple_files( ) # Verify response - assert result["summary"]["total_occurrences"] == 2 - assert result["summary"]["affected_files"] == 2 - assert len(result["remediation_steps"]) == 2 + sub_tool_result = result.sub_tools_results.get("list_repo_occurrences") + assert sub_tool_result.get("total_occurrences") == 2 + assert sub_tool_result.get("affected_files") == 2 + assert len(result.remediation_steps) == 2 @pytest.mark.asyncio async def 
test_process_occurrences_for_remediation(self): @@ -409,16 +488,16 @@ async def test_process_occurrences_for_remediation(self): ) # Verify response structure - assert "repository_info" in result - assert "summary" in result - assert "remediation_steps" in result - assert "env_example_content" in result - assert "git_commands" in result + assert hasattr(result, "repository_info") + assert hasattr(result, "summary") + assert hasattr(result, "remediation_steps") + assert hasattr(result, "env_example_content") + assert hasattr(result, "git_commands") # Verify remediation steps are sorted bottom-to-top - step = result["remediation_steps"][0] - assert step["file"] == "config.py" - assert len(step["matches"]) == 1 + step = result.remediation_steps[0] + assert step.get("file") == "config.py" + assert len(step.get("matches", [])) == 1 @pytest.mark.asyncio async def test_process_occurrences_for_remediation_sorting(self): @@ -478,6 +557,6 @@ async def test_process_occurrences_for_remediation_sorting(self): ) # Verify matches are sorted bottom to top (line 15 before line 5) - step = result["remediation_steps"][0] - assert step["matches"][0]["line_start"] == 15 - assert step["matches"][1]["line_start"] == 5 + step = result.remediation_steps[0] + assert step.get("matches", [])[0].get("line_start") == 15 + assert step.get("matches", [])[1].get("line_start") == 5 diff --git a/tests/tools/test_scan_secrets.py b/tests/tools/test_scan_secrets.py index 77cab43..e0ea39b 100644 --- a/tests/tools/test_scan_secrets.py +++ b/tests/tools/test_scan_secrets.py @@ -48,8 +48,8 @@ async def test_scan_secrets_success(self, mock_gitguardian_client): mock_gitguardian_client.multiple_scan.assert_called_once_with(documents) # Verify response - assert result == mock_response - assert result[0]["policy_break_count"] == 1 + assert result.scan_results == mock_response + assert result.scan_results[0]["policy_break_count"] == 1 @pytest.mark.asyncio async def test_scan_secrets_no_secrets_found(self, 
mock_gitguardian_client): @@ -73,8 +73,8 @@ async def test_scan_secrets_no_secrets_found(self, mock_gitguardian_client): result = await scan_secrets(ScanSecretsParams(documents=documents)) # Verify response - assert result[0]["policy_break_count"] == 0 - assert len(result[0]["policy_breaks"]) == 0 + assert result.scan_results[0]["policy_break_count"] == 0 + assert len(result.scan_results[0]["policy_breaks"]) == 0 @pytest.mark.asyncio async def test_scan_secrets_multiple_documents(self, mock_gitguardian_client): @@ -98,7 +98,7 @@ async def test_scan_secrets_multiple_documents(self, mock_gitguardian_client): result = await scan_secrets(ScanSecretsParams(documents=documents)) # Verify response - assert len(result) == 2 + assert len(result.scan_results) == 2 @pytest.mark.asyncio async def test_scan_secrets_without_filename(self, mock_gitguardian_client): diff --git a/uv.lock b/uv.lock index eb4c365..a108a9d 100644 --- a/uv.lock +++ b/uv.lock @@ -147,6 +147,7 @@ version = "0.1.0" source = { editable = "packages/gg_api_core" } dependencies = [ { name = "httpx" }, + { name = "jinja2" }, { name = "mcp", extra = ["cli"] }, { name = "pydantic-settings" }, { name = "python-dotenv" }, @@ -155,6 +156,7 @@ dependencies = [ [package.metadata] requires-dist = [ { name = "httpx", specifier = ">=0.28.1" }, + { name = "jinja2", specifier = ">=3.1.0" }, { name = "mcp", extras = ["cli"], specifier = ">=1.9.0" }, { name = "pydantic-settings", specifier = ">=2.0.0" }, { name = "python-dotenv", specifier = ">=1.0.0" }, @@ -255,6 +257,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/2c/e1/e6716421ea10d38022b952c159d5161ca1193197fb744506875fbb87ea7b/iniconfig-2.1.0-py3-none-any.whl", hash = "sha256:9deba5723312380e77435581c6bf4935c94cbfab9b1ed33ef8d238ea168eb760", size = 6050 }, ] +[[package]] +name = "jinja2" +version = "3.1.6" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "markupsafe" }, +] +sdist = { url = 
"https://files.pythonhosted.org/packages/df/bf/f7da0350254c0ed7c72f3e33cef02e048281fec7ecec5f032d4aac52226b/jinja2-3.1.6.tar.gz", hash = "sha256:0137fb05990d35f1275a587e9aee6d56da821fc83491a0fb838183be43f66d6d", size = 245115 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/62/a1/3d680cbfd5f4b8f15abc1d571870c5fc3e594bb582bc3b64ea099db13e56/jinja2-3.1.6-py3-none-any.whl", hash = "sha256:85ece4451f492d0c13c5dd7c13a64681a86afae63a5f347908daf103ce6d2f67", size = 134899 }, +] + [[package]] name = "markdown-it-py" version = "3.0.0" @@ -267,6 +281,58 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/42/d7/1ec15b46af6af88f19b8e5ffea08fa375d433c998b8a7639e76935c14f1f/markdown_it_py-3.0.0-py3-none-any.whl", hash = "sha256:355216845c60bd96232cd8d8c40e8f9765cc86f46880e43a8fd22dc1a1a8cab1", size = 87528 }, ] +[[package]] +name = "markupsafe" +version = "3.0.3" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/7e/99/7690b6d4034fffd95959cbe0c02de8deb3098cc577c67bb6a24fe5d7caa7/markupsafe-3.0.3.tar.gz", hash = "sha256:722695808f4b6457b320fdc131280796bdceb04ab50fe1795cd540799ebe1698", size = 80313 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/38/2f/907b9c7bbba283e68f20259574b13d005c121a0fa4c175f9bed27c4597ff/markupsafe-3.0.3-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:e1cf1972137e83c5d4c136c43ced9ac51d0e124706ee1c8aa8532c1287fa8795", size = 11622 }, + { url = "https://files.pythonhosted.org/packages/9c/d9/5f7756922cdd676869eca1c4e3c0cd0df60ed30199ffd775e319089cb3ed/markupsafe-3.0.3-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:116bb52f642a37c115f517494ea5feb03889e04df47eeff5b130b1808ce7c219", size = 12029 }, + { url = "https://files.pythonhosted.org/packages/00/07/575a68c754943058c78f30db02ee03a64b3c638586fba6a6dd56830b30a3/markupsafe-3.0.3-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = 
"sha256:133a43e73a802c5562be9bbcd03d090aa5a1fe899db609c29e8c8d815c5f6de6", size = 24374 }, + { url = "https://files.pythonhosted.org/packages/a9/21/9b05698b46f218fc0e118e1f8168395c65c8a2c750ae2bab54fc4bd4e0e8/markupsafe-3.0.3-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:ccfcd093f13f0f0b7fdd0f198b90053bf7b2f02a3927a30e63f3ccc9df56b676", size = 22980 }, + { url = "https://files.pythonhosted.org/packages/7f/71/544260864f893f18b6827315b988c146b559391e6e7e8f7252839b1b846a/markupsafe-3.0.3-cp313-cp313-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:509fa21c6deb7a7a273d629cf5ec029bc209d1a51178615ddf718f5918992ab9", size = 21990 }, + { url = "https://files.pythonhosted.org/packages/c2/28/b50fc2f74d1ad761af2f5dcce7492648b983d00a65b8c0e0cb457c82ebbe/markupsafe-3.0.3-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:a4afe79fb3de0b7097d81da19090f4df4f8d3a2b3adaa8764138aac2e44f3af1", size = 23784 }, + { url = "https://files.pythonhosted.org/packages/ed/76/104b2aa106a208da8b17a2fb72e033a5a9d7073c68f7e508b94916ed47a9/markupsafe-3.0.3-cp313-cp313-musllinux_1_2_riscv64.whl", hash = "sha256:795e7751525cae078558e679d646ae45574b47ed6e7771863fcc079a6171a0fc", size = 21588 }, + { url = "https://files.pythonhosted.org/packages/b5/99/16a5eb2d140087ebd97180d95249b00a03aa87e29cc224056274f2e45fd6/markupsafe-3.0.3-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:8485f406a96febb5140bfeca44a73e3ce5116b2501ac54fe953e488fb1d03b12", size = 23041 }, + { url = "https://files.pythonhosted.org/packages/19/bc/e7140ed90c5d61d77cea142eed9f9c303f4c4806f60a1044c13e3f1471d0/markupsafe-3.0.3-cp313-cp313-win32.whl", hash = "sha256:bdd37121970bfd8be76c5fb069c7751683bdf373db1ed6c010162b2a130248ed", size = 14543 }, + { url = "https://files.pythonhosted.org/packages/05/73/c4abe620b841b6b791f2edc248f556900667a5a1cf023a6646967ae98335/markupsafe-3.0.3-cp313-cp313-win_amd64.whl", hash = 
"sha256:9a1abfdc021a164803f4d485104931fb8f8c1efd55bc6b748d2f5774e78b62c5", size = 15113 }, + { url = "https://files.pythonhosted.org/packages/f0/3a/fa34a0f7cfef23cf9500d68cb7c32dd64ffd58a12b09225fb03dd37d5b80/markupsafe-3.0.3-cp313-cp313-win_arm64.whl", hash = "sha256:7e68f88e5b8799aa49c85cd116c932a1ac15caaa3f5db09087854d218359e485", size = 13911 }, + { url = "https://files.pythonhosted.org/packages/e4/d7/e05cd7efe43a88a17a37b3ae96e79a19e846f3f456fe79c57ca61356ef01/markupsafe-3.0.3-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:218551f6df4868a8d527e3062d0fb968682fe92054e89978594c28e642c43a73", size = 11658 }, + { url = "https://files.pythonhosted.org/packages/99/9e/e412117548182ce2148bdeacdda3bb494260c0b0184360fe0d56389b523b/markupsafe-3.0.3-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:3524b778fe5cfb3452a09d31e7b5adefeea8c5be1d43c4f810ba09f2ceb29d37", size = 12066 }, + { url = "https://files.pythonhosted.org/packages/bc/e6/fa0ffcda717ef64a5108eaa7b4f5ed28d56122c9a6d70ab8b72f9f715c80/markupsafe-3.0.3-cp313-cp313t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:4e885a3d1efa2eadc93c894a21770e4bc67899e3543680313b09f139e149ab19", size = 25639 }, + { url = "https://files.pythonhosted.org/packages/96/ec/2102e881fe9d25fc16cb4b25d5f5cde50970967ffa5dddafdb771237062d/markupsafe-3.0.3-cp313-cp313t-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:8709b08f4a89aa7586de0aadc8da56180242ee0ada3999749b183aa23df95025", size = 23569 }, + { url = "https://files.pythonhosted.org/packages/4b/30/6f2fce1f1f205fc9323255b216ca8a235b15860c34b6798f810f05828e32/markupsafe-3.0.3-cp313-cp313t-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:b8512a91625c9b3da6f127803b166b629725e68af71f8184ae7e7d54686a56d6", size = 23284 }, + { url = 
"https://files.pythonhosted.org/packages/58/47/4a0ccea4ab9f5dcb6f79c0236d954acb382202721e704223a8aafa38b5c8/markupsafe-3.0.3-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:9b79b7a16f7fedff2495d684f2b59b0457c3b493778c9eed31111be64d58279f", size = 24801 }, + { url = "https://files.pythonhosted.org/packages/6a/70/3780e9b72180b6fecb83a4814d84c3bf4b4ae4bf0b19c27196104149734c/markupsafe-3.0.3-cp313-cp313t-musllinux_1_2_riscv64.whl", hash = "sha256:12c63dfb4a98206f045aa9563db46507995f7ef6d83b2f68eda65c307c6829eb", size = 22769 }, + { url = "https://files.pythonhosted.org/packages/98/c5/c03c7f4125180fc215220c035beac6b9cb684bc7a067c84fc69414d315f5/markupsafe-3.0.3-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:8f71bc33915be5186016f675cd83a1e08523649b0e33efdb898db577ef5bb009", size = 23642 }, + { url = "https://files.pythonhosted.org/packages/80/d6/2d1b89f6ca4bff1036499b1e29a1d02d282259f3681540e16563f27ebc23/markupsafe-3.0.3-cp313-cp313t-win32.whl", hash = "sha256:69c0b73548bc525c8cb9a251cddf1931d1db4d2258e9599c28c07ef3580ef354", size = 14612 }, + { url = "https://files.pythonhosted.org/packages/2b/98/e48a4bfba0a0ffcf9925fe2d69240bfaa19c6f7507b8cd09c70684a53c1e/markupsafe-3.0.3-cp313-cp313t-win_amd64.whl", hash = "sha256:1b4b79e8ebf6b55351f0d91fe80f893b4743f104bff22e90697db1590e47a218", size = 15200 }, + { url = "https://files.pythonhosted.org/packages/0e/72/e3cc540f351f316e9ed0f092757459afbc595824ca724cbc5a5d4263713f/markupsafe-3.0.3-cp313-cp313t-win_arm64.whl", hash = "sha256:ad2cf8aa28b8c020ab2fc8287b0f823d0a7d8630784c31e9ee5edea20f406287", size = 13973 }, + { url = "https://files.pythonhosted.org/packages/33/8a/8e42d4838cd89b7dde187011e97fe6c3af66d8c044997d2183fbd6d31352/markupsafe-3.0.3-cp314-cp314-macosx_10_13_x86_64.whl", hash = "sha256:eaa9599de571d72e2daf60164784109f19978b327a3910d3e9de8c97b5b70cfe", size = 11619 }, + { url = 
"https://files.pythonhosted.org/packages/b5/64/7660f8a4a8e53c924d0fa05dc3a55c9cee10bbd82b11c5afb27d44b096ce/markupsafe-3.0.3-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:c47a551199eb8eb2121d4f0f15ae0f923d31350ab9280078d1e5f12b249e0026", size = 12029 }, + { url = "https://files.pythonhosted.org/packages/da/ef/e648bfd021127bef5fa12e1720ffed0c6cbb8310c8d9bea7266337ff06de/markupsafe-3.0.3-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:f34c41761022dd093b4b6896d4810782ffbabe30f2d443ff5f083e0cbbb8c737", size = 24408 }, + { url = "https://files.pythonhosted.org/packages/41/3c/a36c2450754618e62008bf7435ccb0f88053e07592e6028a34776213d877/markupsafe-3.0.3-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:457a69a9577064c05a97c41f4e65148652db078a3a509039e64d3467b9e7ef97", size = 23005 }, + { url = "https://files.pythonhosted.org/packages/bc/20/b7fdf89a8456b099837cd1dc21974632a02a999ec9bf7ca3e490aacd98e7/markupsafe-3.0.3-cp314-cp314-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:e8afc3f2ccfa24215f8cb28dcf43f0113ac3c37c2f0f0806d8c70e4228c5cf4d", size = 22048 }, + { url = "https://files.pythonhosted.org/packages/9a/a7/591f592afdc734f47db08a75793a55d7fbcc6902a723ae4cfbab61010cc5/markupsafe-3.0.3-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:ec15a59cf5af7be74194f7ab02d0f59a62bdcf1a537677ce67a2537c9b87fcda", size = 23821 }, + { url = "https://files.pythonhosted.org/packages/7d/33/45b24e4f44195b26521bc6f1a82197118f74df348556594bd2262bda1038/markupsafe-3.0.3-cp314-cp314-musllinux_1_2_riscv64.whl", hash = "sha256:0eb9ff8191e8498cca014656ae6b8d61f39da5f95b488805da4bb029cccbfbaf", size = 21606 }, + { url = "https://files.pythonhosted.org/packages/ff/0e/53dfaca23a69fbfbbf17a4b64072090e70717344c52eaaaa9c5ddff1e5f0/markupsafe-3.0.3-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:2713baf880df847f2bece4230d4d094280f4e67b1e813eec43b4c0e144a34ffe", size = 
23043 }, + { url = "https://files.pythonhosted.org/packages/46/11/f333a06fc16236d5238bfe74daccbca41459dcd8d1fa952e8fbd5dccfb70/markupsafe-3.0.3-cp314-cp314-win32.whl", hash = "sha256:729586769a26dbceff69f7a7dbbf59ab6572b99d94576a5592625d5b411576b9", size = 14747 }, + { url = "https://files.pythonhosted.org/packages/28/52/182836104b33b444e400b14f797212f720cbc9ed6ba34c800639d154e821/markupsafe-3.0.3-cp314-cp314-win_amd64.whl", hash = "sha256:bdc919ead48f234740ad807933cdf545180bfbe9342c2bb451556db2ed958581", size = 15341 }, + { url = "https://files.pythonhosted.org/packages/6f/18/acf23e91bd94fd7b3031558b1f013adfa21a8e407a3fdb32745538730382/markupsafe-3.0.3-cp314-cp314-win_arm64.whl", hash = "sha256:5a7d5dc5140555cf21a6fefbdbf8723f06fcd2f63ef108f2854de715e4422cb4", size = 14073 }, + { url = "https://files.pythonhosted.org/packages/3c/f0/57689aa4076e1b43b15fdfa646b04653969d50cf30c32a102762be2485da/markupsafe-3.0.3-cp314-cp314t-macosx_10_13_x86_64.whl", hash = "sha256:1353ef0c1b138e1907ae78e2f6c63ff67501122006b0f9abad68fda5f4ffc6ab", size = 11661 }, + { url = "https://files.pythonhosted.org/packages/89/c3/2e67a7ca217c6912985ec766c6393b636fb0c2344443ff9d91404dc4c79f/markupsafe-3.0.3-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:1085e7fbddd3be5f89cc898938f42c0b3c711fdcb37d75221de2666af647c175", size = 12069 }, + { url = "https://files.pythonhosted.org/packages/f0/00/be561dce4e6ca66b15276e184ce4b8aec61fe83662cce2f7d72bd3249d28/markupsafe-3.0.3-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:1b52b4fb9df4eb9ae465f8d0c228a00624de2334f216f178a995ccdcf82c4634", size = 25670 }, + { url = "https://files.pythonhosted.org/packages/50/09/c419f6f5a92e5fadde27efd190eca90f05e1261b10dbd8cbcb39cd8ea1dc/markupsafe-3.0.3-cp314-cp314t-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:fed51ac40f757d41b7c48425901843666a6677e3e8eb0abcff09e4ba6e664f50", size = 23598 }, + { url = 
"https://files.pythonhosted.org/packages/22/44/a0681611106e0b2921b3033fc19bc53323e0b50bc70cffdd19f7d679bb66/markupsafe-3.0.3-cp314-cp314t-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:f190daf01f13c72eac4efd5c430a8de82489d9cff23c364c3ea822545032993e", size = 23261 }, + { url = "https://files.pythonhosted.org/packages/5f/57/1b0b3f100259dc9fffe780cfb60d4be71375510e435efec3d116b6436d43/markupsafe-3.0.3-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:e56b7d45a839a697b5eb268c82a71bd8c7f6c94d6fd50c3d577fa39a9f1409f5", size = 24835 }, + { url = "https://files.pythonhosted.org/packages/26/6a/4bf6d0c97c4920f1597cc14dd720705eca0bf7c787aebc6bb4d1bead5388/markupsafe-3.0.3-cp314-cp314t-musllinux_1_2_riscv64.whl", hash = "sha256:f3e98bb3798ead92273dc0e5fd0f31ade220f59a266ffd8a4f6065e0a3ce0523", size = 22733 }, + { url = "https://files.pythonhosted.org/packages/14/c7/ca723101509b518797fedc2fdf79ba57f886b4aca8a7d31857ba3ee8281f/markupsafe-3.0.3-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:5678211cb9333a6468fb8d8be0305520aa073f50d17f089b5b4b477ea6e67fdc", size = 23672 }, + { url = "https://files.pythonhosted.org/packages/fb/df/5bd7a48c256faecd1d36edc13133e51397e41b73bb77e1a69deab746ebac/markupsafe-3.0.3-cp314-cp314t-win32.whl", hash = "sha256:915c04ba3851909ce68ccc2b8e2cd691618c4dc4c4232fb7982bca3f41fd8c3d", size = 14819 }, + { url = "https://files.pythonhosted.org/packages/1a/8a/0402ba61a2f16038b48b39bccca271134be00c5c9f0f623208399333c448/markupsafe-3.0.3-cp314-cp314t-win_amd64.whl", hash = "sha256:4faffd047e07c38848ce017e8725090413cd80cbc23d86e55c587bf979e579c9", size = 15426 }, + { url = "https://files.pythonhosted.org/packages/70/bc/6f1c2f612465f5fa89b95bead1f44dcb607670fd42891d8fdcd5d039f4f4/markupsafe-3.0.3-cp314-cp314t-win_arm64.whl", hash = "sha256:32001d6a8fc98c8cb5c947787c5d08b0a50663d139f1305bac5885d98d9b40fa", size = 14146 }, +] + [[package]] name = "mcp" version = "1.9.2"