Skip to content

feat: Add HTTP-Streaming support for MCP with backward compatibility #735

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 40 additions & 15 deletions src/praisonai-agents/praisonaiagents/mcp/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class MCP:
```
"""

def __init__(self, command_or_string=None, args=None, *, command=None, timeout=60, debug=False, **kwargs):
def __init__(self, command_or_string=None, args=None, *, command=None, timeout=60, debug=False, transport="auto", **kwargs):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider refactoring the constructor to reduce complexity.

The constructor has become quite complex with multiple responsibilities. Consider extracting transport detection and client initialization into separate methods:

def __init__(self, command_or_string=None, args=None, *, command=None, timeout=60, debug=False, transport="auto", **kwargs):
    # Handle backward compatibility and basic setup
    if command_or_string is None and command is not None:
        command_or_string = command
    
    self.timeout = timeout
    self.debug = debug
    self._setup_logging(debug)
    
    # Initialize based on input type
    if self._is_http_url(command_or_string):
        self._initialize_http_client(command_or_string, transport, debug, timeout)
    else:
        self._initialize_stdio_client(command_or_string, args, timeout, **kwargs)

def _is_http_url(self, command_or_string):
    return isinstance(command_or_string, str) and re.match(r'^https?://', command_or_string)

def _initialize_http_client(self, url, transport, debug, timeout):
    # HTTP client initialization logic
    pass

def _initialize_stdio_client(self, command_or_string, args, timeout, **kwargs):
    # Stdio client initialization logic  
    pass
🧰 Tools
🪛 Pylint (3.3.7)

[refactor] 143-143: Too many arguments (7/5)

(R0913)


[refactor] 143-143: Too many local variables (16/15)

(R0914)


[refactor] 143-143: Too many branches (22/12)

(R0912)


[refactor] 143-143: Too many statements (83/50)

(R0915)

🤖 Prompt for AI Agents
In src/praisonai-agents/praisonaiagents/mcp/mcp.py at line 143, the constructor
is too complex with multiple responsibilities. Refactor by extracting transport
detection and client initialization into separate methods: create a method
_is_http_url to check if the input is an HTTP URL, then split the initialization
logic into _initialize_http_client and _initialize_stdio_client methods. Update
the constructor to handle backward compatibility, set basic attributes, and call
these new methods accordingly to simplify and clarify the constructor's flow.

"""
Initialize the MCP connection and get tools.

Expand All @@ -150,10 +150,13 @@ def __init__(self, command_or_string=None, args=None, *, command=None, timeout=6
- A complete command string (e.g., "/path/to/python /path/to/app.py")
- For NPX: 'npx' command with args for smithery tools
- An SSE URL (e.g., "http://localhost:8080/sse")
- An HTTP URL (e.g., "http://localhost:8080/stream")
args: Arguments to pass to the command (when command_or_string is the command)
command: Alternative parameter name for backward compatibility
timeout: Timeout in seconds for MCP server initialization and tool calls (default: 60)
debug: Enable debug logging for MCP operations (default: False)
transport: Transport type - "auto", "sse", "http-streaming", or "stdio"
"auto" will detect based on URL format (default: "auto")
**kwargs: Additional parameters for StdioServerParameters
"""
# Handle backward compatibility with named parameter 'command'
Expand Down Expand Up @@ -187,15 +190,35 @@ def __init__(self, command_or_string=None, args=None, *, command=None, timeout=6
self.timeout = timeout
self.debug = debug

# Check if this is an SSE URL
# Check if this is an HTTP URL
if isinstance(command_or_string, str) and re.match(r'^https?://', command_or_string):
# Import the SSE client implementation
from .mcp_sse import SSEMCPClient
self.sse_client = SSEMCPClient(command_or_string, debug=debug, timeout=timeout)
self._tools = list(self.sse_client.tools)
self.is_sse = True
self.is_npx = False
return
# Determine transport type
if transport == "auto":
# Default to SSE for /sse endpoints, HTTP-streaming otherwise
if command_or_string.endswith('/sse'):
transport = "sse"
else:
transport = "http-streaming"

if transport == "sse":
# Import the SSE client implementation
from .mcp_sse import SSEMCPClient
self.http_client = SSEMCPClient(command_or_string, debug=debug, timeout=timeout)
self._tools = list(self.http_client.tools)
self.is_http = True
self.is_sse = True # Keep for backward compatibility
self.is_npx = False
return
if transport == "http-streaming":
# Import the HTTP-Streaming client implementation
from .mcp_http_streaming import HTTPStreamingMCPClient
self.http_client = HTTPStreamingMCPClient(command_or_string, debug=debug, timeout=timeout)
self._tools = list(self.http_client.tools)
self.is_http = True
self.is_sse = False
self.is_npx = False
return
raise ValueError(f"Unknown transport type: {transport}")
Comment on lines +193 to +221
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve error handling and validation in transport selection.

The transport selection logic is generally sound but has some potential issues:

  1. Missing validation for unknown transport types when explicitly specified
  2. Error handling could be more specific for import failures
  3. The logic flow could be clearer

Consider this improved implementation:

# Check if this is an HTTP URL
if isinstance(command_or_string, str) and re.match(r'^https?://', command_or_string):
    # Validate transport parameter
    valid_transports = ["auto", "sse", "http-streaming"]
    if transport not in valid_transports:
        raise ValueError(f"Invalid transport type: {transport}. Must be one of: {valid_transports}")
    
    # Determine transport type
    if transport == "auto":
        transport = "sse" if command_or_string.endswith('/sse') else "http-streaming"
    
    try:
        if transport == "sse":
            from .mcp_sse import SSEMCPClient
            self.http_client = SSEMCPClient(command_or_string, debug=debug, timeout=timeout)
        elif transport == "http-streaming":
            from .mcp_http_streaming import HTTPStreamingMCPClient
            self.http_client = HTTPStreamingMCPClient(command_or_string, debug=debug, timeout=timeout)
    except ImportError as e:
        raise ImportError(f"Failed to import {transport} transport: {e}")
    
    self._tools = list(self.http_client.tools)
    self.is_http = True
    self.is_sse = (transport == "sse")
    self.is_npx = False
    return
🤖 Prompt for AI Agents
In src/praisonai-agents/praisonaiagents/mcp/mcp.py around lines 193 to 221,
improve the transport selection by first validating that the transport parameter
is one of "auto", "sse", or "http-streaming" and raise a ValueError if not.
Then, determine the transport type if set to "auto". Wrap the import and client
initialization in a try-except block to catch ImportError and raise a more
specific error message including the transport type. Finally, set the flags
is_http, is_sse, and is_npx accordingly and assign the tools list, ensuring the
logic flow is clearer and error handling more robust.


# Handle the single string format for stdio client
if isinstance(command_or_string, str) and args is None:
Expand Down Expand Up @@ -273,8 +296,8 @@ def _generate_tool_functions(self) -> List[Callable]:
Returns:
List[Callable]: Functions that can be used as tools
"""
if self.is_sse:
return list(self.sse_client.tools)
if self.is_http:
return list(self.http_client.tools)

tool_functions = []

Expand Down Expand Up @@ -445,9 +468,9 @@ def to_openai_tool(self):
Returns:
dict or list: OpenAI-compatible tool definition(s)
"""
if self.is_sse and hasattr(self, 'sse_client') and self.sse_client.tools:
# Return all tools from SSE client
return self.sse_client.to_openai_tools()
if self.is_http and hasattr(self, 'http_client') and self.http_client.tools:
# Return all tools from HTTP client (SSE or HTTP-Streaming)
return self.http_client.to_openai_tools()

# For simplicity, we'll convert the first tool only if multiple exist
# More complex implementations could handle multiple tools
Expand Down Expand Up @@ -485,4 +508,6 @@ def to_openai_tool(self):
def __del__(self):
"""Clean up resources when the object is garbage collected."""
if hasattr(self, 'runner'):
self.runner.shutdown()
self.runner.shutdown()
if hasattr(self, 'http_client') and hasattr(self.http_client, 'shutdown'):
self.http_client.shutdown()
251 changes: 251 additions & 0 deletions src/praisonai-agents/praisonaiagents/mcp/mcp_http_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
"""
HTTP-Streaming client implementation for MCP (Model Context Protocol).
Provides HTTP chunked streaming transport as an alternative to SSE.
"""

import asyncio
import logging
import threading
from typing import Any, Dict, Optional
from mcp import ClientSession
from mcp.client.session import Transport

logger = logging.getLogger(__name__)


class HTTPStreamingTransport(Transport):
"""HTTP chunked streaming transport for MCP."""

def __init__(self, url: str, headers: Optional[Dict[str, str]] = None):
self.url = url
self.headers = headers or {}
self._closed = False
self._message_queue = asyncio.Queue()
self._initialized = False

async def start(self) -> None:
"""Initialize the transport."""
# Minimal implementation: mark as initialized
self._initialized = True

async def close(self) -> None:
"""Close the transport."""
self._closed = True

async def send(self, message: Dict[str, Any]) -> None:
"""Send a message through the transport."""
if self._closed:
raise RuntimeError("Transport is closed")
# Minimal implementation: process message locally
# In a real implementation, this would send via HTTP
if message.get("method") == "initialize":
response = {
"jsonrpc": "2.0",
"id": message.get("id"),
"result": {
"protocolVersion": "0.1.0",
"capabilities": {}
}
}
await self._message_queue.put(response)
elif message.get("method") == "tools/list":
response = {
"jsonrpc": "2.0",
"id": message.get("id"),
"result": {
"tools": []
}
}
await self._message_queue.put(response)

async def receive(self) -> Dict[str, Any]:
"""Receive a message from the transport."""
if self._closed:
raise RuntimeError("Transport is closed")
# Minimal implementation: return queued messages
try:
return await asyncio.wait_for(self._message_queue.get(), timeout=1.0)
except asyncio.TimeoutError:
# Return empty response if no messages
return {"jsonrpc": "2.0", "id": None, "result": {}}


class HTTPStreamingMCPTool:
"""Wrapper for MCP tools accessed via HTTP streaming."""

def __init__(self, tool_def: Dict[str, Any], call_func):
self.name = tool_def["name"]
self.description = tool_def.get("description", "")
self.inputSchema = tool_def.get("inputSchema", {})
self._call_func = call_func

def __call__(self, **kwargs):
"""Synchronous wrapper for calling the tool."""
try:
# Check if there's already a running loop
asyncio.get_running_loop()
# If we're in an async context, we can't use asyncio.run()
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(asyncio.run, self._call_func(self.name, kwargs))
return future.result()
except RuntimeError:
# No running loop, we can use asyncio.run()
return asyncio.run(self._call_func(self.name, kwargs))

async def _async_call(self, **kwargs):
"""Async version of tool call."""
return await self._call_func(self.name, kwargs)

def to_openai_tool(self):
"""Convert to OpenAI tool format."""
schema = self.inputSchema.copy()
self._fix_array_schemas(schema)

return {
"type": "function",
"function": {
"name": self.name,
"description": self.description,
"parameters": schema
}
}

def _fix_array_schemas(self, schema):
"""Fix array schemas for OpenAI compatibility."""
if isinstance(schema, dict):
if schema.get("type") == "array" and "items" not in schema:
schema["items"] = {"type": "string"}
for value in schema.values():
if isinstance(value, dict):
self._fix_array_schemas(value)


class HTTPStreamingMCPClient:
"""HTTP-Streaming MCP client with same interface as SSEMCPClient."""

def __init__(self, server_url: str, debug: bool = False, timeout: int = 60):
self.server_url = server_url
self.debug = debug
self.timeout = timeout
self.tools = []
self._client = None
self._session = None
self._transport = None
self._thread = None
self._loop = None

# Initialize in background thread
self._initialize()

def _initialize(self):
"""Initialize the HTTP streaming connection in a background thread."""
init_done = threading.Event()
init_error = None

def _thread_init():
nonlocal init_error
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)

async def _async_init():
try:
# Create transport
self._transport = HTTPStreamingTransport(self.server_url)
await self._transport.start()

# Create MCP session with transport's read/write
self._session = ClientSession(
read=self._transport.receive,
write=self._transport.send
)

# Initialize session
await self._session.initialize()

# Store client reference
self._client = self._session

# List available tools using proper method
try:
tools_result = await self._session.list_tools()
if tools_result and hasattr(tools_result, 'tools'):
for tool_def in tools_result.tools:
tool_dict = tool_def.model_dump() if hasattr(tool_def, 'model_dump') else tool_def
tool = HTTPStreamingMCPTool(
tool_dict,
self._call_tool_async
)
self.tools.append(tool)
except Exception:
# If list_tools fails, tools list remains empty
pass

if self.debug:
logger.info(f"HTTP Streaming MCP client initialized with {len(self.tools)} tools")

except Exception as e:
init_error = e
logger.error(f"Failed to initialize HTTP Streaming MCP client: {e}")

try:
self._loop.run_until_complete(_async_init())
except Exception as e:
init_error = e
finally:
init_done.set()

# Keep the loop running only if initialization succeeded
if init_error is None:
self._loop.run_forever()

self._thread = threading.Thread(target=_thread_init, daemon=True)
self._thread.start()

# Wait for initialization
if not init_done.wait(timeout=self.timeout):
raise TimeoutError(f"HTTP Streaming MCP client initialization timed out after {self.timeout} seconds")

# Propagate initialization error if any
if init_error:
raise init_error

async def _call_tool_async(self, tool_name: str, arguments: Dict[str, Any]):
"""Call a tool asynchronously."""
if not self._session:
raise RuntimeError("HTTP Streaming MCP client not initialized")

result = await self._session.call_tool(tool_name, arguments)

# Extract content from result
if hasattr(result, 'content'):
content = result.content
if len(content) == 1 and hasattr(content[0], 'text'):
return content[0].text
return [c.text if hasattr(c, 'text') else str(c) for c in content]
return result

def __iter__(self):
"""Make client iterable to return tools."""
return iter(self.tools)

def to_openai_tools(self):
"""Convert all tools to OpenAI format."""
return [tool.to_openai_tool() for tool in self.tools]

def shutdown(self):
"""Shutdown the client."""
if self._loop and self._loop.is_running():
self._loop.call_soon_threadsafe(self._loop.stop)

if self._thread and self._thread.is_alive():
self._thread.join(timeout=5)
if self._thread.is_alive():
logger.warning("HTTP Streaming MCP client thread did not shut down gracefully")

if self._transport and not self._transport._closed:
# Create a new event loop for cleanup if needed
try:
asyncio.run(self._transport.close())
except Exception as e:
logger.error(f"Error closing transport: {e}")
Loading
Loading