Skip to content

Commit ca356ed

Browse files
committed
Acquisition: refactor csv_importer for cleanliness
* simplify and unify CovidcastRow creation from file
* convert CSV file details tuple to NamedTuple
1 parent 8da3e0a commit ca356ed

File tree

4 files changed

+74
-33
lines changed

4 files changed

+74
-33
lines changed

src/acquisition/covidcast/csv_importer.py

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from dataclasses import dataclass
77
from datetime import date
88
from glob import glob
9+
from typing import Iterator, NamedTuple, Optional, Tuple
910

1011
# third party
1112
import epiweeks as epi
@@ -14,6 +15,10 @@
1415
# first party
1516
from delphi_utils import Nans
1617
from delphi.utils.epiweek import delta_epiweeks
18+
from delphi.epidata.acquisition.covidcast.database import CovidcastRow
19+
from delphi.epidata.acquisition.covidcast.logger import get_structured_logger
20+
21+
DFRow = NamedTuple('DFRow', [('geo_id', str), ('value', float), ('stderr', float), ('sample_size', float), ('missing_value', int), ('missing_stderr', int), ('missing_sample_size', int)])
1722
PathDetails = NamedTuple('PathDetails', [('source', str), ("signal", str), ('time_type', str), ('geo_type', str), ('time_value', int), ('issue', int), ('lag', int)])
1823

1924

