From bb0fd90182ce65888763b16685aab46c137f633d Mon Sep 17 00:00:00 2001 From: Lakshya A Agrawal Date: Sun, 10 Aug 2025 00:47:34 -0700 Subject: [PATCH 01/12] Add GEPA Draft --- dspy/teleprompt/__init__.py | 2 + dspy/teleprompt/gepa.py | 424 ++++++++++++++++++++++++++++++++++++ pyproject.toml | 1 + 3 files changed, 427 insertions(+) create mode 100644 dspy/teleprompt/gepa.py diff --git a/dspy/teleprompt/__init__.py b/dspy/teleprompt/__init__.py index a5bc4da7fc..a11c64fa2d 100644 --- a/dspy/teleprompt/__init__.py +++ b/dspy/teleprompt/__init__.py @@ -4,6 +4,7 @@ from dspy.teleprompt.bootstrap_finetune import BootstrapFinetune from dspy.teleprompt.copro_optimizer import COPRO from dspy.teleprompt.ensemble import Ensemble +from dspy.teleprompt.gepa import GEPA from dspy.teleprompt.infer_rules import InferRules from dspy.teleprompt.knn_fewshot import KNNFewShot from dspy.teleprompt.mipro_optimizer_v2 import MIPROv2 @@ -20,6 +21,7 @@ "BootstrapFinetune", "COPRO", "Ensemble", + "GEPA", "KNNFewShot", "MIPROv2", "BootstrapFewShotWithRandomSearch", diff --git a/dspy/teleprompt/gepa.py b/dspy/teleprompt/gepa.py new file mode 100644 index 0000000000..45427c2b27 --- /dev/null +++ b/dspy/teleprompt/gepa.py @@ -0,0 +1,424 @@ +import logging +import os +import random +from dataclasses import dataclass +from typing import Any, Callable, Dict, List, Optional, Set + +from dspy.dsp.utils.settings import settings +from dspy.adapters.chat_adapter import ChatAdapter +from dspy.adapters.types import History +from dspy.clients.lm import LM +from dspy.evaluate import Evaluate +from dspy.primitives import Example, Module + +from gepa.gepa import EvaluationBatch, GEPAAdapter, GEPAResult, optimize +from .teleprompt import Teleprompter + +logger = logging.getLogger(__name__) + +class LoggerAdapter: + def __init__(self, logger: logging.Logger): + self.logger = logger + + def log(self, x: str): + self.logger.info(x) + +@dataclass(frozen=True) +class DspyGEPAResult: + """ + Additional data related to the GEPA run. 
+ + Fields: + - candidates: list of proposed candidates (component_name -> component_text) + - parents: lineage info; for each candidate i, parents[i] is a list of parent indices or None + - val_aggregate_scores: per-candidate aggregate score on the validation set (higher is better) + - val_subscores: per-candidate per-instance scores on the validation set (len == num_val_instances) + - per_val_instance_best_candidates: for each val instance t, a set of candidate indices achieving the current best score on t + - discovery_eval_counts: number of metric calls accumulated up to the discovery of each candidate + + - total_metric_calls: total number of metric calls made across the run + - num_full_val_evals: number of full validation evaluations performed + - run_dir: where artifacts were written (if any) + - seed: RNG seed for reproducibility (if known) + + - best_idx: candidate index with the highest val_aggregate_scores + - best_candidate: the program text mapping for best_idx + """ + # Core data + candidates: List[Module] + parents: List[List[Optional[int]]] + val_aggregate_scores: List[float] + val_subscores: List[List[float]] + per_val_instance_best_candidates: List[Set[int]] + discovery_eval_counts: List[int] + + # Run metadata (optional) + total_metric_calls: Optional[int] = None + num_full_val_evals: Optional[int] = None + run_dir: Optional[str] = None + seed: Optional[int] = None + + @property + def best_idx(self) -> int: + scores = self.val_aggregate_scores + return max(range(len(scores)), key=lambda i: scores[i]) + + @property + def best_candidate(self) -> Dict[str, str]: + return self.candidates[self.best_idx] + + def to_dict(self) -> Dict[str, Any]: + cands = [ + {k: v for k, v in cand.items()} + for cand in self.candidates + ] + + return dict( + candidates=cands, + parents=self.parents, + val_aggregate_scores=self.val_aggregate_scores, + val_subscores=self.val_subscores, + per_val_instance_best_candidates=[list(s) for s in self.per_val_instance_best_candidates], + discovery_eval_counts=self.discovery_eval_counts, + total_metric_calls=self.total_metric_calls, + num_full_val_evals=self.num_full_val_evals, + run_dir=self.run_dir, + seed=self.seed, + best_idx=self.best_idx, + ) + + def from_gepa_result(gepa_result: GEPAResult, adapter: "DspyAdapter") -> "DspyGEPAResult": + return DspyGEPAResult( + candidates=[adapter.build_program(c) for c in gepa_result.candidates], + parents=gepa_result.parents, + val_aggregate_scores=gepa_result.val_aggregate_scores, + val_subscores=gepa_result.val_subscores, + per_val_instance_best_candidates=gepa_result.per_val_instance_best_candidates, + discovery_eval_counts=gepa_result.discovery_eval_counts, + total_metric_calls=gepa_result.total_metric_calls, + num_full_val_evals=gepa_result.num_full_val_evals, + run_dir=gepa_result.run_dir, + seed=gepa_result.seed, + ) + +class DspyAdapter(GEPAAdapter): + def __init__( + self, + student_module, + metric_fn: Callable, + feedback_map: Dict[str, Callable], + failure_score=0.0, + num_threads: Optional[int] = None, + add_format_failure_as_feedback: bool = False, + rng: Optional[random.Random] = None, + ): + import dspy + self.student = student_module + self.metric_fn = metric_fn + self.feedback_map = feedback_map + self.failure_score = failure_score + self.num_threads = num_threads or os.cpu_count() + self.add_format_failure_as_feedback = add_format_failure_as_feedback + self.rng = rng or random.Random(0) + + # Cache predictor names/signatures + self.named_predictors = list(self.student.named_predictors()) + + 
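+    # Builds a runnable DSPy program from a GEPA candidate (a mapping of predictor name
+    # to proposed instruction text) by copying the student and swapping in the instructions.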
def build_program(self, candidate: Dict[str, str]): + new_prog = self.student.deepcopy() + for name, pred in new_prog.named_predictors(): + if name in candidate: + pred.signature = pred.signature.with_instructions(candidate[name]) + return new_prog + + def evaluate(self, batch, candidate, capture_traces=False): + import dspy + program = self.build_program(candidate) + + if capture_traces: + # bootstrap_trace_data-like flow with trace capture + from .bootstrap_finetune import bootstrap_trace_data + trajs = bootstrap_trace_data( + program=program, + dataset=batch, + metric=self.metric_fn, + num_threads=self.num_threads, + raise_on_error=False, + capture_failed_parses=True, + failure_score=self.failure_score, + format_failure_score=self.failure_score, + ) + scores = [] + outputs = [] + for t in trajs: + outputs.append(t['prediction']) + if hasattr(t['prediction'], '__class__') and t.get('score') is None: + scores.append(self.failure_score) + else: + scores.append(t['score']) + return EvaluationBatch(outputs=outputs, scores=scores, trajectories=trajs) + else: + evaluator = Evaluate( + devset=batch, + metric=self.metric_fn, + num_threads=self.num_threads, + return_all_scores=True, + return_outputs=True, + failure_score=self.failure_score, + provide_traceback=True, + max_errors=len(batch) * 100 + ) + res = evaluator(program) + outputs = [r[1] for r in res.results] + scores = [r[2] for r in res.results] + return EvaluationBatch(outputs=outputs, scores=scores, trajectories=None) + + def make_reflective_dataset(self, candidate, eval_batch, components_to_update): + import dspy + from .bootstrap_finetune import FailedPrediction + program = self.build_program(candidate) + + ret_d: Dict[str, List[Dict[str, Any]]] = {} + for pred_name in components_to_update: + feedback_fn = self.feedback_map[pred_name] + module = None + for name, m in program.named_predictors(): + if name == pred_name: + module = m + break + assert module is not None + + items: List[Dict[str, Any]] = [] + for data in eval_batch.trajectories or []: + trace = data["trace"] + example = data["example"] + prediction = data["prediction"] + + trace_instances = [t for t in trace if t[0].signature.equals(module.signature)] + if not self.add_format_failure_as_feedback: + trace_instances = [t for t in trace_instances if not isinstance(t[2], FailedPrediction)] + if len(trace_instances) == 0: + continue + + selected = None + for t in trace_instances: + if isinstance(t[2], FailedPrediction): + selected = t + break + + if selected is None: + if isinstance(prediction, FailedPrediction): + continue + selected = self.rng.choice(trace_instances) + + inputs = selected[1] + outputs = selected[2] + + new_inputs = {} + new_outputs = {} + + contains_history = False + history_key_name = None + for input_key, input_val in inputs.items(): + if isinstance(input_val, History): + contains_history = True + assert history_key_name is None + history_key_name = input_key + + if contains_history: + s = "```json\n" + for i, message in enumerate(inputs[history_key_name].messages): + s += f" {i}: {message}\n" + s += "```" + new_inputs["Context"] = s + + for input_key, input_val in inputs.items(): + if contains_history and input_key == history_key_name: + continue + new_inputs[input_key] = str(input_val) + + if isinstance(outputs, FailedPrediction): + s = "Couldn't parse the output as per the expected output format. 
The model's raw response was:\n" + s += "```\n" + s += outputs.completion_text + "\n" + s += "```\n\n" + new_outputs = s + else: + for output_key, output_val in outputs.items(): + new_outputs[output_key] = str(output_val) + + d = {"Inputs": new_inputs, "Generated Outputs": new_outputs} + if isinstance(outputs, FailedPrediction): + adapter = ChatAdapter() + structure_instruction = "" + for dd in adapter.format(module.signature, [], {}): + structure_instruction += dd["role"] + ": " + dd["content"] + "\n" + d['Feedback'] = "Your output failed to parse. Follow this structure:\n" + structure_instruction + # d['score'] = self.failure_score + else: + fb = feedback_fn( + predictor_output=outputs, + predictor_inputs=inputs, + module_inputs=example, + module_outputs=prediction, + captured_trace=trace, + ) + d['Feedback'] = fb["feedback_text"] + # d['score'] = fb["feedback_score"] + items.append(d) + + if len(items) == 0: + # raise Exception(f"No valid predictions found for module {module.signature}.") + continue + ret_d[pred_name] = items + + if len(ret_d) == 0: + raise Exception(f"No valid predictions found for any module.") + + return ret_d + +class GEPA(Teleprompter): + def __init__( + self, + named_predictor_to_feedback_fn_map: Dict[str, Callable], + metric: Callable, + run_dir: str, + run_linearized_gepa: bool = True, # kept for API compatibility + num_threads: Optional[int] = None, + num_iters: Optional[int] = None, + failure_score: float = 0.0, + perfect_score: float = 1.0, + teacher_lm: Optional[LM] = None, + use_wandb: bool = False, + wandb_api_key: Optional[str] = None, + max_evals_per_trainval_instance: Optional[int] = None, + seed: int = 0, + skip_perfect_score: bool = True, + use_merge: bool = False, + max_merge_invocations: int = 5, + num_dspy_examples_per_gepa_step: int = 3, + max_metric_calls: Optional[int] = None, + add_format_failure_as_feedback: bool = False, + track_stats: bool = False, + ): + # Exactly one of the three budget controls must be provided + assert ( + (max_metric_calls is not None) + + (max_evals_per_trainval_instance is not None) + + (num_iters is not None) + == 1 + ), ( + "Exactly one of max_metric_calls, max_evals_per_trainval_instance or num_iters must be set. " + f"You set max_metric_calls={max_metric_calls}, " + f"max_evals_per_trainval_instance={max_evals_per_trainval_instance}, " + f"num_iters={num_iters}" + ) + + self.named_predictor_to_feedback_fn_map = named_predictor_to_feedback_fn_map + self.metric_fn = metric + self.run_dir = run_dir + self.run_linearized_gepa = run_linearized_gepa + + self.num_threads = num_threads or os.cpu_count() + self.num_iters = num_iters + self.max_evals_per_trainval_instance = max_evals_per_trainval_instance + self.max_metric_calls = max_metric_calls + + self.failure_score = failure_score + self.perfect_score = perfect_score + self.teacher_lm = teacher_lm + self.use_wandb = use_wandb + self.wandb_api_key = wandb_api_key + + self.seed = seed + self.skip_perfect_score = skip_perfect_score + self.use_merge = use_merge + self.max_merge_invocations = max_merge_invocations + + self.num_dspy_examples_per_gepa_step = num_dspy_examples_per_gepa_step + self.add_format_failure_as_feedback = add_format_failure_as_feedback + self.track_stats = track_stats + + self._rng = random.Random(seed) + + def _resolve_budget(self, train_n: int, val_n: int) -> Dict[str, Optional[int]]: + """ + Normalize the 3 user-facing budget options to the engine's (num_iters | max_metric_calls). 
+ If max_evals_per_trainval_instance is set, approximate the global budget as + (train_n + val_n) * max_evals_per_trainval_instance. + """ + if self.max_metric_calls is not None: + return dict(num_iters=None, max_metric_calls=self.max_metric_calls) + + if self.max_evals_per_trainval_instance is not None: + # Simple, conservative mapping to the engine's total eval counter + # Includes both minibatch evals and full-valset evals + total_instances = train_n + val_n + return dict( + num_iters=None, + max_metric_calls=self.max_evals_per_trainval_instance * max(1, total_instances), + ) + + # Fallback to num_iters if provided + return dict(num_iters=self.num_iters, max_metric_calls=None) + + def compile( + self, + student: Module, + *, + trainset: List[Example], + teacher: Optional[Module] = None, + valset: Optional[List[Example]] = None, + **kwargs, + ) -> Module: + assert trainset is not None and len(trainset) > 0, "Trainset must be provided and non-empty" + assert teacher is None, "Teacher is not supported in DspyGEPA yet." + + valset = valset or trainset + + # Build the DSPy adapter that encapsulates evaluation, trace capture, feedback extraction, and instruction proposal + adapter = DspyAdapter( + student_module=student, + metric_fn=self.metric_fn, + feedback_map=self.named_predictor_to_feedback_fn_map, + failure_score=self.failure_score, + num_threads=self.num_threads, + add_format_failure_as_feedback=self.add_format_failure_as_feedback, + rng=self._rng, + ) + + # Prepare engine budgets + budgets = self._resolve_budget(train_n=len(trainset), val_n=len(valset)) + + teacher_lm = lambda x: (self.teacher_lm or settings.lm or student.get_lm())(x)[0] + + # Instantiate GEPA with the simpler adapter-based API + base_program = {name: pred.signature.instructions for name, pred in student.named_predictors()} + gepa_result: GEPAResult = optimize( + base_program=base_program, + trainset=trainset, + adapter=adapter, + valset=valset, + logger=LoggerAdapter(logger), + run_dir=self.run_dir, + teacher_lm=teacher_lm, + candidate_selection_strategy="pareto", + num_iters=budgets["num_iters"], + perfect_score=self.perfect_score, + use_wandb=self.use_wandb, + wandb_api_key=self.wandb_api_key, + seed=self.seed, + skip_perfect_score=self.skip_perfect_score, + use_merge=self.use_merge, + max_merge_invocations=self.max_merge_invocations, + num_examples_per_gepa_step=self.num_dspy_examples_per_gepa_step, + max_metric_calls=budgets["max_metric_calls"], + ) + + new_prog = adapter.build_program(gepa_result.best_candidate) + + if self.track_stats: + dspy_gepa_result = DspyGEPAResult.from_gepa_result(gepa_result, adapter) + setattr(new_prog, "dspy_gepa_result", dspy_gepa_result) + + return new_prog diff --git a/pyproject.toml b/pyproject.toml index 6b456c67fa..100b561a70 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,6 +44,7 @@ dependencies = [ "cloudpickle>=3.0.0", "rich>=13.7.1", "numpy>=1.26.0", + "gepa==0.0.1" ] [project.optional-dependencies] From 8dffdfdcf17d68790ff632c4cc3998075da37c4e Mon Sep 17 00:00:00 2001 From: Lakshya A Agrawal Date: Sun, 10 Aug 2025 00:50:11 -0700 Subject: [PATCH 02/12] Add GEPA paper to README --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 8bd03a3cf9..b97cdec547 100644 --- a/README.md +++ b/README.md @@ -46,6 +46,7 @@ If you're looking to understand the framework, please go to the [DSPy Docs at ds If you're looking to understand the underlying research, this is a set of our papers: +**[Jul'25] [GEPA: Reflective Prompt Evolution Can 
Outperform Reinforcement Learning](https://arxiv.org/abs/2507.19457)** **[Jun'24] [Optimizing Instructions and Demonstrations for Multi-Stage Language Model Programs](https://arxiv.org/abs/2406.11695)** **[Oct'23] [DSPy: Compiling Declarative Language Model Calls into Self-Improving Pipelines](https://arxiv.org/abs/2310.03714)** [Jul'24] [Fine-Tuning and Prompt Optimization: Two Great Steps that Work Better Together](https://arxiv.org/abs/2407.10930) From 908130749b54f1bbceb62605517245e846bb950e Mon Sep 17 00:00:00 2001 From: Lakshya A Agrawal Date: Sun, 10 Aug 2025 21:02:30 -0700 Subject: [PATCH 03/12] Improve GEPA Interface --- dspy/teleprompt/__init__.py | 2 +- dspy/teleprompt/bootstrap_finetune.py | 12 +- dspy/teleprompt/gepa.py | 424 -------------------------- dspy/teleprompt/gepa/gepa.py | 416 +++++++++++++++++++++++++ dspy/teleprompt/gepa/gepa_utils.py | 229 ++++++++++++++ 5 files changed, 656 insertions(+), 427 deletions(-) delete mode 100644 dspy/teleprompt/gepa.py create mode 100644 dspy/teleprompt/gepa/gepa.py create mode 100644 dspy/teleprompt/gepa/gepa_utils.py diff --git a/dspy/teleprompt/__init__.py b/dspy/teleprompt/__init__.py index a11c64fa2d..707d505f19 100644 --- a/dspy/teleprompt/__init__.py +++ b/dspy/teleprompt/__init__.py @@ -4,7 +4,7 @@ from dspy.teleprompt.bootstrap_finetune import BootstrapFinetune from dspy.teleprompt.copro_optimizer import COPRO from dspy.teleprompt.ensemble import Ensemble -from dspy.teleprompt.gepa import GEPA +from dspy.teleprompt.gepa.gepa import GEPA from dspy.teleprompt.infer_rules import InferRules from dspy.teleprompt.knn_fewshot import KNNFewShot from dspy.teleprompt.mipro_optimizer_v2 import MIPROv2 diff --git a/dspy/teleprompt/bootstrap_finetune.py b/dspy/teleprompt/bootstrap_finetune.py index b8d531f798..730b29b159 100644 --- a/dspy/teleprompt/bootstrap_finetune.py +++ b/dspy/teleprompt/bootstrap_finetune.py @@ -1,7 +1,7 @@ import logging from collections import defaultdict from dataclasses import dataclass -from typing import Any, Callable +from typing import Any, Callable, Dict, List, Tuple, TypedDict import dspy from dspy.adapters.base import Adapter @@ -9,6 +9,7 @@ from dspy.clients.lm import LM from dspy.clients.utils_finetune import infer_data_format from dspy.dsp.utils.settings import settings +from dspy.primitives.prediction import Prediction from dspy.evaluate.evaluate import Evaluate from dspy.predict.predict import Predict from dspy.primitives.example import Example @@ -213,6 +214,13 @@ class FailedPrediction: completion_text: str format_reward: float | None = None +@dataclass +class TraceData(TypedDict): + example_ind: int + example: Example + prediction: Prediction + trace: List[Tuple[Any, Dict[str, Any], Prediction]] + score: float | None def bootstrap_trace_data( program: Module, @@ -224,7 +232,7 @@ def bootstrap_trace_data( failure_score: float = 0, format_failure_score: float = -1, log_format_failures: bool = False, -) -> list[dict[str, Any]]: +) -> list[TraceData]: # Return a list of dicts with the following keys: example_ind, example, prediction, trace, and score # (if metric != None) evaluator = Evaluate( diff --git a/dspy/teleprompt/gepa.py b/dspy/teleprompt/gepa.py deleted file mode 100644 index 45427c2b27..0000000000 --- a/dspy/teleprompt/gepa.py +++ /dev/null @@ -1,424 +0,0 @@ -import logging -import os -import random -from dataclasses import dataclass -from typing import Any, Callable, Dict, List, Optional, Set - -from dspy.dsp.utils.settings import settings -from dspy.adapters.chat_adapter import 
ChatAdapter -from dspy.adapters.types import History -from dspy.clients.lm import LM -from dspy.evaluate import Evaluate -from dspy.primitives import Example, Module - -from gepa.gepa import EvaluationBatch, GEPAAdapter, GEPAResult, optimize -from .teleprompt import Teleprompter - -logger = logging.getLogger(__name__) - -class LoggerAdapter: - def __init__(self, logger: logging.Logger): - self.logger = logger - - def log(self, x: str): - self.logger.info(x) - -@dataclass(frozen=True) -class DspyGEPAResult: - """ - Additional data related to the GEPA run. - - Fields: - - candidates: list of proposed candidates (component_name -> component_text) - - parents: lineage info; for each candidate i, parents[i] is a list of parent indices or None - - val_aggregate_scores: per-candidate aggregate score on the validation set (higher is better) - - val_subscores: per-candidate per-instance scores on the validation set (len == num_val_instances) - - per_val_instance_best_candidates: for each val instance t, a set of candidate indices achieving the current best score on t - - discovery_eval_counts: number of metric calls accumulated up to the discovery of each candidate - - - total_metric_calls: total number of metric calls made across the run - - num_full_val_evals: number of full validation evaluations performed - - run_dir: where artifacts were written (if any) - - seed: RNG seed for reproducibility (if known) - - - best_idx: candidate index with the highest val_aggregate_scores - - best_candidate: the program text mapping for best_idx - """ - # Core data - candidates: List[Module] - parents: List[List[Optional[int]]] - val_aggregate_scores: List[float] - val_subscores: List[List[float]] - per_val_instance_best_candidates: List[Set[int]] - discovery_eval_counts: List[int] - - # Run metadata (optional) - total_metric_calls: Optional[int] = None - num_full_val_evals: Optional[int] = None - run_dir: Optional[str] = None - seed: Optional[int] = None - - @property - def best_idx(self) -> int: - scores = self.val_aggregate_scores - return max(range(len(scores)), key=lambda i: scores[i]) - - @property - def best_candidate(self) -> Dict[str, str]: - return self.candidates[self.best_idx] - - def to_dict(self) -> Dict[str, Any]: - cands = [ - {k: v for k, v in cand.items()} - for cand in self.candidates - ] - - return dict( - candidates=cands, - parents=self.parents, - val_aggregate_scores=self.val_aggregate_scores, - val_subscores=self.val_subscores, - per_val_instance_best_candidates=[list(s) for s in self.per_val_instance_best_candidates], - discovery_eval_counts=self.discovery_eval_counts, - total_metric_calls=self.total_metric_calls, - num_full_val_evals=self.num_full_val_evals, - run_dir=self.run_dir, - seed=self.seed, - best_idx=self.best_idx, - ) - - def from_gepa_result(gepa_result: GEPAResult, adapter: "DspyAdapter") -> "DspyGEPAResult": - return DspyGEPAResult( - candidates=[adapter.build_program(c) for c in gepa_result.candidates], - parents=gepa_result.parents, - val_aggregate_scores=gepa_result.val_aggregate_scores, - val_subscores=gepa_result.val_subscores, - per_val_instance_best_candidates=gepa_result.per_val_instance_best_candidates, - discovery_eval_counts=gepa_result.discovery_eval_counts, - total_metric_calls=gepa_result.total_metric_calls, - num_full_val_evals=gepa_result.num_full_val_evals, - run_dir=gepa_result.run_dir, - seed=gepa_result.seed, - ) - -class DspyAdapter(GEPAAdapter): - def __init__( - self, - student_module, - metric_fn: Callable, - feedback_map: Dict[str, Callable], - 
failure_score=0.0, - num_threads: Optional[int] = None, - add_format_failure_as_feedback: bool = False, - rng: Optional[random.Random] = None, - ): - import dspy - self.student = student_module - self.metric_fn = metric_fn - self.feedback_map = feedback_map - self.failure_score = failure_score - self.num_threads = num_threads or os.cpu_count() - self.add_format_failure_as_feedback = add_format_failure_as_feedback - self.rng = rng or random.Random(0) - - # Cache predictor names/signatures - self.named_predictors = list(self.student.named_predictors()) - - def build_program(self, candidate: Dict[str, str]): - new_prog = self.student.deepcopy() - for name, pred in new_prog.named_predictors(): - if name in candidate: - pred.signature = pred.signature.with_instructions(candidate[name]) - return new_prog - - def evaluate(self, batch, candidate, capture_traces=False): - import dspy - program = self.build_program(candidate) - - if capture_traces: - # bootstrap_trace_data-like flow with trace capture - from .bootstrap_finetune import bootstrap_trace_data - trajs = bootstrap_trace_data( - program=program, - dataset=batch, - metric=self.metric_fn, - num_threads=self.num_threads, - raise_on_error=False, - capture_failed_parses=True, - failure_score=self.failure_score, - format_failure_score=self.failure_score, - ) - scores = [] - outputs = [] - for t in trajs: - outputs.append(t['prediction']) - if hasattr(t['prediction'], '__class__') and t.get('score') is None: - scores.append(self.failure_score) - else: - scores.append(t['score']) - return EvaluationBatch(outputs=outputs, scores=scores, trajectories=trajs) - else: - evaluator = Evaluate( - devset=batch, - metric=self.metric_fn, - num_threads=self.num_threads, - return_all_scores=True, - return_outputs=True, - failure_score=self.failure_score, - provide_traceback=True, - max_errors=len(batch) * 100 - ) - res = evaluator(program) - outputs = [r[1] for r in res.results] - scores = [r[2] for r in res.results] - return EvaluationBatch(outputs=outputs, scores=scores, trajectories=None) - - def make_reflective_dataset(self, candidate, eval_batch, components_to_update): - import dspy - from .bootstrap_finetune import FailedPrediction - program = self.build_program(candidate) - - ret_d: Dict[str, List[Dict[str, Any]]] = {} - for pred_name in components_to_update: - feedback_fn = self.feedback_map[pred_name] - module = None - for name, m in program.named_predictors(): - if name == pred_name: - module = m - break - assert module is not None - - items: List[Dict[str, Any]] = [] - for data in eval_batch.trajectories or []: - trace = data["trace"] - example = data["example"] - prediction = data["prediction"] - - trace_instances = [t for t in trace if t[0].signature.equals(module.signature)] - if not self.add_format_failure_as_feedback: - trace_instances = [t for t in trace_instances if not isinstance(t[2], FailedPrediction)] - if len(trace_instances) == 0: - continue - - selected = None - for t in trace_instances: - if isinstance(t[2], FailedPrediction): - selected = t - break - - if selected is None: - if isinstance(prediction, FailedPrediction): - continue - selected = self.rng.choice(trace_instances) - - inputs = selected[1] - outputs = selected[2] - - new_inputs = {} - new_outputs = {} - - contains_history = False - history_key_name = None - for input_key, input_val in inputs.items(): - if isinstance(input_val, History): - contains_history = True - assert history_key_name is None - history_key_name = input_key - - if contains_history: - s = "```json\n" - for 
i, message in enumerate(inputs[history_key_name].messages): - s += f" {i}: {message}\n" - s += "```" - new_inputs["Context"] = s - - for input_key, input_val in inputs.items(): - if contains_history and input_key == history_key_name: - continue - new_inputs[input_key] = str(input_val) - - if isinstance(outputs, FailedPrediction): - s = "Couldn't parse the output as per the expected output format. The model's raw response was:\n" - s += "```\n" - s += outputs.completion_text + "\n" - s += "```\n\n" - new_outputs = s - else: - for output_key, output_val in outputs.items(): - new_outputs[output_key] = str(output_val) - - d = {"Inputs": new_inputs, "Generated Outputs": new_outputs} - if isinstance(outputs, FailedPrediction): - adapter = ChatAdapter() - structure_instruction = "" - for dd in adapter.format(module.signature, [], {}): - structure_instruction += dd["role"] + ": " + dd["content"] + "\n" - d['Feedback'] = "Your output failed to parse. Follow this structure:\n" + structure_instruction - # d['score'] = self.failure_score - else: - fb = feedback_fn( - predictor_output=outputs, - predictor_inputs=inputs, - module_inputs=example, - module_outputs=prediction, - captured_trace=trace, - ) - d['Feedback'] = fb["feedback_text"] - # d['score'] = fb["feedback_score"] - items.append(d) - - if len(items) == 0: - # raise Exception(f"No valid predictions found for module {module.signature}.") - continue - ret_d[pred_name] = items - - if len(ret_d) == 0: - raise Exception(f"No valid predictions found for any module.") - - return ret_d - -class GEPA(Teleprompter): - def __init__( - self, - named_predictor_to_feedback_fn_map: Dict[str, Callable], - metric: Callable, - run_dir: str, - run_linearized_gepa: bool = True, # kept for API compatibility - num_threads: Optional[int] = None, - num_iters: Optional[int] = None, - failure_score: float = 0.0, - perfect_score: float = 1.0, - teacher_lm: Optional[LM] = None, - use_wandb: bool = False, - wandb_api_key: Optional[str] = None, - max_evals_per_trainval_instance: Optional[int] = None, - seed: int = 0, - skip_perfect_score: bool = True, - use_merge: bool = False, - max_merge_invocations: int = 5, - num_dspy_examples_per_gepa_step: int = 3, - max_metric_calls: Optional[int] = None, - add_format_failure_as_feedback: bool = False, - track_stats: bool = False, - ): - # Exactly one of the three budget controls must be provided - assert ( - (max_metric_calls is not None) - + (max_evals_per_trainval_instance is not None) - + (num_iters is not None) - == 1 - ), ( - "Exactly one of max_metric_calls, max_evals_per_trainval_instance or num_iters must be set. 
" - f"You set max_metric_calls={max_metric_calls}, " - f"max_evals_per_trainval_instance={max_evals_per_trainval_instance}, " - f"num_iters={num_iters}" - ) - - self.named_predictor_to_feedback_fn_map = named_predictor_to_feedback_fn_map - self.metric_fn = metric - self.run_dir = run_dir - self.run_linearized_gepa = run_linearized_gepa - - self.num_threads = num_threads or os.cpu_count() - self.num_iters = num_iters - self.max_evals_per_trainval_instance = max_evals_per_trainval_instance - self.max_metric_calls = max_metric_calls - - self.failure_score = failure_score - self.perfect_score = perfect_score - self.teacher_lm = teacher_lm - self.use_wandb = use_wandb - self.wandb_api_key = wandb_api_key - - self.seed = seed - self.skip_perfect_score = skip_perfect_score - self.use_merge = use_merge - self.max_merge_invocations = max_merge_invocations - - self.num_dspy_examples_per_gepa_step = num_dspy_examples_per_gepa_step - self.add_format_failure_as_feedback = add_format_failure_as_feedback - self.track_stats = track_stats - - self._rng = random.Random(seed) - - def _resolve_budget(self, train_n: int, val_n: int) -> Dict[str, Optional[int]]: - """ - Normalize the 3 user-facing budget options to the engine's (num_iters | max_metric_calls). - If max_evals_per_trainval_instance is set, approximate the global budget as - (train_n + val_n) * max_evals_per_trainval_instance. - """ - if self.max_metric_calls is not None: - return dict(num_iters=None, max_metric_calls=self.max_metric_calls) - - if self.max_evals_per_trainval_instance is not None: - # Simple, conservative mapping to the engine's total eval counter - # Includes both minibatch evals and full-valset evals - total_instances = train_n + val_n - return dict( - num_iters=None, - max_metric_calls=self.max_evals_per_trainval_instance * max(1, total_instances), - ) - - # Fallback to num_iters if provided - return dict(num_iters=self.num_iters, max_metric_calls=None) - - def compile( - self, - student: Module, - *, - trainset: List[Example], - teacher: Optional[Module] = None, - valset: Optional[List[Example]] = None, - **kwargs, - ) -> Module: - assert trainset is not None and len(trainset) > 0, "Trainset must be provided and non-empty" - assert teacher is None, "Teacher is not supported in DspyGEPA yet." 
- - valset = valset or trainset - - # Build the DSPy adapter that encapsulates evaluation, trace capture, feedback extraction, and instruction proposal - adapter = DspyAdapter( - student_module=student, - metric_fn=self.metric_fn, - feedback_map=self.named_predictor_to_feedback_fn_map, - failure_score=self.failure_score, - num_threads=self.num_threads, - add_format_failure_as_feedback=self.add_format_failure_as_feedback, - rng=self._rng, - ) - - # Prepare engine budgets - budgets = self._resolve_budget(train_n=len(trainset), val_n=len(valset)) - - teacher_lm = lambda x: (self.teacher_lm or settings.lm or student.get_lm())(x)[0] - - # Instantiate GEPA with the simpler adapter-based API - base_program = {name: pred.signature.instructions for name, pred in student.named_predictors()} - gepa_result: GEPAResult = optimize( - base_program=base_program, - trainset=trainset, - adapter=adapter, - valset=valset, - logger=LoggerAdapter(logger), - run_dir=self.run_dir, - teacher_lm=teacher_lm, - candidate_selection_strategy="pareto", - num_iters=budgets["num_iters"], - perfect_score=self.perfect_score, - use_wandb=self.use_wandb, - wandb_api_key=self.wandb_api_key, - seed=self.seed, - skip_perfect_score=self.skip_perfect_score, - use_merge=self.use_merge, - max_merge_invocations=self.max_merge_invocations, - num_examples_per_gepa_step=self.num_dspy_examples_per_gepa_step, - max_metric_calls=budgets["max_metric_calls"], - ) - - new_prog = adapter.build_program(gepa_result.best_candidate) - - if self.track_stats: - dspy_gepa_result = DspyGEPAResult.from_gepa_result(gepa_result, adapter) - setattr(new_prog, "dspy_gepa_result", dspy_gepa_result) - - return new_prog diff --git a/dspy/teleprompt/gepa/gepa.py b/dspy/teleprompt/gepa/gepa.py new file mode 100644 index 0000000000..dae0992617 --- /dev/null +++ b/dspy/teleprompt/gepa/gepa.py @@ -0,0 +1,416 @@ +import logging +import random +from dataclasses import dataclass +from typing import Any, Dict, List, Literal, Optional, Set, Protocol + +from dspy.dsp.utils.settings import settings +from dspy.clients.lm import LM +from dspy.primitives import Example, Module, Prediction +from dspy.teleprompt.teleprompt import Teleprompter + +from gepa.gepa import GEPAResult, optimize +from .gepa_utils import LoggerAdapter, DSPyTrace, ScoreWithFeedback, PredictorFeedbackFn, DspyAdapter + +logger = logging.getLogger(__name__) + +AUTO_RUN_SETTINGS = { + "light": {"n": 6}, + "medium": {"n": 12}, + "heavy": {"n": 18}, +} + +class GEPAFeedbackMetric(Protocol): + def __call__( + gold: Example, + pred: Prediction, + trace: Optional[DSPyTrace], + pred_name: Optional[str], + pred_trace: Optional[DSPyTrace], + ) -> float | ScoreWithFeedback: + """ + This function is called with the following arguments: + - gold: The gold example. + - pred: The predicted output. + - trace: Optional. The trace of the program's execution. + - pred_name: Optional. The name of the target predictor currently being optimized by GEPA, for which + the feedback is being requested. + - pred_trace: Optional. The trace of the target predictor's execution GEPA is seeking feedback for. + + Note the `pred_name` and `pred_trace` arguments. During optimization, GEPA will call the metric to obtain + feedback for individual predictors being optimized. GEPA provides the name of the predictor in `pred_name` + and the sub-trace (of the trace) corresponding to the predictor in `pred_trace`. + If available at the predictor level, the metric should return {'score': float, 'feedback': str} corresponding + to the predictor. 
+ If not available at the predictor level, the metric can also return a text feedback at the program level + (using just the gold, pred and trace). + If no feedback is returned, GEPA will use a simple text feedback consisting of just the score: + f"This trajectory got a score of {score}." + """ + ... + +@dataclass(frozen=True) +class DspyGEPAResult: + """ + Additional data related to the GEPA run. + + Fields: + - candidates: list of proposed candidates (component_name -> component_text) + - parents: lineage info; for each candidate i, parents[i] is a list of parent indices or None + - val_aggregate_scores: per-candidate aggregate score on the validation set (higher is better) + - val_subscores: per-candidate per-instance scores on the validation set (len == num_val_instances) + - per_val_instance_best_candidates: for each val instance t, a set of candidate indices achieving the best score on t + - discovery_eval_counts: Budget (number of metric calls / rollouts) consumed up to the discovery of each candidate + + - total_metric_calls: total number of metric calls made across the run + - num_full_val_evals: number of full validation evaluations performed + - log_dir: where artifacts were written (if any) + - seed: RNG seed for reproducibility (if known) + + - best_idx: candidate index with the highest val_aggregate_scores + - best_candidate: the program text mapping for best_idx + """ + # Data about the proposed candidates + candidates: List[Module] + parents: List[List[Optional[int]]] + val_aggregate_scores: List[float] + val_subscores: List[List[float]] + per_val_instance_best_candidates: List[Set[int]] + discovery_eval_counts: List[int] + + # Optimization metadata + total_metric_calls: Optional[int] = None + num_full_val_evals: Optional[int] = None + log_dir: Optional[str] = None + seed: Optional[int] = None + + @property + def best_idx(self) -> int: + scores = self.val_aggregate_scores + return max(range(len(scores)), key=lambda i: scores[i]) + + @property + def best_candidate(self) -> Dict[str, str]: + return self.candidates[self.best_idx] + + def to_dict(self) -> Dict[str, Any]: + cands = [ + {k: v for k, v in cand.items()} + for cand in self.candidates + ] + + return dict( + candidates=cands, + parents=self.parents, + val_aggregate_scores=self.val_aggregate_scores, + val_subscores=self.val_subscores, + per_val_instance_best_candidates=[list(s) for s in self.per_val_instance_best_candidates], + discovery_eval_counts=self.discovery_eval_counts, + total_metric_calls=self.total_metric_calls, + num_full_val_evals=self.num_full_val_evals, + log_dir=self.log_dir, + seed=self.seed, + best_idx=self.best_idx, + ) + + def from_gepa_result(gepa_result: GEPAResult, adapter: DspyAdapter) -> "DspyGEPAResult": + return DspyGEPAResult( + candidates=[adapter.build_program(c) for c in gepa_result.candidates], + parents=gepa_result.parents, + val_aggregate_scores=gepa_result.val_aggregate_scores, + val_subscores=gepa_result.val_subscores, + per_val_instance_best_candidates=gepa_result.per_val_instance_best_candidates, + discovery_eval_counts=gepa_result.discovery_eval_counts, + total_metric_calls=gepa_result.total_metric_calls, + num_full_val_evals=gepa_result.num_full_val_evals, + log_dir=gepa_result.run_dir, + seed=gepa_result.seed, + ) + +class GEPA(Teleprompter): + """ + GEPA is an evolutionary optimizer, which uses reflection to evolve text components + of complex systems. 
+
+    GEPA captures full traces of the DSPy module's execution, identifies the parts of the trace
+    corresponding to a specific predictor, and reflects on the behaviour of the predictor to
+    propose a new instruction for the predictor. GEPA allows users to provide textual feedback
+    to the optimizer, which is used to guide the evolution of the predictor. The textual feedback
+    can be provided at the granularity of individual predictors, or at the level of the entire system's
+    execution.
+
+    To provide feedback to the GEPA optimizer, implement a metric as follows:
+    ```
+    def metric(
+        gold: Example,
+        pred: Prediction,
+        trace: Optional[DSPyTrace] = None,
+        pred_name: Optional[str] = None,
+        pred_trace: Optional[DSPyTrace] = None,
+    ) -> float | ScoreWithFeedback:
+        \"""
+        This function is called with the following arguments:
+        - gold: The gold example.
+        - pred: The predicted output.
+        - trace: Optional. The trace of the program's execution.
+        - pred_name: Optional. The name of the target predictor currently being optimized by GEPA, for which
+          the feedback is being requested.
+        - pred_trace: Optional. The trace of the target predictor's execution GEPA is seeking feedback for.
+
+        Note the `pred_name` and `pred_trace` arguments. During optimization, GEPA will call the metric to obtain
+        feedback for individual predictors being optimized. GEPA provides the name of the predictor in `pred_name`
+        and the sub-trace (of the trace) corresponding to the predictor in `pred_trace`.
+        If available at the predictor level, the metric should return {'score': float, 'feedback': str} corresponding
+        to the predictor.
+        If not available at the predictor level, the metric can also return a text feedback at the program level
+        (using just the gold, pred and trace).
+        If no feedback is returned, GEPA will use a simple text feedback consisting of just the score:
+        f"This trajectory got a score of {score}."
+        \"""
+        ...
+    ```
+
+    Parameters:
+    Mandatory:
+    - metric: The metric function to use for feedback and evaluation.
+
+    Budget configuration (exactly one of the following must be provided):
+    - auto: The auto budget preset to use for the run: one of "light", "medium", or "heavy".
+    - max_full_evals: The maximum number of full evaluations to perform.
+    - max_metric_calls: The maximum number of metric calls to perform.
+
+    Reflection based configuration:
+    - reflection_minibatch_size: The number of examples to use for reflection in a single GEPA step.
+    - candidate_selection_strategy: The strategy to use for candidate selection. Default is "pareto", which stochastically selects candidates from the Pareto frontier of all validation scores.
+    - reflection_lm: The language model to use for reflection. If not provided, `dspy.settings.lm` is used, falling back to the student's LM.
+
+    Merge-based configuration:
+    - use_merge: Whether to use merge-based optimization. Default is True.
+    - max_merge_invocations: The maximum number of merge invocations to perform. Default is 5.
+
+    Evaluation configuration:
+    - num_threads: The number of threads to use for evaluation with `Evaluate`.
+    - failure_score: The score to assign to failed examples. Default is 0.0.
+    - perfect_score: The maximum score achievable by the metric. Default is 1.0. Used by GEPA to determine if all examples in a minibatch are perfect.
+
+    Logging configuration:
+    - log_dir: The directory to save the logs. GEPA saves elaborate logs, along with all the candidate programs, in this directory.
+      Running GEPA with the same `log_dir` will resume the run from the last checkpoint.
+    - track_stats: Whether to return detailed results and all proposed programs in the `detailed_results` attribute of the optimized program. Default is False.
+    - use_wandb: Whether to use wandb for logging. Default is False.
+    - wandb_api_key: The API key to use for wandb. If not provided, wandb will use the API key from the environment variable `WANDB_API_KEY`.
+    - wandb_init_kwargs: Additional keyword arguments to pass to `wandb.init`.
+
+    Reproducibility:
+    - seed: The random seed to use for reproducibility. Default is 0.
+    """
+    def __init__(
+        self,
+        metric: GEPAFeedbackMetric,
+        # Budget configuration
+        auto: Optional[Literal["light", "medium", "heavy"]] = "light",
+        max_full_evals: Optional[int] = None,
+        max_metric_calls: Optional[int] = None,
+        # Reflection based configuration
+        reflection_minibatch_size: int = 3,
+        candidate_selection_strategy: Literal["pareto", "current_best"] = "pareto",
+        reflection_lm: Optional[LM] = None,
+        skip_perfect_score: bool = True,
+        add_format_failure_as_feedback: bool = False,
+        # Merge-based configuration
+        use_merge: bool = True,
+        max_merge_invocations: Optional[int] = 5,
+        # Evaluation configuration
+        num_threads: Optional[int] = None,
+        failure_score: float = 0.0,
+        perfect_score: float = 1.0,
+        # Logging
+        log_dir: Optional[str] = None,
+        track_stats: bool = False,
+        use_wandb: bool = False,
+        wandb_api_key: Optional[str] = None,
+        wandb_init_kwargs: Optional[Dict[str, Any]] = None,
+        # Reproducibility
+        seed: Optional[int] = 0,
+    ):
+        self.metric_fn = metric
+
+        # Budget configuration
+        assert (
+            (max_metric_calls is not None)
+            + (max_full_evals is not None)
+            + (auto is not None)
+            == 1
+        ), (
+            "Exactly one of max_metric_calls, max_full_evals, auto must be set. "
+            f"You set max_metric_calls={max_metric_calls}, "
+            f"max_full_evals={max_full_evals}, "
+            f"auto={auto}."
+        )
+        self.auto = auto
+        self.max_full_evals = max_full_evals
+        self.max_metric_calls = max_metric_calls
+
+        # Reflection based configuration
+        self.reflection_minibatch_size = reflection_minibatch_size
+        self.candidate_selection_strategy = candidate_selection_strategy
+        self.reflection_lm = reflection_lm
+        self.skip_perfect_score = skip_perfect_score
+        self.add_format_failure_as_feedback = add_format_failure_as_feedback
+
+        # Merge-based configuration
+        self.use_merge = use_merge
+        self.max_merge_invocations = max_merge_invocations
+
+        # Evaluation configuration
+        self.num_threads = num_threads
+        self.failure_score = failure_score
+        self.perfect_score = perfect_score
+
+        # Logging configuration
+        self.log_dir = log_dir
+        self.track_stats = track_stats
+        self.use_wandb = use_wandb
+        self.wandb_api_key = wandb_api_key
+        self.wandb_init_kwargs = wandb_init_kwargs
+
+        # Reproducibility
+        self.seed = seed
+
+    def auto_budget(self, num_preds, num_candidates, valset_size: int, minibatch_size: int = 35, full_eval_steps: int = 5) -> int:
+        import numpy as np
+        num_trials = int(max(2 * (num_preds * 2) * np.log2(num_candidates), 1.5 * num_candidates))
+        if num_trials < 0 or valset_size < 0 or minibatch_size < 0:
+            raise ValueError("num_trials, valset_size, and minibatch_size must be >= 0.")
+        if full_eval_steps < 1:
+            raise ValueError("full_eval_steps must be >= 1.")
+
+        V = valset_size
+        N = num_trials
+        M = minibatch_size
+        m = full_eval_steps
+
+        # Initial full evaluation on the default program
+        total = V
+
+        # Assume up to 5 trials for bootstrapping each candidate
+        total += num_candidates * 5
+
+        # N minibatch evaluations
+        total += N * M
+        if N == 0:
+            return total  # no periodic/full evals inside the loop
+        # Periodic full evals occur when trial_num % (m+1) == 0, where trial_num runs 2..N+1
+        periodic_fulls = (N + 1) // (m) + 1
+        # If 1 <= N < m, the code triggers one final full eval at the end
+        extra_final = 1 if N < m else 0
+        total += (periodic_fulls + extra_final) * V
+        return total
+
+    def compile(
+        self,
+        student: Module,
+        *,
+        trainset: List[Example],
+        teacher: Optional[Module] = None,
+        valset: Optional[List[Example]] = None,
+    ) -> Module:
+        """
+        GEPA uses the trainset to perform reflective updates to the prompt, but uses the valset for tracking Pareto scores.
+        If no valset is provided, GEPA will use the trainset for both.
+
+        Parameters:
+        - student: The student module to optimize.
+        - trainset: The training set to use for reflective updates.
+        - valset: The validation set to use for tracking Pareto scores. If not provided, GEPA will use the trainset for both.
+        """
+        assert trainset is not None and len(trainset) > 0, "Trainset must be provided and non-empty"
+        assert teacher is None, "Teacher is not supported in DspyGEPA yet."
+
+        if self.auto is not None:
+            self.max_metric_calls = self.auto_budget(
+                num_preds=len(student.predictors()),
+                num_candidates=AUTO_RUN_SETTINGS[self.auto]["n"],
+                valset_size=len(valset) if valset is not None else len(trainset),
+            )
+        elif self.max_full_evals is not None:
+            self.max_metric_calls = self.max_full_evals * (len(trainset) + (len(valset) if valset is not None else 0))
+        else:
+            assert self.max_metric_calls is not None, "Either auto, max_full_evals, or max_metric_calls must be set."
+ + valset = valset or trainset + + rng = random.Random(self.seed) + + def feedback_fn_creator(pred_name: str, predictor) -> PredictorFeedbackFn: + def feedback_fn( + predictor_output: Dict[str, Any], + predictor_inputs: Dict[str, Any], + module_inputs: Example, + module_outputs: Prediction, + captured_trace: DSPyTrace, + ) -> ScoreWithFeedback: + trace_for_pred = [(predictor, predictor_inputs, predictor_output)] + o = self.metric_fn( + module_inputs, + module_outputs, + captured_trace, + pred_name, + trace_for_pred, + ) + if hasattr(o, 'feedback'): + if o['feedback'] is None: + o['feedback'] = f"This trajectory got a score of {o['score']}." + return o + else: + return dict(score=o, feedback=f"This trajectory got a score of {o}.") + return feedback_fn + + feedback_map = { + k: feedback_fn_creator(k, v) + for k, v in student.named_predictors() + } + + # Build the DSPy adapter that encapsulates evaluation, trace capture, feedback extraction, and instruction proposal + adapter = DspyAdapter( + student_module=student, + metric_fn=self.metric_fn, + feedback_map=feedback_map, + failure_score=self.failure_score, + num_threads=self.num_threads, + add_format_failure_as_feedback=self.add_format_failure_as_feedback, + rng=rng, + ) + + reflection_lm = lambda x: (self.reflection_lm or settings.lm or student.get_lm())(x)[0] + + # Instantiate GEPA with the simpler adapter-based API + base_program = {name: pred.signature.instructions for name, pred in student.named_predictors()} + gepa_result: GEPAResult = optimize( + seed_candidate=base_program, + trainset=trainset, + valset=valset, + adapter=adapter, + logger=LoggerAdapter(logger), + run_dir=self.log_dir, + reflection_lm=reflection_lm, + candidate_selection_strategy=self.candidate_selection_strategy, + perfect_score=self.perfect_score, + seed=self.seed, + skip_perfect_score=self.skip_perfect_score, + use_merge=self.use_merge, + max_merge_invocations=self.max_merge_invocations, + num_examples_per_gepa_step=self.reflection_minibatch_size, + max_metric_calls=self.max_metric_calls, + use_wandb=self.use_wandb, + wandb_api_key=self.wandb_api_key, + wandb_init_kwargs=self.wandb_init_kwargs, + ) + + new_prog = adapter.build_program(gepa_result.best_candidate) + + if self.track_stats: + dspy_gepa_result = DspyGEPAResult.from_gepa_result(gepa_result, adapter) + setattr(new_prog, "detailed_results", dspy_gepa_result) + + return new_prog diff --git a/dspy/teleprompt/gepa/gepa_utils.py b/dspy/teleprompt/gepa/gepa_utils.py new file mode 100644 index 0000000000..c6ede42db8 --- /dev/null +++ b/dspy/teleprompt/gepa/gepa_utils.py @@ -0,0 +1,229 @@ +import logging +import random +from typing import Any, Callable, Dict, List, Optional, Protocol, Tuple, TypedDict + +from dspy.adapters.chat_adapter import ChatAdapter +from dspy.adapters.types import History +from ..bootstrap_finetune import TraceData +from dspy.evaluate import Evaluate +from dspy.primitives import Example, Prediction + +from gepa.gepa import EvaluationBatch, GEPAAdapter + +class LoggerAdapter: + def __init__(self, logger: logging.Logger): + self.logger = logger + + def log(self, x: str): + self.logger.info(x) + +DSPyTrace = List[Tuple[Any, Dict[str, Any], Prediction]] + +class ScoreWithFeedback(Prediction): + score: float + feedback: str + +class PredictorFeedbackFn(Protocol): + def __call__( + predictor_output: Dict[str, Any], + predictor_inputs: Dict[str, Any], + module_inputs: Example, + module_outputs: Prediction, + captured_trace: DSPyTrace, + ) -> ScoreWithFeedback: + """ + This function is used to 
provide feedback to a specific predictor. + The function is called with the following arguments: + - predictor_output: The output of the predictor. + - predictor_inputs: The inputs to the predictor. + - module_inputs: The inputs to the whole program --- `Example`. + - module_outputs: The outputs of the whole program --- `Prediction`. + - captured_trace: The trace of the module's execution. + # Shape of trace is: [predictor_invocation_idx -> Tuple[Predictor, PredictorInputs, Prediction]] + # Each trace is a tuple of (Predictor, PredictorInputs, Prediction) + + The function should return a `ScoreWithFeedback` object. + The feedback is a string that is used to guide the evolution of the predictor. + """ + ... + +class DspyAdapter(GEPAAdapter[Example, TraceData, Prediction]): + def __init__( + self, + student_module, + metric_fn: Callable, + feedback_map: Dict[str, Callable], + failure_score=0.0, + num_threads: Optional[int] = None, + add_format_failure_as_feedback: bool = False, + rng: Optional[random.Random] = None, + ): + self.student = student_module + self.metric_fn = metric_fn + self.feedback_map = feedback_map + self.failure_score = failure_score + self.num_threads = num_threads + self.add_format_failure_as_feedback = add_format_failure_as_feedback + self.rng = rng or random.Random(0) + + # Cache predictor names/signatures + self.named_predictors = list(self.student.named_predictors()) + + def build_program(self, candidate: Dict[str, str]): + new_prog = self.student.deepcopy() + for name, pred in new_prog.named_predictors(): + if name in candidate: + pred.signature = pred.signature.with_instructions(candidate[name]) + return new_prog + + def evaluate(self, batch, candidate, capture_traces=False): + program = self.build_program(candidate) + + if capture_traces: + # bootstrap_trace_data-like flow with trace capture + from ..bootstrap_finetune import bootstrap_trace_data + trajs = bootstrap_trace_data( + program=program, + dataset=batch, + metric=self.metric_fn, + num_threads=self.num_threads, + raise_on_error=False, + capture_failed_parses=True, + failure_score=self.failure_score, + format_failure_score=self.failure_score, + ) + scores = [] + outputs = [] + for t in trajs: + outputs.append(t['prediction']) + if hasattr(t['prediction'], '__class__') and t.get('score') is None: + scores.append(self.failure_score) + else: + score = t['score'] + if hasattr(score, 'score'): + score = score['score'] + scores.append(score) + return EvaluationBatch(outputs=outputs, scores=scores, trajectories=trajs) + else: + evaluator = Evaluate( + devset=batch, + metric=self.metric_fn, + num_threads=self.num_threads, + return_all_scores=True, + return_outputs=True, + failure_score=self.failure_score, + provide_traceback=True, + max_errors=len(batch) * 100 + ) + res = evaluator(program) + outputs = [r[1] for r in res.results] + scores = [r[2] for r in res.results] + scores = [s['score'] if hasattr(s, 'score') else s for s in scores] + return EvaluationBatch(outputs=outputs, scores=scores, trajectories=None) + + def make_reflective_dataset(self, candidate, eval_batch, components_to_update): + from ..bootstrap_finetune import FailedPrediction + program = self.build_program(candidate) + + ret_d: Dict[str, List[Dict[str, Any]]] = {} + for pred_name in components_to_update: + module = None + for name, m in program.named_predictors(): + if name == pred_name: + module = m + break + assert module is not None + + items: List[Dict[str, Any]] = [] + for data in eval_batch.trajectories or []: + trace = data["trace"] + 
example = data["example"] + prediction = data["prediction"] + module_score = data["score"] + if hasattr(module_score, 'score'): + module_score = module_score['score'] + + trace_instances = [t for t in trace if t[0].signature.equals(module.signature)] + if not self.add_format_failure_as_feedback: + trace_instances = [t for t in trace_instances if not isinstance(t[2], FailedPrediction)] + if len(trace_instances) == 0: + continue + + selected = None + for t in trace_instances: + if isinstance(t[2], FailedPrediction): + selected = t + break + + if selected is None: + if isinstance(prediction, FailedPrediction): + continue + selected = self.rng.choice(trace_instances) + + inputs = selected[1] + outputs = selected[2] + + new_inputs = {} + new_outputs = {} + + contains_history = False + history_key_name = None + for input_key, input_val in inputs.items(): + if isinstance(input_val, History): + contains_history = True + assert history_key_name is None + history_key_name = input_key + + if contains_history: + s = "```json\n" + for i, message in enumerate(inputs[history_key_name].messages): + s += f" {i}: {message}\n" + s += "```" + new_inputs["Context"] = s + + for input_key, input_val in inputs.items(): + if contains_history and input_key == history_key_name: + continue + new_inputs[input_key] = str(input_val) + + if isinstance(outputs, FailedPrediction): + s = "Couldn't parse the output as per the expected output format. The model's raw response was:\n" + s += "```\n" + s += outputs.completion_text + "\n" + s += "```\n\n" + new_outputs = s + else: + for output_key, output_val in outputs.items(): + new_outputs[output_key] = str(output_val) + + d = {"Inputs": new_inputs, "Generated Outputs": new_outputs} + if isinstance(outputs, FailedPrediction): + adapter = ChatAdapter() + structure_instruction = "" + for dd in adapter.format(module.signature, [], {}): + structure_instruction += dd["role"] + ": " + dd["content"] + "\n" + d['Feedback'] = "Your output failed to parse. Follow this structure:\n" + structure_instruction + # d['score'] = self.failure_score + else: + feedback_fn = self.feedback_map[pred_name] + fb = feedback_fn( + predictor_output=outputs, + predictor_inputs=inputs, + module_inputs=example, + module_outputs=prediction, + captured_trace=trace, + ) + d['Feedback'] = fb["feedback"] + assert fb["score"] == module_score, f"Currently, GEPA only supports feedback functions that return the same score as the module's score. However, the module-level score is {module_score} and the feedback score is {fb.score}." 
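+                    # Only the textual feedback is passed into the reflective dataset here;
+                    # the numeric score stays at the module level, hence the check above.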
+ # d['score'] = fb.score + items.append(d) + + if len(items) == 0: + # raise Exception(f"No valid predictions found for module {module.signature}.") + continue + ret_d[pred_name] = items + + if len(ret_d) == 0: + raise Exception(f"No valid predictions found for any module.") + + return ret_d \ No newline at end of file From 2b1e011baddd4f0e80915bff65f667cf2c7a5901 Mon Sep 17 00:00:00 2001 From: Lakshya A Agrawal Date: Sun, 10 Aug 2025 21:48:56 -0700 Subject: [PATCH 04/12] Add Instruction Proposal function (commented) --- dspy/teleprompt/gepa/gepa.py | 21 ++++++++++++++++----- dspy/teleprompt/gepa/gepa_utils.py | 28 +++++++++++++++++++++++++++- 2 files changed, 43 insertions(+), 6 deletions(-) diff --git a/dspy/teleprompt/gepa/gepa.py b/dspy/teleprompt/gepa/gepa.py index dae0992617..f36ee076c5 100644 --- a/dspy/teleprompt/gepa/gepa.py +++ b/dspy/teleprompt/gepa/gepa.py @@ -391,20 +391,31 @@ def feedback_fn( trainset=trainset, valset=valset, adapter=adapter, - logger=LoggerAdapter(logger), - run_dir=self.log_dir, + + # Reflection-based configuration reflection_lm=reflection_lm, candidate_selection_strategy=self.candidate_selection_strategy, - perfect_score=self.perfect_score, - seed=self.seed, skip_perfect_score=self.skip_perfect_score, + num_examples_per_gepa_step=self.reflection_minibatch_size, + + perfect_score=self.perfect_score, + + # Merge-based configuration use_merge=self.use_merge, max_merge_invocations=self.max_merge_invocations, - num_examples_per_gepa_step=self.reflection_minibatch_size, + + # Budget max_metric_calls=self.max_metric_calls, + + # Logging + logger=LoggerAdapter(logger), + run_dir=self.log_dir, use_wandb=self.use_wandb, wandb_api_key=self.wandb_api_key, wandb_init_kwargs=self.wandb_init_kwargs, + + # Reproducibility + seed=self.seed, ) new_prog = adapter.build_program(gepa_result.best_candidate) diff --git a/dspy/teleprompt/gepa/gepa_utils.py b/dspy/teleprompt/gepa/gepa_utils.py index c6ede42db8..2db180845e 100644 --- a/dspy/teleprompt/gepa/gepa_utils.py +++ b/dspy/teleprompt/gepa/gepa_utils.py @@ -226,4 +226,30 @@ def make_reflective_dataset(self, candidate, eval_batch, components_to_update): if len(ret_d) == 0: raise Exception(f"No valid predictions found for any module.") - return ret_d \ No newline at end of file + return ret_d + + # TODO: The current DSPyAdapter implementation uses the GEPA default propose_new_texts. + # We can potentially override this, to use the instruction proposal similar to MIPROv2. 
+ + # def propose_new_texts( + # self, + # candidate: Dict[str, str], + # reflective_dataset: Dict[str, List[Dict[str, Any]]], + # components_to_update: List[str] + # ) -> Dict[str, str]: + # if self.adapter.propose_new_texts is not None: + # return self.adapter.propose_new_texts(candidate, reflective_dataset, components_to_update) + + # from .instruction_proposal import InstructionProposalSignature + # new_texts: Dict[str, str] = {} + # for name in components_to_update: + # base_instruction = candidate[name] + # dataset_with_feedback = reflective_dataset[name] + # new_texts[name] = InstructionProposalSignature.run( + # lm=self.reflection_lm, + # input_dict={ + # "current_instruction_doc": base_instruction, + # "dataset_with_feedback": dataset_with_feedback + # } + # )['new_instruction'] + # return new_texts \ No newline at end of file From 92f6b3085df1c03926bb6e8e94fbe796569d00c0 Mon Sep 17 00:00:00 2001 From: Lakshya A Agrawal Date: Mon, 11 Aug 2025 02:17:52 -0700 Subject: [PATCH 05/12] Update imports --- dspy/teleprompt/gepa/gepa.py | 2 +- dspy/teleprompt/gepa/gepa_utils.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dspy/teleprompt/gepa/gepa.py b/dspy/teleprompt/gepa/gepa.py index f36ee076c5..07881b9b3c 100644 --- a/dspy/teleprompt/gepa/gepa.py +++ b/dspy/teleprompt/gepa/gepa.py @@ -8,7 +8,7 @@ from dspy.primitives import Example, Module, Prediction from dspy.teleprompt.teleprompt import Teleprompter -from gepa.gepa import GEPAResult, optimize +from gepa import GEPAResult, optimize from .gepa_utils import LoggerAdapter, DSPyTrace, ScoreWithFeedback, PredictorFeedbackFn, DspyAdapter logger = logging.getLogger(__name__) diff --git a/dspy/teleprompt/gepa/gepa_utils.py b/dspy/teleprompt/gepa/gepa_utils.py index 2db180845e..15df9c89aa 100644 --- a/dspy/teleprompt/gepa/gepa_utils.py +++ b/dspy/teleprompt/gepa/gepa_utils.py @@ -8,7 +8,7 @@ from dspy.evaluate import Evaluate from dspy.primitives import Example, Prediction -from gepa.gepa import EvaluationBatch, GEPAAdapter +from gepa import EvaluationBatch, GEPAAdapter class LoggerAdapter: def __init__(self, logger: logging.Logger): From 67cf0f8513a0da9a355c9209738189f6c7e91da1 Mon Sep 17 00:00:00 2001 From: Lakshya A Agrawal Date: Mon, 11 Aug 2025 11:17:03 -0700 Subject: [PATCH 06/12] Fix README --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index b97cdec547..fde5d3e683 100644 --- a/README.md +++ b/README.md @@ -46,7 +46,7 @@ If you're looking to understand the framework, please go to the [DSPy Docs at ds If you're looking to understand the underlying research, this is a set of our papers: -**[Jul'25] [GEPA: Reflective Prompt Evolution Can Outperform Reinforcement Learning](https://arxiv.org/abs/2507.19457)** +**[Jul'25] [GEPA: Reflective Prompt Evolution Can Outperform Reinforcement Learning](https://arxiv.org/abs/2507.19457)** **[Jun'24] [Optimizing Instructions and Demonstrations for Multi-Stage Language Model Programs](https://arxiv.org/abs/2406.11695)** **[Oct'23] [DSPy: Compiling Declarative Language Model Calls into Self-Improving Pipelines](https://arxiv.org/abs/2310.03714)** [Jul'24] [Fine-Tuning and Prompt Optimization: Two Great Steps that Work Better Together](https://arxiv.org/abs/2407.10930) From b73c8b09eb4224ecc5575f14f1dc149bc12d338b Mon Sep 17 00:00:00 2001 From: Lakshya A Agrawal Date: Mon, 11 Aug 2025 14:09:35 -0700 Subject: [PATCH 07/12] Add track_best_outputs --- dspy/teleprompt/gepa/gepa.py | 37 +++++++++++++++++++++++++++++------- 1 file 
changed, 30 insertions(+), 7 deletions(-) diff --git a/dspy/teleprompt/gepa/gepa.py b/dspy/teleprompt/gepa/gepa.py index 07881b9b3c..e9bfce0c21 100644 --- a/dspy/teleprompt/gepa/gepa.py +++ b/dspy/teleprompt/gepa/gepa.py @@ -1,7 +1,7 @@ import logging import random from dataclasses import dataclass -from typing import Any, Dict, List, Literal, Optional, Set, Protocol +from typing import Any, Dict, List, Literal, Optional, Set, Protocol, Tuple from dspy.dsp.utils.settings import settings from dspy.clients.lm import LM @@ -77,6 +77,9 @@ class DspyGEPAResult: per_val_instance_best_candidates: List[Set[int]] discovery_eval_counts: List[int] + # Optional data + best_outputs_valset: Optional[List[List[Tuple[int, List[Prediction]]]]] = None + # Optimization metadata total_metric_calls: Optional[int] = None num_full_val_evals: Optional[int] = None @@ -102,6 +105,7 @@ def to_dict(self) -> Dict[str, Any]: candidates=cands, parents=self.parents, val_aggregate_scores=self.val_aggregate_scores, + best_outputs_valset=self.best_outputs_valset, val_subscores=self.val_subscores, per_val_instance_best_candidates=[list(s) for s in self.per_val_instance_best_candidates], discovery_eval_counts=self.discovery_eval_counts, @@ -117,6 +121,7 @@ def from_gepa_result(gepa_result: GEPAResult, adapter: DspyAdapter) -> "DspyGEPA candidates=[adapter.build_program(c) for c in gepa_result.candidates], parents=gepa_result.parents, val_aggregate_scores=gepa_result.val_aggregate_scores, + best_outputs_valset=gepa_result.best_outputs_valset, val_subscores=gepa_result.val_subscores, per_val_instance_best_candidates=gepa_result.per_val_instance_best_candidates, discovery_eval_counts=gepa_result.discovery_eval_counts, @@ -129,7 +134,7 @@ def from_gepa_result(gepa_result: GEPAResult, adapter: DspyAdapter) -> "DspyGEPA class GEPA(Teleprompter): """ GEPA is an evolutionary optimizer, which uses reflection to evolve text components - of complex systems. + of complex systems. GEPA is proposed in the paper [GEPA: Reflective Prompt Evolution Can Outperform Reinforcement Learning](https://arxiv.org/abs/2507.19457). GEPA captures full traces of the DSPy module's execution, identifies the parts of the trace corresponding to a specific predictor, and reflects on the behaviour of the predictor to @@ -169,8 +174,19 @@ def metric( ... ``` + GEPA can also be used as a batch inference-time search strategy, by passing `valset=trainset, track_stats=True`, and using the + `detailed_results` attribute of the optimized program (returned by `compile`) to get the Pareto frontier of the batch. + + Example: + ``` + gepa = GEPA(metric=metric, track_stats=True) + batch_of_tasks = [dspy.Example(...) for task in tasks] + new_prog = gepa.compile(student, trainset=trainset, valset=batch_of_tasks) + pareto_frontier = new_prog.detailed_results.val_aggregate_scores + # pareto_frontier is a list of scores, one for each task in the batch. + ``` + Parameters: - Mandatory: - metric: The metric function to use for feedback and evaluation. Budget configuration (exactly one of the following must be provided): @@ -193,12 +209,12 @@ def metric( - perfect_score: The maximum score achievable by the metric. Default is 1.0. Used by GEPA to determine if all examples in a minibatch are perfect. Logging configuration: - - log_dir: The directory to save the logs. GEPA saves elaborate logs, along with all the candidate programs, in this directory. - Running GEPA with the same `log_dir` will resume the run from the last checkpoint. + - log_dir: The directory to save the logs. 
GEPA saves elaborate logs, along with all the candidate programs, in this directory. Running GEPA with the same `log_dir` will resume the run from the last checkpoint. - track_stats: Whether to return detailed results and all proposed programs in the `detailed_results` attribute of the optimized program. Default is False. - use_wandb: Whether to use wandb for logging. Default is False. - wandb_api_key: The API key to use for wandb. If not provided, wandb will use the API key from the environment variable `WANDB_API_KEY`. - wandb_init_kwargs: Additional keyword arguments to pass to `wandb.init`. + - track_best_outputs: Whether to track the best outputs on the validation set. track_stats must be True if track_best_outputs is True. `optimized_program.detailed_results.best_outputs_valset` will contain the best outputs for each task in the validation set. Reproducibility: - seed: The random seed to use for reproducibility. Default is 0. @@ -206,8 +222,9 @@ def metric( def __init__( self, metric: GEPAFeedbackMetric, + *, # Budget configuration - auto: Optional[Literal["light", "medium", "heavy"]] = "light", + auto: Optional[Literal["light", "medium", "heavy"]] = None, max_full_evals: Optional[int] = None, max_metric_calls: Optional[int] = None, # Reflection based configuration @@ -229,6 +246,7 @@ def __init__( use_wandb: bool = False, wandb_api_key: Optional[str] = None, wandb_init_kwargs: Optional[Dict[str, Any]] = None, + track_best_outputs: bool = False, # Reproducibility seed: Optional[int] = 0, ): @@ -273,6 +291,10 @@ def __init__( self.wandb_api_key = wandb_api_key self.wandb_init_kwargs = wandb_init_kwargs + if track_best_outputs: + assert track_stats, "track_stats must be True if track_best_outputs is True." + self.track_best_outputs = track_best_outputs + # Reproducibility self.seed = seed @@ -396,7 +418,7 @@ def feedback_fn( reflection_lm=reflection_lm, candidate_selection_strategy=self.candidate_selection_strategy, skip_perfect_score=self.skip_perfect_score, - num_examples_per_gepa_step=self.reflection_minibatch_size, + reflection_minibatch_size=self.reflection_minibatch_size, perfect_score=self.perfect_score, @@ -413,6 +435,7 @@ def feedback_fn( use_wandb=self.use_wandb, wandb_api_key=self.wandb_api_key, wandb_init_kwargs=self.wandb_init_kwargs, + track_best_outputs=self.track_best_outputs, # Reproducibility seed=self.seed, From 662a603bc568eabe037d23963bd2a98a82b95114 Mon Sep 17 00:00:00 2001 From: Lakshya A Agrawal Date: Mon, 11 Aug 2025 14:57:42 -0700 Subject: [PATCH 08/12] Fix auto_budget bug --- dspy/teleprompt/gepa/gepa.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dspy/teleprompt/gepa/gepa.py b/dspy/teleprompt/gepa/gepa.py index e9bfce0c21..ea55c9c0a5 100644 --- a/dspy/teleprompt/gepa/gepa.py +++ b/dspy/teleprompt/gepa/gepa.py @@ -298,7 +298,7 @@ def __init__( # Reproducibility self.seed = seed - def auto_budget(num_preds, num_candidates, valset_size: int, minibatch_size: int = 35, full_eval_steps: int = 5) -> int: + def auto_budget(self, num_preds, num_candidates, valset_size: int, minibatch_size: int = 35, full_eval_steps: int = 5) -> int: import numpy as np num_trials = int(max(2 * (num_preds * 2) * np.log2(num_candidates), 1.5 * num_candidates)) if num_trials < 0 or valset_size < 0 or minibatch_size < 0: @@ -359,8 +359,11 @@ def compile( self.max_metric_calls = self.max_full_evals * (len(trainset) + (len(valset) if valset is not None else 0)) else: assert self.max_metric_calls is not None, "Either auto, max_full_evals, or max_metric_calls must be 
set." + + logger.info(f"Running GEPA for approx {self.max_metric_calls} metric calls of the program. This amounts to {self.max_metric_calls / len(trainset) if valset is None else self.max_metric_calls / (len(trainset) + len(valset)):.2f} full evals on the {'train' if valset is None else 'train+val'} set.") valset = valset or trainset + logger.info(f"Using {len(valset)} examples for tracking Pareto scores. You can consider using a sample of the valset to allow GEPA to explore more diverse solutions within the same budget.") rng = random.Random(self.seed) From 7d86db6c67a72d50fbae75f7a364f246aad6c929 Mon Sep 17 00:00:00 2001 From: Lakshya A Agrawal Date: Mon, 11 Aug 2025 16:09:14 -0700 Subject: [PATCH 09/12] Fix bug in TraceData type --- dspy/teleprompt/bootstrap_finetune.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dspy/teleprompt/bootstrap_finetune.py b/dspy/teleprompt/bootstrap_finetune.py index eaab963c62..11b01efa8e 100644 --- a/dspy/teleprompt/bootstrap_finetune.py +++ b/dspy/teleprompt/bootstrap_finetune.py @@ -221,7 +221,6 @@ class FailedPrediction: completion_text: str format_reward: float | None = None -@dataclass class TraceData(TypedDict): example_ind: int example: Example From 8d5b79676a4af2ec9d960fc08179ebc6a432abc6 Mon Sep 17 00:00:00 2001 From: Lakshya A Agrawal Date: Mon, 11 Aug 2025 17:04:54 -0700 Subject: [PATCH 10/12] Fix ruff checks --- dspy/teleprompt/bootstrap_finetune.py | 6 +- dspy/teleprompt/gepa/gepa.py | 91 ++++++++++++++------------- dspy/teleprompt/gepa/gepa_utils.py | 64 ++++++++++--------- 3 files changed, 82 insertions(+), 79 deletions(-) diff --git a/dspy/teleprompt/bootstrap_finetune.py b/dspy/teleprompt/bootstrap_finetune.py index 11b01efa8e..5f07786381 100644 --- a/dspy/teleprompt/bootstrap_finetune.py +++ b/dspy/teleprompt/bootstrap_finetune.py @@ -1,7 +1,7 @@ import logging from collections import defaultdict from dataclasses import dataclass -from typing import Any, Callable, Dict, List, Tuple, TypedDict +from typing import Any, Callable, TypedDict import dspy from dspy.adapters.base import Adapter @@ -9,11 +9,11 @@ from dspy.clients.lm import LM from dspy.clients.utils_finetune import infer_data_format from dspy.dsp.utils.settings import settings -from dspy.primitives.prediction import Prediction from dspy.evaluate.evaluate import Evaluate from dspy.predict.predict import Predict from dspy.primitives.example import Example from dspy.primitives.module import Module +from dspy.primitives.prediction import Prediction from dspy.teleprompt.teleprompt import Teleprompter from dspy.utils.exceptions import AdapterParseError @@ -225,7 +225,7 @@ class TraceData(TypedDict): example_ind: int example: Example prediction: Prediction - trace: List[Tuple[Any, Dict[str, Any], Prediction]] + trace: list[tuple[Any, dict[str, Any], Prediction]] score: float | None def bootstrap_trace_data( diff --git a/dspy/teleprompt/gepa/gepa.py b/dspy/teleprompt/gepa/gepa.py index ea55c9c0a5..5287649e5a 100644 --- a/dspy/teleprompt/gepa/gepa.py +++ b/dspy/teleprompt/gepa/gepa.py @@ -1,15 +1,16 @@ import logging import random from dataclasses import dataclass -from typing import Any, Dict, List, Literal, Optional, Set, Protocol, Tuple +from typing import Any, Literal, Protocol + +from gepa import GEPAResult, optimize -from dspy.dsp.utils.settings import settings from dspy.clients.lm import LM +from dspy.dsp.utils.settings import settings from dspy.primitives import Example, Module, Prediction from dspy.teleprompt.teleprompt import Teleprompter -from gepa import GEPAResult, 
optimize -from .gepa_utils import LoggerAdapter, DSPyTrace, ScoreWithFeedback, PredictorFeedbackFn, DspyAdapter +from .gepa_utils import DspyAdapter, DSPyTrace, LoggerAdapter, PredictorFeedbackFn, ScoreWithFeedback logger = logging.getLogger(__name__) @@ -23,9 +24,9 @@ class GEPAFeedbackMetric(Protocol): def __call__( gold: Example, pred: Prediction, - trace: Optional[DSPyTrace], - pred_name: Optional[str], - pred_trace: Optional[DSPyTrace], + trace: DSPyTrace | None, + pred_name: str | None, + pred_trace: DSPyTrace | None, ) -> float | ScoreWithFeedback: """ This function is called with the following arguments: @@ -70,21 +71,21 @@ class DspyGEPAResult: - best_candidate: the program text mapping for best_idx """ # Data about the proposed candidates - candidates: List[Module] - parents: List[List[Optional[int]]] - val_aggregate_scores: List[float] - val_subscores: List[List[float]] - per_val_instance_best_candidates: List[Set[int]] - discovery_eval_counts: List[int] + candidates: list[Module] + parents: list[list[int | None]] + val_aggregate_scores: list[float] + val_subscores: list[list[float]] + per_val_instance_best_candidates: list[set[int]] + discovery_eval_counts: list[int] # Optional data - best_outputs_valset: Optional[List[List[Tuple[int, List[Prediction]]]]] = None + best_outputs_valset: list[list[tuple[int, list[Prediction]]]] | None = None # Optimization metadata - total_metric_calls: Optional[int] = None - num_full_val_evals: Optional[int] = None - log_dir: Optional[str] = None - seed: Optional[int] = None + total_metric_calls: int | None = None + num_full_val_evals: int | None = None + log_dir: str | None = None + seed: int | None = None @property def best_idx(self) -> int: @@ -92,10 +93,10 @@ def best_idx(self) -> int: return max(range(len(scores)), key=lambda i: scores[i]) @property - def best_candidate(self) -> Dict[str, str]: + def best_candidate(self) -> dict[str, str]: return self.candidates[self.best_idx] - def to_dict(self) -> Dict[str, Any]: + def to_dict(self) -> dict[str, Any]: cands = [ {k: v for k, v in cand.items()} for cand in self.candidates @@ -224,31 +225,31 @@ def __init__( metric: GEPAFeedbackMetric, *, # Budget configuration - auto: Optional[Literal["light", "medium", "heavy"]] = None, - max_full_evals: Optional[int] = None, - max_metric_calls: Optional[int] = None, + auto: Literal["light", "medium", "heavy"] | None = None, + max_full_evals: int | None = None, + max_metric_calls: int | None = None, # Reflection based configuration reflection_minibatch_size: int = 3, candidate_selection_strategy: Literal["pareto", "current_best"] = "pareto", - reflection_lm: Optional[LM] = None, + reflection_lm: LM | None = None, skip_perfect_score: bool = True, add_format_failure_as_feedback: bool = False, # Merge-based configuration use_merge: bool = True, - max_merge_invocations: Optional[int] = 5, + max_merge_invocations: int | None = 5, # Evaluation configuration - num_threads: Optional[int] = None, + num_threads: int | None = None, failure_score: float = 0.0, perfect_score: float = 1.0, # Logging log_dir: str = None, track_stats: bool = False, use_wandb: bool = False, - wandb_api_key: Optional[str] = None, - wandb_init_kwargs: Optional[Dict[str, Any]] = None, + wandb_api_key: str | None = None, + wandb_init_kwargs: dict[str, Any] | None = None, track_best_outputs: bool = False, # Reproducibility - seed: Optional[int] = 0, + seed: int | None = 0, ): self.metric_fn = metric @@ -278,7 +279,7 @@ def __init__( # Merge-based configuration self.use_merge = use_merge 
self.max_merge_invocations = max_merge_invocations - + # Evaluation Configuration self.num_threads = num_threads self.failure_score = failure_score @@ -333,9 +334,9 @@ def compile( self, student: Module, *, - trainset: List[Example], - teacher: Optional[Module] = None, - valset: Optional[List[Example]] = None, + trainset: list[Example], + teacher: Module | None = None, + valset: list[Example] | None = None, ) -> Module: """ GEPA uses the trainset to perform reflective updates to the prompt, but uses the valset for tracking Pareto scores. @@ -359,7 +360,7 @@ def compile( self.max_metric_calls = self.max_full_evals * (len(trainset) + (len(valset) if valset is not None else 0)) else: assert self.max_metric_calls is not None, "Either auto, max_full_evals, or max_metric_calls must be set." - + logger.info(f"Running GEPA for approx {self.max_metric_calls} metric calls of the program. This amounts to {self.max_metric_calls / len(trainset) if valset is None else self.max_metric_calls / (len(trainset) + len(valset)):.2f} full evals on the {'train' if valset is None else 'train+val'} set.") valset = valset or trainset @@ -369,10 +370,10 @@ def compile( def feedback_fn_creator(pred_name: str, predictor) -> PredictorFeedbackFn: def feedback_fn( - predictor_output: Dict[str, Any], - predictor_inputs: Dict[str, Any], + predictor_output: dict[str, Any], + predictor_inputs: dict[str, Any], module_inputs: Example, - module_outputs: Prediction, + module_outputs: Prediction, captured_trace: DSPyTrace, ) -> ScoreWithFeedback: trace_for_pred = [(predictor, predictor_inputs, predictor_output)] @@ -383,16 +384,16 @@ def feedback_fn( pred_name, trace_for_pred, ) - if hasattr(o, 'feedback'): - if o['feedback'] is None: - o['feedback'] = f"This trajectory got a score of {o['score']}." + if hasattr(o, "feedback"): + if o["feedback"] is None: + o["feedback"] = f"This trajectory got a score of {o['score']}." 
return o else: return dict(score=o, feedback=f"This trajectory got a score of {o}.") return feedback_fn feedback_map = { - k: feedback_fn_creator(k, v) + k: feedback_fn_creator(k, v) for k, v in student.named_predictors() } @@ -416,7 +417,7 @@ def feedback_fn( trainset=trainset, valset=valset, adapter=adapter, - + # Reflection-based configuration reflection_lm=reflection_lm, candidate_selection_strategy=self.candidate_selection_strategy, @@ -424,11 +425,11 @@ def feedback_fn( reflection_minibatch_size=self.reflection_minibatch_size, perfect_score=self.perfect_score, - + # Merge-based configuration use_merge=self.use_merge, max_merge_invocations=self.max_merge_invocations, - + # Budget max_metric_calls=self.max_metric_calls, @@ -448,6 +449,6 @@ def feedback_fn( if self.track_stats: dspy_gepa_result = DspyGEPAResult.from_gepa_result(gepa_result, adapter) - setattr(new_prog, "detailed_results", dspy_gepa_result) + new_prog.detailed_results = dspy_gepa_result return new_prog diff --git a/dspy/teleprompt/gepa/gepa_utils.py b/dspy/teleprompt/gepa/gepa_utils.py index 15df9c89aa..6bac1b9598 100644 --- a/dspy/teleprompt/gepa/gepa_utils.py +++ b/dspy/teleprompt/gepa/gepa_utils.py @@ -1,14 +1,16 @@ import logging import random -from typing import Any, Callable, Dict, List, Optional, Protocol, Tuple, TypedDict +from typing import Any, Callable, Protocol + +from gepa import EvaluationBatch, GEPAAdapter from dspy.adapters.chat_adapter import ChatAdapter from dspy.adapters.types import History -from ..bootstrap_finetune import TraceData from dspy.evaluate import Evaluate from dspy.primitives import Example, Prediction -from gepa import EvaluationBatch, GEPAAdapter +from ..bootstrap_finetune import TraceData + class LoggerAdapter: def __init__(self, logger: logging.Logger): @@ -17,7 +19,7 @@ def __init__(self, logger: logging.Logger): def log(self, x: str): self.logger.info(x) -DSPyTrace = List[Tuple[Any, Dict[str, Any], Prediction]] +DSPyTrace = list[tuple[Any, dict[str, Any], Prediction]] class ScoreWithFeedback(Prediction): score: float @@ -25,10 +27,10 @@ class ScoreWithFeedback(Prediction): class PredictorFeedbackFn(Protocol): def __call__( - predictor_output: Dict[str, Any], - predictor_inputs: Dict[str, Any], + predictor_output: dict[str, Any], + predictor_inputs: dict[str, Any], module_inputs: Example, - module_outputs: Prediction, + module_outputs: Prediction, captured_trace: DSPyTrace, ) -> ScoreWithFeedback: """ @@ -52,11 +54,11 @@ def __init__( self, student_module, metric_fn: Callable, - feedback_map: Dict[str, Callable], + feedback_map: dict[str, Callable], failure_score=0.0, - num_threads: Optional[int] = None, + num_threads: int | None = None, add_format_failure_as_feedback: bool = False, - rng: Optional[random.Random] = None, + rng: random.Random | None = None, ): self.student = student_module self.metric_fn = metric_fn @@ -69,7 +71,7 @@ def __init__( # Cache predictor names/signatures self.named_predictors = list(self.student.named_predictors()) - def build_program(self, candidate: Dict[str, str]): + def build_program(self, candidate: dict[str, str]): new_prog = self.student.deepcopy() for name, pred in new_prog.named_predictors(): if name in candidate: @@ -95,13 +97,13 @@ def evaluate(self, batch, candidate, capture_traces=False): scores = [] outputs = [] for t in trajs: - outputs.append(t['prediction']) - if hasattr(t['prediction'], '__class__') and t.get('score') is None: + outputs.append(t["prediction"]) + if hasattr(t["prediction"], "__class__") and t.get("score") is None: 
scores.append(self.failure_score) else: - score = t['score'] - if hasattr(score, 'score'): - score = score['score'] + score = t["score"] + if hasattr(score, "score"): + score = score["score"] scores.append(score) return EvaluationBatch(outputs=outputs, scores=scores, trajectories=trajs) else: @@ -118,14 +120,14 @@ def evaluate(self, batch, candidate, capture_traces=False): res = evaluator(program) outputs = [r[1] for r in res.results] scores = [r[2] for r in res.results] - scores = [s['score'] if hasattr(s, 'score') else s for s in scores] + scores = [s["score"] if hasattr(s, "score") else s for s in scores] return EvaluationBatch(outputs=outputs, scores=scores, trajectories=None) def make_reflective_dataset(self, candidate, eval_batch, components_to_update): from ..bootstrap_finetune import FailedPrediction program = self.build_program(candidate) - ret_d: Dict[str, List[Dict[str, Any]]] = {} + ret_d: dict[str, list[dict[str, Any]]] = {} for pred_name in components_to_update: module = None for name, m in program.named_predictors(): @@ -134,14 +136,14 @@ def make_reflective_dataset(self, candidate, eval_batch, components_to_update): break assert module is not None - items: List[Dict[str, Any]] = [] + items: list[dict[str, Any]] = [] for data in eval_batch.trajectories or []: trace = data["trace"] example = data["example"] prediction = data["prediction"] module_score = data["score"] - if hasattr(module_score, 'score'): - module_score = module_score['score'] + if hasattr(module_score, "score"): + module_score = module_score["score"] trace_instances = [t for t in trace if t[0].signature.equals(module.signature)] if not self.add_format_failure_as_feedback: @@ -173,14 +175,14 @@ def make_reflective_dataset(self, candidate, eval_batch, components_to_update): contains_history = True assert history_key_name is None history_key_name = input_key - + if contains_history: s = "```json\n" for i, message in enumerate(inputs[history_key_name].messages): s += f" {i}: {message}\n" s += "```" new_inputs["Context"] = s - + for input_key, input_val in inputs.items(): if contains_history and input_key == history_key_name: continue @@ -202,7 +204,7 @@ def make_reflective_dataset(self, candidate, eval_batch, components_to_update): structure_instruction = "" for dd in adapter.format(module.signature, [], {}): structure_instruction += dd["role"] + ": " + dd["content"] + "\n" - d['Feedback'] = "Your output failed to parse. Follow this structure:\n" + structure_instruction + d["Feedback"] = "Your output failed to parse. Follow this structure:\n" + structure_instruction # d['score'] = self.failure_score else: feedback_fn = self.feedback_map[pred_name] @@ -213,7 +215,7 @@ def make_reflective_dataset(self, candidate, eval_batch, components_to_update): module_outputs=prediction, captured_trace=trace, ) - d['Feedback'] = fb["feedback"] + d["Feedback"] = fb["feedback"] assert fb["score"] == module_score, f"Currently, GEPA only supports feedback functions that return the same score as the module's score. However, the module-level score is {module_score} and the feedback score is {fb.score}." 
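# Illustrative sketch (hypothetical predictor name and values): records like `d` above
# are appended to `items` below and collected into `ret_d` keyed by predictor name, so
# the reflective dataset consumed by GEPA's reflection step is shaped roughly as:
#
# {
#     "generate_answer.predict": [
#         {
#             "Inputs": {"question": "..."},
#             "Generated Outputs": {"answer": "..."},
#             "Feedback": "Expected 'Paris', got 'Lyon'.",
#         },
#     ],
# }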
# d['score'] = fb.score items.append(d) @@ -222,19 +224,19 @@ def make_reflective_dataset(self, candidate, eval_batch, components_to_update): # raise Exception(f"No valid predictions found for module {module.signature}.") continue ret_d[pred_name] = items - + if len(ret_d) == 0: - raise Exception(f"No valid predictions found for any module.") + raise Exception("No valid predictions found for any module.") return ret_d - + # TODO: The current DSPyAdapter implementation uses the GEPA default propose_new_texts. # We can potentially override this, to use the instruction proposal similar to MIPROv2. # def propose_new_texts( # self, - # candidate: Dict[str, str], - # reflective_dataset: Dict[str, List[Dict[str, Any]]], + # candidate: Dict[str, str], + # reflective_dataset: Dict[str, List[Dict[str, Any]]], # components_to_update: List[str] # ) -> Dict[str, str]: # if self.adapter.propose_new_texts is not None: @@ -252,4 +254,4 @@ def make_reflective_dataset(self, candidate, eval_batch, components_to_update): # "dataset_with_feedback": dataset_with_feedback # } # )['new_instruction'] - # return new_texts \ No newline at end of file + # return new_texts From e44ca8f031460117f7fbe2de6906b4b4771bc038 Mon Sep 17 00:00:00 2001 From: Lakshya A Agrawal Date: Mon, 11 Aug 2025 19:27:43 -0700 Subject: [PATCH 11/12] Update pyproject.toml --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 56e7d5e5aa..2350bee1db 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,7 +42,7 @@ dependencies = [ "rich>=13.7.1", "numpy>=1.26.0", "xxhash>=3.5.0", - "gepa==0.0.1" + "gepa==0.0.2" ] [project.optional-dependencies] From 08c704e62b747424a9a8dee5d721949aa08c8dcb Mon Sep 17 00:00:00 2001 From: Lakshya A Agrawal Date: Mon, 11 Aug 2025 19:39:38 -0700 Subject: [PATCH 12/12] Add note about ITCS --- dspy/teleprompt/gepa/gepa.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dspy/teleprompt/gepa/gepa.py b/dspy/teleprompt/gepa/gepa.py index 5287649e5a..d7082d1ad5 100644 --- a/dspy/teleprompt/gepa/gepa.py +++ b/dspy/teleprompt/gepa/gepa.py @@ -136,6 +136,7 @@ class GEPA(Teleprompter): """ GEPA is an evolutionary optimizer, which uses reflection to evolve text components of complex systems. GEPA is proposed in the paper [GEPA: Reflective Prompt Evolution Can Outperform Reinforcement Learning](https://arxiv.org/abs/2507.19457). + The GEPA optimization engine is provided by the `gepa` package, available from [https://github.com/gepa-ai/gepa](https://github.com/gepa-ai/gepa). GEPA captures full traces of the DSPy module's execution, identifies the parts of the trace corresponding to a specific predictor, and reflects on the behaviour of the predictor to @@ -175,8 +176,8 @@ def metric( ... ``` - GEPA can also be used as a batch inference-time search strategy, by passing `valset=trainset, track_stats=True`, and using the - `detailed_results` attribute of the optimized program (returned by `compile`) to get the Pareto frontier of the batch. + GEPA can also be used as a batch inference-time search strategy, by passing `valset=trainset, track_stats=True, track_best_outputs=True`, and using the + `detailed_results` attribute of the optimized program (returned by `compile`) to get the Pareto frontier of the batch. `optimized_program.detailed_results.best_outputs_valset` will contain the best outputs for each task in the batch. Example: ```