Skip to content

Commit 91c9e6c

Browse files
authored
NOAA NDVI CDR: fetch more recent files from NCEI instead of S3 (#189)
* wip * Fixes * Comments and clean up * Clean up * Add missing init files * revert log to debug level
1 parent 2465ee9 commit 91c9e6c

File tree

5 files changed

+215
-10
lines changed

5 files changed

+215
-10
lines changed

src/reformatters/common/download.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ def http_store(base_url: str) -> obstore.store.HTTPStore:
6767
base_url,
6868
client_options={
6969
"connect_timeout": "4 seconds",
70-
"timeout": "16 seconds",
70+
"timeout": "120 seconds",
7171
},
7272
retry_config={
7373
"max_retries": 16,

src/reformatters/contrib/noaa/__init__.py

Whitespace-only changes.

src/reformatters/contrib/noaa/ndvi_cdr/__init__.py

Whitespace-only changes.

src/reformatters/contrib/noaa/ndvi_cdr/analysis/region_job.py

Lines changed: 92 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import re
12
from collections.abc import Callable, Mapping, Sequence
23
from pathlib import Path
34
from typing import cast
@@ -7,10 +8,16 @@
78
import obstore
89
import pandas as pd
910
import rasterio # type: ignore[import-untyped]
11+
import requests
1012
import xarray as xr
1113
import zarr
1214

13-
from reformatters.common.download import download_to_disk, get_local_path, s3_store
15+
from reformatters.common.download import (
16+
download_to_disk,
17+
get_local_path,
18+
http_store,
19+
s3_store,
20+
)
1421
from reformatters.common.iterating import item
1522
from reformatters.common.logging import get_logger
1623
from reformatters.common.region_job import (
@@ -61,14 +68,20 @@ def out_loc(self) -> Mapping[Dim, CoordinateValueOrRange]:
6168
class NoaaNdviCdrAnalysisRegionJob(
6269
RegionJob[NoaaNdviCdrDataVar, NoaaNdviCdrAnalysisSourceFileCoord]
6370
):
64-
download_parallelism: int = 10
71+
# Set lower than would be needed for fetching exclusively from S3
72+
to accommodate the cases where we are downloading from NCEI.
73+
download_parallelism: int = 5
6574

6675
# We observed deadlocks when using more than 2 threads to read data into shared memory.
6776
read_parallelism: int = 1
6877

6978
s3_bucket_url: str = "s3://noaa-cdr-ndvi-pds"
7079
s3_region: str = "us-east-1"
7180

81+
root_nc_url: str = (
82+
"http://ncei.noaa.gov/data/land-normalized-difference-vegetation-index/access"
83+
)
84+
7285
def generate_source_file_coords(
7386
self,
7487
processing_region_ds: xr.Dataset,
@@ -96,10 +109,20 @@ def generate_source_file_coords(
96109
# We want to extract the date part (e.g., 19810728)
97110
try:
98111
_, date_str, _ = filepath.rsplit("_", 2)
112+
99113
# Parse date string to pd.Timestamp
100114
file_time = pd.Timestamp(date_str)
101115
filename = Path(filepath).name
102-
url = f"{self.s3_bucket_url}/data/{year}/{filename}"
116+
117+
# if file_time is within 2 weeks of today, fetch from ncei,
118+
# otherwise fetch from S3
119+
two_weeks_ago = pd.Timestamp.now() - pd.Timedelta(days=14)
120+
is_within_last_2_weeks = two_weeks_ago <= file_time
121+
if is_within_last_2_weeks:
122+
url = f"{self.root_nc_url}/{year}/{filename}"
123+
else:
124+
url = f"{self.s3_bucket_url}/data/{year}/{filename}"
125+
103126
urls_by_time[file_time] = url
104127
except Exception as e:
105128
log.warning(f"Skipping file {filepath} due to error: {e}")
@@ -117,14 +140,20 @@ def generate_source_file_coords(
117140

118141
def download_file(self, coord: NoaaNdviCdrAnalysisSourceFileCoord) -> Path:
119142
"""Download the file for the given coordinate and return the local path."""
120-
store = s3_store(self.s3_bucket_url, self.s3_region, skip_signature=True)
143+
url = coord.get_url()
144+
parsed_url = urlparse(url)
145+
146+
store: obstore.store.HTTPStore | obstore.store.S3Store
147+
if parsed_url.netloc == "ncei.noaa.gov":
148+
store = http_store(f"https://{parsed_url.netloc}")
149+
else:
150+
store = s3_store(self.s3_bucket_url, self.s3_region, skip_signature=True)
121151

122-
s3_url = coord.get_url()
123-
object_key = urlparse(s3_url).path.removeprefix("/")
124-
local_path = get_local_path(self.dataset_id, object_key)
152+
remote_path = urlparse(url).path.removeprefix("/")
153+
local_path = get_local_path(self.dataset_id, remote_path)
125154

126-
download_to_disk(store, object_key, local_path, overwrite_existing=True)
127-
log.debug(f"Downloaded {object_key} to {local_path}")
155+
download_to_disk(store, remote_path, local_path, overwrite_existing=True)
156+
log.debug(f"Downloaded {url} to {local_path}")
128157

129158
return local_path
130159

@@ -197,6 +226,22 @@ def _read_usable_ndvi(
197226
return cast(ArrayFloat32, ndvi_data)
198227

199228
def _list_source_files(self, year: int) -> list[str]:
229+
# We believe NCEI will have more recent files before S3 does.
230+
# While this gap may only be a couple of weeks at most, we cannot enumerate
231+
# files by a coarser granularity than a year. The reason we check if the requested
232+
# year is the current or previous year is to be sure that we continue to check
233+
# NCEI in early January of the current year. I.e., in Jan 2026, we should check
234+
# NCEI for the 2025 files.
235+
#
236+
# We hardcode 2025 as the earliest year to check NCEI, since as of this writing,
237+
# we know S3 is up to date through June 2025. Backfills should go through S3.
238+
current_year = pd.Timestamp.now().year
239+
if year >= 2025 and year in (current_year, current_year - 1):
240+
return self._list_ncei_source_files(year)
241+
else:
242+
return self._list_s3_source_files(year)
243+
244+
def _list_s3_source_files(self, year: int) -> list[str]:
200245
store = s3_store(self.s3_bucket_url, self.s3_region, skip_signature=True)
201246
results = list(obstore.list(store, f"data/{year}", chunk_size=366))
202247
if len(results) == 0:
@@ -208,6 +253,44 @@ def _list_source_files(self, year: int) -> list[str]:
208253

209254
return [result["path"] for result in results[0]]
210255

256+
def _list_ncei_source_files(self, year: int) -> list[str]:
257+
"""List source files from NCEI.
258+
259+
The response text from NCEI is HTML with a table enumerating available files. Example:
260+
261+
<td><a href="VIIRS-Land_v001_JP113C1_NOAA-20_20250101_c20250103153010.nc">VIIRS-Land_v001_JP113C1_NOAA-20_20250101_c20250103153010.nc</a></td>
262+
<td align="right">2025-01-05 15:40</td>
263+
<td align="right">63914048</td>
264+
<td></td>
265+
</tr>
266+
<tr>
267+
<td><a href="VIIRS-Land_v001_JP113C1_NOAA-20_20250102_c20250104153009.nc">VIIRS-Land_v001_JP113C1_NOAA-20_20250102_c20250104153009.nc</a></td>
268+
...
269+
"""
270+
ncei_url = f"{self.root_nc_url}/{year}/"
271+
272+
response = requests.get(ncei_url, timeout=15)
273+
response.raise_for_status()
274+
275+
content = response.text
276+
filenames = re.findall(r"href=\"(VIIRS-Land.+nc)\"", content)
277+
filenames = list(set(filenames))
278+
279+
# Simple check: startswith, endswith, and only one .nc present
280+
def is_valid_viirs_nc(fname: str) -> bool:
281+
return (
282+
fname.startswith("VIIRS-Land")
283+
and fname.endswith(".nc")
284+
and fname.count(".nc") == 1
285+
)
286+
287+
assert all(is_valid_viirs_nc(fname) for fname in filenames), (
288+
"Some filenames do not conform to expected structure: "
289+
+ str([fname for fname in filenames if not is_valid_viirs_nc(fname)])
290+
)
291+
292+
return filenames
293+
211294
@classmethod
212295
def operational_update_jobs(
213296
cls,

tests/contrib/noaa/ndvi_cdr/analysis/region_job_test.py

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from typing import Any
12
from unittest.mock import Mock
23

34
import numpy as np
@@ -315,3 +316,124 @@ def mock_read_netcdf_data(
315316
assert result[2, 0] == 0.2 # land_no_desert+aerosol preserved
316317
assert np.isnan(result[2, 1]) # no aerosol quality masked
317318
assert np.isnan(result[2, 2]) # no aerosol quality masked
319+
320+
321+
def test_generate_source_file_coords_uses_ncei_for_recent_year(
322+
monkeypatch: pytest.MonkeyPatch,
323+
) -> None:
324+
"""Test that NCEI is used for recent source files in generate_source_file_coords."""
325+
326+
# Mock pd.Timestamp.now to return a date within 2 weeks of the test files
327+
monkeypatch.setattr("pandas.Timestamp.now", lambda: pd.Timestamp("2026-01-15"))
328+
monkeypatch.setattr("obstore.list", Mock())
329+
330+
def mock_requests_get(url: str, **kwargs: Any) -> Mock:
331+
mock_response = Mock()
332+
mock_response.raise_for_status = Mock()
333+
if "2025" in url:
334+
mock_response.text = """
335+
<a href="VIIRS-Land_v001_JP113C1_NOAA-20_20251231_c20250102153009.nc">VIIRS-Land_v001_JP113C1_NOAA-20_20251231_c20250102153009.nc</a>
336+
"""
337+
elif "2026" in url:
338+
mock_response.text = """
339+
<a href="VIIRS-Land_v001_JP113C1_NOAA-20_20260101_c20260103153010.nc">VIIRS-Land_v001_JP113C1_NOAA-20_20260101_c20260103153010.nc</a>
340+
<a href="VIIRS-Land_v001_JP113C1_NOAA-20_20260102_c20260104153009.nc">VIIRS-Land_v001_JP113C1_NOAA-20_20260102_c20260104153009.nc</a>
341+
"""
342+
else:
343+
mock_response.text = ""
344+
return mock_response
345+
346+
monkeypatch.setattr("requests.get", mock_requests_get)
347+
348+
template_config = NoaaNdviCdrAnalysisTemplateConfig()
349+
350+
template_ds = xr.Dataset(
351+
coords={
352+
"time": pd.date_range("2025-12-31", "2026-01-02", freq="D"),
353+
"latitude": np.linspace(89.999998472637188, -89.999998472637188, 3600),
354+
"longitude": np.linspace(-180.000006104363450, 179.999993895636550, 7200),
355+
}
356+
)
357+
358+
region_job = NoaaNdviCdrAnalysisRegionJob.model_construct(
359+
final_store=get_zarr_store("prod-path", "test-dataset", "test-version"),
360+
tmp_store=Mock(),
361+
template_ds=template_ds,
362+
data_vars=template_config.data_vars,
363+
append_dim=template_config.append_dim,
364+
region=Mock(spec=slice),
365+
reformat_job_name="test",
366+
)
367+
368+
processing_region_ds = template_ds.isel(latitude=slice(0, 10))
369+
coords = region_job.generate_source_file_coords(
370+
processing_region_ds, template_config.data_vars
371+
)
372+
373+
assert len(coords) == 3
374+
assert (
375+
coords[0].get_url()
376+
== "s3://noaa-cdr-ndvi-pds/data/2025/VIIRS-Land_v001_JP113C1_NOAA-20_20251231_c20250102153009.nc"
377+
)
378+
assert (
379+
coords[1].get_url()
380+
== "http://ncei.noaa.gov/data/land-normalized-difference-vegetation-index/access/2026/VIIRS-Land_v001_JP113C1_NOAA-20_20260101_c20260103153010.nc"
381+
)
382+
assert (
383+
coords[2].get_url()
384+
== "http://ncei.noaa.gov/data/land-normalized-difference-vegetation-index/access/2026/VIIRS-Land_v001_JP113C1_NOAA-20_20260102_c20260104153009.nc"
385+
)
386+
387+
388+
@pytest.mark.parametrize(
389+
"test_year,expected_source,expected_result",
390+
[
391+
(
392+
2026,
393+
"ncei",
394+
["ncei_file.nc"],
395+
), # Current year -> NCEI (# current year mocked to 2026)
396+
(2025, "ncei", ["ncei_file.nc"]), # Previous year -> NCEI
397+
(2024, "s3", ["s3_file.nc"]), # 2+ years ago -> S3
398+
(2020, "s3", ["s3_file.nc"]), # Older year -> S3
399+
],
400+
)
401+
def test_list_source_files_routing_by_year(
402+
monkeypatch: pytest.MonkeyPatch,
403+
test_year: int,
404+
expected_source: str,
405+
expected_result: list[str],
406+
) -> None:
407+
"""Test that _list_source_files routes to NCEI for recent years and S3 for older years."""
408+
# Mock current date to 2026
409+
mock_now = Mock(return_value=pd.Timestamp("2026-06-15"))
410+
monkeypatch.setattr("pandas.Timestamp.now", mock_now)
411+
412+
template_config = NoaaNdviCdrAnalysisTemplateConfig()
413+
414+
region_job = NoaaNdviCdrAnalysisRegionJob.model_construct(
415+
final_store=get_zarr_store("prod-path", "test-dataset", "test-version"),
416+
tmp_store=Mock(),
417+
template_ds=Mock(),
418+
data_vars=template_config.data_vars,
419+
append_dim=template_config.append_dim,
420+
region=Mock(spec=slice),
421+
reformat_job_name="test",
422+
)
423+
424+
# Mock both methods
425+
mock_ncei = Mock(return_value=["ncei_file.nc"])
426+
mock_s3 = Mock(return_value=["s3_file.nc"])
427+
monkeypatch.setattr(region_job, "_list_ncei_source_files", mock_ncei)
428+
monkeypatch.setattr(region_job, "_list_s3_source_files", mock_s3)
429+
430+
result = region_job._list_source_files(test_year)
431+
432+
assert result == expected_result
433+
434+
if expected_source == "ncei":
435+
mock_ncei.assert_called_once_with(test_year)
436+
mock_s3.assert_not_called()
437+
else:
438+
mock_s3.assert_called_once_with(test_year)
439+
mock_ncei.assert_not_called()

0 commit comments

Comments
 (0)