-
Notifications
You must be signed in to change notification settings - Fork 16
first draft of splitting NWSS signals #1946
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 13 commits
2043eaa
38713cb
2c96adc
6f68309
c7e300e
f9b0687
d337b4e
c968604
55a821c
61277b2
402f2ab
e82e9fd
9c546ec
19fe055
89b5793
55d56b3
b5c38ce
eadd03d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,14 +5,7 @@ | |
| import pandas as pd | ||
| from sodapy import Socrata | ||
|
|
||
| from .constants import ( | ||
| SIGNALS, | ||
| METRIC_SIGNALS, | ||
| METRIC_DATES, | ||
| SAMPLE_SITE_NAMES, | ||
| SIG_DIGITS, | ||
| NEWLINE, | ||
| ) | ||
| from .constants import METRIC_SIGNALS, PROVIDER_NORMS, SIG_DIGITS, SIGNALS, TYPE_DICT, TYPE_DICT_METRIC | ||
|
|
||
|
|
||
| def sig_digit_round(value, n_digits): | ||
|
|
@@ -34,47 +27,70 @@ def sig_digit_round(value, n_digits): | |
| return result | ||
|
|
||
|
|
||
def construct_typedicts():
    """Build the dtype-conversion mappings for the two NWSS dataframes.

    Returns
    -------
    tuple of (dict, dict)
        The first mapping sends every name in SIGNALS to ``float`` and
        ``"timestamp"`` to ``"datetime64[ns]"``. The second sends every name
        in METRIC_DATES to ``"datetime64[ns]"`` and every name in
        METRIC_SIGNALS to ``float``, then merges in SAMPLE_SITE_NAMES.
    """
    # Concentration dataframe: signal columns plus the timestamp column.
    concentration_types = dict.fromkeys(SIGNALS, float)
    concentration_types["timestamp"] = "datetime64[ns]"

    # Metric dataframe: later updates win on key collisions, which is the
    # same precedence as unpacking dates, then signals, then site names.
    metric_types = dict.fromkeys(METRIC_DATES, "datetime64[ns]")
    metric_types.update(dict.fromkeys(METRIC_SIGNALS, float))
    metric_types.update(SAMPLE_SITE_NAMES)
    return concentration_types, metric_types
|
|
||
|
|
||
def warn_string(df, type_dict):
    """Format the schema-mismatch warning string.

    Parameters
    ----------
    df: pd.DataFrame
        Dataframe whose column labels are reported as available.
    type_dict: dict
        Mapping whose keys are reported as the needed columns.

    Returns
    -------
    str
        Multi-line message listing needed and available columns, one per line.
    """
    return f"""
Expected column(s) missed, The dataset schema may
have changed. Please investigate and amend the code.

Columns needed:
{NEWLINE.join(sorted(type_dict.keys()))}

Columns available:
{NEWLINE.join(sorted(df.columns))}
"""


def convert_df_type(df, type_dict, logger):
    """Convert column types and report schema drift.

    Parameters
    ----------
    df: pd.DataFrame
        Dataframe to convert.
    type_dict: dict
        Mapping of column name to target dtype, passed to ``DataFrame.astype``.
    logger:
        Object exposing ``info(message, **kwargs)``; used to report columns
        present in ``df`` that ``type_dict`` does not mention.

    Returns
    -------
    pd.DataFrame
        The result of ``df.astype(type_dict)``.

    Raises
    ------
    KeyError
        If ``astype`` raises ``KeyError`` (a column named in ``type_dict`` is
        not in ``df``). The message lists expected and received columns.
    """
    try:
        df = df.astype(type_dict)
    except KeyError as exc:
        # Join with a separator: concatenating the names with an empty string
        # produced one unreadable run of characters.
        expected = ", ".join(sorted(map(str, type_dict)))
        received = ", ".join(sorted(map(str, df.columns)))
        raise KeyError(
            "Expected column(s) missing. The dataset schema may have changed. "
            "Please investigate and amend the code.\n"
            f"expected={expected}\n"
            f"received={received}"
        ) from exc
    # Unknown columns are not an error, only worth surfacing in the logs.
    if new_columns := set(df.columns) - set(type_dict):
        logger.info("New columns found in NWSS dataset.", new_columns=new_columns)
    return df
