From 5b097dbe81921883b56dd363329b4aeeea6bc0ae Mon Sep 17 00:00:00 2001 From: eliasfauser <151051166+eliasfauser@users.noreply.github.com> Date: Tue, 4 Nov 2025 13:13:50 +0000 Subject: [PATCH 1/3] feat: list and read resources as tools --- langchain_mcp_adapters/resources.py | 179 +++++++++++++++++++++++++++- tests/test_resources.py | 170 ++++++++++++++++++++++++++ 2 files changed, 348 insertions(+), 1 deletion(-) diff --git a/langchain_mcp_adapters/resources.py b/langchain_mcp_adapters/resources.py index 7450a904..cdfa51c5 100644 --- a/langchain_mcp_adapters/resources.py +++ b/langchain_mcp_adapters/resources.py @@ -5,11 +5,15 @@ """ # noqa: E501 import base64 +from typing import Any from langchain_core.documents.base import Blob +from langchain_core.tools import BaseTool, StructuredTool from mcp import ClientSession from mcp.types import BlobResourceContents, ResourceContents, TextResourceContents - +from pydantic import BaseModel, Field +from langchain_mcp_adapters.sessions import Connection +from langchain_core.callbacks import Callbacks def convert_mcp_resource_to_langchain_blob( resource_uri: str, contents: ResourceContents @@ -101,3 +105,176 @@ async def load_mcp_resources( raise RuntimeError(msg) from e return blobs + + +# Pydantic schemas for tool arguments +class ListResourcesInput(BaseModel): + """Input schema for list_resources tool.""" + + cursor: str | None = Field( + default=None, + description="Pagination cursor returned from a previous list_resources call. " + "If provided, returns the next page of results.", + ) + + +class ReadResourceInput(BaseModel): + """Input schema for read_resource tool.""" + + uri: str = Field( + description="The URI of the resource to read. " + "Use list_resources to discover available resource URIs." + ) + + +async def load_mcp_resources_as_tools( + session: ClientSession | None, + *, + connection: Connection | None = None, + callbacks: Callbacks | None = None, + server_name: str | None = None, +) -> list[BaseTool]: + """Load MCP resources as LangChain tools for listing and reading resources. + + This function creates two tools that an agent can use to interact with MCP resources: + - `list_resources`: Lists available resources with pagination support + - `read_resource`: Reads a specific resource by URI and returns its contents + + Args: + session: The MCP client session. If `None`, connection must be provided. + connection: Connection config to create a new session if session is `None`. + callbacks: Optional `Callbacks` for handling notifications and events. + server_name: Name of the server these resources belong to. + + Returns: + A list of two LangChain tools: list_resources and read_resource. + + Raises: + ValueError: If neither session nor connection is provided. + + Example: + ```python + from langchain_mcp_adapters.resources import load_mcp_resources_as_tools + from langchain_mcp_adapters.sessions import create_session + + connection = { + "command": "uvx", + "args": ["mcp-server-fetch"], + "transport": "stdio", + } + + async with create_session(connection) as session: + await session.initialize() + tools = await load_mcp_resources_as_tools(session) + # tools can now be used by an agent + ``` + """ # noqa: E501 + if session is None and connection is None: + msg = "Either a session or a connection config must be provided" + raise ValueError(msg) + + mcp_callbacks = ( + callbacks.to_mcp_format(context=CallbackContext(server_name=server_name)) + if callbacks is not None + else [] + ) + + async def list_resources_fn(cursor: str | None = None) -> dict[str, Any]: + """List available MCP resources with pagination support. + + Args: + cursor: Optional pagination cursor from a previous call. + + Returns: + A dictionary containing: + - resources: List of resource dictionaries with uri, name, description, mimeType + - nextCursor: Pagination cursor for the next page (if available) + """ + if session is None: + if connection is None: + msg = "Either session or connection must be provided" + raise ValueError(msg) + async with create_session( + connection, mcp_callbacks=mcp_callbacks + ) as resource_session: + await resource_session.initialize() + result = await resource_session.list_resources(cursor=cursor) + else: + result = await session.list_resources(cursor=cursor) + + resources = [ + { + "uri": str(r.uri), + "name": r.name, + "description": r.description, + "mimeType": r.mimeType, + } + for r in (result.resources or []) + ] + + return { + "resources": resources, + "nextCursor": result.nextCursor, + } + + async def read_resource_fn(uri: str) -> dict[str, Any]: + """Read a specific MCP resource by URI. + + Args: + uri: The URI of the resource to read. + + Returns: + A dictionary containing: + - uri: The resource URI + - contents: List of content dictionaries with type, data, and mimeType + """ + blobs = await get_mcp_resource( + session, + uri + ) + + contents = [] + for blob in blobs: + content_dict = { + "mimeType": blob.mimetype, + } + # Return text as string, binary as base64 + if isinstance(blob.data, str): + content_dict["type"] = "text" + content_dict["data"] = blob.data + else: + content_dict["type"] = "blob" + content_dict["data"] = base64.b64encode(blob.data).decode() + + contents.append(content_dict) + + return { + "uri": uri, + "contents": contents, + } + + list_tool = StructuredTool( + name="list_resources", + description=( + "List available MCP resources. Resources are data sources that can be read. " + "Returns a list of resources with their URIs, names, descriptions, and MIME types. " + "Supports pagination via the cursor parameter. " + "Use this to discover what resources are available before reading them." + ), + args_schema=ListResourcesInput, + coroutine=list_resources_fn, + ) + + read_tool = StructuredTool( + name="read_resource", + description=( + "Read the contents of a specific MCP resource by its URI. " + "Returns the resource contents which may include text, binary data, or both. " + "Use list_resources first to discover available resource URIs." + ), + args_schema=ReadResourceInput, + coroutine=read_resource_fn, + ) + + return [list_tool, read_tool] + diff --git a/tests/test_resources.py b/tests/test_resources.py index 6c87d295..2aaf49ce 100644 --- a/tests/test_resources.py +++ b/tests/test_resources.py @@ -16,6 +16,7 @@ convert_mcp_resource_to_langchain_blob, get_mcp_resource, load_mcp_resources, + load_mcp_resources_as_tools, ) @@ -273,3 +274,172 @@ async def test_load_mcp_resources_with_blob_content(): assert isinstance(blobs[0], Blob) assert blobs[0].data == original_data assert blobs[0].mimetype == "application/octet-stream" + + +async def test_load_mcp_resources_as_tools(): + """Test that load_mcp_resources_as_tools returns two tools.""" + session = AsyncMock() + + tools = await load_mcp_resources_as_tools(session) + + assert len(tools) == 2 + assert tools[0].name == "list_resources" + assert tools[1].name == "read_resource" + + +async def test_list_resources_tool(): + """Test the list_resources tool functionality.""" + session = AsyncMock() + + session.list_resources = AsyncMock( + return_value=ListResourcesResult( + resources=[ + Resource( + uri="file:///test1.txt", + name="test1.txt", + description="First test file", + mimeType="text/plain", + ), + Resource( + uri="file:///test2.txt", + name="test2.txt", + description="Second test file", + mimeType="text/plain", + ), + ], + nextCursor="cursor123", + ), + ) + + tools = await load_mcp_resources_as_tools(session) + list_tool = tools[0] + + result = await list_tool.ainvoke({}) + + assert "resources" in result + assert "nextCursor" in result + assert len(result["resources"]) == 2 + assert result["resources"][0]["uri"] == "file:///test1.txt" + assert result["resources"][0]["name"] == "test1.txt" + assert result["resources"][0]["description"] == "First test file" + assert result["resources"][0]["mimeType"] == "text/plain" + assert result["nextCursor"] == "cursor123" + + +async def test_list_resources_tool_with_cursor(): + """Test the list_resources tool with pagination cursor.""" + session = AsyncMock() + + session.list_resources = AsyncMock( + return_value=ListResourcesResult( + resources=[ + Resource( + uri="file:///test3.txt", + name="test3.txt", + mimeType="text/plain", + ), + ], + nextCursor=None, + ), + ) + + tools = await load_mcp_resources_as_tools(session) + list_tool = tools[0] + + result = await list_tool.ainvoke({"cursor": "cursor123"}) + + assert len(result["resources"]) == 1 + assert result["resources"][0]["uri"] == "file:///test3.txt" + assert result["nextCursor"] is None + session.list_resources.assert_called_once_with(cursor="cursor123") + + +async def test_read_resource_tool(): + """Test the read_resource tool functionality.""" + session = AsyncMock() + uri = "file:///test.txt" + + session.read_resource = AsyncMock( + return_value=ReadResourceResult( + contents=[ + TextResourceContents(uri=uri, mimeType="text/plain", text="Test content"), + ], + ), + ) + + tools = await load_mcp_resources_as_tools(session) + read_tool = tools[1] + + result = await read_tool.ainvoke({"uri": uri}) + + assert "uri" in result + assert "contents" in result + assert result["uri"] == uri + assert len(result["contents"]) == 1 + assert result["contents"][0]["type"] == "text" + assert result["contents"][0]["data"] == "Test content" + assert result["contents"][0]["mimeType"] == "text/plain" + + +async def test_read_resource_tool_with_blob(): + """Test the read_resource tool with binary content.""" + session = AsyncMock() + uri = "file:///test.bin" + original_data = b"binary data" + base64_blob = base64.b64encode(original_data).decode() + + session.read_resource = AsyncMock( + return_value=ReadResourceResult( + contents=[ + BlobResourceContents( + uri=uri, + mimeType="application/octet-stream", + blob=base64_blob, + ), + ], + ), + ) + + tools = await load_mcp_resources_as_tools(session) + read_tool = tools[1] + + result = await read_tool.ainvoke({"uri": uri}) + + assert result["uri"] == uri + assert len(result["contents"]) == 1 + assert result["contents"][0]["type"] == "blob" + assert result["contents"][0]["data"] == base64_blob + assert result["contents"][0]["mimeType"] == "application/octet-stream" + + +async def test_read_resource_tool_with_mixed_content(): + """Test the read_resource tool with both text and binary content.""" + session = AsyncMock() + uri = "file:///mixed" + original_data = b"binary data" + base64_blob = base64.b64encode(original_data).decode() + + session.read_resource = AsyncMock( + return_value=ReadResourceResult( + contents=[ + TextResourceContents(uri=uri, mimeType="text/plain", text="Text content"), + BlobResourceContents( + uri=uri, + mimeType="application/octet-stream", + blob=base64_blob, + ), + ], + ), + ) + + tools = await load_mcp_resources_as_tools(session) + read_tool = tools[1] + + result = await read_tool.ainvoke({"uri": uri}) + + assert len(result["contents"]) == 2 + assert result["contents"][0]["type"] == "text" + assert result["contents"][0]["data"] == "Text content" + assert result["contents"][1]["type"] == "blob" + assert result["contents"][1]["data"] == base64_blob + From 347a4ad211be9a05ee619ea44068bb7343899d8d Mon Sep 17 00:00:00 2001 From: eliasfauser <151051166+eliasfauser@users.noreply.github.com> Date: Tue, 4 Nov 2025 13:41:22 +0000 Subject: [PATCH 2/3] fix: add missing import --- langchain_mcp_adapters/resources.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/langchain_mcp_adapters/resources.py b/langchain_mcp_adapters/resources.py index cdfa51c5..a7687157 100644 --- a/langchain_mcp_adapters/resources.py +++ b/langchain_mcp_adapters/resources.py @@ -12,7 +12,7 @@ from mcp import ClientSession from mcp.types import BlobResourceContents, ResourceContents, TextResourceContents from pydantic import BaseModel, Field -from langchain_mcp_adapters.sessions import Connection +from langchain_mcp_adapters.sessions import Connection, create_session from langchain_core.callbacks import Callbacks def convert_mcp_resource_to_langchain_blob( @@ -256,7 +256,7 @@ async def read_resource_fn(uri: str) -> dict[str, Any]: list_tool = StructuredTool( name="list_resources", description=( - "List available MCP resources. Resources are data sources that can be read. " + "List available resources. Resources are data sources that can be read. " "Returns a list of resources with their URIs, names, descriptions, and MIME types. " "Supports pagination via the cursor parameter. " "Use this to discover what resources are available before reading them." @@ -268,7 +268,7 @@ async def read_resource_fn(uri: str) -> dict[str, Any]: read_tool = StructuredTool( name="read_resource", description=( - "Read the contents of a specific MCP resource by its URI. " + "Read the contents of a specific resource by its URI. " "Returns the resource contents which may include text, binary data, or both. " "Use list_resources first to discover available resource URIs." ), From 22095d1ec28e74b755470518a11b4eb0698e434f Mon Sep 17 00:00:00 2001 From: eliasfauser <151051166+eliasfauser@users.noreply.github.com> Date: Tue, 4 Nov 2025 14:06:35 +0000 Subject: [PATCH 3/3] fix: allow lazy session loading --- langchain_mcp_adapters/resources.py | 62 +++++++++++++++++++---------- 1 file changed, 42 insertions(+), 20 deletions(-) diff --git a/langchain_mcp_adapters/resources.py b/langchain_mcp_adapters/resources.py index a7687157..dfdf8e93 100644 --- a/langchain_mcp_adapters/resources.py +++ b/langchain_mcp_adapters/resources.py @@ -5,15 +5,19 @@ """ # noqa: E501 import base64 -from typing import Any +from collections.abc import Awaitable, Callable +from functools import wraps +from typing import Any, ParamSpec, TypeVar from langchain_core.documents.base import Blob from langchain_core.tools import BaseTool, StructuredTool from mcp import ClientSession from mcp.types import BlobResourceContents, ResourceContents, TextResourceContents from pydantic import BaseModel, Field + +from langchain_mcp_adapters.callbacks import CallbackContext, Callbacks, _MCPCallbacks from langchain_mcp_adapters.sessions import Connection, create_session -from langchain_core.callbacks import Callbacks +from langchain_mcp_adapters.callbacks import Callbacks, _MCPCallbacks def convert_mcp_resource_to_langchain_blob( resource_uri: str, contents: ResourceContents @@ -176,13 +180,39 @@ async def load_mcp_resources_as_tools( mcp_callbacks = ( callbacks.to_mcp_format(context=CallbackContext(server_name=server_name)) if callbacks is not None - else [] + else _MCPCallbacks() ) - async def list_resources_fn(cursor: str | None = None) -> dict[str, Any]: + def with_session_context( + func: Callable[..., Awaitable[dict[str, Any]]] + ) -> Callable[..., Awaitable[dict[str, Any]]]: + """Decorator with access to closure variables.""" + @wraps(func) + async def wrapper(*args: Any, **kwargs: Any) -> dict[str, Any]: + if session is None: + if connection is None: + msg = "Either session or connection must be provided" + raise ValueError(msg) + + async with create_session( + connection, mcp_callbacks=mcp_callbacks + ) as temp_session: + await temp_session.initialize() + return await func(temp_session, *args, **kwargs) + else: + return await func(session, *args, **kwargs) + + return wrapper + + @with_session_context + async def list_resources_fn( + session: ClientSession, + cursor: str | None = None, + ) -> dict[str, Any]: """List available MCP resources with pagination support. Args: + session: MCP client session. cursor: Optional pagination cursor from a previous call. Returns: @@ -190,17 +220,7 @@ async def list_resources_fn(cursor: str | None = None) -> dict[str, Any]: - resources: List of resource dictionaries with uri, name, description, mimeType - nextCursor: Pagination cursor for the next page (if available) """ - if session is None: - if connection is None: - msg = "Either session or connection must be provided" - raise ValueError(msg) - async with create_session( - connection, mcp_callbacks=mcp_callbacks - ) as resource_session: - await resource_session.initialize() - result = await resource_session.list_resources(cursor=cursor) - else: - result = await session.list_resources(cursor=cursor) + result = await session.list_resources(cursor=cursor) resources = [ { @@ -217,10 +237,15 @@ async def list_resources_fn(cursor: str | None = None) -> dict[str, Any]: "nextCursor": result.nextCursor, } - async def read_resource_fn(uri: str) -> dict[str, Any]: + @with_session_context + async def read_resource_fn( + session: ClientSession, + uri: str, + ) -> dict[str, Any]: """Read a specific MCP resource by URI. Args: + session: MCP client session. uri: The URI of the resource to read. Returns: @@ -228,10 +253,7 @@ async def read_resource_fn(uri: str) -> dict[str, Any]: - uri: The resource URI - contents: List of content dictionaries with type, data, and mimeType """ - blobs = await get_mcp_resource( - session, - uri - ) + blobs = await get_mcp_resource(session, uri) contents = [] for blob in blobs: