@@ -47,6 +47,11 @@ class AgentHarnessConfig(BaseModel):
4747 path_to_data_pushed_to_scenario : Optional [str ] = None
4848
4949
50+ class AgentHarnessOpts (BaseModel ):
51+ benchmark_exec_max_attempts : int = 3
52+ benchmark_exec_retry_interval : int = 5
53+
54+
5055class AgentHarness :
5156
5257 def __init__ (
@@ -62,6 +67,7 @@ def __init__(
6267 single_run = False ,
6368 interval = 5 ,
6469 benchmark_timeout = 300 ,
70+ opts : Optional [AgentHarnessOpts ] = AgentHarnessOpts (),
6571 ) -> None :
6672 self .agent_manifest = agent_manifest
6773 self .agent_directory = agent_directory
@@ -81,6 +87,7 @@ def __init__(
8187 )
8288 self .stop_event = asyncio .Event ()
8389 self .task_history = []
90+ self .opts = opts
8491
8592 async def run (self ):
8693
@@ -107,17 +114,12 @@ async def run(self):
107114 benchmark_id = benchmark_entry .benchmark_id
108115 logger .info (f"Take the benchmark '{ benchmark_entry .benchmark_id } '" )
109116 self .add_history (benchmark_id )
110- self .rest_client .put (
111- f"{ self .agent_manifest .manifest_endpoint } /benchmark-entries/{ benchmark_id } " ,
112- Status (phase = AgentPhaseEnum .Executing ).model_dump_json (),
113- )
114- is_completed = await self .run_benchmark (benchmark_id , benchmark_entry .agent_access_info .id )
115- if is_completed :
116- phase = AgentPhaseEnum .Finished
117- else :
118- phase = AgentPhaseEnum .TimeedOut
119- self .rest_client .put (
120- f"{ self .agent_manifest .manifest_endpoint } /benchmark-entries/{ benchmark_id } " , Status (phase = phase ).model_dump_json ()
117+ await run_with_retry (
118+ self .run_benchmark_with_status_update ,
119+ retries = self .opts .benchmark_exec_max_attempts ,
120+ delay = self .opts .benchmark_exec_retry_interval ,
121+ benchmark_id = benchmark_id ,
122+ benchmark_entry = benchmark_entry ,
121123 )
122124 if self .single_run :
123125 logger .info ("Task completed. Exiting due to run-once mode." )
@@ -127,6 +129,18 @@ async def run(self):
127129 await asyncio .sleep (self .interval )
128130 elapsed_time += self .interval
129131
132+ async def run_benchmark_with_status_update (self , benchmark_id , benchmark_entry : AgentBenchmarkEntry ):
133+ self .rest_client .put (
134+ f"{ self .agent_manifest .manifest_endpoint } /benchmark-entries/{ benchmark_id } " ,
135+ Status (phase = AgentPhaseEnum .Executing ).model_dump_json (),
136+ )
137+ is_completed = await self .run_benchmark (benchmark_id , benchmark_entry .agent_access_info .id )
138+ if is_completed :
139+ phase = AgentPhaseEnum .Finished
140+ else :
141+ phase = AgentPhaseEnum .TimeedOut
142+ self .rest_client .put (f"{ self .agent_manifest .manifest_endpoint } /benchmark-entries/{ benchmark_id } " , Status (phase = phase ).model_dump_json ())
143+
130144 async def run_benchmark (self , benchmark_id , agent_id ):
131145
132146 timeout = self .benchmark_timeout
@@ -166,13 +180,13 @@ async def run_benchmark(self, benchmark_id, agent_id):
166180 return False
167181
168182 async def run_agent (self , target_bundle : Bundle , benchmark_id : str , agent_id : str ):
169- response = self .rest_client .get (f"/benchmarks/{ benchmark_id } /agents/{ agent_id } " )
170- agent = Agent .model_validate (response .json ())
171- agent_info = AgentInfo (id = agent .metadata .id , name = agent .spec .name , directory = self .agent_directory )
172- ao = AgentOperator (agent_info = agent_info )
173- self .rest_client .assign (benchmark_id , agent_id , target_bundle .metadata .id )
174- self .rest_client .push_agent_status (benchmark_id , agent_id , AgentPhaseEnum .Executing )
175183 try :
184+ response = self .rest_client .get (f"/benchmarks/{ benchmark_id } /agents/{ agent_id } " )
185+ agent = Agent .model_validate (response .json ())
186+ agent_info = AgentInfo (id = agent .metadata .id , name = agent .spec .name , directory = self .agent_directory )
187+ ao = AgentOperator (agent_info = agent_info )
188+ self .rest_client .assign (benchmark_id , agent_id , target_bundle .metadata .id )
189+ self .rest_client .push_agent_status (benchmark_id , agent_id , AgentPhaseEnum .Executing )
176190 shared_workspace = Path ("/tmp" ) / "shared_workspace" / agent .metadata .id / target_bundle .spec .name
177191 shared_workspace .mkdir (parents = True , exist_ok = True )
178192 output_dir_per_bundle = Path ("/tmp" ) / "output" / agent .metadata .id / target_bundle .spec .name
@@ -191,7 +205,10 @@ async def run_agent(self, target_bundle: Bundle, benchmark_id: str, agent_id: st
191205 except Exception as e :
192206 err = traceback .format_exc ()
193207 logger .error (err )
194- self .rest_client .push_agent_status (benchmark_id , agent_id , AgentPhaseEnum .Error , message = f"{ e } " )
208+ try :
209+ self .rest_client .push_agent_status (benchmark_id , agent_id , AgentPhaseEnum .Error , message = f"{ e } " )
210+ except Exception as e2 :
211+ logger .error (f"Failed to update agent status to 'Error' for benchmark { benchmark_id !r} (agent { agent_id !r} ): { e2 } " )
195212
196213 def wait_bundle_finished ():
197214 logger .info (f"Wait for bundle to finish..." )
@@ -228,6 +245,17 @@ def add_history(self, benchmark_id: str, bundle: Optional[Bundle] = None, agent_
228245 self .task_history .append (item )
229246
230247
248+ async def run_with_retry (func , retries = 3 , delay = 5 , * args , ** kwargs ):
249+ for attempt in range (1 , retries + 1 ):
250+ try :
251+ return await func (* args , ** kwargs )
252+ except Exception as e :
253+ logger .error (f"Attempt { attempt } /{ retries } failed for { func .__name__ } : { e } " )
254+ if attempt < retries :
255+ await asyncio .sleep (delay )
256+ raise RuntimeError (f"{ func .__name__ } failed after { retries } attempts" )
257+
258+
231259def run (args ):
232260 with open (args .input ) as f :
233261 agent_manifest = AgentManifest .model_validate_json (f .read ())
@@ -238,6 +266,10 @@ def run(args):
238266 data = yaml .safe_load (f .read ())
239267 config = AgentHarnessConfig .model_validate (data )
240268
269+ opts = AgentHarnessOpts (
270+ benchmark_exec_retry_interval = args .benchmark_exec_retry_interval ,
271+ benchmark_exec_max_attempts = args .benchmark_exec_max_attempts ,
272+ )
241273 agent_harness = AgentHarness (
242274 agent_manifest ,
243275 args .agent_directory ,
@@ -249,5 +281,6 @@ def run(args):
249281 benchmark_timeout = args .benchmark_timeout ,
250282 config = config ,
251283 single_run = args .single_run ,
284+ opts = opts ,
252285 )
253286 asyncio .run (agent_harness .run ())
0 commit comments