Skip to content

feat: Add HTTP-Streaming support for MCP with backward compatibility #746

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 251 additions & 0 deletions docs/MCP_HTTP_STREAMING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
# MCP HTTP-Streaming Support

This document describes the HTTP-Streaming transport implementation for MCP (Model Context Protocol) in PraisonAI.

## Overview

HTTP-Streaming provides bidirectional streaming communication over HTTP using chunked transfer encoding. This transport method offers advantages over SSE (Server-Sent Events) including:

- **Bidirectional streaming** - Both client and server can stream data
- **Binary support** - Can transmit binary data, not just text
- **Lower overhead** - More efficient than SSE's text-based protocol
- **Better performance** - Ideal for high-throughput scenarios

## Usage

### Auto-Detection (Default)

The MCP client automatically detects the appropriate transport based on URL patterns:

```python
# SSE transport (URLs ending with /sse)
agent = Agent(
tools=MCP("http://localhost:8080/sse") # Uses SSE
)

# HTTP-Streaming transport (other HTTP URLs)
agent = Agent(
tools=MCP("http://localhost:8080/api") # Uses HTTP-Streaming
)
```

### Explicit Transport Selection

You can explicitly specify the transport type:

```python
# Force SSE transport
agent = Agent(
tools=MCP("http://localhost:8080/api", transport="sse")
)

# Force HTTP-Streaming transport
agent = Agent(
tools=MCP("http://localhost:8080/sse", transport="http-streaming")
)
```

### TypeScript Usage

The TypeScript implementation follows the same pattern:

```typescript
import { MCP } from '@praisonai/agents/tools';

// Auto-detection
const mcpAuto = new MCP("http://localhost:8080/api");
await mcpAuto.initialize();

// Explicit transport
const mcpExplicit = new MCP("http://localhost:8080/api", {
transport: "http-streaming",
debug: true,
headers: {
"Authorization": "Bearer token"
}
});
await mcpExplicit.initialize();
```

## Transport Detection Rules

The following URL patterns automatically use SSE transport:
- `/sse` (exact ending)
- `/sse/` (with trailing slash)
- `/events` (exact ending)
- `/stream` (exact ending)
- `/server-sent-events`
- URLs containing `transport=sse` query parameter

All other HTTP/HTTPS URLs default to HTTP-Streaming transport.

## Implementation Details

### Message Format

HTTP-Streaming uses NDJSON (Newline Delimited JSON) format:
- Each message is a complete JSON object
- Messages are separated by newline characters (`\n`)
- Supports efficient streaming parsing

### Python Architecture

```
MCP (main class)
├── _detect_transport() - Auto-detection logic
├── HTTPStreamingMCPClient - HTTP-Streaming implementation
│ ├── HTTPStreamingTransport - Low-level transport
│ ├── HTTPReadStream - Read adapter
│ └── HTTPWriteStream - Write adapter
└── SSEMCPClient - SSE implementation (existing)
```

### TypeScript Architecture

```
MCP (unified class)
├── detectTransport() - Auto-detection logic
├── MCPHttpStreaming - HTTP-Streaming implementation
│ └── HTTPStreamingTransport - Transport layer
│ ├── HTTPStreamingTransport - Modern browsers
│ └── HTTPStreamingTransportFallback - Legacy browsers
└── MCPSse - SSE implementation (existing)
```

## Server Implementation

### Python Server Example

```python
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import json
import asyncio

app = FastAPI()

@app.post("/mcp/v1/stream")
async def mcp_stream(request: Request):
async def generate():
async for chunk in request.stream():
# Process incoming messages
message = json.loads(chunk)

# Generate response
response = process_mcp_message(message)
yield json.dumps(response).encode() + b'\n'

return StreamingResponse(
generate(),
media_type="application/x-ndjson"
)
```

### Node.js Server Example

```javascript
const express = require('express');
const app = express();

app.post('/mcp/v1/stream', (req, res) => {
res.writeHead(200, {
'Content-Type': 'application/x-ndjson',
'Transfer-Encoding': 'chunked'
});

req.on('data', (chunk) => {
const message = JSON.parse(chunk);
const response = processMCPMessage(message);
res.write(JSON.stringify(response) + '\n');
});

req.on('end', () => {
res.end();
});
});
```

## Configuration Options

### Python Options

```python
MCP(
url,
transport="http-streaming", # Explicit transport
timeout=60, # Request timeout in seconds
debug=True, # Enable debug logging
headers={ # Custom headers
"Authorization": "Bearer token"
}
)
```

### TypeScript Options

```typescript
new MCP(url, {
transport: "http-streaming", // Explicit transport
timeout: 60000, // Timeout in milliseconds
debug: true, // Enable debug logging
headers: { // Custom headers
"Authorization": "Bearer token"
},
fallbackMode: false // Force fallback for testing
})
```

## Backward Compatibility

The implementation maintains 100% backward compatibility:

1. **Existing SSE URLs** continue to use SSE transport
2. **Stdio commands** work unchanged
3. **NPX commands** work unchanged
4. **All existing code** continues to function without modification

