23 changes: 23 additions & 0 deletions src/mcp_agent/workflows/deep_orchestrator/config.py
@@ -31,6 +31,29 @@ class ExecutionConfig(BaseModel):
enable_filesystem: bool = True
"""Enable filesystem workspace for artifacts"""

# Efficiency and robustness controls
max_plan_verification_attempts: int = 4
"""Maximum attempts to repair/verify a plan before proceeding"""

# Knowledge extraction strategy
knowledge_extraction_mode: str = "batch"
"""Either 'per_task' or 'batch' (default) to extract knowledge after a step"""

knowledge_batch_max_concurrent: int = 3
"""Max concurrent knowledge extraction tasks when in batch mode"""

# Token/cost optimization
lean_agent_design: bool = False
"""If true, skip designer LLM call and create minimal agents for tasks"""

# Adaptive effort scaling based on objective complexity
dynamic_effort_scaling: bool = False
"""If true, adjust execution/context budgets based on objective complexity"""

# Artifact persistence
save_task_outputs_to_artifacts: bool = True
"""If true, persist each successful task's output into the workspace artifacts"""

Comment on lines +34 to +56

🛠️ Refactor suggestion

Type/validate new fields (Literal and Field constraints) to prevent invalid configs

  • Constrain knowledge_extraction_mode to "per_task" | "batch".
  • Add ge=1 constraints for counts.
-    max_plan_verification_attempts: int = 4
+    max_plan_verification_attempts: int = Field(default=4, ge=1)
@@
-    knowledge_extraction_mode: str = "batch"
+    knowledge_extraction_mode: Literal["per_task", "batch"] = "batch"
@@
-    knowledge_batch_max_concurrent: int = 3
+    knowledge_batch_max_concurrent: int = Field(default=3, ge=1)

Also add these imports at the top:

from typing import List, Optional, Literal
from pydantic import BaseModel, ConfigDict, Field
🤖 Prompt for AI Agents
In src/mcp_agent/workflows/deep_orchestrator/config.py around lines 34 to 56,
the new configuration fields lack type constraints and validation. Update
knowledge_extraction_mode to use Literal["per_task", "batch"] to restrict its
values. Add ge=1 constraints using Field for integer count fields like
max_plan_verification_attempts and knowledge_batch_max_concurrent to ensure they
are positive. Also, add the imports from typing (List, Optional, Literal) and
from pydantic (BaseModel, ConfigDict, Field) at the top of the file as
specified.
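To illustrate what these constraints would catch, here is a minimal stdlib-only sketch. It is a stand-in, not the Pydantic change itself: `ExecutionConfigSketch` and `is_valid` are hypothetical names, and the `__post_init__` checks only imitate what `Literal[...]` and `Field(ge=1)` would enforce on the real `ExecutionConfig`.

```python
from dataclasses import dataclass
from typing import Literal, get_args

# The two modes named in the field's docstring.
ExtractionMode = Literal["per_task", "batch"]


@dataclass
class ExecutionConfigSketch:
    """Hypothetical stand-in for ExecutionConfig; not the project's class."""

    max_plan_verification_attempts: int = 4
    knowledge_extraction_mode: str = "batch"
    knowledge_batch_max_concurrent: int = 3

    def __post_init__(self) -> None:
        # Imitates Literal["per_task", "batch"]: reject any other mode string.
        allowed = get_args(ExtractionMode)
        if self.knowledge_extraction_mode not in allowed:
            raise ValueError(f"knowledge_extraction_mode must be one of {allowed}")
        # Imitates Field(ge=1): both counts must be at least 1.
        for name in ("max_plan_verification_attempts", "knowledge_batch_max_concurrent"):
            if getattr(self, name) < 1:
                raise ValueError(f"{name} must be >= 1")


def is_valid(**overrides) -> bool:
    """Return True if the overrides produce an acceptable config."""
    try:
        ExecutionConfigSketch(**overrides)
        return True
    except ValueError:
        return False
```

With the current plain `str` and `int` annotations, values such as `"Batch"` or `0` would be accepted silently and only surface later at the point of use.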


class ContextConfig(BaseModel):
"""Configuration for context management."""
11 changes: 11 additions & 0 deletions src/mcp_agent/workflows/deep_orchestrator/knowledge.py
@@ -103,13 +103,24 @@ async def extract_knowledge(
else:
confidence = 0.8

# Attach provenance/citation if available from the calling LLM
citation = None
try:
if hasattr(llm, "get_and_clear_tool_provenance"):
prov = llm.get_and_clear_tool_provenance()
if prov:
citation = {"tools": prov[-3:]} # last few tool calls
except Exception:
citation = None

Comment on lines +106 to +115

🛠️ Refactor suggestion

⚠️ Potential issue

Fetch provenance once (outside the loop) and prefer TaskResult.citation; current code clears it per-item

  • Calling get_and_clear_tool_provenance inside the loop clears provenance after the first item; subsequent items get None.
  • Provenance should usually come from the task execution (TaskResult.citation), not the extraction LLM instance.

Within the loop, use a pre-fetched citation:

-                # Attach provenance/citation if available from the calling LLM
-                citation = None
-                try:
-                    if hasattr(llm, "get_and_clear_tool_provenance"):
-                        prov = llm.get_and_clear_tool_provenance()
-                        if prov:
-                            citation = {"tools": prov[-3:]}  # last few tool calls
-                except Exception:
-                    citation = None
+                # Use pre-fetched citation (if any)
+                citation = citation_info

Add this block once right after initializing knowledge_items (outside this hunk):

# Pre-fetch citation once: prefer TaskResult.citation, else LLM provenance
citation_info = None
try:
    if getattr(task_result, "citation", None):
        citation_info = task_result.citation
    elif hasattr(llm, "get_and_clear_tool_provenance"):
        prov = llm.get_and_clear_tool_provenance()
        if prov:
            citation_info = {"tools": prov[-3:]}  # last few tool calls
except Exception:
    citation_info = None

Optionally sanitize provenance before storing (mask secrets, cap sizes) similar to augmented_llm._sanitize_provenance_args.

🤖 Prompt for AI Agents
In src/mcp_agent/workflows/deep_orchestrator/knowledge.py around lines 106 to
115, the code calls llm.get_and_clear_tool_provenance inside a loop, which
clears provenance after the first iteration causing subsequent items to get
None. To fix this, move the provenance fetching outside the loop by pre-fetching
citation_info once after initializing knowledge_items, preferring
task_result.citation if available, otherwise falling back to
llm.get_and_clear_tool_provenance. Then, inside the loop, use this pre-fetched
citation_info instead of calling the method repeatedly. Optionally, sanitize the
provenance data before storing it to mask secrets or limit size.
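The difference between the two approaches can be reproduced with a small stand-in. This is a sketch under one assumption: that `get_and_clear_tool_provenance` empties its buffer on every call, as its name and the comment above suggest. `FakeLLM` and both helper functions are hypothetical and exist only for illustration.

```python
from typing import Any, Dict, List, Optional


class FakeLLM:
    """Hypothetical stand-in with get-and-clear provenance semantics."""

    def __init__(self, calls: List[str]) -> None:
        self._calls = list(calls)

    def get_and_clear_tool_provenance(self) -> List[str]:
        # Return the recorded calls and reset the buffer in one step.
        calls, self._calls = self._calls, []
        return calls


def citations_fetched_in_loop(llm: FakeLLM, n_items: int) -> List[Optional[Dict[str, Any]]]:
    """Per-item fetch: only the first item sees any provenance."""
    out: List[Optional[Dict[str, Any]]] = []
    for _ in range(n_items):
        prov = llm.get_and_clear_tool_provenance()
        out.append({"tools": prov[-3:]} if prov else None)
    return out


def citations_fetched_once(llm: FakeLLM, n_items: int) -> List[Optional[Dict[str, Any]]]:
    """Pre-fetch: every item shares the same provenance snapshot."""
    prov = llm.get_and_clear_tool_provenance()
    citation_info = {"tools": prov[-3:]} if prov else None
    return [citation_info for _ in range(n_items)]
```

For three items and two recorded tool calls, the in-loop variant yields one citation followed by two `None` values, while the pre-fetched variant attaches the same citation to all three.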

knowledge_items.append(
KnowledgeItem(
key=item.get("key", "Unknown"),
value=item.get("value", ""),
source=task_result.task_name,
confidence=confidence,
category=item.get("category", "general"),
citation=citation,
)
Comment on lines +123 to 124

🛠️ Refactor suggestion

Use the pre-fetched citation variable when constructing KnowledgeItem

Tie item to the same provenance snapshot.

-                        citation=citation,
+                        citation=citation_info,
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
citation=citation,
)
citation=citation_info,
)
🤖 Prompt for AI Agents
In src/mcp_agent/workflows/deep_orchestrator/knowledge.py at lines 123 to 124,
the KnowledgeItem construction should use the pre-fetched citation_info variable
instead of a citation fetched per item. Pass citation=citation_info so that
every item is tied to the same provenance snapshot.

)

4 changes: 4 additions & 0 deletions src/mcp_agent/workflows/deep_orchestrator/models.py
@@ -47,6 +47,8 @@ class KnowledgeItem:
timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
confidence: float = 1.0
category: str = "general"
# Added citation/provenance
citation: Dict[str, Any] | None = None

Comment on lines +50 to 52

🛠️ Refactor suggestion

Include citation in KnowledgeItem.to_dict to avoid dropping provenance

Without adding citation to to_dict, provenance is lost when serializing.

Apply this diff to to_dict:

     def to_dict(self) -> Dict[str, Any]:
         """Convert to dictionary representation."""
         return {
             "key": self.key,
             "value": self.value,
             "source": self.source,
             "timestamp": self.timestamp.isoformat(),
             "confidence": self.confidence,
             "category": self.category,
+            "citation": self.citation,
         }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# Added citation/provenance
citation: Dict[str, Any] | None = None
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary representation."""
return {
"key": self.key,
"value": self.value,
"source": self.source,
"timestamp": self.timestamp.isoformat(),
"confidence": self.confidence,
"category": self.category,
+ "citation": self.citation,
}
🤖 Prompt for AI Agents
In src/mcp_agent/workflows/deep_orchestrator/models.py around lines 50 to 52,
the citation attribute is defined but not included in the KnowledgeItem.to_dict
method, causing loss of provenance information during serialization. Update the
to_dict method to include the citation field in the returned dictionary so that
citation data is preserved when converting the object to a dictionary.
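A minimal sketch of the serialized shape after this change, assuming the dataclass fields shown in the hunk above. `KnowledgeItemSketch` is a hypothetical stand-in; the `Any` type on `value` is an assumption, and the real class may define more than appears in the diff.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, Optional


@dataclass
class KnowledgeItemSketch:
    """Hypothetical stand-in for KnowledgeItem, limited to the fields in the diff."""

    key: str
    value: Any
    source: str
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    confidence: float = 1.0
    category: str = "general"
    citation: Optional[Dict[str, Any]] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a dictionary, keeping provenance alongside the other fields."""
        return {
            "key": self.key,
            "value": self.value,
            "source": self.source,
            "timestamp": self.timestamp.isoformat(),
            "confidence": self.confidence,
            "category": self.category,
            "citation": self.citation,  # omitted in the original to_dict
        }
```

Items without provenance serialize with `"citation": None`, so consumers of the dictionary can rely on the key always being present.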

def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary representation."""
@@ -72,6 +74,8 @@ class TaskResult:
knowledge_extracted: List[KnowledgeItem] = field(default_factory=list)
duration_seconds: float = 0.0
retry_count: int = 0
# Optional provenance/citation information for outputs
citation: Dict[str, Any] | None = None

@property
def success(self) -> bool:
101 changes: 96 additions & 5 deletions src/mcp_agent/workflows/deep_orchestrator/orchestrator.py
@@ -201,6 +201,8 @@ def _initialize_execution_components(self, objective: str):
context=self.context,
max_task_retries=self.config.execution.max_task_retries,
enable_parallel=self.config.execution.enable_parallel,
knowledge_extraction_mode=self.config.execution.knowledge_extraction_mode,
lean_agent_design=self.config.execution.lean_agent_design,
)

# Set budget update callback
@@ -290,6 +292,47 @@ async def _execute_workflow(
self.iteration = 0
self.replan_count = 0

# Optional dynamic effort scaling inspired by Anthropic research heuristics
if getattr(self.config.execution, "dynamic_effort_scaling", False):
# Cheap LLM pass to assess complexity and suggest scaling
assessor = Agent(
name="EffortAssessor",
instruction=(
"Assess objective complexity and recommend iteration/replan/context budgets."
),
context=self.context,
)
llm = self.llm_factory(assessor)
try:
rec = await llm.generate_structured( # type: ignore[arg-type]
message=(
f"<assess>Objective: {self.objective}\n"
"Return JSON with keys: max_iterations, max_replans, task_context_budget.</assess>"
),
response_model=dict, # Loose schema to keep it cheap
request_params=RequestParams(max_iterations=1, temperature=0.1),
)
mi = int(
rec.get("max_iterations", self.config.execution.max_iterations)
)
mr = int(rec.get("max_replans", self.config.execution.max_replans))
tcb = int(
rec.get(
"task_context_budget", self.config.context.task_context_budget
)
)
self.config.execution.max_iterations = max(
self.config.execution.max_iterations, mi
)
self.config.execution.max_replans = max(
self.config.execution.max_replans, mr
)
self.config.context.task_context_budget = max(
self.config.context.task_context_budget, tcb
)
except Exception:
pass

Comment on lines +295 to +335

🛠️ Refactor suggestion

⚠️ Potential issue

Dynamic effort scaling: avoid dict as response model and add bounds.

  • Passing response_model=dict to generate_structured may fail if the implementation expects a Pydantic model. Prefer using generate_str + json.loads, or define a minimal Pydantic model.
  • Consider clamping recommended values to sane upper bounds to avoid runaway budgets from a bad assessor output.
  • Optional: allow assessor_model_preferences from context to force cheap model.

Example fix using JSON:

@@
-            try:
-                rec = await llm.generate_structured(  # type: ignore[arg-type]
-                    message=(
-                        f"<assess>Objective: {self.objective}\n"
-                        "Return JSON with keys: max_iterations, max_replans, task_context_budget.</assess>"
-                    ),
-                    response_model=dict,  # Loose schema to keep it cheap
-                    request_params=RequestParams(max_iterations=1, temperature=0.1),
-                )
+            try:
+                rp = RequestParams(max_iterations=1, temperature=0.1)
+                # Prefer cheap/fast model if available
+                try:
+                    setattr(rp, "modelPreferences", getattr(self.context, "assessor_model_preferences", None))
+                except Exception:
+                    pass
+                raw = await llm.generate_str(
+                    message=(
+                        f"<assess>Objective: {self.objective}\n"
+                        "Return JSON with keys: max_iterations, max_replans, task_context_budget.</assess>"
+                    ),
+                    request_params=rp,
+                )
+                import json
+                rec = json.loads(raw) if raw else {}
@@
-                self.config.execution.max_iterations = max(
-                    self.config.execution.max_iterations, mi
-                )
-                self.config.execution.max_replans = max(
-                    self.config.execution.max_replans, mr
-                )
-                self.config.context.task_context_budget = max(
-                    self.config.context.task_context_budget, tcb
-                )
+                # Clamp updates to prevent extreme values (tunable caps)
+                cap_iter = getattr(self.config.execution, "max_iterations_cap", 50)
+                cap_replans = getattr(self.config.execution, "max_replans_cap", 10)
+                cap_ctx = getattr(self.config.context, "task_context_budget_cap", 20000)
+                self.config.execution.max_iterations = min(
+                    max(self.config.execution.max_iterations, mi), cap_iter
+                )
+                self.config.execution.max_replans = min(
+                    max(self.config.execution.max_replans, mr), cap_replans
+                )
+                self.config.context.task_context_budget = min(
+                    max(self.config.context.task_context_budget, tcb), cap_ctx
+                )

If you prefer structured parsing, define a tiny Pydantic model with optional ints and defaults.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# Optional dynamic effort scaling inspired by Anthropic research heuristics
if getattr(self.config.execution, "dynamic_effort_scaling", False):
# Cheap LLM pass to assess complexity and suggest scaling
assessor = Agent(
name="EffortAssessor",
instruction=(
"Assess objective complexity and recommend iteration/replan/context budgets."
),
context=self.context,
)
llm = self.llm_factory(assessor)
try:
rec = await llm.generate_structured( # type: ignore[arg-type]
message=(
f"<assess>Objective: {self.objective}\n"
"Return JSON with keys: max_iterations, max_replans, task_context_budget.</assess>"
),
response_model=dict, # Loose schema to keep it cheap
request_params=RequestParams(max_iterations=1, temperature=0.1),
)
mi = int(
rec.get("max_iterations", self.config.execution.max_iterations)
)
mr = int(rec.get("max_replans", self.config.execution.max_replans))
tcb = int(
rec.get(
"task_context_budget", self.config.context.task_context_budget
)
)
self.config.execution.max_iterations = max(
self.config.execution.max_iterations, mi
)
self.config.execution.max_replans = max(
self.config.execution.max_replans, mr
)
self.config.context.task_context_budget = max(
self.config.context.task_context_budget, tcb
)
except Exception:
pass
# Optional dynamic effort scaling inspired by Anthropic research heuristics
if getattr(self.config.execution, "dynamic_effort_scaling", False):
# Cheap LLM pass to assess complexity and suggest scaling
assessor = Agent(
name="EffortAssessor",
instruction=(
"Assess objective complexity and recommend iteration/replan/context budgets."
),
context=self.context,
)
llm = self.llm_factory(assessor)
try:
rp = RequestParams(max_iterations=1, temperature=0.1)
# Prefer cheap/fast model if available
try:
setattr(
rp,
"modelPreferences",
getattr(self.context, "assessor_model_preferences", None),
)
except Exception:
pass
raw = await llm.generate_str(
message=(
f"<assess>Objective: {self.objective}\n"
"Return JSON with keys: max_iterations, max_replans, task_context_budget.</assess>"
),
request_params=rp,
)
import json
rec = json.loads(raw) if raw else {}
mi = int(
rec.get("max_iterations", self.config.execution.max_iterations)
)
mr = int(rec.get("max_replans", self.config.execution.max_replans))
tcb = int(
rec.get(
"task_context_budget",
self.config.context.task_context_budget,
)
)
# Clamp updates to prevent extreme values (tunable caps)
cap_iter = getattr(self.config.execution, "max_iterations_cap", 50)
cap_replans = getattr(self.config.execution, "max_replans_cap", 10)
cap_ctx = getattr(self.config.context, "task_context_budget_cap", 20000)
self.config.execution.max_iterations = min(
max(self.config.execution.max_iterations, mi), cap_iter
)
self.config.execution.max_replans = min(
max(self.config.execution.max_replans, mr), cap_replans
)
self.config.context.task_context_budget = min(
max(self.config.context.task_context_budget, tcb), cap_ctx
)
except Exception:
pass
🤖 Prompt for AI Agents
In src/mcp_agent/workflows/deep_orchestrator/orchestrator.py around lines 295 to
335, the current use of response_model=dict in llm.generate_structured may cause
errors if the method expects a Pydantic model. Replace this by either using
generate_str combined with json.loads to parse the response or define a minimal
Pydantic model with optional integer fields and defaults for max_iterations,
max_replans, and task_context_budget. Additionally, after parsing the
recommended values, clamp them to predefined sane upper bounds before updating
the config to prevent excessively large budgets from faulty assessor outputs.
Optionally, incorporate assessor_model_preferences from the context to select a
cheaper model for the assessor.
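The parse-then-clamp logic can also be isolated into a pure function, which makes it easy to test without an LLM. The sketch below is one possible shape, not the orchestrator's code: `scaled_budgets` is a hypothetical helper, and the cap values in the usage note are the illustrative defaults from the suggestion above (50, 10, 20000).

```python
import json
from typing import Dict


def scaled_budgets(raw: str, current: Dict[str, int], caps: Dict[str, int]) -> Dict[str, int]:
    """Merge assessor output into current budgets, bounded by caps.

    `raw` is the assessor's text response; anything that is not a JSON
    object is treated as "no recommendation".
    """
    try:
        rec = json.loads(raw) if raw else {}
    except json.JSONDecodeError:
        rec = {}
    if not isinstance(rec, dict):
        rec = {}

    result: Dict[str, int] = {}
    for key, cur in current.items():
        try:
            suggested = int(rec.get(key, cur))
        except (TypeError, ValueError):
            # Non-numeric suggestion: fall back to the current value.
            suggested = cur
        # Raise to the suggestion if it is larger, then clamp to the cap.
        result[key] = min(max(cur, suggested), caps[key])
    return result
```

Given current budgets of 20 iterations, 3 replans, and 10000 context tokens, a response recommending 1000 iterations would be limited to the cap of 50, and an unparseable response would leave all three values unchanged.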

# Phase 1: Initial Planning
span.add_event("phase_1_initial_planning")
logger.info("Phase 1: Creating initial plan")
@@ -390,6 +433,29 @@ async def _execute_workflow(
next_step, request_params, self.executor
)

# If configured, extract knowledge in batch post-step to reduce token churn
if (
self.config.execution.knowledge_extraction_mode == "batch"
and self.memory.task_results
):
# Gather results from this step only
step_task_names = {t.name for t in next_step.tasks}
step_results = [
r
for r in self.memory.task_results
if r.task_name in step_task_names
]
try:
extracted = await self.knowledge_extractor.extract_batch(
step_results,
self.objective,
max_concurrent=self.config.execution.knowledge_batch_max_concurrent,
)
for item in extracted:
self.memory.add_knowledge(item)
except Exception as batch_err:
logger.warning(f"Batch knowledge extraction failed: {batch_err}")

# Complete the step
self.queue.complete_step(next_step)

@@ -442,9 +508,19 @@ async def _create_full_plan(self) -> Plan:
)

llm = self.llm_factory(planner)
# Prefer intelligent model for planning
try:
rp = RequestParams(max_iterations=2)
rp.modelPreferences = getattr(
self.context, "planner_model_preferences", None
)
except Exception:
rp = RequestParams(max_iterations=2)

# Try to create a valid plan with retries
-    max_verification_attempts = 10
+    max_verification_attempts = max(
+        1, getattr(self.config.execution, "max_plan_verification_attempts", 4)
+    )
previous_plan: Plan = None
previous_errors = None

@@ -487,7 +563,9 @@ async def _create_full_plan(self) -> Plan:
# Get structured plan
prompt = get_full_plan_prompt(context)
plan: Plan = await retry_with_backoff(
-    lambda: llm.generate_structured(message=prompt, response_model=Plan),
+    lambda: llm.generate_structured(
+        message=prompt, response_model=Plan, request_params=rp
+    ),
max_attempts=2,
)

@@ -553,6 +631,14 @@ async def _verify_completion(self) -> tuple[bool, float]:
)

llm = self.llm_factory(verifier)
# Prefer capable but cost-aware model
rp = RequestParams(max_iterations=1)
try:
rp.modelPreferences = getattr(
self.context, "verifier_model_preferences", None
)
except Exception:
pass

Comment on lines +635 to 642

⚠️ Potential issue

Verifier RequestParams prepared but not used.

rp is created (with model preferences) but not passed to generate_structured. This prevents the intended model selection from taking effect.

Apply:

-        result = await llm.generate_structured(
-            message=prompt, response_model=VerificationResult
-        )
+        result = await llm.generate_structured(
+            message=prompt, response_model=VerificationResult, request_params=rp
+        )

Also applies to: 653-655

🤖 Prompt for AI Agents
In src/mcp_agent/workflows/deep_orchestrator/orchestrator.py around lines 635 to
642 and similarly at lines 653 to 655, the RequestParams object 'rp' is created
and configured but not passed to the generate_structured function, so the model
preferences are not applied. To fix this, modify the calls to
generate_structured to include 'rp' as an argument, ensuring the model
preferences in 'rp' are used during generation.
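The effect of the omission can be shown with a toy model. This sketch assumes the LLM falls back to its own defaults when `request_params` is not supplied; that fallback, along with `ParamsSketch`, `FakeLLM`, and the model names, is hypothetical and only illustrates why a prepared but unpassed `rp` changes nothing.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass
class ParamsSketch:
    """Hypothetical stand-in for RequestParams."""

    max_iterations: int = 10
    model_preferences: Optional[str] = None


class FakeLLM:
    """Hypothetical LLM that reports which model preference it would use."""

    default_params = ParamsSketch()

    async def generate_structured(
        self, message: str, request_params: Optional[ParamsSketch] = None
    ) -> str:
        # Without explicit params, the call silently uses the defaults.
        params = request_params or self.default_params
        return params.model_preferences or "default-model"


async def verify(pass_params: bool) -> str:
    rp = ParamsSketch(max_iterations=1, model_preferences="cost-aware")
    llm = FakeLLM()
    if pass_params:
        return await llm.generate_structured("verify", request_params=rp)
    # rp is built but never handed over, mirroring the flagged code.
    return await llm.generate_structured("verify")
```

Running `asyncio.run(verify(False))` selects the default model even though `rp` was configured, which is the behavior the comment above is flagging.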

# Build verification context
context = get_verification_context(
@@ -647,9 +733,14 @@ async def _create_final_synthesis(self) -> List[MessageT]:
async with synthesizer:
llm = await synthesizer.attach_llm(self.llm_factory)

-    result = await llm.generate(
-        message=prompt, request_params=RequestParams(max_iterations=5)
-    )
+    rp = RequestParams(max_iterations=5)
+    try:
+        rp.modelPreferences = getattr(
+            self.context, "synthesizer_model_preferences", None
+        )
+    except Exception:
+        pass
+    result = await llm.generate(message=prompt, request_params=rp)

logger.info("Final synthesis completed")
return result
3 changes: 3 additions & 0 deletions src/mcp_agent/workflows/deep_orchestrator/prompts.py
@@ -67,6 +67,7 @@
<rule>requires_context_from can ONLY reference tasks from PREVIOUS steps, not the current step</rule>
<rule>If a task needs output from another task in the same step, move it to a subsequent step</rule>
<rule>Only set context_window_budget if task needs more than default (10000 tokens)</rule>
<rule>Scale effort to query complexity: simple fact-finding = 1 step, few tasks; comparisons = 2-4 parallel tasks; broad surveys = multiple steps with tight division of labor</rule>
</task_design_rules>

<important_notes>
@@ -75,6 +76,8 @@
<note>Consider resource constraints and prefer efficient approaches</note>
<note>Think step by step about the best way to achieve the objective</note>
<note>Tasks within a step run in parallel, steps run sequentially</note>
<note>Prefer asynchronous, loosely-coupled sub-tasks that can progress independently and be synthesized later</note>
<note>Favor authoritative sources/tools over SEO spam; encode source-quality selection when planning tasks</note>
</important_notes>

<example_task_structure>