oauth for tools #537
Changes from all commits
2b15db8
c9a6553
59b1251
f5ccb74
efb63e1
74669aa
6cd9b40
11e9d6a
1ddb460
9500f8a
5240bad
fc4a27c
380b3bd
e56866d
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
# MCP Agent OAuth Support | ||
|
||
## Goals | ||
- Protect MCP Agent Cloud servers using OAuth 2.1 so MCP clients obtain tokens via standard flows. | ||
- Enable MCP Agent runtimes to authenticate to downstream MCP servers that require OAuth access tokens. | ||
- Provide pluggable token storage for both local development (in-memory) and multi-instance deployments (Redis planned). | ||
- Maintain compatibility with the MCP Authorization spec (RFC 8414, RFC 9728, OAuth 2.1 + PKCE, RFC 8707 Resource Indicators) and the proposed delegated authorization SEP. | ||
|
||
## Architecture Overview | ||
|
||
### Components | ||
1. **Auth Server Integration** – Configure the FastMCP instance with `AuthSettings` and a custom `TokenVerifier` that calls MCP Agent Cloud auth services. | ||
2. **Protected Resource Metadata** – Serve `/.well-known/oauth-protected-resource` using FastMCP hooks so clients can discover the auth server. | ||
3. **Access Token Validation** – Enforce bearer tokens on every inbound MCP request via `RequireAuthMiddleware`, populating the request context with the authenticated user. | ||
4. **OAuth Token Service** – New `mcp_agent.oauth` package with: | ||
- `TokenStore`/`TokenRecord` abstractions | ||
- `InMemoryTokenStore` and Redis-backed implementation (second pass) | ||
- `TokenManager` orchestration (acquire, refresh, revoke) | ||
- `OAuthHttpxAuth` for attaching tokens to downstream HTTP transports | ||
- `AuthorizationFlowCoordinator` that interacts with the user via MCP `auth/request` | ||
5. **Delegated Authorization UI Flow** – Extend the gateway/session relay so servers can send `auth/request` messages to MCP clients, capturing authorization codes via either: | ||
- Client-returned callback URL (preferred, works with SEP-capable clients) | ||
- MCP Agent hosted callback endpoint (`/internal/oauth/callback/{flow_id}`) as a fallback / native-app style loopback. | ||
6. **Configuration Surface** – Extend `Settings` and per-server `MCPServerAuthSettings` to describe OAuth behaviour (scopes, preferred auth server, redirect URIs, etc.) and global token-store configuration. | ||
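The `TokenStore`/`TokenRecord` abstractions in component 4 could look roughly like the sketch below. The class names are taken from the list above, but every field, method, and the key shape are assumptions made for illustration, not the package's actual API.

```python
import time
from dataclasses import dataclass
from typing import Dict, Optional, Protocol, Tuple

# Assumed key shape: (user_identity, resource, authorization_server)
TokenKey = Tuple[str, str, str]


@dataclass(frozen=True)
class TokenRecord:
    """Illustrative record; the real model may carry more metadata."""

    access_token: str
    expires_at: float  # Unix timestamp in seconds
    refresh_token: Optional[str] = None
    scopes: Tuple[str, ...] = ()

    def is_expired(self, leeway_seconds: float = 0.0, now: Optional[float] = None) -> bool:
        # Treat the token as expired slightly early so callers can refresh in time
        current = time.time() if now is None else now
        return current >= self.expires_at - leeway_seconds


class TokenStore(Protocol):
    """Hypothetical storage interface shared by all backends."""

    def get(self, key: TokenKey) -> Optional[TokenRecord]: ...

    def set(self, key: TokenKey, record: TokenRecord) -> None: ...

    def delete(self, key: TokenKey) -> None: ...


class InMemoryTokenStore:
    """Dict-backed store, suitable only for a single process."""

    def __init__(self) -> None:
        self._records: Dict[TokenKey, TokenRecord] = {}

    def get(self, key: TokenKey) -> Optional[TokenRecord]:
        return self._records.get(key)

    def set(self, key: TokenKey, record: TokenRecord) -> None:
        self._records[key] = record

    def delete(self, key: TokenKey) -> None:
        # Deleting a missing key is a no-op
        self._records.pop(key, None)
```

Keeping the interface this small is one way to let a Redis-backed store slot in later without touching callers.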
|
||
### Key Data Flow | ||
1. **Inbound Requests** | ||
- Client presents bearer token ⇒ `BearerAuthBackend` + `MCPAgentTokenVerifier` introspect token. | ||
- Verified token populates context with `OAuthUserIdentity` (provider + subject + email). | ||
- Context is propagated into workflows/sessions so downstream OAuth flows know the acting user. | ||
|
||
2. **Outbound HTTP (downstream MCP server)** | ||
- `ServerRegistry` detects `auth.oauth` configuration. | ||
- Wraps HTTP transport with `OAuthHttpxAuth` which requests an access token from `TokenManager`. | ||
- `TokenManager` checks store; if missing/expired ⇒ `AuthorizationFlowCoordinator` performs RFC 9728 discovery, PKCE, delegated browser flow through MCP client, exchanges code for tokens, caches result. | ||
- Requests automatically retry after token refresh when a response returns 401/invalid token. | ||
|
||
3. **Token Storage** | ||
- Tokens stored per `(user_identity, resource, authorization_server)` tuple with metadata (scopes, expiry, refresh token, provider claims). | ||
- Store implements optimistic locking to avoid concurrent refresh storms. | ||
- Pluggable backend (`InMemoryTokenStore` initial, Redis follow-up). | ||
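To illustrate why concurrent refreshes need coordination, here is a minimal sketch that coalesces simultaneous token requests for the same key. It uses a per-key `asyncio.Lock` with a re-check after acquiring the lock, which is a simpler stand-in for the optimistic locking mentioned above, not the same technique. `SingleFlightTokenCache`, `Token`, and the `fetch` callback are hypothetical names.

```python
import asyncio
import time
from dataclasses import dataclass
from typing import Awaitable, Callable, Dict, Optional, Tuple

# Assumed key shape: (user_identity, resource, authorization_server)
TokenKey = Tuple[str, str, str]


@dataclass
class Token:
    access_token: str
    expires_at: float  # Unix timestamp in seconds


class SingleFlightTokenCache:
    """Ensures at most one fetch/refresh per key is in flight at a time."""

    def __init__(
        self,
        fetch: Callable[[TokenKey], Awaitable[Token]],
        leeway_seconds: float = 60.0,
    ) -> None:
        self._fetch = fetch
        self._leeway = leeway_seconds
        self._tokens: Dict[TokenKey, Token] = {}
        self._locks: Dict[TokenKey, asyncio.Lock] = {}

    def _fresh(self, key: TokenKey) -> Optional[Token]:
        # A token counts as fresh only if it outlives the grace window
        token = self._tokens.get(key)
        if token is not None and time.time() < token.expires_at - self._leeway:
            return token
        return None

    async def get(self, key: TokenKey) -> Token:
        token = self._fresh(key)
        if token is not None:
            return token
        lock = self._locks.setdefault(key, asyncio.Lock())
        async with lock:
            # Re-check: another task may have refreshed while we waited
            token = self._fresh(key)
            if token is None:
                token = await self._fetch(key)
                self._tokens[key] = token
            return token
```

A per-process lock like this would not protect multiple instances sharing one Redis store; that case is where a compare-and-set style update would be needed.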
|
||
## Module Plan | ||
|
||
``` | ||
src/mcp_agent/oauth/ | ||
__init__.py | ||
identity.py # OAuthUserIdentity, helpers to extract from auth context | ||
records.py # TokenRecord dataclass/pydantic model | ||
store/base.py # TokenStore protocol | ||
store/in_memory.py # Default store | ||
manager.py # TokenManager (get/refresh/invalidate) | ||
flow.py # AuthorizationFlowCoordinator | ||
http/auth.py # OAuthHttpxAuth (httpx.Auth implementation) | ||
metadata.py # RFC 8414 + RFC 9728 discovery helpers | ||
pkce.py # PKCE + state utilities | ||
errors.py # Custom exception hierarchy | ||
``` | ||
|
||
Integration touchpoints: | ||
- `mcp_agent/config.py` – add OAuth settings models. | ||
- `mcp_agent/core/context.py` – add `current_user`, `token_manager`, `token_store`, `oauth_config` fields. | ||
- `mcp_agent/app.py` – initialize token store/manager based on settings. | ||
- `mcp_agent/server/app_server.py` – configure FastMCP auth settings, register callback route, surface user identity, extend relay to handle `auth/request`. | ||
- `mcp_agent/mcp/mcp_server_registry.py` & `mcp_agent/mcp/mcp_connection_manager.py` – wire `OAuthHttpxAuth` into HTTP transports and expose helper for manual token teardown. | ||
- `mcp_agent/mcp/client_proxy.py` – add proxy helpers for `auth/request`. | ||
- `SessionProxy` – add direct request helper for `auth/request` and ensure Temporal flow support. | ||
- `examples/mcp_agent_server/*` – demonstrate configuration changes. | ||
- Tests – new suite exercising token store, metadata discovery, flow orchestration (with mocked HTTP + client responses). | ||
|
||
## OAuth Flow Details | ||
1. **Discovery** | ||
- If downstream server responds 401 with `WWW-Authenticate`, parse for `resource_metadata` ⇒ GET metadata ⇒ determine auth server URL(s). | ||
- Fetch authorization server metadata (RFC 8414). | ||
- Perform optional dynamic client registration when configured and supported. | ||
|
||
2. **Authorization Request** | ||
- Generate PKCE challenge/verifier, secure `state`, choose `redirect_uri`. | ||
- Build authorization URL including `resource` parameter (RFC 8707) + requested scopes. | ||
- Invoke `auth/request` via SessionProxy → MCP client opens browser. | ||
|
||
3. **Callback Handling** | ||
- Preferred: MCP client returns callback URL payload via request result. | ||
- Fallback: Authorization server redirects to `/internal/oauth/callback/{flow_id}`. | ||
- Coordinator validates `state`, extracts `code` (and errors). | ||
|
||
4. **Token Exchange / Storage** | ||
- POST token endpoint with code + PKCE verifier + resource. | ||
- Store access token, refresh token, expiry, scope, provider metadata. | ||
- Associate tokens with user identity for reuse. | ||
|
||
5. **Refresh / Revocation** | ||
- Manager refreshes when expiry within configurable grace window. | ||
- Invalidate token on refresh failure or when server responses indicate revocation. | ||
- Provide method to revoke tokens via authorization server when supported. | ||
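Steps 2 and 3 can be sketched with the standard library alone. The example below generates an S256 PKCE pair (RFC 7636), builds an authorization URL that carries the `resource` parameter, and validates `state` on the callback. All function names and the example URLs are illustrative assumptions, not the contents of `pkce.py` or `flow.py`.

```python
import base64
import hashlib
import secrets
from typing import Sequence, Tuple
from urllib.parse import parse_qs, urlencode, urlparse


def challenge_for_verifier(verifier: str) -> str:
    """S256 challenge: base64url(sha256(verifier)) without padding."""
    digest = hashlib.sha256(verifier.encode("ascii")).digest()
    return base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")


def generate_pkce_pair() -> Tuple[str, str]:
    """Return (code_verifier, code_challenge)."""
    # 64 random bytes encode to 86 URL-safe characters (allowed range: 43-128)
    verifier = secrets.token_urlsafe(64)
    return verifier, challenge_for_verifier(verifier)


def build_authorization_url(
    authorization_endpoint: str,
    client_id: str,
    redirect_uri: str,
    resource: str,
    scopes: Sequence[str],
    code_challenge: str,
    state: str,
) -> str:
    params = {
        "response_type": "code",
        "client_id": client_id,
        "redirect_uri": redirect_uri,
        "scope": " ".join(scopes),
        "state": state,
        "code_challenge": code_challenge,
        "code_challenge_method": "S256",
        "resource": resource,  # RFC 8707 resource indicator
    }
    return f"{authorization_endpoint}?{urlencode(params)}"


def parse_callback(callback_url: str, expected_state: str) -> str:
    """Validate state, surface errors, and return the authorization code."""
    query = parse_qs(urlparse(callback_url).query)
    state = query.get("state", [""])[0]
    # Constant-time comparison of the returned state against the stored one
    if not secrets.compare_digest(state.encode(), expected_state.encode()):
        raise ValueError("state mismatch")
    if "error" in query:
        raise ValueError(f"authorization failed: {query['error'][0]}")
    code = query.get("code", [""])[0]
    if not code:
        raise ValueError("missing authorization code")
    return code
```

The verifier stays with the coordinator and is sent only in the token exchange of step 4; the authorization request carries just the challenge.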
|
||
## Open Questions / Follow-ups | ||
- Redis-backed `TokenStore` (requires deployment secrets) – planned second phase. | ||
- How the LastMile auth server exposes token introspection + JWKS is still open; concrete endpoint specs are needed to finalize `MCPAgentTokenVerifier`. | ||
- MCP client adoption of `auth/request` SEP – need capability detection; until widely supported we rely on hosted callback fallback & manual instructions. | ||
- Access control DSL (include/exclude by email/domain) – to be evaluated once token identity payload finalized. | ||
|
||
## Testing Strategy | ||
- Unit tests for token store concurrency + expiry handling. | ||
- Metadata discovery + PKCE generation (pure python tests). | ||
- Integration-style test for delegated flow using mocked HTTP server + fake MCP client (ensures `auth/request` plumbing works end-to-end). | ||
- Tests around server 401 enforcement + WWW-Authenticate header. | ||
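Both the discovery step and the header test above depend on reading `resource_metadata` from a `WWW-Authenticate` challenge. The sketch below is one pure-Python way to do that. The function names are hypothetical, and the parser handles only a single `Bearer` challenge with simple `key=value` parameters, not the full HTTP grammar.

```python
import re
from typing import Dict, Optional

# Matches key="quoted value" or key=token pairs inside a challenge
_PARAM_RE = re.compile(
    r'([A-Za-z_][A-Za-z0-9_-]*)\s*=\s*(?:"((?:[^"\\]|\\.)*)"|([^\s,]+))'
)


def parse_bearer_challenge(header: str) -> Dict[str, str]:
    """Return the auth-params of a Bearer challenge, or {} for other schemes."""
    scheme, _, rest = header.strip().partition(" ")
    if scheme.lower() != "bearer":
        return {}
    params: Dict[str, str] = {}
    for match in _PARAM_RE.finditer(rest):
        name = match.group(1).lower()
        value = match.group(2) if match.group(2) is not None else match.group(3)
        # Undo backslash escapes inside quoted strings
        params[name] = re.sub(r"\\(.)", r"\1", value)
    return params


def resource_metadata_url(header: str) -> Optional[str]:
    """Extract the protected-resource metadata URL, if the challenge has one."""
    return parse_bearer_challenge(header).get("resource_metadata")
```

A test for the 401 path could then assert that the server's header round-trips through `resource_metadata_url` to the expected `/.well-known/oauth-protected-resource` location.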
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
# OAuth scenarios | ||
|
||
## preconfigured | ||
|
||
In this case, a token is hard-coded into the configuration. | ||
This is useful for testing or when the token is static. | ||
|
||
## workflow_pre_auth | ||
|
||
In this case, the client can call a `workflows_pre_auth` tool before calling a workflow to seed the tokens. | ||
This is useful when the client can do the auth step, but the workflow cannot (e.g. because it runs async). | ||
There is a slight hack employed here: since we don't have OAuth for the MCP app, we do not have a user. | ||
Since we need a user to store the token against, we create a synthetic user and use that. | ||
|
||
## dynamic_auth | ||
|
||
In this case, no tokens are provided, and the call comes back to the client to do the auth step. | ||
This is currently implemented as an elicitation request (to align with the future elicit URL scheme). | ||
I have not achieved a full end-to-end flow here. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,166 @@ | ||
import asyncio | ||
import time | ||
|
||
from datetime import timedelta | ||
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream | ||
from mcp import ClientSession | ||
from mcp.types import CallToolResult, LoggingMessageNotificationParams | ||
from mcp_agent.app import MCPApp | ||
from mcp_agent.config import MCPServerSettings | ||
from mcp_agent.core.context import Context | ||
from mcp_agent.mcp.gen_client import gen_client | ||
from mcp_agent.mcp.mcp_agent_client_session import MCPAgentClientSession | ||
from mcp_agent.human_input.console_handler import console_input_callback | ||
from mcp_agent.elicitation.handler import console_elicitation_callback | ||
|
||
from rich import print | ||
|
||
try: | ||
    from exceptiongroup import ExceptionGroup as _ExceptionGroup  # Python 3.10 backport | ||
except Exception:  # pragma: no cover | ||
    # Backport not installed: fall back to the builtin (Python 3.11+), else None | ||
    import builtins | ||
    _ExceptionGroup = getattr(builtins, "ExceptionGroup", None)  # type: ignore | ||
try: | ||
    from anyio import BrokenResourceError as _BrokenResourceError | ||
except Exception:  # pragma: no cover | ||
    _BrokenResourceError = None  # type: ignore | ||
|
||
|
||
async def main(): | ||
# Create MCPApp to get the server registry | ||
app = MCPApp( | ||
name="workflow_mcp_client", | ||
human_input_callback=console_input_callback, | ||
elicitation_callback=console_elicitation_callback, | ||
) | ||
async with app.run() as client_app: | ||
logger = client_app.logger | ||
context = client_app.context | ||
|
||
# Connect to the workflow server | ||
logger.info("Connecting to workflow server...") | ||
|
||
        # Override the server configuration to point to the local SSE server | ||
context.server_registry.registry["basic_agent_server"] = MCPServerSettings( | ||
name="basic_agent_server", | ||
description="Local workflow server running the basic agent example", | ||
transport="sse", | ||
url="http://127.0.0.1:8000/sse", | ||
) | ||
|
||
# Define a logging callback to receive server-side log notifications | ||
async def on_server_log(params: LoggingMessageNotificationParams) -> None: | ||
level = params.level.upper() | ||
name = params.logger or "server" | ||
print(f"[SERVER LOG] [{level}] [{name}] {params.data}") | ||
|
||
# Provide a client session factory that installs our logging callback | ||
# and prints non-logging notifications to the console | ||
class ConsolePrintingClientSession(MCPAgentClientSession): | ||
async def _received_notification(self, notification): # type: ignore[override] | ||
try: | ||
method = getattr(notification.root, "method", None) | ||
except Exception: | ||
method = None | ||
|
||
# Avoid duplicating server log prints (handled by logging_callback) | ||
if method and method != "notifications/message": | ||
try: | ||
data = notification.model_dump() | ||
except Exception: | ||
data = str(notification) | ||
print(f"[SERVER NOTIFY] {method}: {data}") | ||
|
||
return await super()._received_notification(notification) | ||
|
||
def make_session( | ||
read_stream: MemoryObjectReceiveStream, | ||
write_stream: MemoryObjectSendStream, | ||
read_timeout_seconds: timedelta | None, | ||
context: Context | None = None, | ||
) -> ClientSession: | ||
return ConsolePrintingClientSession( | ||
read_stream=read_stream, | ||
write_stream=write_stream, | ||
read_timeout_seconds=read_timeout_seconds, | ||
logging_callback=on_server_log, | ||
context=context, | ||
) | ||
|
||
try: | ||
async with gen_client( | ||
"basic_agent_server", | ||
context.server_registry, | ||
client_session_factory=make_session, | ||
) as server: | ||
try: | ||
await server.set_logging_level("info") | ||
except Exception: | ||
# Older servers may not support logging capability | ||
print("[client] Server does not support logging/setLevel") | ||
|
||
# List available tools | ||
tools_result = await server.list_tools() | ||
logger.info( | ||
"Available tools:", | ||
data={"tools": [tool.name for tool in tools_result.tools]}, | ||
) | ||
|
||
print( | ||
await server.call_tool("github_org_search", {"query": "lastmileai"}) | ||
) | ||
except Exception as e: | ||
# Tolerate benign shutdown races from stdio client (BrokenResourceError within ExceptionGroup) | ||
if _ExceptionGroup is not None and isinstance(e, _ExceptionGroup): | ||
subs = getattr(e, "exceptions", []) or [] | ||
if ( | ||
_BrokenResourceError is not None | ||
and subs | ||
and all(isinstance(se, _BrokenResourceError) for se in subs) | ||
): | ||
logger.debug("Ignored BrokenResourceError from stdio shutdown") | ||
else: | ||
raise | ||
elif _BrokenResourceError is not None and isinstance( | ||
e, _BrokenResourceError | ||
): | ||
logger.debug("Ignored BrokenResourceError from stdio shutdown") | ||
elif "BrokenResourceError" in str(e): | ||
logger.debug( | ||
"Ignored BrokenResourceError from stdio shutdown (string match)" | ||
) | ||
else: | ||
raise | ||
# Nudge cleanup of subprocess transports before the loop closes to avoid | ||
# 'Event loop is closed' from BaseSubprocessTransport.__del__ on GC. | ||
try: | ||
await asyncio.sleep(0) | ||
except Exception: | ||
pass | ||
try: | ||
import gc | ||
|
||
gc.collect() | ||
except Exception: | ||
pass | ||
|
||
|
||
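def _tool_result_to_json(tool_result: CallToolResult): | ||
    """Parse the first content item of a tool result as JSON; return None on failure.""" | ||
    import json | ||
    if tool_result.content and len(tool_result.content) > 0: | ||
        # Non-text content items have no .text attribute; treat them as unparseable | ||
        text = getattr(tool_result.content[0], "text", None) | ||
        try: | ||
            # Try to parse the response as JSON if it's a string | ||
            return json.loads(text) | ||
        except (json.JSONDecodeError, TypeError): | ||
            # Not valid JSON (or not text at all): signal that with None | ||
            return None | ||
    return None | ||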
|
||
|
||
if __name__ == "__main__": | ||
start = time.time() | ||
asyncio.run(main()) | ||
end = time.time() | ||
t = end - start | ||
|
||
print(f"Total run time: {t:.2f}s") | ||
Comment on lines +1 to +166

🛠️ Refactor suggestion | 🟠 Major

Extract shared client logic to eliminate duplication.

This file duplicates nearly all of […]. Extract the common client scaffolding into a shared module (e.g., […]).

Example refactor: create […]:

```python
from mcp_agent.mcp.mcp_agent_client_session import MCPAgentClientSession


class ConsolePrintingClientSession(MCPAgentClientSession):
    async def _received_notification(self, notification):
        # ... (current implementation)
        pass


def make_session_factory(on_server_log):
    def make_session(read_stream, write_stream, read_timeout_seconds, context=None):
        return ConsolePrintingClientSession(
            read_stream=read_stream,
            write_stream=write_stream,
            read_timeout_seconds=read_timeout_seconds,
            logging_callback=on_server_log,
            context=context,
        )

    return make_session
```

Then simplify both example clients to import and use these utilities.
Add required `expected_audiences` to the Authorization example (validation fails without it).

`MCPAuthorizationServerSettings` enforces `expected_audiences` when `authorization.enabled: true`. The example omits it and will fail validation. Add it to the YAML and call it out in the text.

```diff
 authorization:
   enabled: true
   issuer_url: https://auth.example.com
   resource_server_url: https://agent.example.com/mcp
+  expected_audiences: ["mcp-agent"]  # required for RFC 9068 audience validation
   required_scopes: ["mcp.read", "mcp.write"]
   introspection_endpoint: https://auth.example.com/oauth/introspect
   introspection_client_id: ${INTROSPECTION_CLIENT_ID}
   introspection_client_secret: ${INTROSPECTION_CLIENT_SECRET}
```