Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 31 additions & 2 deletions src/mcp/client/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from collections.abc import AsyncGenerator, Awaitable, Callable
from dataclasses import dataclass, field
from typing import Protocol
from urllib.parse import urlencode, urljoin, urlparse
from urllib.parse import parse_qs, urlencode, urljoin, urlparse

import anyio
import httpx
Expand Down Expand Up @@ -427,14 +427,43 @@ async def _exchange_token(self, auth_code: str, code_verifier: str) -> httpx.Req
"POST", token_url, data=token_data, headers={"Content-Type": "application/x-www-form-urlencoded"}
)

def _parse_content_type(self, content_type: str) -> tuple[str, dict[str, str]]:
"""Parse Content-Type header into media type and parameters."""
parts = content_type.split(";")
media_type = parts[0].strip()

params: dict[str, str] = {}
for part in parts[1:]:
if "=" in part:
key, value = part.split("=", 1)
params[key.strip()] = value.strip()

return media_type, params

async def _handle_token_response(self, response: httpx.Response) -> None:
"""Handle token exchange response."""
if response.status_code != 200:
raise OAuthTokenError(f"Token exchange failed: {response.status_code}")

content_type = response.headers.get("Content-Type")
if content_type is None:
raise OAuthTokenError("Token exchange failed: Missing 'Content-Type' response header")

media_type, params = self._parse_content_type(content_type)
if media_type not in ("application/json", "application/x-www-form-urlencoded"):
raise OAuthTokenError(f"Token exchange failed: Unexpected token response content type {media_type}")

try:
content = await response.aread()
token_response = OAuthToken.model_validate_json(content)
if media_type == "application/json":
token_response = OAuthToken.model_validate_json(content)
else:
charset = params.get("charset", "utf-8")
parsed = parse_qs(content.decode(charset))
token_data = {key: value[0] if value else None for key, value in parsed.items()}
if scope := token_data.get("scope"):
token_data["scope"] = scope.replace(",", " ")
token_response = OAuthToken.model_validate(token_data)

# Validate scopes
if token_response.scope and self.context.client_metadata.scope:
Expand Down
5 changes: 3 additions & 2 deletions tests/client/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ async def test_oauth_discovery_fallback_conditions(self, oauth_provider: OAuthCl
# Send a successful token response
token_response = httpx.Response(
200,
headers={"Content-Type": "application/json"},
content=(
b'{"access_token": "new_access_token", "token_type": "Bearer", "expires_in": 3600, '
b'"refresh_token": "new_refresh_token"}'
Expand Down Expand Up @@ -790,9 +791,9 @@ async def test_auth_flow_with_no_tokens(self, oauth_provider: OAuthClientProvide
# Send a successful token response
token_response = httpx.Response(
200,
headers={"Content-Type": "application/x-www-form-urlencoded; charset=utf-8"},
content=(
b'{"access_token": "new_access_token", "token_type": "Bearer", "expires_in": 3600, '
b'"refresh_token": "new_refresh_token"}'
b"access_token=new_access_token&token_type=bearer&expires_in=3600&refresh_token=new_refresh_token"
),
request=token_request,
)
Expand Down