Skip to content

Commit ca0323a

Browse files
committed
suggested changes and refactor pull_nhsn_data
1 parent d481a39 commit ca0323a

File tree

6 files changed

+84
-151
lines changed

6 files changed

+84
-151
lines changed

nhsn/delphi_nhsn/pull.py

Lines changed: 16 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,9 @@ def pull_nhsn_data(
123123
backup_dir: str,
124124
custom_run: bool,
125125
issue_date: Optional[str],
126+
preliminary: bool = False,
126127
logger: Optional[logging.Logger] = None,
128+
127129
):
128130
"""Pull the latest NHSN hospital admission data, and conforms it into a dataset.
129131
@@ -140,6 +142,10 @@ def pull_nhsn_data(
140142
Directory to which to save raw backup data
141143
custom_run: bool
142144
Flag indicating if the current run is a patch. If so, don't save any data to disk
145+
preliminary: bool
146+
Flag indicating whether to grab main or preliminary data
147+
issue_date:
148+
Date indicating which backup file to pull for patching
143149
logger: Optional[logging.Logger]
144150
logger object
145151
@@ -148,22 +154,26 @@ def pull_nhsn_data(
148154
pd.DataFrame
149155
Dataframe as described above.
150156
"""
157+
dataset_id = PRELIM_DATASET_ID if preliminary else MAIN_DATASET_ID
151158
# Pull data from Socrata API
152159
df = (
153-
pull_data(socrata_token, MAIN_DATASET_ID, backup_dir, logger)
160+
pull_data(socrata_token, dataset_id, backup_dir, logger)
154161
if not custom_run
155-
else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=False)
162+
else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=preliminary)
156163
)
157164

158-
recently_updated = True if custom_run else check_last_updated(socrata_token, MAIN_DATASET_ID, logger)
165+
recently_updated = True if custom_run else check_last_updated(socrata_token, dataset_id, logger)
166+
167+
type_dict = PRELIM_TYPE_DICT if preliminary else TYPE_DICT
168+
keep_columns = list(type_dict.keys())
169+
filtered_type_dict = copy.deepcopy(type_dict)
159170

160-
keep_columns = list(TYPE_DICT.keys())
171+
signal_map = PRELIM_SIGNALS_MAP if preliminary else SIGNALS_MAP
161172

162173
if not df.empty and recently_updated:
163174
df = df.rename(columns={"weekendingdate": "timestamp", "jurisdiction": "geo_id"})
164-
filtered_type_dict = copy.deepcopy(TYPE_DICT)
165175

166-
for signal, col_name in SIGNALS_MAP.items():
176+
for signal, col_name in signal_map.items():
167177
# older backups don't have certain columns
168178
try:
169179
df[signal] = df[col_name]
@@ -181,66 +191,3 @@ def pull_nhsn_data(
181191
df = pd.DataFrame(columns=keep_columns)
182192

183193
return df
184-
185-
186-
def pull_preliminary_nhsn_data(
187-
socrata_token: str,
188-
backup_dir: str,
189-
custom_run: bool,
190-
issue_date: Optional[str],
191-
logger: Optional[logging.Logger] = None,
192-
):
193-
"""Pull the latest preliminary NHSN hospital admission data, and conforms it into a dataset.
194-
195-
The output dataset has:
196-
197-
- Each row corresponds to a single observation
198-
- Each row additionally has columns for the signals in SIGNALS
199-
200-
Parameters
201-
----------
202-
socrata_token: str
203-
My App Token for pulling the NHSN data
204-
backup_dir: str
205-
Directory to which to save raw backup data
206-
custom_run: bool
207-
Flag indicating if the current run is a patch. If so, don't save any data to disk
208-
logger: Optional[logging.Logger]
209-
logger object
210-
211-
Returns
212-
-------
213-
pd.DataFrame
214-
Dataframe as described above.
215-
"""
216-
# Pull data from Socrata API
217-
df = (
218-
pull_data(socrata_token, PRELIM_DATASET_ID, backup_dir, logger)
219-
if not custom_run
220-
else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=True)
221-
)
222-
223-
keep_columns = list(PRELIM_TYPE_DICT.keys())
224-
recently_updated = True if custom_run else check_last_updated(socrata_token, PRELIM_DATASET_ID, logger)
225-
226-
if not df.empty and recently_updated:
227-
df = df.rename(columns={"weekendingdate": "timestamp", "jurisdiction": "geo_id"})
228-
filtered_type_dict = copy.deepcopy(PRELIM_TYPE_DICT)
229-
230-
for signal, col_name in PRELIM_SIGNALS_MAP.items():
231-
try:
232-
df[signal] = df[col_name]
233-
except KeyError:
234-
logger.info("column not available in data", col_name=col_name, signal=signal)
235-
keep_columns.remove(signal)
236-
del filtered_type_dict[signal]
237-
238-
df = df[keep_columns]
239-
df = df.astype(filtered_type_dict)
240-
241-
df["geo_id"] = df["geo_id"].str.lower()
242-
df.loc[df["geo_id"] == "usa", "geo_id"] = "us"
243-
else:
244-
df = pd.DataFrame(columns=keep_columns)
245-
246-
return df

