Skip to content

Commit 1becc8f

Browse files
authored
Refactor zarr data copying to use store.set instead of fsspec operations (#194)
* Initial replacement of fsspec interactions with store.set
* Move _get_fs_and_path to progress tracker since it's the only use
* Implement retries
1 parent 63e2fef commit 1becc8f

File tree

4 files changed

+74
-33
lines changed

4 files changed

+74
-33
lines changed

src/reformatters/common/retry.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
import time
2+
from collections.abc import Callable
3+
from typing import Any
4+
5+
6+
def retry(func: Callable[[], Any], max_attempts: int = 6) -> Any:
    """Call ``func`` until it succeeds or ``max_attempts`` calls have failed.

    After the n-th failed attempt (counting from zero) this sleeps for n
    seconds before trying again, so the first retry is immediate and the
    waits grow linearly (0s, 1s, 2s, ...).

    Args:
        func: Zero-argument callable to invoke.
        max_attempts: Maximum number of times to call ``func``; must be >= 1.

    Returns:
        Whatever ``func`` returns on its first successful call.

    Raises:
        ValueError: If ``max_attempts`` is less than 1.
        Exception: The exception raised by the final attempt, unchanged.
    """
    if max_attempts < 1:
        # Without this guard the loop body never runs and the function
        # returns None without calling func, hiding the misconfiguration.
        raise ValueError(f"max_attempts must be at least 1, got {max_attempts}")

    # Every attempt except the last swallows the failure and backs off.
    for attempt in range(max_attempts - 1):
        try:
            return func()
        except Exception:
            time.sleep(attempt)

    # Final attempt: let any exception propagate to the caller.
    return func()

src/reformatters/common/update_progress_tracker.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,33 @@
33
import threading
44
from collections.abc import Sequence
55

6+
import fsspec # type: ignore
67
import zarr.storage
78

89
from reformatters.common.config_models import BaseInternalAttrs, DataVar
910
from reformatters.common.fsspec import fsspec_apply
1011
from reformatters.common.logging import get_logger
11-
from reformatters.common.zarr import _get_fs_and_path
1212

1313
log = get_logger(__name__)
1414

1515
PROCESSED_VARIABLES_KEY = "processed_variables"
1616

1717

18+
def _get_fs_and_path(
    store: zarr.abc.store.Store,
) -> tuple[fsspec.AbstractFileSystem, str]:
    """Pull the fsspec filesystem and path off a store.

    Workaround that lets any store exposing ``fs`` and ``path`` attributes
    be used where an FsspecStore would be.

    Returns:
        The ``(fs, path)`` pair read from the store.

    Raises:
        ValueError: If ``store.fs`` is missing or is not an
            fsspec.AbstractFileSystem, or if ``store.path`` is missing or
            is not a str.
    """
    filesystem = getattr(store, "fs", None)
    if isinstance(filesystem, fsspec.AbstractFileSystem):
        # Only look at the path once the filesystem has been validated.
        root = getattr(store, "path", None)
        if isinstance(root, str):
            return filesystem, root
        raise ValueError("primary_store must have a path attribute that is a string")
    raise ValueError(
        "primary_store must have an fs that is an instance of fsspec.AbstractFileSystem"
    )
31+
32+
1833
class UpdateProgressTracker:
1934
"""
2035
Tracks which variables have been processed within a time slice of a job.

src/reformatters/common/zarr.py

Lines changed: 20 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,13 @@
44
from typing import Literal
55
from uuid import uuid4
66

7-
import fsspec # type: ignore
87
import xarray as xr
98
import zarr
109
from fsspec.implementations.local import LocalFileSystem # type: ignore
1110

1211
from reformatters.common.config import Config
13-
from reformatters.common.fsspec import fsspec_apply
1412
from reformatters.common.logging import get_logger
13+
from reformatters.common.retry import retry
1514

1615
logger = get_logger(__name__)
1716

@@ -113,15 +112,11 @@ def copy_data_var(
113112
f"Copying data var chunks to primary store ({primary_store}) for {relative_dir}."
114113
)
115114

116-
fs, path = _get_fs_and_path(primary_store)
117-
fs.auto_mkdir = True
118-
119-
# We want to support local and s3fs filesystems. fsspec local filesystem is sync,
120-
# but our s3fs from zarr.storage.FsspecStore is async and here we work around it.
121-
# The AsyncFileSystem wrapper on LocalFilesystem raises NotImplementedError when _put is called.
122-
source = f"{tmp_store / relative_dir}/"
123-
dest = f"{path}/{relative_dir}"
124-
fsspec_apply(fs, "put", source, dest, recursive=True, auto_mkdir=True)
115+
for file in tmp_store.glob(f"{relative_dir}**/*"):
116+
if not file.is_file():
117+
continue
118+
key = str(file.relative_to(tmp_store))
119+
sync_to_store(primary_store, key, file.read_bytes())
125120

126121
if track_progress_callback is not None:
127122
track_progress_callback()
@@ -151,25 +146,18 @@ def copy_zarr_metadata(
151146
metadata_files.append(tmp_store / "zarr.json")
152147
metadata_files.extend(tmp_store.glob("*/zarr.json"))
153148

154-
fs, path = _get_fs_and_path(primary_store)
155-
156-
# This could be partially parallelized BUT make sure to write the coords before the metadata.
157149
for file in metadata_files:
158-
relative = file.relative_to(tmp_store)
159-
dest = f"{path}/{relative}"
160-
fsspec_apply(fs, "put_file", file, dest)
161-
162-
163-
def _get_fs_and_path(
164-
store: zarr.abc.store.Store,
165-
) -> tuple[fsspec.AbstractFileSystem, str]:
166-
"""Gross work around to allow us to make other store types quack like FsspecStore."""
167-
fs = getattr(store, "fs", None)
168-
if not isinstance(fs, fsspec.AbstractFileSystem):
169-
raise ValueError(
170-
"primary_store must have an fs that is an instance of fsspec.AbstractFileSystem"
171-
)
172-
path = getattr(store, "path", None)
173-
if not isinstance(path, str):
174-
raise ValueError("primary_store must have a path attribute that is a string")
175-
return fs, path
150+
relative_path = str(file.relative_to(tmp_store))
151+
sync_to_store(primary_store, relative_path, file.read_bytes())
152+
153+
154+
def sync_to_store(store: zarr.abc.store.Store, key: str, data: bytes) -> None:
    """Write ``data`` to ``store`` under ``key`` from synchronous code.

    The write goes through ``retry`` with up to 6 attempts. Each attempt
    wraps the bytes in a buffer from zarr's default buffer prototype and
    runs the store's ``set`` coroutine via ``zarr.core.sync.sync``.
    """

    def _write_once() -> None:
        # A fresh coroutine is needed per attempt, so store.set is called here
        # rather than once outside the retried callable.
        payload = zarr.core.buffer.default_buffer_prototype().buffer.from_bytes(data)
        zarr.core.sync.sync(store.set(key, payload))

    retry(_write_once, max_attempts=6)

tests/common/test_retry.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from unittest.mock import Mock
2+
3+
import pytest
4+
5+
from reformatters.common.retry import retry
6+
7+
8+
def test_retry_succeeds_on_first_attempt() -> None:
    """A callable that succeeds immediately is returned from and called once."""
    mock_func = Mock(return_value="success")
    result = retry(mock_func)
    assert result == "success"
    # Guard against retry invoking the callable again after a success.
    mock_func.assert_called_once_with()
12+
13+
14+
def test_retry_succeeds_after_failures() -> None:
    """One failure followed by a success yields the successful result."""
    mock_func = Mock(side_effect=[ValueError("fail"), "success"])
    result = retry(mock_func, max_attempts=3)
    assert result == "success"
    # One failed call plus the successful one; the third attempt is never used.
    assert mock_func.call_count == 2
18+
19+
20+
def test_retry_fails_after_max_attempts() -> None:
    """A callable that always fails re-raises after exactly max_attempts calls."""
    mock_func = Mock(side_effect=ValueError("persistent failure"))
    with pytest.raises(ValueError, match="persistent failure"):
        retry(mock_func, max_attempts=2)
    # retry must neither give up early nor exceed the attempt budget.
    assert mock_func.call_count == 2

0 commit comments

Comments
 (0)