|
|
||
|
|
||
def reformat(df, df_metric):
    """Join the population and metric-signal columns of df_metric onto df.

    Selects ``key_plot_id``, ``date_end``, ``population_served`` and the
    METRIC_SIGNALS columns from ``df_metric``, renames ``date_end`` to
    ``timestamp``, and joins the result onto ``df`` using
    ``key_plot_id`` and ``timestamp`` as the index.

    Returns
    -------
    pd.DataFrame
        ``df`` with the selected metric columns added and the index reset.
    """
    join_keys = ["key_plot_id", "timestamp"]
    # Keep only the columns that get carried over.
    metric_columns = ["key_plot_id", "date_end", "population_served", *METRIC_SIGNALS]
    metric_indexed = (
        df_metric.loc[:, metric_columns]
        .rename(columns={"date_end": "timestamp"})
        .set_index(join_keys)
    )
    return df.set_index(join_keys).join(metric_indexed).reset_index()


def add_population(df, df_metric):
    """Join the population column of df_metric onto df.

    Selects ``key_plot_id``, ``date_start`` and ``population_served`` from
    ``df_metric``, renames ``date_start`` to ``timestamp``, and joins the
    result onto ``df`` using ``key_plot_id`` and ``timestamp`` as the index.

    Returns
    -------
    pd.DataFrame
        ``df`` with ``population_served`` added and the index reset.
    """
    join_keys = ["key_plot_id", "timestamp"]
    # Keep only the columns that get carried over.
    population_indexed = (
        df_metric.loc[:, ["key_plot_id", "date_start", "population_served"]]
        .rename(columns={"date_start": "timestamp"})
        .set_index(join_keys)
    )
    return df.set_index(join_keys).join(population_indexed).reset_index()
|
|
||
|
|
||
| def pull_nwss_data(socrata_token: str): | ||
def add_identifier_columns(df):
    """Add identifier columns parsed from ``key_plot_id``, in place.

    Adds three columns to ``df``:

    - ``state``: the first pair of word characters enclosed by underscores.
    - ``provider``: everything before the first ``_<two lowercase letters>_``
      token.
    - ``signal_name``: ``provider`` and ``normalization`` joined by ``_``.

    Parameters
    ----------
    df: pd.DataFrame
        Must contain ``key_plot_id`` and ``normalization`` columns. It is
        modified in place; nothing is returned.
    """
    key_plot_id = df["key_plot_id"]
    # a pair of alphanumerics surrounded by _
    df["state"] = key_plot_id.str.extract(r"_(\w\w)_", expand=False)
    # Anything followed by the state token. Anchored and lazy so the match
    # stops at the FIRST two-letter token, like the state extraction above;
    # a greedy `.*` would run on to the last such token in the id.
    df["provider"] = key_plot_id.str.extract(r"^(.*?)_[a-z]{2}_", expand=False)
    df["signal_name"] = df["provider"] + "_" + df["normalization"]
|
|
||
|
|
||
def check_endpoints(df):
    """Make sure that there aren't any new signals that we need to add.

    Builds the sorted, de-duplicated table of (``provider``,
    ``normalization``) pairs found in ``df`` and compares it with
    ``pd.DataFrame(PROVIDER_NORMS)``.

    Raises
    ------
    ValueError
        If the two tables are not equal according to ``DataFrame.equals``.
    """
    # compare with existing column name checker
    # also add a note about handling errors
    pair_columns = ["provider", "normalization"]
    observed = df[pair_columns].drop_duplicates()
    observed = observed.sort_values(pair_columns).reset_index(drop=True)
    expected = pd.DataFrame(PROVIDER_NORMS)
    if observed.equals(expected):
        return
    raise ValueError(f"There are new providers and/or norms. They are\n{observed}")
