Skip to content

Commit da821fc

Browse files
committed
chore(typing): Add Pydantic models for all tools
1 parent 4169622 commit da821fc

11 files changed

+281
-164
lines changed
Lines changed: 97 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,41 @@
11
from typing import Any
22
import logging
33
import subprocess
4+
from pydantic import BaseModel, Field
45
from gg_api_core.utils import get_client, parse_repo_url
56

67
logger = logging.getLogger(__name__)
78

89

10+
class SourceCandidate(BaseModel):
11+
"""A candidate source that might match the repository."""
12+
id: str = Field(description="Source ID")
13+
url: str | None = Field(default=None, description="Repository URL")
14+
name: str | None = Field(default=None, description="Repository name")
15+
monitored: bool | None = Field(default=None, description="Whether source is monitored")
16+
deleted_at: str | None = Field(default=None, description="Deletion timestamp if deleted")
917

10-
async def find_current_source_id() -> dict[str, Any]:
18+
19+
class FindCurrentSourceIdResult(BaseModel):
20+
"""Successful result from finding source ID."""
21+
repository_name: str = Field(description="Detected repository name")
22+
source_id: str | None = Field(default=None, description="GitGuardian source ID (if exact match)")
23+
source: dict[str, Any] | None = Field(default=None, description="Full source information (if exact match)")
24+
message: str | None = Field(default=None, description="Status or informational message")
25+
suggestion: str | None = Field(default=None, description="Suggestions for next steps")
26+
candidates: list[SourceCandidate] | None = Field(default=None, description="List of candidate sources (if no exact match)")
27+
28+
29+
class FindCurrentSourceIdError(BaseModel):
30+
"""Error result from finding source ID."""
31+
error: str = Field(description="Error message")
32+
repository_name: str | None = Field(default=None, description="Repository name if detected")
33+
details: str | None = Field(default=None, description="Additional error details")
34+
message: str | None = Field(default=None, description="User-friendly message")
35+
suggestion: str | None = Field(default=None, description="Suggestions for resolving the error")
36+
37+
38+
async def find_current_source_id() -> FindCurrentSourceIdResult | FindCurrentSourceIdError:
1139
"""
1240
Find the GitGuardian source_id for the current repository.
1341
@@ -19,12 +47,20 @@ async def find_current_source_id() -> dict[str, Any]:
1947
5. If no exact match, returns all search results for the model to choose from
2048
2149
Returns:
22-
A dictionary containing:
23-
- repository_name: The detected repository name
24-
- source_id: The GitGuardian source ID (if exact match found)
25-
- source: Full source information from GitGuardian (if exact match found)
26-
- candidates: List of candidate sources (if no exact match but potential matches found)
27-
- error: Error message if something went wrong
50+
FindCurrentSourceIdResult: Pydantic model containing:
51+
- repository_name: The detected repository name
52+
- source_id: The GitGuardian source ID (if exact match found)
53+
- source: Full source information from GitGuardian (if exact match found)
54+
- message: Status or informational message
55+
- suggestion: Suggestions for next steps
56+
- candidates: List of SourceCandidate objects (if no exact match but potential matches found)
57+
58+
FindCurrentSourceIdError: Pydantic model containing:
59+
- error: Error message
60+
- repository_name: Repository name if detected
61+
- details: Additional error details
62+
- message: User-friendly message
63+
- suggestion: Suggestions for resolving the error
2864
"""
2965
client = get_client()
3066
logger.debug("Finding source_id for current repository")
@@ -42,21 +78,21 @@ async def find_current_source_id() -> dict[str, Any]:
4278
remote_url = result.stdout.strip()
4379
logger.debug(f"Found remote URL: {remote_url}")
4480
except subprocess.CalledProcessError as e:
45-
return {
46-
"error": "Not a git repository or no remote 'origin' configured",
47-
"details": str(e),
48-
}
81+
return FindCurrentSourceIdError(
82+
error="Not a git repository or no remote 'origin' configured",
83+
details=str(e),
84+
)
4985
except subprocess.TimeoutExpired:
50-
return {"error": "Git command timed out"}
86+
return FindCurrentSourceIdError(error="Git command timed out")
5187