nhsn/delphi_nhsn/run.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
from delphi_utils.export import create_export_csv
2525

2626
from .constants import GEOS, PRELIM_SIGNALS_MAP, SIGNALS_MAP
27-
from .pull import pull_nhsn_data, pull_preliminary_nhsn_data
27+
from .pull import pull_nhsn_data
2828

2929

3030
def run_module(params, logger=None):
@@ -56,8 +56,8 @@ def run_module(params, logger=None):
5656
export_start_date = export_start_date.strftime("%Y-%m-%d")
5757

5858
nhsn_df = pull_nhsn_data(socrata_token, backup_dir, custom_run=custom_run, issue_date=issue_date, logger=logger)
59-
preliminary_nhsn_df = pull_preliminary_nhsn_data(
60-
socrata_token, backup_dir, custom_run=custom_run, issue_date=issue_date, logger=logger
59+
preliminary_nhsn_df = pull_nhsn_data(
60+
socrata_token, backup_dir, custom_run=custom_run, issue_date=issue_date, logger=logger, preliminary=True
6161
)
6262

6363
geo_mapper = GeoMapper()
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
timestamp,geo_id,confirmed_admissions_covid_ew,confirmed_admissions_flu_ew,confirmed_admissions_rsv_ew,hosprep_confirmed_admissions_covid_ew,hosprep_confirmed_admissions_flu_ew,hosprep_confirmed_admissions_rsv_ew
2+
2021-08-21,md,53.0,2.0,0.0,13.0,13.0,1.0
3+
2021-08-21,co,852.0,0.0,,92.0,78.0,0.0
4+
2021-08-21,us,10384.0,6049.0,84.0,5426.0,5426.0,469.0
5+
2021-08-28,co,835.0,1.0,,92.0,78.0,0.0
6+
2021-08-28,us,94596.0,262.0,,5391.0,4397.0,0.0
7+
2021-09-04,co,1000.0,3.0,,92.0,78.0,0.0
8+
2021-09-04,us,93241.0,282.0,,5392.0,4396.0,0.0
9+
2021-09-11,co,982.0,2.0,,92.0,78.0,0.0
10+
2021-09-11,us,88162.0,247.0,,5391.0,4377.0,0.0
11+
2021-09-18,co,955.0,0.0,,92.0,78.0,0.0
12+
2021-09-18,us,79169.0,261.0,,5394.0,4362.0,0.0
13+
2021-09-25,co,993.0,0.0,,92.0,78.0,0.0
14+
2021-09-25,us,67740.0,234.0,,5393.0,4368.0,0.0
15+
2021-10-02,co,970.0,0.0,,92.0,78.0,0.0
16+
2021-10-02,us,58076.0,253.0,,5395.0,4391.0,0.0
17+
2021-10-09,co,1079.0,1.0,,92.0,78.0,0.0
18+
2021-10-09,us,51744.0,341.0,,5396.0,4379.0,0.0
19+
2021-10-16,co,1231.0,0.0,,92.0,78.0,0.0
20+
2021-10-16,us,45978.0,266.0,,5394.0,4307.0,0.0
21+
2021-10-16,region 1,45978.0,266.0,,5394.0,4307.0,0.0
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
timestamp,geo_id,confirmed_admissions_covid_ew_prelim,confirmed_admissions_flu_ew_prelim,confirmed_admissions_rsv_ew_prelim,hosprep_confirmed_admissions_covid_ew_prelim,hosprep_confirmed_admissions_flu_ew_prelim,hosprep_confirmed_admissions_rsv_ew_prelim
2+
2021-08-21,mi,269.0,523.0,1.0,152.0,152.0,4.0
3+
2021-08-21,co,852.0,0.0,,92.0,78.0,0.0
4+
2021-08-21,us,8946.0,5576.0,61.0,5422.0,5422.0,485.0
5+
2021-08-28,co,835.0,1.0,,92.0,78.0,0.0
6+
2021-08-28,us,94596.0,262.0,,5391.0,4397.0,0.0
7+
2021-09-04,co,1000.0,3.0,,92.0,78.0,0.0
8+
2021-09-04,us,93241.0,282.0,,5392.0,4396.0,0.0
9+
2021-09-11,co,982.0,2.0,,92.0,78.0,0.0
10+
2021-09-11,us,88162.0,247.0,,5391.0,4377.0,0.0
11+
2021-09-18,co,955.0,0.0,,92.0,78.0,0.0
12+
2021-09-18,us,79169.0,261.0,,5394.0,4362.0,0.0
13+
2021-09-25,co,993.0,0.0,,92.0,78.0,0.0
14+
2021-09-25,us,67740.0,234.0,,5393.0,4368.0,0.0
15+
2021-10-02,co,970.0,0.0,,92.0,78.0,0.0
16+
2021-10-02,us,58076.0,253.0,,5395.0,4391.0,0.0
17+
2021-10-09,co,1079.0,1.0,,92.0,78.0,0.0
18+
2021-10-09,us,51744.0,341.0,,5396.0,4379.0,0.0
19+
2021-10-16,co,1231.0,0.0,,92.0,78.0,0.0
20+
2021-10-16,us,45978.0,266.0,,5394.0,4307.0,0.0

nhsn/tests/test_patch.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from delphi_nhsn.patch import filter_source_files, patch
1414
from delphi_nhsn.constants import TOTAL_ADMISSION_COVID_COL, TOTAL_ADMISSION_FLU_COL, \
1515
NUM_HOSP_REPORTING_FLU_COL, NUM_HOSP_REPORTING_COVID_COL, GEOS, TOTAL_ADMISSION_COVID, TOTAL_ADMISSION_FLU, \
16-
NUM_HOSP_REPORTING_COVID, NUM_HOSP_REPORTING_FLU, NUM_HOSP_REPORTING_RSV_COL, TOTAL_ADMISSION_RSV_COL
16+
NUM_HOSP_REPORTING_RSV_COL, TOTAL_ADMISSION_RSV_COL
1717
from conftest import TEST_DATA, PRELIM_TEST_DATA, TEST_DIR
1818

1919
class TestPatch:

nhsn/tests/test_pull.py

Lines changed: 23 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
pull_nhsn_data,
1111
pull_data,
1212
pull_data_from_file,
13-
pull_preliminary_nhsn_data, check_last_updated
13+
check_last_updated
1414
)
1515
from delphi_nhsn.constants import TYPE_DICT, PRELIM_TYPE_DICT, PRELIM_DATASET_ID, MAIN_DATASET_ID
1616

