Merged
32 commits
713dde6
reduce memory usage by subsetting columns
nmdefries Mar 3, 2021
5a31249
switch to all_of to trigger error for missing columns
nmdefries Mar 3, 2021
486c7fa
fix hesitant_and_* variables to be defined as among hesitant people
nmdefries Mar 11, 2021
c20a4a5
fix failing tests
nmdefries Mar 12, 2021
65f158f
reduce logic complexity in input file choice
nmdefries Mar 12, 2021
1f10a1b
fix relevant tests. support with new params setting
nmdefries Mar 12, 2021
7328a4f
simplify hesitant_* definitions
nmdefries Mar 12, 2021
c59b3f4
set weekly aggs to same as monthly aggs
nmdefries Mar 12, 2021
eb6874e
deduplicate process of checking column existence and format
nmdefries Mar 12, 2021
0257d95
add missing paren
nmdefries Mar 13, 2021
1aaf826
create local wave variable
nmdefries Mar 16, 2021
972aa89
update tests to match
nmdefries Mar 16, 2021
e5c80f1
Merge pull request #930 from cmu-delphi/fb-package-V4a-wording
krivard Mar 17, 2021
c0a59e2
remove trailing whitespace. add all_of for explicitness
nmdefries Mar 14, 2021
15df269
more column-existence/format deduplication
nmdefries Mar 14, 2021
cdc5c3e
remove unused imports
nmdefries Mar 17, 2021
2ee7cd5
Add hhs and nation
chinandrew Dec 9, 2020
5a0b271
Change ifelse block to a dict of functions
chinandrew Mar 17, 2021
d45dcbe
Merge branch 'main' into fb-package-speed-up-3
nmdefries Mar 18, 2021
65ab02d
unify group_vars definition between funcs
nmdefries Mar 18, 2021
f472223
Merge branch 'fb-package-speed-up-3' of github.com:cmu-delphi/covidca…
nmdefries Mar 18, 2021
572819f
unify group_vars name between funcs
nmdefries Mar 18, 2021
c78b524
Merge pull request #934 from cmu-delphi/fb-package-speed-up-3
krivard Mar 18, 2021
23b8aee
Merge pull request #935 from cmu-delphi/hhs-geo
krivard Mar 19, 2021
64883a1
update n_backfill_days to 70, fix logging format
mariajahja Mar 19, 2021
5304656
Merge pull request #937 from cmu-delphi/hsp-templates
krivard Mar 19, 2021
db86563
validation params for safegraph
sgsmob Mar 19, 2021
797ec0a
add vi/gu to state count
mariajahja Mar 21, 2021
7a4018b
Merge pull request #945 from cmu-delphi/hospclaims-n-geo
krivard Mar 22, 2021
6eabe7f
Drop combo nmf and fb vaccine_likely_local_health from sirCAL
krivard Mar 22, 2021
ddd59bb
Merge pull request #941 from sgsmob/safegraph
krivard Mar 22, 2021
0cca967
Merge pull request #949 from cmu-delphi/sircal-drop-combo-local-health
krivard Mar 22, 2021
28 changes: 26 additions & 2 deletions ansible/templates/safegraph-params-prod.json.j2
@@ -14,7 +14,31 @@
"sync": true,
"wip_signal" : []
},
"archive": {
"cache_dir": "./cache"
"validation": {
"common": {
"data_source": "safegraph",
"span_length": 14,
"end_date": "today",
"suppressed_errors": [
{"signal": "bars_visit_num"},
{"signal": "bars_visit_prop"},
{"signal": "restaurants_visit_num"},
{"signal": "restaurants_visit_prop"}
]
},
"static": {
"minimum_sample_size": 100,
"missing_se_allowed": false,
"missing_sample_size_allowed": false
},
"dynamic": {
"ref_window_size": 7,
"smoothed_signals": [
"completely_home_prop_7dav",
"full_time_work_prop_7dav",
"part_time_work_prop_7dav",
"median_home_dwell_time_7dav"
]
}
}
}
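For orientation, here is a minimal sketch of how an indicator might read the validation block added above. It assumes a rendered params.json with the same shape as this template; the variable names are illustrative, not Delphi's actual validator API.

```python
import json

# Sketch only: load a params file shaped like the template above and pull
# out the validation settings. Names below are illustrative.
with open("params.json") as f:
    params = json.load(f)

validation = params["validation"]
common = validation["common"]           # data_source, span_length, end_date
static_checks = validation["static"]    # minimum_sample_size, missing_* flags
dynamic_checks = validation["dynamic"]  # ref_window_size, smoothed_signals

# suppressed_errors is a list of {"signal": ...} objects; a set makes the
# per-signal membership test cheap.
suppressed = {entry["signal"] for entry in common["suppressed_errors"]}
print("bars_visit_num" in suppressed)   # True for this template
```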
4 changes: 2 additions & 2 deletions ansible/templates/sir_complainsalot-params-prod.json.j2
@@ -34,12 +34,12 @@
"fb-survey": {
"max_age": 3,
"maintainers": ["U01069KCRS7"],
"retired-signals": ["smoothed_anxious_5d", "smoothed_wanxious_5d", "smoothed_depressed_5d", "smoothed_wdepressed_5d", "smoothed_felt_isolated_5d", "smoothed_wfelt_isolated_5d", "smoothed_large_event_1d", "smoothed_wlarge_event_1d", "smoothed_restaurant_1d", "smoothed_wrestaurant_1d", "smoothed_shop_1d", "smoothed_wshop_1d", "smoothed_spent_time_1d", "smoothed_wspent_time_1d", "smoothed_travel_outside_state_5d", "smoothed_wtravel_outside_state_5d", "smoothed_work_outside_home_1d", "smoothed_wwork_outside_home_1d", "smoothed_wearing_mask", "smoothed_wwearing_mask"]
"retired-signals": ["smoothed_anxious_5d", "smoothed_wanxious_5d", "smoothed_depressed_5d", "smoothed_wdepressed_5d", "smoothed_felt_isolated_5d", "smoothed_wfelt_isolated_5d", "smoothed_large_event_1d", "smoothed_wlarge_event_1d", "smoothed_restaurant_1d", "smoothed_wrestaurant_1d", "smoothed_shop_1d", "smoothed_wshop_1d", "smoothed_spent_time_1d", "smoothed_wspent_time_1d", "smoothed_travel_outside_state_5d", "smoothed_wtravel_outside_state_5d", "smoothed_work_outside_home_1d", "smoothed_wwork_outside_home_1d", "smoothed_wearing_mask", "smoothed_wwearing_mask", "smoothed_vaccine_likely_local_health", "smoothed_wvaccine_likely_local_health"]
},
"indicator-combination": {
"max_age": 4,
"maintainers": ["U01AP8GSWG3","U01069KCRS7"],
"retired-signals": ["nmf_day_doc_fbs_ght"]
"retired-signals": ["nmf_day_doc_fbs_ght", "nmf_day_doc_fbc_fbs_ght"]
},
"quidel": {
"max_age":6,
2 changes: 1 addition & 1 deletion claims_hosp/delphi_claims_hosp/config.py
@@ -63,7 +63,7 @@ class GeoConstants:
NUM_COUNTIES = 3141 + 52
NUM_HRRS = 308
NUM_MSAS = 392 + 52 # MSA + States
NUM_STATES = 52 # including DC and PR
NUM_STATES = 54 # including DC, PR, VI, GU
NUM_HHSS = 10
NUM_NATIONS = 1

2 changes: 1 addition & 1 deletion claims_hosp/params.json.template
@@ -8,7 +8,7 @@
"start_date": "2020-02-01",
"end_date": null,
"drop_date": null,
"n_backfill_days": 60,
"n_backfill_days": 70,
"n_waiting_days": 3,
"write_se": false,
"obfuscated_prefix": "foo_obfuscated",
43 changes: 43 additions & 0 deletions doctor_visits/delphi_doctor_visits/geo_maps.py
@@ -7,6 +7,7 @@
Created: 2020-04-18
Last modified: 2020-04-30 by Aaron Rumack (add megacounty code)
"""
from functools import partial

import pandas as pd
from delphi_utils.geomap import GeoMapper
@@ -20,6 +21,14 @@ class GeoMaps:
def __init__(self):
"""Create the underlying GeoMapper."""
self.gmpr = GeoMapper()
self.geo_func = {"county": partial(self.county_to_megacounty,
threshold_visits=Config.MIN_RECENT_VISITS,
threshold_len=Config.RECENT_LENGTH),
"state": self.county_to_state,
"msa": self.county_to_msa,
"hrr": self.county_to_hrr,
"hhs": self.county_to_hhs,
"nation": self.county_to_nation}

@staticmethod
def convert_fips(x):
@@ -61,6 +70,40 @@ def county_to_state(self, data):

return data.groupby("state_id"), "state_id"

def county_to_hhs(self, data):
"""Aggregate county data to the HHS region resolution.

Args:
data: dataframe aggregated to the daily-county resolution (all 7 cols expected)

Returns: tuple of dataframe at the daily-HHS resolution, and geo_id column name
"""
data = self.gmpr.add_geocode(data,
"fips",
"hhs",
from_col="PatCountyFIPS")
data.drop(columns="PatCountyFIPS", inplace=True)
data = data.groupby(["ServiceDate", "hhs"]).sum().reset_index()

return data.groupby("hhs"), "hhs"

def county_to_nation(self, data):
"""Aggregate county data to the nation resolution.

Args:
data: dataframe aggregated to the daily-county resolution (all 7 cols expected)

Returns: tuple of dataframe at the daily-nation resolution, and geo_id column name
"""
data = self.gmpr.add_geocode(data,
"fips",
"nation",
from_col="PatCountyFIPS")
data.drop(columns="PatCountyFIPS", inplace=True)
data = data.groupby(["ServiceDate", "nation"]).sum().reset_index()

return data.groupby("nation"), "nation"

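The two new methods share one aggregation pattern: attach the coarser geocode, drop the county identifier, then sum within (date, geocode). A toy pandas sketch of that pattern — the data and mapping are invented for illustration; the real code resolves geocodes through delphi_utils' GeoMapper:

```python
import pandas as pd

# Toy illustration of the county_to_hhs/county_to_nation pattern above:
# map each county to a coarser geocode, drop the county id, then sum
# within (date, geocode). Data and mapping are made up for this example.
fips_to_nation = {"42003": "us", "06037": "us"}
df = pd.DataFrame({"ServiceDate": ["2020-05-01", "2020-05-01"],
                   "PatCountyFIPS": ["42003", "06037"],
                   "Denominator": [10, 20]})
df["nation"] = df["PatCountyFIPS"].map(fips_to_nation)
df = (df.drop(columns="PatCountyFIPS")
        .groupby(["ServiceDate", "nation"])
        .sum()
        .reset_index())
print(df)  # one row per (ServiceDate, nation) with summed Denominator
```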
def county_to_hrr(self, data):
"""Aggregate county data to the HRR resolution.

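The `geo_func` dict added to `GeoMaps.__init__` replaces an if/elif chain with a dispatch table, using `functools.partial` to pre-bind the megacounty thresholds so every entry exposes the same one-argument interface; `update_sensor.py` (below) then dispatches through this table. A self-contained sketch of the pattern, with toy stand-ins rather than the real mappers:

```python
from functools import partial

# Dispatch-table sketch: toy stand-ins for the real county_to_* methods.
def to_megacounty(data, threshold_visits, threshold_len):
    return f"megacounty(min_visits={threshold_visits}, window={threshold_len})"

def to_state(data):
    return "state"

geo_func = {
    # partial() pre-binds the thresholds, so all entries take just `data`.
    "county": partial(to_megacounty, threshold_visits=500, threshold_len=7),
    "state": to_state,
}

print(geo_func["county"](data=None))  # megacounty(min_visits=500, window=7)
print(geo_func["state"](data=None))   # state
```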
12 changes: 6 additions & 6 deletions doctor_visits/delphi_doctor_visits/run.py
@@ -61,14 +61,14 @@ def run_module(params):
startdate_dt = enddate_dt - timedelta(days=n_backfill_days)
enddate = str(enddate_dt.date())
startdate = str(startdate_dt.date())
logging.info("drop date:\t\t{dropdate}")
logging.info("first sensor date:\t{startdate}")
logging.info("last sensor date:\t{enddate}")
logging.info("n_backfill_days:\t{n_backfill_days}")
logging.info("n_waiting_days:\t{n_waiting_days}")
logging.info("drop date:\t\t%s", dropdate)
logging.info("first sensor date:\t%s", startdate)
logging.info("last sensor date:\t%s", enddate)
logging.info("n_backfill_days:\t%s", n_backfill_days)
logging.info("n_waiting_days:\t%s", n_waiting_days)

## geographies
geos = ["state", "msa", "hrr", "county"]
geos = ["state", "msa", "hrr", "county", "hhs", "nation"]


## print out other vars
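The logging change in `run.py` fixes a real bug: the old strings used `{name}` placeholders without an `f` prefix, so the braces were logged literally. The replacement uses logging's lazy `%s` formatting, which also defers interpolation until the record is actually emitted. A short demonstration:

```python
import logging
logging.basicConfig(level=logging.INFO)

dropdate = "2021-03-19"
# Old (buggy): not an f-string, so the placeholder is logged verbatim.
logging.info("drop date:\t\t{dropdate}")    # -> drop date: {dropdate}
# New: lazy %-formatting interpolates the value at emit time.
logging.info("drop date:\t\t%s", dropdate)  # -> drop date: 2021-03-19
```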
17 changes: 3 additions & 14 deletions doctor_visits/delphi_doctor_visits/update_sensor.py
@@ -78,7 +78,7 @@ def update_sensor(
startdate: first sensor date (YYYY-mm-dd)
enddate: last sensor date (YYYY-mm-dd)
dropdate: data drop date (YYYY-mm-dd)
geo: geographic resolution, one of ["county", "state", "msa", "hrr"]
geo: geographic resolution, one of ["county", "state", "msa", "hrr", "nation", "hhs"]
parallel: boolean to run the sensor update in parallel
weekday: boolean to adjust for weekday effects
se: boolean to write out standard errors, if true, use an obfuscated name
@@ -132,19 +132,8 @@

# get right geography
geo_map = GeoMaps()
if geo.lower() == "county":
data_groups, _ = geo_map.county_to_megacounty(
data, Config.MIN_RECENT_VISITS, Config.RECENT_LENGTH
)
elif geo.lower() == "state":
data_groups, _ = geo_map.county_to_state(data)
elif geo.lower() == "msa":
data_groups, _ = geo_map.county_to_msa(data)
elif geo.lower() == "hrr":
data_groups, _ = geo_map.county_to_hrr(data)
else:
logging.error(f"{geo} is invalid, pick one of 'county', 'state', 'msa', 'hrr'")
return {}
mapping_func = geo_map.geo_func[geo.lower()]
data_groups, _ = mapping_func(data)
unique_geo_ids = list(data_groups.groups.keys())

# run sensor fitting code (maybe in parallel)
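One behavioral difference worth noting (an observation about the pattern, not part of the diff): the old if/elif chain logged an error and returned `{}` for an unrecognized geo, while a plain dict lookup raises `KeyError`. A membership check along these lines would preserve the old behavior if that matters:

```python
import logging

# Sketch only: keys mirror the geo_func table above; values are omitted.
geo_func = dict.fromkeys(["county", "state", "msa", "hrr", "hhs", "nation"])

geo = "zip"  # hypothetical unsupported input
if geo.lower() not in geo_func:
    logging.error("%s is invalid, pick one of %s", geo, sorted(geo_func))
    # the old code returned {} here instead of raising
```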
2 changes: 1 addition & 1 deletion doctor_visits/params.json.template
@@ -5,7 +5,7 @@
"indicator": {
"input_file": "./input/SYNEDI_AGG_OUTPATIENT_18052020_1455CDT.csv.gz",
"drop_date": "",
"n_backfill_days": 60,
"n_backfill_days": 70,
"n_waiting_days": 3,
"weekday": [true, false],
"se": false,
1 change: 0 additions & 1 deletion facebook/delphiFacebook/NAMESPACE
@@ -17,7 +17,6 @@ export(filter_complete_responses)
export(filter_data_for_aggregation)
export(filter_responses)
export(floor_epiweek)
export(get_date_range_from_filenames)
export(get_filenames_in_range)
export(get_range_prev_full_month)
export(get_range_prev_full_period)
78 changes: 46 additions & 32 deletions facebook/delphiFacebook/R/contingency_aggregate.R
@@ -24,7 +24,7 @@
#' @return none
#'
#' @import data.table
#' @importFrom dplyr full_join %>%
#' @importFrom dplyr full_join %>% select all_of
#' @importFrom purrr reduce
#'
#' @export
@@ -44,9 +44,18 @@ produce_aggregates <- function(df, aggregations, cw_list, params) {
df <- output[[1]]
aggregations <- output[[2]]

## Keep only columns used in indicators, plus supporting columns.
group_vars <- unique( unlist(aggregations$group_by) )
df <- select(df,
all_of(unique(aggregations$metric)),
all_of(unique(aggregations$var_weight)),
all_of( group_vars[group_vars != "geo_id"] ),
zip5,
start_dt)

agg_groups <- unique(aggregations[c("group_by", "geo_level")])

# For each unique combination of groupby_vars and geo level, run aggregation process once
# For each unique combination of group_vars and geo level, run aggregation process once
# and calculate all desired aggregations on the grouping. Rename columns. Save
# to individual files
for (group_ind in seq_along(agg_groups$group_by)) {
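The new `select(df, all_of(...))` call in `produce_aggregates` drops unused columns as early as possible (the memory-reduction commit above), and `all_of()` makes the selection strict: a missing column is an error rather than a silent no-op. The same idea in pandas, as a loose analogy rather than a translation of the R code:

```python
import pandas as pd

# Loose pandas analogy to select(df, all_of(cols)): subsetting with an
# explicit list frees memory early and raises KeyError if a needed column
# is missing, instead of silently continuing. Toy columns for illustration.
df = pd.DataFrame({"zip5": ["15213"], "start_dt": ["2021-03-01"],
                   "b_hesitant": [1], "weight": [0.8], "unused_col": [0]})
keep = ["b_hesitant", "weight", "zip5", "start_dt"]
df = df[keep]  # KeyError if any column in `keep` is absent
print(df.columns.tolist())
```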
@@ -158,37 +167,43 @@ post_process_aggs <- function(df, aggregations, cw_list) {
# - multi-select items are converted to a series of binary columns, one for
# each unique level/response code; multi-selects used for grouping are left as-is.
# - multiple choice items are left as-is

#### TODO: How do we want to handle multi-select items when used for grouping?
agg_groups <- unique(aggregations$group_by)
group_cols_to_convert <- unique(do.call(c, agg_groups))
group_cols_to_convert <- group_cols_to_convert[startsWith(group_cols_to_convert, "b_")]

metric_cols_to_convert <- unique(aggregations$metric)

for (col_var in c(group_cols_to_convert, metric_cols_to_convert)) {
if ( is.null(df[[col_var]]) ) {
aggregations <- aggregations[aggregations$metric != col_var &
!mapply(aggregations$group_by,
FUN=function(x) {col_var %in% x}), ]
msg_plain(
paste0(
col_var, " is not defined. Removing all aggregations that use it. ",
nrow(aggregations), " remaining")
)
group_vars <- unique( unlist(aggregations$group_by) )
group_vars <- group_vars[group_vars != "geo_id"]

metric_cols <- unique(aggregations$metric)

cols_check_available <- unique(c(group_vars, metric_cols))
available <- cols_check_available %in% names(df)
cols_not_available <- cols_check_available[ !available ]
for (col_var in cols_not_available) {
# Remove from aggregations
aggregations <- aggregations[aggregations$metric != col_var &
!mapply(aggregations$group_by,
FUN=function(x) {col_var %in% x}), ]
msg_plain(paste0(
col_var, " is not defined. Removing all aggregations that use it. ",
nrow(aggregations), " remaining")
)
}

cols_available <- cols_check_available[ available ]
for (col_var in cols_available) {
if ( col_var %in% group_vars & !(col_var %in% metric_cols) & !startsWith(col_var, "b_") ) {
next
}

if (startsWith(col_var, "b_")) { # Binary
output <- code_binary(df, aggregations, col_var)
} else if (startsWith(col_var, "ms_")) { # Multiselect
output <- code_multiselect(df, aggregations, col_var)
} else if (startsWith(col_var, "n_")) { # Numeric free response
output <- code_numeric_freeresponse(df, aggregations, col_var)
} else if (startsWith(col_var, "mc_")) { # Multiple choice
} else if (startsWith(col_var, "ms_")) { # Multi-select
output <- code_multiselect(df, aggregations, col_var)
} else {
# Multiple choice and variables that are formatted differently
output <- list(df, aggregations)
}

df <- output[[1]]
aggregations <- output[[2]]
}
@@ -233,28 +248,27 @@ summarize_aggs <- function(df, crosswalk_data, aggregations, geo_level, params)
## inefficient; profiling shows the cost to be negligible, so shut it up
df <- suppressWarnings(inner_join(df, crosswalk_data, by = "zip5"))

groupby_vars <- aggregations$group_by[[1]]
group_vars <- aggregations$group_by[[1]]

if (all(groupby_vars %in% names(df))) {
unique_group_combos <- unique(df[, groupby_vars, with=FALSE])
if (all(group_vars %in% names(df))) {
unique_group_combos <- unique(df[, group_vars, with=FALSE])
unique_group_combos <- unique_group_combos[complete.cases(unique_group_combos)]
} else {
msg_plain(
sprintf(
"not all of groupby columns %s available in data; skipping aggregation",
paste(groupby_vars, collapse=", ")
paste(group_vars, collapse=", ")
))
}

if ( !exists("unique_group_combos") || nrow(unique_group_combos) == 0 ) {
return(list())
}


## Set an index on the groupby var columns so that the groupby step can be
## faster; data.table stores the sort order of the column and
## uses a binary search to find matching values, rather than a linear scan.
setindexv(df, groupby_vars)
setindexv(df, group_vars)

calculate_group <- function(ii) {
target_group <- unique_group_combos[ii]
@@ -287,15 +301,15 @@ summarize_aggs <- function(df, crosswalk_data, aggregations, geo_level, params)
## Do post-processing.
for (row in seq_len(nrow(aggregations))) {
aggregation <- aggregations$id[row]
groupby_vars <- aggregations$group_by[[row]]
group_vars <- aggregations$group_by[[row]]
post_fn <- aggregations$post_fn[[row]]

dfs_out[[aggregation]] <- dfs_out[[aggregation]][
rowSums(is.na(dfs_out[[aggregation]][, c("val", "sample_size", groupby_vars)])) == 0,
rowSums(is.na(dfs_out[[aggregation]][, c("val", "sample_size", group_vars)])) == 0,
]

if (geo_level == "county") {
df_megacounties <- megacounty(dfs_out[[aggregation]], params$num_filter, groupby_vars)
df_megacounties <- megacounty(dfs_out[[aggregation]], params$num_filter, group_vars)
dfs_out[[aggregation]] <- bind_rows(dfs_out[[aggregation]], df_megacounties)
}

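Finally, the `setindexv(df, group_vars)` call earlier in `summarize_aggs` (with the comment above it) is a data.table optimization: the stored sort order lets each group lookup use a binary search instead of a linear scan. A rough pandas analogy, purely for intuition:

```python
import pandas as pd

# Rough analogy to data.table's setindexv(): a sorted index lets repeated
# per-group lookups use binary search rather than scanning every row.
# Toy data; column names are illustrative.
df = pd.DataFrame({"geo_id": ["ak", "al", "ak", "pa"],
                   "val": [1, 2, 3, 4]})
df = df.set_index("geo_id").sort_index()
print(df.loc["ak"])  # indexed lookup for one group's rows
```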