@@ -180,7 +185,8 @@ def find_csv_files(scan_dir, issue=(date.today(), epi.Week.fromdate(date.today()
180185
yield (path, None)
181186
continue
182187

183-
yield (path, (source, signal, time_type, geo_type, time_value, issue_value, lag_value))
188+
yield (path, PathDetails(source, signal, time_type, geo_type, time_value, issue_value, lag_value))
189+
184190

185191
@staticmethod
186192
def is_header_valid(columns):
@@ -344,7 +350,7 @@ def extract_and_check_row(row: DFRow, geo_type: str, filepath: Optional[str] = N
344350

345351

346352
@staticmethod
347-
def load_csv(filepath, geo_type):
353+
def load_csv(filepath: str, details: PathDetails) -> Iterator[Optional[CovidcastRow]]:
348354
"""Load, validate, and yield data as `RowValues` from a CSV file.
349355
350356
filepath: the CSV file to be loaded
@@ -369,9 +375,32 @@ def load_csv(filepath, geo_type):
369375
table.rename(columns={"val": "value", "se": "stderr", "missing_val": "missing_value", "missing_se": "missing_stderr"}, inplace=True)
370376

371377
for row in table.itertuples(index=False):
372-
row_values, error = CsvImporter.extract_and_check_row(row, geo_type, filepath)
378+
csv_row_values, error = CsvImporter.extract_and_check_row(row, details.geo_type, filepath)
379+
373380
if error:
374381
logger.warning(event = 'invalid value for row', detail=(str(row), error), file=filepath)
375382
yield None
376383
continue
377-
yield row_values
384+
385+
yield CovidcastRow(
386+
details.source,
387+
details.signal,
388+
details.time_type,
389+
details.geo_type,
390+
details.time_value,
391+
csv_row_values.geo_value,
392+
csv_row_values.value,
393+
csv_row_values.stderr,
394+
csv_row_values.sample_size,
395+
csv_row_values.missing_value,
396+
csv_row_values.missing_stderr,
397+
csv_row_values.missing_sample_size,
398+
details.issue,
399+
details.lag,
400+
# These four fields are unused by database acquisition
401+
# TODO: These will be used when CovidcastRow is updated.
402+
# id=None,
403+
# direction=None,
404+
# direction_updated_timestamp=0,
405+
# value_updated_timestamp=0,
406+
)

src/acquisition/covidcast/csv_to_database.py

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,8 @@ def upload_archive(
103103
archive_as_failed(path_src, filename, 'unknown',logger)
104104
continue
105105

106-
(source, signal, time_type, geo_type, time_value, issue, lag) = details
107-
108-
csv_rows = csv_importer_impl.load_csv(path, geo_type)
109-
110-
cc_rows = CovidcastRow.fromCsvRows(csv_rows, source, signal, time_type, geo_type, time_value, issue, lag)
111-
rows_list = list(cc_rows)
106+
csv_rows = CsvImporter.load_csv(path, details)
107+
rows_list = list(csv_rows)
112108
all_rows_valid = rows_list and all(r is not None for r in rows_list)
113109
if all_rows_valid:
114110
try:
@@ -117,12 +113,13 @@ def upload_archive(
117113
logger.info(
118114
"Inserted database rows",
119115
row_count = modified_row_count,
120-
source = source,
121-
signal = signal,
122-
geo_type = geo_type,
123-
time_value = time_value,
124-
issue = issue,
125-
lag = lag)
116+
source = details.source,
117+
signal = details.signal,
118+
geo_type = details.geo_type,
119+
time_value = details.time_value,
120+
issue = details.issue,
121+
lag = details.lag
122+
)
126123
if modified_row_count is None or modified_row_count: # else would indicate zero rows inserted
127124
total_modified_row_count += (modified_row_count if modified_row_count else 0)
128125
database.commit()
@@ -137,9 +134,9 @@ def upload_archive(
137134

138135
# archive the current file based on validation results
139136
if all_rows_valid:
140-
archive_as_successful(path_src, filename, source, logger)
137+
archive_as_successful(path_src, filename, details.source, logger)
141138
else:
142-
archive_as_failed(path_src, filename, source,logger)
139+
archive_as_failed(path_src, filename, details.source, logger)
143140

144141
return total_modified_row_count
145142

tests/acquisition/covidcast/test_csv_importer.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -233,10 +233,10 @@ def test_load_csv_with_invalid_header(self, mock_read_csv):
233233

234234
data = {'foo': [1, 2, 3]}
235235
filepath = 'path/name.csv'
236-
geo_type = 'state'
236+
details = PathDetails("src", "name", "day", "state", 20200101, 20200101, 0)
237237

238238
mock_read_csv.return_value = pd.DataFrame(data)
239-
rows = list(CsvImporter.load_csv(filepath, geo_type))
239+
rows = list(CsvImporter.load_csv(filepath, details))
240240

241241
self.assertTrue(mock_read_csv.called)
242242
self.assertTrue(mock_read_csv.call_args[0][0], filepath)
@@ -255,10 +255,10 @@ def test_load_csv_with_valid_header(self, mock_read_csv):
255255
'sample_size': ['301', '302', '303', '304'],
256256
}
257257
filepath = 'path/name.csv'
258-
geo_type = 'state'
258+
details = PathDetails("src", "name", "day", "state", 20200101, 20200101, 0)
259259

260260
mock_read_csv.return_value = pd.DataFrame(data=data)
261-
rows = list(CsvImporter.load_csv(filepath, geo_type))
261+
rows = list(CsvImporter.load_csv(filepath, details))
262262

263263
self.assertTrue(mock_read_csv.called)
264264
self.assertTrue(mock_read_csv.call_args[0][0], filepath)
@@ -292,10 +292,10 @@ def test_load_csv_with_valid_header(self, mock_read_csv):
292292
'missing_sample_size': [Nans.NOT_MISSING] * 2 + [Nans.REGION_EXCEPTION] * 2 + [None]
293293
}
294294
filepath = 'path/name.csv'
295-
geo_type = 'state'
295+
details = PathDetails("src", "name", "day", "state", 20200101, 20200101, 0)
296296

297297
mock_read_csv.return_value = pd.DataFrame(data)
298-
rows = list(CsvImporter.load_csv(filepath, geo_type))
298+
rows = list(CsvImporter.load_csv(filepath, details))
299299

300300
self.assertTrue(mock_read_csv.called)
301301
self.assertTrue(mock_read_csv.call_args[0][0], filepath)

tests/acquisition/covidcast/test_csv_to_database.py

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,14 @@
1515

1616
class UnitTests(unittest.TestCase):
1717
"""Basic unit tests."""
18+
_path_details = [
19+
# a good file
20+
('path/a.csv', PathDetails('src_a', 'sig_a', 'day', 'hrr', 20200419, 20200420, 1)),
21+
# a file with a data error
22+
('path/b.csv', PathDetails('src_b', 'sig_b', 'week', 'msa', 202016, 202017, 1)),
23+
# emulate a file that's named incorrectly
24+
('path/c.csv', None)
25+
]
1826

1927
def test_get_argument_parser(self):
2028
"""Return a parser for command-line arguments."""
@@ -37,25 +45,32 @@ def test_collect_files(self, mock_csv_importer: MagicMock):
3745
def test_upload_archive(self, mock_file_archiver: MagicMock, mock_csv_importer: MagicMock, mock_database: MagicMock):
3846
"""Upload to the database, and archive."""
3947

40-
def make_row(value):
48+
def make_row(value: float, details: PathDetails):
4149
return MagicMock(
50+
source=details.source,
51+
signal=details.signal,
52+
time_type=details.time_type,
53+
geo_type=details.geo_type,
54+
time_value=details.time_value,
55+
issue=details.issue,
56+
lag=details.lag,
4257
geo_value=value,
4358
value=value,
4459
stderr=value,
4560
sample_size=value,
4661
)
4762

48-
def load_csv_impl(path, *args):
63+
def load_csv_impl(path, details):
4964
if path == 'path/a.csv':
5065
# no validation errors
51-
yield make_row('a1')
52-
yield make_row('a2')
53-
yield make_row('a3')
66+
yield make_row('a1', details)
67+
yield make_row('a2', details)
68+
yield make_row('a3', details)
5469
elif path == 'path/b.csv':
5570
# one validation error
56-
yield make_row('b1')
71+
yield make_row('b1', details)
5772
yield None
58-
yield make_row('b3')
73+
yield make_row('b3', details)
5974
else:
6075
# fail the test for any other path
6176
raise Exception('unexpected path')
@@ -69,7 +84,7 @@ def iter_len(l: Iterable) -> int:
6984
mock_logger = MagicMock()
7085

7186
modified_row_count = upload_archive(
72-
self._path_details(),
87+
self._path_details,
7388
mock_database,
7489
make_handlers(data_dir, False),
7590
mock_logger
@@ -179,7 +194,7 @@ def test_database_exception_is_handled(self, mock_file_archiver: MagicMock, mock
179194
data_dir = 'data_dir'
180195
mock_database.insert_or_update_bulk.side_effect = Exception('testing')
181196
mock_csv_importer.find_csv_files.return_value = [
182-
('path/file.csv', ('src', 'sig', 'day', 'hrr', 20200423, 20200424, 1)),
197+
('path/file.csv', PathDetails('src', 'sig', 'day', 'hrr', 20200423, 20200424, 1)),
183198
]
184199
mock_csv_importer.load_csv.return_value = [
185200
MagicMock(geo_value='geo', value=1, stderr=1, sample_size=1),

0 commit comments

Comments (0)