@@ -21,12 +21,16 @@
2121
"test_data": TEST_DATA,
2222
"msg_prefix": "",
2323
"prelim_flag": False,
24+
"expected_data": f"{TEST_DIR}/test_data/expected_df.csv",
25+
"type_dict": TYPE_DICT,
2426
},
2527

2628
{"id":PRELIM_DATASET_ID,
2729
"test_data":PRELIM_TEST_DATA,
2830
"msg_prefix": "Preliminary ",
2931
"prelim_flag": True,
32+
"expected_data": f"{TEST_DIR}/test_data/expected_df_prelim.csv",
33+
"type_dict": PRELIM_TYPE_DICT,
3034
}
3135
]
3236

@@ -75,123 +79,63 @@ def test_pull_from_file(self, caplog, dataset, params_w_patch):
7579

7680
@patch("delphi_nhsn.pull.Socrata")
7781
@patch("delphi_nhsn.pull.create_backup_csv")
78-
def test_pull_nhsn_data_output(self, mock_create_backup, mock_socrata, caplog, params):
82+
@pytest.mark.parametrize('dataset', DATASETS, ids=["data", "prelim_data"])
83+
def test_pull_nhsn_data_output(self, mock_create_backup, mock_socrata, dataset, caplog, params):
7984
now = time.time()
8085
# Mock Socrata client and its get method
8186
mock_client = MagicMock()
8287
mock_socrata.return_value = mock_client
83-
mock_client.get.side_effect = [TEST_DATA,[]]
84-
88+
mock_client.get.side_effect = [dataset["test_data"],[]]
8589
mock_client.get_metadata.return_value = {"rowsUpdatedAt": now}
8690

