diff --git a/changes.d/746.feat.md b/changes.d/746.feat.md new file mode 100644 index 00000000..5dda33ed --- /dev/null +++ b/changes.d/746.feat.md @@ -0,0 +1 @@ +Add support for the `cylc validate-reinstall` command. diff --git a/cylc/uiserver/resolvers.py b/cylc/uiserver/resolvers.py index ca0c778d..56f35a28 100644 --- a/cylc/uiserver/resolvers.py +++ b/cylc/uiserver/resolvers.py @@ -27,6 +27,7 @@ DEVNULL, PIPE, Popen, + TimeoutExpired, ) from textwrap import indent from time import time @@ -216,31 +217,34 @@ class Services: # log file stream lag CAT_LOG_SLEEP = 1 + # command timeout for commands which start schedulers + START_TIMEOUT = 120 + @staticmethod def _error(message: Union[Exception, str]): """Format error case response.""" - return [ + return ( False, str(message) - ] + ) @staticmethod def _return(message: str): """Format success case response.""" - return [ + return ( True, message - ] + ) @classmethod async def clean( cls, + workflows_mgr: 'WorkflowsManager', workflows: Iterable['Tokens'], args: dict, - workflows_mgr: 'WorkflowsManager', executor: 'Executor', log: 'Logger' - ): + ) -> tuple[bool, str]: """Calls `cylc clean`""" # Convert Schema options → cylc.flow.workflow_files.init_clean opts: opts = _schema_opts_to_api_opts(args, schema=CleanOptions) @@ -273,25 +277,51 @@ async def scan( cls, args: dict, workflows_mgr: 'WorkflowsManager', - ): + ) -> tuple[bool, str]: await workflows_mgr.scan() return cls._return("Scan requested") @classmethod - async def play( + async def run_command( cls, + command: Iterable[str], workflows: Iterable[Tokens], args: Dict[str, Any], - workflows_mgr: 'WorkflowsManager', log: 'Logger', - ) -> List[Union[bool, str]]: - """Calls `cylc play`.""" + timeout: int, + success_msg: str = 'Command succeeded', + fail_msg: str = 'Command failed', + ) -> tuple[bool, str]: + """Calls the specified Cylc command. + + Args: + command: + The Cylc subcommand to run. + e.g ["play"] or ["cat-log", "-m", "p"]. + workflows: + The workflows to run this command against. + args: + CLI arguments to be provided to this command. + e.g {'color': 'never'} would result in "--color=never". + log: + The application log, used to record this command invocation. + timeout: + Length of time to wait for the command to complete. + The command will be killed if the timeout elapses. + success_msg: + Message to be used in the response if the command succeeds. + fail_msg: + Message to be used in the response if the command fails. + + Returns: + + """ cylc_version = args.pop('cylc_version', None) results: Dict[str, str] = {} failed = False for tokens in workflows: try: - cmd = _build_cmd(['cylc', 'play', '--color=never'], args) + cmd = _build_cmd(['cylc', *command, '--color=never'], args) if tokens['user'] and tokens['user'] != getuser(): return cls._error( @@ -313,7 +343,7 @@ async def play( env.pop('CYLC_ENV_NAME', None) env['CYLC_VERSION'] = cylc_version - # run cylc play + # run command proc = Popen( cmd, env=env, @@ -322,11 +352,21 @@ async def play( stderr=PIPE, text=True ) - ret_code = proc.wait(timeout=120) - if ret_code: - msg = f"Command failed ({ret_code}): {cmd_repr}" + try: + ret_code = proc.wait(timeout=timeout) + except TimeoutExpired as exc: + proc.kill() + ret_code = 124 # mimic `timeout` command error code + # NOTE: preserve any stderr that the command produced this + # far as this may help with debugging + out, err = proc.communicate() + err = str(exc) + (('\n' + err) if err else '') + else: out, err = proc.communicate() + + if ret_code: + msg = f"{fail_msg} ({ret_code}): {cmd_repr}" results[wflow] = err.strip() or out.strip() or msg log.error( f"{msg}\n" @@ -335,7 +375,7 @@ async def play( ) failed = True else: - results[wflow] = 'started' + results[wflow] = success_msg except Exception as exc: # unexpected error log.exception(exc) @@ -343,18 +383,71 @@ async def play( if failed: if len(results) == 1: + # all commands failed return cls._error(results.popitem()[1]) - # else log each workflow result on separate lines + + # some commands failed return cls._error( + # log each workflow result on separate lines "\n\n" + "\n\n".join( f"{wflow}: {msg}" for wflow, msg in results.items() ) ) + # all commands succeeded + return cls._return(f'Workflow(s) {success_msg}') + + @classmethod + async def play( + cls, + workflows_mgr: 'WorkflowsManager', + workflows: Iterable[Tokens], + args: dict, + log, + **kwargs, + ) -> tuple[bool, str]: + """Calls `cylc play`.""" + ret = await cls.run_command( + ('play',), + workflows, + args, + log, + cls.START_TIMEOUT, + **kwargs, + success_msg='started', + ) + + # trigger a re-scan + await workflows_mgr.scan() + + # return results + return ret + + @classmethod + async def validate_reinstall( + cls, + workflows_mgr: 'WorkflowsManager', + workflows: Iterable[Tokens], + args: dict, + log, + **kwargs, + ) -> tuple[bool, str]: + """Calls `cylc validate-reinstall`.""" + ret = await cls.run_command( + ('validate-reinstall', '--yes'), + workflows, + args, + log, + cls.START_TIMEOUT, + **kwargs, + success_msg='reinstalled', + ) + # trigger a re-scan await workflows_mgr.scan() - # send a success message - return cls._return('Workflow(s) started') + + # return results + return ret @staticmethod async def enqueue(stream, queue): @@ -581,8 +674,7 @@ async def service( command: str, workflows: Iterable['Tokens'], kwargs: Dict[str, Any], - ) -> List[Union[bool, str]]: - + ) -> tuple[bool, str]: # GraphQL v3 includes all variables that are set, even if set to null. kwargs = { k: v @@ -592,19 +684,26 @@ async def service( if command == 'clean': # noqa: SIM116 return await Services.clean( + self.workflows_mgr, workflows, kwargs, - self.workflows_mgr, log=self.log, executor=self.executor ) - elif command == 'play': + elif command == 'play': # noqa: SIM116 return await Services.play( + self.workflows_mgr, workflows, kwargs, - self.workflows_mgr, log=self.log ) + elif command == 'validate_reinstall': + return await Services.validate_reinstall( + self.workflows_mgr, + workflows, + kwargs, + log=self.log, + ) elif command == 'scan': return await Services.scan( kwargs, diff --git a/cylc/uiserver/schema.py b/cylc/uiserver/schema.py index 364f9d89..636b3f14 100644 --- a/cylc/uiserver/schema.py +++ b/cylc/uiserver/schema.py @@ -241,6 +241,48 @@ class Arguments: result = GenericScalar() +class ValidateReinstall(graphene.Mutation): + class Meta: + description = sstrip(''' + Validate, reinstall, then reload or restart as appropriate. + + This command updates a workflow to reflect any new changes made in + the workflow source directory since it was installed. + + The workflow will be reinstalled, then either: + * Reloaded (if the workflow is running), + * or restarted (if it is stopped). + ''') + resolver = partial(mutator, command='validate_reinstall') + + class Arguments: + workflows = graphene.List(WorkflowID, required=True) + cylc_version = CylcVersion( + description=sstrip(''' + Set the Cylc version that the workflow starts with. + ''') + ) + set = graphene.List( # noqa: A003 (graphql field name) + graphene.String, + description=sstrip(''' + Set the value of a Jinja2 template variable in the workflow + definition. Values should be valid Python literals so strings + must be quoted e.g. `STR="string"`, `INT=43`, `BOOL=True`. + This option can be used multiple times on the command line. + NOTE: these settings persist across workflow restarts, but can + be set again on the `cylc play` command line if they need to be + overridden. + ''') + ) + reload_global = graphene.Boolean( + default_value=False, + required=False, + description="Reload global config as well as the workflow config" + ) + + result = GenericScalar() + + class Clean(graphene.Mutation): class Meta: description = sstrip(''' @@ -894,6 +936,7 @@ class Logs(graphene.ObjectType): class UISMutations(Mutations): play = _mut_field(Play) + validate_reinstall = _mut_field(ValidateReinstall) clean = _mut_field(Clean) scan = _mut_field(Scan) diff --git a/cylc/uiserver/tests/test_authorise.py b/cylc/uiserver/tests/test_authorise.py index c751ffb7..24889637 100644 --- a/cylc/uiserver/tests/test_authorise.py +++ b/cylc/uiserver/tests/test_authorise.py @@ -52,31 +52,13 @@ "set_verbosity", "stop", "trigger", + "validate_reinstall", ] ALL_OPS = [ - "clean", + *CONTROL_OPS, "read", "broadcast", - "ext_trigger", - "hold", - "kill", - "message", - "pause", - "play", - "poll", - "release", - "release_hold_point", - "reload", - "remove", - "resume", - "scan", - "set", - "set_graph_window_extent", - "set_hold_point", - "set_verbosity", - "stop", - "trigger", ] FAKE_SITE_CONF = { @@ -154,6 +136,7 @@ "pause", "set", "release", + "validate_reinstall", }, id="user in * and groups, owner in * and groups", ), diff --git a/cylc/uiserver/tests/test_resolvers.py b/cylc/uiserver/tests/test_resolvers.py index 026ec1bd..8e8ca5cc 100644 --- a/cylc/uiserver/tests/test_resolvers.py +++ b/cylc/uiserver/tests/test_resolvers.py @@ -15,6 +15,7 @@ import asyncio from concurrent.futures import ThreadPoolExecutor +import re from typing import Any, Dict, List, Tuple import logging import os @@ -23,12 +24,6 @@ from subprocess import Popen, TimeoutExpired from types import SimpleNamespace -import sys -if sys.version_info >= (3, 11): - from asyncio import timeout -else: - from async_timeout import timeout - from cylc.flow import CYLC_LOG from cylc.flow.exceptions import CylcError from cylc.flow.id import Tokens @@ -64,8 +59,8 @@ def test__schema_opts_to_api_opts(schema_opts, schema, expect): @pytest.mark.parametrize( 'func, message, expect', [ - (services._return, 'Hello.', [True, 'Hello.']), - (services._error, 'Goodbye.', [False, 'Goodbye.']) + (services._return, 'Hello.', (True, 'Hello.')), + (services._error, 'Goodbye.', (False, 'Goodbye.')) ] ) def test_Services_anciliary_methods(func, message, expect): @@ -74,6 +69,9 @@ def test_Services_anciliary_methods(func, message, expect): assert func(message) == expect +@pytest.mark.parametrize( + 'service', (Services.play, Services.validate_reinstall) +) @pytest.mark.parametrize( 'workflows, args, env, expected_ret, expected_env', [ @@ -81,7 +79,7 @@ def test_Services_anciliary_methods(func, message, expect): [Tokens('wflow1'), Tokens('~murray/wflow2')], {}, {}, - [True, "Workflow(s) started"], + (True, r"Workflow\(s\) .*ed"), {}, id="multiple" ), @@ -89,7 +87,7 @@ def test_Services_anciliary_methods(func, message, expect): [Tokens('~feynman/wflow1')], {}, {}, - [False, "Cannot start workflows for other users."], + (False, "Cannot start workflows for other users."), {}, id="other user's wflow" ), @@ -97,7 +95,7 @@ def test_Services_anciliary_methods(func, message, expect): [Tokens('wflow1')], {'cylc_version': 'top'}, {'CYLC_VERSION': 'bottom', 'CYLC_ENV_NAME': 'quark'}, - [True, "Workflow(s) started"], + (True, r"Workflow\(s\) .*ed"), {'CYLC_VERSION': 'top'}, id="cylc version overrides env" ), @@ -105,13 +103,14 @@ def test_Services_anciliary_methods(func, message, expect): [Tokens('wflow1')], {}, {'CYLC_VERSION': 'charm', 'CYLC_ENV_NAME': 'quark'}, - [True, "Workflow(s) started"], + (True, r"Workflow\(s\) .*ed"), {'CYLC_VERSION': 'charm', 'CYLC_ENV_NAME': 'quark'}, id="cylc env not overriden if no version specified" ), ] ) -async def test_play( +async def test_start_services( + service, monkeypatch: pytest.MonkeyPatch, workflows: List[Tokens], args: Dict[str, Any], @@ -119,7 +118,7 @@ async def test_play( expected_ret: list, expected_env: Dict[str, str], ): - """It runs cylc play correctly. + """It runs cylc play / vr correctly. Params: workflows: list of workflow tokens @@ -139,28 +138,33 @@ async def test_play( return_value=Mock( spec=Popen, wait=Mock(return_value=0), + communicate=lambda: ('out', 'err'), ) ) monkeypatch.setattr('cylc.uiserver.resolvers.Popen', mock_popen) - ret = await Services.play( + status, message = await service( + Mock(spec=WorkflowsManager), workflows, {'some': 'opt', **args}, - workflows_mgr=Mock(spec=WorkflowsManager), log=Mock(), ) - assert ret == expected_ret + assert status == expected_ret[0] + assert re.match(expected_ret[1], message) for i, call_args in enumerate(mock_popen.call_args_list): cmd_str = ' '.join(call_args.args[0]) - assert cmd_str.startswith('cylc play') + assert cmd_str.startswith('cylc ') assert '--some opt' in cmd_str assert workflows[i]['workflow'] in cmd_str assert call_args.kwargs['env'] == expected_env +@pytest.mark.parametrize( + 'service', (Services.play, Services.validate_reinstall) +) @pytest.mark.parametrize( 'workflows, popen_ret_codes, popen_communicate,' 'expected_ret, expected_log', @@ -169,37 +173,38 @@ async def test_play( [Tokens('wflow1')], [1], ("something", "bad things!!"), - "bad things!!", - ["Command failed (1): cylc play", "something", "bad things!!"], + r"bad things!!.*", + ["Command failed \(1\): cylc ", "something", "bad things!!"], id="one" ), pytest.param( [Tokens('wflow1'), Tokens('wflow2')], [1, 0], ("", "bad things!!"), - "\n\nwflow1: bad things!!\n\nwflow2: started", - ["Command failed (1): cylc play", "bad things!!"], + r"\n\nwflow1: bad things!!\n\nwflow2: .*ed.*", + [r"Command failed \(1\): cylc ", "bad things!!"], id="multiple" ), pytest.param( [Tokens('wflow1')], [1], ("something", ""), - "something", - ["Command failed (1): cylc play", "something"], + r"something.*", + [r"Command failed \(1\): cylc ", "something"], id="uses stdout if stderr empty" ), pytest.param( [Tokens('wflow1')], [4], ("", ""), - "Command failed (4): cylc play", - ["Command failed (4): cylc play"], + r"Command failed \(4\): cylc .*", + [r"Command failed \(4\): cylc "], id="fallback msg if stdout/stderr empty" ), ] ) -async def test_play_fail( +async def test_start_services_fail( + service, monkeypatch: pytest.MonkeyPatch, workflows: List[Tokens], popen_ret_codes: List[int], @@ -208,7 +213,7 @@ async def test_play_fail( expected_log: List[str], caplog: pytest.LogCaptureFixture, ): - """It returns suitable error messages if cylc play fails. + """It returns suitable error messages if cylc play / vr fails. Params: workflows: list of workflow tokens @@ -216,6 +221,7 @@ async def test_play_fail( popen_communicate: stdout, stderr for cylc play expected: (beginning of) expected returned error message """ + popen_ret_codes = list(popen_ret_codes) mock_popen = Mock( spec=Popen, return_value=Mock( @@ -227,17 +233,18 @@ async def test_play_fail( monkeypatch.setattr('cylc.uiserver.resolvers.Popen', mock_popen) caplog.set_level(logging.ERROR) - status, message = await Services.play( + status, message = await service( + Mock(spec=WorkflowsManager), workflows, {}, - workflows_mgr=Mock(spec=WorkflowsManager), log=logging.root, ) assert status is False - assert message.startswith(expected_ret) + assert re.match(expected_ret, message) + # Should be logged too: for msg in expected_log: - assert msg in caplog.text + assert re.search(msg, caplog.text) async def test_play_timeout(monkeypatch: pytest.MonkeyPatch): @@ -247,19 +254,23 @@ def wait(timeout): mock_popen = Mock( spec=Popen, - return_value=Mock(spec=Popen, wait=wait) + return_value=Mock( + spec=Popen, + wait=wait, + communicate=lambda: ('out', 'err'), + ), ) monkeypatch.setattr('cylc.uiserver.resolvers.Popen', mock_popen) ret = await Services.play( + Mock(spec=WorkflowsManager), [Tokens('wflow1')], {}, - workflows_mgr=Mock(spec=WorkflowsManager), log=Mock(), ) - assert ret == [ - False, "Command 'cylc play wflow1' timed out after 120 seconds" - ] + assert ret == ( + False, "Command 'cylc play wflow1' timed out after 120 seconds\nerr" + ) @pytest.fixture @@ -318,7 +329,7 @@ async def test_cat_log(workflow_run_dir, app, fast_sleep): # note - timeout tests that the cat-log process is being stopped correctly first_response = None - async with timeout(20): + async with asyncio.timeout(20): ret = services.cat_log(workflow, app, info) actual = '' is_first = True @@ -367,7 +378,7 @@ async def test_cat_log_timeout(workflow_run_dir, app, fast_sleep): ret = services.cat_log(workflow, app, info) responses = [] - async with timeout(5): + async with asyncio.timeout(5): async for response in ret: responses.append(response) await asyncio.sleep(0) @@ -401,12 +412,12 @@ def bad_clean(*a, **k): caplog.set_level(logging.ERROR) ret = Services.clean( + Mock(spec=WorkflowsManager), [Tokens('wflow1')], {}, - workflows_mgr=Mock(spec=WorkflowsManager), executor=ThreadPoolExecutor(1), log=logging.root, ) err_msg = "CylcError: bad things!!" - assert (await ret) == [False, err_msg] + assert (await ret) == (False, err_msg) assert err_msg in caplog.text