diff --git a/src/mcp_agent/workflows/deep_orchestrator/config.py b/src/mcp_agent/workflows/deep_orchestrator/config.py
index 2382ccc2a..ce29f9716 100644
--- a/src/mcp_agent/workflows/deep_orchestrator/config.py
+++ b/src/mcp_agent/workflows/deep_orchestrator/config.py
@@ -31,6 +31,29 @@ class ExecutionConfig(BaseModel):
     enable_filesystem: bool = True
     """Enable filesystem workspace for artifacts"""

+    # Efficiency and robustness controls
+    max_plan_verification_attempts: int = 4
+    """Maximum attempts to repair/verify a plan before proceeding"""
+
+    # Knowledge extraction strategy
+    knowledge_extraction_mode: str = "batch"
+    """Either 'per_task' or 'batch' (default) to extract knowledge after a step"""
+
+    knowledge_batch_max_concurrent: int = 3
+    """Max concurrent knowledge extraction tasks when in batch mode"""
+
+    # Token/cost optimization
+    lean_agent_design: bool = False
+    """If true, skip designer LLM call and create minimal agents for tasks"""
+
+    # Adaptive effort scaling based on objective complexity
+    dynamic_effort_scaling: bool = False
+    """If true, adjust execution/context budgets based on objective complexity"""
+
+    # Artifact persistence
+    save_task_outputs_to_artifacts: bool = True
+    """If true, persist each successful task's output into the workspace artifacts"""
+

 class ContextConfig(BaseModel):
     """Configuration for context management."""
diff --git a/src/mcp_agent/workflows/deep_orchestrator/knowledge.py b/src/mcp_agent/workflows/deep_orchestrator/knowledge.py
index 74ea9a519..572755239 100644
--- a/src/mcp_agent/workflows/deep_orchestrator/knowledge.py
+++ b/src/mcp_agent/workflows/deep_orchestrator/knowledge.py
@@ -103,6 +103,16 @@ async def extract_knowledge(
             else:
                 confidence = 0.8

+            # Attach provenance/citation if available from the calling LLM
+            citation = None
+            try:
+                if hasattr(llm, "get_and_clear_tool_provenance"):
+                    prov = llm.get_and_clear_tool_provenance()
+                    if prov:
+                        citation = {"tools": prov[-3:]}  # last few tool calls
+            except Exception:
+                citation = None
+
             knowledge_items.append(
                 KnowledgeItem(
                     key=item.get("key", "Unknown"),
@@ -110,6 +120,7 @@
                     source=task_result.task_name,
                     confidence=confidence,
                     category=item.get("category", "general"),
+                    citation=citation,
                 )
             )
diff --git a/src/mcp_agent/workflows/deep_orchestrator/models.py b/src/mcp_agent/workflows/deep_orchestrator/models.py
index 6face3ab6..87077eefa 100644
--- a/src/mcp_agent/workflows/deep_orchestrator/models.py
+++ b/src/mcp_agent/workflows/deep_orchestrator/models.py
@@ -47,6 +47,8 @@ class KnowledgeItem:
     timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
     confidence: float = 1.0
     category: str = "general"
+    # Added citation/provenance
+    citation: Dict[str, Any] | None = None

     def to_dict(self) -> Dict[str, Any]:
         """Convert to dictionary representation."""
@@ -72,6 +74,8 @@ class TaskResult:
     knowledge_extracted: List[KnowledgeItem] = field(default_factory=list)
     duration_seconds: float = 0.0
     retry_count: int = 0
+    # Optional provenance/citation information for outputs
+    citation: Dict[str, Any] | None = None

     @property
     def success(self) -> bool:
diff --git a/src/mcp_agent/workflows/deep_orchestrator/orchestrator.py b/src/mcp_agent/workflows/deep_orchestrator/orchestrator.py
index 456cb606c..980962c91 100644
--- a/src/mcp_agent/workflows/deep_orchestrator/orchestrator.py
+++ b/src/mcp_agent/workflows/deep_orchestrator/orchestrator.py
@@ -201,6 +201,8 @@ def _initialize_execution_components(self, objective: str):
             context=self.context,
             max_task_retries=self.config.execution.max_task_retries,
             enable_parallel=self.config.execution.enable_parallel,
+            knowledge_extraction_mode=self.config.execution.knowledge_extraction_mode,
+            lean_agent_design=self.config.execution.lean_agent_design,
         )

         # Set budget update callback
@@ -290,6 +292,47 @@ async def _execute_workflow(
         self.iteration = 0
         self.replan_count = 0

+        # Optional dynamic effort scaling inspired by Anthropic research heuristics
+        if getattr(self.config.execution, "dynamic_effort_scaling", False):
+            # Cheap LLM pass to assess complexity and suggest scaling
+            assessor = Agent(
+                name="EffortAssessor",
+                instruction=(
+                    "Assess objective complexity and recommend iteration/replan/context budgets."
+                ),
+                context=self.context,
+            )
+            llm = self.llm_factory(assessor)
+            try:
+                rec = await llm.generate_structured(  # type: ignore[arg-type]
+                    message=(
+                        f"Objective: {self.objective}\n"
+                        "Return JSON with keys: max_iterations, max_replans, task_context_budget."
+                    ),
+                    response_model=dict,  # Loose schema to keep it cheap
+                    request_params=RequestParams(max_iterations=1, temperature=0.1),
+                )
+                mi = int(
+                    rec.get("max_iterations", self.config.execution.max_iterations)
+                )
+                mr = int(rec.get("max_replans", self.config.execution.max_replans))
+                tcb = int(
+                    rec.get(
+                        "task_context_budget", self.config.context.task_context_budget
+                    )
+                )
+                self.config.execution.max_iterations = max(
+                    self.config.execution.max_iterations, mi
+                )
+                self.config.execution.max_replans = max(
+                    self.config.execution.max_replans, mr
+                )
+                self.config.context.task_context_budget = max(
+                    self.config.context.task_context_budget, tcb
+                )
+            except Exception:
+                pass
+
         # Phase 1: Initial Planning
         span.add_event("phase_1_initial_planning")
         logger.info("Phase 1: Creating initial plan")
@@ -390,6 +433,29 @@
                     next_step, request_params, self.executor
                 )

+                # If configured, extract knowledge in batch post-step to reduce token churn
+                if (
+                    self.config.execution.knowledge_extraction_mode == "batch"
+                    and self.memory.task_results
+                ):
+                    # Gather results from this step only
+                    step_task_names = {t.name for t in next_step.tasks}
+                    step_results = [
+                        r
+                        for r in self.memory.task_results
+                        if r.task_name in step_task_names
+                    ]
+                    try:
+                        extracted = await self.knowledge_extractor.extract_batch(
+                            step_results,
+                            self.objective,
+                            max_concurrent=self.config.execution.knowledge_batch_max_concurrent,
+                        )
+                        for item in extracted:
+                            self.memory.add_knowledge(item)
+                    except Exception as batch_err:
+                        logger.warning(f"Batch knowledge extraction failed: {batch_err}")
+
                 # Complete the step
                 self.queue.complete_step(next_step)

@@ -442,9 +508,19 @@ async def _create_full_plan(self) -> Plan:
         )
         llm = self.llm_factory(planner)

+        # Prefer intelligent model for planning
+        try:
+            rp = RequestParams(max_iterations=2)
+            rp.modelPreferences = getattr(
+                self.context, "planner_model_preferences", None
+            )
+        except Exception:
+            rp = RequestParams(max_iterations=2)
         # Try to create a valid plan with retries
-        max_verification_attempts = 10
+        max_verification_attempts = max(
+            1, getattr(self.config.execution, "max_plan_verification_attempts", 4)
+        )
         previous_plan: Plan = None
         previous_errors = None
@@ -487,7 +563,9 @@
             # Get structured plan
             prompt = get_full_plan_prompt(context)
             plan: Plan = await retry_with_backoff(
-                lambda: llm.generate_structured(message=prompt, response_model=Plan),
+                lambda: llm.generate_structured(
+                    message=prompt, response_model=Plan, request_params=rp
+                ),
                 max_attempts=2,
             )

@@ -553,6 +631,14 @@ async def _verify_completion(self) -> tuple[bool, float]:
         )
         llm = self.llm_factory(verifier)

+        # Prefer capable but cost-aware model
+        rp = RequestParams(max_iterations=1)
+        try:
+            rp.modelPreferences = getattr(
+                self.context, "verifier_model_preferences", None
+            )
+        except Exception:
+            pass

         # Build verification context
         context = get_verification_context(
@@ -647,9 +733,14 @@ async def _create_final_synthesis(self) -> List[MessageT]:
         async with synthesizer:
             llm = await synthesizer.attach_llm(self.llm_factory)

-            result = await llm.generate(
-                message=prompt, request_params=RequestParams(max_iterations=5)
-            )
+            rp = RequestParams(max_iterations=5)
+            try:
+                rp.modelPreferences = getattr(
+                    self.context, "synthesizer_model_preferences", None
+                )
+            except Exception:
+                pass
+            result = await llm.generate(message=prompt, request_params=rp)

         logger.info("Final synthesis completed")
         return result
diff --git a/src/mcp_agent/workflows/deep_orchestrator/prompts.py b/src/mcp_agent/workflows/deep_orchestrator/prompts.py
index 765e9dbb8..96d7fd7de 100644
--- a/src/mcp_agent/workflows/deep_orchestrator/prompts.py
+++ b/src/mcp_agent/workflows/deep_orchestrator/prompts.py
@@ -67,6 +67,7 @@
 requires_context_from can ONLY reference tasks from PREVIOUS steps, not the current step
 If a task needs output from another task in the same step, move it to a subsequent step
 Only set context_window_budget if task needs more than default (10000 tokens)
+Scale effort to query complexity: simple fact-finding = 1 step, few tasks; comparisons = 2-4 parallel tasks; broad surveys = multiple steps with tight division of labor
@@ -75,6 +76,8 @@
 Consider resource constraints and prefer efficient approaches
 Think step by step about the best way to achieve the objective
 Tasks within a step run in parallel, steps run sequentially
+Prefer asynchronous, loosely-coupled sub-tasks that can progress independently and be synthesized later
+Favor authoritative sources/tools over SEO spam; encode source-quality selection when planning tasks
diff --git a/src/mcp_agent/workflows/deep_orchestrator/task_executor.py b/src/mcp_agent/workflows/deep_orchestrator/task_executor.py
index 06bf94a98..b7aa52750 100644
--- a/src/mcp_agent/workflows/deep_orchestrator/task_executor.py
+++ b/src/mcp_agent/workflows/deep_orchestrator/task_executor.py
@@ -50,6 +50,8 @@ def __init__(
         context: Optional["Context"] = None,
         max_task_retries: int = 3,
         enable_parallel: bool = True,
+        knowledge_extraction_mode: str = "per_task",
+        lean_agent_design: bool = False,
     ):
         """
         Initialize the task executor.
@@ -76,6 +78,8 @@ def __init__(
         self.context = context
         self.max_task_retries = max_task_retries
         self.enable_parallel = enable_parallel
+        self.knowledge_extraction_mode = knowledge_extraction_mode
+        self.lean_agent_design = lean_agent_design

         # Budget update callback (will be set by orchestrator)
         self.update_budget_tokens = lambda tokens: None
@@ -228,17 +232,31 @@ async def _execute_task_once(
             # Execute with agent
             if isinstance(agent, AugmentedLLM):
+                rp = request_params or RequestParams(max_iterations=10)
+                try:
+                    # Favor cheaper/faster models for executors unless overridden
+                    if not getattr(rp, "modelPreferences", None):
+                        rp.modelPreferences = getattr(
+                            self.context, "executor_model_preferences", None
+                        )
+                except Exception:
+                    pass
                 output = await agent.generate_str(
-                    message=task_context,
-                    request_params=request_params or RequestParams(max_iterations=10),
+                    message=task_context, request_params=rp
                 )
             else:
                 async with agent:
                     llm = await agent.attach_llm(self.llm_factory)
+                    rp = request_params or RequestParams(max_iterations=10)
+                    try:
+                        if not getattr(rp, "modelPreferences", None):
+                            rp.modelPreferences = getattr(
+                                self.context, "executor_model_preferences", None
+                            )
+                    except Exception:
+                        pass
                     output = await llm.generate_str(
-                        message=task_context,
-                        request_params=request_params
-                        or RequestParams(max_iterations=10),
+                        message=task_context, request_params=rp
                     )

             # Success
@@ -246,6 +264,23 @@
             result.output = output
             result.duration_seconds = time.time() - start_time

+            # Persist artifact if enabled
+            try:
+                if getattr(
+                    getattr(self.context, "orchestrator_config", object()),
+                    "execution",
+                    object(),
+                ):
+                    cfg = self.context.orchestrator_config  # type: ignore[attr-defined]
+                    if getattr(cfg.execution, "save_task_outputs_to_artifacts", True):
+                        artifact_name = f"task_{task.name}.txt"
+                        self.memory.save_artifact(
+                            artifact_name, output, to_filesystem=True
+                        )
+                        result.artifacts[artifact_name] = output
+            except Exception:
+                pass
+
             # Extract artifacts if mentioned
             if any(
                 phrase in output.lower()
@@ -253,11 +288,12 @@
             ):
                 result.artifacts[f"task_{task.name}_output"] = output

-            # Extract knowledge
-            knowledge_items = await self.knowledge_extractor.extract_knowledge(
-                result, self.objective
-            )
-            result.knowledge_extracted = knowledge_items
+            # Extract knowledge (skip here if batch mode is enabled; orchestrator will run batch)
+            if self.knowledge_extraction_mode == "per_task":
+                knowledge_items = await self.knowledge_extractor.extract_knowledge(
+                    result, self.objective
+                )
+                result.knowledge_extracted = knowledge_items

             # Update task status
             task.status = TaskStatus.COMPLETED
@@ -274,7 +310,7 @@
             task.status = TaskStatus.FAILED
             logger.error(f"Task {task.name} failed: {e}")

-        # Record result
+        # Record result 
         self.memory.add_task_result(result)

         return result
@@ -329,7 +365,36 @@ async def _create_dynamic_agent(self, task: Task) -> Agent:
         """
         logger.debug(f"Creating dynamic agent for task: {task.description[:50]}...")

-        # Agent designer
+        # If lean mode, construct a minimal specialized agent without an extra LLM round-trip
+        if self.lean_agent_design:
+            minimal_instruction = build_agent_instruction(
+                {
+                    "instruction": (
+                        "You are a focused specialist. Complete the task precisely using the specified tools."
+                    ),
+                    "role": "Specialist executor",
+                    "key_behaviors": [
+                        "Use tools actively and verify results",
+                        "Be concise and avoid unnecessary token usage",
+                        "Stop when the deliverable is satisfied",
+                    ],
+                    "tool_usage_tips": [
+                        "Prefer authoritative sources where applicable",
+                        "Batch operations to reduce repeated tool calls",
+                    ],
+                }
+            )
+
+            agent = Agent(
+                name=f"Lean_{task.name}",
+                instruction=minimal_instruction,
+                server_names=task.servers,
+                context=self.context,
+            )
+            logger.debug(f"Created lean dynamic agent for task: {task.name}")
+            return agent
+
+        # Full agent design via LLM
         designer = Agent(
             name="AgentDesigner",
             instruction=AGENT_DESIGNER_INSTRUCTION,
@@ -338,7 +403,6 @@
         )
         llm = self.llm_factory(designer)

-        # Design agent
         design_prompt = get_agent_design_prompt(
             task.description, task.servers, self.objective
         )
@@ -347,7 +411,6 @@
             message=design_prompt, response_model=AgentDesign
         )

-        # Build comprehensive instruction
         instruction = build_agent_instruction(design.model_dump())

         agent = Agent(
diff --git a/src/mcp_agent/workflows/llm/augmented_llm.py b/src/mcp_agent/workflows/llm/augmented_llm.py
index cabcecf89..670917a97 100644
--- a/src/mcp_agent/workflows/llm/augmented_llm.py
+++ b/src/mcp_agent/workflows/llm/augmented_llm.py
@@ -252,6 +252,8 @@ def __init__(
        self.executor = self.context.executor
        self.name = self._gen_name(name or (agent.name if agent else None), prefix=None)
        self.instruction = instruction or (agent.instruction if agent else None)
+        # Track last tool call metadata for provenance
+        self._last_tool_calls: list[dict[str, Any]] = []

        if not self.name:
            raise ValueError(
@@ -434,6 +436,18 @@ async def post_tool_call(
        self, tool_call_id: str | None, request: CallToolRequest, result: CallToolResult
    ) -> CallToolResult:
        """Called after a tool execution. Can modify the result before it's returned."""
+        # Record minimal provenance for citations
+        try:
+            self._last_tool_calls.append(
+                {
+                    "tool": request.params.name,
+                    "arguments": request.params.arguments,
+                    "tool_call_id": tool_call_id,
+                    "isError": result.isError,
+                }
+            )
+        except Exception:
+            pass
        return result

    async def call_tool(
@@ -520,6 +534,12 @@ def message_param_str(self, message: MessageParamT) -> str:
        """Convert an input message to a string representation."""
        return str(message)

+    def get_and_clear_tool_provenance(self) -> list[dict[str, Any]]:
+        """Return and clear recorded tool call provenance."""
+        prov = self._last_tool_calls
+        self._last_tool_calls = []
+        return prov
+
    def message_str(self, message: MessageT, content_only: bool = False) -> str:
        """Convert an output message to a string representation."""
        return str(message)
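
Usage sketch for the new execution knobs added to ExecutionConfig in config.py. Only the field names and defaults come from the diff above; how the resulting config object is passed into the Deep Orchestrator (e.g. via a wrapping orchestrator config) is not shown here and is an assumption.

# Sketch: opting into the new ExecutionConfig options (field names from config.py above).
from mcp_agent.workflows.deep_orchestrator.config import ExecutionConfig

execution = ExecutionConfig(
    max_plan_verification_attempts=4,     # cap the plan repair/verification loop
    knowledge_extraction_mode="batch",    # extract knowledge once per step instead of per task
    knowledge_batch_max_concurrent=3,     # parallelism for batch extraction
    lean_agent_design=True,               # skip the agent-designer LLM round-trip
    dynamic_effort_scaling=False,         # keep iteration/context budgets fixed
    save_task_outputs_to_artifacts=True,  # persist each successful task output to the workspace
)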
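A minimal sketch of how a caller can consume the tool-call provenance recorded by post_tool_call() and returned by get_and_clear_tool_provenance(). The dict keys mirror what augmented_llm.py records above; the collect_citation helper itself is hypothetical and simply restates the pattern used in knowledge.py.

# Sketch: reading recorded provenance from an AugmentedLLM-like object.
from typing import Any, Dict, Optional

def collect_citation(llm: Any) -> Optional[Dict[str, Any]]:
    # Guard with hasattr, as knowledge.py does, since not every LLM records provenance
    if not hasattr(llm, "get_and_clear_tool_provenance"):
        return None
    prov = llm.get_and_clear_tool_provenance()  # returns and clears the recorded tool calls
    if not prov:
        return None
    # Keep only the last few tool calls, matching the knowledge extractor's behavior
    return {"tools": prov[-3:]}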