5288
# Parse repository name from remote URL
5389
repository_name = parse_repo_url(remote_url)
5490

5591
if not repository_name:
56-
return {
57-
"error": f"Could not parse repository URL: {remote_url}",
58-
"details": "The URL format is not recognized. Supported platforms: GitHub, GitLab (Cloud & Self-hosted), Bitbucket (Cloud & Data Center), Azure DevOps",
59-
}
92+
return FindCurrentSourceIdError(
93+
error=f"Could not parse repository URL: {remote_url}",
94+
details="The URL format is not recognized. Supported platforms: GitHub, GitLab (Cloud & Self-hosted), Bitbucket (Cloud & Data Center), Azure DevOps",
95+
)
6096

6197
logger.info(f"Detected repository name: {repository_name}")
6298

@@ -67,31 +103,31 @@ async def find_current_source_id() -> dict[str, Any]:
67103
if isinstance(result, dict):
68104
source_id = result.get("id")
69105
logger.info(f"Found exact match with source_id: {source_id}")
70-
return {
71-
"repository_name": repository_name,
72-
"source_id": source_id,
73-
"source": result,
74-
"message": f"Successfully found exact match for GitGuardian source: {repository_name}",
75-
}
106+
return FindCurrentSourceIdResult(
107+
repository_name=repository_name,
108+
source_id=source_id,
109+
source=result,
110+
message=f"Successfully found exact match for GitGuardian source: {repository_name}",
111+
)
76112

