diff --git a/examples/mcp_agent_server/asyncio/main.py b/examples/mcp_agent_server/asyncio/main.py index 68f7574bc..82f3c430a 100644 --- a/examples/mcp_agent_server/asyncio/main.py +++ b/examples/mcp_agent_server/asyncio/main.py @@ -13,6 +13,8 @@ from typing import Dict, Any, Optional from mcp.server.fastmcp import FastMCP +from mcp.types import Icon + from mcp_agent.core.context import Context as AppContext from mcp_agent.app import MCPApp @@ -115,22 +117,35 @@ async def run(self, input: str) -> WorkflowResult[str]: return WorkflowResult(value=result) -@app.tool(name="sampling_demo") -async def sampling_demo(topic: str, app_ctx: Optional[AppContext] = None) -> str: +@app.tool( + name="sampling_demo", + title="Sampling Demo", + description="Call a nested MCP server that performs sampling.", + annotations={"idempotentHint": False}, + icons=[Icon(src="emoji:crystal_ball")], + meta={"category": "demo", "feature": "sampling"}, +) +async def sampling_demo( + topic: str, + app_ctx: Optional[AppContext] = None, +) -> str: """ Demonstrate MCP sampling via a nested MCP server tool. - In asyncio (no upstream client), this triggers local sampling with a human approval prompt. - When an MCP client is connected, the sampling request is proxied upstream. """ - _app = app_ctx.app if app_ctx else app + context = app_ctx or app.context + + await context.info(f"[sampling_demo] starting for topic '{topic}'") + await context.report_progress(0.1, total=1.0, message="Preparing nested server") # Register a simple nested server that uses sampling in its get_haiku tool nested_name = "nested_sampling" nested_path = os.path.abspath( os.path.join(os.path.dirname(__file__), "nested_sampling_server.py") ) - _app.context.config.mcp.servers[nested_name] = MCPServerSettings( + context.config.mcp.servers[nested_name] = MCPServerSettings( name=nested_name, command="uv", args=["run", nested_path], @@ -139,10 +154,12 @@ async def sampling_demo(topic: str, app_ctx: Optional[AppContext] = None) -> str # Connect as an MCP client to the nested server and call its sampling tool async with gen_client( - nested_name, _app.context.server_registry, context=_app.context + nested_name, context.server_registry, context=context ) as client: result = await client.call_tool("get_haiku", {"topic": topic}) + await context.report_progress(0.9, total=1.0, message="Formatting haiku") + # Extract text content from CallToolResult try: if result.content and len(result.content) > 0: @@ -154,7 +171,8 @@ async def sampling_demo(topic: str, app_ctx: Optional[AppContext] = None) -> str @app.tool(name="elicitation_demo") async def elicitation_demo( - action: str = "proceed", app_ctx: Optional[AppContext] = None + action: str = "proceed", + app_ctx: Optional[AppContext] = None, ) -> str: """ Demonstrate MCP elicitation via a nested MCP server tool. @@ -162,13 +180,13 @@ async def elicitation_demo( - In asyncio (no upstream client), this triggers local elicitation handled by console. - When an MCP client is connected, the elicitation request is proxied upstream. 
""" - _app = app_ctx.app if app_ctx else app + context = app_ctx or app.context nested_name = "nested_elicitation" nested_path = os.path.abspath( os.path.join(os.path.dirname(__file__), "nested_elicitation_server.py") ) - _app.context.config.mcp.servers[nested_name] = MCPServerSettings( + context.config.mcp.servers[nested_name] = MCPServerSettings( name=nested_name, command="uv", args=["run", nested_path], @@ -176,27 +194,34 @@ async def elicitation_demo( ) async with gen_client( - nested_name, _app.context.server_registry, context=_app.context + nested_name, context.server_registry, context=context ) as client: + await context.info(f"[elicitation_demo] asking to '{action}'") result = await client.call_tool("confirm_action", {"action": action}) try: if result.content and len(result.content) > 0: - return result.content[0].text or "" + message = result.content[0].text or "" + await context.info(f"[elicitation_demo] response: {message}") + return message except Exception: pass return "" @app.tool(name="notify_resources") -async def notify_resources(app_ctx: Optional[AppContext] = None) -> str: +async def notify_resources( + app_ctx: Optional[AppContext] = None, +) -> str: """Trigger a non-logging resource list changed notification.""" - _app = app_ctx.app if app_ctx else app - upstream = getattr(_app.context, "upstream_session", None) + context = app_ctx or app.context + upstream = getattr(context, "upstream_session", None) if upstream is None: - _app.logger.warning("No upstream session to notify") + message = "No upstream session to notify" + await context.warning(message) return "no-upstream" await upstream.send_resource_list_changed() - _app.logger.info("Sent notifications/resources/list_changed") + log_message = "Sent notifications/resources/list_changed" + await context.info(log_message) return "ok" @@ -206,16 +231,15 @@ async def notify_progress( message: str | None = "Asyncio progress demo", app_ctx: Optional[AppContext] = None, ) -> str: - """Trigger a non-logging progress notification.""" - _app = app_ctx.app if app_ctx else app - upstream = getattr(_app.context, "upstream_session", None) - if upstream is None: - _app.logger.warning("No upstream session to notify") - return "no-upstream" - await upstream.send_progress_notification( - progress_token="asyncio-demo", progress=progress, message=message + """Trigger a progress notification.""" + context = app_ctx or app.context + + await context.report_progress( + progress=progress, + total=1.0, + message=message, ) - _app.logger.info("Sent notifications/progress") + return "ok" @@ -235,12 +259,8 @@ async def grade_story(story: str, app_ctx: Optional[AppContext] = None) -> str: app_ctx: Optional MCPApp context for accessing app resources and logging """ # Use the context's app if available for proper logging with upstream_session - _app = app_ctx.app if app_ctx else app - # Ensure the app's logger is bound to the current context with upstream_session - if _app._logger and hasattr(_app._logger, "_bound_context"): - _app._logger._bound_context = app_ctx - logger = _app.logger - logger.info(f"grade_story: Received input: {story}") + context = app_ctx or app.context + await context.info(f"grade_story: Received input: {story}") proofreader = Agent( name="proofreader", @@ -283,15 +303,15 @@ async def grade_story(story: str, app_ctx: Optional[AppContext] = None) -> str: message=f"Student short story submission: {story}", ) except Exception as e: - logger.error(f"grade_story: Error generating result: {e}") - return None + await 
context.error(f"grade_story: Error generating result: {e}") + return "" if not result: - logger.error("grade_story: No result from parallel LLM") + await context.error("grade_story: No result from parallel LLM") + return "" else: - logger.info(f"grade_story: Result: {result}") - - return result + await context.info(f"grade_story: Result: {result}") + return result @app.async_tool(name="grade_story_async") @@ -304,11 +324,8 @@ async def grade_story_async(story: str, app_ctx: Optional[AppContext] = None) -> """ # Use the context's app if available for proper logging with upstream_session - _app = app_ctx.app if app_ctx else app - # Ensure the app's logger is bound to the current context with upstream_session - if _app._logger and hasattr(_app._logger, "_bound_context"): - _app._logger._bound_context = app_ctx - logger = _app.logger + context = app_ctx or app.context + logger = context.logger logger.info(f"grade_story_async: Received input: {story}") proofreader = Agent( @@ -355,7 +372,11 @@ async def grade_story_async(story: str, app_ctx: Optional[AppContext] = None) -> ) except Exception as e: logger.error(f"grade_story_async: Error generating result: {e}") - return None + return "" + + if not result: + logger.error("grade_story_async: No result from parallel LLM") + return "" return result @@ -508,7 +529,7 @@ async def main(): agent_app.logger.info(f"MCP Server settings: {mcp_server.settings}") # Run the server - await mcp_server.run_stdio_async() + await mcp_server.run_sse_async() if __name__ == "__main__": diff --git a/examples/mcp_agent_server/asyncio/nested_elicitation_server.py b/examples/mcp_agent_server/asyncio/nested_elicitation_server.py index 34d477d83..8dc694123 100644 --- a/examples/mcp_agent_server/asyncio/nested_elicitation_server.py +++ b/examples/mcp_agent_server/asyncio/nested_elicitation_server.py @@ -1,5 +1,5 @@ from pydantic import BaseModel -from mcp.server.fastmcp import FastMCP +from mcp.server.fastmcp import Context, FastMCP from mcp.server.elicitation import elicit_with_validation, AcceptedElicitation mcp = FastMCP("Nested Elicitation Server") @@ -10,16 +10,21 @@ class Confirmation(BaseModel): @mcp.tool() -async def confirm_action(action: str) -> str: +async def confirm_action(action: str, ctx: Context | None = None) -> str: """Ask the user to confirm an action via elicitation.""" - ctx = mcp.get_context() + context = ctx or mcp.get_context() + await context.info(f"[nested_elicitation] requesting '{action}' confirmation") res = await elicit_with_validation( - ctx.session, + context.session, message=f"Do you want to {action}?", schema=Confirmation, ) if isinstance(res, AcceptedElicitation) and res.data.confirm: + if ctx: + await context.info(f"[nested_elicitation] '{action}' accepted") return f"Action '{action}' confirmed by user" + if ctx: + await context.warning(f"[nested_elicitation] '{action}' declined") return f"Action '{action}' declined by user" diff --git a/examples/mcp_agent_server/asyncio/nested_sampling_server.py b/examples/mcp_agent_server/asyncio/nested_sampling_server.py index 32953079f..cfbf9c849 100644 --- a/examples/mcp_agent_server/asyncio/nested_sampling_server.py +++ b/examples/mcp_agent_server/asyncio/nested_sampling_server.py @@ -1,13 +1,16 @@ -from mcp.server.fastmcp import FastMCP -from mcp.types import ModelPreferences, ModelHint, SamplingMessage, TextContent +from mcp.server.fastmcp import Context, FastMCP +from mcp.types import ModelHint, ModelPreferences, SamplingMessage, TextContent mcp = FastMCP("Nested Sampling Server") @mcp.tool() 
-async def get_haiku(topic: str) -> str: +async def get_haiku(topic: str, ctx: Context | None = None) -> str: """Use MCP sampling to generate a haiku about the given topic.""" - result = await mcp.get_context().session.create_message( + context = ctx or mcp.get_context() + await context.info(f"[nested_sampling] generating haiku for '{topic}'") + await context.report_progress(0.25, total=1.0, message="Requesting sampling run") + result = await context.session.create_message( messages=[ SamplingMessage( role="user", @@ -28,6 +31,7 @@ async def get_haiku(topic: str) -> str: ) if isinstance(result.content, TextContent): + await context.report_progress(1.0, total=1.0, message="Haiku complete") return result.content.text return "Haiku generation failed" diff --git a/examples/mcp_agent_server/temporal/main.py b/examples/mcp_agent_server/temporal/main.py index 95ff95d5d..659eb1af4 100644 --- a/examples/mcp_agent_server/temporal/main.py +++ b/examples/mcp_agent_server/temporal/main.py @@ -12,7 +12,7 @@ import logging import os -from mcp.types import ModelHint, ModelPreferences, SamplingMessage, TextContent +from mcp.types import Icon, ModelHint, ModelPreferences, SamplingMessage, TextContent from temporalio.exceptions import ApplicationError from mcp_agent.agents.agent import Agent @@ -93,8 +93,19 @@ async def run( return WorkflowResult(value=result) -@app.tool -async def finder_tool(request: str, app_ctx: Context | None = None) -> str: +@app.tool( + name="finder_tool", + title="Finder Tool", + description="Run the Finder workflow synchronously.", + annotations={"idempotentHint": False}, + icons=[Icon(src="emoji:mag")], + meta={"category": "demo", "engine": "temporal"}, + structured_output=False, +) +async def finder_tool( + request: str, + app_ctx: Context | None = None, +) -> str: """ Run the basic agent workflow using the app.tool decorator to set up the workflow. The code in this function is run in workflow context. @@ -110,9 +121,8 @@ async def finder_tool(request: str, app_ctx: Context | None = None) -> str: To create this as an async tool, use @app.async_tool instead, which will return the workflow ID and run ID. 
""" - app = app_ctx.app - - logger = app.logger + context = app_ctx if app_ctx is not None else app.context + logger = context.logger logger.info("[workflow-mode] Running finder_tool", data={"input": request}) finder_agent = Agent( @@ -121,16 +131,17 @@ async def finder_tool(request: str, app_ctx: Context | None = None) -> str: server_names=["fetch", "filesystem"], ) - context = app.context context.config.mcp.servers["filesystem"].args.extend([os.getcwd()]) async with finder_agent: finder_llm = await finder_agent.attach_llm(OpenAIAugmentedLLM) + await context.report_progress(0.4, total=1.0, message="Invoking finder agent") result = await finder_llm.generate_str( message=request, ) logger.info("[workflow-mode] finder_tool agent result", data={"result": result}) + await context.report_progress(1.0, total=1.0, message="Finder completed") return result diff --git a/examples/mcp_agent_server/temporal/nested_sampling_server.py b/examples/mcp_agent_server/temporal/nested_sampling_server.py index 32953079f..042606add 100644 --- a/examples/mcp_agent_server/temporal/nested_sampling_server.py +++ b/examples/mcp_agent_server/temporal/nested_sampling_server.py @@ -1,13 +1,15 @@ -from mcp.server.fastmcp import FastMCP -from mcp.types import ModelPreferences, ModelHint, SamplingMessage, TextContent +from mcp.server.fastmcp import Context, FastMCP +from mcp.types import ModelHint, ModelPreferences, SamplingMessage, TextContent mcp = FastMCP("Nested Sampling Server") @mcp.tool() -async def get_haiku(topic: str) -> str: +async def get_haiku(topic: str, ctx: Context | None = None) -> str: """Use MCP sampling to generate a haiku about the given topic.""" - result = await mcp.get_context().session.create_message( + context = ctx or mcp.get_context() + await context.info(f"[temporal_nested_sampling] topic='{topic}'") + result = await context.session.create_message( messages=[ SamplingMessage( role="user", @@ -28,6 +30,7 @@ async def get_haiku(topic: str) -> str: ) if isinstance(result.content, TextContent): + await context.info("[temporal_nested_sampling] returning haiku") return result.content.text return "Haiku generation failed" diff --git a/pyproject.toml b/pyproject.toml index f2f568880..de90dc73f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,13 +18,12 @@ dependencies = [ "fastapi>=0.115.6", "httpx>=0.28.1", "jsonref>=1.1.0", - "mcp>=1.13.1", + "mcp>=1.18.0", "numpy>=2.1.3", "opentelemetry-distro>=0.50b0", "opentelemetry-exporter-otlp-proto-http>=1.29.0", "opentelemetry-instrumentation-anthropic>=0.39.3", "opentelemetry-instrumentation-openai>=0.39.3", - "pathspec>=0.12.1", "prompt-toolkit>=3.0.50", "pydantic-settings>=2.7.0", "pydantic-yaml>=1.5.1", @@ -33,11 +32,15 @@ dependencies = [ "rich>=13.9.4", "scikit-learn>=1.6.0", "typer>=0.15.3", - "watchdog>=6.0.0", "websockets>=12.0", ] [project.optional-dependencies] +cli = [ + "pathspec>=0.12.1", + "python-dotenv>=1.0.0", + "watchdog>=6.0.0", +] temporal = [ "temporalio[opentelemetry]>=1.10.0", ] @@ -97,11 +100,10 @@ dev = [ ] [project.scripts] -mcp-agent = "mcp_agent.cli.main:run" -mcp_agent = "mcp_agent.cli.main:run" -mcpagent = "mcp_agent.cli.main:run" -silsila = "mcp_agent.cli.cloud.main:run" -prompt-server = "mcp_agent.mcp.prompts.__main__:main" +silsila = "mcp_agent.cli.main:run [cli]" +mcp-agent = "mcp_agent.cli.main:run [cli]" +mcp-cloud = "mcp_agent.cli.cloud.main:run [cli]" +mcpc = "mcp_agent.cli.cloud.main:run [cli]" [tool.setuptools.packages.find] include = ["mcp-agent"] diff --git a/src/mcp_agent/app.py b/src/mcp_agent/app.py index 
2ada0483b..8ccaaa0cc 100644 --- a/src/mcp_agent/app.py +++ b/src/mcp_agent/app.py @@ -2,11 +2,12 @@ import os import sys import functools - from types import MethodType from typing import ( Any, Dict, + Iterable, + Mapping, Optional, Type, TypeVar, @@ -20,6 +21,8 @@ from mcp import ServerSession from mcp.server.fastmcp import FastMCP +from mcp.types import ToolAnnotations, Icon + from mcp_agent.core.context import Context, initialize_context, cleanup_context from mcp_agent.config import Settings, get_settings from mcp_agent.executor.signal_registry import SignalRegistry @@ -586,6 +589,7 @@ def _create_workflow_from_function( async def _invoke_target(workflow_self, *args, **kwargs): # Inject app_ctx (AppContext) and shim ctx (FastMCP Context) if requested by the function import inspect as _inspect + import typing as _typing call_kwargs = dict(kwargs) @@ -622,24 +626,64 @@ async def _invoke_target(workflow_self, *args, **kwargs): except Exception: pass - # If the function expects a FastMCP Context (ctx/context), ensure it's present (None inside workflow) + # If the function expects a FastMCP Context (ctx/context), ensure it's present. try: from mcp.server.fastmcp import Context as _Ctx # type: ignore except Exception: _Ctx = None # type: ignore + def _is_fast_ctx_annotation(annotation) -> bool: + if _Ctx is None or annotation is _inspect._empty: + return False + if annotation is _Ctx: + return True + if _inspect.isclass(annotation): + try: + if issubclass(annotation, _Ctx): # type: ignore[misc] + return True + except TypeError: + pass + try: + origin = _typing.get_origin(annotation) + if origin is not None: + return any( + _is_fast_ctx_annotation(arg) + for arg in _typing.get_args(annotation) + ) + except Exception: + pass + try: + return "fastmcp" in str(annotation) + except Exception: + return False + try: sig = sig if "sig" in locals() else _inspect.signature(fn) for p in sig.parameters.values(): - if ( - p.annotation is not _inspect._empty - and _Ctx is not None - and p.annotation is _Ctx + needs_fast_ctx = False + if _is_fast_ctx_annotation(p.annotation): + needs_fast_ctx = True + elif p.annotation is _inspect._empty and p.name in ( + "ctx", + "context", ): - if p.name not in call_kwargs: - call_kwargs[p.name] = None - if p.name in ("ctx", "context") and p.name not in call_kwargs: - call_kwargs[p.name] = None + needs_fast_ctx = True + if needs_fast_ctx and p.name not in call_kwargs: + fast_ctx = getattr(workflow_self, "_mcp_request_context", None) + if fast_ctx is None and app_context_param_name: + _app_ctx = call_kwargs.get(app_context_param_name, None) + if _Ctx is not None and isinstance(_app_ctx, _Ctx): + fast_ctx = _app_ctx + _fastmcp = getattr(_app_ctx, "fastmcp", None) + if _fastmcp is not None and hasattr( + _fastmcp, "get_context" + ): + try: + fast_ctx = _fastmcp.get_context() + except Exception: + fast_ctx = None + if fast_ctx is not None: + call_kwargs[p.name] = fast_ctx except Exception: pass @@ -739,7 +783,11 @@ def tool( self, name: str | None = None, *, + title: str | None = None, description: str | None = None, + annotations: ToolAnnotations | Mapping[str, Any] | None = None, + icons: Iterable[Icon | Mapping[str, Any]] | None = None, + meta: Mapping[str, Any] | None = None, structured_output: bool | None = None, ) -> Callable[[Callable[P, R]], Callable[P, R]]: ... 
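For reference, the enriched `tool()` overload above accepts either typed MCP objects or plain mappings for the new metadata fields, and the decorator body (next hunk) normalizes mappings via `ToolAnnotations(**dict(annotations))` and `Icon(**icon)`. A minimal usage sketch, assuming an `MCPApp` instance; the tool name and field values here are illustrative and not part of this patch:

    from mcp.types import Icon
    from mcp_agent.app import MCPApp

    app = MCPApp(name="metadata_demo")

    @app.tool(
        name="summarize",
        title="Summarize Text",
        annotations={"readOnlyHint": True},   # mapping form, normalized to ToolAnnotations
        icons=[Icon(src="emoji:memo")],       # typed form is accepted as-is
        meta={"category": "demo"},            # free-form metadata dict
        structured_output=False,
    )
    async def summarize(text: str) -> str:
        # Trivial body; a real tool would do actual work here.
        return text[:100]

Either form ends up on the declared-tool record, so the later registration path in `create_declared_function_tools` can pass `title`, `annotations`, and `icons` straight through to `FastMCP.add_tool`.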
@@ -747,7 +795,11 @@ def tool( self, name: str | None = None, *, + title: str | None = None, description: str | None = None, + annotations: ToolAnnotations | Mapping[str, Any] | None = None, + icons: Iterable[Icon | Mapping[str, Any]] | None = None, + meta: Mapping[str, Any] | None = None, structured_output: bool | None = None, ): """ @@ -766,6 +818,28 @@ def decorator(fn: Callable[P, R]) -> Callable[P, R]: validate_tool_schema(fn, tool_name) + annotations_obj: ToolAnnotations | None = None + if annotations is not None: + if isinstance(annotations, ToolAnnotations): + annotations_obj = annotations + else: + annotations_obj = ToolAnnotations(**dict(annotations)) + + icons_list: list[Icon] | None = None + if icons is not None: + icons_list = [] + for icon in icons: + if isinstance(icon, Icon): + icons_list.append(icon) + elif isinstance(icon, Mapping): + icons_list.append(Icon(**icon)) + else: + raise TypeError("icons entries must be Icon or mapping") + + meta_payload: Dict[str, Any] | None = None + if meta is not None: + meta_payload = dict(meta) + # Construct the workflow from function workflow_cls = self._create_workflow_from_function( fn, @@ -784,13 +858,25 @@ def decorator(fn: Callable[P, R]) -> Callable[P, R]: "source_fn": fn, "structured_output": structured_output, "description": description or (fn.__doc__ or ""), + "title": title, + "annotations": annotations_obj, + "icons": icons_list, + "meta": meta_payload, } ) return fn # Support bare usage: @app.tool without parentheses - if callable(name) and description is None and structured_output is None: + if ( + callable(name) + and title is None + and description is None + and annotations is None + and icons is None + and meta is None + and structured_output is None + ): _fn = name # type: ignore[assignment] name = None return decorator(_fn) # type: ignore[arg-type] @@ -805,14 +891,24 @@ def async_tool( self, name: str | None = None, *, + title: str | None = None, description: str | None = None, + annotations: ToolAnnotations | Mapping[str, Any] | None = None, + icons: Iterable[Icon | Mapping[str, Any]] | None = None, + meta: Mapping[str, Any] | None = None, + structured_output: bool | None = None, ) -> Callable[[Callable[P, R]], Callable[P, R]]: ... def async_tool( self, name: str | None = None, *, + title: str | None = None, description: str | None = None, + annotations: ToolAnnotations | Mapping[str, Any] | None = None, + icons: Iterable[Icon | Mapping[str, Any]] | None = None, + meta: Mapping[str, Any] | None = None, + structured_output: bool | None = None, ): """ Decorator to declare an asynchronous MCP tool. 
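The `async_tool` overload above gains the same metadata keywords plus `structured_output`, and (in the next hunk) the bare-usage guard is widened so `@app.async_tool` without parentheses is only assumed when none of the new keyword options are supplied. A short sketch of both forms, with illustrative names (`slow_report` is not from this patch):

    from mcp_agent.app import MCPApp

    app = MCPApp(name="async_tool_demo")

    # Bare usage: the decorator receives the function as its first argument,
    # detects that no keyword options were given, and applies defaults.
    @app.async_tool
    async def slow_report(region: str) -> str:
        return f"report for {region}"

    # Parameterized usage: metadata is normalized the same way as in @app.tool;
    # per the docstring, the declared tool runs asynchronously and returns the
    # workflow ID and run ID to callers rather than the final value.
    @app.async_tool(
        name="slow_report_titled",
        title="Slow Report",
        annotations={"idempotentHint": True},
        meta={"async": True},
    )
    async def slow_report_titled(region: str) -> str:
        return f"report for {region}"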
@@ -830,6 +926,28 @@ def decorator(fn: Callable[P, R]) -> Callable[P, R]: validate_tool_schema(fn, workflow_name) + annotations_obj: ToolAnnotations | None = None + if annotations is not None: + if isinstance(annotations, ToolAnnotations): + annotations_obj = annotations + else: + annotations_obj = ToolAnnotations(**dict(annotations)) + + icons_list: list[Icon] | None = None + if icons is not None: + icons_list = [] + for icon in icons: + if isinstance(icon, Icon): + icons_list.append(icon) + elif isinstance(icon, Mapping): + icons_list.append(Icon(**icon)) + else: + raise TypeError("icons entries must be Icon or mapping") + + meta_payload: Dict[str, Any] | None = None + if meta is not None: + meta_payload = dict(meta) + workflow_cls = self._create_workflow_from_function( fn, workflow_name=workflow_name, @@ -844,14 +962,26 @@ def decorator(fn: Callable[P, R]) -> Callable[P, R]: "workflow_name": workflow_name, "workflow_cls": workflow_cls, "source_fn": fn, - "structured_output": None, + "structured_output": structured_output, "description": description or (fn.__doc__ or ""), + "title": title, + "annotations": annotations_obj, + "icons": icons_list, + "meta": meta_payload, } ) return fn # Support bare usage: @app.async_tool without parentheses - if callable(name) and description is None: + if ( + callable(name) + and title is None + and description is None + and annotations is None + and icons is None + and meta is None + and structured_output is None + ): _fn = name # type: ignore[assignment] name = None return decorator(_fn) # type: ignore[arg-type] diff --git a/src/mcp_agent/cli/cloud/commands/deploy/main.py b/src/mcp_agent/cli/cloud/commands/deploy/main.py index 2b3839744..ff4b35d5d 100644 --- a/src/mcp_agent/cli/cloud/commands/deploy/main.py +++ b/src/mcp_agent/cli/cloud/commands/deploy/main.py @@ -24,9 +24,7 @@ from mcp_agent.cli.core.utils import run_async from mcp_agent.cli.exceptions import CLIError from mcp_agent.cli.mcp_app.api_client import MCPAppClient -from mcp_agent.cli.secrets.processor import ( - process_config_secrets, -) +from mcp_agent.cli.secrets import processor as secrets_processor from mcp_agent.cli.utils.retry import retry_async_with_exponential_backoff, RetryError from mcp_agent.cli.utils.ux import ( print_deployment_header, @@ -312,7 +310,7 @@ def deploy_config( secrets_transformed_path = config_dir / MCP_DEPLOYED_SECRETS_FILENAME run_async( - process_config_secrets( + secrets_processor.process_config_secrets( input_path=secrets_file, output_path=secrets_transformed_path, api_url=effective_api_url, diff --git a/src/mcp_agent/core/context.py b/src/mcp_agent/core/context.py index d449c938a..57c26a927 100644 --- a/src/mcp_agent/core/context.py +++ b/src/mcp_agent/core/context.py @@ -4,13 +4,14 @@ import asyncio import concurrent.futures -from typing import Any, List, Optional, TYPE_CHECKING +from typing import Any, List, Optional, TYPE_CHECKING, Literal import warnings -from pydantic import BaseModel, ConfigDict +from pydantic import ConfigDict from mcp import ServerSession from mcp.server.fastmcp import FastMCP +from mcp.server.fastmcp import Context as MCPContext from opentelemetry import trace @@ -37,11 +38,12 @@ if TYPE_CHECKING: from mcp_agent.agents.agent_spec import AgentSpec - from mcp_agent.human_input.types import HumanInputCallback + from mcp_agent.app import MCPApp from mcp_agent.elicitation.types import ElicitationCallback from mcp_agent.executor.workflow_signal import SignalWaitCallback from mcp_agent.executor.workflow_registry import WorkflowRegistry - 
from mcp_agent.app import MCPApp + from mcp_agent.human_input.types import HumanInputCallback + from mcp_agent.logging.logger import Logger else: # Runtime placeholders for the types AgentSpec = Any @@ -50,11 +52,12 @@ SignalWaitCallback = Any WorkflowRegistry = Any MCPApp = Any + Logger = Any logger = get_logger(__name__) -class Context(BaseModel): +class Context(MCPContext): """ Context that is passed around through the application. This is a global context that is shared across the application. @@ -65,7 +68,7 @@ class Context(BaseModel): human_input_handler: Optional[HumanInputCallback] = None elicitation_handler: Optional[ElicitationCallback] = None signal_notification: Optional[SignalWaitCallback] = None - upstream_session: Optional[ServerSession] = None # TODO: saqadri - figure this out + upstream_session: Optional[ServerSession] = None model_selector: Optional[ModelSelector] = None session_id: str | None = None app: Optional["MCPApp"] = None @@ -102,6 +105,213 @@ class Context(BaseModel): def mcp(self) -> FastMCP | None: return self.app.mcp if self.app else None + @property + def fastmcp(self) -> FastMCP | None: # type: ignore[override] + """Return the FastMCP instance if available. + + Prefer the active request-bound FastMCP instance if present; otherwise + fall back to the app's configured FastMCP server. Returns None if neither + is available. This is more forgiving than the FastMCP Context default, + which raises outside of a request. + """ + try: + # Prefer a request-bound fastmcp if set by FastMCP during a request + if getattr(self, "_fastmcp", None) is not None: + return getattr(self, "_fastmcp", None) + except Exception: + pass + # Fall back to app-managed server instance (may be None in local scripts) + return self.mcp + + @property + def session(self) -> ServerSession | None: + """Best-effort ServerSession for upstream communication. + + Priority: + - If explicitly provided, use `upstream_session`. + - If running within an active FastMCP request, use parent session. + - If an app FastMCP exists, use its current request context if any. + + Returns None when no session can be resolved (e.g., local scripts). + """ + # 1) Explicit upstream session set by app/workflow + if getattr(self, "upstream_session", None) is not None: + return self.upstream_session + + # 2) Try request-scoped session from FastMCP Context (may raise outside requests) + try: + return super().session # type: ignore[misc] + except Exception: + pass + + # 3) Fall back to FastMCP server's current context if available + try: + mcp = self.mcp + if mcp is not None: + ctx = mcp.get_context() + # FastMCP.get_context returns a Context that raises outside a request; + # guard accordingly. 
+ try: + return getattr(ctx, "session", None) + except Exception: + return None + except Exception: + pass + + # No session available in this runtime mode + return None + + @property + def logger(self) -> "Logger": + if self.app: + return self.app.logger + namespace_components = ["mcp_agent", "context"] + try: + if getattr(self, "session_id", None): + namespace_components.append(str(self.session_id)) + except Exception: + pass + namespace = ".".join(namespace_components) + logger = get_logger( + namespace, session_id=getattr(self, "session_id", None), context=self + ) + try: + setattr(logger, "_bound_context", self) + except Exception: + pass + return logger + + @property + def name(self) -> str | None: + if self.app and getattr(self.app, "name", None): + return self.app.name + return None + + @property + def description(self) -> str | None: + if self.app and getattr(self.app, "description", None): + return self.app.description + return None + + # ---- FastMCP Context method fallbacks (safe outside requests) --------- + + def bind_request( + self, request_context: Any, fastmcp: FastMCP | None = None + ) -> "Context": + """Return a shallow-copied Context bound to a specific FastMCP request. + + - Shares app-wide state (config, registries, token counter, etc.) with the original Context + - Attaches `_request_context` and `_fastmcp` so FastMCP Context APIs work during the request + - Does not mutate the original Context (safe for concurrent requests) + """ + # Shallow copy to preserve references to registries/loggers while keeping isolation + bound: Context = self.model_copy(deep=False) + try: + setattr(bound, "_request_context", request_context) + except Exception: + pass + try: + if fastmcp is None: + fastmcp = getattr(self, "_fastmcp", None) or self.mcp + setattr(bound, "_fastmcp", fastmcp) + except Exception: + pass + return bound + + @property + def client_id(self) -> str | None: # type: ignore[override] + try: + return super().client_id # type: ignore[misc] + except Exception: + return None + + @property + def request_id(self) -> str: # type: ignore[override] + try: + return super().request_id # type: ignore[misc] + except Exception: + # Provide a stable-ish fallback based on app session if available + try: + return str(self.session_id) if getattr(self, "session_id", None) else "" + except Exception: + return "" + + async def log( + self, + level: "Literal['debug', 'info', 'warning', 'error']", + message: str, + *, + logger_name: str | None = None, + ) -> None: # type: ignore[override] + """Send a log to the client if possible; otherwise, log locally. + + Matches FastMCP Context API but avoids raising when no request context + is active by falling back to the app's logger. 
+ """ + # If we have a live FastMCP request context, delegate to parent + try: + # will raise if request_context is not available + _ = self.request_context # type: ignore[attr-defined] + return await super().log(level, message, logger_name=logger_name) # type: ignore[misc] + except Exception: + pass + + # Fall back to local logger if available + try: + _logger = self.logger + if _logger is not None: + if level == "debug": + _logger.debug(message) + elif level == "warning": + _logger.warning(message) + elif level == "error": + _logger.error(message) + else: + _logger.info(message) + except Exception: + # Swallow errors in fallback logging to avoid masking tool behavior + pass + + async def report_progress( + self, progress: float, total: float | None = None, message: str | None = None + ) -> None: # type: ignore[override] + """Report progress to the client if a request is active. + + Outside of a request (e.g., local scripts), this is a no-op to avoid + runtime errors as no progressToken exists. + """ + try: + _ = self.request_context # type: ignore[attr-defined] + return await super().report_progress(progress, total, message) # type: ignore[misc] + except Exception: + # No-op when no active request context + return None + + async def read_resource(self, uri: Any) -> Any: # type: ignore[override] + """Read a resource via FastMCP if possible; otherwise raise clearly. + + This provides a friendlier error outside of a request and supports + fallback to the app's FastMCP instance if available. + """ + # Use the parent implementation if request-bound fastmcp is available + try: + if getattr(self, "_fastmcp", None) is not None: + return await super().read_resource(uri) # type: ignore[misc] + except Exception: + pass + + # Fall back to app-managed FastMCP if present + try: + mcp = self.mcp + if mcp is not None: + return await mcp.read_resource(uri) # type: ignore[no-any-return] + except Exception: + pass + + raise ValueError( + "read_resource is only available when an MCP server is active." 
+ ) + async def configure_otel( config: "Settings", session_id: str | None = None diff --git a/src/mcp_agent/server/app_server.py b/src/mcp_agent/server/app_server.py index ec56574c1..7329374ad 100644 --- a/src/mcp_agent/server/app_server.py +++ b/src/mcp_agent/server/app_server.py @@ -1449,6 +1449,46 @@ def create_declared_function_tools(mcp: FastMCP, server_context: ServerContext): import inspect import asyncio import time + import typing as _typing + + try: + from mcp.server.fastmcp import Context as _Ctx + except Exception: + _Ctx = None # type: ignore + + def _annotation_is_fast_ctx(annotation) -> bool: + if _Ctx is None or annotation is inspect._empty: + return False + if annotation is _Ctx: + return True + if inspect.isclass(annotation): + try: + if issubclass(annotation, _Ctx): # type: ignore[misc] + return True + except TypeError: + pass + try: + origin = _typing.get_origin(annotation) + if origin is not None: + return any( + _annotation_is_fast_ctx(arg) for arg in _typing.get_args(annotation) + ) + except Exception: + pass + try: + return "fastmcp" in str(annotation) + except Exception: + return False + + def _detect_context_param(signature: inspect.Signature) -> str | None: + for param in signature.parameters.values(): + if param.name == "app_ctx": + continue + if _annotation_is_fast_ctx(param.annotation): + return param.name + if param.annotation is inspect._empty and param.name in {"ctx", "context"}: + return param.name + return None async def _wait_for_completion( ctx: MCPContext, @@ -1528,7 +1568,10 @@ async def _await_task(task: asyncio.Task): fn = decl.get("source_fn") description = decl.get("description") structured_output = decl.get("structured_output") - + title = decl.get("title") + annotations = decl.get("annotations") + icons = decl.get("icons") + _meta = decl.get("meta") # Bind per-iteration values to avoid late-binding closure bugs name_local = name wname_local = workflow_name @@ -1564,23 +1607,29 @@ async def _wrapper(**kwargs): ann = dict(getattr(fn, "__annotations__", {})) ann.pop("app_ctx", None) - ctx_param_name = "ctx" - from mcp.server.fastmcp import Context as _Ctx + existing_ctx_param = _detect_context_param(sig) + ctx_param_name = existing_ctx_param or "ctx" - ann[ctx_param_name] = _Ctx + if _Ctx is not None: + ann[ctx_param_name] = _Ctx ann["return"] = getattr(fn, "__annotations__", {}).get("return", return_ann) _wrapper.__annotations__ = ann _wrapper.__name__ = name_local _wrapper.__doc__ = description or (fn.__doc__ or "") params = [p for p in sig.parameters.values() if p.name != "app_ctx"] - ctx_param = inspect.Parameter( - ctx_param_name, - kind=inspect.Parameter.KEYWORD_ONLY, - annotation=_Ctx, - ) + if existing_ctx_param is None: + ctx_param = inspect.Parameter( + ctx_param_name, + kind=inspect.Parameter.KEYWORD_ONLY, + annotation=_Ctx, + ) + signature_params = params + [ctx_param] + else: + signature_params = params + _wrapper.__signature__ = inspect.Signature( - parameters=params + [ctx_param], return_annotation=return_ann + parameters=signature_params, return_annotation=return_ann ) def _make_adapter(context_param_name: str, inner_wrapper): @@ -1601,7 +1650,11 @@ async def _adapter(**kw): mcp.add_tool( _adapter, name=name_local, + title=title, description=description or (fn.__doc__ or ""), + annotations=annotations, + icons=icons, + # meta=meta, TODO: saqadri - add this after https://github.com/modelcontextprotocol/python-sdk/pull/1463 is pushed to pypi structured_output=structured_output, ) registered.add(name_local) @@ -1620,13 +1673,16 @@ 
async def _async_wrapper(**kwargs): # Mirror original signature and annotations similar to sync path ann = dict(getattr(fn, "__annotations__", {})) ann.pop("app_ctx", None) + try: - from mcp.server.fastmcp import Context as _Ctx + sig_async = inspect.signature(fn) except Exception: - _Ctx = None # type: ignore + sig_async = None + existing_ctx_param = ( + _detect_context_param(sig_async) if sig_async else None + ) - # Choose context kw-only parameter - ctx_param_name = "ctx" + ctx_param_name = existing_ctx_param or "ctx" if _Ctx is not None: ann[ctx_param_name] = _Ctx @@ -1650,38 +1706,36 @@ async def _async_wrapper(**kwargs): # Build mirrored signature: drop app_ctx and any FastMCP Context params params = [] - try: - sig_async = inspect.signature(fn) + if sig_async is not None: for p in sig_async.parameters.values(): if p.name == "app_ctx": continue - if p.name in ("ctx", "context"): - continue - if ( - _Ctx is not None - and p.annotation is not inspect._empty - and p.annotation is _Ctx + if existing_ctx_param is None and ( + _annotation_is_fast_ctx(p.annotation) + or p.name in ("ctx", "context") ): continue params.append(p) - except Exception: - params = [] # Append kw-only context param - if _Ctx is not None: - ctx_param = inspect.Parameter( - ctx_param_name, - kind=inspect.Parameter.KEYWORD_ONLY, - annotation=_Ctx, - ) + if existing_ctx_param is None: + if _Ctx is not None: + ctx_param = inspect.Parameter( + ctx_param_name, + kind=inspect.Parameter.KEYWORD_ONLY, + annotation=_Ctx, + ) + else: + ctx_param = inspect.Parameter( + ctx_param_name, + kind=inspect.Parameter.KEYWORD_ONLY, + ) + signature_params = params + [ctx_param] else: - ctx_param = inspect.Parameter( - ctx_param_name, - kind=inspect.Parameter.KEYWORD_ONLY, - ) + signature_params = params _async_wrapper.__signature__ = inspect.Signature( - parameters=params + [ctx_param], return_annotation=ann.get("return") + parameters=signature_params, return_annotation=ann.get("return") ) # Adapter to map injected FastMCP context kwarg without additional propagation @@ -1704,7 +1758,11 @@ async def _adapter(**kw): mcp.add_tool( _async_adapter, name=run_tool_name, + title=title, description=full_desc, + annotations=annotations, + icons=icons, + # meta=meta, TODO: saqadri - add this after https://github.com/modelcontextprotocol/python-sdk/pull/1463 is pushed to pypi structured_output=False, ) registered.add(run_tool_name) @@ -1829,6 +1887,33 @@ async def _workflow_run( if not workflows_dict or not app_context: raise ToolError("Server context not available for MCPApp Server.") + # Bind the app context to this FastMCP request so request-scoped methods + # (client_id, request_id, log/progress/resource reads) work seamlessly. 
+ bound_app_context = app_context + try: + request_ctx = getattr(ctx, "request_context", None) + except Exception: + request_ctx = None + if request_ctx is not None and hasattr(app_context, "bind_request"): + try: + bound_app_context = app_context.bind_request( + request_ctx, + getattr(ctx, "fastmcp", None), + ) + # Preserve upstream_session if the copy drops it for any reason + if ( + getattr(bound_app_context, "upstream_session", None) is None + and getattr(app_context, "upstream_session", None) is not None + ): + bound_app_context.upstream_session = app_context.upstream_session + except Exception: + bound_app_context = app_context + # Expose the per-request bound context on the FastMCP context for adapters + try: + object.__setattr__(ctx, "bound_app_context", bound_app_context) + except Exception: + pass + if workflow_name not in workflows_dict: raise ToolError(f"Workflow '{workflow_name}' not found.") @@ -1842,14 +1927,20 @@ async def _workflow_run( if app is not None and getattr(app, "name", None): from mcp_agent.logging.logger import get_logger as _get_logger - _get_logger(f"mcp_agent.{app.name}", context=app_context) + _get_logger(f"mcp_agent.{app.name}", context=bound_app_context) except Exception: pass # Create and initialize the workflow instance using the factory method try: # Create workflow instance with context that has upstream_session - workflow = await workflow_cls.create(name=workflow_name, context=app_context) + workflow = await workflow_cls.create( + name=workflow_name, context=bound_app_context + ) + try: + setattr(workflow, "_mcp_request_context", ctx) + except Exception: + pass run_parameters = run_parameters or {} diff --git a/src/mcp_agent/server/tool_adapter.py b/src/mcp_agent/server/tool_adapter.py index 7295fb1b1..61ed37fd5 100644 --- a/src/mcp_agent/server/tool_adapter.py +++ b/src/mcp_agent/server/tool_adapter.py @@ -7,6 +7,7 @@ """ import inspect +import typing as _typing from typing import Any, Callable, Optional from mcp.server.fastmcp import Context as _Ctx @@ -36,26 +37,70 @@ def create_tool_adapter_signature( signature can be converted to JSON schema. 
""" sig = inspect.signature(fn) + + def _annotation_is_fast_ctx(annotation) -> bool: + if _Ctx is None or annotation is inspect._empty: + return False + if annotation is _Ctx: + return True + try: + origin = _typing.get_origin(annotation) + if origin is not None: + return any( + _annotation_is_fast_ctx(arg) for arg in _typing.get_args(annotation) + ) + except Exception: + pass + try: + return "fastmcp" in str(annotation) + except Exception: + return False + + existing_ctx_param = None + for param in sig.parameters.values(): + if param.name == "app_ctx": + continue + annotation = param.annotation + if annotation is inspect._empty and param.name in ("ctx", "context"): + existing_ctx_param = param.name + break + if _annotation_is_fast_ctx(annotation): + existing_ctx_param = param.name + break return_ann = sig.return_annotation # Copy annotations and remove app_ctx ann = dict(getattr(fn, "__annotations__", {})) ann.pop("app_ctx", None) - # Add ctx parameter annotation - ctx_param_name = "ctx" - ann[ctx_param_name] = _Ctx + # Determine context parameter name + ctx_param_name = existing_ctx_param or "ctx" + if _Ctx is not None: + ann[ctx_param_name] = _Ctx ann["return"] = getattr(fn, "__annotations__", {}).get("return", return_ann) - # Filter parameters to remove app_ctx - params = [p for p in sig.parameters.values() if p.name != "app_ctx"] - - # Create ctx parameter - ctx_param = inspect.Parameter( - ctx_param_name, - kind=inspect.Parameter.KEYWORD_ONLY, - annotation=_Ctx, - ) + # Filter parameters to remove app_ctx and, when needed, ctx/context placeholders + params = [] + for p in sig.parameters.values(): + if p.name == "app_ctx": + continue + if existing_ctx_param is None and ( + (p.annotation is inspect._empty and p.name in ("ctx", "context")) + or _annotation_is_fast_ctx(p.annotation) + ): + continue + params.append(p) + + # Create ctx parameter when not already present + if existing_ctx_param is None: + ctx_param = inspect.Parameter( + ctx_param_name, + kind=inspect.Parameter.KEYWORD_ONLY, + annotation=_Ctx, + ) + signature_params = params + [ctx_param] + else: + signature_params = params # Create a dummy function with the transformed signature async def _transformed(**kwargs): @@ -68,7 +113,7 @@ async def _transformed(**kwargs): # Create new signature with filtered params + ctx param _transformed.__signature__ = inspect.Signature( - parameters=params + [ctx_param], return_annotation=return_ann + parameters=signature_params, return_annotation=return_ann ) return _transformed diff --git a/tests/core/test_context.py b/tests/core/test_context.py new file mode 100644 index 000000000..038bd80d6 --- /dev/null +++ b/tests/core/test_context.py @@ -0,0 +1,123 @@ +import pytest +from types import SimpleNamespace + +from mcp_agent.core.context import Context +from mcp_agent.logging.logger import Logger as AgentLogger + + +class _DummyLogger: + def __init__(self): + self.messages = [] + + def debug(self, message: str): + self.messages.append(("debug", message)) + + def info(self, message: str): + self.messages.append(("info", message)) + + def warning(self, message: str): + self.messages.append(("warning", message)) + + def error(self, message: str): + self.messages.append(("error", message)) + + +class _DummyMCP: + def __init__(self): + self.last_uri = None + + async def read_resource(self, uri): + self.last_uri = uri + return [("text", uri)] + + +def _make_context(*, app: SimpleNamespace | None = None) -> Context: + ctx = Context() + if app is not None: + ctx.app = app + return ctx + + +def 
test_session_prefers_explicit_upstream(): + upstream = object() + ctx = _make_context() + ctx.upstream_session = upstream + + assert ctx.session is upstream + + +def test_fastmcp_fallback_to_app(): + dummy_mcp = object() + app = SimpleNamespace(mcp=dummy_mcp, logger=None) + ctx = _make_context(app=app) + + assert ctx.fastmcp is dummy_mcp + + bound = ctx.bind_request(SimpleNamespace(), fastmcp="request_mcp") + assert bound.fastmcp == "request_mcp" + # Original context remains unchanged + assert ctx.fastmcp is dummy_mcp + + +@pytest.mark.asyncio +async def test_log_falls_back_to_app_logger(): + dummy_logger = _DummyLogger() + app = SimpleNamespace(mcp=None, logger=dummy_logger) + ctx = _make_context(app=app) + + await ctx.log("info", "hello world") + + assert ("info", "hello world") in dummy_logger.messages + + +@pytest.mark.asyncio +async def test_read_resource_falls_back_to_app_mcp(): + dummy_mcp = _DummyMCP() + app = SimpleNamespace(mcp=dummy_mcp, logger=None) + ctx = _make_context(app=app) + + contents = await ctx.read_resource("resource://foo") + + assert dummy_mcp.last_uri == "resource://foo" + assert list(contents) == [("text", "resource://foo")] + + +@pytest.mark.asyncio +async def test_read_resource_without_mcp_raises(): + ctx = _make_context() + + with pytest.raises(ValueError): + await ctx.read_resource("resource://missing") + + +def test_logger_property_uses_app_logger(): + dummy_logger = _DummyLogger() + app = SimpleNamespace(mcp=None, logger=dummy_logger, name="demo-app") + ctx = _make_context(app=app) + + assert ctx.logger is dummy_logger + + +def test_logger_property_without_app_creates_logger(): + ctx = _make_context() + + logger = ctx.logger + + assert isinstance(logger, AgentLogger) + assert getattr(logger, "_bound_context", None) is ctx + + +def test_name_and_description_properties(): + app = SimpleNamespace( + mcp=None, logger=_DummyLogger(), name="app-name", description="app-desc" + ) + ctx = _make_context(app=app) + ctx.config = SimpleNamespace(name="config-name", description="config-desc") + + assert ctx.name == "app-name" + assert ctx.description == "app-desc" + + ctx_no_app = _make_context() + + assert ctx_no_app.name is None + assert ctx_no_app.description is None diff --git a/tests/server/test_app_server.py b/tests/server/test_app_server.py index 16d61528d..2b734b0f9 100644 --- a/tests/server/test_app_server.py +++ b/tests/server/test_app_server.py @@ -76,9 +76,13 @@ async def test_workflow_run_with_custom_workflow_id( ) # Verify the workflow was created - mock_workflow_class.create.assert_called_once_with( - name=workflow_name, - context=mock_server_context.request_context.lifespan_context.context, + mock_workflow_class.create.assert_called_once() + create_kwargs = mock_workflow_class.create.call_args.kwargs + assert create_kwargs["name"] == workflow_name + # Bound context should be derived from the original lifespan context + assert ( + create_kwargs["context"] + is not mock_server_context.request_context.lifespan_context.context ) # Verify run_async was called with the custom workflow_id diff --git a/tests/server/test_tool_decorators.py b/tests/server/test_tool_decorators.py index dec2d8fb4..5504ac738 100644 --- a/tests/server/test_tool_decorators.py +++ b/tests/server/test_tool_decorators.py @@ -1,7 +1,12 @@ import asyncio +from typing import Any + import pytest from mcp_agent.app import MCPApp +from mcp_agent.core.context import Context +from mcp.types import ToolAnnotations, Icon +from mcp.server.fastmcp import Context as FastMCPContext from 
mcp_agent.server.app_server import ( create_workflow_tools, create_declared_function_tools, @@ -34,8 +39,21 @@ def add_tool( description=None, annotations=None, structured_output=None, + meta=None, + **kwargs, ): - self.added_tools.append((name, fn, description, structured_output)) + entry = { + "name": name, + "fn": fn, + "title": title, + "description": description, + "annotations": annotations, + "structured_output": structured_output, + "meta": meta, + } + entry.update(kwargs) + self.added_tools.append(entry) + return fn def _make_ctx(server_context): @@ -60,7 +78,15 @@ async def test_app_tool_registers_and_executes_sync_tool(): app = MCPApp(name="test_app_tool") await app.initialize() - @app.tool(name="echo", description="Echo input") + @app.tool( + name="echo", + title="Echo Title", + description="Echo input", + annotations={"idempotentHint": True}, + icons=[{"src": "emoji:wave"}], + meta={"source": "test"}, + structured_output=True, + ) async def echo(text: str) -> str: return text + "!" @@ -76,17 +102,31 @@ async def echo(text: str) -> str: # Verify tool names: only the sync tool endpoint is added _decorated_names = {name for name, _ in mcp.decorated_tools} - added_names = {name for name, *_ in mcp.added_tools} + added_names = {entry["name"] for entry in mcp.added_tools} # No workflows-* aliases for sync tools; check only echo assert "echo" in added_names # synchronous tool # Execute the synchronous tool function and ensure it returns unwrapped value # Find the registered sync tool function - sync_tool_fn = next(fn for name, fn, *_ in mcp.added_tools if name == "echo") + sync_tool_entry = next( + entry for entry in mcp.added_tools if entry["name"] == "echo" + ) + sync_tool_fn = sync_tool_entry["fn"] ctx = _make_ctx(server_context) result = await sync_tool_fn(text="hi", ctx=ctx) assert result == "hi!" 
# unwrapped (not WorkflowResult) + bound_app_ctx = getattr(ctx, "bound_app_context", None) + assert bound_app_ctx is not None + assert bound_app_ctx is not server_context.context + assert bound_app_ctx.fastmcp == ctx.fastmcp + assert sync_tool_entry["title"] == "Echo Title" + assert isinstance(sync_tool_entry["annotations"], ToolAnnotations) + assert sync_tool_entry["annotations"].idempotentHint is True + assert sync_tool_entry["icons"] == [Icon(src="emoji:wave")] + # meta support in FastMCP add_tool pending upstream release; expect None for now + assert sync_tool_entry.get("meta") in ({"source": "test"}, None) + assert sync_tool_entry["structured_output"] is True # Also ensure the underlying workflow returned a WorkflowResult # Start via workflow_run to get run_id, then wait for completion and inspect @@ -112,7 +152,14 @@ async def test_app_async_tool_registers_aliases_and_workflow_tools(): app = MCPApp(name="test_app_async_tool") await app.initialize() - @app.async_tool(name="long") + @app.async_tool( + name="long", + title="Long Task", + annotations={"readOnlyHint": True}, + icons=[Icon(src="emoji:check")], + meta={"async": True}, + structured_output=None, + ) async def long_task(x: int) -> str: return f"done:{x}" @@ -125,10 +172,16 @@ async def long_task(x: int) -> str: create_declared_function_tools(mcp, server_context) decorated_names = {name for name, _ in mcp.decorated_tools} - added_names = {name for name, *_ in mcp.added_tools} + added_names = {entry["name"] for entry in mcp.added_tools} # We register the async tool under its given name via add_tool assert "long" in added_names + long_entry = next(entry for entry in mcp.added_tools if entry["name"] == "long") + assert long_entry["title"] == "Long Task" + assert isinstance(long_entry["annotations"], ToolAnnotations) + assert long_entry["annotations"].readOnlyHint is True + assert long_entry["icons"] == [Icon(src="emoji:check")] + assert long_entry.get("meta") in ({"async": True}, None) # And we suppress workflows-* for async auto tools assert "workflows-long-run" not in decorated_names @@ -171,3 +224,69 @@ async def wrapme(v: int) -> int: assert result_payload["value"] == 42 else: assert result_payload in (42, {"result": 42}) + + +@pytest.mark.asyncio +async def test_workflow_run_binds_app_context_per_request(): + app = MCPApp(name="test_request_binding") + await app.initialize() + + sentinel_session = object() + app.context.upstream_session = sentinel_session + + captured: dict[str, Any] = {} + + @app.async_tool(name="binding_tool") + async def binding_tool( + value: int, + app_ctx: Context | None = None, + ctx: FastMCPContext | None = None, + ) -> str: + captured["app_ctx"] = app_ctx + captured["ctx"] = ctx + if app_ctx is not None: + # Access session property to confirm fallback path works during execution + captured["session_property"] = app_ctx.session + captured["request_context"] = getattr(app_ctx, "_request_context", None) + captured["fastmcp"] = app_ctx.fastmcp + return f"done:{value}" + + server_context = type( + "SC", (), {"workflows": app.workflows, "context": app.context} + )() + + ctx = _make_ctx(server_context) + # Simulate FastMCP attaching the app to its server for lookup paths + ctx.fastmcp._mcp_agent_app = app # type: ignore[attr-defined] + + run_info = await _workflow_run(ctx, "binding_tool", {"value": 7}) + run_id = run_info["run_id"] + + # Workflow should have the FastMCP request context attached + workflow = await app.context.workflow_registry.get_workflow(run_id) + assert getattr(workflow, 
"_mcp_request_context", None) is ctx + + # Wait for completion so the tool function executes + for _ in range(200): + status = await app.context.workflow_registry.get_workflow_status(run_id) + if status.get("completed"): + break + await asyncio.sleep(0.01) + assert status.get("completed") is True + + bound_app_ctx = getattr(ctx, "bound_app_context", None) + assert bound_app_ctx is not None + # The tool received the per-request bound context + assert captured.get("app_ctx") is bound_app_ctx + # FastMCP context argument should be the original request context + assert captured.get("ctx") is ctx + assert getattr(captured.get("ctx"), "bound_app_context", None) is bound_app_ctx + assert bound_app_ctx is not app.context + # Upstream session should be preserved on the bound context + assert bound_app_ctx.upstream_session is sentinel_session + assert captured.get("session_property") is sentinel_session + # FastMCP instance and request context bridge through the bound context + assert captured.get("fastmcp") is ctx.fastmcp + assert captured.get("request_context") is ctx.request_context + # Accessing session on the bound context should prefer upstream_session + assert bound_app_ctx.session is sentinel_session diff --git a/uv.lock b/uv.lock index 49e713df7..605293683 100644 --- a/uv.lock +++ b/uv.lock @@ -2007,7 +2007,7 @@ wheels = [ [[package]] name = "mcp" -version = "1.13.1" +version = "1.18.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "anyio" }, @@ -2022,14 +2022,14 @@ dependencies = [ { name = "starlette" }, { name = "uvicorn", marker = "sys_platform != 'emscripten'" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/66/3c/82c400c2d50afdac4fbefb5b4031fd327e2ad1f23ccef8eee13c5909aa48/mcp-1.13.1.tar.gz", hash = "sha256:165306a8fd7991dc80334edd2de07798175a56461043b7ae907b279794a834c5", size = 438198, upload-time = "2025-08-22T09:22:16.061Z" } +sdist = { url = "https://files.pythonhosted.org/packages/1a/e0/fe34ce16ea2bacce489ab859abd1b47ae28b438c3ef60b9c5eee6c02592f/mcp-1.18.0.tar.gz", hash = "sha256:aa278c44b1efc0a297f53b68df865b988e52dd08182d702019edcf33a8e109f6", size = 482926, upload-time = "2025-10-16T19:19:55.125Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/19/3f/d085c7f49ade6d273b185d61ec9405e672b6433f710ea64a90135a8dd445/mcp-1.13.1-py3-none-any.whl", hash = "sha256:c314e7c8bd477a23ba3ef472ee5a32880316c42d03e06dcfa31a1cc7a73b65df", size = 161494, upload-time = "2025-08-22T09:22:14.705Z" }, + { url = "https://files.pythonhosted.org/packages/1b/44/f5970e3e899803823826283a70b6003afd46f28e082544407e24575eccd3/mcp-1.18.0-py3-none-any.whl", hash = "sha256:42f10c270de18e7892fdf9da259029120b1ea23964ff688248c69db9d72b1d0a", size = 168762, upload-time = "2025-10-16T19:19:53.2Z" }, ] [[package]] name = "mcp-agent" -version = "0.1.33" +version = "0.1.36" source = { editable = "." 
} dependencies = [ { name = "aiohttp" }, @@ -2042,7 +2042,6 @@ dependencies = [ { name = "opentelemetry-exporter-otlp-proto-http" }, { name = "opentelemetry-instrumentation-anthropic" }, { name = "opentelemetry-instrumentation-openai" }, - { name = "pathspec" }, { name = "prompt-toolkit" }, { name = "pydantic" }, { name = "pydantic-settings" }, @@ -2051,7 +2050,6 @@ dependencies = [ { name = "rich" }, { name = "scikit-learn" }, { name = "typer" }, - { name = "watchdog" }, { name = "websockets" }, ] @@ -2073,6 +2071,11 @@ azure = [ bedrock = [ { name = "boto3" }, ] +cli = [ + { name = "pathspec" }, + { name = "python-dotenv" }, + { name = "watchdog" }, +] cohere = [ { name = "cohere" }, ] @@ -2124,27 +2127,28 @@ requires-dist = [ { name = "httpx", specifier = ">=0.28.1" }, { name = "jsonref", specifier = ">=1.1.0" }, { name = "langchain-core", marker = "extra == 'langchain'", specifier = ">=0.3.64" }, - { name = "mcp", specifier = ">=1.13.1" }, + { name = "mcp", specifier = ">=1.18.0" }, { name = "numpy", specifier = ">=2.1.3" }, { name = "openai", marker = "extra == 'openai'", specifier = ">=1.58.1" }, { name = "opentelemetry-distro", specifier = ">=0.50b0" }, { name = "opentelemetry-exporter-otlp-proto-http", specifier = ">=1.29.0" }, { name = "opentelemetry-instrumentation-anthropic", specifier = ">=0.39.3" }, { name = "opentelemetry-instrumentation-openai", specifier = ">=0.39.3" }, - { name = "pathspec", specifier = ">=0.12.1" }, + { name = "pathspec", marker = "extra == 'cli'", specifier = ">=0.12.1" }, { name = "prompt-toolkit", specifier = ">=3.0.50" }, { name = "pydantic", specifier = ">=2.10.4" }, { name = "pydantic-settings", specifier = ">=2.7.0" }, { name = "pydantic-yaml", specifier = ">=1.5.1" }, + { name = "python-dotenv", marker = "extra == 'cli'", specifier = ">=1.0.0" }, { name = "pyyaml", specifier = ">=6.0.2" }, { name = "rich", specifier = ">=13.9.4" }, { name = "scikit-learn", specifier = ">=1.6.0" }, { name = "temporalio", extras = ["opentelemetry"], marker = "extra == 'temporal'", specifier = ">=1.10.0" }, { name = "typer", specifier = ">=0.15.3" }, - { name = "watchdog", specifier = ">=6.0.0" }, + { name = "watchdog", marker = "extra == 'cli'", specifier = ">=6.0.0" }, { name = "websockets", specifier = ">=12.0" }, ] -provides-extras = ["temporal", "anthropic", "anthropic-bedrock", "anthropic-vertex", "bedrock", "openai", "azure", "google", "cohere", "langchain", "crewai"] +provides-extras = ["cli", "temporal", "anthropic", "anthropic-bedrock", "anthropic-vertex", "bedrock", "openai", "azure", "google", "cohere", "langchain", "crewai"] [package.metadata.requires-dev] dev = [