diff --git a/python/grass/app/tests/grass_app_cli_test.py b/python/grass/app/tests/grass_app_cli_test.py index 7e7472ac534..c20329358f2 100644 --- a/python/grass/app/tests/grass_app_cli_test.py +++ b/python/grass/app/tests/grass_app_cli_test.py @@ -53,3 +53,21 @@ def test_subcommand_run_tool_regular_run(): def test_subcommand_run_tool_failure_run(): """Check that a tool produces non-zero return code""" assert main(["run", "g.region", "raster=does_not_exist"]) == 1 + + +def test_subcommand_run_tool_pack(rows_raster_file3x2, tmp_path): + """Check that a tool accepts and produces a raster pack file""" + output = tmp_path / "output.grass_raster" + assert ( + main( + [ + "run", + "r.mapcalc.simple", + "expression=2 * A", + f"a={rows_raster_file3x2}", + f"output={output}", + ] + ) + == 0 + ) + assert output.exists() diff --git a/python/grass/experimental/tests/conftest.py b/python/grass/experimental/tests/conftest.py index 5bd44da1de0..ea20bfe2ade 100644 --- a/python/grass/experimental/tests/conftest.py +++ b/python/grass/experimental/tests/conftest.py @@ -33,6 +33,14 @@ def xy_session_for_module(tmp_path_factory): yield session +@pytest.fixture +def xy_dataset_session(tmp_path): + """Creates a session with a mapset which has vector with a float column""" + gs.core._create_location_xy(tmp_path, "test") # pylint: disable=protected-access + with gs.setup.init(tmp_path / "test") as session: + yield session + + @pytest.fixture def unique_id(): """A unique alphanumeric identifier""" @@ -69,3 +77,41 @@ def xy_mapset_non_permament(xy_session): # pylint: disable=redefined-outer-name "test1", create=True, env=xy_session.env ) as session: yield session + + +@pytest.fixture +def rows_raster_file3x3(tmp_path): + project = tmp_path / "xy_test3x3" + gs.create_project(project) + with gs.setup.init(project, env=os.environ.copy()) as session: + gs.run_command("g.region", rows=3, cols=3, env=session.env) + gs.mapcalc("rows = row()", env=session.env) + output_file = tmp_path / "rows3x3.grass_raster" + gs.run_command( + "r.pack", + input="rows", + output=output_file, + flags="c", + superquiet=True, + env=session.env, + ) + return output_file + + +@pytest.fixture +def rows_raster_file4x5(tmp_path): + project = tmp_path / "xy_test4x5" + gs.create_project(project) + with gs.setup.init(project, env=os.environ.copy()) as session: + gs.run_command("g.region", rows=4, cols=5, env=session.env) + gs.mapcalc("rows = row()", env=session.env) + output_file = tmp_path / "rows4x5.grass_raster" + gs.run_command( + "r.pack", + input="rows", + output=output_file, + flags="c", + superquiet=True, + env=session.env, + ) + return output_file diff --git a/python/grass/experimental/tools.py b/python/grass/experimental/tools.py new file mode 100644 index 00000000000..b6a4a2f5539 --- /dev/null +++ b/python/grass/experimental/tools.py @@ -0,0 +1,611 @@ +#!/usr/bin/env python + +############################################################################## +# AUTHOR(S): Vaclav Petras +# +# PURPOSE: API to call GRASS tools (modules) as Python functions +# +# COPYRIGHT: (C) 2023-2025 Vaclav Petras and the GRASS Development Team +# +# This program is free software under the GNU General Public +# License (>=v2). Read the file COPYING that comes with GRASS +# for details. +############################################################################## + +"""API to call GRASS tools (modules) as Python functions""" + +import json +import os +import shutil +import subprocess +from pathlib import Path +from io import StringIO + +import numpy as np + +import grass.script as gs +import grass.script.array as garray +from grass.exceptions import CalledModuleError + + +class PackImporterExporter: + def __init__(self, *, run_function, env=None): + self._run_function = run_function + self._env = env + + @classmethod + def is_raster_pack_file(cls, value): + return value.endswith((".grass_raster", ".pack", ".rpack", ".grr")) + + def modify_and_ingest_argument_list(self, args, parameters): + # Uses parameters, but modifies the command, generates list of rasters and vectors. + self.input_rasters = [] + if "inputs" in parameters: + for item in parameters["inputs"]: + if self.is_raster_pack_file(item["value"]): + self.input_rasters.append(Path(item["value"])) + # No need to change that for the original kwargs. + # kwargs[item["param"]] = Path(item["value"]).stem + # Actual parameters to execute are now a list. + for i, arg in enumerate(args): + if arg.startswith(f"{item['param']}="): + arg = arg.replace(item["value"], Path(item["value"]).stem) + args[i] = arg + self.output_rasters = [] + if "outputs" in parameters: + for item in parameters["outputs"]: + if self.is_raster_pack_file(item["value"]): + self.output_rasters.append(Path(item["value"])) + # kwargs[item["param"]] = Path(item["value"]).stem + for i, arg in enumerate(args): + if arg.startswith(f"{item['param']}="): + arg = arg.replace(item["value"], Path(item["value"]).stem) + args[i] = arg + + def import_rasters(self): + for raster_file in self.input_rasters: + # Currently we override the projection check. + self._run_function( + "r.unpack", + input=raster_file, + output=raster_file.stem, + overwrite=True, + superquiet=True, + # flags="o", + env=self._env, + ) + + def export_rasters(self): + # Pack the output raster + for raster in self.output_rasters: + # Overwriting a file is a warning, so to avoid it, we delete the file first. + Path(raster).unlink(missing_ok=True) + + self._run_function( + "r.pack", + input=raster.stem, + output=raster, + flags="c", + overwrite=True, + superquiet=True, + ) + + def import_data(self): + self.import_rasters() + + def export_data(self): + self.export_rasters() + + +class ObjectParameterHandler: + def __init__(self): + self._numpy_inputs = {} + self._numpy_outputs = {} + self._numpy_inputs_ordered = [] + self.stdin = None + + def process_parameters(self, kwargs): + for key, value in kwargs.items(): + if isinstance(value, np.ndarray): + kwargs[key] = gs.append_uuid("tmp_serialized_input_array") + self._numpy_inputs[key] = value + self._numpy_inputs_ordered.append(value) + elif value in (np.ndarray, np.array, garray.array): + # We test for class or the function. + kwargs[key] = gs.append_uuid("tmp_serialized_output_array") + self._numpy_outputs[key] = value + elif isinstance(value, StringIO): + kwargs[key] = "-" + self.stdin = value.getvalue() + + def translate_objects_to_data(self, kwargs, parameters, env): + if "inputs" in parameters: + for param in parameters["inputs"]: + if param["param"] in self._numpy_inputs: + map2d = garray.array(env=env) + map2d[:] = self._numpy_inputs[param["param"]] + map2d.write(kwargs[param["param"]]) + + def input_rows_columns(self): + if not len(self._numpy_inputs_ordered): + return None + return self._numpy_inputs_ordered[0].shape + + def translate_data_to_objects(self, kwargs, parameters, env): + output_arrays = [] + if "outputs" in parameters: + for param in parameters["outputs"]: + if param["param"] not in self._numpy_outputs: + continue + output_array = garray.array(kwargs[param["param"]], env=env) + output_arrays.append(output_array) + if len(output_arrays) == 1: + self.result = output_arrays[0] + return True + if len(output_arrays) > 1: + self.result = tuple(output_arrays) + return True + self.result = None + return False + + +class ToolFunctionNameHelper: + def __init__(self, *, run_function, env, prefix=None): + self._run_function = run_function + self._env = env + self._prefix = prefix + + # def __getattr__(self, name): + # self.get_function(name, exception_type=AttributeError) + + def get_function(self, name, exception_type): + """Parse attribute to GRASS display module. Attribute should be in + the form 'd_module_name'. For example, 'd.rast' is called with 'd_rast'. + """ + if self._prefix: + name = f"{self._prefix}.{name}" + # Reformat string + tool_name = name.replace("_", ".") + # Assert module exists + if not shutil.which(tool_name, path=self._env["PATH"]): + suggestions = self.suggest_tools(tool_name) + if suggestions: + msg = ( + f"Tool {tool_name} not found. " + f"Did you mean: {', '.join(suggestions)}?" + ) + raise AttributeError(msg) + msg = ( + f"Tool or attribute {name} not found. " + "If you are executing a tool, is the session set up and the tool on path? " + "If you are looking for an attribute, is it in the documentation?" + ) + raise AttributeError(msg) + + def wrapper(**kwargs): + # Run module + return self._run_function(tool_name, **kwargs) + + return wrapper + + @staticmethod + def levenshtein_distance(text1: str, text2: str) -> int: + if len(text1) < len(text2): + return ToolFunctionNameHelper.levenshtein_distance(text2, text1) + + if len(text2) == 0: + return len(text1) + + previous_row = list(range(len(text2) + 1)) + for i, char1 in enumerate(text1): + current_row = [i + 1] + for j, char2 in enumerate(text2): + insertions = previous_row[j + 1] + 1 + deletions = current_row[j] + 1 + substitutions = previous_row[j] + (char1 != char2) + current_row.append(min(insertions, deletions, substitutions)) + previous_row = current_row + + return previous_row[-1] + + @staticmethod + def suggest_tools(tool): + # TODO: cache commands also for dir + all_names = list(gs.get_commands()[0]) + result = [] + max_suggestions = 10 + for name in all_names: + if ToolFunctionNameHelper.levenshtein_distance(tool, name) < len(tool) / 2: + result.append(name) + if len(result) >= max_suggestions: + break + return result + + +class ExecutedTool: + """Result returned after executing a tool""" + + def __init__(self, name, kwargs, stdout, stderr): + self._name = name + self._kwargs = kwargs + self._stdout = stdout + self._stderr = stderr + if self._stdout is not None: + self._decoded_stdout = gs.decode(self._stdout) + else: + self._decoded_stdout = None + self._cached_json = None + + @property + def text(self) -> str: + """Text output as decoded string""" + if self._decoded_stdout is None: + return None + return self._decoded_stdout.strip() + + @property + def json(self): + """Text output read as JSON + + This returns the nested structure of dictionaries and lists or fails when + the output is not JSON. + """ + if self._cached_json is None: + self._cached_json = json.loads(self._stdout) + return self._cached_json + + @property + def keyval(self): + """Text output read as key-value pairs separated by equal signs""" + + def conversion(value): + """Convert text to int or float if possible, otherwise return it as is""" + try: + return int(value) + except ValueError: + pass + try: + return float(value) + except ValueError: + pass + return value + + return gs.parse_key_val(self._stdout, val_type=conversion) + + @property + def comma_items(self): + """Text output read as comma-separated list""" + return self.text_split(",") + + @property + def space_items(self): + """Text output read as whitespace-separated list""" + return self.text_split(None) + + def text_split(self, separator=None): + """Parse text output read as list separated by separators + + Any leading or trailing newlines are removed prior to parsing. + """ + # The use of strip is assuming that the output is one line which + # ends with a newline character which is for display only. + return self._decoded_stdout.strip("\n").split(separator) + + def __getitem__(self, name): + if self._stdout: + # We are testing just std out and letting rest to the parse and the user. + # This makes no assumption about how JSON is produced by the tool. + try: + return self.json[name] + except json.JSONDecodeError as error: + if self._kwargs.get("format") == "json": + raise + msg = ( + f"Output of {self._name} cannot be parsed as JSON. " + 'Did you use format="json"?' + ) + raise ValueError(msg) from error + msg = f"No text output for {self._name} to be parsed as JSON" + raise ValueError(msg) + + +class Tools: + """Call GRASS tools as methods + + GRASS tools (modules) can be executed as methods of this class. + """ + + def __init__( + self, + *, + session=None, + env=None, + overwrite=False, + quiet=False, + verbose=False, + superquiet=False, + freeze_region=False, + stdin=None, + errors=None, + capture_output=True, + prefix=None, + ): + if env: + self._env = env.copy() + elif session and hasattr(session, "env"): + self._env = session.env.copy() + else: + self._env = os.environ.copy() + self._region_is_frozen = False + if freeze_region: + self._freeze_region() + if overwrite: + self._overwrite() + # This hopefully sets the numbers directly. An alternative implementation would + # be to pass the parameter every time. + # Does not check for multiple set at the same time, but the most verbose wins + # for safety. + if superquiet: + self._env["GRASS_VERBOSE"] = "0" + if quiet: + self._env["GRASS_VERBOSE"] = "1" + if verbose: + self._env["GRASS_VERBOSE"] = "3" + self._set_stdin(stdin) + self._errors = errors + self._capture_output = capture_output + self._prefix = prefix + self._name_helper = None + + # These could be public, not protected. + def _freeze_region(self): + self._env["GRASS_REGION"] = gs.region_env(env=self._env) + self._region_is_frozen = True + + def _overwrite(self): + self._env["GRASS_OVERWRITE"] = "1" + + def _set_stdin(self, stdin, /): + self._stdin = stdin + + @property + def env(self): + """Internally used environment (reference to it, not a copy)""" + return self._env + + def _process_parameters(self, command, popen_options): + env = popen_options.get("env", self._env) + + return subprocess.run( + [*command, "--json"], text=True, capture_output=True, env=env + ) + + def run(self, name, /, **kwargs): + """Run modules from the GRASS display family (modules starting with "d."). + + This function passes arguments directly to grass.script.run_command() + so the syntax is the same. + + :param str module: name of GRASS module + :param `**kwargs`: named arguments passed to run_command()""" + + object_parameter_handler = ObjectParameterHandler() + object_parameter_handler.process_parameters(kwargs) + + args, popen_options = gs.popen_args_command(name, **kwargs) + + interface_result = self._process_parameters(args, popen_options) + if interface_result.returncode != 0: + # This is only for the error states. + return gs.handle_errors( + interface_result.returncode, + result=None, + args=[name], + kwargs=kwargs, + stderr=interface_result.stderr, + handler="raise", + ) + parameters = json.loads(interface_result.stdout) + object_parameter_handler.translate_objects_to_data( + kwargs, parameters, env=self._env + ) + + # We approximate tool_kwargs as original kwargs. + result = self.run_from_list( + args, + tool_kwargs=kwargs, + processed_parameters=parameters, + stdin=object_parameter_handler.stdin, + **popen_options, + ) + use_objects = object_parameter_handler.translate_data_to_objects( + kwargs, parameters, env=self._env + ) + if use_objects: + result = object_parameter_handler.result + return result + + def run_from_list( + self, + command, + tool_kwargs=None, + stdin=None, + processed_parameters=None, + **popen_options, + ): + if not processed_parameters: + interface_result = self._process_parameters(command, popen_options) + if interface_result.returncode != 0: + # This is only for the error states. + return gs.handle_errors( + interface_result.returncode, + result=None, + args=[command], + kwargs=tool_kwargs, + stderr=interface_result.stderr, + handler="raise", + ) + processed_parameters = json.loads(interface_result.stdout) + + pack_importer_exporter = PackImporterExporter(run_function=self.no_nonsense_run) + pack_importer_exporter.modify_and_ingest_argument_list( + command, processed_parameters + ) + pack_importer_exporter.import_data() + + # We approximate tool_kwargs as original kwargs. + result = self.no_nonsense_run_from_list( + command, + tool_kwargs=tool_kwargs, + stdin=stdin, + **popen_options, + ) + pack_importer_exporter.export_data() + return result + + def run_command(self, name, /, **kwargs): + # TODO: Provide custom implementation for full control + return gs.run_command(name, **kwargs, env=self._env) + + def parse_command(self, name, /, **kwargs): + # TODO: Provide custom implementation for full control + return gs.parse_command(name, **kwargs, env=self._env) + + def no_nonsense_run(self, name, /, *, tool_kwargs=None, stdin=None, **kwargs): + args, popen_options = gs.popen_args_command(name, **kwargs) + return self.no_nonsense_run_from_list( + args, tool_kwargs=tool_kwargs, stdin=stdin, **popen_options + ) + + # Make this an overload of run. + def no_nonsense_run_from_list( + self, command, tool_kwargs=None, stdin=None, **popen_options + ): + # alternatively use dev null as default or provide it as convenient settings + if self._capture_output: + stdout_pipe = gs.PIPE + stderr_pipe = gs.PIPE + else: + stdout_pipe = None + stderr_pipe = None + if self._stdin: + stdin_pipe = gs.PIPE + stdin = gs.utils.encode(self._stdin) + elif stdin: + stdin_pipe = gs.PIPE + stdin = gs.utils.encode(stdin) + else: + stdin_pipe = None + stdin = None + # Allowing to overwrite env, but that's just to have maximum flexibility when + # the session is actually set up, but it may be confusing. + if "env" not in popen_options: + popen_options["env"] = self._env + process = gs.Popen( + command, + stdin=stdin_pipe, + stdout=stdout_pipe, + stderr=stderr_pipe, + **popen_options, + ) + stdout, stderr = process.communicate(input=stdin) + if stderr: + stderr = gs.utils.decode(stderr) + returncode = process.poll() + if returncode and self._errors != "ignore": + raise CalledModuleError( + command[0], + code=" ".join(command), + returncode=returncode, + errors=stderr, + ) + # TODO: solve tool_kwargs is None + # We don't have the keyword arguments to pass to the resulting object. + return ExecutedTool( + name=command[0], kwargs=tool_kwargs, stdout=stdout, stderr=stderr + ) + + def feed_input_to(self, stdin, /): + """Get a new object which will feed text input to a tool or tools""" + return Tools( + env=self._env, + stdin=stdin, + freeze_region=self._region_is_frozen, + errors=self._errors, + capture_output=self._capture_output, + prefix=self._prefix, + ) + + def ignore_errors_of(self): + """Get a new object which will ignore errors of the called tools""" + return Tools(env=self._env, errors="ignore") + + def __getattr__(self, name): + """Parse attribute to GRASS display module. Attribute should be in + the form 'd_module_name'. For example, 'd.rast' is called with 'd_rast'. + """ + if not self._name_helper: + self._name_helper = ToolFunctionNameHelper( + run_function=self.run, + env=self.env, + prefix=self._prefix, + ) + return self._name_helper.get_function(name, exception_type=AttributeError) + + +def _test(): + """Ad-hoc tests and examples of the Tools class""" + session = gs.setup.init("~/grassdata/nc_spm_08_grass7/user1") + + tools = Tools() + tools.g_region(raster="elevation") + tools.r_slope_aspect(elevation="elevation", slope="slope", overwrite=True) + print(tools.r_univar(map="slope", flags="g").keyval) + + print(tools.v_info(map="bridges", flags="c").text) + print( + tools.v_db_univar(map="bridges", column="YEAR_BUILT", format="json").json[ + "statistics" + ]["mean"] + ) + + print(tools.g_mapset(flags="p").text) + print(tools.g_mapsets(flags="l").text_split()) + print(tools.g_mapsets(flags="l").space_items) + print(tools.g_gisenv(get="GISDBASE,LOCATION_NAME,MAPSET", sep="comma").comma_items) + + print(tools.g_region(flags="g").keyval) + + env = os.environ.copy() + env["GRASS_REGION"] = gs.region_env(res=250) + coarse_computation = Tools(env=env) + current_region = coarse_computation.g_region(flags="g").keyval + print(current_region["ewres"], current_region["nsres"]) + coarse_computation.r_slope_aspect( + elevation="elevation", slope="slope", flags="a", overwrite=True + ) + print(coarse_computation.r_info(map="slope", flags="g").keyval) + + independent_computation = Tools(session=session, freeze_region=True) + tools.g_region(res=500) # we would do this for another computation elsewhere + print(independent_computation.g_region(flags="g").keyval["ewres"]) + + tools_pro = Tools( + session=session, freeze_region=True, overwrite=True, superquiet=True + ) + tools_pro.r_slope_aspect(elevation="elevation", slope="slope") + tools_pro.feed_input_to("13.45,29.96,200").v_in_ascii( + input="-", output="point", separator="," + ) + print(tools_pro.v_info(map="point", flags="t").keyval["points"]) + + print(tools_pro.ignore_errors_of().g_version(flags="rge").keyval) + + elevation = "elevation" + exaggerated = "exaggerated" + tools_pro.r_mapcalc(expression=f"{exaggerated} = 5 * {elevation}") + tools_pro.feed_input_to(f"{exaggerated} = 5 * {elevation}").r_mapcalc(file="-") + + +if __name__ == "__main__": + _test() diff --git a/python/grass/tools/Makefile b/python/grass/tools/Makefile index 8820c884347..46ce4463afd 100644 --- a/python/grass/tools/Makefile +++ b/python/grass/tools/Makefile @@ -6,7 +6,9 @@ include $(MODULE_TOPDIR)/include/Make/Python.make DSTDIR = $(ETC)/python/grass/tools MODULES = \ + importexport \ session_tools \ + standalone_tools \ support PYFILES := $(patsubst %,$(DSTDIR)/%.py,$(MODULES) __init__) diff --git a/python/grass/tools/__init__.py b/python/grass/tools/__init__.py index 86826c6dd50..52d243701b3 100644 --- a/python/grass/tools/__init__.py +++ b/python/grass/tools/__init__.py @@ -3,6 +3,10 @@ def __getattr__(name): from .session_tools import Tools return Tools + if name == "StandaloneTools": + from .standalone_tools import StandaloneTools + + return StandaloneTools msg = f"module {__name__} has no attribute {name}" raise AttributeError(msg) diff --git a/python/grass/tools/importexport.py b/python/grass/tools/importexport.py new file mode 100644 index 00000000000..87a7dee6531 --- /dev/null +++ b/python/grass/tools/importexport.py @@ -0,0 +1,108 @@ +import subprocess +from pathlib import Path + + +class ImporterExporter: + raster_pack_suffixes = (".grass_raster", ".pack", ".rpack", ".grr") + + @classmethod + def is_recognized_file(cls, value): + return cls.is_raster_pack_file(value) + + @classmethod + def is_raster_pack_file(cls, value): + if isinstance(value, str): + return value.endswith(cls.raster_pack_suffixes) + if isinstance(value, Path): + return value.suffix in cls.raster_pack_suffixes + return False + + def __init__(self, *, run_function, run_cmd_function): + self._run_function = run_function + self._run_cmd_function = run_cmd_function + self.input_rasters: list[tuple] = [] + self.output_rasters: list[tuple] = [] + + def process_parameter_list(self, command, **popen_options): + """Ingests any file for later imports and exports and replaces arguments + + This function is relatively costly as it calls a subprocess to digest the parameters. + + Returns the list of parameters with inputs and outputs replaced so that a tool + will understand that, i.e., file paths into data names in a project. + """ + # Get processed parameters to distinguish inputs and outputs. + parameters = self._process_parameters(command, **popen_options) + tool_name = parameters["module"] + args = command.copy() + if "inputs" in parameters: + for item in parameters["inputs"]: + if tool_name != "r.unpack" and self.is_raster_pack_file(item["value"]): + in_project_name = self._to_name(item["value"]) + self.input_rasters.append((Path(item["value"]), in_project_name)) + for i, arg in enumerate(args): + if arg.startswith(f"{item['param']}="): + arg = arg.replace(item["value"], in_project_name) + args[i] = arg + if "outputs" in parameters: + for item in parameters["outputs"]: + if tool_name != "r.pack" and self.is_raster_pack_file(item["value"]): + in_project_name = self._to_name(item["value"]) + self.output_rasters.append((Path(item["value"]), in_project_name)) + for i, arg in enumerate(args): + if arg.startswith(f"{item['param']}="): + arg = arg.replace(item["value"], in_project_name) + args[i] = arg + return args + + def _process_parameters(self, command, **popen_options): + """Get parameters processed by the tool itself""" + popen_options["stdin"] = None + popen_options["stdout"] = subprocess.PIPE + # We respect whatever is in the stderr option because that's what the user + # asked for and will expect to get in case of error (we pretend that it was + # the intended run, not our special run before the actual run). + return self._run_cmd_function([*command, "--json"], **popen_options) + + def _to_name(self, value, /): + return Path(value).stem + + def import_rasters(self): + for raster_file, in_project_name in self.input_rasters: + self._run_function( + "r.unpack", + input=raster_file, + output=in_project_name, + overwrite=True, + superquiet=True, + ) + + def export_rasters(self): + # Pack the output raster + for raster_file, in_project_name in self.output_rasters: + # Overwriting a file is a warning, so to avoid it, we delete the file first. + # This creates a behavior consistent with command line tools. + Path(raster_file).unlink(missing_ok=True) + + self._run_function( + "r.pack", + input=in_project_name, + output=raster_file, + flags="c", + overwrite=True, + superquiet=True, + ) + + def import_data(self): + self.import_rasters() + + def export_data(self): + self.export_rasters() + + def cleanup(self): + remove = [name for (unused, name) in self.input_rasters] + remove.extend([name for (unused, name) in self.output_rasters]) + if remove: + self._run_function( + "g.remove", type="raster", name=remove, superquiet=True, flags="f" + ) diff --git a/python/grass/tools/session_tools.py b/python/grass/tools/session_tools.py index fc1f30e6d02..a8ac73e1552 100644 --- a/python/grass/tools/session_tools.py +++ b/python/grass/tools/session_tools.py @@ -18,6 +18,7 @@ import grass.script as gs +from .importexport import ImporterExporter from .support import ParameterConverter, ToolFunctionResolver, ToolResult @@ -130,6 +131,7 @@ def __init__( capture_output=True, capture_stderr=None, consistent_return_value=False, + keep_data=None, ): """ If session is provided and has an env attribute, it is used to execute tools. @@ -183,7 +185,6 @@ def __init__( self._original_env = session.env else: self._original_env = os.environ - self._modified_env = None self._overwrite = overwrite self._verbose = verbose self._quiet = quiet @@ -196,6 +197,11 @@ def __init__( self._capture_stderr = capture_stderr self._name_resolver = None self._consistent_return_value = consistent_return_value + # Decides if we delete at each run or only at the end of context. + self._delete_on_context_exit = False + # User request to keep the data. + self._keep_data = keep_data + self._cleanups = [] def _modified_env_if_needed(self): """Get the environment for subprocesses @@ -274,6 +280,7 @@ def run(self, tool_name_: str, /, **kwargs): args, tool_kwargs=kwargs, input=object_parameter_handler.stdin, + parameter_converter=object_parameter_handler, **popen_options, ) use_objects = object_parameter_handler.translate_data_to_objects( @@ -299,6 +306,7 @@ def run_cmd( command: list[str], *, input: str | bytes | None = None, + parameter_converter: ParameterConverter | None = None, tool_kwargs: dict | None = None, **popen_options, ): @@ -311,12 +319,30 @@ def run_cmd( :param tool_kwargs: named tool arguments used for error reporting (experimental) :param **popen_options: additional options for :py:func:`subprocess.Popen` """ - return self.call_cmd( + if parameter_converter is None: + parameter_converter = ParameterConverter() + parameter_converter.process_parameter_list(command[1:]) + if parameter_converter.import_export: + pack_importer_exporter = ImporterExporter( + run_function=self.call, run_cmd_function=self.call_cmd + ) + command = pack_importer_exporter.process_parameter_list(command) + pack_importer_exporter.import_data() + + # We approximate tool_kwargs as original kwargs. + result = self.call_cmd( command, tool_kwargs=tool_kwargs, input=input, **popen_options, ) + if parameter_converter.import_export: + pack_importer_exporter.export_data() + if self._delete_on_context_exit or self._keep_data: + self._cleanups.append(pack_importer_exporter.cleanup) + else: + pack_importer_exporter.cleanup() + return result def call(self, tool_name_: str, /, **kwargs): """Run a tool by specifying its name as a string and parameters. @@ -423,7 +449,14 @@ def __enter__(self): :returns: reference to the object (self) """ + self._delete_on_context_exit = True return self def __exit__(self, exc_type, exc_value, traceback): """Exit the context manager context.""" + if not self._keep_data: + self.cleanup() + + def cleanup(self): + for cleanup in self._cleanups: + cleanup() diff --git a/python/grass/tools/standalone_tools.py b/python/grass/tools/standalone_tools.py new file mode 100644 index 00000000000..718bb17ec5e --- /dev/null +++ b/python/grass/tools/standalone_tools.py @@ -0,0 +1,303 @@ +############################################################################## +# AUTHOR(S): Vaclav Petras +# +# PURPOSE: API to call GRASS tools as Python functions without a session +# +# COPYRIGHT: (C) 2025 Vaclav Petras and the GRASS Development Team +# +# This program is free software under the GNU General Public +# License (>=v2). Read the file COPYING that comes with GRASS +# for details. +############################################################################## + +""" +An API to call GRASS tools as Python functions without a session + +This is not a stable part of the API. Use at your own risk. +""" + +import os +import tempfile +import json +import subprocess +import tarfile +import weakref +from pathlib import Path + +import grass.script as gs +from .session_tools import Tools +from .support import ParameterConverter, ToolFunctionResolver +from grass.tools.importexport import ImporterExporter + + +# Using inheritance to get the getattr behavior and other functionality, +# but the session and env really make it more seem like a case for composition. +class StandaloneTools: + def __init__( + self, session=None, env=None, work_dir=None, errors=None, capture_output=True + ): + self._tools = None + self._errors = errors + self._capture_output = capture_output + self._crs_initialized = False + self._work_dir = work_dir + self._tmp_dir = None + self._tmp_dir_finalizer = None + if session and env: + msg = "Provide either session or env, not both" + raise ValueError(msg) + self._session = None + if session: + # If session is provided, we will use it as is. + self._session = session + self._crs_initialized = True + self._original_env = None # env should not be used + elif env: + self._original_env = env + else: + self._original_env = os.environ + # Because we don't setup a session here, we don't have runtime available for + # tools to be called through method calls. Should we just start session here + # to have the runtime? + self._region_is_set = False + self._region_file = None + self._region_modified_time = None + self._errors = errors + self._capture_output = capture_output + self._name_helper = None + + def _process_parameters(self, command, popen_options): + if not self._session: + # We create session and an empty XY project in one step. + self._create_session() + + env = popen_options.get("env", self._session.env) + + return subprocess.run( + [*command, "--json"], text=True, capture_output=True, env=env + ) + + def run(self, name, /, **kwargs): + object_parameter_handler = ParameterConverter() + object_parameter_handler.process_parameters(kwargs) + + args, popen_options = gs.popen_args_command(name, **kwargs) + + interface_result = self._process_parameters(args, popen_options) + if interface_result.returncode != 0: + # This is only for the error states. + return gs.handle_errors( + interface_result.returncode, + result=None, + args=[name], + kwargs=kwargs, + stderr=interface_result.stderr, + handler="raise", + ) + parameters = json.loads(interface_result.stdout) + + if not self._session: + # We create session and an empty XY project in one step. + self._create_session() + + rows_columns = object_parameter_handler.input_rows_columns() + if not self._is_region_modified() and rows_columns: + # Reset the region for every run or keep it persistent? + # Also, we now set that even for an existing session, this is + # consistent with behavior without a provided session. + # We could use env to pass the regions which would allow for the change + # while not touching the underlying session. + rows, cols = rows_columns + self.no_nonsense_run( + "g.region", + rows=rows, + cols=cols, + env=self._session.env, + ) + self._region_is_set = True + + object_parameter_handler.translate_objects_to_data( + kwargs, env=self._session.env + ) + + # We approximate tool_kwargs as original kwargs. + result = self.run_from_list( + args, + tool_kwargs=kwargs, + processed_parameters=parameters, + stdin=object_parameter_handler.stdin, + **popen_options, + ) + use_objects = object_parameter_handler.translate_data_to_objects( + kwargs, env=self._session.env + ) + if use_objects: + result = object_parameter_handler.result + return result + + # Make this an overload of run. + # Or at least use the same signature as the parent class. + def run_from_list( + self, + command, + tool_kwargs=None, + stdin=None, + processed_parameters=None, + **popen_options, + ): + """ + + Passing --help to this function will not work. + """ + if not self._session: + # We create session and an empty XY project in one step. + self._create_session() + + pack_importer_exporter = ImporterExporter( + run_function=self.no_nonsense_run, + run_cmd_function=self.no_nonsense_run_from_list, + ) + command = pack_importer_exporter.process_parameter_list(command) + + if not self._crs_initialized: + self._initialize_crs(pack_importer_exporter.input_rasters) + + pack_importer_exporter.import_data() + + if not self._is_region_modified() and pack_importer_exporter.input_rasters: + # Reset the region for every run or keep it persistent? + # Also, we now set that even for an existing session, this is + # consistent with behavior without a provided session. + # We could use env to pass the regions which would allow for the change + # while not touching the underlying session. + self.no_nonsense_run( + "g.region", + raster=pack_importer_exporter.input_rasters[0][0].stem, + env=self._session.env, + ) + self._region_is_set = True + result = self.no_nonsense_run_from_list(command) + pack_importer_exporter.export_data() + return result + + def no_nonsense_run(self, name, /, *, tool_kwargs=None, stdin=None, **kwargs): + args, popen_options = gs.popen_args_command(name, **kwargs) + return self.no_nonsense_run_from_list( + args, tool_kwargs=tool_kwargs, stdin=stdin, **popen_options + ) + + def no_nonsense_run_from_list( + self, command, tool_kwargs=None, stdin=None, **popen_options + ): + if not self._session: + # We create session and an empty XY project in one step. + self._create_session() + if not self._tools: + self._tools = Tools( + overwrite=True, + quiet=False, + verbose=False, + superquiet=False, + errors=self._errors, + capture_output=self._capture_output, + session=self._session, + ) + return self._tools.call_cmd(command) + + def _create_session(self): + # Temporary folder for all our files + if self._work_dir: + base_dir = self._work_dir + else: + # Resource is managed by weakref.finalize. + self._tmp_dir = ( + # pylint: disable=consider-using-with + tempfile.TemporaryDirectory() + ) + + def cleanup(tmpdir): + tmpdir.cleanup() + + self._tmp_dir_finalizer = weakref.finalize(self, cleanup, self._tmp_dir) + base_dir = self._tmp_dir.name + project_name = "project" + project_path = Path(base_dir) / project_name + gs.create_project(project_path) + self._region_file = project_path / "PERMANENT" / "WIND" + self._region_modified_time = self._region_file.stat().st_mtime + self._session = gs.setup.init(project_path, env=self._original_env.copy()) + + def _initialize_crs(self, rasters): + # Get the mapset path + mapset_path = self.no_nonsense_run( + "g.gisenv", get="GISDBASE,LOCATION_NAME,MAPSET", sep="/" + ).text + mapset_path = Path(mapset_path) + + if rasters: + with tarfile.TarFile(rasters[0][0]) as tar: + for name in [ + "PROJ_UNITS", + "PROJ_INFO", + "PROJ_EPSG", + "PROJ_SRID", + "PROJ_WKT", + ]: + try: + tar_info = tar.getmember(name) + except KeyError: + continue + Path(mapset_path / name).write_bytes( + tar.extractfile(tar_info).read() + ) + + def _is_region_modified(self): + if self._region_is_set: + return True + if not self._region_file: + if self._session: + self._region_file = ( + Path( + self.no_nonsense_run( + "g.gisenv", get="GISDBASE,LOCATION_NAME", sep="/" + ).text + ) + / "PERMANENT" + / "WIND" + ) + self._region_modified_time = self._region_file.stat().st_mtime + return False + return self._region_file.stat().st_mtime > self._region_modified_time + + def cleanup(self): + if self._tmp_dir_finalizer: + self._tmp_dir_finalizer() + + def __enter__(self): + """Enter the context manager context. + + Notably, the session is activated using the *init* function. + + :returns: reference to the object (self) + """ + return self + + def __exit__(self, type, value, traceback): + """Exit the context manager context. + + Finishes the existing session. + """ + self.cleanup() + + def __getattr__(self, name): + """Parse attribute to GRASS display module. Attribute should be in + the form 'd_module_name'. For example, 'd.rast' is called with 'd_rast'. + """ + if not self._session: + # We create session and an empty XY project in one step. + self._create_session() + if not self._name_helper: + self._name_helper = ToolFunctionResolver( + run_function=self.run, env=self._session.env + ) + return self._name_helper.get_function(name, exception_type=AttributeError) diff --git a/python/grass/tools/support.py b/python/grass/tools/support.py index 3ca33b2ed64..de59d6b4bde 100644 --- a/python/grass/tools/support.py +++ b/python/grass/tools/support.py @@ -40,6 +40,8 @@ # ga is present as well because that's the only import-time failure we expect. ga = None +from .importexport import ImporterExporter + class ParameterConverter: """Converts parameter values to strings and facilitates flow of the data.""" @@ -51,6 +53,7 @@ def __init__(self): self.stdin = None self.result = None self.temporary_rasters = [] + self.import_export = None def process_parameters(self, kwargs): """Converts high level parameter values to strings. @@ -81,6 +84,24 @@ def process_parameters(self, kwargs): elif isinstance(value, StringIO): kwargs[key] = "-" self.stdin = value.getvalue() + elif self.import_export is None and ImporterExporter.is_recognized_file( + value + ): + self.import_export = True + if self.import_export is None: + self.import_export = False + + def process_parameter_list(self, command): + """Converts or at least processes parameters passed as list of strings""" + for item in command: + splitted = item.split("=", maxsplit=1) + value = splitted[1] if len(splitted) > 1 else item + if self.import_export is None and ImporterExporter.is_recognized_file( + value + ): + self.import_export = True + if self.import_export is None: + self.import_export = False def translate_objects_to_data(self, kwargs, env): """Convert NumPy arrays to GRASS data""" @@ -116,6 +137,13 @@ def translate_data_to_objects(self, kwargs, env): self.result = None return False + def input_rows_columns(self): + if not len(self._numpy_inputs): + return None + # We use first item in the dictionary (which is in insertion order). + shape = next(iter(self._numpy_inputs.values()))[1].shape + return shape[0], shape[1] + class ToolFunctionResolver: def __init__(self, *, run_function, env, allowed_prefix=None): diff --git a/python/grass/tools/tests/conftest.py b/python/grass/tools/tests/conftest.py index f4b9145af1c..73e0ca2b82b 100644 --- a/python/grass/tools/tests/conftest.py +++ b/python/grass/tools/tests/conftest.py @@ -5,6 +5,7 @@ import pytest import grass.script as gs +from grass.experimental import MapsetSession from grass.tools.support import ToolResult, ToolFunctionResolver @@ -16,6 +17,20 @@ def xy_dataset_session(tmp_path): yield session +@pytest.fixture +def xy_mapset_non_permament(xy_dataset_session): # pylint: disable=redefined-outer-name + """Active session in a mapset of an XY location + + Mapset scope is function, while the location scope is module. + + The mapset is not removed by this fixture and relies on the underlying cleanup + procedures which means that it can be examined in the temporary directories + pytest creates. + """ + with MapsetSession("test1", create=True, env=xy_dataset_session.env) as session: + yield session + + @pytest.fixture def empty_result(): return ToolResult( @@ -33,3 +48,41 @@ def empty_string_result(): @pytest.fixture def echoing_resolver(): return ToolFunctionResolver(run_function=lambda x: x, env=os.environ.copy()) + + +@pytest.fixture +def rows_raster_file3x3(tmp_path): + project = tmp_path / "xy_test3x3" + gs.create_project(project) + with gs.setup.init(project, env=os.environ.copy()) as session: + gs.run_command("g.region", rows=3, cols=3, env=session.env) + gs.mapcalc("rows = row()", env=session.env) + output_file = tmp_path / "rows3x3.grass_raster" + gs.run_command( + "r.pack", + input="rows", + output=output_file, + flags="c", + superquiet=True, + env=session.env, + ) + return output_file + + +@pytest.fixture +def rows_raster_file4x5(tmp_path): + project = tmp_path / "xy_test4x5" + gs.create_project(project) + with gs.setup.init(project, env=os.environ.copy()) as session: + gs.run_command("g.region", rows=4, cols=5, env=session.env) + gs.mapcalc("rows = row()", env=session.env) + output_file = tmp_path / "rows4x5.grass_raster" + gs.run_command( + "r.pack", + input="rows", + output=output_file, + flags="c", + superquiet=True, + env=session.env, + ) + return output_file diff --git a/python/grass/tools/tests/grass_tools_session_tools_pack_test.py b/python/grass/tools/tests/grass_tools_session_tools_pack_test.py new file mode 100644 index 00000000000..b20c85780f2 --- /dev/null +++ b/python/grass/tools/tests/grass_tools_session_tools_pack_test.py @@ -0,0 +1,285 @@ +"""Test pack import-export functionality of grass.tools.Tools class""" + +import os +from pathlib import Path + +import pytest + +from grass.exceptions import CalledModuleError +from grass.tools import Tools + + +def test_pack_input_output_tool_name_function( + xy_dataset_session, rows_raster_file3x3, tmp_path +): + """Check input and output pack files work with tool name call""" + tools = Tools(session=xy_dataset_session) + tools.g_region(rows=3, cols=3) + assert os.path.exists(rows_raster_file3x3) + output_file = tmp_path / "file.grass_raster" + tools.r_slope_aspect(elevation=rows_raster_file3x3, slope=output_file) + assert output_file.exists() + assert not tools.g_findfile(element="raster", file="file", format="json")["name"] + assert not tools.g_findfile( + element="raster", file=rows_raster_file3x3.stem, format="json" + )["name"] + assert not tools.g_list(type="raster", format="json") + + +@pytest.mark.parametrize("parameter_type", [str, Path]) +def test_pack_input_output_tool_name_function_string_value( + xy_dataset_session, rows_raster_file3x3, tmp_path, parameter_type +): + """Check input and output pack files work string a parameter + + We make no assumption about the fixture types and explicitly test all + supported parameter types. + """ + tools = Tools(session=xy_dataset_session) + tools.g_region(rows=3, cols=3) + assert os.path.exists(rows_raster_file3x3) + output_file = tmp_path / "file.grass_raster" + tools.r_slope_aspect( + elevation=parameter_type(rows_raster_file3x3), slope=parameter_type(output_file) + ) + assert output_file.exists() + assert not tools.g_findfile(element="raster", file="file", format="json")["name"] + assert not tools.g_findfile( + element="raster", file=rows_raster_file3x3.stem, format="json" + )["name"] + assert not tools.g_list(type="raster", format="json") + + +def test_pack_input_output_with_name_and_parameter_call( + xy_dataset_session, rows_raster_file3x3, tmp_path +): + """Check input and output pack files work with tool name as string""" + tools = Tools(session=xy_dataset_session) + tools.g_region(rows=3, cols=3) + assert os.path.exists(rows_raster_file3x3) + output_file = tmp_path / "file.grass_raster" + tools.run("r.slope.aspect", elevation=rows_raster_file3x3, slope=output_file) + assert output_file.exists() + assert not tools.g_findfile(element="raster", file="file", format="json")["name"] + assert not tools.g_findfile( + element="raster", file=rows_raster_file3x3.stem, format="json" + )["name"] + assert not tools.g_list(type="raster", format="json") + + +def test_pack_input_output_with_subprocess_run_like_call( + xy_dataset_session, rows_raster_file3x3, tmp_path +): + """Check input and output pack files work with command as list""" + tools = Tools(session=xy_dataset_session) + assert os.path.exists(rows_raster_file3x3) + output_file = tmp_path / "file.grass_raster" + tools.run_cmd( + [ + "r.slope.aspect", + f"elevation={rows_raster_file3x3}", + f"aspect={output_file}", + ] + ) + assert output_file.exists() + assert not tools.g_findfile(element="raster", file="file", format="json")["name"] + assert not tools.g_findfile( + element="raster", file=rows_raster_file3x3.stem, format="json" + )["name"] + assert not tools.g_list(type="raster", format="json") + + +def test_no_modify_command(xy_dataset_session, rows_raster_file3x3, tmp_path): + """Check that input command is not modified by the function""" + tools = Tools(session=xy_dataset_session) + tools.g_region(rows=3, cols=3) + output_file = tmp_path / "file.grass_raster" + command = [ + "r.slope.aspect", + f"elevation={rows_raster_file3x3}", + f"slope={output_file}", + ] + original = command.copy() + tools.run_cmd(command) + assert original == command + + +def test_io_cleanup_after_function(xy_dataset_session, rows_raster_file3x3, tmp_path): + """Check input and output rasters are deleted after function call""" + tools = Tools(session=xy_dataset_session) + tools.g_region(rows=3, cols=3) + output_file = tmp_path / "file.grass_raster" + tools.r_slope_aspect(elevation=rows_raster_file3x3, slope=output_file) + assert output_file.exists() + assert not tools.g_findfile(element="raster", file="file", format="json")["name"] + assert not tools.g_findfile( + element="raster", file=rows_raster_file3x3.stem, format="json" + )["name"] + assert not tools.g_list(type="raster", format="json") + + +def test_io_cleanup_after_context(xy_dataset_session, rows_raster_file3x3, tmp_path): + """Check input and output rasters are deleted at the end of context""" + output_file_1 = tmp_path / "file.grass_raster" + output_file_2 = tmp_path / "file2.grass_raster" + with Tools(session=xy_dataset_session) as tools: + tools.g_region(rows=3, cols=3) + tools.r_slope_aspect(elevation=rows_raster_file3x3, slope=output_file_1) + assert output_file_1.exists() + assert tools.g_findfile(element="raster", file="file", format="json")["name"] + tools.r_mapcalc_simple(expression="100 * A", a="file", output=output_file_2) + assert output_file_2.exists() + assert tools.g_findfile(element="raster", file="file2", format="json")["name"] + # The pack files should still exist. + assert output_file_1.exists() + assert output_file_2.exists() + # The in-project rasters should not exist. + assert not tools.g_findfile(element="raster", file="file", format="json")["name"] + assert not tools.g_findfile(element="raster", file="file2", format="json")["name"] + assert not tools.g_findfile( + element="raster", file=rows_raster_file3x3.stem, format="json" + )["name"] + assert not tools.g_list(type="raster", format="json") + + +def test_io_no_cleanup(xy_dataset_session, rows_raster_file3x3, tmp_path): + """Check input and output rasters are deleted only with explicit cleanup call""" + output_file = tmp_path / "file.grass_raster" + tools = Tools(session=xy_dataset_session, keep_data=True) + tools.g_region(rows=3, cols=3) + tools.r_slope_aspect(elevation=rows_raster_file3x3, slope=output_file) + assert output_file.exists() + # Files should still be available. + assert tools.g_findfile(element="raster", file="file", format="json")["name"] + assert tools.g_findfile( + element="raster", file=rows_raster_file3x3.stem, format="json" + )["name"] + # But an explicit cleanup should delete the files. + tools.cleanup() + assert not tools.g_findfile(element="raster", file="file", format="json")["name"] + assert not tools.g_findfile( + element="raster", file=rows_raster_file3x3.stem, format="json" + )["name"] + assert not tools.g_list(type="raster", format="json") + + +def test_io_no_cleanup_with_context(xy_dataset_session, rows_raster_file3x3): + """Check input and output rasters are kept even with context""" + with Tools(session=xy_dataset_session, keep_data=True) as tools: + tools.g_region(rows=3, cols=3) + tools.r_slope_aspect(elevation=rows_raster_file3x3, slope="file.grass_raster") + assert os.path.exists("file.grass_raster") + assert tools.g_findfile(element="raster", file="file", format="json")["name"] + tools.r_mapcalc_simple( + expression="100 * A", a="file", output="file2.grass_raster" + ) + assert os.path.exists("file2.grass_raster") + assert tools.g_findfile(element="raster", file="file2", format="json")["name"] + # The pack files should still exist. + assert os.path.exists("file.grass_raster") + assert os.path.exists("file2.grass_raster") + # The in-project rasters should also exist. + assert tools.g_findfile(element="raster", file="file", format="json")["name"] + assert tools.g_findfile(element="raster", file="file2", format="json")["name"] + assert tools.g_findfile( + element="raster", file=rows_raster_file3x3.stem, format="json" + )["name"] + # But an explicit cleanup should delete the files. + tools.cleanup() + assert not tools.g_findfile(element="raster", file="file", format="json")["name"] + assert not tools.g_findfile(element="raster", file="file2", format="json")["name"] + assert not tools.g_findfile( + element="raster", file=rows_raster_file3x3.stem, format="json" + )["name"] + assert not tools.g_list(type="raster", format="json") + + +def test_wrong_parameter(xy_dataset_session, rows_raster_file3x3): + """Check wrong parameter causes standard exception + + Since the tool is called to process its parameters with pack IO, + the error handling takes a different path than without pack IO active. + """ + tools = Tools(session=xy_dataset_session) + tools.g_region(rows=3, cols=3) + with pytest.raises(CalledModuleError, match="does_not_exist"): + tools.r_slope_aspect( + elevation=rows_raster_file3x3, + slope="file.grass_raster", + does_not_exist="test", + ) + + +def test_direct_r_unpack_to_data(xy_dataset_session, rows_raster_file3x3): + """Check that we can r.unpack data as usual""" + tools = Tools(session=xy_dataset_session, keep_data=True) + tools.g_region(rows=3, cols=3) + name = "data_1" + tools.r_unpack(input=rows_raster_file3x3, output=name) + assert tools.g_findfile(element="raster", file=name, format="json")["name"] + assert not tools.g_findfile( + element="raster", file=rows_raster_file3x3.stem, format="json" + )["name"] + + +def test_direct_r_unpack_to_pack(xy_dataset_session, rows_raster_file3x3, tmp_path): + """Check that roundtrip from existing packed raster to new packed raster works""" + tools = Tools(session=xy_dataset_session, keep_data=True) + tools.g_region(rows=3, cols=3) + name = "auto_packed_data_1.grass_raster" + packed_file = tmp_path / name + tools.r_unpack(input=rows_raster_file3x3, output=packed_file) + assert packed_file.exists() + assert tools.g_findfile(element="raster", file=packed_file.stem, format="json")[ + "name" + ] + assert not tools.g_findfile( + element="raster", file=rows_raster_file3x3.stem, format="json" + )["name"] + + +def test_direct_r_pack_from_data(xy_dataset_session, tmp_path): + """Check that we can r.pack data as usual""" + tools = Tools(session=xy_dataset_session, keep_data=True) + tools.g_region(rows=3, cols=3) + tools.r_mapcalc(expression="data_1 = 1") + name = "manually_packed_data_1.grass_raster" + packed_file = tmp_path / name + tools.r_pack(input="data_1", output=packed_file) + # New file was created. + assert packed_file.exists() + # Input still exists. + assert tools.g_findfile(element="raster", file="data_1", format="json")["name"] + # There should be no raster created automatically. + assert not tools.g_findfile(element="raster", file=packed_file.stem, format="json")[ + "name" + ] + tools.cleanup() + # Input still exists even after cleaning. + assert tools.g_findfile(element="raster", file="data_1", format="json")["name"] + + +def test_direct_r_pack_from_pack(xy_dataset_session, rows_raster_file3x3, tmp_path): + """Check that roundtrip from existing packed raster to raster works""" + tools = Tools(session=xy_dataset_session, keep_data=True) + tools.g_region(rows=3, cols=3) + name = "manually_packed_data_1.grass_raster" + packed_file = tmp_path / name + tools.r_pack(input=rows_raster_file3x3, output=packed_file) + # New file was created. + assert packed_file.exists() + # Input still exists. + assert rows_raster_file3x3.exists() + # Auto-imported raster should exist. + assert tools.g_findfile( + element="raster", file=rows_raster_file3x3.stem, format="json" + )["name"] + # There should be no raster created automatically. + assert not tools.g_findfile(element="raster", file=packed_file.stem, format="json")[ + "name" + ] + tools.cleanup() + # Auto-imported raster should be deleted. + assert not tools.g_findfile( + element="raster", file=rows_raster_file3x3.stem, format="json" + )["name"] diff --git a/python/grass/tools/tests/grass_tools_standalone_tools_test.py b/python/grass/tools/tests/grass_tools_standalone_tools_test.py new file mode 100644 index 00000000000..02621737c1a --- /dev/null +++ b/python/grass/tools/tests/grass_tools_standalone_tools_test.py @@ -0,0 +1,295 @@ +"""Test grass.experimental.StandaloneTools class""" + +import os + +import numpy as np +import pytest + +from grass.tools import StandaloneTools + + +def test_json(): + """Check that numbers are parsed as numbers""" + tools = StandaloneTools() + assert tools.run_from_list(["g.region", "-p", "format=json"])["nsres"] == 1 + + +def test_run_name_and_parameters(): + """Check that name-and-parameters style works""" + tools = StandaloneTools() + assert tools.run("g.region", flags="p", format="json")["nsres"] == 1 + + +# TODO: This may fail because shutils.which does not setup a session. +def test_function_call_with_json(): + """Check that numbers are parsed as numbers""" + tools = StandaloneTools() + assert tools.g_region(flags="p", format="json")["nsres"] == 1 + + +def test_key_value_parser_number(): + """Check that numbers are parsed as numbers""" + tools = StandaloneTools() + assert tools.run_from_list(["g.region", "-g"]).keyval["nsres"] == 1 + + +def test_pack_input_output(rows_raster_file3x3): + """Check that global overwrite is not used when separate env is used""" + tools = StandaloneTools() + assert os.path.exists(rows_raster_file3x3) + tools.run_from_list( + [ + "r.slope.aspect", + f"elevation={rows_raster_file3x3}", + "aspect=file.grass_raster", + ] + ) + tools.run_from_list( + [ + "r.slope.aspect", + f"elevation={rows_raster_file3x3}", + "aspect=file.grass_raster", + ] + ) + assert os.path.exists("file.grass_raster") + + +def test_pack_initial_region_set(rows_raster_file3x3, rows_raster_file4x5): + """Check that global overwrite is not used when separate env is used""" + tools = StandaloneTools() + assert os.path.exists(rows_raster_file3x3) + tools.r_slope_aspect( + elevation=rows_raster_file3x3, + aspect="file.grass_raster", + flags="a", + ) + region = tools.g_region(flags="p", format="json") + assert region["rows"] == 3 + assert region["cols"] == 3 + + +def test_pack_initial_region_respected(rows_raster_file3x3, rows_raster_file4x5): + """Check that global overwrite is not used when separate env is used""" + tools = StandaloneTools() + tools.r_slope_aspect( + elevation=rows_raster_file3x3, + aspect="file.grass_raster", + flags="a", + ) + tools.r_slope_aspect( + elevation=rows_raster_file4x5, + aspect="file2.grass_raster", + flags="a", + ) + assert os.path.exists("file.grass_raster") + assert os.path.exists("file2.grass_raster") + region = tools.g_region(flags="p", format="json") + assert region["rows"] == 3 + assert region["cols"] == 3 + region = tools.r_info(map="file.grass_raster", format="json") + assert region["rows"] == 3 + assert region["cols"] == 3 + region = tools.r_info(map="file2.grass_raster", format="json") + assert region["rows"] == 3 + assert region["cols"] == 3 + + +def test_pack_manual_region_from_raster_preserved( + rows_raster_file3x3, rows_raster_file4x5 +): + """Check that global overwrite is not used when separate env is used""" + tools = StandaloneTools() + tools.g_region(raster=rows_raster_file4x5) + tools.r_slope_aspect( + elevation=rows_raster_file3x3, + aspect="file.grass_raster", + flags="a", + ) + assert os.path.exists("file.grass_raster") + region = tools.g_region(flags="p", format="json") + assert region["rows"] == 4 + assert region["cols"] == 5 + + +def test_pack_manual_region_from_parameters_preserved(rows_raster_file3x3): + """Check that global overwrite is not used when separate env is used""" + tools = StandaloneTools() + tools.g_region(rows=4, cols=5) + tools.r_slope_aspect( + elevation=rows_raster_file3x3, + aspect="file.grass_raster", + flags="a", + ) + assert os.path.exists("file.grass_raster") + region = tools.g_region(flags="p", format="json") + assert region["rows"] == 4 + assert region["cols"] == 5 + + +def test_pack_manual_region_read_but_not_set(rows_raster_file3x3): + """Check that global overwrite is not used when separate env is used""" + tools = StandaloneTools() + region = tools.g_region(flags="p", format="json") + assert region["rows"] == 1 + assert region["cols"] == 1 + tools.r_slope_aspect( + elevation=rows_raster_file3x3, + aspect="file.grass_raster", + flags="a", + ) + assert os.path.exists("file.grass_raster") + region = tools.g_region(flags="p", format="json") + assert region["rows"] == 3 + assert region["cols"] == 3 + + +def test_pack_region_manually_changed(rows_raster_file3x3, rows_raster_file4x5): + """Check that global overwrite is not used when separate env is used""" + tools = StandaloneTools() + tools.r_slope_aspect( + elevation=rows_raster_file3x3, + aspect="file.grass_raster", + flags="a", + ) + region = tools.g_region(rows=2, cols=2) + region = tools.g_region(flags="p", format="json") + assert region["rows"] == 2 + assert region["cols"] == 2 + tools.r_slope_aspect( + elevation=rows_raster_file4x5, + aspect="file2.grass_raster", + flags="a", + ) + assert os.path.exists("file.grass_raster") + assert os.path.exists("file2.grass_raster") + region = tools.g_region(flags="p", format="json") + assert region["rows"] == 2 + assert region["cols"] == 2 + + +def test_pack_workflow(rows_raster_file4x5): + tools = StandaloneTools() + tools.r_slope_aspect( + elevation=rows_raster_file4x5, slope="slope.grr", aspect="aspect" + ) + tools.r_flow( + elevation=rows_raster_file4x5.stem, + aspect="aspect", + flowaccumulation="flow_accumulation.grr", + flags=3, + ) + assert os.path.exists(rows_raster_file4x5) + assert os.path.exists("slope.grr") + assert os.path.exists("flow_accumulation.grr") + assert not os.path.exists("aspect.grr") + assert not os.path.exists("elevation.grr") + + +def test_regions_independent(): + """Check that global overwrite is not used when separate env is used""" + tools1 = StandaloneTools() + tools1.g_region(rows=2, cols=3) + tools2 = StandaloneTools() + tools2.g_region(rows=4, cols=5) + region = tools1.g_region(flags="p", format="json") + assert region["rows"] == 2 + assert region["cols"] == 3 + region = tools2.g_region(flags="p", format="json") + assert region["rows"] == 4 + assert region["cols"] == 5 + + +def test_region_changed_for_session(xy_dataset_session): + """Check that global overwrite is not used when separate env is used""" + tools1 = StandaloneTools(session=xy_dataset_session) + tools1.g_region(rows=4, cols=5) + tools2 = StandaloneTools(session=xy_dataset_session) + region = tools2.g_region(flags="p", format="json") + assert region["rows"] == 4 + assert region["cols"] == 5 + + +def test_numpy_multiple_inputs_one_output(): + """Check that a NumPy array works for multiple inputs""" + tools = StandaloneTools() + result = tools.r_mapcalc_simple( + expression="A + B", a=np.full((2, 3), 2), b=np.full((2, 3), 5), output=np.array + ) + # Size of output + assert result.shape == (2, 3) + assert np.all(result == np.full((2, 3), 7)) + # Computational region + region = tools.g_region(flags="p", format="json") + assert region["rows"] == 2 + assert region["cols"] == 3 + result = tools.r_mapcalc_simple(expression="int(10)", output=np.array) + assert result.shape == (2, 3) + assert np.all(result == np.full((2, 3), 10)) + + +def test_numpy_manual_region_mismatch(): + """Check that a NumPy array works for multiple inputs""" + tools = StandaloneTools() + result = tools.r_mapcalc_simple( + expression="A + B", a=np.full((2, 3), 2), b=np.full((2, 3), 5), output=np.array + ) + assert result.shape == (2, 3) + region = tools.g_region(rows=2, cols=2) + region = tools.g_region(flags="p", format="json") + assert region["rows"] == 2 + assert region["cols"] == 2 + with pytest.raises(ValueError, match=r"broadcast.*shape.*\(2.*3\).*\(2.*2\)"): + tools.r_mapcalc_simple( + expression="A + B", + a=np.full((2, 3), 2), + b=np.full((2, 3), 5), + output=np.array, + ) + + +def test_numpy_workflow_region_mismatch(): + """Check that a NumPy array works for multiple inputs""" + tools = StandaloneTools() + result = tools.r_mapcalc_simple( + expression="A + B", a=np.full((2, 3), 2), b=np.full((2, 3), 5), output=np.array + ) + assert result.shape == (2, 3) + with pytest.raises(ValueError, match=r"broadcast.*shape.*\(4.*5\).*\(2.*3\)"): + tools.r_mapcalc_simple( + expression="A + B", + a=np.full((4, 5), 2), + b=np.full((4, 5), 5), + output=np.array, + ) + + +def test_numpy_inputs_region_mismatch(): + """Check that a NumPy array works for multiple inputs""" + tools = StandaloneTools() + with pytest.raises(ValueError, match=r"broadcast.*shape.*\(4.*5\).*\(2.*3\)"): + tools.r_mapcalc_simple( + expression="A + B", + a=np.full((2, 3), 2), + b=np.full((4, 5), 5), + output=np.array, + ) + + +# May fail depending on the order of the tests because of the session setup. +def test_existing_session_accepted(xy_mapset_non_permament): + """Check that numbers are parsed as numbers""" + tools = StandaloneTools(session=xy_mapset_non_permament) + assert tools.g_mapset(flags="p").text == xy_mapset_non_permament.name + + +def test_another_session_is_ignored(xy_dataset_session): + """Check that numbers are parsed as numbers""" + from grass.experimental import MapsetSession + + with MapsetSession("test1", create=True, env=xy_dataset_session.env) as session: + tools = StandaloneTools() + assert tools.g_mapset(flags="p").text == "PERMANENT" + from grass.experimental.tools import Tools + + tools = Tools(session=session) + assert tools.g_mapset(flags="p").text == "test1"