@@ -81,32 +81,54 @@ def format_df(df: pd.DataFrame, geo_id: str, se: bool, logger):
     return df


-def write_to_csv(output_df: pd.DataFrame, prefix: str, geo_id: str, weekday: bool, se: bool, logger, output_path="."):
+def write_to_csv(
+    output_df: pd.DataFrame, prefix: str, geo_level: str, weekday: bool, se: bool, logger, output_path="."
+):
     """
     Write sensor values to csv.

     Args:
         output_dict: dictionary containing sensor rates, se, unique dates, and unique geo_id
-        geo_id: geographic resolution, one of ["county", "state", "msa", "hrr", "nation", "hhs"]
+        geo_level: geographic resolution, one of ["county", "state", "msa", "hrr", "nation", "hhs"]
         se: boolean to write out standard errors, if true, use an obfuscated name
         out_name: name of the output file
         output_path: outfile path to write the csv (default is current directory)
     """
-    out_name = format_outname(prefix, se, weekday)
-    filtered_df = format_df(output_df, geo_id, se, logger)
+    # out_name = format_outname(prefix, se, weekday)

+    # write out results
+    out_name = "smoothed_adj_cli" if weekday else "smoothed_cli"
     if se:
-        logger.info(f"========= WARNING: WRITING SEs TO {out_name} =========")
+        assert prefix is not None, "template has no obfuscated prefix"
+        out_name = prefix + "_" + out_name

-    dates = set(list(output_df["date"]))
-    grouped = filtered_df.groupby("date")
-    for d in dates:
-        filename = "%s/%s_%s_%s.csv" % (output_path, (d + Config.DAY_SHIFT).strftime("%Y%m%d"), geo_id, out_name)
-        single_date_df = grouped.get_group(d)
-        single_date_df = single_date_df.drop(columns=["date"])
-        single_date_df.to_csv(filename, index=False, na_rep="NA")
+    if se:
+        logger.info(f"========= WARNING: WRITING SEs TO {out_name} =========")

-    logger.debug(f"wrote {len(single_date_df)} rows for {geo_id}")
+    out_n = 0
+    for d in set(output_df["date"]):
+        filename = "%s/%s_%s_%s.csv" % (output_path, (d + Config.DAY_SHIFT).strftime("%Y%m%d"), geo_level, out_name)
+        single_date_df = output_df[output_df["date"] == d]
+        with open(filename, "w") as outfile:
+            outfile.write("geo_id,val,se,direction,sample_size\n")
+
+            for line in single_date_df.itertuples():
+                geo_id = line.geo_id
+                sensor = 100 * line.val  # report percentages
+                se_val = 100 * line.se
+                assert not np.isnan(sensor), "sensor value is nan, check pipeline"
+                assert sensor < 90, f"strangely high percentage {geo_level, sensor}"
+                if not np.isnan(se_val):
+                    assert se_val < 5, f"standard error suspiciously high! investigate {geo_level}"
+
+                if se:
+                    assert sensor > 0 and se_val > 0, "p=0, std_err=0 invalid"
+                    outfile.write("%s,%f,%s,%s,%s\n" % (geo_id, sensor, se_val, "NA", "NA"))
+                else:
+                    # for privacy reasons we will not report the standard error
+                    outfile.write("%s,%f,%s,%s,%s\n" % (geo_id, sensor, "NA", "NA", "NA"))
+                out_n += 1
+    logger.debug(f"wrote {out_n} rows for {geo_level}")


 def csv_to_df(filepath: str, startdate: datetime, enddate: datetime, dropdate: datetime, logger) -> pd.DataFrame:
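For reviewers, a minimal usage sketch of the reverted write_to_csv (not part of this diff). The import path, logger setup, sample values, and the DataFrame layout with date, geo_id, val, and se columns are assumptions inferred from the code above.

import logging
from datetime import datetime

import pandas as pd

from update_sensor import write_to_csv  # hypothetical import path; adjust to the package layout

logger = logging.getLogger(__name__)
sensor_df = pd.DataFrame({
    "date": [datetime(2020, 5, 1), datetime(2020, 5, 1)],
    "geo_id": ["01001", "01003"],
    "val": [0.04, 0.03],  # rates in [0, 1]; written out as percentages
    "se": [0.001, 0.002],
})
# weekday=False selects the "smoothed_cli" name; se=False suppresses standard
# errors in the output, so the obfuscated prefix may be None
write_to_csv(sensor_df, prefix=None, geo_level="county", weekday=False, se=False, logger=logger, output_path=".")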
@@ -131,29 +153,29 @@ def csv_to_df(filepath: str, startdate: datetime, enddate: datetime, dropdate: datetime, logger) -> pd.DataFrame:
         dtype=Config.DTYPES,
         blocksize=None,
     )
-
-    ddata = ddata.dropna()
     # rename inconsistent column names to match config column names
     ddata = ddata.rename(columns=Config.DEVIANT_COLS_MAP)
-
     ddata = ddata[Config.FILT_COLS]
-    ddata[Config.DATE_COL] = dd.to_datetime(ddata[Config.DATE_COL])
-
-    # restrict to training start and end date
-    startdate = startdate - Config.DAY_SHIFT

-    assert startdate > Config.FIRST_DATA_DATE, "Start date <= first day of data"
-    assert startdate < enddate, "Start date >= end date"
-    assert enddate <= dropdate, "End date > drop date"
+    ddata = ddata.dropna()

-    date_filter = (ddata[Config.DATE_COL] >= Config.FIRST_DATA_DATE) & (ddata[Config.DATE_COL] < dropdate)
+    ddata[Config.DATE_COL] = dd.to_datetime(ddata[Config.DATE_COL])

-    df = ddata[date_filter].compute()
+    df = ddata.compute()

     # aggregate age groups (so data is unique by service date and FIPS)
     df = df.groupby([Config.DATE_COL, Config.GEO_COL]).sum(numeric_only=True).reset_index()
     assert np.sum(df.duplicated()) == 0, "Duplicates after age group aggregation"
     assert (df[Config.COUNT_COLS] >= 0).all().all(), "Counts must be nonnegative"

+    # restrict to training start and end date
+    startdate = startdate - Config.DAY_SHIFT
+
+    assert startdate > Config.FIRST_DATA_DATE, "Start date <= first day of data"
+    assert startdate < enddate, "Start date >= end date"
+    assert enddate <= dropdate, "End date > drop date"
+
+    date_filter = (df[Config.DATE_COL] >= Config.FIRST_DATA_DATE) & (df[Config.DATE_COL] < dropdate)
+    df = df[date_filter]
     logger.info(f"Done processing {filepath}")
     return df
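Likewise, a hedged sketch of calling the reorganized csv_to_df, which now computes the dask DataFrame before applying the date filter. The input path, dates, and import path are illustrative assumptions, not taken from this PR.

import logging
from datetime import datetime

from load_data import csv_to_df  # hypothetical import path; adjust to the package layout

logger = logging.getLogger(__name__)
df = csv_to_df(
    "./input/claims.csv.gz",  # illustrative input file
    startdate=datetime(2020, 2, 1),
    enddate=datetime(2020, 6, 1),
    dropdate=datetime(2020, 6, 4),
    logger=logger,
)
# rows are unique by (service date, FIPS) and restricted to
# [Config.FIRST_DATA_DATE, dropdate)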