Merged
Changes from all commits (24 commits)
1098180  Update check_missing_values (qx-teo, Dec 10, 2021)
3989131  Fix NA_Values Testing (qx-teo, Dec 10, 2021)
78af92f  Clarify error messaging (qx-teo, Jan 19, 2022)
29c84d2  Improved testing (qx-teo, Jan 19, 2022)
13a2538  Limit check_na_vals to checking window (qx-teo, Jan 19, 2022)
91d6dec  Prevent filtering on empty df (qx-teo, Jan 20, 2022)
7bbd515  Fix lint (qx-teo, Jan 20, 2022)
8e18815  Fix test (qx-teo, Jan 20, 2022)
66b618b  Merge branch 'main' into 7dav_new_geo_ids (qx-teo, Jan 20, 2022)
a96d888  Merge branch '7dav_new_geo_ids' of https://github.com/cmu-delphi/covi… (qx-teo, Jan 20, 2022)
03e1d65  Merge pull request #1503 from cmu-delphi/bot/sync-prod-main (krivard, Jan 31, 2022)
9ca21f9  Flip order of filename and asset id (krivard, Jan 31, 2022)
dbea6ad  Merge pull request #1416 from cmu-delphi/7dav_new_geo_ids (krivard, Jan 31, 2022)
36820c4  initial add prop signals (nmdefries, Jan 31, 2022)
5b77a1d  tests (nmdefries, Feb 1, 2022)
8b6c649  add to validation smooth sigs list (nmdefries, Feb 1, 2022)
68365b3  Document make_signal_name (nmdefries, Feb 1, 2022)
0fc1743  prop constants to boolean (nmdefries, Feb 1, 2022)
ccc5ed6  drop duplicate rounding (nmdefries, Feb 1, 2022)
043e485  add admissions to validator smoothing setting (nmdefries, Feb 1, 2022)
af1fee0  Merge pull request #1505 from cmu-delphi/ndefries/add-hosp-prop (krivard, Feb 2, 2022)
ff5557a  Merge pull request #1504 from cmu-delphi/krivard/flip-input-cache-order (krivard, Feb 2, 2022)
21aebd5  chore: bump delphi_utils to 0.3.0 (Feb 2, 2022)
806aa43  chore: bump covidcast-indicators to 0.3.0 (Feb 2, 2022)
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.2.25
current_version = 0.3.0
commit = True
message = chore: bump covidcast-indicators to {new_version}
tag = False
2 changes: 1 addition & 1 deletion _delphi_utils_python/.bumpversion.cfg
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.2.11
current_version = 0.3.0
commit = True
message = chore: bump delphi_utils to {new_version}
tag = False
2 changes: 1 addition & 1 deletion _delphi_utils_python/delphi_utils/__init__.py
@@ -15,4 +15,4 @@
from .nancodes import Nans
from .weekday import Weekday

__version__ = "0.2.11"
__version__ = "0.3.0"
39 changes: 39 additions & 0 deletions _delphi_utils_python/delphi_utils/validator/dynamic.py
@@ -110,6 +110,8 @@ def validate(self, all_frames, report):
self.check_max_allowed_max_date(
max_date, geo_type, signal_type, report)

self.check_na_vals(geo_sig_df, geo_type, signal_type, report)

# Get relevant reference data from API dictionary.
api_df_or_error = all_api_df[(geo_type, signal_type)]

@@ -168,6 +170,43 @@ def validate(self, all_frames, report):
if self.test_mode and kroc == 2:
break

def check_na_vals(self, geo_sig_df, geo_type, signal_type, report):
"""Check if there are any NA values.

In particular, make sure that error doesn't occur for new Geo IDs introduced.

Arguments:
- geo_type: str; geo type name (county, msa, hrr, state) as in the CSV name
- signal_type: str; signal name as in the CSV name
- report: ValidationReport; report where results are added

Returns:
- None
"""
def replace_first_six(df, start_date):
x = df.val.isnull()
            # Exempt each geo_id's first six days (new geo IDs may start with NAs)
x.iloc[:6] = False
df = df[x]
return df.time_value[df.time_value >= start_date]

grouped_df = geo_sig_df.groupby('geo_id')
error_df = grouped_df.apply(replace_first_six,
start_date = self.params.time_window.start_date)

if not error_df.empty:
for index, value in error_df.iteritems():
report.add_raised_error(
ValidationFailure("check_val_missing",
geo_type=geo_type,
signal=signal_type,
date=value,
message=f"geo_id {index[0]}"
)
)

report.increment_total_checks()

def check_min_allowed_max_date(self, max_date, geo_type, signal_type, report):
"""Check if time since data was generated is reasonable or too long ago.

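A minimal, self-contained sketch (not part of the diff) of the grouping logic behind the new check_na_vals above: NA values in val are flagged only from each geo_id's seventh row onward, so a newly introduced geo whose 7-day-average history begins with six missing days is not reported, while other missing values still are. The helper name flag_missing is illustrative only.

import numpy as np
import pandas as pd

toy = pd.DataFrame({
    "geo_id": ["new_geo"] * 8 + ["old_geo"] * 8,
    "time_value": list(pd.date_range("2022-01-01", periods=8)) * 2,
    "val": [np.nan] * 7 + [1.0]              # new geo: six leading NAs are tolerated
           + [1, 2, 3, 4, 5, 6, np.nan, 8],  # old geo: a later NA is still flagged
})

def flag_missing(group):
    is_na = group["val"].isnull()
    is_na.iloc[:6] = False  # exempt each geo_id's first six rows
    return group.loc[is_na, "time_value"]

print(toy.groupby("geo_id", group_keys=True).apply(flag_missing))
# new_geo is flagged only for 2022-01-07 (its seventh consecutive NA);
# old_geo is flagged for its single NA on 2022-01-07.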
8 changes: 0 additions & 8 deletions _delphi_utils_python/delphi_utils/validator/static.py
@@ -295,14 +295,6 @@ def check_bad_val(self, df_to_test, nameformat, signal_type, report):

report.increment_total_checks()

if df_to_test['val'].isnull().values.any():
report.add_raised_error(
ValidationFailure("check_val_missing",
filename=nameformat,
message="val column can't have any cell that is NA"))

report.increment_total_checks()

if not df_to_test[(df_to_test['val'] < 0)].empty:
report.add_raised_error(
ValidationFailure("check_val_lt_0",
2 changes: 1 addition & 1 deletion _delphi_utils_python/setup.py
@@ -26,7 +26,7 @@

setup(
name="delphi_utils",
version="0.2.11",
version="0.3.0",
description="Shared Utility Functions for Indicators",
long_description=long_description,
long_description_content_type="text/markdown",
22 changes: 22 additions & 0 deletions _delphi_utils_python/tests/validator/test_dynamic.py
@@ -106,6 +106,28 @@ def test_0_vs_many(self):
assert len(report.raised_errors) == 1
assert report.raised_errors[0].check_name == "check_rapid_change_num_rows"

class TestCheckNaVals:
params = {
"common": {
"data_source": "",
"span_length": 14,
"end_date": "2020-09-02"
}
}
def test_missing(self):
validator = DynamicValidator(self.params)
report = ValidationReport([])
data = {"val": [np.nan] * 15, "geo_id": [0,1] * 7 + [2],
"time_value": ["2021-08-30"] * 14 + ["2021-05-01"]}
df = pd.DataFrame(data)
df.time_value = (pd.to_datetime(df.time_value)).dt.date
validator.check_na_vals(df, "geo", "signal", report)

assert len(report.raised_errors) == 2
assert report.raised_errors[0].check_name == "check_val_missing"
assert report.raised_errors[0].message == "geo_id 0"
assert report.raised_errors[1].check_name == "check_val_missing"
assert report.raised_errors[1].message == "geo_id 1"

class TestCheckAvgValDiffs:
params = {
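A quick sketch (not part of the diff) of why TestCheckNaVals above expects exactly two check_val_missing failures: after each geo_id's six-row exemption, geo 0 and geo 1 each have one remaining NA, while geo 2's single NA falls inside the exemption. The checking-window filter in the real check excludes nothing here, since all test dates fall after the window start derived from the test params.

import numpy as np
import pandas as pd

df = pd.DataFrame({
    "val": [np.nan] * 15,
    "geo_id": [0, 1] * 7 + [2],
    "time_value": pd.to_datetime(["2021-08-30"] * 14 + ["2021-05-01"]).date,
})
flagged = df.groupby("geo_id")["val"].apply(
    lambda s: int(s.isnull().iloc[6:].sum())  # NAs remaining after the six-row exemption
)
print(flagged.to_dict())  # {0: 1, 1: 1, 2: 0} -> two raised errors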
9 changes: 0 additions & 9 deletions _delphi_utils_python/tests/validator/test_static.py
@@ -362,15 +362,6 @@ def test_empty_df(self):

assert len(report.raised_errors) == 0

def test_missing(self):
validator = StaticValidator(self.params)
report = ValidationReport([])
df = pd.DataFrame([np.nan], columns=["val"])
validator.check_bad_val(df, FILENAME, "signal", report)

assert len(report.raised_errors) == 1
assert report.raised_errors[0].check_name == "check_val_missing"

def test_lt_0(self):
validator = StaticValidator(self.params)
report = ValidationReport([])
4 changes: 3 additions & 1 deletion ansible/templates/dsew_community_profile-params-prod.json.j2
@@ -26,7 +26,9 @@
"ref_window_size": 7,
"smoothed_signals": [
"naats_total_7dav",
"naats_positivity_7dav"
"naats_positivity_7dav",
"confirmed_admissions_covid_1d_prop_7dav",
"confirmed_admissions_covid_1d_7dav"
]
}
}
24 changes: 18 additions & 6 deletions dsew_community_profile/delphi_dsew_community_profile/constants.py
@@ -50,22 +50,34 @@ class Transform:
SIGNALS = {
"total": {
"is_rate" : False,
"api_name": "naats_total_7dav"
"api_name": "naats_total_7dav",
"make_prop": False
},
"positivity": {
"is_rate" : True,
"api_name": "naats_positivity_7dav"
"api_name": "naats_positivity_7dav",
"make_prop": False
},
"confirmed covid-19 admissions": {
"is_rate" : False,
"api_name": "confirmed_admissions_covid_1d_7dav"
"api_name": "confirmed_admissions_covid_1d_7dav",
"make_prop": True,
"api_prop_name": "confirmed_admissions_covid_1d_prop_7dav"
}
}

COUNTS_7D_SIGNALS = {key for key, value in SIGNALS.items() if not value["is_rate"]}

def make_signal_name(key):
"""Convert a signal key to the corresponding signal name for the API."""
def make_signal_name(key, is_prop=False):
"""Convert a signal key to the corresponding signal name for the API.

    Note that this function is called twice with the same `key` for signals that have a
    population-proportion ("prop") variant: once with is_prop=False and once with is_prop=True.
"""
if is_prop:
return SIGNALS[key]["api_prop_name"]
return SIGNALS[key]["api_name"]

NEWLINE="\n"
NEWLINE = "\n"
IS_PROP = True
NOT_PROP = False
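A minimal usage sketch (not part of the diff), assuming the delphi_dsew_community_profile package is importable: signals with "make_prop": True are exported twice, once under their api_name and once under their api_prop_name.

from delphi_dsew_community_profile.constants import (
    SIGNALS, make_signal_name, IS_PROP, NOT_PROP)

sig = "confirmed covid-19 admissions"
assert SIGNALS[sig]["make_prop"]
print(make_signal_name(sig, NOT_PROP))  # confirmed_admissions_covid_1d_7dav
print(make_signal_name(sig, IS_PROP))   # confirmed_admissions_covid_1d_prop_7dav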
54 changes: 46 additions & 8 deletions dsew_community_profile/delphi_dsew_community_profile/pull.py
@@ -11,8 +11,9 @@

from delphi_utils.geomap import GeoMapper

from .constants import TRANSFORMS, SIGNALS, COUNTS_7D_SIGNALS, NEWLINE
from .constants import DOWNLOAD_ATTACHMENT, DOWNLOAD_LISTING
from .constants import (TRANSFORMS, SIGNALS, COUNTS_7D_SIGNALS, NEWLINE,
IS_PROP, NOT_PROP,
DOWNLOAD_ATTACHMENT, DOWNLOAD_LISTING)

# YYYYMMDD
# example: "Community Profile Report 20211104.xlsx"
@@ -248,7 +249,7 @@ def _parse_sheet(self, sheet):
if (sheet.level == "msa" or sheet.level == "county") \
and self.publish_date < datetime.date(2021, 1, 8) \
and sig == "confirmed covid-19 admissions":
self.dfs[(sheet.level, sig)] = pd.DataFrame(
self.dfs[(sheet.level, sig, NOT_PROP)] = pd.DataFrame(
columns = ["geo_id", "timestamp", "val", \
"se", "sample_size", "publish_date"]
)
@@ -258,7 +259,7 @@ def _parse_sheet(self, sheet):
assert len(sig_select) > 0, \
f"No {sig} in any of {select}\n\nAll headers:\n{NEWLINE.join(list(df.columns))}"

self.dfs[(sheet.level, sig)] = pd.concat([
self.dfs[(sheet.level, sig, NOT_PROP)] = pd.concat([
pd.DataFrame({
"geo_id": sheet.geo_id_select(df).apply(sheet.geo_id_apply),
"timestamp": pd.to_datetime(self.times[si[0]][sig]),
@@ -271,14 +272,18 @@ def _parse_sheet(self, sheet):
])

for sig in COUNTS_7D_SIGNALS:
self.dfs[(sheet.level, sig)]["val"] /= 7 # 7-day total -> 7-day average
self.dfs[(sheet.level, sig, NOT_PROP)]["val"] /= 7 # 7-day total -> 7-day average


def as_cached_filename(params, config):
"""Formulate a filename to uniquely identify this report in the input cache."""
# eg "Community Profile Report 20220128.xlsx"
# but delimiters vary; don't get tripped up if they do something wacky like
# Community.Profile.Report.20220128.xlsx
name, _, ext = config['filename'].rpartition(".")
return os.path.join(
params['indicator']['input_cache'],
f"{config['assetId']}--{config['filename']}"
f"{name}--{config['assetId']}.{ext}"
)
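# Illustration only, not part of the change: with a hypothetical assetId "abc123" and
# filename "Community Profile Report 20220128.xlsx", the cached path is now
# "<input_cache>/Community Profile Report 20220128--abc123.xlsx" rather than the previous
# "<input_cache>/abc123--Community Profile Report 20220128.xlsx".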

def fetch_listing(params):
@@ -390,13 +395,46 @@ def fetch_new_reports(params, logger=None):
# add nation from state
geomapper = GeoMapper()
for sig in SIGNALS:
state_key = ("state", sig)
state_key = ("state", sig, NOT_PROP)
if state_key not in ret:
continue
ret[("nation", sig)] = nation_from_state(
ret[("nation", sig, NOT_PROP)] = nation_from_state(
ret[state_key].rename(columns={"geo_id": "state_id"}),
sig,
geomapper
)

for key, df in ret.copy().items():
(geo, sig, _) = key
if SIGNALS[sig]["make_prop"]:
ret[(geo, sig, IS_PROP)] = generate_prop_signal(df, geo, geomapper)

return ret

def generate_prop_signal(df, geo, geo_mapper):
"""Transform base df into a proportion (per 100k population)."""
if geo == "state":
geo = "state_id"
if geo == "county":
geo = "fips"

# Add population data
if geo == "msa":
map_df = geo_mapper.get_crosswalk("fips", geo)
map_df = geo_mapper.add_population_column(
map_df, "fips"
).drop(
"fips", axis=1
).groupby(
geo
).sum(
).reset_index(
)
df = pd.merge(df, map_df, left_on="geo_id", right_on=geo, how="inner")
else:
df = geo_mapper.add_population_column(df, geo, geocode_col="geo_id")

df["val"] = df["val"] / df["population"] * 100000
df.drop(["population", geo], axis=1, inplace=True)

return df
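A minimal sketch (not part of the diff) of the per-100k transform that generate_prop_signal applies. In the real code the population column comes from delphi_utils' GeoMapper, with MSA populations aggregated from county FIPS codes; a hard-coded population is used here purely for illustration.

import pandas as pd

df = pd.DataFrame({
    "geo_id": ["pa"],
    "timestamp": pd.to_datetime(["2022-01-28"]),
    "val": [350.0],          # 7-day average of confirmed admissions
    "se": [None],
    "sample_size": [None],
})
population = 12_800_000      # illustrative figure, not taken from the GeoMapper crosswalk
df["val"] = df["val"] / population * 100_000
print(round(df["val"].iloc[0], 2))  # 2.73 admissions per 100k

The three-element keys (geo, sig, is_prop) introduced in fetch_new_reports are what let run.py export both the count and prop variants of a signal from the same parsed data.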
4 changes: 2 additions & 2 deletions dsew_community_profile/delphi_dsew_community_profile/run.py
@@ -58,14 +58,14 @@ def replace_date_param(p):
run_stats = []
dfs = fetch_new_reports(params, logger)
for key, df in dfs.items():
(geo, sig) = key
(geo, sig, is_prop) = key
if sig not in params["indicator"]["export_signals"]:
continue
dates = create_export_csv(
df,
params['common']['export_dir'],
geo,
make_signal_name(sig),
make_signal_name(sig, is_prop),
**export_params
)
if len(dates)>0:
4 changes: 3 additions & 1 deletion dsew_community_profile/params.json.template
@@ -32,7 +32,9 @@
"ref_window_size": 7,
"smoothed_signals": [
"naats_total_7dav",
"naats_positivity_7dav"
"naats_positivity_7dav",
"confirmed_admissions_covid_1d_prop_7dav",
"confirmed_admissions_covid_1d_7dav"
]
}
}
4 changes: 3 additions & 1 deletion dsew_community_profile/tests/params.json.template
@@ -25,7 +25,9 @@
"ref_window_size": 7,
"smoothed_signals": [
"naats_total_7dav",
"naats_positivity_7dav"
"naats_positivity_7dav",
"confirmed_admissions_covid_1d_prop_7dav",
"confirmed_admissions_covid_1d_7dav"
]
}
}