From 1275a0e0f3a6212985b2c3c95406554b6befe98e Mon Sep 17 00:00:00 2001 From: andrew Date: Tue, 3 Nov 2020 16:08:27 -0800 Subject: [PATCH] Move agg funcs to new file --- .../covidcast-py/covidcast/__init__.py | 3 +- .../covidcast-py/covidcast/covidcast.py | 95 ---------------- .../covidcast-py/covidcast/wrangle.py | 100 +++++++++++++++++ .../tests/covidcast/test_covidcast.py | 95 ---------------- .../tests/covidcast/test_wrangle.py | 102 ++++++++++++++++++ 5 files changed, 204 insertions(+), 191 deletions(-) create mode 100644 Python-packages/covidcast-py/covidcast/wrangle.py create mode 100644 Python-packages/covidcast-py/tests/covidcast/test_wrangle.py diff --git a/Python-packages/covidcast-py/covidcast/__init__.py b/Python-packages/covidcast-py/covidcast/__init__.py index 7feb2397..93869b76 100644 --- a/Python-packages/covidcast-py/covidcast/__init__.py +++ b/Python-packages/covidcast-py/covidcast/__init__.py @@ -12,7 +12,8 @@ """ -from .covidcast import signal, metadata, aggregate_signals +from .covidcast import signal, metadata +from .wrangle import aggregate_signals from .plotting import plot, plot_choropleth, get_geo_df, animate from .geography import (fips_to_name, cbsa_to_name, abbr_to_name, name_to_abbr, name_to_cbsa, name_to_fips, diff --git a/Python-packages/covidcast-py/covidcast/covidcast.py b/Python-packages/covidcast-py/covidcast/covidcast.py index 5fcbfe52..9a9df7d8 100644 --- a/Python-packages/covidcast-py/covidcast/covidcast.py +++ b/Python-packages/covidcast-py/covidcast/covidcast.py @@ -1,7 +1,6 @@ """This is the client side library for accessing the COVIDcast API.""" import warnings from datetime import timedelta, date -from functools import reduce from typing import Union, Iterable, Tuple, List import pandas as pd @@ -272,100 +271,6 @@ def metadata() -> pd.DataFrame: return meta_df -def aggregate_signals(signals: list, - dt: list = None, - join_type: str = "outer", - output_format:str = "wide") -> pd.DataFrame: - """Given a list of DataFrames, [optionally] lag each one and join them into one DataFrame. - - This method takes a list of DataFrames containing signal information for - geographic regions across time, and outputs a single DataFrame of the signals aggregated - with lags applied to signals if specified. The input DataFrames must all be of the same - geography type. - - If ``output_format = 'wide'``, a DataFrame with a column - for each signal value for each region/time. The ``data_source``, - ``signal``, and index of each DataFrame in ``signals`` are appended to the - front of each output column name separated by underscores (e.g. - ``source_signal_0_inputcolumn``), and the original data_source and signal - columns will be dropped. A single ``geo_type`` column will be returned in the final - DataFrame. - - If ``output_format = 'wide'``, all input DataFrames must have the same columns, - and the output will be the concatenation of all the lagged DataFrames. - - Each signal's time value can be shifted for analysis on lagged signals using the ``dt`` - argument, which takes a list of integer days to lag each signal's date. Lagging a signal by +1 - day means that all the dates get shifted forward by 1 day (e.g. Jan 1 becomes Jan 2). - - :param signals: List of DataFrames to join. - :param dt: List of lags in days for each of the input DataFrames in ``signals``. - Defaults to ``None``. When provided, must be the same length as ``signals``. - :param join_type: Type of join to be done between the DataFrames in ``signals``. 
- Defaults to ``"outer"``, so the output DataFrame contains all region/time - combinations at which at least one signal was observed. - Only applies if ``output_format='wide'`` - :param output_format: ``'wide'`` or ``'long'``. If ``wide``, a dataframe with a column - per signal is returned. If ``long``, all signals are concatenated into one dataframe with - a single column for the signal value. - :return: DataFrame of aggregated signals. - - """ - if dt is not None and len(dt) != len(signals): - raise ValueError("Length of `dt` must be same as length of `signals`") - if output_format not in ["long", "wide"]: - raise ValueError("`output_format` must be either 'long' or 'wide'") - - dt = [0] * len(signals) if not dt else dt - first_geo_type = _detect_metadata(signals[0])[2] - dt_dfs = [] - for df, lag in zip(signals, dt): - source, sig_type, geo_type = _detect_metadata(df) - if geo_type != first_geo_type: - raise ValueError("Multiple geo_types detected. " - "All signals must have the same geo_type to be aggregated.") - df_c = df.copy() # make a copy so we don't modify originals - df_c["time_value"] = [day + timedelta(lag) for day in df_c["time_value"]] # lag dates - dt_dfs.append((df_c, source, sig_type, geo_type)) - return _agg_wide(dt_dfs, join_type) if output_format == "wide" else _agg_long(dt_dfs) - - -def _agg_wide(processed_signals: list, - join_type: str = "outer") -> pd.DataFrame: - """Join together a list of signal DataFrames, renaming columns to prevent collisions. - - :param processed_signals: List of df and metadata tuples to join together. - :param join_type: Type of join to conduct between all the DataFrames. - :return: A single DataFrames which is the join of the input DataFrames. - """ - join_cols = ["time_value", "geo_value"] - for i, (df, source, sig_type, _) in enumerate(processed_signals): - # drop and rename columns so the joined doesn't have duplicate and/or redundant columns. - df.drop(["signal", "data_source", "geo_type"], axis=1, inplace=True) - df.rename( - columns={j: f"{source}_{sig_type}_{i}_{j}" for j in df.columns if j not in join_cols}, - inplace=True) - dfs_to_join = [df for df, *_ in processed_signals] - joined_df = reduce(lambda x, y: pd.merge(x, y, on=join_cols, how=join_type, sort=True), - dfs_to_join) - joined_df["geo_type"] = processed_signals[0][-1] # use geotype of first df - return joined_df - - -def _agg_long(processed_signals: list) -> pd.DataFrame: - """Concatenate a list of signal DataFrames with identical columns. - - :param processed_signals: List of DataFrame and metadata tuples to concatenate together. - :return: Single DataFrames of all input signals concatenated - """ - first_columns = processed_signals[0][0].columns - for df, *_ in processed_signals: - if any(df.columns != first_columns): - raise ValueError("Inconsistent columns detected. 
All columns must be the same to use"
-                             "'long' output.")
-    return pd.concat([df for df, *_ in processed_signals]).reset_index(drop=True)
-
-
 def _detect_metadata(data: pd.DataFrame,
                      data_source_col: str = "data_source",
                      signal_col: str = "signal",
diff --git a/Python-packages/covidcast-py/covidcast/wrangle.py b/Python-packages/covidcast-py/covidcast/wrangle.py
new file mode 100644
index 00000000..315ea5c9
--- /dev/null
+++ b/Python-packages/covidcast-py/covidcast/wrangle.py
@@ -0,0 +1,100 @@
+"""Functions for manipulating signal DataFrames."""
+from datetime import timedelta
+from functools import reduce
+
+import pandas as pd
+
+from .covidcast import _detect_metadata
+
+
+def aggregate_signals(signals: list,
+                      dt: list = None,
+                      join_type: str = "outer",
+                      output_format: str = "wide") -> pd.DataFrame:
+    """Given a list of DataFrames, [optionally] lag each one and join them into one DataFrame.
+
+    This method takes a list of DataFrames containing signal information for
+    geographic regions across time, and outputs a single DataFrame of the signals aggregated
+    with lags applied to signals if specified. The input DataFrames must all be of the same
+    geography type.
+
+    If ``output_format = 'wide'``, a DataFrame with a column
+    for each signal value for each region/time is returned. The ``data_source``,
+    ``signal``, and index of each DataFrame in ``signals`` are appended to the
+    front of each output column name separated by underscores (e.g.
+    ``source_signal_0_inputcolumn``), and the original data_source and signal
+    columns will be dropped. A single ``geo_type`` column will be returned in the final
+    DataFrame.
+
+    If ``output_format = 'long'``, all input DataFrames must have the same columns,
+    and the output will be the concatenation of all the lagged DataFrames.
+
+    Each signal's time value can be shifted for analysis on lagged signals using the ``dt``
+    argument, which takes a list of integer days to lag each signal's date. Lagging a signal by +1
+    day means that all the dates get shifted forward by 1 day (e.g. Jan 1 becomes Jan 2).
+
+    :param signals: List of DataFrames to join.
+    :param dt: List of lags in days for each of the input DataFrames in ``signals``.
+      Defaults to ``None``. When provided, must be the same length as ``signals``.
+    :param join_type: Type of join to be done between the DataFrames in ``signals``.
+      Defaults to ``"outer"``, so the output DataFrame contains all region/time
+      combinations at which at least one signal was observed.
+      Only applies if ``output_format='wide'``.
+    :param output_format: ``'wide'`` or ``'long'``. If ``wide``, a dataframe with a column
+      per signal is returned. If ``long``, all signals are concatenated into one dataframe with
+      a single column for the signal value.
+    :return: DataFrame of aggregated signals.
+
+    """
+    if dt is not None and len(dt) != len(signals):
+        raise ValueError("Length of `dt` must be same as length of `signals`")
+    if output_format not in ["long", "wide"]:
+        raise ValueError("`output_format` must be either 'long' or 'wide'")
+
+    dt = [0] * len(signals) if not dt else dt
+    first_geo_type = _detect_metadata(signals[0])[2]
+    dt_dfs = []
+    for df, lag in zip(signals, dt):
+        source, sig_type, geo_type = _detect_metadata(df)
+        if geo_type != first_geo_type:
+            raise ValueError("Multiple geo_types detected. "
+                             "All signals must have the same geo_type to be aggregated.")
+        df_c = df.copy()  # make a copy so we don't modify originals
+        df_c["time_value"] = [day + timedelta(lag) for day in df_c["time_value"]]  # lag dates
+        dt_dfs.append((df_c, source, sig_type, geo_type))
+    return _agg_wide(dt_dfs, join_type) if output_format == "wide" else _agg_long(dt_dfs)
+
+
+def _agg_wide(processed_signals: list,
+              join_type: str = "outer") -> pd.DataFrame:
+    """Join together a list of signal DataFrames, renaming columns to prevent collisions.
+
+    :param processed_signals: List of df and metadata tuples to join together.
+    :param join_type: Type of join to conduct between all the DataFrames.
+    :return: A single DataFrame which is the join of the input DataFrames.
+    """
+    join_cols = ["time_value", "geo_value"]
+    for i, (df, source, sig_type, _) in enumerate(processed_signals):
+        # drop and rename columns so the join doesn't have duplicate and/or redundant columns.
+        df.drop(["signal", "data_source", "geo_type"], axis=1, inplace=True)
+        df.rename(
+            columns={j: f"{source}_{sig_type}_{i}_{j}" for j in df.columns if j not in join_cols},
+            inplace=True)
+    dfs_to_join = [df for df, *_ in processed_signals]
+    joined_df = reduce(lambda x, y: x.merge(y, on=join_cols, how=join_type, sort=True), dfs_to_join)
+    joined_df["geo_type"] = processed_signals[0][-1]  # use geo_type of first df
+    return joined_df
+
+
+def _agg_long(processed_signals: list) -> pd.DataFrame:
+    """Concatenate a list of signal DataFrames with identical columns.
+
+    :param processed_signals: List of DataFrame and metadata tuples to concatenate together.
+    :return: Single DataFrame of all input signals concatenated.
+    """
+    first_columns = processed_signals[0][0].columns
+    for df, *_ in processed_signals:
+        if any(df.columns != first_columns):
+            raise ValueError("Inconsistent columns detected. 
All columns must be the same to use" + "'long' output.") + return pd.concat([df for df, *_ in processed_signals]).reset_index(drop=True) diff --git a/Python-packages/covidcast-py/tests/covidcast/test_covidcast.py b/Python-packages/covidcast-py/tests/covidcast/test_covidcast.py index 8f493898..a7adda6d 100644 --- a/Python-packages/covidcast-py/tests/covidcast/test_covidcast.py +++ b/Python-packages/covidcast-py/tests/covidcast/test_covidcast.py @@ -99,101 +99,6 @@ def test_metadata(mock_covidcast_meta): covidcast.metadata() -def test_aggregate_signals(): - test_input1 = pd.DataFrame( - {"geo_value": ["a", "b", "c", "a"], - "time_value": [date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 2)], - "value": [2, 4, 6, 8], - "signal": ["i", "i", "i", "i"], - "geo_type": ["state", "state", "state", "state"], - "data_source": ["x", "x", "x", "x"]}) - test_input2 = pd.DataFrame( - {"geo_value": ["a", "b", "c", "d"], - "time_value": [date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 1)], - "value": [1, 3, 5, 7], - "signal": ["j", "j", "j", "j"], - "geo_type": ["state", "state", "state", "state"], - "data_source": ["y", "y", "y", "y"], - "extra_col": ["0", "0", "0", "0"]}) - test_input3 = pd.DataFrame( - {"geo_value": ["b", "c", "d", "b"], - "time_value": [date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 2)], - "value": [0.5, 1.5, 2.5, 3.5], - "signal": ["k", "k", "k", "k"], - "geo_type": ["state", "state", "state", "state"], - "data_source": ["z", "z", "z", "z"]}) - # test 3 signals from 3 sources with outer join - expected1 = pd.DataFrame( - {"geo_value": ["a", "b", "c", "d", "a", "b", "c", "d", "b"], - "time_value": [date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 1), - date(2020, 1, 2), date(2020, 1, 2), date(2020, 1, 2), date(2020, 1, 2), - date(2020, 1, 3)], - "x_i_0_value": [2, 4, 6, np.nan, 8, np.nan, np.nan, np.nan, np.nan], - "y_j_1_value": [1, 3, 5, 7, np.nan, np.nan, np.nan, np.nan, np.nan], - "y_j_1_extra_col": ["0", "0", "0", "0", np.nan, np.nan, np.nan, np.nan, np.nan], - "z_k_2_value": [np.nan, np.nan, np.nan, np.nan, np.nan, 0.5, 1.5, 2.5, 3.5], - "geo_type": ["state"]*9}) - assert covidcast.aggregate_signals( - [test_input1, test_input2, test_input3], dt=[0, 0, 1]).equals(expected1) - - # test 3 signals from 3 sources with inner join has no intersection - assert covidcast.aggregate_signals( - [test_input1, test_input3], dt=[0, 1], join_type="inner").empty - - # test 2 signals from same source (one lagged) with inner join - expected2 = pd.DataFrame( - {"geo_value": ["a"], - "time_value": [date(2020, 1, 2)], - "x_i_0_value": [8], - "x_i_1_value": [2], - "geo_type": ["state"]}) - assert covidcast.aggregate_signals( - [test_input1, test_input1], dt=[0, 1], join_type="inner").equals(expected2) - - # test same signal twice with a lag - expected3 = pd.DataFrame( - {"geo_value": ["a", "b", "c", "a", "b", "c", "a"], - "time_value": [date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 2), - date(2020, 1, 2), date(2020, 1, 2), date(2020, 1, 3)], - "x_i_0_value": [2, 4, 6, 8, np.nan, np.nan, np.nan], - "x_i_1_value": [np.nan, np.nan, np.nan, 2, 4, 6, 8], - "geo_type": ["state"]*7}) - - assert covidcast.aggregate_signals([test_input1, test_input1], dt=[0, 1]).equals(expected3) - - # test long output - expected4 = pd.DataFrame( - {"geo_value": ["a", "b", "c", "a"]*2, - "time_value": [date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 2), - date(2020, 1, 2), date(2020, 1, 2), date(2020, 1, 2), 
date(2020, 1, 3)], - "value": [2, 4, 6, 8]*2, - "signal": ["i", "i", "i", "i"]*2, - "geo_type": ["state", "state", "state", "state"]*2, - "data_source": ["x", "x", "x", "x"]*2}) - - assert covidcast.aggregate_signals([test_input1, test_input1], - dt=[0, 1], - output_format="long").equals(expected4) - # test long output with different column names - with pytest.raises(ValueError): - covidcast.aggregate_signals([test_input1, test_input2], output_format="long") - - # test invalid lag length - with pytest.raises(ValueError): - covidcast.aggregate_signals([test_input1, test_input1], dt=[0]) - - # test mixed geo_types - test_input4 = pd.DataFrame( - {"geo_value": ["b", "c", "d", "b"], - "time_value": [date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 2)], - "value": [0.5, 1.5, 2.5, 3.5], - "signal": ["k", "k", "k", "k"], - "geo_type": ["county", "county", "county", "county"], - "data_source": ["z", "z", "z", "z"]}) - with pytest.raises(ValueError): - covidcast.aggregate_signals([test_input1, test_input4]) - - def test__detect_metadata(): test_input = pd.DataFrame( {"data_source": ["a", "a"], "signal": ["b", "b"], "geo_type": ["c", "c"]}) diff --git a/Python-packages/covidcast-py/tests/covidcast/test_wrangle.py b/Python-packages/covidcast-py/tests/covidcast/test_wrangle.py new file mode 100644 index 00000000..1ef1bb7f --- /dev/null +++ b/Python-packages/covidcast-py/tests/covidcast/test_wrangle.py @@ -0,0 +1,102 @@ +import pytest +from datetime import date + +import pandas as pd +import numpy as np + +from covidcast import wrangle + + +def test_aggregate_signals(): + test_input1 = pd.DataFrame( + {"geo_value": ["a", "b", "c", "a"], + "time_value": [date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 2)], + "value": [2, 4, 6, 8], + "signal": ["i", "i", "i", "i"], + "geo_type": ["state", "state", "state", "state"], + "data_source": ["x", "x", "x", "x"]}) + test_input2 = pd.DataFrame( + {"geo_value": ["a", "b", "c", "d"], + "time_value": [date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 1)], + "value": [1, 3, 5, 7], + "signal": ["j", "j", "j", "j"], + "geo_type": ["state", "state", "state", "state"], + "data_source": ["y", "y", "y", "y"], + "extra_col": ["0", "0", "0", "0"]}) + test_input3 = pd.DataFrame( + {"geo_value": ["b", "c", "d", "b"], + "time_value": [date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 2)], + "value": [0.5, 1.5, 2.5, 3.5], + "signal": ["k", "k", "k", "k"], + "geo_type": ["state", "state", "state", "state"], + "data_source": ["z", "z", "z", "z"]}) + # test 3 signals from 3 sources with outer join + expected1 = pd.DataFrame( + {"geo_value": ["a", "b", "c", "d", "a", "b", "c", "d", "b"], + "time_value": [date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 1), + date(2020, 1, 2), date(2020, 1, 2), date(2020, 1, 2), date(2020, 1, 2), + date(2020, 1, 3)], + "x_i_0_value": [2, 4, 6, np.nan, 8, np.nan, np.nan, np.nan, np.nan], + "y_j_1_value": [1, 3, 5, 7, np.nan, np.nan, np.nan, np.nan, np.nan], + "y_j_1_extra_col": ["0", "0", "0", "0", np.nan, np.nan, np.nan, np.nan, np.nan], + "z_k_2_value": [np.nan, np.nan, np.nan, np.nan, np.nan, 0.5, 1.5, 2.5, 3.5], + "geo_type": ["state"]*9}) + assert wrangle.aggregate_signals( + [test_input1, test_input2, test_input3], dt=[0, 0, 1]).equals(expected1) + + # test 3 signals from 3 sources with inner join has no intersection + assert wrangle.aggregate_signals( + [test_input1, test_input3], dt=[0, 1], join_type="inner").empty + + # test 2 signals from same source 
(one lagged) with inner join + expected2 = pd.DataFrame( + {"geo_value": ["a"], + "time_value": [date(2020, 1, 2)], + "x_i_0_value": [8], + "x_i_1_value": [2], + "geo_type": ["state"]}) + assert wrangle.aggregate_signals( + [test_input1, test_input1], dt=[0, 1], join_type="inner").equals(expected2) + + # test same signal twice with a lag + expected3 = pd.DataFrame( + {"geo_value": ["a", "b", "c", "a", "b", "c", "a"], + "time_value": [date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 2), + date(2020, 1, 2), date(2020, 1, 2), date(2020, 1, 3)], + "x_i_0_value": [2, 4, 6, 8, np.nan, np.nan, np.nan], + "x_i_1_value": [np.nan, np.nan, np.nan, 2, 4, 6, 8], + "geo_type": ["state"]*7}) + + assert wrangle.aggregate_signals([test_input1, test_input1], dt=[0, 1]).equals(expected3) + + # test long output + expected4 = pd.DataFrame( + {"geo_value": ["a", "b", "c", "a"]*2, + "time_value": [date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 2), + date(2020, 1, 2), date(2020, 1, 2), date(2020, 1, 2), date(2020, 1, 3)], + "value": [2, 4, 6, 8]*2, + "signal": ["i", "i", "i", "i"]*2, + "geo_type": ["state", "state", "state", "state"]*2, + "data_source": ["x", "x", "x", "x"]*2}) + + assert wrangle.aggregate_signals([test_input1, test_input1], + dt=[0, 1], + output_format="long").equals(expected4) + # test long output with different column names + with pytest.raises(ValueError): + wrangle.aggregate_signals([test_input1, test_input2], output_format="long") + + # test invalid lag length + with pytest.raises(ValueError): + wrangle.aggregate_signals([test_input1, test_input1], dt=[0]) + + # test mixed geo_types + test_input4 = pd.DataFrame( + {"geo_value": ["b", "c", "d", "b"], + "time_value": [date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 1), date(2020, 1, 2)], + "value": [0.5, 1.5, 2.5, 3.5], + "signal": ["k", "k", "k", "k"], + "geo_type": ["county", "county", "county", "county"], + "data_source": ["z", "z", "z", "z"]}) + with pytest.raises(ValueError): + wrangle.aggregate_signals([test_input1, test_input4])
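
For reference, a minimal usage sketch of the relocated function once this patch lands. The frames, values, and source/signal names below are invented for illustration (they are not from the package or its tests), and the inputs are assumed to carry the single data_source/signal/geo_type columns that covidcast.signal() normally returns:

from datetime import date

import pandas as pd

from covidcast import aggregate_signals  # re-exported from covidcast.wrangle after this patch

# Two toy signal frames sharing the same geo_type; contents are made up for the example.
cases = pd.DataFrame({
    "geo_value": ["pa", "ny"],
    "time_value": [date(2021, 1, 1), date(2021, 1, 1)],
    "value": [2, 4],
    "signal": ["i", "i"],
    "geo_type": ["state", "state"],
    "data_source": ["x", "x"],
})
deaths = cases.assign(value=[1, 3], signal="j", data_source="y")

# Wide output: one column per input frame, named <data_source>_<signal>_<index>_<column>
# (e.g. x_i_0_value), with the second frame lagged forward one day before the outer join.
wide = aggregate_signals([cases, deaths], dt=[0, 1], output_format="wide")

# Long output: the frames are concatenated, so all inputs must share identical columns.
long_df = aggregate_signals([cases, deaths], output_format="long")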