1+ from pathlib import Path
12from typing import Any
23import logging
34
1011
1112logger = logging .getLogger (__name__ )
1213
14+ REMEDIATION_PROMPT_PATH = Path ("remediation_prompt.md" )
1315
14- class ListRepoOccurrencesParamsForRemediate ( ListRepoOccurrencesFilters ):
15- """Filter parameters for remediation - repository_name and source_id are provided separately."""
16+
17+ class ListRepoOccurrencesParamsForRemediate ( ListRepoOccurrencesParams ):
1618 # Overriding the tags one to add a default filter : for remediation, we're more interested in occurrences that
1719 # are in the branch the developer is currently on. And occurrences on DEFAULT_BRANCH are a heuristic for that
1820 tags : list [str ] = Field (
@@ -23,26 +25,21 @@ class ListRepoOccurrencesParamsForRemediate(ListRepoOccurrencesFilters):
2325
2426class RemediateSecretIncidentsParams (BaseModel ):
2527 """Parameters for remediating secret incidents."""
26- repository_name : str | None = Field (
27- default = None ,
28- description = "The full repository name. For example, for https://github.com/GitGuardian/ggmcp.git the full name is GitGuardian/ggmcp. Pass the current repository name if not provided." ,
29- )
30- source_id : str | int | None = Field (
31- default = None ,
32- description = "The source ID of the repository. Pass the current repository source ID if not provided." ,
33- )
34- get_all : bool = Field (default = True , description = "Whether to get all incidents or just the first page" )
3528 mine : bool = Field (
3629 default = False ,
3730 description = "If True, fetch only incidents assigned to the current user. Set to False to get all incidents." ,
3831 )
3932
4033 # Behaviour
41- include_git_commands : bool = Field (
34+ git_commands : bool = Field (
4235 default = True , description = "Whether to include git commands to fix incidents in git history"
4336 )
4437 create_env_example : bool = Field (
45- default = True , description = "Whether to create a .env.example file with placeholders for detected secrets"
38+ default = True ,
39+ description = "Whether to suggest creating a .env.example file with placeholders for detected secrets"
40+ )
41+ add_to_env : bool = Field (
42+ default = True , description = "Whether to suggest adding secrets to .env file"
4643 )
4744
4845 # sub tools
@@ -51,27 +48,15 @@ class RemediateSecretIncidentsParams(BaseModel):
5148 description = "Parameters for listing repository occurrences" ,
5249 )
5350
54- @model_validator (mode = "after" )
55- def validate_source_or_repository (self ) -> "RemediateSecretIncidentsParams" :
56- """Validate that either source_id or repository_name is provided."""
57- if not self .source_id and not self .repository_name :
58- raise ValueError ("Either 'source_id' or 'repository_name' must be provided" )
59- return self
60-
6151
6252class RemediateSecretIncidentsResult (BaseModel ):
6353 """Result from remediating secret incidents."""
64- repository_info : dict [str , Any ] = Field (description = "Information about the repository" )
65- summary : dict [str , Any ] | None = Field (default = None , description = "Summary of occurrences, files, and secret types" )
66- remediation_steps : list [dict [str , Any ]] = Field (default_factory = list , description = "Steps for remediating each file" )
67- message : str | None = Field (default = None , description = "Message when no occurrences found" )
68- env_example_content : str | None = Field (default = None , description = "Suggested .env.example content" )
69- env_example_instructions : list [str ] | None = Field (default = None , description = "Instructions for .env.example" )
70- git_commands : dict [str , Any ] | None = Field (default = None , description = "Git commands to fix history" )
71- applied_filters : dict [str , Any ] = Field (default_factory = dict ,
72- description = "Filters applied when querying occurrences" )
73- suggestion : str = Field (default = "" , description = "Suggestions for interpreting results" )
74- sub_tools_results : dict [str , Any ] = Field (default_factory = dict , description = "Results from sub tools" )
54+ remediation_instructions : str = Field (default = "" , description = "Instructions for remediating occurrences" )
55+ occurrences_count : int = Field (default = 0 , description = "Number of occurrences found" )
56+ suggested_occurrences_for_remediation_count : int = Field (default = 0 ,
57+ description = "Number of occurrences suggested for remediation" )
58+
59+ sub_tools_results : dict [str , BaseModel ] = Field (default_factory = dict , description = "Results from sub tools" )
7560
7661
7762class RemediateSecretIncidentsError (BaseModel ):
@@ -83,54 +68,20 @@ class RemediateSecretIncidentsError(BaseModel):
8368async def remediate_secret_incidents (
8469 params : RemediateSecretIncidentsParams ) -> RemediateSecretIncidentsResult | RemediateSecretIncidentsError :
8570 """
86- Find and remediate secret incidents in the current repository using EXACT match locations.
87-
88- By default, this tool only shows incidents assigned to the current user. Pass mine=False to get all incidents related to this repo.
89-
90- This tool now uses the occurrences API to get precise file locations, line numbers, and character indices,
91- eliminating the need to search for secrets in files. The workflow is:
71+ Find and remediate secret incidents in the current repository.
9272
93- 1. Fetch secret occurrences with exact match locations (file path, line_start, line_end, index_start, index_end)
94- 2. Group occurrences by file for efficient remediation
95- 3. Sort matches from bottom to top to prevent line number shifts during editing
96- 4. Provide detailed remediation steps with exact locations for each secret
97- 5. IMPORTANT: Make the changes to the codebase using the provided indices:
98- - Use index_start and index_end to locate the exact secret in the file
99- - Replace hardcoded secrets with environment variable references
100- - Ensure all occurrences are removed from the codebase
101- - IMPORTANT: If the repository uses a package manager (npm, cargo, uv, etc.), use it to install required packages
102- 6. Optional: Generate git commands to rewrite history and remove secrets from git
103-
104- The tool provides:
105- - Exact file paths and line numbers for each secret
106- - Character-level indices (index_start, index_end) to locate secrets precisely
107- - Context lines (pre/post) to understand the surrounding code
108- - Sorted matches to enable safe sequential removal (bottom-to-top)
73+ This tool uses the occurrences API to find secrets and provides simple remediation suggestions.
10974
11075 Args:
11176 params: RemediateSecretIncidentsParams model containing remediation configuration
11277
11378 Returns:
114- RemediateSecretIncidentsResult: Pydantic model containing:
115- - repository_info: Information about the repository
116- - summary: Overview of occurrences, files affected, and secret types
117- - remediation_steps: Detailed steps with exact locations for each file
118- - message: Message when no occurrences are found
119- - env_example_content: Suggested .env.example content (if requested)
120- - env_example_instructions: Instructions for .env.example (if created)
121- - git_commands: Git commands to fix history (if requested)
122- - applied_filters: Filters that were applied when querying occurrences
123- - suggestion: Suggestions for interpreting or modifying results
124-
125- RemediateSecretIncidentsError: Pydantic model with error message if the operation fails
79+ RemediateSecretIncidentsResult or RemediateSecretIncidentsError
12680 """
127- logger .debug (f"Using remediate_secret_incidents with occurrences API for: { params .repository_name } " )
81+ logger .debug (f"Using remediate_secret_incidents for: { params .repository_name } " )
12882
12983 try :
130- # Get detailed occurrences with exact match locations
131- # Build ListRepoOccurrencesParams by combining repository info with filters
132- from .list_repo_occurrences import ListRepoOccurrencesParams
133-
84+ # Build parameters for list_repo_occurrences
13485 occurrences_params = ListRepoOccurrencesParams (
13586 repository_name = params .repository_name ,
13687 source_id = params .source_id ,
@@ -147,212 +98,60 @@ async def remediate_secret_incidents(
14798 cursor = None ,
14899 get_all = params .get_all ,
149100 )
150- occurrences_result = await list_repo_occurrences (occurrences_params )
151101
152- # Check if list_repo_occurrences returned an error
102+ occurrences_result = await list_repo_occurrences ( occurrences_params )
153103 if hasattr (occurrences_result , "error" ) and occurrences_result .error :
154- return RemediateSecretIncidentsError (error = occurrences_result .error ,
155- sub_tools_results = {"list_repo_occurrences" : occurrences_result })
156- occurrences = occurrences_result .occurrences
104+ return RemediateSecretIncidentsError (
105+ error = occurrences_result .error ,
106+ sub_tools_results = {"list_repo_occurrences" : occurrences_result }
107+ )
157108
158- # Filter by assignee if mine=True
109+ occurrences = occurrences_result . occurrences
159110 if params .mine :
160- # Get current user info to filter by assignee
161- client = get_client ()
162- try :
163- token_info = await client .get_current_token_info ()
164- current_user_id = token_info .get ("user_id" ) if token_info else None
165-
166- if current_user_id :
167- # Filter occurrences assigned to current user
168- occurrences = [
169- occ for occ in occurrences
170- if occ .get ("incident" , {}).get ("assignee_id" ) == current_user_id
171- ]
172- logger .debug (f"Filtered to { len (occurrences )} occurrences assigned to user { current_user_id } " )
173- except Exception as e :
174- logger .warning (f"Could not filter by assignee: { str (e )} " )
111+ occurrences = filter_mine (occurrences )
112+ occurrences_count = len (occurrences )
113+ occurrences_result .occurrences = await trim_occurrences_for_remediation (occurrences )
175114
176115 if not occurrences :
177- return RemediateSecretIncidentsResult (
178- repository_info = { "name" : params . repository_name },
179- message = "No secret occurrences found for this repository that match the criteria." ,
180- remediation_steps = [],
181- applied_filters = occurrences_result . applied_filters or {} ,
182- suggestion = occurrences_result . suggestion or "" ,
183- sub_tools_results = { "list_repo_occurrences" : occurrences_result }
116+ remediation_instructions = ( "No secret occurrences found for this repository that match the criteria. "
117+ "Adjust 'list_repo_occurrences_params' to modify filtering." )
118+ else :
119+ remediation_instructions = REMEDIATION_PROMPT_PATH . format (
120+ add_to_env = params . add_to_env ,
121+ create_env_example = params . create_env_example ,
122+ git_commands = params . git_commands ,
184123 )
185-
186- # Process occurrences for remediation with exact location data
187- logger .debug (f"Processing { len (occurrences )} occurrences with exact locations for remediation" )
188- result = await _process_occurrences_for_remediation (
189- occurrences = occurrences ,
190- repository_name = params .repository_name ,
191- include_git_commands = params .include_git_commands ,
192- create_env_example = params .create_env_example ,
193- )
194- logger .debug (
195- f"Remediation processing complete, returning result with { len (result .remediation_steps )} steps"
124+ return RemediateSecretIncidentsResult (
125+ remediation_instructions = remediation_instructions ,
126+ sub_tools_results = {"list_repo_occurrences" : occurrences_result },
127+ occurrences_count = occurrences_count ,
128+ suggested_occurrences_for_remediation_count = len (occurrences ),
196129 )
197130
198- # Add sub_tools_results and applied_filters/suggestion from occurrences_result
199- result_dict = result .model_dump ()
200- result_dict ["sub_tools_results" ] = {
201- "list_repo_occurrences" : {
202- "total_occurrences" : result .summary .get ("total_occurrences" ,
203- len (occurrences )) if result .summary else len (occurrences ),
204- "affected_files" : result .summary .get ("affected_files" , 0 ) if result .summary else 0 ,
205- }
206- }
207- result_dict ["applied_filters" ] = occurrences_result .applied_filters or {}
208- result_dict ["suggestion" ] = occurrences_result .suggestion or ""
209-
210- return RemediateSecretIncidentsResult (** result_dict )
211131
212132 except Exception as e :
213133 logger .error (f"Error remediating incidents: { str (e )} " )
214134 return RemediateSecretIncidentsError (error = f"Failed to remediate incidents: { str (e )} " )
215135
216136
217- async def _process_occurrences_for_remediation (
218- occurrences : list [dict [str , Any ]],
219- repository_name : str ,
220- include_git_commands : bool = True ,
221- create_env_example : bool = True ,
222- ) -> RemediateSecretIncidentsResult :
223- """
224- Process occurrences for remediation using exact match locations.
225-
226- This function leverages the detailed location data from occurrences (file paths, line numbers,
227- character indices) to provide precise remediation instructions without needing to search files.
228-
229- Args:
230- occurrences: List of occurrences with exact match locations
231- repository_name: Repository name
232- include_git_commands: Whether to include git commands
233- create_env_example: Whether to create .env.example
234-
235- Returns:
236- Remediation steps for each occurrence with exact file locations
237- """
238- # Group occurrences by file for efficient remediation
239- occurrences_by_file = {}
240- secret_types = set ()
241- affected_files = set ()
242-
243- for occurrence in occurrences :
244- # Extract location data
245- matches = occurrence .get ("matches" , [])
246- incident_data = occurrence .get ("incident" , {})
247- secret_type = incident_data .get ("detector" , {}).get ("name" , "Unknown" )
248- secret_types .add (secret_type )
249-
250- for match in matches :
251- file_path = match .get ("match" , {}).get ("filename" )
252- if not file_path :
253- continue
254-
255- affected_files .add (file_path )
256-
257- if file_path not in occurrences_by_file :
258- occurrences_by_file [file_path ] = []
259-
260- # Store detailed match information
261- match_info = {
262- "occurrence_id" : occurrence .get ("id" ),
263- "incident_id" : incident_data .get ("id" ),
264- "secret_type" : secret_type ,
265- "line_start" : match .get ("match" , {}).get ("line_start" ),
266- "line_end" : match .get ("match" , {}).get ("line_end" ),
267- "index_start" : match .get ("match" , {}).get ("index_start" ),
268- "index_end" : match .get ("match" , {}).get ("index_end" ),
269- "match_type" : match .get ("type" ),
270- "pre_line_start" : match .get ("pre_line_start" ),
271- "pre_line_end" : match .get ("pre_line_end" ),
272- "post_line_start" : match .get ("post_line_start" ),
273- "post_line_end" : match .get ("post_line_end" ),
274- }
275- occurrences_by_file [file_path ].append (match_info )
276-
277- # Build remediation steps with exact locations
278- remediation_steps = []
279-
280- for file_path , matches in occurrences_by_file .items ():
281- # Sort matches by line number (descending) so we can remove from bottom to top
282- # This prevents line number shifts when making multiple edits
283- sorted_matches = sorted (matches , key = lambda m : m ["line_start" ] or 0 , reverse = True )
284-
285- step = {
286- "file" : file_path ,
287- "action" : "remove_secrets" ,
288- "matches" : sorted_matches ,
289- "instructions" : [
290- f"File: { file_path } " ,
291- f"Found { len (sorted_matches )} secret(s) in this file" ,
292- "Matches are sorted from bottom to top for safe sequential removal" ,
293- "" ,
294- "For each match:" ,
295- "1. Read the file content" ,
296- f"2. Navigate to line { sorted_matches [0 ].get ('line_start' )} (and other match locations)" ,
297- "3. Use the exact index_start and index_end to locate the secret" ,
298- "4. Replace the hardcoded secret with an environment variable reference" ,
299- "5. Ensure the secret is added to .env (gitignored) and .env.example (committed)" ,
300- ],
301- "recommendations" : [
302- "Replace secrets with environment variables (e.g., process.env.API_KEY, os.getenv('API_KEY'))" ,
303- "Add the real secret to .env file (ensure .env is in .gitignore)" ,
304- "Add a placeholder to .env.example for documentation" ,
305- "Use a secrets management solution for production (e.g., AWS Secrets Manager, HashiCorp Vault)" ,
306- ],
307- }
308- remediation_steps .append (step )
309-
310- # Generate .env.example content if requested
311- env_example_content = None
312- if create_env_example :
313- env_vars = []
314- for secret_type in secret_types :
315- # Generate sensible environment variable names from secret types
316- env_var_name = secret_type .upper ().replace (" " , "_" ).replace ("-" , "_" )
317- env_vars .append (f"{ env_var_name } =your_{ secret_type .lower ().replace (' ' , '_' )} _here" )
318-
319- if env_vars :
320- env_example_content = "\n " .join (env_vars )
321-
322- # Generate git commands if requested
323- git_commands = None
324- if include_git_commands :
325- git_commands = {
326- "warning" : "⚠️ These commands will rewrite git history. Only use if you understand the implications." ,
327- "commands" : [
328- "# First, ensure all secrets are removed from working directory" ,
329- "git add ." ,
330- 'git commit -m "Remove hardcoded secrets"' ,
331- ],
332- }
333-
334- result = {
335- "repository_info" : {"name" : repository_name },
336- "summary" : {
337- "total_occurrences" : len (occurrences ),
338- "affected_files" : len (affected_files ),
339- "secret_types" : list (secret_types ),
340- "files" : list (affected_files ),
341- },
342- "remediation_steps" : remediation_steps ,
343- }
344-
345- if env_example_content :
346- result ["env_example_content" ] = env_example_content
347- result ["env_example_instructions" ] = [
348- "Create or update .env.example in your repository root:" ,
349- f"```\n { env_example_content } \n ```" ,
350- "" ,
351- "Ensure .env is in .gitignore:" ,
352- "```\n .env\n ```" ,
353- ]
137+ async def filter_mine (occurrences ):
138+ """Filter occurrences create by the current user"""
139+ client = get_client ()
140+ try :
141+ token_info = await client .get_current_token_info ()
142+ current_user_id = token_info .get ("user_id" ) if token_info else None
143+
144+ if current_user_id :
145+ occurrences = [
146+ occ for occ in occurrences
147+ if occ .get ("incident" , {}).get ("assignee_id" ) == current_user_id
148+ ]
149+ logger .debug (f"Filtered to { len (occurrences )} occurrences for user { current_user_id } " )
150+ except Exception as e :
151+ logger .warning (f"Could not filter by assignee: { str (e )} " )
152+ return occurrences
354153
355- if git_commands :
356- result ["git_commands" ] = git_commands
357154
358- return RemediateSecretIncidentsResult (** result )
155+ async def trim_occurrences_for_remediation (occurrences ):
156+ """Limit the number of occurrences to be remediated by the agent"""
157+ return occurrences [:10 ]
0 commit comments