Merged
Commits
41 commits
d8ceff3
initialize-new-integration dwd icon-eu forecast. And register.
JackKelly Jul 16, 2025
be3a19c
Gemini CLI's first draft. And I've started updating.
JackKelly Jul 16, 2025
311f176
Merge branch 'main' into update-icon-eu-from-dwd
JackKelly Jul 18, 2025
063bf24
All that's left to do in template_config is check the data_vars
JackKelly Jul 21, 2025
12baf81
First complete draft of ICON-EU template_config.py
JackKelly Jul 21, 2025
bc3f3a3
Add fields for all DataVarAttrs from ICON PDF doc.
JackKelly Jul 22, 2025
b26950b
Reformat the comment strings so they are all under 100 chars long.
JackKelly Jul 22, 2025
dce7afe
Add a note about alb_rad not being included
JackKelly Jul 22, 2025
159b89d
Implemented first test for template_config. Add latest.zarr
JackKelly Jul 22, 2025
46fc81b
Use the correct CRS in template_config and its test
JackKelly Jul 23, 2025
eb01463
Update the DataVarAttrs with short_name and standard_name from ICON m…
JackKelly Jul 24, 2025
eeddd29
Update latest.zarr metadata with short_name and standard_name from GR…
JackKelly Jul 24, 2025
7a7a9c8
Removing dynamical_dataset.py and region_job.py from git repo. These …
JackKelly Jul 24, 2025
7035578
Add expected_forecast_length.
JackKelly Jul 28, 2025
ba89fac
Update append_dim_start datetime, as per Alden's review
JackKelly Jul 28, 2025
85ead6b
Add dwd/__init__.py and dwd/icon_eu/__init__.py as per Alden's review…
JackKelly Jul 28, 2025
307e43c
Remove noqa: F401, as per Alden's review
JackKelly Jul 28, 2025
80ca631
Merge branch 'main' into update-icon-eu-from-dwd
JackKelly Jul 28, 2025
96ce2f2
Replace hyphen with underscore in ICON EU dataset_id
JackKelly Jul 28, 2025
a8e0885
Add a comment confirming that the coords are for pixel centers
JackKelly Jul 28, 2025
6fbb722
Comment that the CRS is a perfect sphere extracted from GRIB.
JackKelly Jul 28, 2025
20cb35f
lead_time is 93 steps NOT 120 steps! Bug found by Alden in PR review.
JackKelly Jul 28, 2025
898b1d5
Merge branch 'update-icon-eu-from-dwd' of github.com:JackKelly/reform…
JackKelly Jul 28, 2025
516156b
Change wind_v_10 to wind_v_10m
JackKelly Jul 28, 2025
8d5abcf
Change relative_humidity to relative_humidity_2m
JackKelly Jul 28, 2025
975edf4
Change pressure_reduced_to_msl to pressure_reduced_to_mean_sea_level
JackKelly Jul 28, 2025
248806a
Use chunks with 165 pixels in the latitude dim
JackKelly Jul 28, 2025
2cfee1c
Merge branch 'update-icon-eu-from-dwd' of github.com:JackKelly/reform…
JackKelly Jul 28, 2025
e5be1a5
Add comment about chunk sizes in MB
JackKelly Jul 28, 2025
2a3e893
Add comment about size of shards in MB
JackKelly Jul 28, 2025
e974bfe
Remove the text 'mean over forecast time'.
JackKelly Jul 28, 2025
d063a6c
Replace 'total_precipitation' with 'precipitation_surface'
JackKelly Jul 28, 2025
576b0a9
Add unmodified files created by initialize-new-integration
JackKelly Jul 28, 2025
6e7c81b
Update pytest to 8.4.1. Doesn't fix the issue yet.
JackKelly Jul 28, 2025
416748a
Adding missing __init__.py files to test directories
JackKelly Jul 28, 2025
1f79980
Change latitude chunks to 219 so pydantic tests pass.
JackKelly Jul 28, 2025
7a8ebcc
Merge branch 'main' into update-icon-eu-from-dwd
JackKelly Aug 4, 2025
a09e0e6
Change `wind_u_10` to `wind_u_10m`
JackKelly Aug 4, 2025
887c8f4
Update Zarr metadata. Tests pass.
JackKelly Aug 4, 2025
9ed6424
Merge branch 'main' into update-icon-eu-from-dwd
JackKelly Aug 4, 2025
7f513ad
Re-run initialize-new-integration after merging with main branch
JackKelly Aug 4, 2025
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -38,7 +38,7 @@ dev-dependencies = [
"pandas-stubs>=2.2.2.240909",
"pre-commit>=3.8.0",
"pyqt6>=6.7.1",
"pytest>=8.3.4",
"pytest>=8.4.1",
@JackKelly (Contributor Author), Aug 4, 2025:
Please see this comment for more info on why I updated pytest: #184 (comment)

"ruff==0.12.1",
"types-requests>=2.32.0.20240914",
]
2 changes: 2 additions & 0 deletions src/reformatters/__main__.py
@@ -15,6 +15,7 @@
NoaaNdviCdrAnalysisDataset,
)
from reformatters.contrib.uarizona.swann.analysis import UarizonaSwannAnalysisDataset
from reformatters.dwd.icon_eu.forecast import DwdIconEuForecastDataset
from reformatters.example.new_dataset import initialize_new_integration
from reformatters.noaa.gfs.forecast import NoaaGfsForecastDataset
from reformatters.noaa.hrrr.forecast_48_hour.dynamical_dataset import (
@@ -52,6 +53,7 @@ class UpstreamGriddedZarrsDatasetStorageConfig(StorageConfig):
storage_config=UpstreamGriddedZarrsDatasetStorageConfig()
),
NoaaGfsForecastDataset(storage_config=SourceCoopDatasetStorageConfig()),
DwdIconEuForecastDataset(storage_config=SourceCoopDatasetStorageConfig()),
NoaaHrrrForecast48HourDataset(storage_config=SourceCoopDatasetStorageConfig()),
]

Empty file: src/reformatters/dwd/__init__.py
Empty file: src/reformatters/dwd/icon_eu/__init__.py
1 change: 1 addition & 0 deletions src/reformatters/dwd/icon_eu/forecast/__init__.py
@@ -0,0 +1 @@
from .dynamical_dataset import DwdIconEuForecastDataset as DwdIconEuForecastDataset
55 changes: 55 additions & 0 deletions src/reformatters/dwd/icon_eu/forecast/dynamical_dataset.py
@JackKelly (Contributor Author):
I haven't modified this file yet. This is just the output of initialize-new-integration. I'll modify this file in a subsequent PR.

@@ -0,0 +1,55 @@
from collections.abc import Sequence

from reformatters.common import validation
from reformatters.common.dynamical_dataset import DynamicalDataset
from reformatters.common.kubernetes import CronJob

from .region_job import DwdIconEuForecastRegionJob, DwdIconEuForecastSourceFileCoord
from .template_config import DwdIconEuDataVar, DwdIconEuForecastTemplateConfig


class DwdIconEuForecastDataset(
DynamicalDataset[DwdIconEuDataVar, DwdIconEuForecastSourceFileCoord]
):
template_config: DwdIconEuForecastTemplateConfig = DwdIconEuForecastTemplateConfig()
region_job_class: type[DwdIconEuForecastRegionJob] = DwdIconEuForecastRegionJob

def operational_kubernetes_resources(self, image_tag: str) -> Sequence[CronJob]:
"""Return the kubernetes cron job definitions to operationally update and validate this dataset."""
# operational_update_cron_job = ReformatCronJob(
# name=f"{self.dataset_id}-operational-update",
# schedule=_OPERATIONAL_CRON_SCHEDULE,
# pod_active_deadline=timedelta(minutes=30),
# image=image_tag,
# dataset_id=self.dataset_id,
# cpu="14",
# memory="30G",
# shared_memory="12G",
# ephemeral_storage="30G",
# secret_names=self.storage_config.k8s_secret_names,
# )
# validation_cron_job = ValidationCronJob(
# name=f"{self.dataset_id}-validation",
# schedule=_VALIDATION_CRON_SCHEDULE,
# pod_active_deadline=timedelta(minutes=10),
# image=image_tag,
# dataset_id=self.dataset_id,
# cpu="1.3",
# memory="7G",
# secret_names=self.storage_config.k8s_secret_names,
# )

# return [operational_update_cron_job, validation_cron_job]
raise NotImplementedError(
f"Implement `operational_kubernetes_resources` on {self.__class__.__name__}"
)

def validators(self) -> Sequence[validation.DataValidator]:
"""Return a sequence of DataValidators to run on this dataset."""
# return (
# validation.check_analysis_current_data,
# validation.check_analysis_recent_nans,
# )
raise NotImplementedError(
f"Implement `validators` on {self.__class__.__name__}"
)
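
For orientation, here is a rough sketch (not part of this PR) of how these two scaffold methods might eventually be filled in, mostly by un-commenting the template code above. The import location of ReformatCronJob and ValidationCronJob, both cron schedules, and the choice of validators are assumptions, not decisions made in this PR; the resource figures are simply carried over from the commented-out template.

from collections.abc import Sequence
from datetime import timedelta

from reformatters.common import validation
from reformatters.common.kubernetes import CronJob

# Assumed to live alongside CronJob; verify the actual import path in the repo.
from reformatters.common.kubernetes import ReformatCronJob, ValidationCronJob

# Placeholder schedules: ICON-EU publishes eight runs per day, so something close
# to an every-3-hours cadence is plausible, but these values are not final.
_OPERATIONAL_CRON_SCHEDULE = "30 */3 * * *"
_VALIDATION_CRON_SCHEDULE = "0 */3 * * *"


# Inside DwdIconEuForecastDataset (defined above):

def operational_kubernetes_resources(self, image_tag: str) -> Sequence[CronJob]:
    """Return the kubernetes cron job definitions to operationally update and validate this dataset."""
    operational_update_cron_job = ReformatCronJob(
        name=f"{self.dataset_id}-operational-update",
        schedule=_OPERATIONAL_CRON_SCHEDULE,
        pod_active_deadline=timedelta(minutes=30),
        image=image_tag,
        dataset_id=self.dataset_id,
        cpu="14",  # resource figures copied from the template comment, not tuned for ICON-EU
        memory="30G",
        shared_memory="12G",
        ephemeral_storage="30G",
        secret_names=self.storage_config.k8s_secret_names,
    )
    validation_cron_job = ValidationCronJob(
        name=f"{self.dataset_id}-validation",
        schedule=_VALIDATION_CRON_SCHEDULE,
        pod_active_deadline=timedelta(minutes=10),
        image=image_tag,
        dataset_id=self.dataset_id,
        cpu="1.3",
        memory="7G",
        secret_names=self.storage_config.k8s_secret_names,
    )
    return [operational_update_cron_job, validation_cron_job]


def validators(self) -> Sequence[validation.DataValidator]:
    """Return a sequence of DataValidators to run on this dataset."""
    # The template comment names the analysis checks; a forecast dataset would
    # likely swap in forecast-specific validators if the library provides them.
    return (
        validation.check_analysis_current_data,
        validation.check_analysis_recent_nans,
    )
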
289 changes: 289 additions & 0 deletions src/reformatters/dwd/icon_eu/forecast/region_job.py
@JackKelly (Contributor Author):
I haven't modified this file yet. This is just the output of initialize-new-integration. I'll modify this file in a subsequent PR.

@@ -0,0 +1,289 @@
from collections.abc import Callable, Mapping, Sequence
from pathlib import Path

import xarray as xr

from reformatters.common.logging import get_logger
from reformatters.common.region_job import (
CoordinateValueOrRange,
RegionJob,
SourceFileCoord,
)
from reformatters.common.storage import StoreFactory
from reformatters.common.types import (
AppendDim,
ArrayFloat32,
DatetimeLike,
Dim,
)

from .template_config import DwdIconEuDataVar

log = get_logger(__name__)


class DwdIconEuForecastSourceFileCoord(SourceFileCoord):
"""Coordinates of a single source file to process."""

def get_url(self) -> str:
raise NotImplementedError("Return the URL of the source file.")

def out_loc(
self,
) -> Mapping[Dim, CoordinateValueOrRange]:
"""
Returns a data array indexer which identifies the region in the output dataset
to write the data from the source file. The indexer is a dict from dimension
names to coordinate values or slices.
"""
# If the names of the coordinate attributes of your SourceFileCoord subclass are also all
# dimension names in the output dataset (e.g. init_time and lead_time),
# delete this implementation and use the default implementation of this method.
#
# Examples where you would override this method:
# - An analysis dataset created from forecast data:
# return {"time": self.init_time + self.lead_time}
return super().out_loc()


class DwdIconEuForecastRegionJob(
RegionJob[DwdIconEuDataVar, DwdIconEuForecastSourceFileCoord]
):
# Optionally, limit the number of variables downloaded together.
# If set to a value less than len(data_vars), downloading, reading/recompressing,
# and uploading steps will be pipelined within a region job.
# 5 is a reasonable default if it is possible to download less than all
# variables in a single file (e.g. you have a grib index).
# Leave unset if you have to download a whole file to get one variable out
# to avoid re-downloading the same file multiple times.
#
# max_vars_per_download_group: ClassVar[int | None] = None

# Implement this method only if different variables must be retrieved from different urls
#
# @classmethod
# def source_groups(
# cls,
# data_vars: Sequence[DwdIconEuDataVar],
# ) -> Sequence[Sequence[DwdIconEuDataVar]]:
# """
# Return groups of variables, where all variables in a group can be retrieved from the same source file.
# """
# grouped = defaultdict(list)
# for data_var in data_vars:
# grouped[data_var.internal_attrs.file_type].append(data_var)
# return list(grouped.values())

# Implement this method only if specific post processing in this dataset
# requires data from outside the region defined by self.region,
# e.g. for deaccumulation or interpolation along append_dim in an analysis dataset.
#
# def get_processing_region(self) -> slice:
# """
# Return a slice of integer offsets into self.template_ds along self.append_dim that identifies
# the region to process. In most cases this is exactly self.region, but if additional data outside
# the region is required, for example for correct interpolation or deaccumulation, this method can
# return a modified slice (e.g. `slice(self.region.start - 1, self.region.stop + 1)`).
# """
# return self.region

def generate_source_file_coords(
self,
processing_region_ds: xr.Dataset,
data_var_group: Sequence[DwdIconEuDataVar],
) -> Sequence[DwdIconEuForecastSourceFileCoord]:
"""Return a sequence of coords, one for each source file required to process the data covered by processing_region_ds."""
# return [
# DwdIconEuForecastSourceFileCoord(
# init_time=init_time,
# lead_time=lead_time,
# )
# for init_time, lead_time in itertools.product(
# processing_region_ds["init_time"].values,
# processing_region_ds["lead_time"].values,
# )
# ]
raise NotImplementedError(
"Return a sequence of SourceFileCoord objects, one for each source file required to process the data covered by processing_region_ds."
)

def download_file(self, coord: DwdIconEuForecastSourceFileCoord) -> Path:
"""Download the file for the given coordinate and return the local path."""
# return http_download_to_disk(coord.get_url(), self.dataset_id)
raise NotImplementedError(
"Download the file for the given coordinate and return the local path."
)

def read_data(
self,
coord: DwdIconEuForecastSourceFileCoord,
data_var: DwdIconEuDataVar,
) -> ArrayFloat32:
"""Read and return an array of data for the given variable and source file coordinate."""
# with rasterio.open(coord.downloaded_file_path) as reader:
# TODO: make a band index based on tag matching utility function
# matching_indexes = [
# i
# for i in range(reader.count)
# if (tags := reader.tags(i))["GRIB_ELEMENT"]
# == data_var.internal_attrs.grib_element
# and tags["GRIB_COMMENT"] == data_var.internal_attrs.grib_comment
# ]
# assert len(matching_indexes) == 1, f"Expected exactly 1 matching band, found {matching_indexes}. {data_var.internal_attrs.grib_element=}, {data_var.internal_attrs.grib_description=}, {coord.downloaded_file_path=}"
# rasterio_band_index = 1 + matching_indexes[0] # rasterio is 1-indexed
# return reader.read(rasterio_band_index, dtype=np.float32)
raise NotImplementedError(
"Read and return data for the given variable and source file coordinate."
)

# Implement this to apply transformations to the array (e.g. deaccumulation)
#
# def apply_data_transformations(
# self, data_array: xr.DataArray, data_var: DwdIconEuDataVar
# ) -> None:
# """
# Apply in-place data transformations to the output data array for a given data variable.

# This method is called after reading all data for a variable into the shared-memory array,
# and before writing shards to the output store. The default implementation applies binary
# rounding to float32 arrays if `data_var.internal_attrs.keep_mantissa_bits` is set.

# Subclasses may override this method to implement additional transformations such as
# deaccumulation, interpolation or other custom logic. All transformations should be
# performed in-place (don't copy `data_array`, it's large).

# Parameters
# ----------
# data_array : xr.DataArray
# The output data array to be transformed in-place.
# data_var : DwdIconEuDataVar
# The data variable metadata object, which may contain transformation parameters.
# """
# super().apply_data_transformations(data_array, data_var)

def update_template_with_results(
self, process_results: Mapping[str, Sequence[DwdIconEuForecastSourceFileCoord]]
) -> xr.Dataset:
"""
Update template dataset based on processing results. This method is called
during operational updates.

Subclasses should implement this method to apply dataset-specific adjustments
based on the processing results. Examples include:
- Trimming dataset along append_dim to only include successfully processed data
- Loading existing coordinate values from the primary store and updating them based on results
- Updating metadata based on what was actually processed vs what was planned

The default implementation trims along append_dim to end at the most recent
successfully processed coordinate (timestamp).

Parameters
----------
process_results : Mapping[str, Sequence[DwdIconEuForecastSourceFileCoord]]
Mapping from variable names to their source file coordinates with final processing status.

Returns
-------
xr.Dataset
Updated template dataset reflecting the actual processing results.
"""
# The super() implementation looks like this:
#
# max_append_dim_processed = max(
# (
# c.out_loc()[self.append_dim] # type: ignore[type-var]
# for c in chain.from_iterable(process_results.values())
# if c.status == SourceFileStatus.Succeeded
# ),
# default=None,
# )
# if max_append_dim_processed is None:
# # No data was processed, trim the template to stop before this job's region
# # This is using isel's exclusive slice end behavior
# return self.template_ds.isel(
# {self.append_dim: slice(None, self.region.start)}
# )
# else:
# return self.template_ds.sel(
# {self.append_dim: slice(None, max_append_dim_processed)}
# )
#
# If you like the above behavior, skip implementing this method.
# If you need to customize the behavior, implement this method.

raise NotImplementedError(
"Subclasses implement update_template_with_results() with dataset-specific logic"
)

@classmethod
def operational_update_jobs(
cls,
primary_store_factory: StoreFactory,
tmp_store: Path,
get_template_fn: Callable[[DatetimeLike], xr.Dataset],
append_dim: AppendDim,
all_data_vars: Sequence[DwdIconEuDataVar],
reformat_job_name: str,
) -> tuple[
Sequence["RegionJob[DwdIconEuDataVar, DwdIconEuForecastSourceFileCoord]"],
xr.Dataset,
]:
"""
Return the sequence of RegionJob instances necessary to update the dataset
from its current state to include the latest available data.

Also return the template_ds, expanded along append_dim through the end of
the data to process. The dataset returned here may extend beyond the
available data at the source, in which case `update_template_with_results`
will trim the dataset to the actual data processed.

The exact logic is dataset-specific, but it generally follows this pattern:
1. Figure out the range of time to process: append_dim_start (inclusive) and append_dim_end (exclusive)
a. Read existing data from the primary store to determine what's already processed
b. Optionally identify recent incomplete/non-final data for reprocessing
2. Call get_template_fn(append_dim_end) to get the template_ds
3. Create RegionJob instances by calling cls.get_jobs(..., filter_start=append_dim_start)

Parameters
----------
primary_store_factory : StoreFactory
The factory to get the primary store to read existing data from and write updates to.
tmp_store : Path
The temporary Zarr store to write into while processing.
get_template_fn : Callable[[DatetimeLike], xr.Dataset]
Function to get the template_ds for the operational update.
append_dim : AppendDim
The dimension along which data is appended (e.g., "time").
all_data_vars : Sequence[DwdIconEuDataVar]
Sequence of all data variable configs for this dataset.
reformat_job_name : str
The name of the reformatting job, used for progress tracking.
This is often the name of the Kubernetes job, or "local".

Returns
-------
Sequence[RegionJob[DwdIconEuDataVar, DwdIconEuForecastSourceFileCoord]]
RegionJob instances that need processing for operational updates.
xr.Dataset
The template_ds for the operational update.
"""
# existing_ds = xr.open_zarr(primary_store_factory.store())
# append_dim_start = existing_ds[append_dim].max()
# append_dim_end = pd.Timestamp.now()
# template_ds = get_template_fn(append_dim_end)

# jobs = cls.get_jobs(
# kind="operational-update",
# primary_store_factory=primary_store_factory,
# tmp_store=tmp_store,
# template_ds=template_ds,
# append_dim=append_dim,
# all_data_vars=all_data_vars,
# reformat_job_name=reformat_job_name,
# filter_start=append_dim_start,
# )
# return jobs, template_ds

raise NotImplementedError(
"Subclasses implement operational_update_jobs() with dataset-specific logic"
)
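
And for region_job.py, a rough sketch (again, not part of this PR) of the pieces a follow-up PR would need to implement, based on the commented-out examples above. DWD serves ICON-EU as one bz2-compressed GRIB2 file per variable per lead time, so this sketch adds a variable name to the coord; the exact opendata.dwd.de path layout, the field types, the internal_attrs field name, and the http_download_to_disk import location are all assumptions to verify.

import itertools
from collections.abc import Sequence
from pathlib import Path

import pandas as pd
import xarray as xr

# Assumed import path for the download helper referenced in the template comment.
from reformatters.common.download import http_download_to_disk
from reformatters.common.region_job import SourceFileCoord

from .template_config import DwdIconEuDataVar


class DwdIconEuForecastSourceFileCoord(SourceFileCoord):
    """One ICON-EU GRIB2 file: a single variable at a single init_time and lead_time."""

    init_time: pd.Timestamp
    lead_time: pd.Timedelta
    variable_dwd_name: str  # hypothetical field, e.g. "t_2m"

    def get_url(self) -> str:
        step_hours = int(self.lead_time / pd.Timedelta(hours=1))
        # Assumed layout of DWD's open-data server; verify before relying on it.
        return (
            "https://opendata.dwd.de/weather/nwp/icon-eu/grib/"
            f"{self.init_time:%H}/{self.variable_dwd_name}/"
            "icon-eu_europe_regular-lat-lon_single-level_"
            f"{self.init_time:%Y%m%d%H}_{step_hours:03d}_"
            f"{self.variable_dwd_name.upper()}.grib2.bz2"
        )


# Inside DwdIconEuForecastRegionJob (defined above):

def generate_source_file_coords(
    self,
    processing_region_ds: xr.Dataset,
    data_var_group: Sequence[DwdIconEuDataVar],
) -> Sequence[DwdIconEuForecastSourceFileCoord]:
    # One coord per (init_time, lead_time, variable); this extends the template's
    # commented example, which iterates over init_time and lead_time only.
    return [
        DwdIconEuForecastSourceFileCoord(
            init_time=pd.Timestamp(init_time),
            lead_time=pd.Timedelta(lead_time),
            variable_dwd_name=data_var.internal_attrs.dwd_name,  # hypothetical attr
        )
        for init_time, lead_time in itertools.product(
            processing_region_ds["init_time"].values,
            processing_region_ds["lead_time"].values,
        )
        for data_var in data_var_group
    ]


def download_file(self, coord: DwdIconEuForecastSourceFileCoord) -> Path:
    # ICON-EU files are bz2-compressed, so a real implementation would likely
    # decompress after download before handing the path to read_data.
    return http_download_to_disk(coord.get_url(), self.dataset_id)
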