-import dask.dataframe as dd
+"""Module providing functions for processing and wrangling data."""
+
 from datetime import datetime
+from pathlib import Path
+
+import dask.dataframe as dd
 import numpy as np
 import pandas as pd
-from pathlib import Path
 
 from .config import Config
 
-def format_outname(prefix: str, se: bool, weekday:bool):
-    '''
+
+def format_outname(prefix: str, se: bool, weekday: bool):
+    """
+    Format the output file name.
 
     Parameters
     ----------
-    prefix
-    se
-    weekday
+    prefix: obfuscated prefix prepended to the output name when writing SEs
+    se: boolean to write out standard errors; if true, use an obfuscated name
+    weekday: boolean for weekday adjustments.
+        signals will be generated with weekday adjustments (True) or without
+        adjustments (False)
 
     Returns
     -------
-
-    '''
-    # write out results
+    out_name: str
+    """
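+    # weekday-adjusted signals carry the "adj" infix in the output name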
     out_name = "smoothed_adj_cli" if weekday else "smoothed_cli"
     if se:
         assert prefix is not None, "template has no obfuscated prefix"
         out_name = prefix + "_" + out_name
     return out_name
 
+
 def format_df(df: pd.DataFrame, geo_id: str, se: bool, logger):
-    '''
-    format dataframe and checks for anomalies to write results
+    """
+    Format the dataframe and check for anomalies before writing results.
+
     Parameters
     ----------
     df: dataframe output from update_sensor
@@ -39,9 +47,9 @@ def format_df(df: pd.DataFrame, geo_id: str, se: bool, logger):
     Returns
     -------
     filtered and formatted dataframe
-    '''
+    """
     # report in percentage
-    df['val'] = df['val'] * 100
+    df["val"] = df["val"] * 100
     df["se"] = df["se"] * 100
 
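+    # drop rows with missing sensor values, logging so the pipeline can be checked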
     val_isnull = df["val"].isnull()
@@ -50,23 +58,23 @@ def format_df(df: pd.DataFrame, geo_id: str, se: bool, logger):
         logger.info("sensor value is nan, check pipeline")
     df = df[~val_isnull]
 
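+    # standard errors of 5% or more are treated as anomalous and dropped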
-    se_too_high = df['se'] >= 5
+    se_too_high = df["se"] >= 5
     df_se_too_high = df[se_too_high]
     if len(df_se_too_high) > 0:
         logger.info(f"standard error suspiciously high! investigate {geo_id}")
     df = df[~se_too_high]
 
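+    # sensor values of 90% or more are treated as anomalous and dropped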
-    sensor_too_high = df['val'] >= 90
+    sensor_too_high = df["val"] >= 90
     df_sensor_too_high = df[sensor_too_high]
     if len(df_sensor_too_high) > 0:
         logger.info(f"sensor value suspiciously high! investigate {geo_id}")
     df = df[~sensor_too_high]
 
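+    # when writing SEs, rows with p=0 or se=0 are invalid; otherwise se is blanked out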
     if se:
-        valid_cond = (df['se'] > 0) & (df['val'] > 0)
+        valid_cond = (df["se"] > 0) & (df["val"] > 0)
         invalid_df = df[~valid_cond]
         if len(invalid_df) > 0:
-            logger.info(f"p=0, std_err=0 invalid")
+            logger.info("p=0, std_err=0 invalid")
         df = df[valid_cond]
     else:
         df["se"] = np.NAN
@@ -75,8 +83,10 @@ def format_df(df: pd.DataFrame, geo_id: str, se: bool, logger):
     df["sample_size"] = np.NAN
     return df
 
-def write_to_csv(output_df: pd.DataFrame, prefix: str, geo_id: str, weekday: bool, se:bool, logger, output_path="."):
-    """Write sensor values to csv.
+
+def write_to_csv(output_df: pd.DataFrame, prefix: str, geo_id: str, weekday: bool, se: bool, logger, output_path="."):
+    """
+    Write sensor values to csv.
 
     Args:
         output_df: dataframe containing sensor rates, se, unique dates, and unique geo_id
@@ -91,24 +101,21 @@ def write_to_csv(output_df: pd.DataFrame, prefix: str, geo_id: str, weekday: boo
     if se:
         logger.info(f"========= WARNING: WRITING SEs TO {out_name} =========")
 
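+    # one CSV per date (shifted by Config.DAY_SHIFT): <output_path>/<YYYYMMDD>_<geo_id>_<out_name>.csv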
-    dates = set(list(output_df['date']))
-    grouped = filtered_df.groupby('date')
+    dates = set(list(output_df["date"]))
+    grouped = filtered_df.groupby("date")
     for d in dates:
-        filename = "%s/%s_%s_%s.csv" % (output_path,
-                                        (d + Config.DAY_SHIFT).strftime("%Y%m%d"),
-                                        geo_id,
-                                        out_name)
+        filename = "%s/%s_%s_%s.csv" % (output_path, (d + Config.DAY_SHIFT).strftime("%Y%m%d"), geo_id, out_name)
         single_date_df = grouped.get_group(d)
-        single_date_df = single_date_df.drop(columns=['date'])
+        single_date_df = single_date_df.drop(columns=["date"])
         single_date_df.to_csv(filename, index=False, na_rep="NA")
 
         logger.debug(f"wrote {len(single_date_df)} rows for {geo_id}")
 
 
 def csv_to_df(filepath: str, startdate: datetime, enddate: datetime, dropdate: datetime, logger) -> pd.DataFrame:
-    '''
-    Reads csv using Dask and filters out based on date range and currently unused column,
-    then converts back into pandas dataframe.
+    """
+    Read csv using Dask, filter unneeded data, and convert back into a pandas dataframe.
+
     Parameters
     ----------
     filepath: path to the aggregated doctor-visits data
@@ -117,7 +124,7 @@ def csv_to_df(filepath: str, startdate: datetime, enddate: datetime, dropdate: d
     dropdate: data drop date (YYYY-mm-dd)
 
     -------
-    '''
+    """
     filepath = Path(filepath)
     logger.info(f"Processing {filepath}")
 
@@ -142,7 +149,7 @@ def csv_to_df(filepath: str, startdate: datetime, enddate: datetime, dropdate: d
     assert startdate < enddate, "Start date >= end date"
     assert enddate <= dropdate, "End date > drop date"
 
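+    # keep rows within [FIRST_DATA_DATE, dropdate)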
-    date_filter = ((ddata[Config.DATE_COL] >= Config.FIRST_DATA_DATE) & (ddata[Config.DATE_COL] < dropdate))
+    date_filter = (ddata[Config.DATE_COL] >= Config.FIRST_DATA_DATE) & (ddata[Config.DATE_COL] < dropdate)
 
     df = ddata[date_filter].compute()
 