Skip to content

Conversation

saqadri
Copy link
Collaborator

@saqadri saqadri commented Sep 22, 2025

Support tools/listChanged, prompts/listChanged, resources/listChanged.

Plus a bunch of formatter changes.

Summary by CodeRabbit

  • New Features

    • Agent now automatically syncs its tools, prompts, and resources with the central aggregator before each operation for up-to-date results.
    • Aggregator responds to server “list changed” notifications and refreshes affected tools, prompts, and resources in near real time.
    • More robust persistent connections and cleanup improve reliability during server updates and shutdowns.
  • Tests

    • Added tests covering notification-driven refreshes for tools, prompts, and resources.
    • Expanded concurrency and agent workflow tests to ensure correct state synchronization under load.

Copy link

coderabbitai bot commented Sep 22, 2025

Walkthrough

Adds agent-to-aggregator state sync before operations, introduces a GetAggregatorStateTask/request, and updates Agent methods to refresh maps. Enhances MCPAggregator with per-server notification handling, refresh scheduling, capability tracking, and a MCPCompoundServer. Updates and adds tests to cover notification-triggered refresh and new async behaviors.

Changes

Cohort / File(s) Summary of changes
Agent state synchronization
src/mcp_agent/agents/agent.py
Adds _sync_with_aggregator_state, GetAggregatorStateRequest, wires pre-call sync into tool/prompt/resource methods, and implements get_aggregator_state_task to return InitAggregatorResponse with current cached maps.
Aggregator notifications and refresh
src/mcp_agent/mcp/mcp_aggregator.py
Introduces per-server notification handler, refresh scheduling (_schedule_server_refresh, _refresh_server), persistent connection helper, capability support tracking, updated capability typing/annotations, lifecycle cleanup, and new MCPCompoundServer.
Agent tests adjustments
tests/agents/test_agent.py, tests/agents/test_agent_tasks_concurrency.py
Updates mocks to async side effects handling multiple tasks; adds asyncio locks in FakeAggregator for map guarding.
Aggregator tests for list_changed
tests/mcp/test_mcp_aggregator.py
Adds async tests verifying tools/prompts/resources list_changed notifications trigger server reload; expands imports; aligns dummy connection interface.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Client as Caller
  participant Agent as Agent
  participant Tasks as AgentTasks
  participant Agg as MCPAggregator

  rect rgba(230,245,255,0.6)
    note over Agent: Pre-operation sync
    Client->>Agent: list_tools / get_prompt / call_tool
    Agent->>Tasks: GetAggregatorStateRequest(agent_name)
    Tasks->>Agg: Read cached state (under locks)
    Agg-->>Tasks: InitAggregatorResponse (maps, server mappings)
    Tasks-->>Agent: Aggregator state
    Agent->>Agent: Update in-memory maps
  end

  alt Operation continues
    Agent-->>Client: Result (tools/prompts/resources or tool call)
  end
Loading
sequenceDiagram
  autonumber
  participant Server as MCP Server
  participant Agg as MCPAggregator
  participant Conn as Persistent Session

  rect rgba(240,255,240,0.6)
    note over Agg: Notification setup
    Agg->>Agg: _get_persistent_server_connection(server)
    Agg->>Conn: Register notification handler
  end

  Server-->>Agg: ServerNotification(list_changed: tools/prompts/resources)
  Agg->>Agg: _handle_incoming_server_message(...)
  Agg->>Agg: _schedule_server_refresh(server)
  par Refresh task
    Agg->>Agg: _refresh_server(server)
    Agg->>Agg: load_server(server) and update caches
  end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Suggested reviewers

  • rholinshead
  • jtcorbett

Poem

A hop, a sync, a gentle thrum,
Aggregators drum—here changes come. 🥕
Tools and prompts line up in rows,
When servers whisper “lists have changed,” it knows.
I twitch my ears, refresh in flight—
Then bound along: all maps set right. 🐇

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 41.67% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The title "Support MCP tool, resource and prompt discovery" accurately and concisely summarizes the primary change set — adding support for tools/prompts/resources discovery (list_changed notifications) and related synchronization; it is specific, relevant to the PR objectives and file diffs, and readable for teammates scanning history.
✨ Finishing touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feat/mcp_discovery

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
src/mcp_agent/mcp/sampling_handler.py (1)

82-91: Harden response formatter against None/non‑text content

Same NoneType risk as above; result.content.text can be None, and other content types may carry data.

Apply this diff:

@@
-    content = (
-        result.content.text if hasattr(result.content, "text") else str(result.content)
-    )
+    content = (
+        (result.content.text or "")
+        if isinstance(result.content, TextContent)
+        else (
+            str(getattr(result.content, "data", ""))
+            if hasattr(result.content, "data")
+            else (str(result.content) if result.content is not None else "")
+        )
+    )

If you add a shared normalizer as suggested in the previous comment, use it here to avoid duplication.

src/mcp_agent/mcp/client_proxy.py (1)

179-181: Sanitize signal names for Temporal compatibility.

method may contain characters like /, :, or spaces. Temporal signal names are best kept to alphanumerics/underscores and reasonable length. Generate a slug to avoid runtime failures in certain backends/SDKs.

Apply:

