1414
1515from mcp .client .auth import TokenStorage
1616from mcp .shared .auth import OAuthClientInformationFull , OAuthToken
17+ from pydantic import BaseModel , Field
1718
1819# Configure logger
1920logger = logging .getLogger (__name__ )
2021
22+
23+ class APITokenInfo (BaseModel ):
24+ """Pydantic model representing the /api_tokens/self endpoint response."""
25+
26+ id : str = Field (description = "Unique identifier for the API token" )
27+ name : str = Field (description = "Name of the API token" )
28+ workspace_id : int = Field (description = "ID of the workspace this token belongs to" )
29+ type : str = Field (description = "Type of token (e.g., 'personal_access_token')" )
30+ status : str = Field (description = "Status of the token (e.g., 'active', 'revoked')" )
31+ created_at : datetime .datetime = Field (description = "Timestamp when the token was created" )
32+ last_used_at : Optional [datetime .datetime ] = Field (
33+ default = None , description = "Timestamp of last usage, or None if never used"
34+ )
35+ expire_at : Optional [datetime .datetime ] = Field (
36+ default = None , description = "Expiration timestamp, or None if token never expires"
37+ )
38+ revoked_at : Optional [datetime .datetime ] = Field (
39+ default = None , description = "Timestamp when the token was revoked, or None if active"
40+ )
41+ member_id : int = Field (description = "ID of the member associated with this token" )
42+ creator_id : int = Field (description = "ID of the user who created this token" )
43+ scopes : list [str ] = Field (default_factory = list , description = "List of scopes granted to this token" )
44+
45+
2146# Port range for callback server
2247CALLBACK_PORT_RANGE = (8000 , 8999 )
2348
@@ -469,14 +494,10 @@ def _load_saved_token(self):
469494 # Set the access token and related info
470495 self .access_token = token_data .get ("access_token" )
471496 if self .access_token :
472- # Store other token information
473- self .token_info = {
474- "expires_at" : token_data .get ("expires_at" ),
475- "scopes" : token_data .get ("scopes" ),
476- "token_name" : token_data .get ("token_name" ),
477- }
497+ # Update token name from saved data if available
478498 self .token_name = token_data .get ("token_name" , self .token_name )
479499 logger .info (f"Loaded saved token '{ self .token_name } ' for { self .dashboard_url } " )
500+ # Note: self.token_info will be populated when _fetch_token_info() is called
480501 else :
481502 logger .warning (f"Token data found but no access_token field" )
482503 except Exception as e :
@@ -496,13 +517,20 @@ async def oauth_process(self, login_path: str | None = None) -> str:
496517 Exception: If authentication fails
497518 """
498519 logger .debug (f"oauth_process() called for token '{ self .token_name } '" )
499-
520+
500521 # Check if we already have a valid token loaded
501- if self .access_token and self .token_info :
502- logger .info (f"Using existing token '{ self .token_name } ' - skipping OAuth flow" )
503- return self .access_token
504-
505- logger .info (f"No valid token found for '{ self .token_name } ', starting OAuth authentication flow" )
522+ if self .access_token :
523+ # Try to fetch token info to verify the token is still valid
524+ token_info = await self ._fetch_token_info ()
525+ if token_info :
526+ self .token_info = token_info
527+ logger .info (f"Using existing token '{ self .token_name } ' - skipping OAuth flow" )
528+ return self .access_token
529+ else :
530+ logger .info (f"Saved token for '{ self .token_name } ' is no longer valid, starting OAuth authentication flow" )
531+ self .access_token = None
532+ else :
533+ logger .info (f"No valid token found for '{ self .token_name } ', starting OAuth authentication flow" )
506534
507535 # Handle the base URL correctly
508536 base_url = self .dashboard_url
@@ -677,7 +705,7 @@ async def redirect_handler(authorization_url: str) -> None:
677705 # Save the token for future reuse
678706 if self .access_token and self .token_info :
679707 # Get expiry date from token info or set based on configured lifetime
680- expires_at = self .token_info .get ( "expires_at" )
708+ expires_at = self .token_info .expire_at . isoformat () if self . token_info . expire_at else None
681709
682710 # If no expiry date was returned from the API but we have a token lifetime
683711 if not expires_at and self .token_lifetime is not None :
@@ -703,7 +731,7 @@ async def redirect_handler(authorization_url: str) -> None:
703731 "access_token" : self .access_token ,
704732 "expires_at" : expires_at ,
705733 "token_name" : self .token_name ,
706- "scopes" : self .token_info .get ( " scopes" , self .scopes ) ,
734+ "scopes" : self .token_info .scopes or self .scopes ,
707735 }
708736
709737 # Save to file storage
@@ -717,10 +745,14 @@ async def redirect_handler(authorization_url: str) -> None:
717745 logger .error (f"OAuth authentication failed: { e } " )
718746 raise
719747
720- async def _fetch_token_info (self ) -> None :
721- """Fetch token information from the GitGuardian API."""
748+ async def _fetch_token_info (self ) -> APITokenInfo | None :
749+ """Fetch token information from the GitGuardian API.
750+
751+ Returns:
752+ APITokenInfo: Pydantic model containing the API token information, or None if failed
753+ """
722754 if not self .access_token :
723- return
755+ return None
724756
725757 try :
726758 import httpx # Import here to avoid circular imports
@@ -733,16 +765,24 @@ async def _fetch_token_info(self) -> None:
733765 )
734766
735767 if response .status_code == 200 :
736- self .token_info = response .json ()
737- logger .info (f"Retrieved token info with scopes: { self .token_info .get ('scopes' , [])} " )
768+ token_data = response .json ()
769+ self .token_info = APITokenInfo (** token_data )
770+ logger .info (f"Retrieved token info with scopes: { self .token_info .scopes } " )
771+ return self .token_info
738772 else :
739773 # Log the error but don't raise an exception
740774 logger .warning (f"Failed to retrieve token info: HTTP { response .status_code } " )
741775 if response .content :
742776 logger .debug (f"Response content: { response .text } " )
777+ return None
743778 except Exception as e :
744779 logger .warning (f"Failed to retrieve token info: { e } " )
780+ return None
781+
782+ def get_token_info (self ) -> APITokenInfo | None :
783+ """Return the token information.
745784
746- def get_token_info (self ) -> dict | None :
747- """Return the token information."""
785+ Returns:
786+ APITokenInfo: The API token information, or None if not available
787+ """
748788 return self .token_info
0 commit comments