8791
backup_dir = params["common"]["backup_dir"]
8892
test_token = params["indicator"]["socrata_token"]
8993
custom_run = params["common"]["custom_run"]
90-
9194
logger = get_structured_logger()
9295

93-
result = pull_nhsn_data(test_token, backup_dir, custom_run, issue_date=None, logger=logger)
96+
expected_df = pd.read_csv(dataset["expected_data"])
97+
98+
result = pull_nhsn_data(test_token, backup_dir, custom_run, issue_date=None, logger=logger, preliminary=dataset["prelim_flag"])
9499
mock_create_backup.assert_called_once()
95100

96-
expected_columns = set(TYPE_DICT.keys())
101+
expected_columns = set(expected_df.columns)
97102
assert set(result.columns) == expected_columns
98103

99104
for column in list(result.columns):
100105
# some states don't report confirmed admissions rsv
101-
if column == "confirmed_admissions_rsv_ew":
106+
if column == "confirmed_admissions_rsv_ew" and not dataset["prelim_flag"]:
107+
continue
108+
if column == "confirmed_admissions_rsv_ew_prelim" and dataset["prelim_flag"]:
102109
continue
103110
assert result[column].notnull().all(), f"{column} has rogue NaN"
104111

112+
expected_df = expected_df.astype(dataset["type_dict"])
105113

106-
@patch("delphi_nhsn.pull.Socrata")
107-
def test_pull_nhsn_data_backup(self, mock_socrata, caplog, params):
108-
now = time.time()
109-
# Mock Socrata client and its get method
110-
mock_client = MagicMock()
111-
mock_socrata.return_value = mock_client
112-
mock_client.get.side_effect = [TEST_DATA, []]
113-
114-
mock_client.get_metadata.return_value = {"rowsUpdatedAt": now}
114+
pd.testing.assert_frame_equal(expected_df, result)
115115

116-
today = pd.Timestamp.today().strftime("%Y%m%d")
117-
backup_dir = params["common"]["backup_dir"]
118-
custom_run = params["common"]["custom_run"]
119-
test_token = params["indicator"]["socrata_token"]
120-
121-
# Load test data
122-
expected_data = pd.DataFrame(TEST_DATA)
123-
124-
logger = get_structured_logger()
125-
# Call function with test token
126-
pull_nhsn_data(test_token, backup_dir, custom_run, issue_date=None, logger=logger)
127-
128-
# Check logger used:
129-
assert "Backup file created" in caplog.text
130-
131-
# Check that backup file was created
132-
backup_files = glob.glob(f"{backup_dir}/{today}*")
133-
assert len(backup_files) == 2, "Backup file was not created"
134-
135-
for backup_file in backup_files:
136-
if backup_file.endswith(".csv.gz"):
137-
dtypes = expected_data.dtypes.to_dict()
138-
actual_data = pd.read_csv(backup_file, dtype=dtypes)
139-
else:
140-
actual_data = pd.read_parquet(backup_file)
141-
pd.testing.assert_frame_equal(expected_data, actual_data)
142-
143-
# clean up
144-
for file in backup_files:
145-
os.remove(file)
146116

