Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions python/mcp/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# Python MCP Function (HTTP)

Welcome to your new Python Function! This Function is an MCP server that
handles any HTTP requests with MCP requests at `/mcp`. The MCP is implemented
using [Python-SDK](https://github.com/modelcontextprotocol/python-sdk) library.

## Function

The main code lives in `function/func.py`.
The Function itself is ASGI compatible, implementing `handle(scope,receive,send)`
which is the entry for your function and all requests are handled here.

You can also use `start` and `stop` methods which are implemented and you can
see them in the bottom of the `function/func.py` file.

## Project Structure

```
├── function/
│ ├── __init__.py
│ └── func.py # Main function code
├── client/
│ └── client.py # Example MCP client
├── tests/
│ └── test_func.py # simple HTTP test
└── pyproject.toml # Project configuration
```

## MCP Server

The MCP server is implemented via a `MCPServer` class.
- Uses `FastMCP()` function from the Python SDK lib mentioned above
- Uses `streamable_http_app` transport which is a lower level function in the
Python-SDK library that is plugged in directly.
- Integrates with the Functions middleware without running its own server
- Routes all incoming MCP requests to the MCP handler

## MCP Client

Since we are using the Python-SDK library, in order to communicate easily
with the server we can use the Python-SDK Clients. You can refer to their
[client docs](https://github.com/modelcontextprotocol/python-sdk?tab=readme-ov-file#writing-mcp-clients)
for more information.

You can find an example implementation of a client in your function's `client/` directory.
Refer to [Testing section](#testing) for how to run clients.

## Deployment

Before running your Function, it is recommended to create a virtual environment
to isolate the project for easier dependency management.

Subsequently install necessary dependencies and you're all set up.

```bash
# Create the virtual env
python3 -m venv venv

# Optional: Activate the venv
source venv/bin/activate

# Install dependencies
pip install -e .
```

## Testing

Tests can be found in `tests/` directory with a simple test for HTTP requests
in `test_func.py`. To run tests:

```bash
# Install dependencies (if not already installed)
pip install -e .

# Run tests
pytest

# Run tests with verbose output
pytest -v
```

For testing the MCP functionality, you can use the included client at `client/client.py`:

```bash
# run your mcp server locally
func run --builder=host --container=false

# in different terminal: run mcp client
python client/client.py
```

## Contact and Docs

Please share your functions or ask us questions in our CNCF Slack
[Functions channel](https://cloud-native.slack.com/archives/C04LKEZUXEE)!

For more info about the Python Functions implementation itself, please visit
[python template docs](https://github.com/knative/func/blob/main/docs/function-templates/python.md)
and
[python default http template](https://github.com/knative/func/tree/main/templates/python/http)

For even more, see [the complete documentation](https://github.com/knative/func/tree/main/docs)
31 changes: 31 additions & 0 deletions python/mcp/client/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import asyncio
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client

async def main():
# check your running Function MCP Server, it will output where its available
# at during initialization.
async with streamablehttp_client("http://localhost:8080/mcp") as streams:
read_stream,write_stream = streams[0],streams[1]

async with ClientSession(read_stream,write_stream) as s:
print("Initializing connection...",end="")
await s.initialize()
print("done!\n")

# List all available tools
#tools = await s.list_tools()
#print("--- List of tools ---")
#print(tools.tools)

# Call hello tool which will greet Thomas
hello_tool = await s.call_tool(
name="hello_tool",
arguments={"name": "Thomas"}
)

# Print the actual content of the result
print(hello_tool.content[0].text)

if __name__ == "__main__":
asyncio.run(main())
1 change: 1 addition & 0 deletions python/mcp/function/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .func import new
144 changes: 144 additions & 0 deletions python/mcp/function/func.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
# function/func.py

# Function as an MCP Server implementation
import logging

from mcp.server.fastmcp import FastMCP
import asyncio

def new():
""" New is the only method that must be implemented by a Function.
The instance returned can be of any name.
"""
return Function()

class MCPServer:
"""MCP server that exposes tools, resources, and prompts via the MCP protocol."""

def __init__(self):
# Create FastMCP instance with stateless HTTP for Kubernetes deployment
self.mcp = FastMCP("Function MCP Server", stateless_http=True)

self._register_tools()
#self._register_resources()
#self._register_prompts()

# Get the ASGI app from FastMCP
self._app = self.mcp.streamable_http_app()

def _register_tools(self):
"""Register MCP tools."""
@self.mcp.tool()
def hello_tool(name: str) -> str:
"""Say hello to someone."""
return f"Hey there {name}!"

@self.mcp.tool()
def add_numbers(a: int, b: int) -> int:
"""Add two numbers together."""
return a + b

## Other MCP objects include resources and prompts.
## Add them here to be registered in the same fashion as tools above.
# def _register_resources(self):
# """Register MCP resources."""
# @self.mcp.resource("echo://{message}")
# def echo_resource(message: str) -> str:
# """Echo the message as a resource."""
# return f"Echo: {message}"
#
# def _register_prompts(self):
# """Register MCP prompts."""
# @self.mcp.prompt()
# def greeting_prompt(name: str = "Big Dave"):
# """Generate a greeting prompt."""
# return [
# {
# "role": "user",
# "content": f"Please write a friendly greeting for {name}"
# }
# ]

async def handle(self, scope, receive, send):
"""Handle ASGI requests - both lifespan and HTTP."""
await self._app(scope, receive, send)

class Function:
def __init__(self):
""" The init method is an optional method where initialization can be
performed. See the start method for a startup hook which includes
configuration.
"""
self.mcp_server = MCPServer()
self._mcp_initialized = False

async def handle(self, scope, receive, send):
"""
Main entry to your Function.
This handles all the incoming requests.
"""

# Initialize MCP server on first request
if not self._mcp_initialized:
await self._initialize_mcp()

# Route MCP requests
if scope['path'].startswith('/mcp'):
await self.mcp_server.handle(scope, receive, send)
return

# Default response for non-MCP requests
await self._send_default_response(send)

async def _initialize_mcp(self):
"""Initialize the MCP server by sending lifespan startup event."""
lifespan_scope = {'type': 'lifespan', 'asgi': {'version': '3.0'}}
startup_sent = False

async def lifespan_receive():
nonlocal startup_sent
if not startup_sent:
startup_sent = True
return {'type': 'lifespan.startup'}
await asyncio.Event().wait() # Wait forever for shutdown

async def lifespan_send(message):
if message['type'] == 'lifespan.startup.complete':
self._mcp_initialized = True
elif message['type'] == 'lifespan.startup.failed':
logging.error(f"MCP startup failed: {message}")

# Start lifespan in background
asyncio.create_task(self.mcp_server.handle(
lifespan_scope, lifespan_receive, lifespan_send
))

# Brief wait for startup completion
await asyncio.sleep(0.1)

async def _send_default_response(self, send):
"""
Send default OK response.
This is for your non MCP requests if desired.
"""
await send({
'type': 'http.response.start',
'status': 200,
'headers': [[b'content-type', b'text/plain']],
})
await send({
'type': 'http.response.body',
'body': b'OK',
})

def start(self, cfg):
logging.info("Function starting")

def stop(self):
logging.info("Function stopping")

def alive(self):
return True, "Alive"

def ready(self):
return True, "Ready"
25 changes: 25 additions & 0 deletions python/mcp/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[project]
name = "function"
description = ""
version = "0.1.0"
requires-python = ">=3.9"
readme = "README.md"
license = "MIT"
dependencies = [
"httpx",
"pytest",
"pytest-asyncio",
"mcp"
]
authors = [
{ name="Your Name", email="you@example.com"},
]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.pytest.ini_options]
asyncio_mode = "strict"
asyncio_default_fixture_loop_scope = "function"

38 changes: 38 additions & 0 deletions python/mcp/tests/test_func.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""
An example set of unit tests which confirm that the main handler (the
callable function) returns 200 OK for a simple HTTP GET.
"""
import pytest
from function import new


@pytest.mark.asyncio
async def test_function_handle():
f = new() # Instantiate Function to Test

sent_ok = False
sent_headers = False
sent_body = False

# Mock Send
async def send(message):
nonlocal sent_ok
nonlocal sent_headers
nonlocal sent_body

if message.get('status') == 200:
sent_ok = True

if message.get('type') == 'http.response.start':
sent_headers = True

if message.get('type') == 'http.response.body':
sent_body = True

# Invoke the Function
await f.handle({}, {}, send)

# Assert send was called
assert sent_ok, "Function did not send a 200 OK"
assert sent_headers, "Function did not send headers"
assert sent_body, "Function did not send a body"
Loading