@@ -216,31 +216,34 @@ class Services:
216216 # log file stream lag
217217 CAT_LOG_SLEEP = 1
218218
219+ # command timeout for commands which start schedulers
220+ START_TIMEOUT = 120
221+
219222 @staticmethod
220223 def _error (message : Union [Exception , str ]):
221224 """Format error case response."""
222- return [
225+ return (
223226 False ,
224227 str (message )
225- ]
228+ )
226229
227230 @staticmethod
228231 def _return (message : str ):
229232 """Format success case response."""
230- return [
233+ return (
231234 True ,
232235 message
233- ]
236+ )
234237
235238 @classmethod
236239 async def clean (
237240 cls ,
241+ workflows_mgr : 'WorkflowsManager' ,
238242 workflows : Iterable ['Tokens' ],
239243 args : dict ,
240- workflows_mgr : 'WorkflowsManager' ,
241244 executor : 'Executor' ,
242245 log : 'Logger'
243- ):
246+ ) -> tuple [ bool , str ] :
244247 """Calls `cylc clean`"""
245248 # Convert Schema options → cylc.flow.workflow_files.init_clean opts:
246249 opts = _schema_opts_to_api_opts (args , schema = CleanOptions )
@@ -273,25 +276,50 @@ async def scan(
273276 cls ,
274277 args : dict ,
275278 workflows_mgr : 'WorkflowsManager' ,
276- ):
279+ ) -> tuple [ bool , str ] :
277280 await workflows_mgr .scan ()
278281 return cls ._return ("Scan requested" )
279282
280283 @classmethod
281- async def play (
284+ async def run_command (
282285 cls ,
286+ command : Iterable [str ],
283287 workflows : Iterable [Tokens ],
284288 args : Dict [str , Any ],
285- workflows_mgr : 'WorkflowsManager' ,
286289 log : 'Logger' ,
287- ) -> List [Union [bool , str ]]:
288- """Calls `cylc play`."""
290+ timeout : int ,
291+ success_msg : str = 'Command succeeded' ,
292+ fail_msg : str = 'Command failed' ,
293+ ) -> tuple [bool , str ]:
294+ """Calls the specified Cylc command.
295+
296+ Args:
297+ command:
298+ The Cylc subcommand to run.
299+ e.g ["play"] or ["cat-log", "-m", "p"].
300+ workflows:
301+ The workflows to run this command against.
302+ args:
303+ CLI arguments to be provided to this command.
304+ e.g {'color': 'never'} would result in "--color=never".
305+ log:
306+ The application log, used to record this command invocation.
307+ timeout:
308+ Length of time to wait for the command to complete.
309+ success_msg:
310+ Message to be used in the response if the command succeeds.
311+ fail_msg:
312+ Message to be used in the response if the command fails.
313+
314+ Returns:
315+
316+ """
289317 cylc_version = args .pop ('cylc_version' , None )
290318 results : Dict [str , str ] = {}
291319 failed = False
292320 for tokens in workflows :
293321 try :
294- cmd = _build_cmd (['cylc' , 'play' , '--color=never' ], args )
322+ cmd = _build_cmd (['cylc' , * command , '--color=never' ], args )
295323
296324 if tokens ['user' ] and tokens ['user' ] != getuser ():
297325 return cls ._error (
@@ -322,10 +350,10 @@ async def play(
322350 stderr = PIPE ,
323351 text = True
324352 )
325- ret_code = proc .wait (timeout = 120 )
353+ ret_code = proc .wait (timeout = timeout )
326354
327355 if ret_code :
328- msg = f"Command failed ({ ret_code } ): { cmd_repr } "
356+ msg = f"{ fail_msg } ({ ret_code } ): { cmd_repr } "
329357 out , err = proc .communicate ()
330358 results [wflow ] = err .strip () or out .strip () or msg
331359 log .error (
@@ -335,26 +363,65 @@ async def play(
335363 )
336364 failed = True
337365 else :
338- results [wflow ] = 'started'
366+ results [wflow ] = success_msg
339367
340368 except Exception as exc : # unexpected error
341369 log .exception (exc )
342370 return cls ._error (exc )
343371
344372 if failed :
345373 if len (results ) == 1 :
374+ # all commands failed
346375 return cls ._error (results .popitem ()[1 ])
347- # else log each workflow result on separate lines
376+
377+ # some commands failed
348378 return cls ._error (
379+ # log each workflow result on separate lines
349380 "\n \n " + "\n \n " .join (
350381 f"{ wflow } : { msg } " for wflow , msg in results .items ()
351382 )
352383 )
353384
385+ # all commands succeeded
386+ return cls ._return (f'Workflow(s) { success_msg } ' )
387+
388+ @classmethod
389+ async def play (
390+ cls , workflows_mgr : 'WorkflowsManager' , * args , ** kwargs
391+ ) -> tuple [bool , str ]:
392+ """Calls `cylc play`."""
393+ ret = await cls .run_command (
394+ ('play' ,),
395+ * args ,
396+ ** kwargs ,
397+ timeout = cls .START_TIMEOUT ,
398+ success_msg = 'started' ,
399+ )
400+
401+ # trigger a re-scan
402+ await workflows_mgr .scan ()
403+
404+ # return results
405+ return ret
406+
407+ @classmethod
408+ async def validate_reinstall (
409+ cls , workflows_mgr : 'WorkflowsManager' , * args , ** kwargs
410+ ) -> tuple [bool , str ]:
411+ """Calls `cylc validate-reinstall`."""
412+ ret = await cls .run_command (
413+ ('validate-reinstall' , '--yes' ),
414+ * args ,
415+ ** kwargs ,
416+ timeout = cls .START_TIMEOUT ,
417+ success_msg = 'reinstalled' ,
418+ )
419+
354420 # trigger a re-scan
355421 await workflows_mgr .scan ()
356- # send a success message
357- return cls ._return ('Workflow(s) started' )
422+
423+ # return results
424+ return ret
358425
359426 @staticmethod
360427 async def enqueue (stream , queue ):
@@ -581,8 +648,7 @@ async def service(
581648 command : str ,
582649 workflows : Iterable ['Tokens' ],
583650 kwargs : Dict [str , Any ],
584- ) -> List [Union [bool , str ]]:
585-
651+ ) -> tuple [bool , str ]:
586652 # GraphQL v3 includes all variables that are set, even if set to null.
587653 kwargs = {
588654 k : v
@@ -592,19 +658,26 @@ async def service(
592658
593659 if command == 'clean' : # noqa: SIM116
594660 return await Services .clean (
661+ self .workflows_mgr ,
595662 workflows ,
596663 kwargs ,
597- self .workflows_mgr ,
598664 log = self .log ,
599665 executor = self .executor
600666 )
601- elif command == 'play' :
667+ elif command == 'play' : # noqa: SIM116
602668 return await Services .play (
669+ self .workflows_mgr ,
603670 workflows ,
604671 kwargs ,
605- self .workflows_mgr ,
606672 log = self .log
607673 )
674+ elif command == 'validate_reinstall' :
675+ return await Services .validate_reinstall (
676+ self .workflows_mgr ,
677+ workflows ,
678+ kwargs ,
679+ log = self .log ,
680+ )
608681 elif command == 'scan' :
609682 return await Services .scan (
610683 kwargs ,
0 commit comments