Skip to content
This repository was archived by the owner on Sep 11, 2023. It is now read-only.

Commit 4140fbd

Browse files
committed
Merge branch 'main' of github.com:openclimatefix/nowcasting_dataset
2 parents f821da4 + e7dfea1 commit 4140fbd

File tree

7 files changed

+76
-34
lines changed

7 files changed

+76
-34
lines changed

.bumpversion.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[bumpversion]
22
commit = True
33
tag = True
4-
current_version = 3.3.9
4+
current_version = 3.3.10
55

66
[bumpversion:file:setup.py]
77
search = version="{current_version}"

nowcasting_dataset/data_sources/datasource_output.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,14 @@ def check_nan_and_inf(self, data: xr.Dataset, variable_name: str = None):
5858
"""Check that all values are non NaNs and not infinite"""
5959

6060
if np.isnan(data).any():
61-
message = f"Some {self.__class__.__name__} data values are NaNs"
61+
message = f"Some {self.__class__.__name__} data values are NaNs. "
6262
if variable_name is not None:
6363
message += f" ({variable_name})"
64+
65+
# Find out which examples contain NaNs, so the error message can name them
66+
for i in range(data.shape[0]):
67+
if np.isnan(data[i]).any():
68+
message += f" Nans in example {i}."
6469
logger.error(message)
6570
raise Exception(message)
6671

nowcasting_dataset/data_sources/gsp/gsp_data_source.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,8 +225,8 @@ def get_locations(self, t0_datetimes_utc: pd.DatetimeIndex) -> List[SpaceTimeLoc
225225
if total_gsp_nan_count == 0:
226226

227227
# get random GSP metadata
228-
indexes = list(
229-
self.rng.integers(low=0, high=len(self.metadata), size=len(t0_datetimes_utc))
228+
indexes = sorted(
229+
list(self.rng.integers(low=0, high=len(self.metadata), size=len(t0_datetimes_utc)))
230230
)
231231
metadata = self.metadata.iloc[indexes]
232232

nowcasting_dataset/data_sources/satellite/satellite_data_source.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@ def __post_init__(
5454
if self.is_live:
5555
# This is to account for the delay in satellite data
5656
self.total_seq_length = (
57-
self.history_length - (self.live_delay_minutes / self.time_resolution_minutes) + 1
57+
self.history_length
58+
- int(self.live_delay_minutes / self.time_resolution_minutes)
59+
+ 1
5860
)
5961

6062
self._shape_of_example = (

nowcasting_dataset/manager/manager_live.py

Lines changed: 33 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ def sample_spatial_and_temporal_locations_for_examples(
148148

149149
return locations
150150

151-
def create_batches(self) -> None:
151+
def create_batches(self, use_async: Optional[bool] = True) -> None:
152152
"""Create batches (if necessary).
153153
154154
Make dirs: `<output_data.filepath> / <split_name> / <data_source_name>`.
@@ -216,33 +216,38 @@ def create_batches(self) -> None:
216216
f"About to submit create_batches task for {data_source_name}, {split_name}"
217217
)
218218

219-
# Sometimes when debuggin it is easy to use non async
220-
# data_source.create_batches(**kwargs_for_create_batches)
221-
222-
async_result = pool.apply_async(
223-
data_source.create_batches,
224-
kwds=kwargs_for_create_batches,
225-
callback=partial(
226-
callback, data_source_name=data_source_name, split_name=split_name
227-
),
228-
error_callback=partial(
229-
error_callback,
230-
data_source_name=data_source_name,
231-
split_name=split_name,
232-
an_error_has_occured=an_error_has_occured,
233-
),
234-
)
235-
async_results_from_create_batches.append(async_result)
236-
237-
# Wait for all async_results to finish:
238-
for async_result in async_results_from_create_batches:
239-
async_result.wait()
240-
if an_error_has_occured.is_set():
241-
# An error has occurred but, at this point in the code, we don't know which
242-
# worker process raised the exception. But, with luck, the worker process
243-
# will have logged an informative exception via the _error_callback func.
244-
raise RuntimeError(
245-
f"A worker process raised an exception whilst working on {split_name}!"
219+
if ~use_async:
220+
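# Sometimes when debugging it is easier to run without async.
# NOTE(review): `~use_async` is bitwise NOT, which is truthy for both True and False (-2 / -1), so this branch always runs; `not use_async` is presumably intended - confirm.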
221+
data_source.create_batches(**kwargs_for_create_batches)
222+
else:
223+
224+
async_result = pool.apply_async(
225+
data_source.create_batches,
226+
kwds=kwargs_for_create_batches,
227+
callback=partial(
228+
callback, data_source_name=data_source_name, split_name=split_name
229+
),
230+
error_callback=partial(
231+
error_callback,
232+
data_source_name=data_source_name,
233+
split_name=split_name,
234+
an_error_has_occured=an_error_has_occured,
235+
),
246236
)
237+
async_results_from_create_batches.append(async_result)
238+
239+
# Wait for all async_results to finish:
240+
for async_result in async_results_from_create_batches:
241+
async_result.wait()
242+
if an_error_has_occured.is_set():
243+
# An error has occurred but, at this point in the code,
244+
# we don't know which worker process raised the exception.
245+
# But, with luck, the worker process
246+
# will have logged an informative exception via the
247+
# _error_callback func.
248+
raise RuntimeError(
249+
f"A worker process raised an exception whilst "
250+
f"working on {split_name}!"
251+
)
247252

248253
logger.info(f"Finished creating batches for {split_name}!")

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
setup(
1313
name="nowcasting_dataset",
14-
version="3.3.9",
14+
version="3.3.10",
1515
license="MIT",
1616
description="Nowcasting Dataset",
1717
author="Jack Kelly, Peter Dudfield, Jacob Bieker",

tests/manager/test_manager_live.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,36 @@ def test_batches(test_configuration_filename, sat, gsp):
129129
assert os.path.exists(f"{dst_path}/live/satellite/000000.nc")
130130

131131

132+
def test_batches_not_async(test_configuration_filename, sat, gsp):
133+
"""Test that batches can be made when create_batches is called with use_async=False"""
134+
135+
manager = ManagerLive()
136+
manager.load_yaml_configuration(filename=test_configuration_filename)
137+
138+
with tempfile.TemporaryDirectory() as local_temp_path, tempfile.TemporaryDirectory() as dst_path: # noqa 101
139+
140+
# set local temp path, and dst path
141+
manager.config.output_data.filepath = Path(dst_path)
142+
manager.local_temp_path = Path(local_temp_path)
143+
144+
# Set data sources
145+
manager.data_sources = {"gsp": gsp, "satellite": sat}
146+
manager.data_source_which_defines_geospatial_locations = gsp
147+
148+
# make file for locations
149+
manager.create_files_specifying_spatial_and_temporal_locations_of_each_example(
150+
t0_datetime=datetime(2020, 4, 1, 13)
151+
) # noqa 101
152+
153+
# make batches
154+
manager.create_batches(use_async=False)
155+
156+
assert os.path.exists(f"{dst_path}/live")
157+
assert os.path.exists(f"{dst_path}/live/gsp")
158+
assert os.path.exists(f"{dst_path}/live/gsp/000000.nc")
159+
assert os.path.exists(f"{dst_path}/live/satellite/000000.nc")
160+
161+
132162
def test_run_error(test_configuration_filename):
133163
"""Test to initialize data sources and get batches"""
134164

0 commit comments

Comments
 (0)