## Migration Guide

No migration is required! Existing code continues to work. To use HTTP-Streaming:

1. **Option 1**: Use URLs that don't match SSE patterns (recommended)
2. **Option 2**: Add `transport="http-streaming"` parameter

## Troubleshooting

### Debug Mode

Enable debug logging to see transport selection:

```python
MCP(url, debug=True)
```

### Common Issues

1. **Connection Refused**: Ensure the server supports HTTP-Streaming at the endpoint
2. **Transport Errors**: Check if the server implements the correct protocol
3. **Browser Compatibility**: TypeScript fallback mode handles older browsers

## Performance Considerations

HTTP-Streaming is recommended for:
- High-frequency message exchange
- Large message payloads
- Binary data transmission
- Bidirectional communication needs

SSE remains suitable for:
- Simple server-to-client streaming
- Text-only data
- Browser compatibility requirements
- Existing SSE infrastructure

## Future Enhancements

Potential future improvements:
- WebSocket transport option
- gRPC streaming support
- Connection pooling
- Automatic reconnection for HTTP-Streaming
- Compression support
109 changes: 109 additions & 0 deletions src/praisonai-agents/examples/mcp_http_streaming_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
"""
Example demonstrating MCP with HTTP-Streaming transport.

This example shows:
1. Auto-detection of transport based on URL
2. Explicit transport selection
3. Backward compatibility with existing code
"""

from praisonaiagents import Agent
from praisonaiagents.mcp import MCP

# Example 1: Auto-detection - SSE endpoint (backward compatible)
print("Example 1: Auto-detection with SSE endpoint")
try:
agent_sse_auto = Agent(
instructions="You are a helpful assistant that can use MCP tools.",
llm="gpt-4o-mini",
tools=MCP("http://localhost:8080/sse") # Auto-detects SSE transport
)
print("✓ SSE transport detected automatically")
except Exception as e:
print(f"Note: {e}")

# Example 2: Auto-detection - HTTP endpoint
print("\nExample 2: Auto-detection with HTTP endpoint")
try:
agent_http_auto = Agent(
instructions="You are a helpful assistant that can use MCP tools.",
llm="gpt-4o-mini",
tools=MCP("http://localhost:8080/api") # Auto-detects HTTP-streaming transport
)
print("✓ HTTP-streaming transport detected automatically")
except Exception as e:
print(f"Note: {e}")

# Example 3: Explicit SSE transport
print("\nExample 3: Explicit SSE transport selection")
try:
agent_sse_explicit = Agent(
instructions="You are a helpful assistant that can use MCP tools.",
llm="gpt-4o-mini",
tools=MCP("http://localhost:8080/api", transport="sse") # Force SSE
)
print("✓ SSE transport explicitly selected")
except Exception as e:
print(f"Note: {e}")

# Example 4: Explicit HTTP-streaming transport
print("\nExample 4: Explicit HTTP-streaming transport selection")
try:
agent_http_explicit = Agent(
instructions="You are a helpful assistant that can use MCP tools.",
llm="gpt-4o-mini",
tools=MCP("http://localhost:8080/sse", transport="http-streaming") # Force HTTP-streaming
)
print("✓ HTTP-streaming transport explicitly selected")
except Exception as e:
print(f"Note: {e}")

# Example 5: HTTP-streaming with custom headers
print("\nExample 5: HTTP-streaming with custom headers")
try:
agent_http_headers = Agent(
instructions="You are a helpful assistant that can use MCP tools.",
llm="gpt-4o-mini",
tools=MCP(
"http://localhost:8080/api",
transport="http-streaming",
headers={"Authorization": "Bearer your-token-here"}
)
)
print("✓ HTTP-streaming with custom headers configured")
except Exception as e:
print(f"Note: {e}")

# Example 6: Existing stdio usage - completely unchanged
print("\nExample 6: Existing stdio usage (backward compatible)")
try:
agent_stdio = Agent(
instructions="You are a helpful assistant that can use MCP tools.",
llm="gpt-4o-mini",
tools=MCP(
command="/path/to/python",
args=["/path/to/mcp_server.py"]
)
)
print("✓ Stdio transport works as before")
except Exception as e:
print(f"Note: {e}")

# Example 7: NPX usage - completely unchanged
print("\nExample 7: NPX usage (backward compatible)")
try:
agent_npx = Agent(
instructions="You are a helpful assistant that can use MCP tools.",
llm="gpt-4o-mini",
tools=MCP("npx @modelcontextprotocol/server-brave-search")
)
print("✓ NPX transport works as before")
except Exception as e:
print(f"Note: {e}")

print("\n" + "="*50)
print("Summary: HTTP-Streaming support added with full backward compatibility!")
print("- Auto-detection: URLs ending with /sse use SSE, others use HTTP-streaming")
print("- Explicit control: Use transport='sse' or transport='http-streaming'")
print("- All existing code continues to work without modification")
print("="*50)
Loading
Loading