99 SIGNALS ,
1010 PROVIDER_NORMS ,
1111 METRIC_SIGNALS ,
12- METRIC_DATES ,
13- SAMPLE_SITE_NAMES ,
1412 SIG_DIGITS ,
15- NEWLINE ,
13+ TYPE_DICT ,
14+ TYPE_DICT_METRIC ,
1615)
1716
1817
@@ -35,34 +34,29 @@ def sig_digit_round(value, n_digits):
3534 return result
3635
3736
38- def construct_typedicts ():
39- """Create the type conversion dictionary for both dataframes."""
40- # basic type conversion
41- type_dict = {key : float for key in SIGNALS }
42- type_dict ["timestamp" ] = "datetime64[ns]"
43- # metric type conversion
44- signals_dict_metric = {key : float for key in METRIC_SIGNALS }
45- metric_dates_dict = {key : "datetime64[ns]" for key in METRIC_DATES }
46- type_dict_metric = {** metric_dates_dict , ** signals_dict_metric , ** SAMPLE_SITE_NAMES }
47- return type_dict , type_dict_metric
48-
49-
50- def warn_string (df , type_dict ):
51- """Format the warning string."""
52- return f"""
def convert_df_type(df, type_dict, logger):
    """Convert column types and warn if there are unexpected columns.

    Parameters
    ----------
    df : pd.DataFrame
        Incoming dataset.
    type_dict : dict
        Mapping from expected column name to target dtype.
    logger
        Structured logger; assumed to accept keyword context on ``.info``
        (structlog-style) — TODO confirm against caller.

    Returns
    -------
    pd.DataFrame
        Copy of ``df`` with the columns in ``type_dict`` cast accordingly.

    Raises
    ------
    KeyError
        If a column named in ``type_dict`` is missing from ``df`` (the
        upstream schema has likely changed).
    """
    try:
        df = df.astype(type_dict)
    except KeyError as exc:
        # astype with a dict raises KeyError when an expected column is absent;
        # re-raise with a diff of expected vs. received columns for triage.
        newline = "\n"
        raise KeyError(
            f"""
Expected column(s) missed, The dataset schema may
have changed. Please investigate and amend the code.

expected={newline.join(sorted(type_dict.keys()))}

received={newline.join(sorted(df.columns))}
"""
        ) from exc
    # Columns present in the data but not in the expected schema are allowed,
    # but logged so new upstream fields get noticed.
    if new_columns := set(df.columns) - set(type_dict.keys()):
        logger.info("New columns found in NWSS dataset.", new_columns=new_columns)
    return df
6256
6357
6458def reformat (df , df_metric ):
65- """Add columns from df_metric to df, and rename some columns.
59+ """Add columns from df_metric to df, and rename some columns.
6660
6761 Specifically the population and METRIC_SIGNAL columns, and renames date_start to timestamp.
6862 """
@@ -80,27 +74,16 @@ def reformat(df, df_metric):
8074 return df
8175
8276
83- def drop_unnormalized (df ):
84- """Drop unnormalized.
85-
86- mutate `df` to no longer have rows where the normalization scheme isn't actually identified,
87- as we can't classify the kind of signal
88- """
89- return df [~ df ["normalization" ].isna ()]
90-
91-
def add_identifier_columns(df):
    """Add identifier columns, mutating ``df`` in place.

    Adds columns that give more detail than ``key_plot_id`` alone:
    ``state``, ``provider``, and ``signal_name`` (the signal identifier,
    built as ``<provider>_<normalization>``).

    Parameters
    ----------
    df : pd.DataFrame
        Must contain ``key_plot_id`` and ``normalization`` columns.
        Modified in place; nothing is returned.
    """
    # A pair of word characters surrounded by underscores, e.g. "_ca_".
    df["state"] = df.key_plot_id.str.extract(r"_(\w\w)_")
    # Everything preceding the two-letter state code.
    df["provider"] = df.key_plot_id.str.extract(r"(.*)_[a-z]{2}_")
    df["signal_name"] = df.provider + "_" + df.normalization
10588
10689
@@ -120,7 +103,7 @@ def check_endpoints(df):
120103 )
121104
122105
123- def pull_nwss_data (token : str ):
106+ def pull_nwss_data (token : str , logger ):
124107 """Pull the latest NWSS Wastewater data, and conforms it into a dataset.
125108
126109 The output dataset has:
@@ -141,11 +124,6 @@ def pull_nwss_data(token: str):
141124 pd.DataFrame
142125 Dataframe as described above.
143126 """
144- # Constants
145- keep_columns = [* SIGNALS , * METRIC_SIGNALS ]
146- # concentration key types
147- type_dict , type_dict_metric = construct_typedicts ()
148-
149127 # Pull data from Socrata API
150128 client = Socrata ("data.cdc.gov" , token )
151129 results_concentration = client .get ("g653-rqe2" , limit = 10 ** 10 )
@@ -154,19 +132,14 @@ def pull_nwss_data(token: str):
154132 df_concentration = pd .DataFrame .from_records (results_concentration )
155133 df_concentration = df_concentration .rename (columns = {"date" : "timestamp" })
156134
157- try :
158- df_concentration = df_concentration .astype (type_dict )
159- except KeyError as exc :
160- raise ValueError (warn_string (df_concentration , type_dict )) from exc
135+ # Schema checks.
136+ df_concentration = convert_df_type (df_concentration , TYPE_DICT , logger )
137+ df_metric = convert_df_type (df_metric , TYPE_DICT_METRIC , logger )
161138
162- try :
163- df_metric = df_metric .astype (type_dict_metric )
164- except KeyError as exc :
165- raise ValueError (warn_string (df_metric , type_dict_metric )) from exc
139+ # Drop sites without a normalization scheme.
140+ df = df_concentration [~ df_concentration ["normalization" ].isna ()]
166141
167- # if the normalization scheme isn't recorded, why is it even included as a sample site?
168- df = drop_unnormalized (df_concentration )
169- # pull 2 letter state labels out of the key_plot_id labels
142+ # Pull 2 letter state labels out of the key_plot_id labels.
170143 add_identifier_columns (df )
171144
172145 # move population and metric signals over to df
@@ -180,13 +153,14 @@ def pull_nwss_data(token: str):
180153 # otherwise, best to assume some value rather than break the data)
181154 df .population_served = df .population_served .ffill ()
182155 check_endpoints (df )
183- keep_columns .extend (
184- [
185- "timestamp" ,
186- "state" ,
187- "population_served" ,
188- "normalization" ,
189- "provider" ,
190- ]
191- )
156+
157+ keep_columns = [
158+ * SIGNALS ,
159+ * METRIC_SIGNALS ,
160+ "timestamp" ,
161+ "state" ,
162+ "population_served" ,
163+ "normalization" ,
164+ "provider" ,
165+ ]
192166 return df [keep_columns ]
0 commit comments