diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 5d4254c..8489f55 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -17,6 +17,12 @@ repos: - id: check-added-large-files args: ["--maxkb=5000"] + - repo: https://github.com/ikamensh/flynt + rev: 1.0.1 + hooks: + - id: flynt + args: ["--fail-on-change"] + - repo: https://github.com/psf/black rev: 23.12.1 hooks: @@ -27,12 +33,6 @@ repos: hooks: - id: isort - - repo: https://github.com/ikamensh/flynt - rev: 1.0.1 - hooks: - - id: flynt - args: ["--fail-on-change"] - - repo: https://github.com/PyCQA/docformatter rev: v1.7.5 hooks: diff --git a/climateset/__init__.py b/climateset/__init__.py index ab49175..067b165 100644 --- a/climateset/__init__.py +++ b/climateset/__init__.py @@ -6,6 +6,7 @@ DATA_DIR = PROJECT_ROOT / "data" RAW_DATA = DATA_DIR / "raw" PROCESSED_DATA = DATA_DIR / "processed" +FINAL_DATA = DATA_DIR / "final" LOAD_DATA = DATA_DIR / "load" META_DATA = DATA_DIR / "meta" SCRIPT_DIR = PROJECT_ROOT / "scripts" diff --git a/climateset/processing/__init__.py b/climateset/processing/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/climateset/processing/processor_step.py b/climateset/processing/processor_step.py new file mode 100644 index 0000000..42b0ff4 --- /dev/null +++ b/climateset/processing/processor_step.py @@ -0,0 +1,23 @@ +from abc import ABC, abstractmethod +from pathlib import Path + + +class AbstractProcessorStep(ABC): + def __init__(self): + self.results_directory = None + + @abstractmethod + def execute(self, input_directory): + pass + + def get_results_directory(self): + return self.results_directory + + +def process_steps(input_directory: Path, list_of_steps: list[AbstractProcessorStep]): + current_input_directory = input_directory + for step in list_of_steps: + step.execute(current_input_directory) + # current_input_directory = step.get_results_directory() + + return current_input_directory diff --git a/climateset/processing/raw/__init__.py b/climateset/processing/raw/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/climateset/processing/raw/abstract_raw_processor.py b/climateset/processing/raw/abstract_raw_processor.py new file mode 100644 index 0000000..4ab0644 --- /dev/null +++ b/climateset/processing/raw/abstract_raw_processor.py @@ -0,0 +1,85 @@ +from abc import ABC, abstractmethod +from pathlib import Path +from typing import Union + +from climateset import CONFIGS +from climateset.processing.raw.checker import ( + AbstractDirectoryChecker, + BasicDirectoryChecker, +) +from climateset.utils import create_logger, get_json_config + +LOGGER = create_logger(__name__) + + +class AbstractRawProcessor(ABC): + """Abstract class for raw processing.""" + + def __init__( + self, + input_directory: Union[str, Path], + working_directory: Union[str, Path], + processing_steps: list = None, + checker: AbstractDirectoryChecker = None, + processing_parameters_config: Union[str, Path] = CONFIGS / "processing" / "raw_processing_params.json", + ): + """Init + Args: + + """ + # type of class + self.type_class = self.type_class_meta() + self.input_directory = input_directory + if isinstance(input_directory, str): + self.input_directory = Path(input_directory) + self.working_directory = working_directory + if isinstance(working_directory, str): + self.working_directory = Path(working_directory) + self.processing_steps = processing_steps + if not self.processing_steps: + self.processing_steps = [] + self.available_steps = {} + self.processing_params_config: 
Union[str, Path] = processing_parameters_config + self.meta_raw_dict = get_json_config(self.processing_params_config) + self.calendar = self.meta_raw_dict["calendar"] + self.desired_units = self.meta_raw_dict["units"] + + self.checker = checker + if not self.checker: + self.checker: AbstractDirectoryChecker = BasicDirectoryChecker(directory=self.input_directory) + + @abstractmethod + def type_class_meta(self) -> str: + """Returns the name tag of the subclass.""" + + @abstractmethod + def process_directory(self): + """ + Preprocessing a subdir - must be implemented for each subclass. + + Returns: + + """ + + @abstractmethod + def file_belongs_to_type(self, input_file: Path) -> bool: + """ + Checks if a file belongs to input4mips or cmip6 category. + + Args: + input_file (Path): the file that should be checked + Returns: + True if it belong to the class type, False if not + """ + + def add_processing_step(self, steps: Union[str, list]): + if isinstance(steps, str): + steps = [steps] + for step in steps: + if step in self.available_steps: + self.processing_steps.append(step) + + def list_available_steps(self): + step_list = [step for step in self.available_steps if not step.startswith("_")] + LOGGER.info(f"Available steps: {step_list}") + return step_list diff --git a/climateset/processing/raw/checker.py b/climateset/processing/raw/checker.py new file mode 100644 index 0000000..61b7e17 --- /dev/null +++ b/climateset/processing/raw/checker.py @@ -0,0 +1,378 @@ +import gc +import json +from abc import ABC, abstractmethod +from pathlib import Path +from typing import Optional, Union + +import numpy as np +import xarray as xr + +from climateset import CONFIGS +from climateset.utils import create_logger, get_json_config + +LOGGER = create_logger(__name__) + +VERIFY_FILE = "verify_file_ok" +VERIFY_UNITS = "verify_units" +VERIFY_RESOLUTION = "verify_resolution" +VERIFY_LONLAT = "verify_lonlat" +VERIFY_VARIABLES = "verify_variables" + +AVAILABLE_RAW_CHECKS = frozenset([VERIFY_UNITS, VERIFY_RESOLUTION, VERIFY_LONLAT, VERIFY_VARIABLES]) + + +class AbstractDirectoryChecker(ABC): + def __init__(self, directory: Union[str, Path], checker_steps: list = None): + if isinstance(directory, str): + directory = Path(directory) + self.directory: Union[str, Path] = directory + self.checker_steps: list = checker_steps + if not self.checker_steps: + self.checker_steps = [] + + self.results = [] + self.errors = [] + + @abstractmethod + def check_directory(self): + raise NotImplementedError + + def write_results(self, base_path: Path): + if self.results: + filename = base_path / "results.json" + with open(filename, "w", encoding="utf-8") as json_file: + json.dump(self.results, json_file, indent=2) + if self.errors: + filename = base_path / "errors.json" + with open(filename, "w", encoding="utf-8") as json_file: + json.dump(self.errors, json_file, indent=2) + + +class AbstractFileChecker(ABC): + def __init__(self, input_file: Union[str, Path], checker_steps: list = None, available_checks: dict = None): + self.input_file: Union[str, Path] = input_file + self.checker_steps: list = checker_steps + if not self.checker_steps: + self.checker_steps = [] + self.available_steps = available_checks + self.results = {} + self.dataset: Optional[xr.Dataset] = None + + def check(self): + self.check_file_ok() + for step in self.checker_steps: + self.available_steps[step]() + return {"filename": self.input_file.name, "results": self.results} + + def check_file_ok(self): + try: + LOGGER.info(f"Trying to load file {self.input_file}") + 
ds = xr.load_dataset(self.input_file) + self.dataset = ds + LOGGER.info(f"Successfully loaded file {self.input_file}") + self.results["file_ok"] = True + + except ValueError: + self.results["file_ok"] = False + LOGGER.warning(f"The following file is corrupt:\n {self.input_file}") + self.dataset = None + + def cleanup_memory(self): + del self.dataset + gc.collect() + + +class BasicDirectoryChecker(AbstractDirectoryChecker): + def __init__( + self, + directory: Union[str, Path] = None, + checker_steps: list = None, + ): + super().__init__(directory=directory, checker_steps=checker_steps) + + def check_directory(self) -> bool: + """ + Checking all files in a sub dir. + + TAKES TIME. + Args: + + Returns: + bool: True if all checks were successful, False if not + """ + for f in self.directory.glob("*.nc"): + file_checker = BasicFileChecker(input_file=f, checker_steps=self.checker_steps) + file_checker.check() + if not all(file_checker.results.values()): + self.errors.append(file_checker.results) + self.results.append(file_checker.results) + file_checker.cleanup_memory() + + if self.errors: + LOGGER.warning("There were errors running while running the checks") + LOGGER.warning(f"Errors : {self.errors}") + return False + + LOGGER.info("All checks passed") + return True + + +class BasicFileChecker(AbstractFileChecker): + def __init__( + self, + input_file, + checker_steps: list = None, + ): + super().__init__( + input_file=input_file, checker_steps=checker_steps, available_checks={VERIFY_FILE: self.check_file_ok} + ) + + def check(self): + """ + Logs warnings if there are inconsistent files. + + Checks always + for corruptness, the rest is optional. See initialization of class. + Args: + input_file (Path): The path that should be checked + log_file (Path): Where error / warnings should be logged additionally. + mode (str): 'w' writes the logs into the file, 'a' appends it to + an existing file. + Returns: + True if all checks were passed, False if not. + """ + + self.check_file_ok() + + +class RawDirectoryChecker(AbstractDirectoryChecker): + def __init__( + self, + directory: Union[str, Path] = None, + checker_steps: list = None, + type_class=None, + lonlat_resolution_to_verify: str = "50_km", + processing_params_config: Union[str, Path] = CONFIGS / "processing" / "raw_processing_params.json", + ): + super().__init__(directory=directory, checker_steps=checker_steps) + self.type_class = type_class + self.lonlat_resolution_to_verify = lonlat_resolution_to_verify + self.meta_raw_dict = get_json_config(processing_params_config) + self.calendar = self.meta_raw_dict["calendar"] + self.desired_units = self.meta_raw_dict["units"] + + lons = self.meta_raw_dict[self.lonlat_resolution_to_verify][f"{self.type_class}_lon"] + lats = self.meta_raw_dict[self.lonlat_resolution_to_verify][f"{self.type_class}_lat"] + self.lon_specs = (float(lons["min"]), float(lons["max"]), float(lons["step"])) # (-179.75, 179.75, 0.5) + self.lat_specs = (float(lats["min"]), float(lats["max"]), float(lats["step"])) # (-89.75, 89.75, 0.5) + + def add_unit_check(self): + self.checker_steps.append(VERIFY_UNITS) + + def add_resolution_check(self): + self.checker_steps.append(VERIFY_RESOLUTION) + + def add_lonlat_check(self): + self.checker_steps.append(VERIFY_LONLAT) + + def add_variables_check(self): + self.checker_steps.append(VERIFY_VARIABLES) + + def check_directory(self) -> bool: + """ + Checking all files in a sub dir. + + TAKES TIME. 
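+ Every *.nc file found recursively under the directory is run through a RawFileChecker with the configured checker_steps; per-file results are collected in self.results and failing files additionally in self.errors.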
+ Returns: + bool: True if all checks were successful, False if not + """ + + for f in self.directory.rglob("*.nc"): + file_checker = RawFileChecker( + input_file=f, + type_class=self.type_class, + checker_steps=self.checker_steps, + lonlat_resolution_to_verify=None, + calendar=self.calendar, + units=self.desired_units, + lon_specs=self.lon_specs, + lat_specs=self.lat_specs, + expected_units=self.meta_raw_dict["units"], + ) + file_checker.check() + if not all(file_checker.results.values()): + self.errors.append(file_checker.results) + self.results.append(file_checker.results) + file_checker.cleanup_memory() + + if self.errors: + LOGGER.warning("There were errors running while running the checks") + LOGGER.warning(f"Errors : {self.errors}") + return False + + LOGGER.info("All checks passed") + return True + + +class RawFileChecker(AbstractFileChecker): + def __init__( + self, + input_file, + type_class=None, + checker_steps: list = None, + lonlat_resolution_to_verify=None, + calendar=None, + units=None, + lon_specs=None, + lat_specs=None, + expected_units=None, + ): + super().__init__( + input_file=input_file, + checker_steps=checker_steps, + available_checks={ + VERIFY_UNITS: self.check_units_ok, + VERIFY_RESOLUTION: self.check_resolution, + VERIFY_LONLAT: self.check_lonlat, + VERIFY_VARIABLES: self.check_variables, + }, + ) + self.type_class = type_class + self.lonlat_resolution_to_verify = lonlat_resolution_to_verify + self.calendar = calendar + self.desired_units = units + self.lon_specs = lon_specs + self.lat_specs = lat_specs + self.results = {} + self.expected_units = expected_units + + def check(self): + """ + Logs warnings if there are inconsistent files. + + Checks always + for corruptness, the rest is optional. See initialization of class. + Args: + input_file (Path): The path that should be checked + log_file (Path): Where error / warnings should be logged additionally. + mode (str): 'w' writes the logs into the file, 'a' appends it to + an existing file. + Returns: + True if all checks were passed, False if not. 
+ """ + + self.check_file_ok() + for step in self.checker_steps: + self.available_steps[step]() + + def check_variables(self): + try: + actual_var = self.dataset.variable_id + # cmip6 case + if self.type_class == "cmip6": + named_var = str(self.input_file.name).split("_")[4] + # input4mips case + else: + named_var = str(self.input_file.parents[0]).split("/")[-4] + if "sum" in named_var: + named_var = named_var.split("_")[0] + + if actual_var != named_var: + self.results["var_ok"] = False + LOGGER.warning( + f"The following file contains the variable {actual_var}, but the variable {named_var} " + f"was expected: \n{self.input_file}" + ) + else: + self.results["var_ok"] = True + except AttributeError: + self.results["var_ok"] = False + LOGGER.warning(f"Dataset does not contain variable_id: \n {self.input_file}") + + def check_lonlat(self, ds, check_dict, input_file, passed_checks): + lon_min, lon_max, lon_step = self.lon_specs # (-179.75, 179.75, 0.5) + lat_min, lat_max, lat_step = self.lat_specs # (-89.75, 89.75, 0.5) + try: + found_lon_min, found_lon_max = ds.lon[0].item(), ds.lon[-1].item() + found_lat_min, found_lat_max = ds.lat[0].item(), ds.lat[-1].item() + found_lon_step = abs(ds.lon[0].item() - ds.lon[1].item()) + found_lat_step = abs(ds.lat[0].item() - ds.lat[1].item()) + + # longitude checks + if (lon_min == found_lon_min) and (lon_max == found_lon_max) and (np.isclose(lon_step, found_lon_step)): + check_dict["lon_ok"] = True + else: + passed_checks = False + check_dict["lon_ok"] = False + LOGGER.warning(f"The following file has unexpected lon values: \n{input_file}") + + # latitude checks + if (lat_min == found_lat_min) and (lat_max == found_lat_max) and (np.isclose(lat_step, found_lat_step)): + check_dict["lat_ok"] = True + else: + passed_checks = False + check_dict["lat_ok"] = False + LOGGER.warning(f"The following file has unexpected lat values: \n{input_file}") + + except AttributeError: + passed_checks = False + check_dict["lon_ok"] = False + check_dict["lat_ok"] = False + LOGGER.warning(f"Dataset contains different namings for longitude / latitude. 
\n {input_file}") + return passed_checks + + def check_resolution(self): + try: + actual_spat_res = self.dataset.attrs["nominal_resolution"] + actual_spat_res = actual_spat_res.replace("_", " ") # just in case + named_spat_res = " ".join(str(self.input_file.name).split("_")[-5:-3]) + if actual_spat_res != named_spat_res: + self.results["res_ok"] = False + LOGGER.warning( + f"The following file has the nominal resolution {actual_spat_res}, but the resolution " + f"{named_spat_res} was expected: \n{self.input_file}" + ) + else: + self.results["res_ok"] = True + + actual_temp_res = self.dataset.attrs["frequency"] + named_temp_res = str(self.input_file.name).split("_")[-3] + if actual_temp_res != named_temp_res: + self.dataset["freq_ok"] = False + LOGGER.warning( + f"The following file has the temporal frequency {actual_temp_res}, but the frequency " + f"{named_temp_res} was expected: \n{self.input_file}" + ) + else: + self.dataset["freq_ok"] = True + except AttributeError: + self.dataset["res_ok"] = False + self.dataset["freq_ok"] = False + LOGGER.warning(f"The following file gives no access to nominal resolution or frequency: {self.input_file}") + + def check_units_ok(self): + try: + var = self.dataset.variable_id + found_unit = None + + try: + found_unit = self.dataset[var].units + except AttributeError: + try: + found_unit = self.dataset["variable_units"] + except AttributeError: + raise KeyError(f"Unit could not be found in file {self.input_file}") + + expected_unit = self.expected_units[var] + if found_unit != expected_unit: + self.results["units_ok"] = False + LOGGER.warning( + f"The following file has the unit {found_unit}, but the unit {expected_unit} " + f"was expected: \n{self.input_file}" + ) + else: + self.results["units_ok"] = True + + except AttributeError: + self.results["units_ok"] = False + LOGGER.warning(f"The following file does not allow us to call ds.variable_id:\n{self.input_file}") diff --git a/climateset/processing/raw/cmip6_processor.py b/climateset/processing/raw/cmip6_processor.py new file mode 100644 index 0000000..bcac9ad --- /dev/null +++ b/climateset/processing/raw/cmip6_processor.py @@ -0,0 +1,232 @@ +# TODO Adapt module for new pipeline pattern +import os +import re +import subprocess +import warnings +from pathlib import Path + +import xarray as xr +import xmip +import xmip.preprocessing as xmip_preprocessing +from tqdm import tqdm + +from climateset.processing.raw.abstract_raw_processor import AbstractRawProcessor +from climateset.processing.raw.utils import create_generic_output_path + + +# Attention: we need to write _desired_units for our all our data +# Attention: Before you apply, make sure this is only applied to CMIP6 data! +class ClimateModelProcesser(AbstractRawProcessor): + """ + Can be called to apply xmip preprocessing. + + Works only on Cmip6 Data! 
+ """ + + def __init__( + self, + rename_ds: bool = True, + correct_lonlat: bool = True, + correct_units: bool = True, + fix_bounds: bool = True, + verify_units: bool = False, + verify_resolution: bool = False, + verify_lonlat: bool = False, + verify_num_of_files: bool = False, + verify_variables: bool = False, + correct_time_axis: bool = True, + correct_calendar: bool = True, + sum_levels: bool = True, + ): + """Init function for xmip processer + Args: + rename_ds (bool): Make naming in cmip6 consistent + correct_lonlat (bool): Make lonlat naming consistent + correct_units (bool): Make units consistent + fix_bounds (bool): Fix bounds and vertex + verify_units (bool): Makes sure that all ghg conform to the same units + verify_resolution (bool): Checks if the res in the file name is the same like in the file + verify_lonlat (bool): Checks if the longitude, latitude values are as expected + verify_num_of_files (bool): Checks how many files are in the final dir. + raises a warning if more or less than one. + verify_variables (bool): Checks if the variables are also the expected vars. + correct_calendar (bool): Makes sure that the right calendar is used. + correct_time_axis (bool): Makes sure that each file starts at the first + of each time-unit and shifts if not. + sum_levels (bool): Makes sure that all emissions are sumed over the + different levels. + """ + # init abstract class with checks + super().__init__( + verify_units=verify_units, + verify_resolution=verify_resolution, + verify_lonlat=verify_lonlat, + verify_num_of_files=verify_num_of_files, + verify_variables=verify_variables, + ) + + # xmip processing steps + self.rename_ds = rename_ds + self.correct_lonlat = correct_lonlat + self.correct_units = correct_units + self.fix_bounds = fix_bounds + + # climatevers processing steps + self.correct_calendar = correct_calendar + self.correct_time_axis = correct_time_axis + self.sum_levels = sum_levels + + # update the desired unit dicts for unit correction of xmip + for var, unit in self.desired_units.items(): + self.update_desired_units_dict(var, unit) + + def type_class_meta(self) -> str: + """Returns the name tag of the subclass.""" + return "cmip6" + + # TODO share this in process class with input4mips / cmip6 + def file_belongs_to_type(self, input_file: Path) -> bool: + """ + Check if the name cmip6 appears in the path name. + + Args: + input_file (Path): the file that should be checked for cmip6 + Returns: + True if name indicates it's a cmip6 file, False if not. + """ + return bool(re.search("cmip6", str(input_file), re.IGNORECASE)) + + # Is used internally and can be updated externally + def update_desired_units_dict(self, var: str, unit: str): + """ + Must be applied before running xmip processing. + + Updates the global + dict of xmip to include which units you want to use throughout all models. + Args: + var (str): the variable that shall be added as key + unit (str): the desired unit that all models should use as value + """ + # adapt global variable from xmip + xmip.preprocessing._desired_units[var] = unit + + # share this with input4mips? + def process_directory( + self, + sub_dir: Path, + output_dir: Path, + xmip: bool = True, + climateverse: bool = True, + model: str = "", + overwrite: bool = False, + threads: int = 1, + ): + """ + Applies the preprocessing to a complete directory. 
+ + Args: + sub_dir (Path): directory on which xmip processing should be applied + output_dir (Path): where the preprocessed files should be stored + xmip (bool): if xmip processing should be applied + climateverse (bool): if additional processing from us should be applied + model (str): Default is "", i.e. all models are processed the + same, all at once. Set this to a specific model if you want instead. + overwrite (bool): If data can be overwritten or not + """ + print(f"Start Cmip6 preprocessing of {sub_dir}.") + + # loop through sub_dir to remap all files in here + # total_files = len(list(sub_dir.rglob("*.nc"))) + for path, subdirs, files in tqdm(os.walk(sub_dir)): + if len(files) > 0: + for file in files: + # create output dir + input_file = Path(path) / file + + if self.file_belongs_to_type(input_file) and (model in str(input_file)): + # create output file + if sub_dir == output_dir: + output_file = input_file + else: + output_file = create_generic_output_path(output_dir, path, file) + + # process + if (not output_file.is_file()) or overwrite: + if climateverse and xmip: + self.climateverse_process(input_file, output_file, threads=threads) + self.xmip_process(output_file, output_file) + elif climateverse: + self.climateverse_process(input_file, output_file, threads=threads) + elif xmip: + self.xmip_process(input_file, output_file) + else: + warnings.warn("No processing requested.") + + print(f"Finished Cmip6 preprocessing of {sub_dir} and saved it at {output_dir}.") + + def xmip_process(self, input_file: Path, output_file: Path): + """ + Applies the xmip processing to a specific file. + + Make sure this is + a CMIP6 file beforehand. + Args: + input_file (Path): Input netcdf cmip6 file that shall be processed. + output_file (Path): Where the processed dataset shall be stored. + """ + # load file as dataset + ds = xr.load_dataset(input_file) + + # call preprocessing + if self.rename_ds: + ds = xmip_preprocessing.rename_cmip6(ds) + if self.correct_lonlat: + ds = xmip_preprocessing.promote_empty_dims(ds) + ds = xmip_preprocessing.correct_coordinates(ds) + ds = xmip_preprocessing.broadcast_lonlat(ds) + ds = xmip_preprocessing.correct_lon(ds) + if self.correct_units: + ds = xmip_preprocessing.correct_units(ds) + if self.fix_bounds: + ds = xmip_preprocessing.parse_lon_lat_bounds(ds) + ds = xmip_preprocessing.sort_vertex_order(ds) + ds = xmip_preprocessing.maybe_convert_bounds_to_vertex(ds) + ds = xmip_preprocessing.maybe_convert_vertex_to_bounds(ds) + ds = xmip_preprocessing.fix_metadata(ds) + ds = ds.drop_vars(xmip_preprocessing._drop_coords, errors="ignore") + + # store at new location (not raw anymore) + ds.to_netcdf(output_file, mode="w", format="NETCDF4", engine="netcdf4") + + def climateverse_process(self, input_file: Path, output_file: Path, threads: int = 1): + """ + Applies additional processing steps that are not included in xmip. Needs to be applied BEFORE xmip processing + because it's using cdo and xmip breaks cdo formatting (at least for me so far). + + Args: + input_file (Path): The file that should be processed. + output_file (Path): Where the resulting file is stored. + threads (int): threads for cdo + """ + chained_operators = [] + + # cdo vertsum infile outfile + if self.sum_levels: + chained_operators.append("-vertsum") + + # ATTENTION only relevant for "monthly" data! 
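+ # note: -setday,1 and -settime,00:00:00 pin every timestamp to the first of the month at 00:00, so monthly files from different sources share a uniform time axis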
+ # cdo -setday,1 -settime,00:00:00 + if self.correct_time_axis: + if "mon" in str(input_file): + chained_operators.append("-setday,1") + chained_operators.append("-settime,00:00:00") + else: + warnings.warn("Skipping correcting time axis, since it should only be applied to monthly data.") + + if self.correct_calendar: + # default calendar: 365_day + chained_operators.append(f"-setcalendar,{self.calendar}") + + # run commands + commands = ["cdo", "-s", "-w", "-L", "-P", str(threads), *chained_operators, input_file, output_file] + subprocess.call(commands) diff --git a/climateset/processing/raw/input4mips/__init__.py b/climateset/processing/raw/input4mips/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/climateset/processing/raw/input4mips/input4mips_processor.py b/climateset/processing/raw/input4mips/input4mips_processor.py new file mode 100644 index 0000000..d177559 --- /dev/null +++ b/climateset/processing/raw/input4mips/input4mips_processor.py @@ -0,0 +1,437 @@ +"""This Module contains a Processor class that automates ProcessorStep processes, as well as individual ProcessorStep +classes that can be used independently for more customized applications.""" +import re +from pathlib import Path +from typing import Union + +from climateset import PROCESSED_DATA +from climateset.processing.processor_step import AbstractProcessorStep +from climateset.processing.raw.abstract_raw_processor import AbstractRawProcessor +from climateset.processing.raw.checker import AbstractDirectoryChecker +from climateset.processing.raw.input4mips.processing import ( + CDO_SET_CALENDAR, + CDO_SET_DAY, + CDO_SET_TIME, + CDO_VERTICAL_SUM, + cdo_process_directory, + correct_units_all_files_in_directory, + create_anthro_fire_directory, + create_emissions_totals, + merge_ghg_directory, + rename_biomassburning_files_in_directory, + sum_sectors_directory, +) +from climateset.utils import create_logger + +# +# Processing steps +# + +# private step +_CDO_PROCESSING = "_cdo_processing" + +# user available steps +CORRECT_NAMES = "correct_names" +REORDER_SSP_CO2 = "reorder_ssp_co2" +CREATE_FIRE_FILES = "create_fire_files" +CORRECT_UNITS = "correct_units" +CORRECT_CALENDAR = "correct_calendar" +CORRECT_TIME_AXIS = "correct_time_axis" +SUM_LEVELS = "sum_levels" +SUM_SECTORS = "sum_sectors" +CREATE_TOTALS = "create_totals" +MERGE_GHG = "merge_ghg" + +AVAILABLE_INPUT4MIPS_PROCESSING_STEPS = frozenset( + [ + CORRECT_NAMES, + REORDER_SSP_CO2, + CREATE_FIRE_FILES, + CORRECT_UNITS, + CORRECT_CALENDAR, + CORRECT_TIME_AXIS, + SUM_LEVELS, + SUM_SECTORS, + CREATE_TOTALS, + MERGE_GHG, + ] +) +ORDER = "order" +PROCESSOR_STEP = "processor_step" + +LOGGER = create_logger(__name__) + + +class Input4MipsEmissionProcessor(AbstractRawProcessor): + """ + Can be called to summarize emission over sectors. + + Only applied to Input4MIPs data. + """ + + def __init__( + self, + input_directory: Union[str, Path], + working_directory: Union[str, Path] = PROCESSED_DATA / "input4mips", + checker: AbstractDirectoryChecker = None, + processing_steps=None, + spat_load: str = "map", + temp_load: str = "mon", + ghg_list: list = None, + ghg_sum_name: str = None, + meta_data: Path = None, + fire_cases: bool = True, + force_sum_sectors: list = None, + input_freq: str = "mon", + input_res: str = "50_km", + ): + """Init emission / ghg processer + Args: + verify_units (bool): Makes sure that all ghg conform to the same units. + Default false because slow. + verify_resolution (bool): Checks if the res in the file name is the same like in the file. 
+ Default false because slow. + verify_lonlat (bool): Checks if the longitude, latitude values are as expected. + Default false because slow. + verify_num_of_files (bool): Checks how many files are in the final dir. + raises a warning if more or less than one. + verify_variables (bool): Checks if the variables are also the expected vars. + fix_co2 (bool): Fixes co2 data if necessary. Needed for ssp scenarios! + Default false, because we are providing raw data where it's already fixed. + correct_names (bool): renames all files that are expected + to be biomassburning files. Attention! apply only to raw data, + will mess things up if applied later! + correct_units (bool): Default False, because it takes long. Corrects + the units according to meta data saved in the raw param file. + correct_calendar (bool): Makes sure that the right calendar is used. + correct_time_axis (bool): Makes sure that each file starts at the first + of each time-unit and shifts if not. + sum_levels (bool): Makes sure that all emissions are sumed over the + different levels and sectors. For AIR files, the emissions are + summed over 25 different height levels. For ANTHRO files, the + emissions are summed over 8 different sectors. Openburning and + biomassburning have only one level (sectors are diff in external + percentage files, see 'fire_cases'). + sum_sectors (bool): Makes sure that ghg are summarized over diff sectors. + Historical data is treated differently. Different versions + are stored in the output_dir for historical data, since different + climate models need different versions here. There is a param/config file + mapping the right climate models to the right sum_sector types. + create_totals (bool): For some tasks you might not want to use + emission files, but arrays of totals over a specific time frame. + With this you can creat either map or total data; monthly or yearly. + See spat_load and temp_load for that. + spat_load (str): Can be "map" or "tot". Map is a long / lat map. + Total is an array with the totals summed over the spatial axis. + temp_load (str): Can be "mon" or "year". Decides if the loaded data + is monthly or yearly averaged. Can be extended later on. + merge_ghg (list or bool): If True: + Simply merges all ghg that can be found in the sub_dir (e.g.). + Instead one can hand over a list, which means that those ghg + specifically will be merged into a new file. + ghg_list (list): The greenhouse gases (ghg) that should be merged in + merge_ghg. Should be the folder names used in the respective dir. + meta_data (Path): Default None. Only necessary when sum_sectors is + used and anthro fire files should be generated. + fire_cases (bool): If the different fire cases should be considered + during sum_sectors. + force_sum_sectors (list): Which GHG are forced to be summed (sum_sectors) even if not all + sectors (AIR, anthro, biomassburning/openburning) is available. + Default is ["CO2"], since CO2 is the only GHG that does not have + biomassburning/openburning files as the rest does. + input_freq (str): Default "mon". Only used for create_totals_subdir. + input_res (str): Default "50 km". Only used for create_totals_subdir. + create_fire_files (bool): Default false, because it only needs to be + created one time. 
+ """ + # init abstract class with checks + super().__init__( + input_directory=input_directory, + working_directory=working_directory, + checker=checker, + processing_steps=processing_steps, + ) + self.cdo_operators = [] + self.spat_load = spat_load + self.temp_load = temp_load + + # ghg list that should be merged + self.ghg_list = ghg_list + if not self.ghg_list: + self.ghg_list = ["BC_sum", "CH4_sum", "SO2_sum"] + self.ghg_sum_name = ghg_sum_name + self.meta_data = meta_data + self.fire_cases = fire_cases + self.force_sum_sectors = force_sum_sectors + if not self.force_sum_sectors: + self.force_sum_sectors = ["CO2"] + self.input_freq = input_freq + self.input_res = input_res + + self.available_steps = { + CORRECT_NAMES: {ORDER: 1, PROCESSOR_STEP: RenameBiomassBurningFilesStep()}, + REORDER_SSP_CO2: {ORDER: 2, PROCESSOR_STEP: ReorderSSPCO2DimensionsStep()}, + CREATE_FIRE_FILES: { + ORDER: 3, + PROCESSOR_STEP: CreateAnthroFireFilesStep( + metadata_directory=self.meta_data, cdo_operators=self.cdo_operators + ), + }, + CORRECT_CALENDAR: {ORDER: 4, PROCESSOR_STEP: self._cdo_add_correct_calendar}, + CORRECT_TIME_AXIS: {ORDER: 5, PROCESSOR_STEP: self._cdo_add_correct_time_axis}, + SUM_LEVELS: {ORDER: 6, PROCESSOR_STEP: self._cdo_add_sum_levels}, + _CDO_PROCESSING: {ORDER: 7, PROCESSOR_STEP: CdoEmissionsProcessingStep(cdo_operators=self.cdo_operators)}, + CORRECT_UNITS: {ORDER: 8, PROCESSOR_STEP: CorrectUnitsStep(desired_units=self.desired_units)}, + SUM_SECTORS: { + ORDER: 9, + PROCESSOR_STEP: SumSectors(working_dir=self.working_directory, meta_data=self.meta_data), + }, + CREATE_TOTALS: { + ORDER: 10, + PROCESSOR_STEP: CreateEmissionsTotals( + working_dir=self.working_directory, + spat=self.spat_load, + temp=self.temp_load, + input_res=self.input_res, + input_freq=self.input_freq, + ), + }, + MERGE_GHG: { + ORDER: 11, + PROCESSOR_STEP: MergeGHG( + working_dir=self.working_directory, ghg_list=self.ghg_list, ghg_sum_name=self.ghg_sum_name + ), + }, + } + + # TODO CO2 baseline - where?? + # CO2 baseline: do this outside, because it means that the data looks + # different for each climate model. best case you just store that somewhere + # in a config, store that well and apply it in the dataloader for each model separatetly? + # self.substract_co2_baseline = substract_co2_baseline + + def add_correct_names_step(self): + self.processing_steps.append(CORRECT_NAMES) + + def add_reorder_ssp_co2_dimensions_step(self): + self.processing_steps.append(REORDER_SSP_CO2) + + def add_create_fire_files(self): + self.processing_steps.append(CREATE_FIRE_FILES) + + def add_correct_units_step(self): + self.processing_steps.append(CORRECT_UNITS) + + def add_correct_calendar_step(self): + self.processing_steps.append(CORRECT_CALENDAR) + + def add_correct_time_axis_step(self): + self.processing_steps.append(CORRECT_TIME_AXIS) + + def add_sum_levels_step(self): + self.processing_steps.append(SUM_LEVELS) + + def add_sum_sectors_step(self): + self.processing_steps.append(SUM_SECTORS) + + def add_create_totals_step(self): + self.processing_steps.append(CREATE_TOTALS) + + def add_merge_ghg_step(self): + self.processing_steps.append(MERGE_GHG) + + def set_load_types(self, spat_load: str = "map", temp_load: str = "mon"): + """ + Setting types for the loading processor. Can be set to update the types, so the same processor instance can be + applied afterward. + + Args: + spat_load (str): Can be "map" or "tot". Map is a long / lat map. + Total is an array with the totals summed over the spatial axis. 
+ temp_load (str): Can be "mon" or "year". Decides if the loaded data + is monthly or yearly averaged. Can be extended later on. + """ + self.spat_load = spat_load + self.temp_load = temp_load + + def type_class_meta(self) -> str: + """Returns the name tag of the subclass.""" + return "input4mips" + + # TODO share this in process class with input4mips / cmip6 + def file_belongs_to_type(self, input_file: Path) -> bool: + """ + Check if the name input4mip(s) appears in the path name. + + Args: + input_file (Path): the file that should be checked for input4mips + Returns: + True if name indicates it's a cmip6 file, False if not. + """ + return bool(re.search("input4mip", str(input_file), re.IGNORECASE)) + + def process_directory(self): + """ + Applying all the stuff as decided in the init function. + + Most of the functions build upon each other, i.e. you cannot apply store_totals if you haven't used merge_ghg + before. To keep things modular the user can still decide which ones to apply (e.g. because some things may have + been applied earlier / are not necessary). Just be aware that things break if you are not making sure that the + order is followed. + """ + special_processes = [ + process for process in self.processing_steps if process in [CORRECT_CALENDAR, CORRECT_TIME_AXIS, SUM_LEVELS] + ] + if special_processes: + for process in special_processes: + self.available_steps[process][PROCESSOR_STEP]() + self.processing_steps.remove(process) + self.processing_steps.append(_CDO_PROCESSING) + + process_list = [self.available_steps[step] for step in self.processing_steps] + ordered_processes = sorted(process_list, key=lambda step: step[ORDER]) + + current_processing_directory = self.input_directory + for process_dict in ordered_processes: + processing_step: AbstractProcessorStep = process_dict[PROCESSOR_STEP] + processing_step.execute(current_processing_directory) + current_processing_directory = processing_step.get_results_directory() + + return current_processing_directory + + def _cdo_add_correct_calendar(self): + self.cdo_operators.append(f"{CDO_SET_CALENDAR},{self.calendar}") + + def _cdo_add_correct_time_axis(self): + # TODO add check for mon data only + self.cdo_operators.append(CDO_SET_DAY) + self.cdo_operators.append(CDO_SET_TIME) + + def _cdo_add_sum_levels(self): + self.cdo_operators.append(CDO_VERTICAL_SUM) + + def set_meta_data(self, meta_data: Path): + """Set meta data path if not done during initialization.""" + self.meta_data = meta_data + + +class RenameBiomassBurningFilesStep(AbstractProcessorStep): + def execute(self, input_directory): + """Renames files (biomassburning) if needed.""" + output_dir = rename_biomassburning_files_in_directory(input_directory) + + LOGGER.info(f"... Finished renaming biomassburning files in {input_directory}") + self.results_directory = output_dir + + +class ReorderSSPCO2DimensionsStep(AbstractProcessorStep): + def execute(self, input_directory): + output_dir = rename_biomassburning_files_in_directory(input_directory) + LOGGER.info(f"... 
Finished renaming biomassburning files in {input_directory}") + self.results_directory = output_dir + + +class CreateAnthroFireFilesStep(AbstractProcessorStep): + def __init__(self, metadata_directory, cdo_operators, overwrite=False): + super().__init__() + self.metadata_directory = metadata_directory + self.cdo_operators = cdo_operators + self.overwrite = overwrite + + def execute(self, input_directory): + self.results_directory = create_anthro_fire_directory( + input_directory, + meta_data=self.metadata_directory, + cdo_operators=self.cdo_operators, + overwrite=self.overwrite, + ) + + +class CdoEmissionsProcessingStep(AbstractProcessorStep): + def __init__(self, cdo_operators, overwrite=False): + super().__init__() + self.cdo_operators = cdo_operators + self.overwrite = overwrite + + def execute(self, input_directory): + self.results_directory = cdo_process_directory( + input_directory=input_directory, cdo_operators=self.cdo_operators, overwrite=self.overwrite + ) + + +class CorrectUnitsStep(AbstractProcessorStep): + def __init__(self, desired_units): + super().__init__() + self.desired_units = desired_units + + def execute(self, input_directory): + """ + Correcting the ghg units (if possible). + + Attention, this module + is slow since it is not handled with cdo. Do only if necessary. + Args: + input_dir (Path): All files in that dir will be adapted + """ + output_dir = correct_units_all_files_in_directory(input_dir=input_directory, desired_units=self.desired_units) + self.results_directory = output_dir + + +class SumSectors(AbstractProcessorStep): + def __init__(self, working_dir, meta_data): + super().__init__() + self.meta_data = meta_data + self.working_dir = working_dir + + def execute(self, input_directory): + self.results_directory = sum_sectors_directory( + input_dir=input_directory, output_dir=self.working_dir, meta_data=self.meta_data + ) + + +class CreateEmissionsTotals(AbstractProcessorStep): + def __init__( + self, + working_dir, + spat: str = "map", + temp: str = "mon", + input_freq: str = "mon", + input_res: str = "50_km", + overwrite: bool = True, + ): + super().__init__() + self.working_dir = working_dir + self.spat = spat + self.temp = temp + self.input_freq = input_freq + self.input_res = input_res + self.overwrite = overwrite + + def execute(self, input_directory): + self.results_directory = create_emissions_totals( + input_dir=input_directory, + output_dir=self.working_dir, + spat=self.spat, + temp=self.temp, + input_freq=self.input_freq, + input_res=self.input_res, + overwrite=self.overwrite, + ) + + +class MergeGHG(AbstractProcessorStep): + def __init__(self, working_dir, ghg_list, ghg_sum_name, overwrite=False): + super().__init__() + self.working_dir = working_dir + self.ghg_list = ghg_list + self.ghg_sum_name = ghg_sum_name + self.overwrite = overwrite + + def execute(self, input_directory): + self.results_directory = merge_ghg_directory( + input_dir=input_directory, + output_dir=self.working_dir, + ghg_list=self.ghg_list, + ghg_sum_name=self.ghg_sum_name, + overwrite=self.overwrite, + ) diff --git a/climateset/processing/raw/input4mips/processing.py b/climateset/processing/raw/input4mips/processing.py new file mode 100644 index 0000000..b4975b1 --- /dev/null +++ b/climateset/processing/raw/input4mips/processing.py @@ -0,0 +1,901 @@ +import os +import re +import subprocess +from copy import copy +from pathlib import Path + +import pint +import xarray as xr +from tqdm import tqdm + +from climateset.processing.raw.utils import create_generic_output_path +from 
climateset.utils import create_logger + +LOGGER = create_logger(__name__) + +CDO_SET_CALENDAR = "-setcalendar" +CDO_SET_DAY = "-setday,1" +CDO_SET_TIME = "-settime,00:00:00" +CDO_VERTICAL_SUM = "-vertsum" + + +def rename_biomassburning_file(input_file: Path): + """ + Rename file for biomassburning in case it is in the naked version (from the downloader). Originally, the files are + just called 'GHG' instead of GHG_em_biomassburning. Overwrites the files directly. Attention, + + apply this only directly to the raw data!! The Input4mips preprocesser + renames variables later to the pure / naked GHG version - do not rename + them! + Args: + input_file (Path): File that should be renamed + """ + # fetch the ghg + ghg = str(input_file.parents[0]).split("/")[-4] + # check if this is the naked case + if "_" not in ghg: + new_ghg = f"{ghg}_em_biomassburning" + new_path_name = Path(str(input_file).replace(ghg, new_ghg)) + new_path_name.parent.mkdir(parents=True, exist_ok=True) + # use cdo to rename the var internally + commands = [ + "cdo", + "-s", + "-L", + f"-chname,{ghg},{ghg}_em_biomassburning", + f"-setattribute,variable_id={ghg}_em_biomassburning", + input_file, + new_path_name, + ] + subprocess.call(commands) + + # remove the old file + input_file.unlink() + + +def rename_biomassburning_files_in_directory(input_dir): + LOGGER.info(f"Starting to rename biomassburning files in {input_dir}.") + total_files = len(list(input_dir.rglob("*.nc"))) + for path, subdirs, files in tqdm(os.walk(input_dir), total=total_files): + if len(files) > 0: + for file in files: + rename_biomassburning_file(Path(path) / file) + # remove empty subdirs + for p in input_dir.glob("**/*"): + if p.is_dir() and len(list(p.iterdir())) == 0: + os.removedirs(p) + return input_dir + + +def reorder_ssp_co2_file_dimensions(input_file: Path): + """ + Fix single co2 file. + + CDO can be applied again afterwards. + Memory issues might arise (core dumped). Not sure why. Runs on compute canada though. + + This function is essentially a wrapper around the following CDO command: + + cdo ncpdq --ovr -a time,level,lon,lat input.nc output.nc + + + Args: + input_file (Path): The co2 file that should be fixed. + """ + # ncpdq --ovr -a time,level,lon,lat input.nc output.nc + commands = ["ncpdq", "--ovr", "-a", "time,level,lon,lat", input_file, input_file] + subprocess.call(commands) + + +def reorder_all_ssp_co2_files_dimensions(input_dir: Path): + LOGGER.info(f"Starting to fix co2 files in {input_dir}.") + total_files = len(list(input_dir.rglob("*.nc"))) + for path, subdirs, files in tqdm(os.walk(input_dir), total=total_files): + if len(files) > 0: + for file in files: + if ("ssp" in file) and ("CO2" in file): + file_path = Path(path) / file + reorder_ssp_co2_file_dimensions(file_path) + + LOGGER.info("...Finished fixing CO2 files.") + return input_dir + + +def create_anthro_fire_file( + input_file: Path, metadata_directory: Path, filetype: str, cdo_operators, output_file: Path, threads: int = 1 +): + """ + Create a new openburning or biomassburning file from a given file. + + Returns the path where the new file lifes. + Args: + input_file (Path): + type (str): either 'biomassburning' or 'openburning' + output_file (Path): Where to store the anthro fire file + threads (int): threads for cdo + """ + if metadata_directory is None: + LOGGER.info("You need to provide meta data for this function. 
Use set_meta_data func for that.") + file_parts = str(input_file.stem).split("_") + scenario, ghg, year = file_parts[1], file_parts[2], file_parts[-1] + meta_dir = metadata_directory / filetype + + # prepare additional operators (calendar and time-axis) for later compatability + chained_operators = copy(cdo_operators) + + if filetype == "openburning": + # find a file that contains openburning, scenario and ghg in the meta_data / "future-openburning" + meta_dir = metadata_directory / "future-openburning" + matched_files = [ + f for f in meta_dir.rglob("*.nc") if (scenario in str(f)) and (filetype in str(f)) and (ghg in str(f)) + ] + # take first file that matches that + raw_percentage_file = matched_files[0] + tmp_perc_file = input_file.parents[0] / f"tmp_percentage_{year}.nc" + + # 1. drop the years you don't need from the meta file. -selyear,2015 + # 2. drop the levels you dont need - sellevel,0,1 + # 3. sum the remaining levels / sectors -vertsum + # -> cdo -vertsum -sellevel,0,1 -selyear,year meta_input.nc 2015_share_output.nc + commands = [ + "cdo", + "-w", + "-s", + "-L", + "-P", + str(threads), + CDO_VERTICAL_SUM, + "-sellevel,0,1", # 0 is agriculture, 1 is deforestation + f"-selyear,{year}", + raw_percentage_file, + tmp_perc_file, + ] + subprocess.call(commands) + + # multiply -> cdo mul fire_file.nc percentage_file new_anthro_fire_file.nc + # [IMPORTANT first file must be input-file -> gets meta data from first input file!] + commands = [ + "cdo", + "-w", + "-s", + "-L", + "-P", + str(threads), + *chained_operators, + "-mul", + input_file, + tmp_perc_file, + output_file, + ] + subprocess.call(commands) + + # remove 2015_share_output file to save space (unlink) + tmp_perc_file.unlink() + + elif filetype == "biomassburning": + meta_dir = metadata_directory / "historic-biomassburning" + matched_files = [ + f for f in meta_dir.rglob("*.nc") if (filetype in str(f)) and (ghg in str(f)) and (year in str(f)) + ] + + # find the agri / defo files (written this way for later extension) + perc_dict = { + "AGRI": None, + "DEFO": None, + } + for f in matched_files: + for key in perc_dict: + if key in str(f): + perc_dict[key] = f + if (perc_dict["AGRI"] is None) or (perc_dict["DEFO"] is None): + LOGGER.warning("The AGRI or DEFO file does not exist, see dictionary: \n", stacklevel=2) + LOGGER.info(perc_dict) + LOGGER.info("... skipping this file.") + return + + # cdo 'expr,percentage_AGRI=percentage_AGRI/100' add percentage_AGRI_year.nc + # percentage_DEFO_year.nc percentage_ANTHRO_year.nc + tmp_anthro_perc_file = input_file.parents[0] / f"tmp_percentage_anthro_{year}.nc" + commands = [ + "cdo", + "-w", + "-s", + "-L", + "-P", + str(threads), + "expr,percentage_AGRI=percentage_AGRI/100;", + "-add", + perc_dict["AGRI"], + perc_dict["DEFO"], + tmp_anthro_perc_file, + ] + subprocess.call(commands) + + # cdo mul input_file.nc percentage_ANTHRO_year.nc output.nc + commands = [ + "cdo", + "-w", + "-s", + "-L", + "-P", + str(threads), + *chained_operators, + "-mul", + input_file, + tmp_anthro_perc_file, + output_file, + ] + subprocess.call(commands) + + # delete temp files + tmp_anthro_perc_file.unlink() + + # BIOMASSBURNING + # TODO get the right biomassburning files: year, GHG, biomassburning and AGRI + DEFO + # 1. cdo 'expr,percentage_AGRI=percentage_AGRI/100' -selyear,year meta_input.nc percentage_AGRI_year.nc + # 2. cdo 'expr,percentage_DEFO=percentage_DEFO/100' -selyear,year meta_input.nc percentage_DEFO_year.nc + # 3. 
cdo add percentage_AGRI_year.nc percentage_DEFO_year.nc percentage_ANTHRO_year.nc + # 4. cdo mul input_file.nc percentage_ANTHRO_year.nc output.nc + # 5 remove old files: all the percentage files + else: + raise ValueError("Type must be either 'openburing' or 'biomassburning'.") + + +# TODO fix output directory +def create_anthro_fire_directory( + input_directory: Path, meta_data: Path, cdo_operators, overwrite: bool = False +) -> Path: + """ + Creating anthropogenic fire files for a subdir. + + Args: + input_directory (Path): Input dir + """ + # loop through input dir, get biomassburning and openburning files + LOGGER.info(f"Starting to create anthropogenic fire files for files in {input_directory}.") + fire_output_dir = meta_data / "anthro-fire-data" + total_files = len(list(input_directory.rglob("*.nc"))) + for path, subdirs, files in tqdm(os.walk(input_directory), total=total_files): + if len(files) > 0: + for file in files: + input_file = Path(path) / file + # must match the right nominal and temporal resolution + if ("biomassburning" in file) and ("25_km" in file) and ("mon" in file): + filetype = "biomassburning" + elif ("openburning" in file) and ("50_km" in file) and ("mon" in file): + filetype = "openburning" + else: + continue + scenario = file.split("_")[1] + ghg = file.split("_")[2] + fire_name = file.replace("_em_", "_anthro_") + output_file = Path(fire_output_dir / filetype / scenario / ghg / fire_name) + output_file.parent.mkdir(parents=True, exist_ok=True) + if (not output_file.is_file()) or overwrite: + create_anthro_fire_file( + input_file=input_file, + metadata_directory=meta_data, + filetype=filetype, + cdo_operators=cdo_operators, + output_file=output_file, + ) + + LOGGER.info(f"... Finished creating anthro fire files and stored them in {fire_output_dir}") + return input_directory + + +def cdo_process_file(input_file: Path, output_file: Path, threads: int = 1, cdo_operators=None): + """Applies all the emission processing tasks defined during init on + a given netcdf file - but only those ones that can operate on a single + file. + Args: + input_file (Path): The file that should be processed. + output_file (Path): Where the resulting file is stored. + threads (int): threads for cdo + """ + # RENAME variables in case of biomassburning + # needs to be done beforehand becauses messes too much stuff up + # is done immediately in place + if "em_biomassburning" in str(input_file): + ds = xr.load_dataset(input_file) + var = ds.variable_id + old_name = str(input_file) + renamed_file = Path(old_name.replace(".nc", "_renamed.nc")) + if "em_biomassburning" not in var: + commands = [ + "cdo", + "-s", + "-L", + f"-chname,{var},{var}_em_biomassburning", + f"-setattribute,variable_id={var}_em_biomassburning", + input_file, + renamed_file, + ] + subprocess.call(commands) + # rename files & reassign the input_file + input_file.unlink() + input_file = renamed_file.rename(old_name) + + chained_operators = copy(cdo_operators) + # cdo vertsum infile outfile [CHECKED] + # AIR: sum over height levels + # anthro: sum over different sectors + + # ATTENTION only relevant for "monthly" data! 
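+ # note: CDO_SET_DAY / CDO_SET_TIME are queued by the processor's correct_time_axis step; they are stripped again below for non-monthly files so daily or yearly data is not shifted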
+ # cdo -setday,1 -settime,00:00:00 + # TODO add check for mon data only + if "mon" not in "": + chained_operators.remove(CDO_SET_DAY) + chained_operators.remove(CDO_SET_TIME) + LOGGER.warning("Tried to set the time axis on non-monthly file; removing related cdo operators for file.") + + # run commands + commands = ["cdo", "-s", "-w", "-L", "-P", str(threads), *chained_operators, input_file, output_file] + subprocess.call(commands) + + # unit_key_list = [ + # "units", "unit", + # "variable_units", "variable_unit", + # "var_units", "var_unit" + # ] + + # for k in unit_key_list: + # try: + # # TODO try to get the units + # pass + # # if successful: break loop + # except AttributeError: + # # TODO continue + # continue + # if unit is None: + # raise KeyError("Unit could not be found in file {}".format(input_file)) + # else: + # # TODO check if the unit is the one saved in the jason file + # calendar = self.meta_raw_dict["calendar"][var] + # pass + # simply raises an error in case we encounter units that we dont expect + + +# TODO fix output directory +def cdo_process_directory(input_directory, cdo_operators, output_directory=None, overwrite=False): + """ + Apply desired emission processing steps to a subdir and store it in output_dir. + + Can only be applied on raw input4mpis data, not on those + that have already been processed (i.e. merged / summed up). + Args: + input_directory (Path): Where the preprocessing should be applied + output_dir (Path): Where the results should be stored + overwrite (bool): If the data should be overwritten if it already exists + """ + # TODO if a year / file is missing -> write a warning! + # TODO LATER add feature to check lon / lat consistency for input4mips + # 1. apply the "preprocessing" - all the functions that can + # be applied to a single file + LOGGER.info(f"Start preprocessing of emission files {input_directory}....") + total_files = len(list(input_directory.rglob("*.nc"))) + for path, subdirs, files in tqdm(os.walk(input_directory), total=total_files): + if len(files) > 0: + for file in files: + # create output dir + input_file = Path(path) / file + # TODO fix filetype check + # if self.file_belongs_to_type(input_file): + output_file = create_generic_output_path(output_directory, path, file) + # skip if file already exists and we dont wanna overwrite it + if (not output_file.is_file()) or overwrite: + LOGGER.info(f"\nProcessing the following file: {input_file}") + cdo_process_file(input_file, output_file, cdo_operators=cdo_operators) + + LOGGER.info(f"...Finished preprocessing \nof {input_directory} and saved it at {output_directory}.") + return output_directory + + +def merge_ghg_files(input_files: list, output_file: Path, threads: int = 1): + """ + Merging GHG emission files together. + + Can only be applied after + 'create_totals_subdir'! + Args: + input_files (list): List of files (Path) that should be merged. + output_file (Path): Where the merged data should be stored + threads (int): How many threads should be used. + """ + if output_file.is_file(): + output_file.unlink() + + commands = ["cdo", "-w", "-s", "-L", "-P", str(threads), "-merge", *input_files, output_file] + + subprocess.call(commands) + + +def create_totals(input_files: list, output_path: Path, spat: str, temp: str, threads: int = 1): + """ + Creating totals for a given list of files for different spatial and temporal settings. + + Args: + input_files (list): List of paths that are used to create totals + output_path (Path): Where the merged totals should be stored. 
+ spat (str): Can be "map" or "total". Map stores ghg emission maps + (longitude / latitude). Total sums the values over the spatial axes. + temp (str): Can be "mon" or "year". In case of year the data is summed + over years. Assumes that monthly data is used! + threads (int): number of threads + """ + # remove the file if it already exists + if output_path.is_file(): + output_path.unlink() + + # first basic command part + commands = [ + "cdo", + "-w", + "-s", + "-L", + "-P", + str(threads), + ] + + # add depending on cases + if spat == "total": + # commands.append("-fldsum") + # ATTTENTION this assumes that sum sectors has been applied before!! + ghg = str(input_files[0]).split("/")[-1].split("_")[2] + commands.append(f"expr,sum=fldsum({ghg});min=fldmin({ghg});max=fldmax({ghg});mean=fldmean({ghg});") + + if temp == "year": + commands.append("-yearsum") + + # add relevant commands for all cases + commands.extend(["-mergetime", *input_files, output_path]) + + subprocess.call(commands) + + +def merge_ghg_directory( + input_dir: Path, output_dir: Path, ghg_list: list, ghg_sum_name: str = "ALL", overwrite: bool = False +): + """ + Merging GHG emission files together. + + Can only be applied after + 'create_totals_subdir'! + Args: + input_dir (Path): Data dir that should be processed + output_dir (Path): Where the merged data should be stored + ghg_sum_name (str): How the dir and files where the merged data is + stored should be tagged. + overwrite (bool): If files should be overwritten when they already exist + """ + LOGGER.info(f"Start creating merged GHG emission files for {ghg_list} in {input_dir}...") + # create a dict with the ghg files that should be merged + file_dict = {} + # total_files = len(list(input_dir.rglob("*.nc"))) + for path, subdirs, files in tqdm(os.walk(input_dir)): + if len(files) > 0: + for file in files: + full_path = str(Path(path) / file) + # do this only for the ghg listed above: + for ghg in ghg_list: + if ghg in full_path: + ghg_file_str = full_path.replace(ghg, ghg_sum_name) + # add ghg group as key if they don't exist yet + if ghg_file_str not in file_dict: + file_dict[ghg_file_str] = [ghg] + else: + file_dict[ghg_file_str].append(ghg) + + # create the needed dirs + for file, ghgs in file_dict.items(): + # create output path + second_path_part = file.split("input4mips/")[-1] + out_path = Path(output_dir / "input4mips" / second_path_part) + out_path.parent.mkdir(parents=True, exist_ok=True) + + # create input files + input_files = [file.replace(ghg_sum_name, ghg) for ghg in ghgs] + + if (not out_path.is_file()) or overwrite: + merge_ghg_files( + input_files, + out_path, + ) + + LOGGER.info(f"... Finished merging GHG {ghg_list} in {output_dir}.") + return output_dir + + +def sum_sectors(input_files: list, sectors: list, output_path: Path, overwrite: bool = True, threads: str = 1): + """ + Args: + input_files (list): List of paths. The paths contain different + sectors and the information is summed up and stored in the + desired output_file. + sectors (list): List of strings. The different files that are stored + in input_files. The variable names of input_files. + output_path (Path): File where the resulting emissions are stored. + The output_path is adapted in case different fire cases are created. + overwrite (bool): True, i.e. if the file in output_path already + exists, it is overwritten. Set to False if you don't wanna overwrite + these files. + threads (str): Default 1. 
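+ Example (illustrative file names): for sectors ["CO2_em_AIR", "CO2_em_anthro"] the core of the assembled cdo call is
+ cdo 'expr,CO2=CO2_em_AIR+CO2_em_anthro;' -merge air_file.nc anthro_file.nc output_file.nc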
+ + """ + # exit this if the file already exists and we dont wanna overwrite them + if output_path.is_file() and (not overwrite): + return + + ghg = sectors[0].split("_")[0] + # math sum expression + sum_expr = f"{ghg}={sectors[0]}" + for sec in sectors[1:]: + sum_expr = f"{sum_expr}+{sec}" + + # cdo expr,'var_new=var1+var2;' -merge infile1 infile2 outfile # works only on cmd + # cdo 'expr,var_new=var1+var2;' -merge infile1 infile2 outfile # works for both + commands = [ + "cdo", + "-w", + "-s", + "-L", + "-P", + str(threads), + f"expr,{sum_expr};", + "-merge", + *input_files, + output_path, + ] + subprocess.call(commands) + + +def correct_units_in_file(input_file: Path, desired_units: dict): + """ + Corrects units of a single file. + + Be aware this is slow if applied + to a large directory. Overwrites the old file + Args: + input_file (Path): The file whose units should be adapted. + """ + ureg = pint.UnitRegistry() + ds = xr.load_dataset(input_file) + + # only use the data variables (ignore bnds vars) + for var in ds.data_vars: + if "bnds" not in str(var): + # check the unit + found_unit = ds[var].units.replace("-", "^-") + desired_unit = desired_units[var].replace("-", "^-") + + # exit if it's the right one + if found_unit == desired_unit: + return + + try: + ureg(found_unit) + ureg(desired_unit) + except pint.errors.UndefinedUnitError: + LOGGER.info( + "Unit is not defined for pint. Check here: " + "https://github.com/hgrecco/pint/blob/master/pint/default_en.txt" + ) + return + # update ds + # this function works for any kind of unit transformations (if they can be transformed) + ds.update({var: xr.apply_ufunc(lambda x: ureg.Quantity(x, found_unit).to(desired_unit).magnitude, ds[var])}) + # update attrs + ds[var].attrs["units"] = desired_unit + # overwrite old file + ds.to_netcdf(input_file, mode="w", format="NETCDF4", engine="netcdf4") + + +def correct_units_all_files_in_directory(input_dir: Path, desired_units: dict): + LOGGER.info(f"Starting to correct all units (if needed) in {input_dir}.") + total_files = len(list(input_dir.rglob("*.nc"))) + for path, subdirs, files in tqdm(os.walk(input_dir), total=total_files): + if len(files) > 0: + for file in files: + correct_units_in_file(Path(path) / file, desired_units) + + LOGGER.info(f"... Finished correcting the units (if needed) in {input_dir}") + return input_dir + + +def sum_sectors_fires(input_files: list, sectors: list, output_path: Path, meta_data: Path, overwrite: bool = True): + """ + Args: + input_files (list): List of paths. The paths contain different + sectors and the information is summed up and stored in the + desired output_file. + sectors (list): List of strings. The different files that are stored + in input_files. The variable names of input_files. + output_path (Path): File where the resulting emissions are stored. + The output_path is adapted in case different fire cases are created. + overwrite (bool): Default is True, i.e. data will be overwritten if it already exists + + """ + # check where (and if) biomassburning or openburning are contained + fire_idx = None + for i, var in enumerate(sectors): + if ("openburning" in var) or ("biomassburning" in var): + fire_idx = i + if fire_idx is None: + raise ValueError( + "Fire cases can only be created if openburning or biomassburning files are given. " + "(Must be contained in the file name as well)." 
+ ) + + # Get ghg and fire type + ghg = sectors[0].split("_")[0] + # fire_type = sectors[fire_idx].split("_")[-1] + + # CASE 1: ALL FIRES + all_fire_output_path = Path(str(output_path).replace(f"_{ghg}_", f"_{ghg}_all-fires_")) + all_fire_files = input_files + all_fire_sectors = sectors + sum_sectors(all_fire_files, all_fire_sectors, all_fire_output_path, overwrite=overwrite) + + # CASE 2: ANTHRO FIRES + anthro_fire_output_path = Path(str(output_path).replace(f"_{ghg}_", f"_{ghg}_anthro-fires_")) + anthro_fire_sectors = sectors + # get the right anthro fire file from meta + # 1. get the current fire file + all_fire_file = input_files[fire_idx] + anthro_file_name = str(all_fire_file.name).replace("_em_", "_anthro_") + potential_anthro_files = [f for f in meta_data.rglob(anthro_file_name)] + if len(potential_anthro_files) < 1: + LOGGER.info(f"No anthro fire file found for {all_fire_file.name}.") + pass + else: + if len(potential_anthro_files) > 1: + LOGGER.info(f"Several fire files found for {all_fire_file.name}. Choosing only the first one.") + anthro_fire_files = input_files.copy() + anthro_fire_files[fire_idx] = potential_anthro_files[0] + sum_sectors(anthro_fire_files, anthro_fire_sectors, anthro_fire_output_path, overwrite=overwrite) + + # CASE 3: NO FIRES + # drop the openburning or biomassburning sectors and files + no_fire_output_path = Path(str(output_path).replace(f"_{ghg}_", f"_{ghg}_no-fires_")) + no_fire_files = input_files.copy() + no_fire_sectors = sectors.copy() + no_fire_files.pop(fire_idx) + no_fire_sectors.pop(fire_idx) + sum_sectors(no_fire_files, no_fire_sectors, no_fire_output_path, overwrite=overwrite) + + +def sum_sector_cases( + file_dict: dict, + output_dir: Path, + meta_data: Path, + fire_cases: bool, + force_sum_sectors: list = None, + overwrite: bool = False, + silent: bool = False, +): + """ + Calls sum sector for different cases. + + Args: + file_dict (dict): "example_path_SECTOR_sth": [file_path, file_path, file_path] + output_dir (Path): output directory, where to save the diff cases. + fire_cases (bool): if the diff fire cases should be considered + force_sum_sectors (str): Default is empty, i.e. no merge is forced. + Put the name of GHG here that should be forced to merge. + overwrite (bool): if the files should be overwritten if they already exist + silent (bool): If this should be processed silently. 
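+
+        Note:
+            Sketch of the dispatch per entry: all three sectors present -> summed
+            (per fire case if fire_cases is True); one sector missing -> summed only
+            if its GHG is listed in force_sum_sectors; two sectors missing -> skipped
+            with a warning.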
+ """ + if not force_sum_sectors: + force_sum_sectors = [] + + for file, sectors in tqdm(file_dict.items()): + num_nones = sum([sec is None for sec in sectors]) + + # create file lists and names for the case where everything is added + ghg = file.split("input4mips/")[-1].split("/")[1].split("_")[0] + input_files = [Path(file.replace("SECTOR", sec)) for sec in sectors if sec is not None] + sector_list = [f"{ghg}_{sec}" for sec in sectors if sec is not None] + out_path = Path(output_dir / "input4mips" / file.split("input4mips/")[-1].replace("SECTOR", "sum")) + out_path.parent.mkdir(parents=True, exist_ok=True) + + if num_nones == 0: + # normal case + if fire_cases: + sum_sectors_fires(input_files, sector_list, out_path, meta_data=meta_data, overwrite=overwrite) + else: + sum_sectors(input_files, sector_list, out_path, overwrite=overwrite) + + elif (num_nones == 1) and (ghg in force_sum_sectors): + # just the simple summing if desired + warning + if not silent: + LOGGER.warning(f"Merged files for {file}, however some sectors are missing", stacklevel=2) + sum_sectors(input_files, sector_list, out_path, overwrite=overwrite) + + elif (num_nones == 1) and ghg not in force_sum_sectors: + if not silent: + LOGGER.warning( + f"Skipping case {file}. Not all three sectors are available. " + f"You can force merging with 'force_sum_sectors=[GHG1, GHG2]'." + ) + + elif num_nones == 2: + # warning, skipping + if not silent: + LOGGER.warning( + f"Skipping case {file}. At least two files must exist to be summed together.", + stacklevel=2, + ) + else: + raise RuntimeError("'num_nones' must be between 0 and 3.") + + +# TODO fix output dir and overwrite +# TODO break this into two functions +def sum_sectors_directory( + input_dir: Path, + output_dir: Path, + meta_data: Path, + force_sum_sectors: list = None, + fire_cases: bool = False, + overwrite: bool = True, + silent: bool = False, + input_res: str = "50_km", + input_freq: str = "mon", +): + """ + Args: + force_sum_sectors (str): Default is empty, i.e. no merge is forced. + Put the name of GHG here that should be forced to merge. + fire_cases (bool): Default False means that simply all the data is + summed up and that's it. In case this is set to true, three different + things are calculated: anthro-fires, no-fires, all-fires. Can + only be done if "em_openburning" or "em_biomassburning" files + are included. + overwrite (bool): if the files should be overwritten if they already exist + silent (bool): If this should be processed silently. 
+ """ + LOGGER.info(f"Start summing up different types of emission files in {input_dir}...") + if not force_sum_sectors: + force_sum_sectors = [] + # params of the files to sum over + # TODO make these params in a config + # TODO find out which files are skipped + # res = "50_km" + # freq = "mon" + # years = [] + # dicts needed to match files together + # "example_path_SECTOR_sth": [None, None, None] + ssp_file_dict = {} + historical_file_dict = {} + sector_idx = {"em_AIR_anthro": 0, "em_anthro": 1, "em_openburning": 2, "em_biomassburning": 2} + + # match all files that have the exact same name and end on + total_files = len(list(input_dir.rglob("*.nc"))) + for path, subdirs, files in tqdm(os.walk(input_dir), total=total_files): + if len(files) > 0: + for file in files: + full_path = str(Path(path) / file) + # only process the files with the right res and frequency + if (input_res in full_path) and (input_freq in full_path): + # sector descriptions + # check if this is historical or ssp and match + if "ssp" in file: + sector = re.search(r"(em_AIR_anthro|em_anthro|em_openburning)", full_path).group(0) + sector_file_str = full_path.replace(sector, "SECTOR") + # add ghg group as key if they don't exist yet + if sector_file_str not in ssp_file_dict: + ssp_file_dict[sector_file_str] = [None, None, None] + # add the found sector at the right sector index + # ssp_file_dict[sector_file_str][sector_idx[sector]] = full_path + ssp_file_dict[sector_file_str][sector_idx[sector]] = sector + + elif "historical" in file: + sector = re.search(r"(em_AIR_anthro|em_anthro|em_biomassburning)", full_path).group(0) + sector_file_str = full_path.replace(sector, "SECTOR") + # add ghg group as key if they don't exist yet + if sector_file_str not in historical_file_dict: + historical_file_dict[sector_file_str] = [None, None, None] + # add the sector at the right sector index + # history_file_dict[sector_file_str][sector_idx[sector]] = full_path + historical_file_dict[sector_file_str][sector_idx[sector]] = sector + + else: + raise ValueError( + "Input4mips files must include the scenario ('ssp' or 'historical') in their name." + ) + + sum_sector_cases( + file_dict=ssp_file_dict, + output_dir=output_dir, + fire_cases=fire_cases, + meta_data=meta_data, + force_sum_sectors=force_sum_sectors, + overwrite=False, + silent=silent, + ) + + sum_sector_cases( + file_dict=historical_file_dict, + output_dir=output_dir, + fire_cases=fire_cases, + meta_data=meta_data, + force_sum_sectors=force_sum_sectors, + overwrite=False, + silent=silent, + ) + + LOGGER.info(f"...Finished summing up ghg emissions and stored results in {output_dir}") + return output_dir + + +def create_emissions_totals( + input_dir: Path, + output_dir: Path, + spat: str = "map", + temp: str = "mon", + input_freq: str = "mon", + input_res: str = "50_km", + overwrite: bool = True, +): + """ + Creating the data that can actually be loaded by pytorch. + + Args: + input_dir (Path): Data dir that should be processed + output_dir (Path): Where the merged data should be stored + spat (str): Can be "map" or "total". Map stores ghg emission maps + (longitude / latitude). Total sums the values over the spatial axes. + temp (str): Can be "mon" or "year". In case of year the data is summed + over years. Assumes that monthly data is used! + input_freq (str): The frequency of the input data (can only process) + one type after another, so this must be fixed. Default "mon". 
+ input_res (str): The nominal resolution of the input data (can only process) + one type after another, so this must be fixed. Default "50_km". + overwrite (bool): If files should be overwritten when they already exist + """ + LOGGER.info(f"Start creating loadable emission data {input_dir}....") + if spat not in ["map", "total"]: + raise ValueError("Parameter spat must be either 'map' or 'total'.") + if temp not in ["mon", "year"]: + raise ValueError("Parameter temp must be either 'mon' or 'year'.") + + # Match all the files that belong together and contain mon (only difference is the year ending!) + # TODO move this function into an external one + file_dict = {} + # total_files = len(list(input_dir.rglob("*.nc"))) + for path, subdirs, files in tqdm(os.walk(input_dir)): + if len(files) > 0: + for file in files: + full_path = str(Path(path) / file) + if (input_res in full_path) and (input_freq in full_path): + year = int(file.split(".")[0].split("_")[-1]) + year_file_str = full_path.replace(str(year), "YEAR") + # add ghg group as key if they don't exist yet + if year_file_str not in file_dict: + file_dict[year_file_str] = [year] + else: + file_dict[year_file_str].append(year) + + # create the needed dirs + for file, years in file_dict.items(): + # create output path + second_path_part = file.split("input4mips/")[-1] + second_path_part = second_path_part.replace(input_res, f"{spat}_{input_res}") + second_path_part = second_path_part.replace(input_freq, f"{temp}") + second_path_part = second_path_part.replace("YEAR", f"{min(years)}-{max(years)}") + out_path = Path(output_dir / "input4mips" / second_path_part) + out_path.parent.mkdir(parents=True, exist_ok=True) + + # create input files + input_files = [file.replace("YEAR", str(y)) for y in years] + + if (not out_path.is_file()) or overwrite: + create_totals( + input_files, + out_path, + spat, + temp, + ) + + LOGGER.info(f"... Finished creating loadable emission data {output_dir}....") + return output_dir diff --git a/climateset/processing/raw/utils.py b/climateset/processing/raw/utils.py new file mode 100644 index 0000000..0ad1047 --- /dev/null +++ b/climateset/processing/raw/utils.py @@ -0,0 +1,22 @@ +from pathlib import Path + +from climateset.utils import create_logger + +LOGGER = create_logger(__name__) + + +def create_generic_output_path(output_dir: Path, path: str, file: str) -> Path: + """Creates an output path and the necessary parent directories. + 'Generic' in the sense that file names are not adapted as in other + create_output_path functions used in the resolution processers. + Args: + output_dir (Path): First part of the path for the output dir + path (str): Can e.g. stem from os.walk - path of the current file. + file (str): Can e.g. stem from os.walk - current file. + Returns: + Path: New path pointing where the output file can be stored. + """ + topic_dir = file.split("_")[0] + out_path = Path(output_dir / topic_dir / Path(path.split(f"{topic_dir}/")[-1]) / file) + out_path.parent.mkdir(parents=True, exist_ok=True) + return out_path diff --git a/climateset/processing/resolution/__init__.py b/climateset/processing/resolution/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/climateset/utils.py b/climateset/utils.py index d93464b..a325ce9 100644 --- a/climateset/utils.py +++ b/climateset/utils.py @@ -1,3 +1,4 @@ +import json import logging import pathlib import sys @@ -85,7 +86,7 @@ def get_yaml_config(yaml_config_file: Union[str, pathlib.Path], logger: logging. 
if isinstance(yaml_config_file, str): yaml_config_file = pathlib.Path(yaml_config_file) potential_paths = [ - pathlib.Path(yaml_config_file), + yaml_config_file, CONFIGS / yaml_config_file, CONFIGS / f"{yaml_config_file}.yaml", CONFIGS / f"{yaml_config_file}.yml", @@ -110,3 +111,46 @@ def get_yaml_config(yaml_config_file: Union[str, pathlib.Path], logger: logging. except yaml.YAMLError as e: logger.warning(f"Error loading YAML file [{config_filepath}]: {e}") return {} + + +def get_json_config(json_config_file: Union[str, pathlib.Path], logger=LOGGER) -> dict: + """ + This function takes in the path, or name of the file if it can be found in the config/ folder, with of without the + extension, and returns the values of the file in a dictionary format. + + Ex. For a file named app_config.json, directly in the config/ folder, + the function could be called like so : `params = get_json_config('app_config')` + + Args: + json_config_file: Path to JSON config file. If config file is in the config folder, + logger: Logger to handle messaging, by default LOGGER + + Returns: + Dictionary of JSON configuration values + """ + if isinstance(json_config_file, str): + json_config_file = pathlib.Path(json_config_file) + potential_paths = [ + json_config_file, + CONFIGS / json_config_file, + CONFIGS / f"{json_config_file}.json", + ] + + config_filepath = None + for path in potential_paths: + if path.exists(): + config_filepath = path + logger.info(f"JSON config file [{str(path)}] found.") + break + + if not config_filepath: + logger.error(f"JSON config file [{json_config_file}] not found.") + return {} + + try: + with config_filepath.open("r", encoding="UTF-8") as file: + logger.info(f"Loading JSON config file [{config_filepath}].") + return json.load(file) + except json.JSONDecodeError as e: + logger.warning(f"Error loading JSON file [{config_filepath}]: {e}") + return {} diff --git a/configs/processing/raw_processing_params.json b/configs/processing/raw_processing_params.json new file mode 100644 index 0000000..a119551 --- /dev/null +++ b/configs/processing/raw_processing_params.json @@ -0,0 +1,52 @@ +{ + "calendar": "365_day", + "units": { + "tas": "K", + "pr": "kg m-2 s-1", + "SO2_em_AIR_anthro": "kg m-2 s-1", + "SO2_em_anthro": "kg m-2 s-1", + "SO2_em_openburning": "kg m-2 s-1", + "SO2_em_biomassburning": "kg m-2 s-1", + "SO2_anthro_biomassburning": "kg m-2 s-1", + "CO2_em_AIR_anthro": "kg m-2 s-1", + "CO2_em_anthro": "kg m-2 s-1", + "CO2_em_openburning": "kg m-2 s-1", + "CO2_em_biomassburning": "kg m-2 s-1", + "CO2_anthro_biomassburning": "kg m-2 s-1", + "CH4_em_AIR_anthro": "kg m-2 s-1", + "CH4_em_anthro": "kg m-2 s-1", + "CH4_em_openburning": "kg m-2 s-1", + "CH4_em_biomassburning": "kg m-2 s-1", + "CH4_anthro_biomassburning": "kg m-2 s-1", + "BC_em_AIR_anthro": "kg m-2 s-1", + "BC_em_anthro": "kg m-2 s-1", + "BC_em_openburning": "kg m-2 s-1", + "BC_em_biomassburning": "kg m-2 s-1", + "BC_anthro_biomassburning": "kg m-2 s-1" + }, + "50_km": { + "input4mips_lon": { + "min": -179.75, + "max": 179.75, + "step": 0.5 + }, + "input4mips_lat": { + "min": -89.75, + "max": 89.75, + "step": 0.5 + } + }, + "250_km": { + "cmip6_lon": { + "min": 0, + "max": 357.5, + "step": 2.5 + }, + "cmip6_lat": { + "min": -90, + "max": 90, + "step": 1.8947368421052602 + } + } + +} diff --git a/noxfile.py b/noxfile.py index 42460cb..888c791 100644 --- a/noxfile.py +++ b/noxfile.py @@ -55,9 +55,9 @@ def docformatter(session): @nox.session() def check(session): paths = get_paths(session) + session.run("poetry", "run", "flynt", 
*paths["all"], external=True) session.run("poetry", "run", "black", "--check", *paths["all"], external=True) session.run("poetry", "run", "isort", *paths["all"], "--check", external=True) - session.run("poetry", "run", "flynt", *paths["all"], external=True) session.run( "poetry", "run", @@ -74,9 +74,9 @@ def check(session): @nox.session() def fix(session): paths = get_paths(session) + session.run("poetry", "run", "flynt", *paths["all"], external=True) session.run("poetry", "run", "black", *paths["all"], external=True) session.run("poetry", "run", "isort", *paths["all"], external=True) - session.run("poetry", "run", "flynt", *paths["all"], external=True) session.run( "poetry", "run", diff --git a/scripts/example_input4mips_processing.py b/scripts/example_input4mips_processing.py new file mode 100644 index 0000000..7562450 --- /dev/null +++ b/scripts/example_input4mips_processing.py @@ -0,0 +1,50 @@ +from pathlib import Path + +from climateset.processing.processor_step import process_steps +from climateset.processing.raw.input4mips.input4mips_processor import ( + CorrectUnitsStep, + CreateAnthroFireFilesStep, + Input4MipsEmissionProcessor, + RenameBiomassBurningFilesStep, + ReorderSSPCO2DimensionsStep, +) + +# +# Simple example with Input4MipsEmissionProcessor +# + +# Stand in for arguments +processor_arguments = {"input_directory": ""} + +# Create processor for a specific directory +processor = Input4MipsEmissionProcessor(**processor_arguments) + +# Add processing steps - These will be ordered by Input4MipsEmissionProcessor +processor.add_correct_calendar_step() +processor.add_reorder_ssp_co2_dimensions_step() +processor.add_correct_names_step() +processor.add_correct_units_step() +processor.add_create_fire_files() + +# Execute steps +processor.process_directory() + +# +# More manual example with manual setting of steps +# + +# Stand ins for arguments +input_directory = Path("SOME_PATH_HERE/") +anthrofire_arguments = {"metadata_directory": "", "cdo_operators": []} +correct_units_arguments = {"desired_units": ""} + +# Create steps +step_1 = RenameBiomassBurningFilesStep() +step_2 = ReorderSSPCO2DimensionsStep() +step_3 = CorrectUnitsStep(**correct_units_arguments) +step_4 = CreateAnthroFireFilesStep(**anthrofire_arguments) + +process_list = [step_1, step_2, step_3, step_4] + +# Execute steps +output_path = process_steps(input_directory=input_directory, list_of_steps=process_list) diff --git a/tests/test_processing/__init__.py b/tests/test_processing/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_processing/test_raw_processing.py b/tests/test_processing/test_raw_processing.py new file mode 100644 index 0000000..3f58f85 --- /dev/null +++ b/tests/test_processing/test_raw_processing.py @@ -0,0 +1,61 @@ +from unittest.mock import patch + +import pytest +from xarray import open_dataset + +from climateset import DATA_DIR +from climateset.processing.raw.checker import BasicDirectoryChecker +from climateset.processing.raw.input4mips.input4mips_processing import ( + AVAILABLE_INPUT4MIPS_PROCESSING_STEPS, + Input4MipsEmissionProcesser, +) + + +@pytest.fixture() +def mock_xarray_load_dataset(): + with patch("xarray.load_dataset", new=open_dataset) as mock_function: + yield mock_function + + +@pytest.fixture +def simple_emission_processing_object(): + data_dir = DATA_DIR / "raw/raw_input_vars/PNNL-JGCRI/CO2_em_anthro" + return Input4MipsEmissionProcesser(input_directory=data_dir) + + +def test_emission_processing_init(simple_emission_processing_object): + assert 
simple_emission_processing_object.input_directory == DATA_DIR / "raw/raw_input_vars/PNNL-JGCRI/CO2_em_anthro"
+    assert isinstance(simple_emission_processing_object.checker, BasicDirectoryChecker)
+
+
+def test_emission_processing_basic_directory_check(simple_emission_processing_object, mock_xarray_load_dataset):
+    check_results = simple_emission_processing_object.checker.check_directory()
+    assert check_results
+
+
+def test_emission_processing_check_available_steps(simple_emission_processing_object):
+    steps = simple_emission_processing_object.list_available_steps()
+    is_step_available = [step in AVAILABLE_INPUT4MIPS_PROCESSING_STEPS for step in steps]
+    assert all(is_step_available)
+
+
+def test_emission_processing_add_processing_steps(simple_emission_processing_object):
+    step_list = list(AVAILABLE_INPUT4MIPS_PROCESSING_STEPS)
+    simple_emission_processing_object.add_processing_step(step_list)
+    is_step_added = [step in step_list for step in simple_emission_processing_object.processing_steps]
+    assert all(is_step_added)
+
+
+def test_emission_processing_add_individual_steps(simple_emission_processing_object):
+    simple_emission_processing_object.add_correct_names_step()
+    simple_emission_processing_object.add_reorder_ssp_co2_dimensions_step()
+    simple_emission_processing_object.add_create_fire_files()
+    simple_emission_processing_object.add_correct_units_step()
+    simple_emission_processing_object.add_correct_calendar_step()
+    simple_emission_processing_object.add_correct_time_axis_step()
+    simple_emission_processing_object.add_sum_levels_step()
+    simple_emission_processing_object.add_sum_sectors_step()
+    simple_emission_processing_object.add_create_totals_step()
+    simple_emission_processing_object.add_merge_ghg_step()
+    for step in simple_emission_processing_object.processing_steps:
+        assert step in list(AVAILABLE_INPUT4MIPS_PROCESSING_STEPS)
diff --git a/tests/test_processing/test_utils.py b/tests/test_processing/test_utils.py
new file mode 100644
index 0000000..f4ef867
--- /dev/null
+++ b/tests/test_processing/test_utils.py
@@ -0,0 +1,11 @@
+from climateset import TEST_DIR
+from climateset.processing.raw.input4mips.processing import rename_biomassburning_file
+
+
+def test_rename_biomassburning_files():
+    test_file = (
+        TEST_DIR / "resources/CO2-em-biomassburning_input4MIPs_emissions_CMIP_VUA-CMIP-BB4CMIP6-1-1_gn_185001-201512.nc"
+    )
+    result = rename_biomassburning_file(test_file)
+    # TODO assert against the expected renamed file name; for now only check that a value is returned
+    assert result is not None
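+
+
+def test_create_generic_output_path(tmp_path):
+    # Sketch (illustrative paths): assumes an os.walk-style source path and a
+    # file name whose first "_"-separated token ("CO2") is used as the topic
+    # directory. Local import keeps the sketch self-contained; it would
+    # normally sit with the other imports at the top of this module.
+    from climateset.processing.raw.utils import create_generic_output_path
+
+    source_path = "/some/raw/CO2/em_anthro/mon"
+    file_name = "CO2_em_anthro_2015.nc"
+    result = create_generic_output_path(tmp_path, source_path, file_name)
+    assert result == tmp_path / "CO2" / "em_anthro" / "mon" / file_name
+    assert result.parent.is_dir()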