Skip to content

Commit 7e2229a

Browse files
committed
base changes
1 parent 57368e9 commit 7e2229a

File tree

3 files changed

+127
-12
lines changed

3 files changed

+127
-12
lines changed

nssp/delphi_nssp/constants.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,29 @@
4141
"fips": str,
4242
}
4343
)
44+
45+
# Column renames applied to the raw secondary (2023 Respiratory Virus Response)
# NSSP dataset to match our internal schema.
SECONDARY_COLS_MAP = {
    "week_end": "timestamp",
    "geography": "geo_value",
    "percent_visits": "val",
    "pathogen": "signal",
}

# Maps the dataset's pathogen labels to our published signal names.
SECONDARY_SIGNALS_MAP = {
    "COVID-19": "pct_ed_visits_covid_secondary",
    "INFLUENZA": "pct_ed_visits_influenza_secondary",
    "RSV": "pct_ed_visits_rsv_secondary",
    "Combined": "pct_ed_visits_combined_secondary",
}

# Published signal names, in the same order as SECONDARY_SIGNALS_MAP.
SECONDARY_SIGNALS = list(SECONDARY_SIGNALS_MAP.values())

# Geographic resolutions available in the secondary dataset.
SECONDARY_GEOS = ["state", "nation", "hhs"]

# Target dtypes for the cleaned secondary dataframe.
SECONDARY_TYPE_DICT = {
    "timestamp": "datetime64[ns]",
    "geo_value": str,
    "val": float,
    "geo_type": str,
    "signal": str,
}

# Columns retained in the cleaned secondary dataframe, in output order.
SECONDARY_KEEP_COLS = list(SECONDARY_TYPE_DICT.keys())

nssp/delphi_nssp/pull.py

Lines changed: 58 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import pandas as pd
77
from sodapy import Socrata
88

9-
from .constants import NEWLINE, SIGNALS, SIGNALS_MAP, TYPE_DICT
9+
from .constants import *
1010

1111

1212
def warn_string(df, type_dict):
@@ -28,19 +28,13 @@ def warn_string(df, type_dict):
2828

2929

3030
def pull_nssp_data(socrata_token: str):
31-
"""Pull the latest NSSP ER visits data, and conforms it into a dataset.
32-
33-
The output dataset has:
34-
35-
- Each row corresponds to a single observation
36-
- Each row additionally has columns for the signals in SIGNALS
31+
"""Pull the latest NSSP ER visits primary dataset
32+
https://data.cdc.gov/Public-Health-Surveillance/NSSP-Emergency-Department-Visit-Trajectories-by-St/rdmq-nq56/data_preview
3733
3834
Parameters
3935
----------
4036
socrata_token: str
41-
My App Token for pulling the NWSS data (could be the same as the nchs data)
42-
test_file: Optional[str]
43-
When not null, name of file from which to read test data
37+
My App Token for pulling the NSSP data (could be the same as the nchs data)
4438
4539
Returns
4640
-------
@@ -72,3 +66,57 @@ def pull_nssp_data(socrata_token: str):
7266

7367
keep_columns = ["timestamp", "geography", "county", "fips"]
7468
return df_ervisits[SIGNALS + keep_columns]
69+
70+
71+
def secondary_pull_nssp_data(socrata_token: str):
    """Pull the latest NSSP ER visits secondary dataset and conform it.

    Source:
    https://data.cdc.gov/Public-Health-Surveillance/2023-Respiratory-Virus-Response-NSSP-Emergency-Dep/7mra-9cq9/data_preview

    The output dataset has:

    - Each row corresponds to a single observation

    Parameters
    ----------
    socrata_token: str
        My App Token for pulling the NSSP data (could be the same as the nchs data)

    Returns
    -------
    pd.DataFrame
        Dataframe as described above.
    """
    # Pull data from the Socrata API page by page; SODA 2.0 caps a single
    # request at 50000 rows, so keep fetching until an empty page comes back.
    client = Socrata("data.cdc.gov", socrata_token)
    results = []
    offset = 0
    limit = 50000  # maximum limit allowed by SODA 2.0
    while True:
        page = client.get("7mra-9cq9", limit=limit, offset=offset)
        if not page:
            break  # exit the loop if no more results
        results.extend(page)
        offset += limit
    df_ervisits = pd.DataFrame.from_records(results)
    df_ervisits = df_ervisits.rename(columns=SECONDARY_COLS_MAP)

    # geo_type is not provided in the dataset, so we infer it from the geo_value,
    # which is either a state name, "National", or an hhs region number
    # (formatted "Region N").
    df_ervisits["geo_type"] = "state"
    df_ervisits.loc[df_ervisits["geo_value"] == "National", "geo_type"] = "nation"
    hhs_region_mask = df_ervisits["geo_value"].str.startswith("Region ")
    df_ervisits.loc[hhs_region_mask, "geo_value"] = df_ervisits.loc[
        hhs_region_mask, "geo_value"
    ].str.replace("Region ", "")
    df_ervisits.loc[hhs_region_mask, "geo_type"] = "hhs"

    # Map pathogen labels to our signal names; pathogens not present in
    # SECONDARY_SIGNALS_MAP become NaN.
    df_ervisits["signal"] = df_ervisits["signal"].map(SECONDARY_SIGNALS_MAP)

    # Keep the column selection inside the try so that a missing column raises
    # the diagnostic ValueError (via warn_string) rather than a bare KeyError.
    try:
        df_ervisits = df_ervisits[SECONDARY_KEEP_COLS]
        df_ervisits = df_ervisits.astype(SECONDARY_TYPE_DICT)
    except KeyError as exc:
        raise ValueError(warn_string(df_ervisits, SECONDARY_TYPE_DICT)) from exc

    return df_ervisits