77113
# Handle multiple candidates (list result)
78114
elif isinstance(result, list) and len(result) > 0:
79115
logger.info(f"Found {len(result)} candidate sources for repository: {repository_name}")
80-
return {
81-
"repository_name": repository_name,
82-
"message": f"No exact match found for '{repository_name}', but found {len(result)} potential matches.",
83-
"suggestion": "Review the candidates below and determine which source best matches the current repository based on the name and URL.",
84-
"candidates": [
85-
{
86-
"id": source.get("id"),
87-
"url": source.get("url"),
88-
"name": source.get("full_name") or source.get("name"),
89-
"monitored": source.get("monitored"),
90-
"deleted_at": source.get("deleted_at"),
91-
}
116+
return FindCurrentSourceIdResult(
117+
repository_name=repository_name,
118+
message=f"No exact match found for '{repository_name}', but found {len(result)} potential matches.",
119+
suggestion="Review the candidates below and determine which source best matches the current repository based on the name and URL.",
120+
candidates=[
121+
SourceCandidate(
122+
id=source.get("id"),
123+
url=source.get("url"),
124+
name=source.get("full_name") or source.get("name"),
125+
monitored=source.get("monitored"),
126+
deleted_at=source.get("deleted_at"),
127+
)
92128
for source in result
93129
],
94-
}
130+
)
95131

96132
# No matches found at all
97133
else:
@@ -105,39 +141,39 @@ async def find_current_source_id() -> dict[str, Any]:
105141
if isinstance(fallback_result, dict):
106142
source_id = fallback_result.get("id")
107143
logger.info(f"Found match using repo name only, source_id: {source_id}")
108-
return {
109-
"repository_name": repository_name,
110-
"source_id": source_id,
111-
"source": fallback_result,
112-
"message": f"Found match using repository name '{repo_only}' (without organization prefix)",
113-
}
144+
return FindCurrentSourceIdResult(
145+
repository_name=repository_name,
146+
source_id=source_id,
147+
source=fallback_result,
148+
message=f"Found match using repository name '{repo_only}' (without organization prefix)",
149+
)
114150
elif isinstance(fallback_result, list) and len(fallback_result) > 0:
115151
logger.info(f"Found {len(fallback_result)} candidates using repo name only")
116-
return {
117-
"repository_name": repository_name,
118-
"message": f"No exact match for '{repository_name}', but found {len(fallback_result)} potential matches using repo name '{repo_only}'.",
119-
"suggestion": "Review the candidates below and determine which source best matches the current repository.",
120-
"candidates": [
121-
{
122-
"id": source.get("id"),
123-
"url": source.get("url"),
124-
"name": source.get("full_name") or source.get("name"),
125-
"monitored": source.get("monitored"),
126-
"deleted_at": source.get("deleted_at"),
127-
}
152+
return FindCurrentSourceIdResult(
153+
repository_name=repository_name,
154+
message=f"No exact match for '{repository_name}', but found {len(fallback_result)} potential matches using repo name '{repo_only}'.",
155+
suggestion="Review the candidates below and determine which source best matches the current repository.",
156+
candidates=[
157+
SourceCandidate(
158+
id=source.get("id"),
159+
url=source.get("url"),
160+
name=source.get("full_name") or source.get("name"),
161+
monitored=source.get("monitored"),
162+
deleted_at=source.get("deleted_at"),
163+
)
128164
for source in fallback_result
129165
],
130-
}
166+
)
131167

132168
# Absolutely no matches found
133169
logger.warning(f"No sources found for repository: {repository_name}")
134-
return {
135-
"repository_name": repository_name,
136-
"error": f"Repository '{repository_name}' not found in GitGuardian",
137-
"message": "The repository may not be connected to GitGuardian, or you may not have access to it.",
138-
"suggestion": "Check that the repository is properly connected to GitGuardian and that your account has access to it.",
139-
}
170+
return FindCurrentSourceIdError(
171+
repository_name=repository_name,
172+
error=f"Repository '{repository_name}' not found in GitGuardian",
173+
message="The repository may not be connected to GitGuardian, or you may not have access to it.",
174+
suggestion="Check that the repository is properly connected to GitGuardian and that your account has access to it.",
175+
)
140176

141177
except Exception as e:
142178
logger.error(f"Error finding source_id: {str(e)}")
143-
return {"error": f"Failed to find source_id: {str(e)}"}
179+
return FindCurrentSourceIdError(error=f"Failed to find source_id: {str(e)}")

packages/gg_api_core/src/gg_api_core/tools/generate_honey_token.py

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,42 @@ class GenerateHoneytokenParams(BaseModel):
2121
)
2222

2323

24-
async def generate_honeytoken(params: GenerateHoneytokenParams) -> dict[str, Any]:
24+
class GenerateHoneytokenResult(BaseModel):
25+
"""Result from generating or retrieving a honeytoken."""
26+
model_config = {"extra": "allow"} # Allow additional fields from API
27+
28+
id: str = Field(description="Honeytoken ID")
29+
name: str | None = Field(default=None, description="Honeytoken name")
30+
description: str | None = Field(default=None, description="Honeytoken description")
31+
type: str | None = Field(default=None, description="Honeytoken type")
32+
status: str | None = Field(default=None, description="Honeytoken status")
33+
token: dict[str, Any] | None = Field(default=None, description="Token details if show_token=True")
34+
injection_recommendations: dict[str, Any] | None = Field(default=None, description="Injection recommendations")
35+
36+
37+
async def generate_honeytoken(params: GenerateHoneytokenParams) -> GenerateHoneytokenResult:
2538
"""
2639
Generate an AWS GitGuardian honeytoken and get injection recommendations.
2740
41+
If new_token is False, attempts to retrieve an existing active honeytoken created by the current user
42+
instead of generating a new one. If no existing token is found, a new one will be created.
43+
2844
Args:
2945
params: GenerateHoneytokenParams model containing honeytoken configuration
3046
3147
Returns:
32-
Honeytoken data and injection recommendations
48+
GenerateHoneytokenResult: Pydantic model containing:
49+
- id: Honeytoken ID
50+
- name: Honeytoken name
51+
- description: Honeytoken description
52+
- type: Honeytoken type
53+
- status: Honeytoken status
54+
- token: Token details (if show_token=True was used)
55+
- injection_recommendations: Instructions for injecting the honeytoken
56+
- Additional fields from the API response
57+
58+
Raises:
59+
ToolError: If the honeytoken generation or retrieval fails
3360
"""
3461
client = get_client()
3562
logger.debug(f"Processing honeytoken request with name: {params.name}, new_token: {params.new_token}")
@@ -67,7 +94,7 @@ async def generate_honeytoken(params: GenerateHoneytokenParams) -> dict[str, Any
6794
if honeytoken_id:
6895
detailed_token = await client.get_honeytoken(honeytoken_id, show_token=True)
6996
logger.debug(f"Retrieved existing honeytoken with ID: {honeytoken_id}")
70-
return detailed_token
97+
return GenerateHoneytokenResult(**detailed_token)
7198

7299
logger.debug("No suitable existing honeytokens found, creating a new one")
73100
else:
@@ -95,7 +122,7 @@ async def generate_honeytoken(params: GenerateHoneytokenParams) -> dict[str, Any
95122
"instructions": "Add the honeytoken to your codebase in configuration files, environment variables, or code comments to detect unauthorized access."
96123
}
97124

98-
return result
125+
return GenerateHoneytokenResult(**result)
99126
except Exception as e:
100127
logger.error(f"Error generating honeytoken: {str(e)}")
101128
raise ToolError(f"Failed to generate honeytoken: {str(e)}")

packages/gg_api_core/src/gg_api_core/tools/list_honey_tokens.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,26 @@ class ListHoneytokensParams(BaseModel):
2525
mine: bool = Field(default=False, description="If True, fetch honeytokens created by the current user")
2626

2727

28-
async def list_honeytokens(params: ListHoneytokensParams) -> list[dict[str, Any]]:
28+
class ListHoneytokensResult(BaseModel):
29+
"""Result from listing honeytokens."""
30+
honeytokens: list[dict[str, Any]] = Field(description="List of honeytoken objects")
31+
32+
33+
async def list_honeytokens(params: ListHoneytokensParams) -> ListHoneytokensResult:
2934
"""
3035
List honeytokens from the GitGuardian dashboard with filtering options.
3136
37+
If mine=True, filters honeytokens to show only those created by the current user.
38+
3239
Args:
3340
params: ListHoneytokensParams model containing all filtering options
3441
3542
Returns:
36-
List of honeytokens matching the specified criteria
43+
ListHoneytokensResult: Pydantic model containing:
44+
- honeytokens: List of honeytoken objects matching the specified criteria
45+
46+
Raises:
47+
ToolError: If the listing operation fails
3748
"""
3849
client = get_client()
3950
logger.debug("Listing honeytokens with filters")
@@ -79,7 +90,7 @@ async def list_honeytokens(params: ListHoneytokensParams) -> list[dict[str, Any]
7990
honeytokens = result
8091

8192
logger.debug(f"Found {len(honeytokens)} honeytokens")
82-
return honeytokens
93+
return ListHoneytokensResult(honeytokens=honeytokens)
8394
except Exception as e:
8495
logger.error(f"Error listing honeytokens: {str(e)}")
8596
raise ToolError(str(e))

packages/gg_api_core/src/gg_api_core/tools/list_repo_incidents.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,10 +151,19 @@ async def list_repo_incidents(params: ListRepoIncidentsParams) -> ListRepoIncide
151151
By default, incidents tagged with TEST_FILE or FALSE_POSITIVE are excluded. Pass exclude_tags=[] to disable this filtering.
152152
153153
Args:
154-
params: ListRepoIncidentsParams model containing all filtering options
154+
params: ListRepoIncidentsParams model containing all filtering options.
155+
Either repository_name or source_id must be provided.
155156
156157
Returns:
157-
List of incidents and occurrences matching the specified criteria
158+
ListRepoIncidentsResult: Pydantic model containing:
159+
- source_id: Source ID of the repository
160+
- incidents: List of incident objects
161+
- total_count: Total number of incidents
162+
- next_cursor: Pagination cursor (if applicable)
163+
- applied_filters: Dictionary of filters that were applied
164+
- suggestion: Suggestions for interpreting or modifying results
165+
166+
ListRepoIncidentsError: Pydantic model with error message if the operation fails
158167
"""
159168
client = get_client()
160169

0 commit comments

Comments
 (0)