From bea44560be7a665ddce357f9bf05440b74c34744 Mon Sep 17 00:00:00 2001
From: kohankhaki
Date: Fri, 19 Sep 2025 11:49:39 -0400
Subject: [PATCH] Add scripts for area-level active learning.

---
 src/lbo_area.py     | 203 +++++++++++++++++++++++++++++++++++++
 src/run_lbo_area.py | 242 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 445 insertions(+)
 create mode 100644 src/lbo_area.py
 create mode 100644 src/run_lbo_area.py

diff --git a/src/lbo_area.py b/src/lbo_area.py
new file mode 100644
index 0000000..167b85d
--- /dev/null
+++ b/src/lbo_area.py
@@ -0,0 +1,203 @@
+"""Run per-area active learning (LBO-based)."""
+
+from __future__ import annotations
+
+import logging
+from collections import defaultdict
+from pathlib import Path
+from typing import Dict, List, Literal, Tuple
+
+import matplotlib.pyplot as plt
+import torch
+
+from src.capability import Capability
+from src.lbo import LBO, fit_lbo
+
+
+Metric = Literal["mse", "ae"]
+logger = logging.getLogger(__name__)
+
+
+def group_by_area(caps: List[Capability]) -> Dict[str, List[Capability]]:
+    """Group capabilities by area."""
+    buckets: Dict[str, List[Capability]] = defaultdict(list)
+    for c in caps:
+        if getattr(c, "area", None) is None:
+            raise ValueError(f"Capability {c.name} missing 'area'.")
+        buckets[c.area].append(c)
+    return buckets
+
+
+def _cap_cost_num_tasks(cap: Capability) -> float:
+    return float(len(cap.get_tasks()))
+
+
+def _sum_cost_num_tasks(caps: List[Capability]) -> float:
+    return sum(_cap_cost_num_tasks(c) for c in caps)
+
+
+def _predict_cap_means(
+    lbo_model: LBO, caps: List[Capability], embedding_name: str
+) -> torch.Tensor:
+    if not caps:
+        return torch.empty(0)
+    X = torch.stack([c.get_embedding(embedding_name) for c in caps])
+    mean, _ = lbo_model.predict(X)
+    return mean
+
+
+def _area_gt_mean(test_caps: List[Capability], subject_llm_name: str) -> float:
+    if not test_caps:
+        return float("nan")
+    vals = [float(c.scores[subject_llm_name]["mean"]) for c in test_caps]
+    return sum(vals) / len(vals)
+
+
+def _area_pred_mean(
+    lbo_model: LBO, test_caps: List[Capability], embedding_name: str
+) -> float:
+    if not test_caps:
+        return float("nan")
+    means = _predict_cap_means(lbo_model, test_caps, embedding_name)
+    return float(torch.mean(means).item())
+
+
+def _err(gt: float, pred: float, metric: Metric) -> float:
+    if metric == "mse":
+        return (gt - pred) ** 2
+    if metric == "ae":
+        return abs(gt - pred)
+    raise ValueError(f"Unsupported metric: {metric}")
+
+
+def run_area_active_learning(
+    area_name: str,
+    train_caps_area: List[Capability],
+    initial_train_area: List[Capability],
+    pool_caps_area: List[Capability],
+    test_caps_area: List[Capability],
+    subject_llm_name: str,
+    embedding_name: str,
+    acquisition_function: str = "expected_variance_reduction",
+    num_lbo_iterations: int = 10,
+    metric: Metric = "mse",
+) -> Tuple[List[Capability], Dict[str, List[float]]]:
+    """
+    Run AL restricted to a single area; returns (selected_caps, curves).
+
+    curves keys: error, avg_std, cum_cost, full_eval_cost_upper.
+ """ + if not initial_train_area: + if pool_caps_area: + initial_train_area = [pool_caps_area[0]] + pool_caps_area = pool_caps_area[1:] + else: + return [], { + "error": [], + "avg_std": [], + "cum_cost": [], + "full_eval_cost_upper": [0.0], + } + + lbo = fit_lbo( + capabilities=initial_train_area, + embedding_name=embedding_name, + subject_llm_name=subject_llm_name, + acquisition_function=acquisition_function, + ) + + gt = _area_gt_mean(test_caps_area, subject_llm_name) + if len(test_caps_area) > 0: + pred0 = _area_pred_mean(lbo, test_caps_area, embedding_name) + X_test = torch.stack([c.get_embedding(embedding_name) for c in test_caps_area]) + _, std0 = lbo.predict(X_test) + avg_std0 = float(torch.mean(std0).item()) + base_err = _err(gt, pred0, metric) + else: + X_test = None + avg_std0 = float("nan") + base_err = float("nan") + + full_eval_cost_upper = _sum_cost_num_tasks(train_caps_area) + + curves: Dict[str, List[float]] = { + "error": [base_err], + "avg_std": [avg_std0], + "cum_cost": [0.0], + "full_eval_cost_upper": [full_eval_cost_upper], + } + + selected_caps: List[Capability] = [] + cum_cost = 0.0 + + pool_x = ( + torch.stack([c.get_embedding(embedding_name) for c in pool_caps_area]) + if pool_caps_area + else None + ) + iters = min(num_lbo_iterations, len(pool_caps_area)) + for i in range(iters): + logger.info(f"[{area_name}] Iter {i} of {iters}") + if pool_x is None or pool_x.shape[0] == 0: + break + + idx, x_sel = lbo.select_next_point(pool_x) + chosen = pool_caps_area[idx] + y_sel = float(chosen.scores[subject_llm_name]["mean"]) + + pool_caps_area.pop(idx) + if pool_x.shape[0] > 1: + pool_x = torch.cat([pool_x[:idx], pool_x[idx + 1 :]], dim=0) + else: + pool_x = None + lbo.update(x_sel, torch.tensor([y_sel])) + + selected_caps.append(chosen) + cum_cost += _cap_cost_num_tasks(chosen) + + if len(test_caps_area) > 0 and X_test is not None: + pred_iter = _area_pred_mean(lbo, test_caps_area, embedding_name) + _, std_iter = lbo.predict(X_test) + avg_std_iter = float(torch.mean(std_iter).item()) + err_iter = _err(gt, pred_iter, metric) + else: + avg_std_iter = float("nan") + err_iter = float("nan") + + curves["error"].append(err_iter) + curves["avg_std"].append(avg_std_iter) + curves["cum_cost"].append(cum_cost) + + return selected_caps, curves + + +def plot_single_area_curves( + area: str, curves: Dict[str, List[float]], outdir: str | Path +) -> None: + """Plot the curves for a single area.""" + out = Path(outdir) + out.mkdir(parents=True, exist_ok=True) + + plt.figure() + plt.plot(curves["error"], marker="o") + plt.xlabel("AL iteration") + plt.ylabel("Error") + plt.title(f"{area} — Error vs Iteration") + plt.grid(True, alpha=0.4) + plt.tight_layout() + plt.savefig(out / f"{area}_error_curve.png", dpi=200) + plt.close() + + plt.figure() + plt.plot(curves["cum_cost"], marker="o", label="Cumulative cost (#tasks)") + if curves.get("full_eval_cost_upper"): + ub = curves["full_eval_cost_upper"][0] + plt.axhline(y=ub, linestyle="--", label="Full evaluation (upper bound)") + plt.xlabel("AL iteration") + plt.ylabel("Cost (#tasks)") + plt.title(f"{area} — Cumulative Cost vs Iteration") + plt.legend() + plt.grid(True, alpha=0.4) + plt.tight_layout() + plt.savefig(out / f"{area}_cost_curve.png", dpi=200) + plt.close() diff --git a/src/run_lbo_area.py b/src/run_lbo_area.py new file mode 100644 index 0000000..0508f7c --- /dev/null +++ b/src/run_lbo_area.py @@ -0,0 +1,242 @@ +"""Per-area AL orchestrator.""" + +from __future__ import annotations + +import json +import logging +import os +from 
diff --git a/src/run_lbo_area.py b/src/run_lbo_area.py
new file mode 100644
index 0000000..0508f7c
--- /dev/null
+++ b/src/run_lbo_area.py
@@ -0,0 +1,242 @@
+"""Per-area AL orchestrator."""
+
+from __future__ import annotations
+
+import json
+import logging
+import os
+from typing import Any, Dict, List, Literal, Tuple
+
+import hydra
+from omegaconf import DictConfig
+from tqdm import tqdm
+
+from src.capability import Capability
+from src.lbo_area import (
+    group_by_area,
+    plot_single_area_curves,
+    run_area_active_learning,
+)
+from src.utils import constants
+from src.utils.capability_discovery_utils import select_complete_capabilities
+from src.utils.capability_management_utils import get_previous_capabilities
+from src.utils.data_utils import check_cfg, get_run_id
+from src.utils.embedding_utils import (
+    apply_dimensionality_reduction,
+    apply_dimensionality_reduction_to_test_capabilities,
+    generate_and_set_capabilities_embeddings,
+)
+from src.utils.lbo_utils import get_lbo_train_set
+
+
+Metric = Literal["mse", "ae"]
+
+logger = logging.getLogger(__name__)
+
+
+def _prepare_capabilities(cfg: DictConfig, run_id: str) -> List[Capability]:
+    """Prepare capabilities for active learning."""
+    base_capability_dir = os.path.join(
+        constants.BASE_ARTIFACTS_DIR,
+        f"capabilities_{run_id}",
+        cfg.capabilities_cfg.domain,
+    )
+
+    capabilities = get_previous_capabilities(
+        capability_dir=base_capability_dir,
+        score_dir_suffix=run_id,
+    )
+    logger.info(f"Loaded {len(capabilities)} capabilities.")
+
+    capabilities = select_complete_capabilities(
+        capabilities=capabilities,
+        strict=False,
+        num_tasks_lower_bound=int(
+            cfg.capabilities_cfg.num_gen_tasks_per_capability
+            * (1 - cfg.capabilities_cfg.num_gen_tasks_buffer)
+        ),
+    )
+    logger.info(f"Selected {len(capabilities)} complete capabilities.")
+
+    generate_and_set_capabilities_embeddings(
+        capabilities=capabilities,
+        embedding_model_name=cfg.embedding_cfg.embedding_model,
+        embed_dimensions=cfg.embedding_cfg.embedding_size,
+    )
+
+    for cap in tqdm(capabilities, desc="Loading capability scores"):
+        cap.load_scores(subject_llm_name=cfg.subject_llm.name)
+
+    missing_area = [c.name for c in capabilities if getattr(c, "area", None) is None]
+    if missing_area:
+        raise ValueError(f"Capabilities missing 'area' (first 5): {missing_area[:5]}")
+
+    return capabilities
+
+
+def _global_split(
+    cfg: DictConfig, capabilities: List[Capability]
+) -> Tuple[List[Capability], List[Capability]]:
+    """Global split of capabilities into train and test sets."""
+    train_caps, test_caps = get_lbo_train_set(
+        input_data=capabilities,
+        train_frac=cfg.lbo_cfg.train_frac,
+        stratified=cfg.capabilities_cfg.method == "hierarchical",
+        input_categories=[c.area for c in capabilities],
+        seed=cfg.exp_cfg.seed,
+    )
+    return train_caps, test_caps
+
+
+def _apply_dr(
+    cfg: DictConfig,
+    train_caps: List[Capability],
+    extra_caps_for_fit: List[Capability],
+    test_caps: List[Capability],
+) -> str:
+    """Apply dimensionality reduction."""
+    method = cfg.dimensionality_reduction_cfg.no_discovery_reduced_dimensionality_method
+    size = cfg.dimensionality_reduction_cfg.no_discovery_reduced_dimensionality_size
+
+    if method == "t-sne":
+        _ = apply_dimensionality_reduction(
+            capabilities=train_caps + extra_caps_for_fit + test_caps,
+            dim_reduction_method_name=method,
+            output_dimension_size=size,
+            embedding_model_name=cfg.embedding_cfg.embedding_model,
+            random_seed=cfg.exp_cfg.seed,
+        )
+    else:
+        dim_model = apply_dimensionality_reduction(
+            capabilities=train_caps + extra_caps_for_fit,
+            dim_reduction_method_name=method,
+            output_dimension_size=size,
+            embedding_model_name=cfg.embedding_cfg.embedding_model,
+            random_seed=cfg.exp_cfg.seed,
+        )
+        apply_dimensionality_reduction_to_test_capabilities(
+            capabilities=test_caps,
+            dim_reduction_method=dim_model,
+            embedding_model_name=cfg.embedding_cfg.embedding_model,
+        )
+
+    logger.info(f"Dimensionality reduction applied: method={method}, size={size}")
+    return str(method)
+
+
+@hydra.main(version_base=None, config_path="cfg", config_name="run_cfg")
+def main(cfg: DictConfig) -> None:
+    """Per-area AL orchestrator."""
+    check_cfg(cfg, logger)
+    run_id = get_run_id(cfg)
+
+    capabilities = _prepare_capabilities(cfg, run_id)
+
+    global_train, global_test = _global_split(cfg, capabilities)
+
+    num_seed_per_area = getattr(cfg.lbo_cfg, "num_initial_train_per_area", 1)
+    train_by_area = group_by_area(global_train)
+    test_by_area = group_by_area(global_test)
+    logger.info(f"Global train: {len(global_train)}, Global test: {len(global_test)}")
+
+    per_area_sets: Dict[str, Dict[str, List[Capability]]] = {}
+    for area, train_list in train_by_area.items():
+        test_list = test_by_area.get(area, [])
+        k = min(max(1, int(num_seed_per_area)), len(train_list))
+        seeds = train_list[:k]
+        pool = train_list[k:]
+        per_area_sets[area] = {
+            "train_all": train_list,
+            "seed": seeds,
+            "pool": pool,
+            "test": test_list,
+        }
+
+    all_pools = [c for v in per_area_sets.values() for c in v["pool"]]
+    embedding_name = _apply_dr(cfg, global_train, all_pools, global_test)
+
+    per_area_dir = os.path.join(constants.BASE_ARTIFACTS_DIR, "per_area")
+    os.makedirs(per_area_dir, exist_ok=True)
+
+    iters_per_area = getattr(cfg.lbo_cfg, "num_lbo_runs_per_area", None)
+    if iters_per_area is None:
+        n_areas = max(1, len(per_area_sets))
+        iters_per_area = max(1, int(cfg.lbo_cfg.num_lbo_runs // n_areas))
+
+    metric: Metric = getattr(cfg.lbo_cfg, "area_error_metric", "mse")
+
+    af_tag = "ALM" if cfg.lbo_cfg.acquisition_function == "variance" else "ALC"
+    summary: Dict[str, Dict[str, Any]] = {}
+
+    for area, packs in per_area_sets.items():
+        seeds = packs["seed"]
+        pool = packs["pool"]
+        train_all = packs["train_all"]
+        test = packs["test"]
+
+        if len(train_all) == 0:
+            logger.warning(f"[{area}] No train capabilities; skipping.")
+            continue
+        logger.info(
+            f"[{area}] seeds={len(seeds)}, pool={len(pool)}, test={len(test)}, "
+            f"requested_iters={iters_per_area}"
+        )
+        selected, curves = run_area_active_learning(
+            area_name=area,
+            train_caps_area=train_all,
+            initial_train_area=seeds,
+            pool_caps_area=pool,
+            test_caps_area=test,
+            subject_llm_name=cfg.subject_llm.name,
+            embedding_name=embedding_name,
+            acquisition_function=cfg.lbo_cfg.acquisition_function,
+            num_lbo_iterations=iters_per_area,
+            metric=metric,
+        )
+
+        out_json = os.path.join(per_area_dir, f"{area}_al_results.json")
+        with open(out_json, "w") as f:
+            json.dump(
+                {
+                    "run_id": run_id,
+                    "area": area,
+                    "metric": metric,
+                    "selected_capabilities": [c.name for c in selected],
+                    "initial_seed_capabilities": [c.name for c in seeds],
+                    "test_capabilities": [c.name for c in test],
+                    "train_caps_in_area": [c.name for c in train_all],
+                    "error": curves["error"],
+                    "avg_std": curves["avg_std"],
+                    "cum_cost": curves["cum_cost"],
+                    "full_eval_cost_upper": curves["full_eval_cost_upper"][0],
+                },
+                f,
+                indent=4,
+            )
+
+        plot_single_area_curves(area, curves, outdir=per_area_dir)
+
+        summary[area] = {
+            "json": out_json,
+            "error_png": os.path.join(per_area_dir, f"{area}_error_curve.png"),
+            "cost_png": os.path.join(per_area_dir, f"{area}_cost_curve.png"),
+            "selected_count": len(selected),
+            "seed_count": len(seeds),
+            "pool_count": len(pool),
+            "test_count": len(test),
+        }
+ per_area_dir, f"per_area_index_{run_id}_{cfg.subject_llm.name}_{af_tag}.json" + ) + with open(index_json, "w") as f: + json.dump(summary, f, indent=4) + + logger.info(f"Per-area AL finished. Results in: {per_area_dir}") + logger.info(f"Index: {index_json}") + + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO) + main()