nssp/delphi_nssp/run.py

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@
3131
from delphi_utils.geomap import GeoMapper
3232
from delphi_utils.nancodes import add_default_nancodes
3333

34-
from .constants import AUXILIARY_COLS, CSV_COLS, GEOS, SIGNALS
35-
from .pull import pull_nssp_data
34+
from .constants import *
35+
from .pull import pull_nssp_data, secondary_pull_nssp_data
3636

3737

3838
def add_needed_columns(df, col_names=None):
@@ -81,6 +81,7 @@ def run_module(params):
8181
socrata_token = params["indicator"]["socrata_token"]
8282

8383
run_stats = []
84+
8485
## build the base version of the signal at the most detailed geo level you can get.
8586
## compute stuff here or farm out to another function or file
8687
df_pull = pull_nssp_data(socrata_token)
@@ -137,5 +138,45 @@ def run_module(params):
137138
if len(dates) > 0:
138139
run_stats.append((max(dates), len(dates)))
139140

141+
secondary_df_pull = secondary_pull_nssp_data(socrata_token)


def _state_name_to_geo_id(name):
    """Convert a full state name to its lowercase two-letter abbreviation.

    Returns the input unchanged when it cannot be converted, so the caller
    can detect unexpected names. us.states.lookup does not resolve
    "District of Columbia", so it is special-cased to "dc".
    """
    state = us.states.lookup(name)
    if state:
        return state.abbr.lower()
    return "dc" if name == "District of Columbia" else name


## aggregate
for signal in SECONDARY_SIGNALS:
    for geo in SECONDARY_GEOS:
        df = secondary_df_pull.copy()
        logger.info("Generating signal and exporting to CSV", geo_type=geo, signal=signal)
        if geo == "state":
            df = df[df["geo_type"] == "state"]
            df["geo_id"] = df["geo_value"].apply(_state_name_to_geo_id)
            # Names that failed to convert are left unchanged by the helper;
            # treat any of those as fatal rather than exporting bad geo ids.
            unexpected_state_names = df[df["geo_id"] == df["geo_value"]]
            if unexpected_state_names.shape[0] > 0:
                logger.error("Unexpected state names", df=unexpected_state_names)
                raise ValueError("Unexpected state names in secondary NSSP dataset")
        elif geo == "nation":
            df = df[df["geo_type"] == "nation"]
            df["geo_id"] = "us"
        elif geo == "hhs":
            df = df[df["geo_type"] == "hhs"]
            # BUG FIX: geo_id must be the region number carried in geo_value;
            # the previous code assigned geo_type, i.e. the literal string
            # "hhs", to every row.
            df["geo_id"] = df["geo_value"]
        # add se, sample_size, and na codes
        missing_cols = set(CSV_COLS) - set(df.columns)
        df = add_needed_columns(df, col_names=list(missing_cols))
        df_csv = df[CSV_COLS + ["timestamp"]]
        # actual export
        dates = create_export_csv(
            df_csv,
            geo_res=geo,
            export_dir=export_dir,
            sensor=signal,
            weekly_dates=True,
        )
        if len(dates) > 0:
            run_stats.append((max(dates), len(dates)))
140181
## log this indicator run
141182
logging(start_time, run_stats, logger)

0 commit comments

Comments
 (0)