|
|
||
|
|
||
| def pull_nwss_data(token: str, logger): | ||
dsweber2 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| """Pull the latest NWSS Wastewater data, and conforms it into a dataset. | ||
|
|
||
| The output dataset has: | ||
|
|
@@ -95,40 +111,43 @@ def pull_nwss_data(socrata_token: str): | |
| pd.DataFrame | ||
| Dataframe as described above. | ||
| """ | ||
| # concentration key types | ||
| type_dict, type_dict_metric = construct_typedicts() | ||
|
|
||
| # Pull data from Socrata API | ||
| client = Socrata("data.cdc.gov", socrata_token) | ||
| client = Socrata("data.cdc.gov", token) | ||
| results_concentration = client.get("g653-rqe2", limit=10**10) | ||
| results_metric = client.get("2ew6-ywp6", limit=10**10) | ||
nmdefries marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| df_metric = pd.DataFrame.from_records(results_metric) | ||
| df_concentration = pd.DataFrame.from_records(results_concentration) | ||
| df_concentration = df_concentration.rename(columns={"date": "timestamp"}) | ||
|
|
||
| try: | ||
| df_concentration = df_concentration.astype(type_dict) | ||
| except KeyError as exc: | ||
| raise ValueError(warn_string(df_concentration, type_dict)) from exc | ||
| # Schema checks. | ||
| df_concentration = convert_df_type(df_concentration, TYPE_DICT, logger) | ||
| df_metric = convert_df_type(df_metric, TYPE_DICT_METRIC, logger) | ||
|
|
||
| try: | ||
| df_metric = df_metric.astype(type_dict_metric) | ||
| except KeyError as exc: | ||
| raise ValueError(warn_string(df_metric, type_dict_metric)) from exc | ||
| # Drop sites without a normalization scheme. | ||
| df = df_concentration[~df_concentration["normalization"].isna()] | ||
|
|
||
| # pull 2 letter state labels out of the key_plot_id labels | ||
| df_concentration["state"] = df_concentration.key_plot_id.str.extract(r"_(\w\w)_") | ||
| # Pull 2 letter state labels out of the key_plot_id labels. | ||
| add_identifier_columns(df) | ||
dsweber2 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| # move population and metric signals over to df | ||
| df = reformat(df, df_metric) | ||
| # round out some of the numeric noise that comes from smoothing | ||
| df_concentration[SIGNALS[0]] = sig_digit_round( | ||
| df_concentration[SIGNALS[0]], SIG_DIGITS | ||
| ) | ||
| for signal in [*SIGNALS, *METRIC_SIGNALS]: | ||
| df[signal] = sig_digit_round(df[signal], SIG_DIGITS) | ||
|
|
||
| df_concentration = add_population(df_concentration, df_metric) | ||
| # if there are population NA's, assume the previous value is accurate (most | ||
| # likely introduced by dates only present in one and not the other; even | ||
| # otherwise, best to assume some value rather than break the data) | ||
|
||
| df_concentration.population_served = df_concentration.population_served.ffill() | ||
|
|
||
| keep_columns = ["timestamp", "state", "population_served"] | ||
| return df_concentration[SIGNALS + keep_columns] | ||
| df.population_served = df.population_served.ffill() | ||
|
||
| check_endpoints(df) | ||
|
|
||
| keep_columns = [ | ||
| *SIGNALS, | ||
| *METRIC_SIGNALS, | ||
| "timestamp", | ||
| "state", | ||
| "population_served", | ||
| "normalization", | ||
| "provider", | ||
| ] | ||
| return df[keep_columns] | ||
Uh oh!
There was an error while loading. Please reload this page.