@@
-import uuid
+import uuid
+import re
@@
-        signal_name = f"mcp_rpc_{method}_{uuid.uuid4().hex}"
+        safe_method = re.sub(r"[^A-Za-z0-9_]+", "_", method)[:64]
+        signal_name = f"mcp_rpc_{safe_method}_{uuid.uuid4().hex}"
@@
-                    json={
+                    json={
                         "method": method,
                         "params": params or {},
                         "signal_name": signal_name,

Also applies to: 211-214

src/mcp_agent/agents/agent.py (1)

1059-1066: Avoid logging raw human prompts/metadata (PII risk).

logger.debug(..., data=request) may include sensitive user text. Log IDs only.

-logger.debug("Requesting human input:", data=request)
+logger.debug(
+    "Requesting human input",
+    data={"request_id": request_id, "workflow_id": request.workflow_id},
+)

And similarly later:

-logger.debug("Received human input:", data=user_input)
+logger.debug(
+    "Received human input",
+    data={"request_id": user_input.request_id},
+)
-logger.debug("Received human input signal", data=result)
+logger.debug(
+    "Received human input signal",
+    data={"request_id": request.request_id},
+)
src/mcp_agent/mcp/mcp_aggregator.py (1)

1579-1591: MCPCompoundServer._call_tool returns wrong type.

Signature says CallToolResult but it returns result.content. Return the full CallToolResult.

-async def _call_tool(
-    self, name: str, arguments: dict | None = None
-) -> CallToolResult:
+async def _call_tool(
+    self, name: str, arguments: dict | None = None
+) -> CallToolResult:
     """Call a specific tool from the aggregated servers."""
     try:
         result = await self.aggregator.call_tool(name=name, arguments=arguments)
-        return result.content
+        return result
     except Exception as e:
         return CallToolResult(
             isError=True,
             content=[
                 TextContent(type="text", text=f"Error calling tool: {str(e)}")
             ],
         )

Note: adjust the related test to expect a CallToolResult (not the raw content). I can update the test accordingly if desired.

🧹 Nitpick comments (19)
examples/human_input/temporal/main.py (5)

20-22: Comment mentions FastMCPApp but code instantiates MCPApp.

Align the comment with the actual class to avoid confusion.

-# Create a single FastMCPApp instance (which extends MCPApp)
-# We don't need to explicitly create a tool for human interaction; providing the human_input_callback will
-# automatically create a tool for the agent to use.
+# Create a single MCPApp instance
+# We don't need to explicitly create a tool for human interaction; providing human_input_callback will
+# automatically create a tool for the agent to use.

31-31: Optional parameter is annotated but immediately dereferenced.

app_ctx defaults to None, yet the body assumes it’s non‑None. Either require it or guard.

-async def greet(app_ctx: Context | None = None) -> str:
+async def greet(app_ctx: Context) -> str:

If the decorator requires Optional, add a quick guard instead:

 async def greet(app_ctx: Context | None = None) -> str:
+    if app_ctx is None:
+        raise ValueError("app_ctx is required")

39-44: Docstring args don’t reflect the actual signature.

Document app_ctx and remove the placeholder “input: none”.

-    Args:
-        input: none
+    Args:
+        app_ctx: Workflow context provided by the framework; use to access app/executor.

46-49: Avoid shadowing the module-level app variable.

Rename the local to reduce confusion.

-    app = app_ctx.app
-
-    logger = app.logger
+    workflow_app = app_ctx.app
+    logger = workflow_app.logger

57-64: Minor naming + logging hygiene.

  • Rename finder_llm to greeter_llm for clarity.
  • Consider not logging full LLM output; truncate and lower log level to avoid leaking user input.
-    async with greeting_agent:
-        finder_llm = await greeting_agent.attach_llm(OpenAIAugmentedLLM)
+    async with greeting_agent:
+        greeter_llm = await greeting_agent.attach_llm(OpenAIAugmentedLLM)
 
-        result = await finder_llm.generate_str(
+        result = await greeter_llm.generate_str(
             message="Ask the user for their name and greet them.",
         )
-        logger.info("[workflow-mode] greet_tool agent result", data={"result": result})
+        logger.debug("[workflow-mode] greet_tool agent result",
+                     data={"result_preview": (result[:200] if isinstance(result, str) else "<non-str>")})
examples/human_input/temporal/client.py (2)

138-141: Use a keyword for server_registry to guard against signature drift.

Minor clarity/safety improvement; avoids relying on positional order.

-            async with gen_client(
-                "basic_agent_server",
-                context.server_registry,
-                client_session_factory=make_session,
-            ) as server:
+            async with gen_client(
+                "basic_agent_server",
+                server_registry=context.server_registry,
+                client_session_factory=make_session,
+            ) as server:

159-161: Handle Python 3.11’s built-in ExceptionGroup as well.

Currently only the backport is used; on 3.11 the typed branch may be skipped. Prefer builtin first, then backport.

Apply near the imports (outside this hunk):

# Prefer builtin (3.11+) then fall back to backport
try:  # 3.11+
    _ExceptionGroup = ExceptionGroup  # type: ignore[name-defined]
except NameError:  # 3.10 and earlier
    try:
        from exceptiongroup import ExceptionGroup as _ExceptionGroup  # type: ignore[assignment]
    except Exception:  # pragma: no cover
        _ExceptionGroup = None  # type: ignore

No change needed in the handling block; your existing isinstance checks will then cover both cases.

Also applies to: 167-168

examples/mcp/mcp_elicitation/temporal/client.py (1)

175-178: Guard against empty tool result content before JSON decode.

If run_result.content is missing/empty, this raises an IndexError.

-                execution = WorkflowExecution(**json.loads(run_result.content[0].text))
+                if not run_result.content:
+                    raise RuntimeError(f"Empty content in run result: {run_result}")
+                execution = WorkflowExecution(**json.loads(run_result.content[0].text))
tests/agents/test_agent_tasks_concurrency.py (2)

36-38: Locks added but unused in test logic.

If the goal is fidelity to MCPAggregator internals, fine. Otherwise, consider removing to reduce noise.


121-125: Remove unused local call_list_tools.

Declared but never used.

-    async def call_list_tools():
-        return await tasks.list_tools_task(
-            ListToolsRequest(agent_name=agent_name, server_name=None)
-        )
tests/agents/test_agent.py (1)

534-546: Side-effect routing by name is brittle; add identity/attr fallbacks.

AsyncMock/wrapped callables may not preserve __name__. Guard by checking known task attrs or comparing against task.__wrapped__ when present, then fall back to __name__.

-async def execute_side_effect(task, *args, **kwargs):
-    task_name = getattr(task, "__name__", "")
+async def execute_side_effect(task, *args, **kwargs):
+    # Prefer robust identification over __name__ only
+    task_obj = getattr(task, "__wrapped__", task)
+    task_name = getattr(task_obj, "__name__", "")
     if task_name == "initialize_aggregator_task":
         return init_response
     if task_name == "get_aggregator_state_task":
         return init_response
     if task_name == "call_tool_task":
         return mock_result
     return init_response
src/mcp_agent/server/app_server.py (3)

1914-1933: Safer X-Forwarded host normalization (IPv6/port-aware).

netloc.replace(host, new_host) can mis-handle bracketed IPv6 and ports. Build netloc from parsed components.

-    def _normalize_gateway_url(url: str | None) -> str | None:
+    def _normalize_gateway_url(url: str | None) -> str | None:
         if not url:
             return url
         try:
             from urllib.parse import urlparse, urlunparse
-
-            parsed = urlparse(url)
-            host = parsed.hostname or ""
-            # Replace wildcard binds with a loopback address that's actually connectable
-            if host in ("0.0.0.0", "::", "[::]"):
-                new_host = "127.0.0.1" if host == "0.0.0.0" else "localhost"
-                netloc = parsed.netloc.replace(host, new_host)
-                parsed = parsed._replace(netloc=netloc)
-                return urlunparse(parsed)
+            parsed = urlparse(url)
+            host = parsed.hostname or ""
+            if host in ("0.0.0.0", "::"):
+                new_host = "127.0.0.1" if host == "0.0.0.0" else "localhost"
+                # Preserve port if present
+                port = f":{parsed.port}" if parsed.port else ""
+                # Preserve userinfo if present
+                auth = ""
+                if parsed.username:
+                    auth = parsed.username
+                    if parsed.password:
+                        auth += f":{parsed.password}"
+                    auth += "@"
+                netloc = f"{auth}{new_host}{port}"
+                parsed = parsed._replace(netloc=netloc)
+                return urlunparse(parsed)
         except Exception:
             pass
         return url

377-386: Idempotency store is unbounded; add TTL/size cap to prevent memory growth.

_IDEMPOTENCY_KEYS_SEEN accumulates per-execution keys with no eviction. Bound the per-execution set and/or periodically purge old entries.

Would you like a lightweight in-memory TTL cache (e.g., timestamp + periodic sweep) or a fixed-size LRU per execution_id?


958-1016: Pending prompts registry may leak; define lifecycle cleanup.

_PENDING_PROMPTS adds entries but this file doesn’t remove them. Ensure the submit/response path deletes by request_id, and add a max size/TTL to avoid unbounded growth on failures.

If cleanup lives elsewhere, point to it; otherwise I can draft a small helper to sweep stale entries.

tests/mcp/test_mcp_aggregator.py (1)

1287-1391: Good coverage for list_changed → server reload; consider coalescing test.

You assert one refresh fires. Add a test that emits two notifications quickly and verifies only one refresh runs then a second due to _server_refresh_pending coalescing.

I can add a test that sends two notifications back-to-back, checks that the first task is running, sets load_server to awaitable barrier, and then confirms exactly two awaits after completion.

src/mcp_agent/agents/agent.py (2)

493-528: State sync on every call can be costly; gate or debounce.

_sync_with_aggregator_state() runs before each list/get/call. Consider:

  • Short-circuit if initialized unchanged and no aggregator version bump.
  • Debounce by time window (e.g., 100–250 ms) to coalesce bursts.

I can wire a monotonic timestamp/version from the aggregator to avoid redundant RPCs.


1082-1087: Use named parameter for signal_name consistently.

The second signal(...) call passes request_id positionally; use signal_name= for clarity and to avoid signature drift.

- await self.context.executor.signal(
-     request_id,
+ await self.context.executor.signal(
+     signal_name=request_id,
      payload=f"Error getting human input: {str(e)}",
      workflow_id=request.workflow_id,
      run_id=request.run_id,
 )
src/mcp_agent/mcp/mcp_aggregator.py (2)

730-739: Typo in span name: get_capabilitites → get_capabilities.

Minor but affects tracing/search.

-with tracer.start_as_current_span(
-    f"{self.__class__.__name__}.get_capabilitites"
-) as span:
+with tracer.start_as_current_span(
+    f"{self.__class__.__name__}.get_capabilities"
+) as span:

1354-1356: Return type should allow None for not-found cases.

_parse_capability_name returns (None, None) but annotated as tuple[str, str].

-async def _parse_capability_name(
-    self, name: str, capability: Literal["tool", "prompt", "resource"]
-) -> tuple[str, str]:
+async def _parse_capability_name(
+    self, name: str, capability: Literal["tool", "prompt", "resource"]
+) -> tuple[str | None, str | None]:
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d954ea6 and f72b827.

⛔ Files ignored due to path filters (1)
  • uv.lock is excluded by !**/*.lock
📒 Files selected for processing (18)
  • examples/human_input/temporal/client.py (4 hunks)
  • examples/human_input/temporal/main.py (1 hunks)
  • examples/mcp/mcp_elicitation/temporal/client.py (4 hunks)
  • examples/mcp/mcp_elicitation/temporal/main.py (1 hunks)
  • src/mcp_agent/agents/agent.py (9 hunks)
  • src/mcp_agent/app.py (1 hunks)
  • src/mcp_agent/executor/temporal/session_proxy.py (3 hunks)
  • src/mcp_agent/executor/temporal/system_activities.py (1 hunks)
  • src/mcp_agent/human_input/elicitation_handler.py (7 hunks)
  • src/mcp_agent/mcp/client_proxy.py (7 hunks)
  • src/mcp_agent/mcp/mcp_aggregator.py (14 hunks)
  • src/mcp_agent/mcp/sampling_handler.py (2 hunks)
  • src/mcp_agent/server/app_server.py (25 hunks)
  • src/mcp_agent/workflows/factory.py (1 hunks)
  • tests/agents/test_agent.py (1 hunks)
  • tests/agents/test_agent_tasks_concurrency.py (2 hunks)
  • tests/human_input/test_elicitation_handler.py (7 hunks)
  • tests/mcp/test_mcp_aggregator.py (3 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-09-05T14:31:48.139Z
Learnt from: rholinshead
PR: lastmile-ai/mcp-agent#414
File: src/mcp_agent/logging/logger.py:18-19
Timestamp: 2025-09-05T14:31:48.139Z
Learning: In the mcp-agent logging module (src/mcp_agent/logging/logger.py), temporalio should be imported lazily with try/except ImportError to avoid making it a hard dependency. Use temporalio.workflow.in_workflow() instead of isinstance checks on internal classes like _WorkflowInstanceImpl.

Applied to files:

  • examples/human_input/temporal/main.py
  • examples/mcp/mcp_elicitation/temporal/client.py
📚 Learning: 2025-07-22T18:59:49.368Z
Learnt from: CR
PR: lastmile-ai/mcp-agent#0
File: examples/usecases/reliable_conversation/CLAUDE.md:0-0
Timestamp: 2025-07-22T18:59:49.368Z
Learning: Applies to examples/usecases/reliable_conversation/examples/reliable_conversation/src/**/*.py : Use mcp-agent's Agent abstraction for ALL LLM interactions, including quality evaluation, to ensure consistent tool access, logging, and error handling.

Applied to files:

  • examples/human_input/temporal/client.py
🧬 Code graph analysis (12)
src/mcp_agent/app.py (1)
src/mcp_agent/executor/decorator_registry.py (1)
  • get_workflow_run_decorator (73-82)
tests/human_input/test_elicitation_handler.py (3)
src/mcp_agent/human_input/elicitation_handler.py (1)
  • _handle_elicitation_response (19-51)
src/mcp_agent/human_input/types.py (1)
  • HumanInputRequest (7-29)
tests/agents/test_agent.py (1)
  • mock_context (19-28)
src/mcp_agent/mcp/client_proxy.py (1)
src/mcp_agent/app.py (1)
  • workflow (434-468)
src/mcp_agent/agents/agent.py (3)
src/mcp_agent/app.py (2)
  • executor (177-178)
  • context (161-166)
src/mcp_agent/executor/executor.py (2)
  • execute (86-92)
  • execute (293-316)
src/mcp_agent/executor/temporal/__init__.py (1)
  • execute (212-228)
tests/mcp/test_mcp_aggregator.py (1)
src/mcp_agent/mcp/mcp_aggregator.py (2)
  • load_server (347-476)
  • _handle_incoming_server_message (620-674)
examples/mcp/mcp_elicitation/temporal/client.py (5)
src/mcp_agent/app.py (2)
  • context (161-166)
  • server_registry (173-174)
examples/human_input/temporal/client.py (1)
  • make_session (122-134)
examples/mcp_agent_server/asyncio/client.py (1)
  • make_session (115-127)
src/mcp_agent/mcp/mcp_aggregator.py (1)
  • call_tool (1017-1142)
src/mcp_agent/executor/workflow.py (2)
  • WorkflowExecution (64-71)
  • run_id (133-138)
examples/human_input/temporal/client.py (4)
examples/mcp/mcp_elicitation/temporal/client.py (1)
  • make_session (125-137)
examples/mcp_agent_server/asyncio/client.py (1)
  • make_session (115-127)
src/mcp_agent/agents/agent.py (1)
  • call_tool (1119-1174)
src/mcp_agent/mcp/mcp_aggregator.py (1)
  • call_tool (1017-1142)
examples/mcp/mcp_elicitation/temporal/main.py (1)
src/mcp_agent/app.py (1)
  • MCPApp (54-1003)
src/mcp_agent/human_input/elicitation_handler.py (2)
src/mcp_agent/executor/temporal/session_proxy.py (2)
  • request (108-161)
  • request (360-363)
src/mcp_agent/human_input/types.py (2)
  • HumanInputRequest (7-29)
  • HumanInputResponse (32-42)
tests/agents/test_agent.py (1)
tests/workflows/llm/test_augmented_llm_ollama.py (1)
  • execute_side_effect (190-196)
src/mcp_agent/server/app_server.py (3)
src/mcp_agent/executor/temporal/session_proxy.py (3)
  • request (108-161)
  • request (360-363)
  • send_request (183-204)
src/mcp_agent/app.py (4)
  • logger (205-222)
  • context (161-166)
  • executor (177-178)
  • server_registry (173-174)
src/mcp_agent/executor/temporal/workflow_signal.py (1)
  • signal (146-195)
src/mcp_agent/mcp/mcp_aggregator.py (4)
tests/mcp/test_mcp_aggregator.py (4)
  • get_server (248-249)
  • get_server (594-595)
  • get_server (604-605)
  • server_capabilities (590-591)
src/mcp_agent/mcp/mcp_connection_manager.py (1)
  • get_server (519-563)
src/mcp_agent/mcp/mcp_agent_client_session.py (1)
  • MCPAgentClientSession (73-428)
src/mcp_agent/agents/agent.py (1)
  • _annotate_span_for_capabilities (406-421)
🔇 Additional comments (20)
src/mcp_agent/app.py (1)

710-712: LGTM — formatting-only change

Parenthesis placement change is style-only; no functional impact.

examples/mcp/mcp_elicitation/temporal/main.py (1)

16-16: LGTM on the MCPApp init; no functional change.

One minor ask: ensure the name="elicitation_demo" won’t collide with other example apps if multiple are registered with the aggregator (some registries key on app name). Consider confirming uniqueness in your env or via config.

examples/human_input/temporal/main.py (1)

9-9: Formatting-only change looks good.

Blank line before imports improves readability and follows common style.

examples/human_input/temporal/client.py (3)

16-16: No-op whitespace change.

Nothing to review here.


122-127: Session factory signature/typing looks good.

Matches other examples and preserves behavior.


152-152: Simplified tool call is fine.

greet is registered in examples/human_input/temporal/main.py (decorated with @app.tool); the call site is valid.

examples/mcp/mcp_elicitation/temporal/client.py (2)

186-193: Good defensive handling for None workflow_status.

Early exit with error logging prevents tight-looping on bad payloads. LGTM.


242-251: Benign SSE shutdown handling looks correct.

Selective ignore of BrokenResourceError (including ExceptionGroup) avoids noisy test logs without hiding real errors.

src/mcp_agent/executor/temporal/system_activities.py (1)

93-98: Signature reflow only — no functional impact.

Interface remains stable; activities still delegate to proxy helpers.

src/mcp_agent/executor/temporal/session_proxy.py (3)

108-111: Minor: signature reflow — OK.

No behavior change.


324-329: Minor: signature reflow — OK.

No behavior change.


266-273: No change required — notification names are consistent.
session_proxy, mcp_aggregator, and tests all use "notifications/{resources,tools,prompts}/list_changed".

src/mcp_agent/human_input/elicitation_handler.py (2)

20-35: Response handling is clear and robust.

Trim + slash-command fast path is straightforward; fallbacks are sane.


120-125: Good structured error logging.

Including data={"error": ...} keeps logs parseable.

tests/human_input/test_elicitation_handler.py (1)

81-83: Test case reads well and covers accept path.

Mocks and assertions look correct.

src/mcp_agent/workflows/factory.py (1)

107-110: Formatting-only changes — OK.

AgentSpec construction unchanged semantically.

src/mcp_agent/mcp/client_proxy.py (1)

53-57: Keep both X-MCP-Gateway-Token and Authorization headers

Server accepts the gateway token from either header (X-MCP-Gateway-Token or Authorization: Bearer), so setting both in client_proxy is intentional and compatible — see src/mcp_agent/server/app_server.py:526–531 and 1886–1891.

tests/mcp/test_mcp_aggregator.py (1)

8-14: Imports for list_changed notifications look correct.

Covers tools/prompts/resources; aligns with aggregator notification handling.

src/mcp_agent/mcp/mcp_aggregator.py (2)

539-553: Good: centralized persistent-connection fetch + handler install.

Encapsulates connection retrieval and ensures notification handling is attached once.


676-704: Refresh coalescing logic LGTM.

Pending flag prevents task storms while guaranteeing one trailing refresh.

Consider a stress test with rapid-fire notifications to validate coalescing under load; I can add one if helpful.

Comment on lines +581 to +619
def _ensure_notification_handler(
self, server_name: str, session: ClientSession
) -> None:
if session is None:
return

existing = self._notification_handler_sessions.get(server_name)
if existing is session:
return

original_handler = getattr(session, "_message_handler", None)

async def downstream_handler(message):
if original_handler is not None:
try:
await original_handler(message)
except Exception as exc: # pragma: no cover - defensive
logger.warning(
f"Error in original message handler for '{server_name}': {exc}",
exc_info=True,
)
else:
await anyio.lowlevel.checkpoint()

async def message_handler(message):
try:
await self._handle_incoming_server_message(server_name, message)
except Exception as exc: # pragma: no cover - defensive
logger.warning(
f"Error handling notification from server '{server_name}': {exc}",
exc_info=True,
)

await downstream_handler(message)

# Replace the session's message handler so we can observe notifications
setattr(session, "_message_handler", message_handler)
self._notification_handler_sessions[server_name] = session

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Hooking private ClientSession._message_handler is fragile; preserve and restore.

Directly overwriting a private attr risks breakage on library updates. Store original handlers per session and restore in close(). Also guard if the session replaces its handler.

I can provide a patch that:

  • Tracks self._original_message_handlers[server_name] = original_handler
  • In close(), iterates sessions and restores originals before clearing maps.

Comment on lines 33 to 39
def _format_sampling_request_for_human(params: CreateMessageRequestParams) -> str:
"""Format sampling request for human review"""
messages_text = ""
for i, msg in enumerate(params.messages):
content = (
msg.content.text if hasattr(msg.content, "text") else str(msg.content)
)
content = msg.content.text if hasattr(msg.content, "text") else str(msg.content)
messages_text += f" Message {i + 1} ({msg.role}): {content[:200]}{'...' if len(content) > 200 else ''}\n"

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Prevent NoneType crash when formatting request messages

If msg.content.text exists but is None, len(content) and slicing will raise a TypeError. Also, some content types may expose data instead of text. Safely coalesce to an empty string and handle TextContent explicitly.

Apply this diff:

@@
-    for i, msg in enumerate(params.messages):
-        content = msg.content.text if hasattr(msg.content, "text") else str(msg.content)
-        messages_text += f"  Message {i + 1} ({msg.role}): {content[:200]}{'...' if len(content) > 200 else ''}\n"
+    for i, msg in enumerate(params.messages):
+        if isinstance(msg.content, TextContent):
+            raw = msg.content.text or ""
+        elif hasattr(msg.content, "data"):
+            raw = str(getattr(msg.content, "data", "")) or ""
+        else:
+            raw = str(msg.content) if msg.content is not None else ""
+        preview = raw[:200]
+        ellipsis = "..." if len(raw) > 200 else ""
+        messages_text += f"  Message {i + 1} ({msg.role}): {preview}{ellipsis}\n"

Optional follow‑up: extract a small helper to normalize content for both request/response formatters.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def _format_sampling_request_for_human(params: CreateMessageRequestParams) -> str:
"""Format sampling request for human review"""
messages_text = ""
for i, msg in enumerate(params.messages):
content = (
msg.content.text if hasattr(msg.content, "text") else str(msg.content)
)
content = msg.content.text if hasattr(msg.content, "text") else str(msg.content)
messages_text += f" Message {i + 1} ({msg.role}): {content[:200]}{'...' if len(content) > 200 else ''}\n"
def _format_sampling_request_for_human(params: CreateMessageRequestParams) -> str:
"""Format sampling request for human review"""
messages_text = ""
for i, msg in enumerate(params.messages):
if isinstance(msg.content, TextContent):
raw = msg.content.text or ""
elif hasattr(msg.content, "data"):
raw = str(getattr(msg.content, "data", "")) or ""
else:
raw = str(msg.content) if msg.content is not None else ""
preview = raw[:200]
ellipsis = "..." if len(raw) > 200 else ""
messages_text += f" Message {i + 1} ({msg.role}): {preview}{ellipsis}\n"
🤖 Prompt for AI Agents
In src/mcp_agent/mcp/sampling_handler.py around lines 33 to 39, the formatter
assumes msg.content.text is a non-None string and may crash when it's None or
when content exposes data instead of text; change the logic to normalize content
to a safe string first (if isinstance(msg.content, TextContent) use
msg.content.text or "" ; else try getattr(msg.content, "text", None) or
getattr(msg.content, "data", None) and coalesce None to ""), then cast to str
and slice safely for the preview; optionally extract this normalization into a
small helper reused by request/response formatters.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (5)
src/mcp_agent/mcp/mcp_aggregator.py (5)

1039-1051: Bug: server_name parameter is shadowed and ignored

Reassigning server_name = None discards the caller’s provided server_name. This breaks explicit targeting.

Apply this diff to preserve the argument and use a resolved variable:

-            server_name: str = None
-            local_tool_name: str = None
-
-            if server_name:
-                span.set_attribute("server_name", server_name)
-                local_tool_name = name
-            else:
-                server_name, local_tool_name = await self._parse_capability_name(
-                    name, "tool"
-                )
-                span.set_attribute("parsed_server_name", server_name)
-                span.set_attribute("parsed_tool_name", local_tool_name)
+            resolved_server_name: str | None = server_name
+            local_tool_name: str | None = None
+
+            if resolved_server_name:
+                span.set_attribute("server_name", resolved_server_name)
+                local_tool_name = name
+            else:
+                resolved_server_name, local_tool_name = await self._parse_capability_name(
+                    name, "tool"
+                )
+                span.set_attribute("parsed_server_name", resolved_server_name)
+                span.set_attribute("parsed_tool_name", local_tool_name)

Follow-up replacements below update uses of server_name to resolved_server_name.


1113-1144: Use resolved_server_name consistently in logs and gen_client()

Continuation of the shadowing fix.

-                    f"Creating temporary connection to server: {server_name}",
+                    f"Creating temporary connection to server: {resolved_server_name}",
...
-                    {"server_name": server_name, GEN_AI_AGENT_NAME: self.agent_name},
+                    {"server_name": resolved_server_name, GEN_AI_AGENT_NAME: self.agent_name},
...
-                    server_name, server_registry=self.context.server_registry
+                    resolved_server_name, server_registry=self.context.server_registry
...
-                        f"Closing temporary connection to server: {server_name}",
+                        f"Closing temporary connection to server: {resolved_server_name}",
...
-                            "server_name": server_name,
+                            "server_name": resolved_server_name,
...
-                        {
-                            "server_name": server_name,
+                        {
+                            "server_name": resolved_server_name,
                             GEN_AI_AGENT_NAME: self.agent_name,
                         },

Also update the error message in try_call_tool to refer to resolved_server_name.


1443-1446: Bug: capabilities may be dict; attribute access will raise

get_capabilities can return dicts in tests; accessing capabilities.tools causes AttributeError. Handle both dict and object.

-        capabilities = await self.get_capabilities(server_name)
-        if not capabilities or not capabilities.tools:
+        capabilities = await self.get_capabilities(server_name)
+        tools_supported = False
+        if isinstance(capabilities, dict):
+            tools_supported = bool(capabilities.get("tools"))
+        else:
+            tools_supported = bool(getattr(capabilities, "tools", None))
+        if not capabilities or not tools_supported:
             logger.debug(f"Server '{server_name}' does not support tools")
             return []

1475-1477: Bug: capabilities may be dict; attribute access will raise

Mirror the fix from tools to prompts.

-        capabilities = await self.get_capabilities(server_name)
-        if not capabilities or not capabilities.prompts:
+        capabilities = await self.get_capabilities(server_name)
+        prompts_supported = False
+        if isinstance(capabilities, dict):
+            prompts_supported = bool(capabilities.get("prompts"))
+        else:
+            prompts_supported = bool(getattr(capabilities, "prompts", None))
+        if not capabilities or not prompts_supported:
             logger.debug(f"Server '{server_name}' does not support prompts")
             return []

1585-1587: Bug: returning content instead of CallToolResult

The handler is typed to return CallToolResult but returns result.content (list of Content). Return the full result.

-            result = await self.aggregator.call_tool(name=name, arguments=arguments)
-            return result.content
+            result = await self.aggregator.call_tool(name=name, arguments=arguments)
+            return result
🧹 Nitpick comments (6)
src/mcp_agent/mcp/mcp_aggregator.py (6)

676-686: Name refresh tasks for easier debugging

Helps observability in task dumps and error logs.

-        self._server_refresh_tasks[server_name] = asyncio.create_task(
-            self._refresh_server(server_name)
-        )
+        self._server_refresh_tasks[server_name] = asyncio.create_task(
+            self._refresh_server(server_name),
+            name=f"mcp-refresh:{server_name}",
+        )

1506-1510: Resource capability check: consider dict parity (optional)

This already uses getattr(..., None) and safely returns early. For parity with tools/prompts, you may also accept dicts: bool(capabilities.get("resources")).

-        if not capabilities or not getattr(capabilities, "resources", None):
+        if not capabilities or (
+            isinstance(capabilities, dict)
+            and not bool(capabilities.get("resources"))
+        ) or (not isinstance(capabilities, dict) and not getattr(capabilities, "resources", None)):
             logger.debug(f"Server '{server_name}' does not support resources")
             return []

730-731: Typo in span name

Minor: "get_capabilitites" -> "get_capabilities".

-            f"{self.__class__.__name__}.get_capabilitites"
+            f"{self.__class__.__name__}.get_capabilities"

841-852: Guard per-server reads with the same lock used for writes (optional)

These code paths read _server_to_tool_map without _tool_map_lock; concurrent refresh can cause inconsistent reads.

-            if server_name:
+            if server_name:
                 span.set_attribute("server_name", server_name)
-                result = ListToolsResult(
-                    tools=[
-                        namespaced_tool.tool.model_copy(
-                            update={"name": namespaced_tool.namespaced_tool_name}
-                        )
-                        for namespaced_tool in self._server_to_tool_map.get(
-                            server_name, []
-                        )
-                    ]
-                )
+                async with self._tool_map_lock:
+                    result = ListToolsResult(
+                        tools=[
+                            namespaced_tool.tool.model_copy(
+                                update={"name": namespaced_tool.namespaced_tool_name}
+                            )
+                            for namespaced_tool in self._server_to_tool_map.get(
+                                server_name, []
+                            )
+                        ]
+                    )

Consider the same adjustment for prompts and resources server-scoped branches.


1161-1170: Apply locking symmetry for prompts (optional)

Same rationale as tools: wrap in _prompt_map_lock.

-                res = ListPromptsResult(
-                    prompts=[
-                        namespaced_prompt.prompt.model_copy(
-                            update={"name": namespaced_prompt.namespaced_prompt_name}
-                        )
-                        for namespaced_prompt in self._server_to_prompt_map.get(
-                            server_name, []
-                        )
-                    ]
-                )
+                async with self._prompt_map_lock:
+                    res = ListPromptsResult(
+                        prompts=[
+                            namespaced_prompt.prompt.model_copy(
+                                update={"name": namespaced_prompt.namespaced_prompt_name}
+                            )
+                            for namespaced_prompt in self._server_to_prompt_map.get(
+                                server_name, []
+                            )
+                        ]
+                    )

886-899: Apply locking symmetry for resources (optional)

Wrap in _resource_map_lock for consistency.

-                result = ListResourcesResult(
-                    resources=[
-                        namespaced_resource.resource.model_copy(
-                            update={
-                                "name": namespaced_resource.namespaced_resource_name
-                            }
-                        )
-                        for namespaced_resource in self._server_to_resource_map.get(
-                            server_name, []
-                        )
-                    ]
-                )
+                async with self._resource_map_lock:
+                    result = ListResourcesResult(
+                        resources=[
+                            namespaced_resource.resource.model_copy(
+                                update={
+                                    "name": namespaced_resource.namespaced_resource_name
+                                }
+                            )
+                            for namespaced_resource in self._server_to_resource_map.get(
+                                server_name, []
+                            )
+                        ]
+                    )
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f72b827 and dd5e01e.

📒 Files selected for processing (1)
  • src/mcp_agent/mcp/mcp_aggregator.py (14 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
src/mcp_agent/mcp/mcp_aggregator.py (3)
tests/mcp/test_mcp_aggregator.py (4)
  • get_server (248-249)
  • get_server (594-595)
  • get_server (604-605)
  • server_capabilities (590-591)
src/mcp_agent/mcp/mcp_connection_manager.py (1)
  • get_server (519-563)
src/mcp_agent/mcp/mcp_agent_client_session.py (1)
  • MCPAgentClientSession (73-428)
🔇 Additional comments (17)
src/mcp_agent/mcp/mcp_aggregator.py (17)

4-5: Import of anyio.lowlevel is appropriate

Used for cooperative yielding in the downstream handler path. No concerns.


24-27: Notification type imports look good

Covers tools, prompts, and resources list_changed notifications.


157-166: Per‑server notification/refresh tracking: LGTM

State maps for handlers, refresh tasks, pending flags, and advertised support are well‑scoped.


539-554: Centralized persistent connection retrieval + handler setup: LGTM

Good consolidation and handler installation point.


560-561: Refactor to use _get_persistent_server_connection: LGTM

Consistent with the new orchestration.


620-675: Notification method names: confirm list_changed vs listChanged

You match both typed notifications and string methods "notifications/.../list_changed". Some servers may use camelCase ("listChanged"). Ensure tests cover both; otherwise add an alternate string check.

Would you like a follow-up patch to recognize "notifications/*/listChanged" strings?


687-704: Refresh worker with coalescing: LGTM

Good coalescing via _server_refresh_pending.


705-725: Capability support tracking: LGTM

Dict-or-object handling and listChanged detection are fine.


737-754: Span annotation for capabilities supports dict: LGTM


760-766: Persistent capabilities retrieval path: LGTM


790-791: Capability support update on temp connection: LGTM


977-980: Persistent read_resource path: LGTM


1292-1294: Persistent get_prompt path: LGTM


1423-1424: Persistent start_server path reuse: LGTM


1540-1547: Centralized session reuse for fetch paths: LGTM

Good propagation of the persistent session through the fetch helpers.


581-619: Hooking private _message_handler: preserve originals and handle session swaps

This repeats a prior concern: store originals and restore on close; also restore the previous session’s handler if the session instance changes for a server.

Apply these diffs:

  1. Track originals in init:
         self._server_refresh_pending: Dict[str, bool] = {}
         self._capability_list_changed_supported: Dict[str, Dict[str, bool]] = {
             "tools": {},
             "prompts": {},
             "resources": {},
         }
+        # server_name -> original message handler callable
+        self._original_message_handlers: Dict[str, object] = {}
  1. Handle session replacement and store original before overwriting:
     def _ensure_notification_handler(
         self, server_name: str, session: ClientSession
     ) -> None:
         if session is None:
             return

-        existing = self._notification_handler_sessions.get(server_name)
-        if existing is session:
+        existing = self._notification_handler_sessions.get(server_name)
+        if existing is session:
             return
+        # If a different session was previously hooked, restore its original handler
+        if existing is not None:
+            original_prev = self._original_message_handlers.get(server_name)
+            if original_prev is not None:
+                try:
+                    setattr(existing, "_message_handler", original_prev)
+                except Exception:
+                    logger.debug(
+                        f"Failed to restore original handler for previous session of '{server_name}'",
+                        exc_info=True,
+                    )

-        original_handler = getattr(session, "_message_handler", None)
+        original_handler = getattr(session, "_message_handler", None)
+        # Store original for this server/session
+        self._original_message_handlers[server_name] = original_handler

This complements the cleanup fix in close().


292-301: Restore original session message handlers during cleanup

Handlers are replaced in _ensure_notification_handler, but never restored, which can leak our interceptor across aggregators and interfere with other users of the same session/manager.

Apply this diff to restore originals on close:

                 # Always mark as uninitialized regardless of errors
                 self.initialized = False

-                if self._server_refresh_tasks:
+                if self._server_refresh_tasks:
                     tasks = list(self._server_refresh_tasks.values())
                     for task in tasks:
                         task.cancel()
                     await asyncio.gather(*tasks, return_exceptions=True)
                     self._server_refresh_tasks.clear()

-                self._server_refresh_pending.clear()
-                self._notification_handler_sessions.clear()
+                self._server_refresh_pending.clear()
+                # Restore original message handlers before clearing
+                try:
+                    for server_name, session in list(self._notification_handler_sessions.items()):
+                        if not session:
+                            continue
+                        original = getattr(self, "_original_message_handlers", {}).get(server_name)
+                        if original is not None:
+                            try:
+                                setattr(session, "_message_handler", original)
+                            except Exception:
+                                logger.debug(f"Failed to restore original handler for '{server_name}'", exc_info=True)
+                finally:
+                    self._notification_handler_sessions.clear()
+                    if hasattr(self, "_original_message_handlers"):
+                        self._original_message_handlers.clear()

Additionally, add storage for originals (see next comment).

Comment on lines +1105 to 1107
server_connection = await self._get_persistent_server_connection(
server_name
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Use resolved_server_name when fetching persistent connection

Continuation of the shadowing fix.

-                server_connection = await self._get_persistent_server_connection(
-                    server_name
-                )
+                server_connection = await self._get_persistent_server_connection(
+                    resolved_server_name
+                )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
server_connection = await self._get_persistent_server_connection(
server_name
)
server_connection = await self._get_persistent_server_connection(
resolved_server_name
)
🤖 Prompt for AI Agents
In src/mcp_agent/mcp/mcp_aggregator.py around lines 1105-1107, the call to
self._get_persistent_server_connection(server_name) is still using the shadowed
server_name; replace that argument with the resolved_server_name variable (i.e.,
call self._get_persistent_server_connection(resolved_server_name)), and ensure
resolved_server_name is in scope at that point so the persistent connection uses
the resolved name.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant