2 changes: 2 additions & 0 deletions .gitignore
@@ -170,3 +170,5 @@ results/

# benchmark artifacts
profiles/
docs/*.html
docs/index_files/
199 changes: 155 additions & 44 deletions analysis/clustering/euclidean_clustering.py
@@ -1,13 +1,10 @@
#!/usr/bin/env python3
"""
Phase 2: K-Means Clustering for Load Profile Analysis.

Clusters daily electricity usage profiles using standard Euclidean distance
to identify distinct consumption patterns.

Note: DTW (Dynamic Time Warping) is unnecessary here because all profiles
are aligned to the same 48 half-hourly time grid. Euclidean distance is
appropriate and much faster.

Pipeline:
1. Load daily profiles from Phase 1
2. Normalize profiles (optional but recommended)
@@ -17,7 +14,7 @@

Usage:
# Standard run (evaluates k=3-6 using silhouette on up to 10k samples)
python euclidean_clustering.py \\
python euclidean_clustering_fixed.py \\
--input data/clustering/sampled_profiles.parquet \\
--output-dir data/clustering/results \\
--k-range 3 6 \\
@@ -26,7 +23,7 @@
--silhouette-sample-size 10000

# Fixed k (no evaluation)
python euclidean_clustering.py \\
python euclidean_clustering_fixed.py \\
--input data/clustering/sampled_profiles.parquet \\
--output-dir data/clustering/results \\
--k 4 --normalize
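For orientation (not part of the diff): the pipeline the docstring describes — plain Euclidean k-means over the aligned 48-point daily profiles, with silhouette evaluated on a subsample — boils down to something like the sketch below. All names here (pick_k, X_norm, the candidate k values) are illustrative assumptions, not this module's actual API.

```python
# Illustrative sketch only — not the module's code. Plain Euclidean k-means
# over aligned 48-point daily profiles, with silhouette scored on a subsample.
import numpy as np
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score


def pick_k(X_norm: np.ndarray, k_values=(3, 4, 5, 6), sample_size=10_000, seed=42):
    rng = np.random.default_rng(seed)
    idx = rng.choice(len(X_norm), size=min(sample_size, len(X_norm)), replace=False)
    scores = {}
    for k in k_values:
        labels = KMeans(n_clusters=k, n_init=10, random_state=seed).fit_predict(X_norm)
        # Silhouette is O(n^2) in samples, so score it on the subsample only.
        scores[k] = silhouette_score(X_norm[idx], labels[idx])
    return max(scores, key=scores.get), scores
```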
@@ -51,6 +48,9 @@
)
logger = logging.getLogger(__name__)

DEFAULT_NORMALIZATION: str = "minmax"
DEFAULT_NORMALIZE: bool = True


def load_profiles(path: Path) -> tuple[np.ndarray, pl.DataFrame]:
"""
@@ -81,47 +81,67 @@ def load_profiles(path: Path) -> tuple[np.ndarray, pl.DataFrame]:


def normalize_profiles(
X: np.ndarray,
method: str = "zscore",
) -> np.ndarray:
df: pl.DataFrame,
method: str = "minmax",
profile_col: str = "profile",
out_col: str | None = None,
) -> pl.DataFrame:
"""
Normalize profiles for clustering.

Args:
X: Profile array of shape (n_samples, n_timepoints)
method: Normalization method ('zscore', 'minmax', 'none')

Returns:
Normalized array
Normalize per-household-day profiles for clustering.

Parameters
----------
df : pl.DataFrame
Must contain a list column with the profile values (e.g. 48-dim vector).
method : {"none", "zscore", "minmax"}
- "none": return df unchanged
- "zscore": per-profile z-score: (x - mean) / std
- "minmax": per-profile min-max: (x - min) / (max - min)
profile_col : str
Name of the list column holding the raw profile.
out_col : str | None
If provided, write normalized profile to this column; otherwise overwrite
`profile_col` in-place.

Notes
-----
- Normalization is done per profile (per row), not globally.
- For degenerate profiles where max == min, we fall back to all zeros.
"""

if method == "none":
return X
return df

if profile_col not in df.columns:
raise ValueError(f"normalize_profiles: column '{profile_col}' not found in DataFrame")

logger.info("Normalizing profiles using %s method...", method)
target_col = out_col or profile_col

expr = pl.col(profile_col)

if method == "zscore":
# Per-profile z-score normalization
means = X.mean(axis=1, keepdims=True)
stds = X.std(axis=1, keepdims=True)
stds[stds == 0] = 1 # Avoid division by zero
X_norm = (X - means) / stds
mean_expr = expr.list.mean()
std_expr = expr.list.std(ddof=0)

normalized = (expr - mean_expr) / std_expr

# If std == 0 (flat profile), fall back to zeros
normalized = pl.when(std_expr != 0).then(normalized).otherwise(expr * 0.0)

elif method == "minmax":
# Per-profile min-max normalization
mins = X.min(axis=1, keepdims=True)
maxs = X.max(axis=1, keepdims=True)
ranges = maxs - mins
ranges[ranges == 0] = 1
X_norm = (X - mins) / ranges
else:
raise ValueError(f"Unknown normalization method: {method}")
min_expr = expr.list.min()
max_expr = expr.list.max()
range_expr = max_expr - min_expr

logger.info(
" Normalized data range: [%.2f, %.2f]",
X_norm.min(),
X_norm.max(),
)
normalized = (expr - min_expr) / range_expr

# If range == 0 (flat profile), fall back to zeros
normalized = pl.when(range_expr != 0).then(normalized).otherwise(expr * 0.0)

else:
raise ValueError(f"Unknown normalization method: {method!r}")

return X_norm
return df.with_columns(normalized.alias(target_col))
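A quick usage sketch of the new normalizer on a toy frame (the toy values are invented here, and this assumes a Polars version whose list-column arithmetic broadcasts the per-row aggregates the way the expressions above expect):

```python
import polars as pl

toy = pl.DataFrame({"profile": [[0.2, 0.6, 1.0], [0.5, 0.5, 0.5]]})

# Per-profile min-max: every row is rescaled by its own min and max.
out = normalize_profiles(toy, method="minmax")
print(out["profile"].to_list())
# Expected: [[0.0, 0.5, 1.0], [0.0, 0.0, 0.0]]
#   - the first row is rescaled to span [0, 1]
#   - the second (flat) row hits the range == 0 branch and falls back to zeros
```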


def evaluate_clustering(
@@ -457,6 +477,83 @@ def plot_elbow_curve(
logger.info(" Saved elbow curve: %s", output_path)


def analyze_weekday_weekend_distribution(
df: pl.DataFrame,
labels: np.ndarray,
) -> dict:
"""
Analyze weekday vs weekend distribution across clusters.

This diagnostic checks if certain clusters are dominated by weekdays
or weekends, which would suggest usage patterns are day-type dependent.

Args:
df: Original profile DataFrame with 'is_weekend' column
labels: Cluster assignments

Returns:
Dictionary with distribution statistics
"""
if "is_weekend" not in df.columns:
logger.warning(" No 'is_weekend' column found - skipping weekday/weekend analysis")
return {}

# Add cluster labels to dataframe
df_with_clusters = df.with_columns(pl.Series("cluster", labels))

# Calculate distribution
dist = (
df_with_clusters.group_by(["cluster", "is_weekend"])
.agg(pl.len().alias("count"))
.sort(["cluster", "is_weekend"])
)

# Calculate percentages per cluster
dist = dist.with_columns([(pl.col("count") / pl.col("count").sum().over("cluster") * 100).alias("pct")])

# Summary: % weekend by cluster
summary = (
df_with_clusters.group_by("cluster")
.agg([pl.len().alias("total"), (pl.col("is_weekend").sum() / pl.len() * 100).alias("pct_weekend")])
.sort("cluster")
)

logger.info("")
logger.info("=" * 70)
logger.info("WEEKDAY/WEEKEND DISTRIBUTION BY CLUSTER")
logger.info("=" * 70)

for row in summary.iter_rows(named=True):
cluster = row["cluster"]
total = row["total"]
pct_weekend = row["pct_weekend"]
pct_weekday = 100 - pct_weekend

logger.info(
" Cluster %d: %.1f%% weekday, %.1f%% weekend (n=%s)", cluster, pct_weekday, pct_weekend, f"{total:,}"
)

# Flag significant imbalances (>70% one type)
if pct_weekend > 70:
logger.warning(" ⚠️ Weekend-dominated cluster")
elif pct_weekday > 70:
logger.warning(" ⚠️ Weekday-dominated cluster")

# Overall distribution
overall_weekend_pct = float(df_with_clusters["is_weekend"].mean() * 100)
logger.info("")
logger.info(" Overall dataset: %.1f%% weekend, %.1f%% weekday", overall_weekend_pct, 100 - overall_weekend_pct)

# Chi-square test would go here if needed for formal significance testing
logger.info("=" * 70)

return {
"cluster_distribution": summary.to_dicts(),
"detailed_distribution": dist.to_dicts(),
"overall_weekend_pct": overall_weekend_pct,
}
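The inline comment in this function leaves the chi-square test as a future option. If formal significance is ever wanted, a minimal sketch is below — it assumes scipy as an extra dependency (not imported by this module) and builds the cluster × day-type contingency table directly in NumPy:

```python
# Minimal sketch; scipy is an assumed extra dependency, not used by this module.
import numpy as np
from scipy.stats import chi2_contingency


def weekend_cluster_chi2(labels: np.ndarray, is_weekend: np.ndarray) -> tuple[float, float]:
    """Chi-square test of independence between cluster assignment and day type."""
    clusters = np.unique(labels)
    # Rows: clusters; columns: weekday count, weekend count.
    table = np.array(
        [
            [np.sum((labels == c) & ~is_weekend), np.sum((labels == c) & is_weekend)]
            for c in clusters
        ]
    )
    chi2, p_value, _, _ = chi2_contingency(table)
    return float(chi2), float(p_value)


# e.g. chi2, p = weekend_cluster_chi2(labels, df["is_weekend"].to_numpy().astype(bool))
```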


def save_results(
df: pl.DataFrame,
labels: np.ndarray,
@@ -527,14 +624,14 @@ def main() -> None:
epilog="""
Examples:
# Standard run with k evaluation (silhouette on sample)
python euclidean_clustering.py \\
python euclidean_clustering_fixed.py \\
--input data/clustering/sampled_profiles.parquet \\
--output-dir data/clustering/results \\
--k-range 3 6 --find-optimal-k --normalize \\
--silhouette-sample-size 10000

# Fixed k (no evaluation)
python euclidean_clustering.py \\
python euclidean_clustering_fixed.py \\
--input data/clustering/sampled_profiles.parquet \\
--output-dir data/clustering/results \\
--k 4 --normalize
@@ -597,13 +694,14 @@ def main() -> None:
parser.add_argument(
"--normalize",
action="store_true",
help="Apply normalization to profiles",
default=DEFAULT_NORMALIZE,
help="Apply normalization to profiles (default: True)",
)
parser.add_argument(
"--normalize-method",
choices=["zscore", "minmax", "none"],
default="zscore",
help="Normalization method (default: zscore)",
default=DEFAULT_NORMALIZATION,
help="Normalization method (default: minmax)",
)
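One note on the two flags above: with `action="store_true"` combined with `default=DEFAULT_NORMALIZE` (True), passing `--normalize` is a no-op and there is no CLI way to switch normalization off; it can then only be skipped via `--normalize-method none`. If an explicit off-switch is wanted, one alternative (a sketch, not part of this diff; requires Python 3.9+) is:

```python
# Sketch of an alternative flag definition (Python 3.9+), not part of this diff.
parser.add_argument(
    "--normalize",
    action=argparse.BooleanOptionalAction,  # generates both --normalize and --no-normalize
    default=DEFAULT_NORMALIZE,
    help="Apply per-profile normalization (default: %(default)s)",
)
```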

parser.add_argument(
@@ -622,9 +720,22 @@
# Load profiles
X, df = load_profiles(args.input)

logger.info("Loaded %s sampled profiles", f"{len(df):,}")

# Normalize if requested
if args.normalize:
X = normalize_profiles(X, method=args.normalize_method)
logger.info("Normalizing profiles per household-day (method=%s)", args.normalize_method)
df = normalize_profiles(
df,
method=args.normalize_method, # ✅ FIXED: was args.normalization_method
profile_col="profile",
out_col=None,
)
# ✅ CRITICAL FIX: Re-extract normalized profiles as numpy array
X = np.array(df["profile"].to_list(), dtype=np.float64)
logger.info(" Normalized data range: [%.2f, %.2f]", X.min(), X.max())
else:
logger.info("Profile normalization disabled (using raw kWh values).")

# Determine k
eval_results = None