
Commit 9dbbc59

minhkhul and aysim319 authored
2070 add backup of source data to nssp (#2072)
* base changes
* backup dir
* add test
* lint
* adding tests for create_backup_csv
* also writing into parquet
* adding pyarrow as dependency
* clean test
* adjusting logic to match new naming format and chunking
* moving dependencies
* lint
* made test more robust
* fix test
* clean up
* adding parquet into gitignore
* placate the linter

---------

Co-authored-by: Amaris Sim <aysim319@gmail.com>
1 parent 57368e9 commit 9dbbc59

File tree

12 files changed, +236 -44 lines changed

_delphi_utils_python/delphi_utils/export.py

Lines changed: 16 additions & 11 deletions
@@ -1,6 +1,5 @@
 """Export data in the format expected by the Delphi API."""
 # -*- coding: utf-8 -*-
-import gzip
 import logging
 from datetime import datetime
 from os.path import getsize, join
@@ -189,15 +188,21 @@ def create_backup_csv(
         issue = datetime.today().strftime("%Y%m%d")

         backup_filename = [issue, geo_res, metric, sensor]
-        backup_filename = "_".join(filter(None, backup_filename)) + ".csv.gz"
+        backup_filename = "_".join(filter(None, backup_filename))
         backup_file = join(backup_dir, backup_filename)
-
-        with gzip.open(backup_file, "wt", newline="") as f:
-            df.to_csv(f, index=False, na_rep="NA")
-
-        if logger:
-            logger.info(
-                "Backup file created",
-                backup_file=backup_file,
-                backup_size=getsize(backup_file),
+        try:
+            # defacto data format is csv, but parquet preserved data types (keeping both as intermidary measures)
+            df.to_csv(
+                f"{backup_file}.csv.gz", index=False, na_rep="NA", compression="gzip"
             )
+            df.to_parquet(f"{backup_file}.parquet", index=False)
+
+            if logger:
+                logger.info(
+                    "Backup file created",
+                    backup_file=backup_file,
+                    backup_size=getsize(f"{backup_file}.csv.gz"),
+                )
+        # pylint: disable=W0703
+        except Exception as e:
+            logger.info("Backup file creation failed", msg=e)

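For orientation, here is a minimal usage sketch of the updated create_backup_csv (not part of the commit); the keyword arguments mirror the new test_create_backup_regular test further down, and the DataFrame and backup directory are placeholders.

# Hypothetical usage sketch; call signature taken from the new test in test_export.py below.
import pandas as pd
from delphi_utils import create_backup_csv, get_structured_logger

df = pd.DataFrame({"geo_id": ["01001", "01003"], "val": [1.0, 2.5]})  # placeholder data
logger = get_structured_logger()

create_backup_csv(
    df=df,
    backup_dir="./raw_data_backups",  # any writable directory
    custom_run=False,                 # a custom (patch) run would skip writing backups
    issue=None,                       # None -> today's date (YYYYMMDD), per the diff above
    geo_res="county",
    metric="test",
    sensor="deaths",
    logger=logger,
)
# Per the diff above, two files are written:
#   ./raw_data_backups/<issue>_county_test_deaths.csv.gz
#   ./raw_data_backups/<issue>_county_test_deaths.parquet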
_delphi_utils_python/pyproject.toml

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@ dependencies = [
     "gitpython",
     "importlib_resources>=1.3",
     "numpy",
+    "pyarrow",
     "pandas>=1.1.0",
     "requests",
     "slackclient",

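pyarrow is added because pandas delegates parquet I/O to it; a quick check, assuming the dependency is installed (illustrative only, not from the commit):

# Minimal sketch: pyarrow backs DataFrame.to_parquet / pd.read_parquet.
import pandas as pd

pd.DataFrame({"x": [1, 2]}).to_parquet("check.parquet", engine="pyarrow")
print(pd.read_parquet("check.parquet"))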
_delphi_utils_python/tests/test_export.py

Lines changed: 22 additions & 2 deletions
@@ -1,15 +1,16 @@
 """Tests for exporting CSV files."""
 from datetime import datetime
+import logging
 from os import listdir
 from os.path import join
-from typing import Any, Dict, List
+from typing import Any, Dict

 import mock
 import numpy as np
 import pandas as pd
 from pandas.testing import assert_frame_equal

-from delphi_utils import create_export_csv, Nans
+from delphi_utils import create_export_csv, Nans, create_backup_csv, get_structured_logger


 def _set_df_dtypes(df: pd.DataFrame, dtypes: Dict[str, Any]) -> pd.DataFrame:
@@ -386,3 +387,22 @@ def test_export_sort(self, tmp_path):
         })
         sorted_csv = _set_df_dtypes(pd.read_csv(join(tmp_path, "20200215_county_test.csv")), dtypes={"geo_id": str})
         assert_frame_equal(sorted_csv,expected_df)
+
+    def test_create_backup_regular(self, caplog, tmp_path):
+        caplog.set_level(logging.INFO)
+        logger = get_structured_logger()
+        today = datetime.strftime(datetime.today(), "%Y%m%d")
+        dtypes = self.DF.dtypes.to_dict()
+        del dtypes["timestamp"]
+        geo_res = "county"
+        metric = "test"
+        sensor = "deaths"
+        create_backup_csv(df=self.DF, backup_dir=tmp_path, custom_run=False, issue=None, geo_res=geo_res, metric=metric, sensor=sensor, logger=logger)
+        assert "Backup file created" in caplog.text
+
+        actual = pd.read_csv(join(tmp_path, f"{today}_{geo_res}_{metric}_{sensor}.csv.gz"), dtype=dtypes, parse_dates=["timestamp"])
+        assert self.DF.equals(actual)
+
+        actual_parquet = pd.read_parquet(join(tmp_path, f"{today}_{geo_res}_{metric}_{sensor}.parquet"))
+        assert actual_parquet.equals(actual)
+

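The reason for keeping both formats is visible in this test: the csv.gz round trip needs dtype and parse_dates re-supplied, while the parquet file preserves column types on its own. A small illustrative sketch (hypothetical file names, not from the commit):

# Illustrative only: compare how the two backup formats round-trip dtypes.
import pandas as pd

csv_df = pd.read_csv("raw_data_backups/20241001_county_test_deaths.csv.gz")           # hypothetical path
parquet_df = pd.read_parquet("raw_data_backups/20241001_county_test_deaths.parquet")  # hypothetical path

print(csv_df.dtypes)      # timestamp reads back as object unless parse_dates is passed
print(parquet_df.dtypes)  # original dtypes (e.g. datetime64[ns]) are preserved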
ansible/templates/nssp-params-prod.json.j2

Lines changed: 1 addition & 0 deletions
@@ -1,6 +1,7 @@
 {
   "common": {
     "export_dir": "/common/covidcast/receiving/nssp",
+    "backup_dir": "./raw_data_backups",
     "log_filename": "/var/log/indicators/nssp.log",
     "log_exceptions": false
   },
Lines changed: 2 additions & 1 deletion
@@ -1,2 +1,3 @@
 *.csv
-*.gz
+*.gz
+*.parquet

nchs_mortality/tests/test_pull.py

Lines changed: 24 additions & 4 deletions
@@ -1,7 +1,10 @@
+import glob
 import os
 import pytest

 import pandas as pd
+
+from delphi_utils import get_structured_logger
 from delphi_utils.geomap import GeoMapper

 from delphi_nchs_mortality.pull import pull_nchs_mortality_data, standardize_columns
@@ -98,13 +101,30 @@ def test_bad_file_with_missing_cols(self):
         with pytest.raises(ValueError):
             pull_nchs_mortality_data(SOCRATA_TOKEN, backup_dir = "", custom_run = True, test_file = "bad_data_with_missing_cols.csv")

-    def test_backup_today_data(self):
+    def test_backup_today_data(self, caplog):
         today = pd.Timestamp.today().strftime("%Y%m%d")
         backup_dir = "./raw_data_backups"
-        pull_nchs_mortality_data(SOCRATA_TOKEN, backup_dir = backup_dir, custom_run = False, test_file = "test_data.csv")
-        backup_file = f"{backup_dir}/{today}.csv.gz"
-        backup_df = pd.read_csv(backup_file)
+        logger = get_structured_logger()
+        pull_nchs_mortality_data(SOCRATA_TOKEN, backup_dir = backup_dir, custom_run = False, test_file = "test_data.csv", logger=logger)
+
+        # Check logger used:
+        assert "Backup file created" in caplog.text
+
+        # Check that backup file was created
+        backup_files = glob.glob(f"{backup_dir}/{today}*")
+        assert len(backup_files) == 2, "Backup file was not created"
+
         source_df = pd.read_csv("test_data/test_data.csv")
+        for backup_file in backup_files:
+            if backup_file.endswith(".csv.gz"):
+                backup_df = pd.read_csv(backup_file)
+            else:
+                backup_df = pd.read_parquet(backup_file)
+            pd.testing.assert_frame_equal(source_df, backup_df)
+
+        backup_file_parquet = f"{backup_dir}/{today}.parquet"
+        backup_df = pd.read_parquet(backup_file_parquet)
         pd.testing.assert_frame_equal(source_df, backup_df)
+
         if os.path.exists(backup_file):
             os.remove(backup_file)

nssp/delphi_nssp/pull.py

Lines changed: 12 additions & 5 deletions
@@ -1,9 +1,11 @@
 # -*- coding: utf-8 -*-
 """Functions for pulling NSSP ER data."""
-
+import logging
 import textwrap
+from typing import Optional

 import pandas as pd
+from delphi_utils import create_backup_csv
 from sodapy import Socrata

 from .constants import NEWLINE, SIGNALS, SIGNALS_MAP, TYPE_DICT
@@ -27,7 +29,7 @@ def warn_string(df, type_dict):
     return warn


-def pull_nssp_data(socrata_token: str):
+def pull_nssp_data(socrata_token: str, backup_dir: str, custom_run: bool, logger: Optional[logging.Logger] = None):
     """Pull the latest NSSP ER visits data, and conforms it into a dataset.

     The output dataset has:
@@ -38,9 +40,13 @@ def pull_nssp_data(socrata_token: str):
     Parameters
     ----------
     socrata_token: str
-        My App Token for pulling the NWSS data (could be the same as the nchs data)
-    test_file: Optional[str]
-        When not null, name of file from which to read test data
+        My App Token for pulling the NSSP data (could be the same as the nchs data)
+    backup_dir: str
+        Directory to which to save raw backup data
+    custom_run: bool
+        Flag indicating if the current run is a patch. If so, don't save any data to disk
+    logger: Optional[logging.Logger]
+        logger object

     Returns
     -------
@@ -59,6 +65,7 @@ def pull_nssp_data(socrata_token: str):
         results.extend(page)
         offset += limit
     df_ervisits = pd.DataFrame.from_records(results)
+    create_backup_csv(df_ervisits, backup_dir, custom_run, logger=logger)
     df_ervisits = df_ervisits.rename(columns={"week_end": "timestamp"})
     df_ervisits = df_ervisits.rename(columns=SIGNALS_MAP)


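Taken on its own, the updated puller can be driven as sketched below (hypothetical token and paths; the call shape matches the run.py change that follows):

# Hypothetical direct invocation of the updated puller; a raw backup of the
# Socrata payload is written before any renaming/transformation.
from delphi_utils import get_structured_logger
from delphi_nssp.pull import pull_nssp_data

logger = get_structured_logger()
df = pull_nssp_data(
    socrata_token="MY_APP_TOKEN",     # placeholder Socrata app token
    backup_dir="./raw_data_backups",
    custom_run=False,                 # False: write the raw backup files
    logger=logger,
)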
nssp/delphi_nssp/run.py

Lines changed: 3 additions & 1 deletion
@@ -78,12 +78,14 @@ def run_module(params):
         log_exceptions=params["common"].get("log_exceptions", True),
     )
     export_dir = params["common"]["export_dir"]
+    backup_dir = params["common"]["backup_dir"]
+    custom_run = params["common"].get("custom_run", False)
     socrata_token = params["indicator"]["socrata_token"]

     run_stats = []
     ## build the base version of the signal at the most detailed geo level you can get.
     ## compute stuff here or farm out to another function or file
-    df_pull = pull_nssp_data(socrata_token)
+    df_pull = pull_nssp_data(socrata_token, backup_dir, custom_run=custom_run, logger=logger)
     ## aggregate
     geo_mapper = GeoMapper()
     for signal in SIGNALS:

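Because custom_run is read with .get(..., False), regular runs write backups by default; per the pull_nssp_data docstring above, a patch run does not save backup data to disk. A hypothetical params dict using the key names introduced in this commit:

# Hypothetical params for a patch run: pull_nssp_data still runs, but the
# raw backup is skipped because custom_run is True.
params = {
    "common": {
        "export_dir": "./receiving",
        "backup_dir": "./raw_data_backups",
        "custom_run": True,
        "log_filename": "./nssp.log",
        "log_exceptions": False,
    },
    "indicator": {"socrata_token": "MY_APP_TOKEN"},  # placeholder token
}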
nssp/params.json.template

Lines changed: 1 addition & 0 deletions
@@ -1,6 +1,7 @@
 {
   "common": {
     "export_dir": "./receiving",
+    "backup_dir": "./raw_data_backups",
     "log_filename": "./nssp.log",
     "log_exceptions": false
   },

nssp/raw_data_backups/.gitignore

Lines changed: 120 additions & 0 deletions
@@ -0,0 +1,120 @@
+# You should hard commit a prototype for this file, but we
+# want to avoid accidental adding of API tokens and other
+# private data parameters
+params.json
+
+# Do not commit output files
+receiving/*.csv
+
+# Remove macOS files
+.DS_Store
+
+# virtual environment
+dview/
+
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+coverage.xml
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# PyInstaller
+# Usually these files are written by a python script from a template
+# before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+.hypothesis/
+.pytest_cache/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+.static_storage/
+.media/
+local_settings.py
+
+# Flask stuff:
+instance/
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+target/
+
+# Jupyter Notebook
+.ipynb_checkpoints
+
+# pyenv
+.python-version
+
+# celery beat schedule file
+celerybeat-schedule
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
