diff --git a/src/mcp_agent/workflows/deep_orchestrator/config.py b/src/mcp_agent/workflows/deep_orchestrator/config.py
index 2382ccc2a..ce29f9716 100644
--- a/src/mcp_agent/workflows/deep_orchestrator/config.py
+++ b/src/mcp_agent/workflows/deep_orchestrator/config.py
@@ -31,6 +31,29 @@ class ExecutionConfig(BaseModel):
enable_filesystem: bool = True
"""Enable filesystem workspace for artifacts"""
+ # Efficiency and robustness controls
+ max_plan_verification_attempts: int = 4
+ """Maximum attempts to repair/verify a plan before proceeding"""
+
+ # Knowledge extraction strategy
+ knowledge_extraction_mode: str = "batch"
+ """Either 'per_task' or 'batch' (default) to extract knowledge after a step"""
+
+ knowledge_batch_max_concurrent: int = 3
+ """Max concurrent knowledge extraction tasks when in batch mode"""
+
+ # Token/cost optimization
+ lean_agent_design: bool = False
+ """If true, skip designer LLM call and create minimal agents for tasks"""
+
+ # Adaptive effort scaling based on objective complexity
+ dynamic_effort_scaling: bool = False
+ """If true, adjust execution/context budgets based on objective complexity"""
+
+ # Artifact persistence
+ save_task_outputs_to_artifacts: bool = True
+ """If true, persist each successful task's output into the workspace artifacts"""
+
class ContextConfig(BaseModel):
"""Configuration for context management."""
diff --git a/src/mcp_agent/workflows/deep_orchestrator/knowledge.py b/src/mcp_agent/workflows/deep_orchestrator/knowledge.py
index 74ea9a519..572755239 100644
--- a/src/mcp_agent/workflows/deep_orchestrator/knowledge.py
+++ b/src/mcp_agent/workflows/deep_orchestrator/knowledge.py
@@ -103,6 +103,16 @@ async def extract_knowledge(
else:
confidence = 0.8
+ # Attach provenance/citation if available from the calling LLM
+ citation = None
+ try:
+ if hasattr(llm, "get_and_clear_tool_provenance"):
+ prov = llm.get_and_clear_tool_provenance()
+ if prov:
+ citation = {"tools": prov[-3:]} # last few tool calls
+ except Exception:
+ citation = None
+
knowledge_items.append(
KnowledgeItem(
key=item.get("key", "Unknown"),
@@ -110,6 +120,7 @@ async def extract_knowledge(
source=task_result.task_name,
confidence=confidence,
category=item.get("category", "general"),
+ citation=citation,
)
)
diff --git a/src/mcp_agent/workflows/deep_orchestrator/models.py b/src/mcp_agent/workflows/deep_orchestrator/models.py
index 6face3ab6..87077eefa 100644
--- a/src/mcp_agent/workflows/deep_orchestrator/models.py
+++ b/src/mcp_agent/workflows/deep_orchestrator/models.py
@@ -47,6 +47,8 @@ class KnowledgeItem:
timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
confidence: float = 1.0
category: str = "general"
+ # Added citation/provenance
+ citation: Dict[str, Any] | None = None
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary representation."""
@@ -72,6 +74,8 @@ class TaskResult:
knowledge_extracted: List[KnowledgeItem] = field(default_factory=list)
duration_seconds: float = 0.0
retry_count: int = 0
+ # Optional provenance/citation information for outputs
+ citation: Dict[str, Any] | None = None
@property
def success(self) -> bool:
diff --git a/src/mcp_agent/workflows/deep_orchestrator/orchestrator.py b/src/mcp_agent/workflows/deep_orchestrator/orchestrator.py
index 456cb606c..980962c91 100644
--- a/src/mcp_agent/workflows/deep_orchestrator/orchestrator.py
+++ b/src/mcp_agent/workflows/deep_orchestrator/orchestrator.py
@@ -201,6 +201,8 @@ def _initialize_execution_components(self, objective: str):
context=self.context,
max_task_retries=self.config.execution.max_task_retries,
enable_parallel=self.config.execution.enable_parallel,
+ knowledge_extraction_mode=self.config.execution.knowledge_extraction_mode,
+ lean_agent_design=self.config.execution.lean_agent_design,
)
# Set budget update callback
@@ -290,6 +292,47 @@ async def _execute_workflow(
self.iteration = 0
self.replan_count = 0
+ # Optional dynamic effort scaling inspired by Anthropic research heuristics
+ if getattr(self.config.execution, "dynamic_effort_scaling", False):
+ # Cheap LLM pass to assess complexity and suggest scaling
+ assessor = Agent(
+ name="EffortAssessor",
+ instruction=(
+ "Assess objective complexity and recommend iteration/replan/context budgets."
+ ),
+ context=self.context,
+ )
+ llm = self.llm_factory(assessor)
+ try:
+ rec = await llm.generate_structured( # type: ignore[arg-type]
+ message=(
+ f"Objective: {self.objective}\n"
+ "Return JSON with keys: max_iterations, max_replans, task_context_budget."
+ ),
+ response_model=dict, # Loose schema to keep it cheap
+ request_params=RequestParams(max_iterations=1, temperature=0.1),
+ )
+ mi = int(
+ rec.get("max_iterations", self.config.execution.max_iterations)
+ )
+ mr = int(rec.get("max_replans", self.config.execution.max_replans))
+ tcb = int(
+ rec.get(
+ "task_context_budget", self.config.context.task_context_budget
+ )
+ )
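+ # Only raise budgets above the configured defaults; never lower them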
+ self.config.execution.max_iterations = max(
+ self.config.execution.max_iterations, mi
+ )
+ self.config.execution.max_replans = max(
+ self.config.execution.max_replans, mr
+ )
+ self.config.context.task_context_budget = max(
+ self.config.context.task_context_budget, tcb
+ )
+ except Exception:
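+ # Assessment is best-effort; on any failure keep the configured defaults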
+ pass
+
# Phase 1: Initial Planning
span.add_event("phase_1_initial_planning")
logger.info("Phase 1: Creating initial plan")
@@ -390,6 +433,29 @@ async def _execute_workflow(
next_step, request_params, self.executor
)
+ # If configured, extract knowledge in a single batch after the step to reduce token churn
+ if (
+ self.config.execution.knowledge_extraction_mode == "batch"
+ and self.memory.task_results
+ ):
+ # Gather results from this step only
+ step_task_names = {t.name for t in next_step.tasks}
+ step_results = [
+ r
+ for r in self.memory.task_results
+ if r.task_name in step_task_names
+ ]
+ try:
+ extracted = await self.knowledge_extractor.extract_batch(
+ step_results,
+ self.objective,
+ max_concurrent=self.config.execution.knowledge_batch_max_concurrent,
+ )
+ for item in extracted:
+ self.memory.add_knowledge(item)
+ except Exception as batch_err:
+ logger.warning(f"Batch knowledge extraction failed: {batch_err}")
+
# Complete the step
self.queue.complete_step(next_step)
@@ -442,9 +508,19 @@ async def _create_full_plan(self) -> Plan:
)
llm = self.llm_factory(planner)
+ # Prefer a more capable model for planning
+ rp = RequestParams(max_iterations=2)
+ try:
+ rp.modelPreferences = getattr(
+ self.context, "planner_model_preferences", None
+ )
+ except Exception:
+ pass
# Try to create a valid plan with retries
- max_verification_attempts = 10
+ max_verification_attempts = max(
+ 1, getattr(self.config.execution, "max_plan_verification_attempts", 4)
+ )
previous_plan: Plan = None
previous_errors = None
@@ -487,7 +563,9 @@ async def _create_full_plan(self) -> Plan:
# Get structured plan
prompt = get_full_plan_prompt(context)
plan: Plan = await retry_with_backoff(
- lambda: llm.generate_structured(message=prompt, response_model=Plan),
+ lambda: llm.generate_structured(
+ message=prompt, response_model=Plan, request_params=rp
+ ),
max_attempts=2,
)
@@ -553,6 +631,14 @@ async def _verify_completion(self) -> tuple[bool, float]:
)
llm = self.llm_factory(verifier)
+ # Prefer a capable but cost-aware model for verification
+ rp = RequestParams(max_iterations=1)
+ try:
+ rp.modelPreferences = getattr(
+ self.context, "verifier_model_preferences", None
+ )
+ except Exception:
+ pass
# Build verification context
context = get_verification_context(
@@ -647,9 +733,14 @@ async def _create_final_synthesis(self) -> List[MessageT]:
async with synthesizer:
llm = await synthesizer.attach_llm(self.llm_factory)
- result = await llm.generate(
- message=prompt, request_params=RequestParams(max_iterations=5)
- )
+ rp = RequestParams(max_iterations=5)
+ try:
+ rp.modelPreferences = getattr(
+ self.context, "synthesizer_model_preferences", None
+ )
+ except Exception:
+ pass
+ result = await llm.generate(message=prompt, request_params=rp)
logger.info("Final synthesis completed")
return result
diff --git a/src/mcp_agent/workflows/deep_orchestrator/prompts.py b/src/mcp_agent/workflows/deep_orchestrator/prompts.py
index 765e9dbb8..96d7fd7de 100644
--- a/src/mcp_agent/workflows/deep_orchestrator/prompts.py
+++ b/src/mcp_agent/workflows/deep_orchestrator/prompts.py
@@ -67,6 +67,7 @@
requires_context_from can ONLY reference tasks from PREVIOUS steps, not the current step
If a task needs output from another task in the same step, move it to a subsequent step
Only set context_window_budget if task needs more than default (10000 tokens)
+ Scale effort to objective complexity: simple fact-finding needs a single step with a few tasks; comparisons need 2-4 parallel tasks; broad surveys need multiple steps with a clear division of labor
@@ -75,6 +76,8 @@
Consider resource constraints and prefer efficient approaches
Think step by step about the best way to achieve the objective
Tasks within a step run in parallel, steps run sequentially
+ Prefer asynchronous, loosely-coupled sub-tasks that can progress independently and be synthesized later
+ Favor authoritative sources and tools over low-quality SEO content; build source-quality criteria into task descriptions when planning
diff --git a/src/mcp_agent/workflows/deep_orchestrator/task_executor.py b/src/mcp_agent/workflows/deep_orchestrator/task_executor.py
index 06bf94a98..b7aa52750 100644
--- a/src/mcp_agent/workflows/deep_orchestrator/task_executor.py
+++ b/src/mcp_agent/workflows/deep_orchestrator/task_executor.py
@@ -50,6 +50,8 @@ def __init__(
context: Optional["Context"] = None,
max_task_retries: int = 3,
enable_parallel: bool = True,
+ knowledge_extraction_mode: str = "per_task",
+ lean_agent_design: bool = False,
):
"""
Initialize the task executor.
@@ -76,6 +78,8 @@ def __init__(
self.context = context
self.max_task_retries = max_task_retries
self.enable_parallel = enable_parallel
+ self.knowledge_extraction_mode = knowledge_extraction_mode
+ self.lean_agent_design = lean_agent_design
# Budget update callback (will be set by orchestrator)
self.update_budget_tokens = lambda tokens: None
@@ -228,17 +232,31 @@ async def _execute_task_once(
# Execute with agent
if isinstance(agent, AugmentedLLM):
+ rp = request_params or RequestParams(max_iterations=10)
+ try:
+ # Favor cheaper/faster models for executors unless overridden
+ if not getattr(rp, "modelPreferences", None):
+ rp.modelPreferences = getattr(
+ self.context, "executor_model_preferences", None
+ )
+ except Exception:
+ pass
output = await agent.generate_str(
- message=task_context,
- request_params=request_params or RequestParams(max_iterations=10),
+ message=task_context, request_params=rp
)
else:
async with agent:
llm = await agent.attach_llm(self.llm_factory)
+ rp = request_params or RequestParams(max_iterations=10)
+ try:
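+ # Favor cheaper/faster models for executors unless overridden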
+ if not getattr(rp, "modelPreferences", None):
+ rp.modelPreferences = getattr(
+ self.context, "executor_model_preferences", None
+ )
+ except Exception:
+ pass
output = await llm.generate_str(
- message=task_context,
- request_params=request_params
- or RequestParams(max_iterations=10),
+ message=task_context, request_params=rp
)
# Success
@@ -246,6 +264,23 @@ async def _execute_task_once(
result.output = output
result.duration_seconds = time.time() - start_time
+ # Persist the task output as a workspace artifact if enabled on the orchestrator config
+ try:
+ cfg = getattr(self.context, "orchestrator_config", None)
+ if cfg is not None and getattr(
+ cfg.execution, "save_task_outputs_to_artifacts", True
+ ):
+ artifact_name = f"task_{task.name}.txt"
+ self.memory.save_artifact(
+ artifact_name, output, to_filesystem=True
+ )
+ result.artifacts[artifact_name] = output
+ except Exception:
+ pass
+
# Extract artifacts if mentioned
if any(
phrase in output.lower()
@@ -253,11 +288,12 @@ async def _execute_task_once(
):
result.artifacts[f"task_{task.name}_output"] = output
- # Extract knowledge
- knowledge_items = await self.knowledge_extractor.extract_knowledge(
- result, self.objective
- )
- result.knowledge_extracted = knowledge_items
+ # Extract knowledge (skipped when batch mode is enabled; the orchestrator runs a batch pass after the step)
+ if self.knowledge_extraction_mode == "per_task":
+ knowledge_items = await self.knowledge_extractor.extract_knowledge(
+ result, self.objective
+ )
+ result.knowledge_extracted = knowledge_items
# Update task status
task.status = TaskStatus.COMPLETED
@@ -274,7 +310,7 @@ async def _execute_task_once(
task.status = TaskStatus.FAILED
logger.error(f"Task {task.name} failed: {e}")
- # Record result
+ # Record result
self.memory.add_task_result(result)
return result
@@ -329,7 +365,36 @@ async def _create_dynamic_agent(self, task: Task) -> Agent:
"""
logger.debug(f"Creating dynamic agent for task: {task.description[:50]}...")
- # Agent designer
+ # In lean mode, construct a minimal specialized agent without an extra LLM round-trip
+ if self.lean_agent_design:
+ minimal_instruction = build_agent_instruction(
+ {
+ "instruction": (
+ "You are a focused specialist. Complete the task precisely using the specified tools."
+ ),
+ "role": "Specialist executor",
+ "key_behaviors": [
+ "Use tools actively and verify results",
+ "Be concise and avoid unnecessary token usage",
+ "Stop when the deliverable is satisfied",
+ ],
+ "tool_usage_tips": [
+ "Prefer authoritative sources where applicable",
+ "Batch operations to reduce repeated tool calls",
+ ],
+ }
+ )
+
+ agent = Agent(
+ name=f"Lean_{task.name}",
+ instruction=minimal_instruction,
+ server_names=task.servers,
+ context=self.context,
+ )
+ logger.debug(f"Created lean dynamic agent for task: {task.name}")
+ return agent
+
+ # Full agent design via LLM
designer = Agent(
name="AgentDesigner",
instruction=AGENT_DESIGNER_INSTRUCTION,
@@ -338,7 +403,6 @@ async def _create_dynamic_agent(self, task: Task) -> Agent:
llm = self.llm_factory(designer)
- # Design agent
design_prompt = get_agent_design_prompt(
task.description, task.servers, self.objective
)
@@ -347,7 +411,6 @@ async def _create_dynamic_agent(self, task: Task) -> Agent:
message=design_prompt, response_model=AgentDesign
)
- # Build comprehensive instruction
instruction = build_agent_instruction(design.model_dump())
agent = Agent(
diff --git a/src/mcp_agent/workflows/llm/augmented_llm.py b/src/mcp_agent/workflows/llm/augmented_llm.py
index cabcecf89..670917a97 100644
--- a/src/mcp_agent/workflows/llm/augmented_llm.py
+++ b/src/mcp_agent/workflows/llm/augmented_llm.py
@@ -252,6 +252,8 @@ def __init__(
self.executor = self.context.executor
self.name = self._gen_name(name or (agent.name if agent else None), prefix=None)
self.instruction = instruction or (agent.instruction if agent else None)
+ # Track last tool call metadata for provenance
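+ # Consumed and cleared via get_and_clear_tool_provenance() when citations are built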
+ self._last_tool_calls: list[dict[str, Any]] = []
if not self.name:
raise ValueError(
@@ -434,6 +436,18 @@ async def post_tool_call(
self, tool_call_id: str | None, request: CallToolRequest, result: CallToolResult
) -> CallToolResult:
"""Called after a tool execution. Can modify the result before it's returned."""
+ # Record minimal provenance for citations
+ try:
+ self._last_tool_calls.append(
+ {
+ "tool": request.params.name,
+ "arguments": request.params.arguments,
+ "tool_call_id": tool_call_id,
+ "isError": result.isError,
+ }
+ )
+ except Exception:
+ pass
return result
async def call_tool(
@@ -520,6 +534,12 @@ def message_param_str(self, message: MessageParamT) -> str:
"""Convert an input message to a string representation."""
return str(message)
+ def get_and_clear_tool_provenance(self) -> list[dict[str, Any]]:
+ """Return and clear recorded tool call provenance."""
+ prov = self._last_tool_calls
+ self._last_tool_calls = []
+ return prov
+
def message_str(self, message: MessageT, content_only: bool = False) -> str:
"""Convert an output message to a string representation."""
return str(message)