147117
@patch("delphi_nhsn.pull.Socrata")
148-
@patch("delphi_nhsn.pull.create_backup_csv")
149-
def test_pull_prelim_nhsn_data_output(self, mock_create_backup, mock_socrata, caplog, params):
118+
@pytest.mark.parametrize('dataset', DATASETS, ids=["data", "prelim_data"])
119+
def test_pull_nhsn_data_backup(self, mock_socrata, dataset, caplog, params):
150120
now = time.time()
151121
# Mock Socrata client and its get method
152122
mock_client = MagicMock()
153123
mock_socrata.return_value = mock_client
154-
mock_client.get.side_effect = [PRELIM_TEST_DATA, []]
124+
mock_client.get.side_effect = [dataset["test_data"], []]
155125

156126
mock_client.get_metadata.return_value = {"rowsUpdatedAt": now}
157127

158-
backup_dir = params["common"]["backup_dir"]
159-
test_token = params["indicator"]["socrata_token"]
160-
custom_run = params["common"]["custom_run"]
161-
162-
logger = get_structured_logger()
163-
164-
result = pull_preliminary_nhsn_data(test_token, backup_dir, custom_run, issue_date=None, logger=logger)
165-
mock_create_backup.assert_called_once()
166-
167-
expected_columns = set(PRELIM_TYPE_DICT.keys())
168-
assert set(result.columns) == expected_columns
169-
170-
for column in list(result.columns):
171-
# some states don't report confirmed admissions rsv
172-
if column == "confirmed_admissions_rsv_ew_prelim":
173-
continue
174-
assert result[column].notnull().all(), f"{column} has rogue NaN"
175-
@patch("delphi_nhsn.pull.Socrata")
176-
def test_pull_prelim_nhsn_data_backup(self, mock_socrata, caplog, params):
177-
now = time.time()
178-
# Mock Socrata client and its get method
179-
mock_client = MagicMock()
180-
mock_socrata.return_value = mock_client
181-
mock_client.get.side_effect = [PRELIM_TEST_DATA, []]
182-
183-
mock_client.get_metadata.return_value = {"rowsUpdatedAt": now}
184128
today = pd.Timestamp.today().strftime("%Y%m%d")
185129
backup_dir = params["common"]["backup_dir"]
186130
custom_run = params["common"]["custom_run"]
187131
test_token = params["indicator"]["socrata_token"]
188132

189133
# Load test data
190-
expected_data = pd.DataFrame(PRELIM_TEST_DATA)
134+
expected_data = pd.DataFrame(dataset["test_data"])
191135

192136
logger = get_structured_logger()
193137
# Call function with test token
194-
pull_preliminary_nhsn_data(test_token, backup_dir, custom_run, issue_date=None, logger=logger)
138+
pull_nhsn_data(test_token, backup_dir, custom_run, issue_date=None, logger=logger, preliminary=dataset["prelim_flag"])
195139

196140
# Check logger used:
197141
assert "Backup file created" in caplog.text
@@ -212,6 +156,7 @@ def test_pull_prelim_nhsn_data_backup(self, mock_socrata, caplog, params):
212156
for file in backup_files:
213157
os.remove(file)
214158

159+
215160
@pytest.mark.parametrize('dataset', DATASETS, ids=["data", "prelim_data"])
216161
@pytest.mark.parametrize("updatedAt", [time.time(), time.time() - 172800], ids=["updated", "stale"])
217162
@patch("delphi_nhsn.pull.Socrata")

0 commit comments

Comments
 (0)