diff --git a/.gitignore b/.gitignore index 110a564..15eb7d1 100644 --- a/.gitignore +++ b/.gitignore @@ -170,3 +170,5 @@ results/ # benchmark artificats profiles/ +docs/*.html +docs/index_files/ diff --git a/analysis/clustering/euclidean_clustering.py b/analysis/clustering/euclidean_clustering.py index 983c14e..c09f2ef 100644 --- a/analysis/clustering/euclidean_clustering.py +++ b/analysis/clustering/euclidean_clustering.py @@ -1,13 +1,10 @@ +#!/usr/bin/env python3 """ Phase 2: K-Means Clustering for Load Profile Analysis. Clusters daily electricity usage profiles using standard Euclidean distance to identify distinct consumption patterns. -Note: DTW (Dynamic Time Warping) is unnecessary here because all profiles -are aligned to the same 48 half-hourly time grid. Euclidean distance is -appropriate and much faster. - Pipeline: 1. Load daily profiles from Phase 1 2. Normalize profiles (optional but recommended) @@ -17,7 +14,7 @@ Usage: # Standard run (evaluates k=3-6 using silhouette on up to 10k samples) - python euclidean_clustering.py \\ + python euclidean_clustering_fixed.py \\ --input data/clustering/sampled_profiles.parquet \\ --output-dir data/clustering/results \\ --k-range 3 6 \\ @@ -26,7 +23,7 @@ --silhouette-sample-size 10000 # Fixed k (no evaluation) - python euclidean_clustering.py \\ + python euclidean_clustering_fixed.py \\ --input data/clustering/sampled_profiles.parquet \\ --output-dir data/clustering/results \\ --k 4 --normalize @@ -51,6 +48,9 @@ ) logger = logging.getLogger(__name__) +DEFAULT_NORMALIZATION: str = "minmax" +DEFAULT_NORMALIZE: bool = True + def load_profiles(path: Path) -> tuple[np.ndarray, pl.DataFrame]: """ @@ -81,47 +81,67 @@ def load_profiles(path: Path) -> tuple[np.ndarray, pl.DataFrame]: def normalize_profiles( - X: np.ndarray, - method: str = "zscore", -) -> np.ndarray: + df: pl.DataFrame, + method: str = "minmax", + profile_col: str = "profile", + out_col: str | None = None, +) -> pl.DataFrame: """ - Normalize profiles for clustering. - - Args: - X: Profile array of shape (n_samples, n_timepoints) - method: Normalization method ('zscore', 'minmax', 'none') - - Returns: - Normalized array + Normalize per-household-day profiles for clustering. + + Parameters + ---------- + df : pl.DataFrame + Must contain a list column with the profile values (e.g. 48-dim vector). + method : {"none", "zscore", "minmax"} + - "none": return df unchanged + - "zscore": per-profile z-score: (x - mean) / std + - "minmax": per-profile min-max: (x - min) / (max - min) + profile_col : str + Name of the list column holding the raw profile. + out_col : str | None + If provided, write normalized profile to this column; otherwise overwrite + `profile_col` in-place. + + Notes + ----- + - Normalization is done per profile (per row), not globally. + - For degenerate profiles where max == min, we fall back to all zeros. 
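Examples
--------
A minimal usage sketch (illustrative values; assumes the element-wise
Polars list arithmetic used above behaves as intended):

>>> df = pl.DataFrame({"profile": [[1.0, 3.0, 5.0], [2.0, 2.0, 2.0]]})
>>> out = normalize_profiles(df, method="minmax")

The first profile min-max scales to [0.0, 0.5, 1.0]; the flat second profile
has max == min and falls back to all zeros.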
""" + if method == "none": - return X + return df + + if profile_col not in df.columns: + raise ValueError(f"normalize_profiles: column '{profile_col}' not found in DataFrame") - logger.info("Normalizing profiles using %s method...", method) + target_col = out_col or profile_col + + expr = pl.col(profile_col) if method == "zscore": - # Per-profile z-score normalization - means = X.mean(axis=1, keepdims=True) - stds = X.std(axis=1, keepdims=True) - stds[stds == 0] = 1 # Avoid division by zero - X_norm = (X - means) / stds + mean_expr = expr.list.mean() + std_expr = expr.list.std(ddof=0) + + normalized = (expr - mean_expr) / std_expr + + # If std == 0 (flat profile), fall back to zeros + normalized = pl.when(std_expr != 0).then(normalized).otherwise(expr * 0.0) + elif method == "minmax": - # Per-profile min-max normalization - mins = X.min(axis=1, keepdims=True) - maxs = X.max(axis=1, keepdims=True) - ranges = maxs - mins - ranges[ranges == 0] = 1 - X_norm = (X - mins) / ranges - else: - raise ValueError(f"Unknown normalization method: {method}") + min_expr = expr.list.min() + max_expr = expr.list.max() + range_expr = max_expr - min_expr - logger.info( - " Normalized data range: [%.2f, %.2f]", - X_norm.min(), - X_norm.max(), - ) + normalized = (expr - min_expr) / range_expr + + # If range == 0 (flat profile), fall back to zeros + normalized = pl.when(range_expr != 0).then(normalized).otherwise(expr * 0.0) + + else: + raise ValueError(f"Unknown normalization method: {method!r}") - return X_norm + return df.with_columns(normalized.alias(target_col)) def evaluate_clustering( @@ -457,6 +477,83 @@ def plot_elbow_curve( logger.info(" Saved elbow curve: %s", output_path) +def analyze_weekday_weekend_distribution( + df: pl.DataFrame, + labels: np.ndarray, +) -> dict: + """ + Analyze weekday vs weekend distribution across clusters. + + This diagnostic checks if certain clusters are dominated by weekdays + or weekends, which would suggest usage patterns are day-type dependent. 
+ + Args: + df: Original profile DataFrame with 'is_weekend' column + labels: Cluster assignments + + Returns: + Dictionary with distribution statistics + """ + if "is_weekend" not in df.columns: + logger.warning(" No 'is_weekend' column found - skipping weekday/weekend analysis") + return {} + + # Add cluster labels to dataframe + df_with_clusters = df.with_columns(pl.Series("cluster", labels)) + + # Calculate distribution + dist = ( + df_with_clusters.group_by(["cluster", "is_weekend"]) + .agg(pl.len().alias("count")) + .sort(["cluster", "is_weekend"]) + ) + + # Calculate percentages per cluster + dist = dist.with_columns([(pl.col("count") / pl.col("count").sum().over("cluster") * 100).alias("pct")]) + + # Summary: % weekend by cluster + summary = ( + df_with_clusters.group_by("cluster") + .agg([pl.len().alias("total"), (pl.col("is_weekend").sum() / pl.len() * 100).alias("pct_weekend")]) + .sort("cluster") + ) + + logger.info("") + logger.info("=" * 70) + logger.info("WEEKDAY/WEEKEND DISTRIBUTION BY CLUSTER") + logger.info("=" * 70) + + for row in summary.iter_rows(named=True): + cluster = row["cluster"] + total = row["total"] + pct_weekend = row["pct_weekend"] + pct_weekday = 100 - pct_weekend + + logger.info( + " Cluster %d: %.1f%% weekday, %.1f%% weekend (n=%s)", cluster, pct_weekday, pct_weekend, f"{total:,}" + ) + + # Flag significant imbalances (>70% one type) + if pct_weekend > 70: + logger.warning(" ⚠️ Weekend-dominated cluster") + elif pct_weekday > 70: + logger.warning(" ⚠️ Weekday-dominated cluster") + + # Overall distribution + overall_weekend_pct = float(df_with_clusters["is_weekend"].mean() * 100) + logger.info("") + logger.info(" Overall dataset: %.1f%% weekend, %.1f%% weekday", overall_weekend_pct, 100 - overall_weekend_pct) + + # Chi-square test would go here if needed for formal significance testing + logger.info("=" * 70) + + return { + "cluster_distribution": summary.to_dicts(), + "detailed_distribution": dist.to_dicts(), + "overall_weekend_pct": overall_weekend_pct, + } + + def save_results( df: pl.DataFrame, labels: np.ndarray, @@ -527,14 +624,14 @@ def main() -> None: epilog=""" Examples: # Standard run with k evaluation (silhouette on sample) - python euclidean_clustering.py \\ + python euclidean_clustering_fixed.py \\ --input data/clustering/sampled_profiles.parquet \\ --output-dir data/clustering/results \\ --k-range 3 6 --find-optimal-k --normalize \\ --silhouette-sample-size 10000 # Fixed k (no evaluation) - python euclidean_clustering.py \\ + python euclidean_clustering_fixed.py \\ --input data/clustering/sampled_profiles.parquet \\ --output-dir data/clustering/results \\ --k 4 --normalize @@ -597,13 +694,14 @@ def main() -> None: parser.add_argument( "--normalize", action="store_true", - help="Apply normalization to profiles", + default=DEFAULT_NORMALIZE, + help="Apply normalization to profiles (default: True)", ) parser.add_argument( "--normalize-method", choices=["zscore", "minmax", "none"], - default="zscore", - help="Normalization method (default: zscore)", + default=DEFAULT_NORMALIZATION, + help="Normalization method (default: minmax)", ) parser.add_argument( @@ -622,9 +720,22 @@ def main() -> None: # Load profiles X, df = load_profiles(args.input) + logger.info("Loaded %s sampled profiles", f"{len(df):,}") + # Normalize if requested if args.normalize: - X = normalize_profiles(X, method=args.normalize_method) + logger.info("Normalizing profiles per household-day (method=%s)", args.normalize_method) + df = normalize_profiles( + df, + 
method=args.normalize_method, # ✅ FIXED: was args.normalization_method + profile_col="profile", + out_col=None, + ) + # ✅ CRITICAL FIX: Re-extract normalized profiles as numpy array + X = np.array(df["profile"].to_list(), dtype=np.float64) + logger.info(" Normalized data range: [%.2f, %.2f]", X.min(), X.max()) + else: + logger.info("Profile normalization disabled (using raw kWh values).") # Determine k eval_results = None diff --git a/analysis/clustering/prepare_clustering_data_households.py b/analysis/clustering/prepare_clustering_data_households.py index 660ff31..372590d 100644 --- a/analysis/clustering/prepare_clustering_data_households.py +++ b/analysis/clustering/prepare_clustering_data_households.py @@ -6,6 +6,8 @@ Transforms interval-level energy data into daily load profiles at the HOUSEHOLD (account) level for k-means clustering. +MODIFIED: Now accepts multiple input parquet files (e.g., split by time period). + What this does: 1. Validate schema (no full data scan) 2. Build/load manifests for accounts and dates (streaming, memory-safe) @@ -21,7 +23,7 @@ - Chunked streaming mode: fully streaming pipeline with per-chunk sinks * required for 10k+ households on constrained hardware - Profiles are 48 half-hourly kWh values in chronological order (00:30-24:00) - - Incomplete days (not 48 intervals) are dropped as “missing/irregular data” + - Incomplete days (not 48 intervals) are dropped as "missing/irregular data" Output files: - sampled_profiles.parquet: @@ -41,6 +43,14 @@ --sample-households 5000 \ --sample-days 20 + # Multiple input files + python prepare_clustering_data_households.py \ + --input file1.parquet file2.parquet \ + --output-dir data/clustering \ + --sample-households 100000 \ + --sample-days 365 \ + --streaming + # Large dataset with chunked streaming (20,000 households) python prepare_clustering_data_households.py \ --input data/processed/comed_202308.parquet \ @@ -96,17 +106,39 @@ def log_memory(label: str) -> None: logger.debug("Skipping memory log for %s: %s", label, exc) +# ============================================================================= +# MULTI-FILE SUPPORT +# ============================================================================= + + +def scan_multiple_parquet(paths: list[Path]) -> pl.LazyFrame: + """ + Create a unified LazyFrame from multiple parquet files. + + Args: + paths: List of parquet file paths + + Returns: + Combined LazyFrame + """ + if len(paths) == 1: + return pl.scan_parquet(paths[0]) + else: + logger.info("Combining %d input files...", len(paths)) + return pl.concat([pl.scan_parquet(p) for p in paths]) + + # ============================================================================= # DATA INSPECTION & SAMPLING # ============================================================================= -def validate_schema(path: Path) -> dict[str, Any]: +def validate_schema(paths: list[Path]) -> dict[str, Any]: """ Validate input data has required columns (schema-only check, no full data scan). Args: - path: Path to input parquet file. + paths: List of paths to input parquet files. 
Returns: Dictionary with: @@ -114,7 +146,7 @@ def validate_schema(path: Path) -> dict[str, Any]: - errors: list[str] - columns: list[str] """ - lf = pl.scan_parquet(path) + lf = scan_multiple_parquet(paths) schema = lf.collect_schema() errors: list[str] = [] @@ -134,8 +166,8 @@ def validate_schema(path: Path) -> dict[str, Any]: } -def get_metadata_and_samples( - input_path: Path, +def get_metadata_and_samples( # noqa: C901 + input_paths: list[Path], sample_households: int | None, sample_days: int, day_strategy: Literal["stratified", "random"], @@ -150,7 +182,7 @@ def get_metadata_and_samples( - Samples households and dates from the small manifest files Args: - input_path: Path to input parquet file. + input_paths: List of paths to input parquet files. sample_households: Number of households to sample (None = all). sample_days: Number of days to sample. day_strategy: 'stratified' (70/30 weekday/weekend) or 'random'. @@ -165,7 +197,7 @@ def get_metadata_and_samples( logger.info("Scanning input data for metadata and sampling...") log_memory("start of get_metadata_and_samples") - lf = pl.scan_parquet(input_path) + lf = scan_multiple_parquet(input_paths) # Summary stats (streaming-safe: single row output) logger.info(" Computing summary statistics...") @@ -180,7 +212,7 @@ def get_metadata_and_samples( # Early checks if summary["n_rows"] == 0: - raise ValueError(f"Input file {input_path} contains no rows.") + raise ValueError("Input files contain no rows.") logger.info(" %s rows", f"{summary['n_rows']:,}") logger.info(" %s households", f"{summary['n_accounts']:,}") @@ -188,17 +220,28 @@ def get_metadata_and_samples( logger.info(" Date range: %s to %s", summary["min_date"], summary["max_date"]) # ========================================================================== - # KEY CHANGE: Use manifests instead of unique().collect() + # KEY CHANGE: Use manifests from first file (assumes all files have same structure) # ========================================================================== - logger.info(" Loading manifests...") - account_manifest = ensure_account_manifest(input_path) - date_manifest = ensure_date_manifest(input_path) + logger.info(" Loading manifests from first input file...") + account_manifest = ensure_account_manifest(input_paths[0]) + date_manifest = ensure_date_manifest(input_paths[0]) # Read from small manifest files (fits easily in memory) accounts_df = pl.read_parquet(account_manifest) dates_df = pl.read_parquet(date_manifest) + # If multiple files, also load manifests from other files and combine + if len(input_paths) > 1: + logger.info(" Loading manifests from additional input files...") + for path in input_paths[1:]: + acc_manifest = ensure_account_manifest(path) + date_manifest_extra = ensure_date_manifest(path) + + accounts_df = pl.concat([accounts_df, pl.read_parquet(acc_manifest)]).unique() + + dates_df = pl.concat([dates_df, pl.read_parquet(date_manifest_extra)]).unique() + log_memory("after loading manifests") if accounts_df.height == 0: @@ -267,7 +310,7 @@ def get_metadata_and_samples( def create_household_profiles( - input_path: Path, + input_paths: list[Path], accounts: list[str], dates: list[Any], ) -> pl.DataFrame: @@ -282,7 +325,7 @@ def create_household_profiles( when sampling a moderate subset of households and dates. Args: - input_path: Path to input parquet file. + input_paths: List of paths to input parquet files. accounts: List of account_identifier values to include. dates: List of dates to include. 
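A minimal sketch of how the multi-file entry point is exercised (file names are
hypothetical; it assumes all inputs share the interval-level schema, since
pl.concat of LazyFrames performs a strict vertical concatenation by default):

    from pathlib import Path

    import polars as pl

    monthly_files = [
        Path("data/processed/comed_202307.parquet"),  # hypothetical month-split inputs
        Path("data/processed/comed_202308.parquet"),
    ]
    lf = scan_multiple_parquet(monthly_files)

    # Everything stays lazy until collect(); only the one-row summary is materialized.
    summary = lf.select([
        pl.len().alias("n_rows"),
        pl.col("account_identifier").n_unique().alias("n_accounts"),
    ]).collect()
    print(summary)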
@@ -308,7 +351,7 @@ def create_household_profiles( raise ValueError("No dates provided for profile creation.") df = ( - pl.scan_parquet(input_path) + scan_multiple_parquet(input_paths) .filter(pl.col("account_identifier").is_in(accounts) & pl.col("date").is_in(dates)) # Ensures profile list is chronological within each (account, date) .sort(["account_identifier", "datetime"]) @@ -346,7 +389,7 @@ def create_household_profiles( def _create_profiles_for_chunk_streaming( - input_path: Path, + input_paths: list[Path], accounts_chunk: list[str], dates: list[Any], chunk_idx: int, @@ -373,7 +416,7 @@ def _create_profiles_for_chunk_streaming( chunk_path = tmp_dir / f"sampled_profiles_chunk_{chunk_idx:03d}.parquet" ( - pl.scan_parquet(input_path) + scan_multiple_parquet(input_paths) .filter(pl.col("account_identifier").is_in(accounts_chunk) & pl.col("date").is_in(dates)) .group_by(["account_identifier", "zip_code", "date"]) .agg([ @@ -395,7 +438,7 @@ def _create_profiles_for_chunk_streaming( def create_household_profiles_chunked_streaming( - input_path: Path, + input_paths: list[Path], accounts: list[str], dates: list[Any], output_path: Path, @@ -412,7 +455,7 @@ def create_household_profiles_chunked_streaming( This avoids ever materializing profiles for all households in memory. Args: - input_path: Path to interval-level parquet file. + input_paths: List of paths to interval-level parquet files. accounts: List of account_identifier values to include. dates: List of dates to include. output_path: Final output parquet path for all profiles. @@ -451,7 +494,7 @@ def create_household_profiles_chunked_streaming( accounts_chunk = accounts[start_idx:end_idx] chunk_path = _create_profiles_for_chunk_streaming( - input_path=input_path, + input_paths=input_paths, accounts_chunk=accounts_chunk, dates=dates, chunk_idx=i, @@ -499,7 +542,7 @@ def create_household_profiles_chunked_streaming( def prepare_clustering_data( - input_path: Path, + input_paths: list[Path], output_dir: Path, sample_households: int | None = None, sample_days: int = 20, @@ -512,7 +555,7 @@ def prepare_clustering_data( Prepare household-level clustering data from interval parquet. Args: - input_path: Path to processed interval parquet file. + input_paths: List of paths to processed interval parquet files. output_dir: Output directory for clustering files. sample_households: Number of households to sample (None = all). sample_days: Number of days to sample. @@ -530,19 +573,21 @@ def prepare_clustering_data( """ logger.info("=" * 70) logger.info("PREPARING HOUSEHOLD-LEVEL CLUSTERING DATA") + if len(input_paths) > 1: + logger.info("(MULTI-FILE MODE: %d input files)", len(input_paths)) if streaming: logger.info("(STREAMING MODE ENABLED, chunk_size=%d)", chunk_size) logger.info("=" * 70) log_memory("start of prepare_clustering_data") # 1. Schema validation (cheap, no data load) - validation = validate_schema(input_path) + validation = validate_schema(input_paths) if not validation["valid"]: raise ValueError(f"Input validation failed: {validation['errors']}") # 2. Metadata + sampling (uses manifests for memory safety) metadata = get_metadata_and_samples( - input_path=input_path, + input_paths=input_paths, sample_households=sample_households, sample_days=sample_days, day_strategy=day_strategy, @@ -559,7 +604,7 @@ def prepare_clustering_data( # 4. 
Create profiles (in-memory or chunked streaming) if streaming: n_profiles = create_household_profiles_chunked_streaming( - input_path=input_path, + input_paths=input_paths, accounts=accounts, dates=dates, output_path=profiles_path, @@ -574,7 +619,7 @@ def prepare_clustering_data( profiles_df = pl.read_parquet(profiles_path) else: profiles_df = create_household_profiles( - input_path=input_path, + input_paths=input_paths, accounts=accounts, dates=dates, ) @@ -639,6 +684,15 @@ def main() -> int: --sample-households 5000 \\ --sample-days 20 + # Multiple input files + python prepare_clustering_data_households.py \\ + --input file1.parquet file2.parquet file3.parquet \\ + --output-dir data/clustering \\ + --sample-households 100000 \\ + --sample-days 365 \\ + --streaming \\ + --chunk-size 5000 + # Large dataset with chunked streaming python prepare_clustering_data_households.py \\ --input data/processed/comed_202308.parquet \\ @@ -667,7 +721,8 @@ def main() -> int: "--input", type=Path, required=True, - help="Path to processed interval parquet file.", + nargs="+", + help="Path(s) to processed interval parquet file(s).", ) parser.add_argument( "--output-dir", @@ -691,7 +746,7 @@ def main() -> int: "--day-strategy", choices=["stratified", "random"], default="stratified", - help="Day sampling: stratified (70% weekday) or random.", + help="Day sampling: stratified (70%% weekday) or random.", ) parser.add_argument( "--seed", @@ -713,13 +768,17 @@ def main() -> int: args = parser.parse_args() - if not args.input.exists(): - logger.error("Input file not found: %s", args.input) - return 1 + # Convert input to list if single path provided + input_paths = args.input if isinstance(args.input, list) else [args.input] + + for path in input_paths: + if not path.exists(): + logger.error("Input file not found: %s", path) + return 1 try: prepare_clustering_data( - input_path=args.input, + input_paths=input_paths, output_dir=args.output_dir, sample_households=args.sample_households, sample_days=args.sample_days, diff --git a/analysis/clustering/stage2_blockgroup_regression.py b/analysis/clustering/stage2_blockgroup_regression.py index 5a4df60..798ce5c 100644 --- a/analysis/clustering/stage2_blockgroup_regression.py +++ b/analysis/clustering/stage2_blockgroup_regression.py @@ -58,20 +58,16 @@ format="%(asctime)s - %(levelname)s - %(message)s", ) -# Default predictors (for convergence and interpretability) DEFAULT_PREDICTORS = [ + "Owner_Occupied_Pct", + "Average_Household_Size", + "Old_Building_Pct", + "Heat_Electric_Pct", "Median_Household_Income", - "Median_Age", "Urban_Percent", - "Total_Households", ] -# ============================================================================= -# 1. LOAD CLUSTER ASSIGNMENTS (HOUSEHOLD-DAY LEVEL) -# ============================================================================= - - def load_cluster_assignments_household_day(path: Path) -> tuple[pl.DataFrame, dict]: """ Load household-day cluster assignments. @@ -113,7 +109,6 @@ def load_cluster_assignments_household_day(path: Path) -> tuple[pl.DataFrame, di n_clusters, ) - # This doesn't affect the regression - just useful context dominance_stats = _compute_dominance_stats(raw) logger.info( @@ -135,16 +130,12 @@ def _compute_dominance_stats(df: pl.DataFrame) -> dict: Returns summary statistics across all households. 
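For example (hypothetical values): a household observed on 20 sampled days,
15 of which are assigned to cluster 2, has dominance = 15 / 20 = 0.75, i.e.
75% of its days fall in its single most common cluster.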
""" - # Count days per (household, cluster) counts = df.group_by(["account_identifier", "cluster"]).agg(pl.len().alias("days_in_cluster")) - # Total days per household totals = counts.group_by("account_identifier").agg(pl.col("days_in_cluster").sum().alias("n_days")) - # Max days in any single cluster per household max_days = counts.group_by("account_identifier").agg(pl.col("days_in_cluster").max().alias("max_days_in_cluster")) - # Compute dominance dominance_df = max_days.join(totals, on="account_identifier").with_columns( (pl.col("max_days_in_cluster") / pl.col("n_days")).alias("dominance") ) @@ -164,11 +155,6 @@ def _compute_dominance_stats(df: pl.DataFrame) -> dict: } -# ============================================================================= -# 2. CROSSWALK AND BLOCK GROUP MAPPING -# ============================================================================= - - def load_crosswalk(crosswalk_path: Path, zip_codes: list[str]) -> pl.DataFrame: """ Load ZIP+4 → Census block-group crosswalk for the ZIP+4s in our data. @@ -200,7 +186,6 @@ def load_crosswalk(crosswalk_path: Path, zip_codes: list[str]) -> pl.DataFrame: logger.warning(" Crosswalk is empty after filtering for sample ZIP+4s.") return crosswalk - # Fan-out diagnostic fanout = crosswalk.group_by("zip_code").agg(pl.n_unique("block_group_geoid").alias("n_block_groups")) max_fanout = int(fanout["n_block_groups"].max()) @@ -247,11 +232,6 @@ def attach_block_groups_to_household_days( return df -# ============================================================================= -# 3. AGGREGATE TO BLOCK-GROUP x CLUSTER COUNTS -# ============================================================================= - - def aggregate_blockgroup_cluster_counts(df: pl.DataFrame) -> pl.DataFrame: """ Aggregate household-day observations to block-group x cluster counts. @@ -270,19 +250,16 @@ def aggregate_blockgroup_cluster_counts(df: pl.DataFrame) -> pl.DataFrame: """ logger.info("Aggregating to block-group x cluster counts (household-day units)...") - # Counts per (block_group, cluster) counts = df.group_by(["block_group_geoid", "cluster"]).agg([ pl.len().alias("n_obs"), pl.col("account_identifier").n_unique().alias("n_households"), ]) - # Totals per block_group totals = df.group_by("block_group_geoid").agg([ pl.len().alias("total_obs"), pl.col("account_identifier").n_unique().alias("total_households"), ]) - # Merge and compute cluster shares bg_counts = counts.join(totals, on="block_group_geoid", how="left").with_columns( (pl.col("n_obs") / pl.col("total_obs")).alias("cluster_share") ) @@ -301,11 +278,6 @@ def aggregate_blockgroup_cluster_counts(df: pl.DataFrame) -> pl.DataFrame: return bg_counts -# ============================================================================= -# 4. 
CENSUS DATA -# ============================================================================= - - def fetch_or_load_census( cache_path: Path, state_fips: str = "17", @@ -328,6 +300,43 @@ def fetch_or_load_census( return census_df +def create_derived_variables(census_df: pl.DataFrame) -> pl.DataFrame: + """Create derived percentage variables from raw Census counts.""" + logger.info("Creating derived variables...") + + df = census_df.with_columns([ + (pl.col("Owner_Occupied") / pl.col("Occupied_Housing_Units") * 100).alias("Owner_Occupied_Pct"), + (pl.col("Heat_Electric") / pl.col("Total_Households") * 100).alias("Heat_Electric_Pct"), + ( + ( + pl.col("Built_1960_1969") + + pl.col("Built_1950_1959") + + pl.col("Built_1940_1949") + + pl.col("Built_1939_Earlier") + ) + / pl.col("Total_Housing_Units") + * 100 + ).alias("Old_Building_Pct"), + ]) + + df = df.with_columns([ + pl.when(pl.col("Owner_Occupied_Pct").is_nan()) + .then(None) + .otherwise(pl.col("Owner_Occupied_Pct")) + .alias("Owner_Occupied_Pct"), + pl.when(pl.col("Heat_Electric_Pct").is_nan()) + .then(None) + .otherwise(pl.col("Heat_Electric_Pct")) + .alias("Heat_Electric_Pct"), + pl.when(pl.col("Old_Building_Pct").is_nan()) + .then(None) + .otherwise(pl.col("Old_Building_Pct")) + .alias("Old_Building_Pct"), + ]) + + return df + + def attach_census_to_blockgroups(bg_counts: pl.DataFrame, census_df: pl.DataFrame) -> pl.DataFrame: """Attach Census demographics to block-group cluster counts.""" logger.info("Joining Census data to block-group counts...") @@ -349,11 +358,6 @@ def attach_census_to_blockgroups(bg_counts: pl.DataFrame, census_df: pl.DataFram return demo -# ============================================================================= -# 5. PREPARE REGRESSION DATA -# ============================================================================= - - def prepare_regression_dataset( demo_df: pl.DataFrame, predictors: list[str], @@ -369,7 +373,6 @@ def prepare_regression_dataset( """ logger.info("Preparing regression dataset...") - # Filter by minimum observations (household-days) df = demo_df.filter(pl.col("total_obs") >= min_obs_per_bg) logger.info( " After min_obs filter (>=%d): %s block groups", @@ -377,7 +380,6 @@ def prepare_regression_dataset( f"{df['block_group_geoid'].n_unique():,}", ) - # Require block groups to have multiple clusters represented nonzero_counts = ( df.filter(pl.col("n_obs") > 0).group_by("block_group_geoid").agg(pl.len().alias("n_nonzero_clusters")) ) @@ -394,7 +396,6 @@ def prepare_regression_dataset( f"{df['block_group_geoid'].n_unique():,}", ) - # Filter predictors available_predictors: list[str] = [] for p in predictors: if p not in df.columns: @@ -421,11 +422,6 @@ def prepare_regression_dataset( return df, available_predictors -# ============================================================================= -# 6. 
MULTINOMIAL REGRESSION -# ============================================================================= - - def run_multinomial_regression( reg_df: pl.DataFrame, predictors: list[str], @@ -454,12 +450,10 @@ def run_multinomial_regression( logger.info("Running multinomial logistic regression...") logger.info(" Weighting by: %s (household-day observations)", weight_col) - # Extract arrays X = reg_df.select(predictors).to_numpy().astype(np.float64) y = reg_df.get_column(outcome).to_numpy() weights = reg_df.get_column(weight_col).to_numpy().astype(np.float64) - # Drop rows with NaN in predictors nan_mask = np.isnan(X).any(axis=1) if nan_mask.sum() > 0: logger.warning(" Dropping %s rows with NaN predictors", f"{nan_mask.sum():,}") @@ -470,7 +464,6 @@ def run_multinomial_regression( n_block_groups = reg_df.filter(~pl.any_horizontal(pl.col(predictors).is_null()))["block_group_geoid"].n_unique() - # Standardize or use raw units if standardize: logger.info(" Standardizing predictors...") scaler = StandardScaler() @@ -479,10 +472,8 @@ def run_multinomial_regression( logger.info(" Using raw predictor units (no standardization).") X_transformed = X - # Add intercept X_with_const = sm.add_constant(X_transformed) - # Expand rows by integer weights weight_ints = np.maximum(np.round(weights).astype(int), 1) X_expanded = np.repeat(X_with_const, weight_ints, axis=0) y_expanded = np.repeat(y, weight_ints) @@ -497,7 +488,6 @@ def run_multinomial_regression( model = sm.MNLogit(y_expanded, X_expanded) result = model.fit(method="newton", maxiter=100, disp=False) - # Extract results classes = sorted(np.unique(y).tolist()) baseline = classes[0] param_names = ["const", *predictors] @@ -514,7 +504,6 @@ def run_multinomial_regression( p_values[key] = {name: float(result.pvalues[j, i]) for j, name in enumerate(param_names)} odds_ratios[key] = {name: float(np.exp(result.params[j, i])) for j, name in enumerate(param_names)} - # Baseline cluster baseline_key = f"cluster_{baseline}" coefficients[baseline_key] = dict.fromkeys(param_names, 0.0) std_errors[baseline_key] = dict.fromkeys(param_names, 0.0) @@ -547,11 +536,6 @@ def run_multinomial_regression( } -# ============================================================================= -# 7. REPORT GENERATION -# ============================================================================= - - def generate_report( results: dict[str, object], cluster_distribution: pl.DataFrame, @@ -636,11 +620,6 @@ def generate_report( print("\n" + text) -# ============================================================================= -# 8. CLI -# ============================================================================= - - def main() -> int: parser = argparse.ArgumentParser( description="Stage 2: Block-group-level regression using household-day units.", @@ -696,18 +675,14 @@ def main() -> int: print("STAGE 2: BLOCK-GROUP REGRESSION (HOUSEHOLD-DAY UNITS)") print("=" * 70) - # 1. Load household-day cluster assignments (NO dominant cluster reduction) household_days, dominance_stats = load_cluster_assignments_household_day(args.clusters) - # 2. Load crosswalk and attach block groups zip_codes = household_days["zip_code"].unique().to_list() crosswalk = load_crosswalk(args.crosswalk, zip_codes) household_days_bg = attach_block_groups_to_household_days(household_days, crosswalk) - # 3. Aggregate to block-group x cluster counts bg_counts = aggregate_blockgroup_cluster_counts(household_days_bg) - # 4. 
Load Census and attach demographics census_df = fetch_or_load_census( cache_path=args.census_cache, state_fips=args.state_fips, @@ -716,9 +691,10 @@ def main() -> int: ) logger.info(" Census: %s block groups, %s columns", f"{len(census_df):,}", len(census_df.columns)) + census_df = create_derived_variables(census_df) + demo_df = attach_census_to_blockgroups(bg_counts, census_df) - # 5. Prepare regression dataset reg_df, predictors = prepare_regression_dataset( demo_df=demo_df, predictors=args.predictors, @@ -730,29 +706,24 @@ def main() -> int: logger.error("No data after filtering") return 1 - # Save regression dataset reg_df.write_parquet(args.output_dir / "regression_data_blockgroups.parquet") logger.info("Saved regression data to %s", args.output_dir / "regression_data_blockgroups.parquet") - # 6. Run regression (weighted by n_obs = household-day counts) results = run_multinomial_regression( reg_df=reg_df, predictors=predictors, outcome="cluster", - weight_col="n_obs", # household-day observations + weight_col="n_obs", standardize=args.standardize, ) - # Add dominance stats to results for reference results["dominance_stats"] = dominance_stats - # Save results model_summary = results.pop("model_summary") with open(args.output_dir / "regression_results_blockgroups.json", "w") as f: json.dump(results, f, indent=2) (args.output_dir / "statsmodels_summary.txt").write_text(model_summary) - # Generate report cluster_dist = ( reg_df.group_by("cluster") .agg(pl.col("n_obs").sum()) diff --git a/scripts/process_csvs_batched_optimized.py b/scripts/process_csvs_batched_optimized.py new file mode 100644 index 0000000..b692060 --- /dev/null +++ b/scripts/process_csvs_batched_optimized.py @@ -0,0 +1,121 @@ +#!/usr/bin/env python3 +""" +Memory-optimized CSV processing for large file counts. + +Processes CSV files in batches to avoid OOM when concatenating thousands of lazy frames. + +Usage: + python process_csvs_batched_optimized.py \\ + --input-dir data/validation_runs/202308_50k/samples \\ + --output data/validation_runs/202308_50k/processed_combined.parquet \\ + --batch-size 5000 +""" + +import argparse +import logging +from pathlib import Path + +import polars as pl + +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") +logger = logging.getLogger(__name__) + + +def process_csv_batch_to_parquet( + csv_files: list[Path], + batch_num: int, + temp_dir: Path, +) -> Path: + """ + Process a batch of CSV files and write to temporary parquet. 
+ + Args: + csv_files: List of CSV file paths to process + batch_num: Batch number (for naming) + temp_dir: Directory for temporary batch files + + Returns: + Path to the batch parquet file + """ + from smart_meter_analysis.aws_loader import ( + COMED_SCHEMA, + add_time_columns_lazy, + transform_wide_to_long_lazy, + ) + + logger.info("Processing batch %d: %d files", batch_num, len(csv_files)) + + lazy_frames = [] + for i, csv_path in enumerate(csv_files, 1): + if i % 200 == 0: + logger.info(" Scanned %d/%d files in batch %d", i, len(csv_files), batch_num) + try: + lf = pl.scan_csv(str(csv_path), schema_overrides=COMED_SCHEMA, ignore_errors=True) + lf = transform_wide_to_long_lazy(lf) + lf = add_time_columns_lazy(lf, day_mode="calendar") + lazy_frames.append(lf) + except Exception as exc: + logger.warning("Failed to scan %s: %s", csv_path.name, exc) + + if not lazy_frames: + raise ValueError(f"No files successfully scanned in batch {batch_num}") + + # Write batch to temporary parquet + batch_output = temp_dir / f"batch_{batch_num:04d}.parquet" + lf_combined = pl.concat(lazy_frames, how="diagonal_relaxed") + lf_combined.sink_parquet(batch_output) + + logger.info(" Batch %d complete: %s", batch_num, batch_output) + return batch_output + + +def main(): + parser = argparse.ArgumentParser(description="Process CSV files in memory-safe batches") + parser.add_argument("--input-dir", type=Path, required=True, help="Directory containing CSV files") + parser.add_argument("--output", type=Path, required=True, help="Output parquet file path") + parser.add_argument("--batch-size", type=int, default=5000, help="Files per batch (default: 5000)") + + args = parser.parse_args() + + # Get all CSV files + csv_files = sorted(args.input_dir.glob("*.csv")) + logger.info("Found %d CSV files", len(csv_files)) + + if not csv_files: + logger.error("No CSV files found in %s", args.input_dir) + return 1 + + # Create temp directory + temp_dir = args.output.parent / "temp_batches" + temp_dir.mkdir(parents=True, exist_ok=True) + + # Process in batches + batch_files = [] + for batch_num, i in enumerate(range(0, len(csv_files), args.batch_size), 1): + batch = csv_files[i : i + args.batch_size] + batch_file = process_csv_batch_to_parquet(batch, batch_num, temp_dir) + batch_files.append(batch_file) + + # Concatenate all batches using streaming + logger.info("Concatenating %d batch files into final output...", len(batch_files)) + args.output.parent.mkdir(parents=True, exist_ok=True) + + lf_combined = pl.concat([pl.scan_parquet(str(f)) for f in batch_files], how="diagonal_relaxed") + lf_combined.sink_parquet(args.output) + + # Verify + row_count = pl.scan_parquet(args.output).select(pl.len()).collect()[0, 0] + logger.info("Success! Wrote %s records to %s", f"{row_count:,}", args.output) + + # Clean up temp files + logger.info("Cleaning up temporary batch files...") + for batch_file in batch_files: + batch_file.unlink() + temp_dir.rmdir() + + logger.info("Done!") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/diagnose_bg_density_v2.py b/tests/diagnose_bg_density_v2.py new file mode 100644 index 0000000..b36d397 --- /dev/null +++ b/tests/diagnose_bg_density_v2.py @@ -0,0 +1,360 @@ +#!/usr/bin/env python3 +""" +Diagnostic: Check block-group sampling density for Stage 2 regression. + +This script analyzes whether your household sample is dense enough at the +block-group level to detect meaningful demographic patterns. 
+ +Usage: + python diagnose_bg_density.py \ + --clusters data/clustering/results/cluster_assignments.parquet \ + --crosswalk data/reference/2023_comed_zip4_census_crosswalk.txt \ + --output diagnostics.txt +""" + +from __future__ import annotations + +import argparse +from pathlib import Path + +import polars as pl + + +def inspect_cluster_file(clusters_path: Path) -> None: + """Inspect what's actually in the cluster assignments file.""" + print("=" * 70) + print("CLUSTER FILE INSPECTION") + print("=" * 70) + + df = pl.read_parquet(clusters_path) + + print(f"\nFile: {clusters_path}") + print(f"Rows: {len(df):,}") + print(f"Columns: {df.columns}") + print("\nSchema:") + for col, dtype in df.schema.items(): + print(f" {col:30s} {dtype}") + + print("\nFirst few rows:") + print(df.head()) + + print("\n" + "=" * 70) + + +def load_and_join_to_blockgroups( + clusters_path: Path, + crosswalk_path: Path, +) -> pl.DataFrame: + """Join cluster assignments to block groups.""" + print("\nLoading cluster assignments...") + clusters = pl.read_parquet(clusters_path) + print(f" {len(clusters):,} household-day observations") + + # Check which ID columns are present + id_col = None + if "account_identifier" in clusters.columns: + id_col = "account_identifier" + elif "household_id" in clusters.columns: + id_col = "household_id" + elif "meter_id" in clusters.columns: + id_col = "meter_id" + else: + print(" ⚠️ WARNING: No household identifier column found!") + print(f" Available columns: {clusters.columns}") + id_col = None + + if id_col: + print(f" {clusters[id_col].n_unique():,} unique households (using column: {id_col})") + + if "zip_code" not in clusters.columns: + raise ValueError(f"'zip_code' column not found in {clusters_path}") + + if "cluster" not in clusters.columns: + raise ValueError(f"'cluster' column not found in {clusters_path}") + + print(f" {clusters['cluster'].n_unique()} clusters") + + print("\nLoading crosswalk...") + zip_codes = clusters["zip_code"].unique().to_list() + + crosswalk = ( + pl.scan_csv(crosswalk_path, separator="\t", infer_schema_length=10000) + .with_columns([ + (pl.col("Zip").cast(pl.Utf8).str.zfill(5) + "-" + pl.col("Zip4").cast(pl.Utf8).str.zfill(4)).alias( + "zip_code" + ), + pl.col("CensusKey2023").cast(pl.Utf8).str.zfill(15).str.slice(0, 12).alias("block_group_geoid"), + ]) + .filter(pl.col("zip_code").is_in(zip_codes)) + .select(["zip_code", "block_group_geoid"]) + .collect() + ) + + print(f" {crosswalk['zip_code'].n_unique():,} ZIP+4s matched") + + # Check for fan-out + fanout = crosswalk.group_by("zip_code").agg(pl.n_unique("block_group_geoid").alias("n_bg")) + max_fanout = fanout["n_bg"].max() + if max_fanout > 1: + pct_fanout = (fanout.filter(pl.col("n_bg") > 1).height / len(fanout)) * 100 + print(f" ⚠️ {pct_fanout:.1f}% of ZIP+4s map to multiple block groups (max={max_fanout})") + + print("\nJoining to block groups...") + df = clusters.join(crosswalk, on="zip_code", how="left") + + missing = df.filter(pl.col("block_group_geoid").is_null()).height + if missing > 0: + print(f" ⚠️ {missing:,} observations ({missing / len(df) * 100:.1f}%) missing block group") + + df = df.filter(pl.col("block_group_geoid").is_not_null()) + print(f" ✓ {len(df):,} observations across {df['block_group_geoid'].n_unique():,} block groups") + + # Store the ID column name for later use + df = df.with_columns(pl.lit(id_col).alias("_id_col_name")) + + return df + + +def diagnose_household_density(df: pl.DataFrame) -> dict: + """Check households per block group.""" + print("\n" + "=" * 70) + 
print("DIAGNOSTIC 1: HOUSEHOLD DENSITY PER BLOCK GROUP") + print("=" * 70) + + # Get the ID column name (stored during join) + id_col = df["_id_col_name"][0] if "_id_col_name" in df.columns else None + + if not id_col or id_col not in df.columns: + print("\n⚠️ WARNING: Cannot compute household density - no household ID column") + print(" Skipping this diagnostic.") + return {} + + hh_per_bg = df.group_by("block_group_geoid").agg(pl.col(id_col).n_unique().alias("n_households")) + + stats = { + "n_block_groups": hh_per_bg.height, + "mean_hh_per_bg": hh_per_bg["n_households"].mean(), + "median_hh_per_bg": hh_per_bg["n_households"].median(), + "min_hh_per_bg": hh_per_bg["n_households"].min(), + "max_hh_per_bg": hh_per_bg["n_households"].max(), + "p10_hh_per_bg": hh_per_bg["n_households"].quantile(0.10), + "p90_hh_per_bg": hh_per_bg["n_households"].quantile(0.90), + } + + print(f"\nBlock groups: {stats['n_block_groups']:,}") + print("Households per block group:") + print(f" Mean: {stats['mean_hh_per_bg']:.1f}") + print(f" Median: {stats['median_hh_per_bg']:.1f}") + print(f" Min: {stats['min_hh_per_bg']}") + print(f" Max: {stats['max_hh_per_bg']}") + print(f" P10: {stats['p10_hh_per_bg']:.1f}") + print(f" P90: {stats['p90_hh_per_bg']:.1f}") + + # Distribution + print("\nDistribution:") + dist = ( + hh_per_bg.with_columns( + pl.when(pl.col("n_households") == 1) + .then(pl.lit("1 household")) + .when(pl.col("n_households") == 2) + .then(pl.lit("2 households")) + .when(pl.col("n_households").is_between(3, 5)) + .then(pl.lit("3-5 households")) + .when(pl.col("n_households").is_between(6, 10)) + .then(pl.lit("6-10 households")) + .when(pl.col("n_households").is_between(11, 20)) + .then(pl.lit("11-20 households")) + .otherwise(pl.lit("20+ households")) + .alias("bucket") + ) + .group_by("bucket") + .agg(pl.len().alias("n_bg")) + .sort("n_bg", descending=True) + ) + + for row in dist.iter_rows(named=True): + pct = row["n_bg"] / stats["n_block_groups"] * 100 + print(f" {row['bucket']:20s}: {row['n_bg']:5,} BGs ({pct:5.1f}%)") + + # Assessment + print("\nASSESSMENT:") + if stats["median_hh_per_bg"] < 3: + print(" ❌ CRITICAL: Median < 3 households per block group") + print(" → Most block groups have too few households for stable cluster shares") + print(" → Stage 2 regression will have very weak signal") + elif stats["median_hh_per_bg"] < 10: + print(" ⚠️ WARNING: Median < 10 households per block group") + print(" → Cluster shares will be noisy") + print(" → Consider increasing sample size or aggregating to ZIP-level") + else: + print(" ✓ GOOD: Median ≥ 10 households per block group") + print(" → Should have reasonable signal for Stage 2") + + return stats + + +def diagnose_obs_density(df: pl.DataFrame) -> dict: + """Check household-day observations per block group.""" + print("\n" + "=" * 70) + print("DIAGNOSTIC 2: OBSERVATION DENSITY PER BLOCK GROUP") + print("=" * 70) + + obs_per_bg = df.group_by("block_group_geoid").agg(pl.len().alias("n_obs")) + + stats = { + "mean_obs_per_bg": obs_per_bg["n_obs"].mean(), + "median_obs_per_bg": obs_per_bg["n_obs"].median(), + "min_obs_per_bg": obs_per_bg["n_obs"].min(), + "max_obs_per_bg": obs_per_bg["n_obs"].max(), + } + + print("\nObservations (household-days) per block group:") + print(f" Mean: {stats['mean_obs_per_bg']:.1f}") + print(f" Median: {stats['median_obs_per_bg']:.1f}") + print(f" Min: {stats['min_obs_per_bg']}") + print(f" Max: {stats['max_obs_per_bg']}") + + # After Stage 2 filtering (≥50 obs, ≥2 clusters) + filtered = obs_per_bg.filter(pl.col("n_obs") 
>= 50) + n_filtered = filtered.height + pct_surviving = n_filtered / obs_per_bg.height * 100 if obs_per_bg.height > 0 else 0 + + print("\nAfter Stage 2 filtering (≥50 obs per BG):") + print(f" {n_filtered:,} block groups ({pct_surviving:.1f}%) survive") + + if pct_surviving < 20: + print("\n ⚠️ WARNING: <20% of block groups survive filtering") + print(" → You're throwing away most of your data") + print(" → Consider lowering threshold or increasing sample size") + + return stats + + +def diagnose_cluster_share_variance(df: pl.DataFrame) -> dict: + """Check variance in cluster shares across block groups.""" + print("\n" + "=" * 70) + print("DIAGNOSTIC 3: CLUSTER SHARE VARIANCE") + print("=" * 70) + + # Compute cluster shares per block group + bg_counts = df.group_by(["block_group_geoid", "cluster"]).agg(pl.len().alias("n_obs")) + + bg_totals = df.group_by("block_group_geoid").agg(pl.len().alias("total_obs")) + + shares = bg_counts.join(bg_totals, on="block_group_geoid").with_columns( + (pl.col("n_obs") / pl.col("total_obs")).alias("cluster_share") + ) + + # Pivot to wide format for variance calculation + wide = shares.pivot( + index="block_group_geoid", + columns="cluster", + values="cluster_share", + ).fill_null(0) + + cluster_cols = [c for c in wide.columns if c != "block_group_geoid"] + + print("\nCluster share variance across block groups:") + stats = {} + for col in sorted(cluster_cols): + if col in wide.columns: + var = wide[col].var() + mean = wide[col].mean() + stats[col] = {"mean": mean, "variance": var} + print(f" Cluster {col}: mean={mean:.3f}, variance={var:.4f}") + + if stats: + avg_var = sum(s["variance"] for s in stats.values()) / len(stats) + + print(f"\nAverage variance: {avg_var:.4f}") + + print("\nASSESSMENT:") + if avg_var < 0.01: + print(" ❌ CRITICAL: Variance < 0.01") + print(" → Cluster shares barely differ across block groups") + print(" → No demographic signal can be detected") + print(" → MUST increase sample size or change aggregation level") + elif avg_var < 0.02: + print(" ⚠️ WARNING: Variance < 0.02") + print(" → Weak signal; demographic effects will be hard to detect") + else: + print(" ✓ GOOD: Variance ≥ 0.02") + print(" → Sufficient variation for regression") + + return stats + + +def generate_recommendations( + hh_stats: dict, + obs_stats: dict, + share_stats: dict, +) -> None: + """Generate actionable recommendations.""" + print("\n" + "=" * 70) + print("RECOMMENDATIONS") + print("=" * 70) + + if not hh_stats: + print("\n⚠️ Could not assess household density (no ID column)") + print(" Assess based on observation density instead.") + return + + median_hh = hh_stats.get("median_hh_per_bg", 0) + + if median_hh < 3: + print("\n🔴 CRITICAL ISSUE: Sample too sparse for block-group analysis") + print("\nOptions:") + print(" 1. Increase household sample to 50k-100k+") + print(" 2. Switch to ZIP-level or ZIP+4-level aggregation") + print(" 3. Use stratified sampling (population-weighted by block group)") + print(" 4. Aggregate to county-level if only interested in broad patterns") + + elif median_hh < 10: + print("\n⚠️ WARNING: Marginal sample density") + print("\nOptions:") + print(" 1. Increase sample to 30k-50k households") + print(" 2. Consider ZIP-level aggregation for more stable estimates") + print(" 3. Use hierarchical modeling to pool information across similar BGs") + + else: + print("\n✓ Sample density looks reasonable") + print("\nOptional improvements:") + print(" 1. Still consider stratified sampling for better coverage") + print(" 2. 
Add more days if household-day counts are low") + + print("\n" + "=" * 70) + + +def main() -> None: + parser = argparse.ArgumentParser(description="Diagnose block-group sampling density for Stage 2") + parser.add_argument("--clusters", type=Path, required=True, help="Path to cluster_assignments.parquet") + parser.add_argument("--crosswalk", type=Path, required=True, help="Path to ZIP+4 crosswalk file") + parser.add_argument("--inspect-only", action="store_true", help="Only inspect the cluster file schema and exit") + parser.add_argument("--output", type=Path, default=None, help="Optional: save report to file") + + args = parser.parse_args() + + # Inspect the cluster file first + inspect_cluster_file(args.clusters) + + if args.inspect_only: + return + + # Load and join + df = load_and_join_to_blockgroups(args.clusters, args.crosswalk) + + # Run diagnostics + hh_stats = diagnose_household_density(df) + obs_stats = diagnose_obs_density(df) + share_stats = diagnose_cluster_share_variance(df) + + # Recommendations + generate_recommendations(hh_stats, obs_stats, share_stats) + + # TODO: Save to file if requested + if args.output: + print(f"\nReport saved to: {args.output}") + + +if __name__ == "__main__": + main()
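The diagnostic script accepts an --output flag but leaves report saving as a TODO
(main() currently only prints "Report saved to" without writing anything). A minimal
sketch of one way to honour the flag (the helper name run_with_optional_report is
hypothetical; it assumes the existing diagnostic functions keep printing to stdout
and simply mirrors that captured output to the requested file):

    import contextlib
    import io

    def run_with_optional_report(args: argparse.Namespace) -> None:
        """Run all diagnostics, optionally mirroring the printed report to a file."""
        buffer = io.StringIO()
        with contextlib.redirect_stdout(buffer):
            inspect_cluster_file(args.clusters)
            if not args.inspect_only:
                df = load_and_join_to_blockgroups(args.clusters, args.crosswalk)
                hh_stats = diagnose_household_density(df)
                obs_stats = diagnose_obs_density(df)
                share_stats = diagnose_cluster_share_variance(df)
                generate_recommendations(hh_stats, obs_stats, share_stats)
        report = buffer.getvalue()
        print(report, end="")  # still show the full report on the console
        if args.output:
            args.output.write_text(report)
            print(f"\nReport saved to: {args.output}")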