From 93990f555240cca88e5737e620492e0c99539e3a Mon Sep 17 00:00:00 2001 From: HeshamHM28 Date: Fri, 14 Nov 2025 21:10:23 +0200 Subject: [PATCH 01/11] POC --- codeflash/cli_cmds/cmd_init.py | 161 ++++++++++++++++++++++++++++++++- 1 file changed, 158 insertions(+), 3 deletions(-) diff --git a/codeflash/cli_cmds/cmd_init.py b/codeflash/cli_cmds/cmd_init.py index a53d18ce6..192ee52e2 100644 --- a/codeflash/cli_cmds/cmd_init.py +++ b/codeflash/cli_cmds/cmd_init.py @@ -1149,6 +1149,21 @@ def convert(self, value: str, param: click.Parameter | None, ctx: click.Context # Returns True if the user entered a new API key, False if they used an existing one def prompt_api_key() -> bool: + import threading + import socket + import http.server + import urllib.parse + import random + import string + import base64 + import hashlib + import time + import json + import webbrowser + import requests + + BASE_URL = "https://app.codeflash.ai/" + try: existing_api_key = get_codeflash_api_key() except OSError: @@ -1168,10 +1183,150 @@ def prompt_api_key() -> bool: console.print(api_key_panel) console.print() return False + auth_choices = [ + "πŸ” Sign in", + "πŸ”‘ Enter Api key" + ] + name="auth_method" + questions = [ + inquirer.List( + name, + message="How would you like to sign in?", + choices=auth_choices, + default=auth_choices[0], + carousel=True, + ) + ] - enter_api_key_and_save_to_rc() - ph("cli-new-api-key-entered") - return True + answers = inquirer.prompt(questions, theme=CodeflashTheme()) + if not answers: + apologize_and_exit() + method = answers[name] + if method == "πŸ”‘ Enter Api key": + enter_api_key_and_save_to_rc() + ph("cli-new-api-key-entered") + return True + # OAuth PKCE Flow for "πŸ” Sign in" + # 1. Start a local server on available port + class OAuthCallbackHandler(http.server.BaseHTTPRequestHandler): + server_version = "CFHTTP" + code = None + state = None + error = None + def do_GET(self): + parsed = urllib.parse.urlparse(self.path) + if parsed.path != "/callback": + self.send_response(404) + self.end_headers() + return + params = urllib.parse.parse_qs(parsed.query) + OAuthCallbackHandler.code = params.get("code", [None])[0] + OAuthCallbackHandler.state = params.get("state", [None])[0] + OAuthCallbackHandler.error = params.get("error", [None])[0] + self.send_response(200) + self.send_header("Content-type", "text/html") + self.end_headers() + if OAuthCallbackHandler.code: + self.wfile.write(b"

Sign-in successful!

You may close this window.") + elif OAuthCallbackHandler.error: + self.wfile.write(b"

Sign-in failed.

") + else: + self.wfile.write(b"

Missing code.

") + + def log_message(self, format, *args): + # Silence HTTP logs + pass + + # Find a free port + def get_free_port(): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("", 0)) + return s.getsockname()[1] + + port = get_free_port() + redirect_uri = f"http://localhost:{port}/callback" + # PKCE code_verifier and code_challenge + def random_string(length=64): + return ''.join(random.choices(string.ascii_letters + string.digits + "-._~", k=length)) + code_verifier = random_string(64) + code_challenge = base64.urlsafe_b64encode( + hashlib.sha256(code_verifier.encode()).digest() + ).rstrip(b'=').decode() + state = random_string(16) + + # Compose auth URL + auth_url = ( + f"{BASE_URL}codeflash/auth?" + f"response_type=code" + f"&client_id=cf_vscode_app" + f"&redirect_uri={urllib.parse.quote(redirect_uri)}" + f"&code_challenge={code_challenge}" + f"&code_challenge_method=sha256" + f"&state={state}" + ) + + # Start HTTP server in thread + handler_class = OAuthCallbackHandler + httpd = http.server.HTTPServer(("localhost", port), handler_class) + server_thread = threading.Thread(target=httpd.handle_request) + server_thread.daemon = True + server_thread.start() + click.echo(f"🌐 Opening browser to sign in to Codeflash…") + webbrowser.open(auth_url) + click.echo(f"If your browser did not open, visit:\n {auth_url}") + # Wait for callback (with timeout) + max_wait = 120 # seconds + waited = 0 + while handler_class.code is None and handler_class.error is None and waited < max_wait: + time.sleep(0.5) + waited += 0.5 + httpd.server_close() + if handler_class.error: + click.echo(f"❌ Sign-in failed: {handler_class.error}") + apologize_and_exit() + if not handler_class.code or not handler_class.state: + click.echo("❌ Did not receive code from sign-in. Please try again.") + apologize_and_exit() + if handler_class.state != state: + click.echo("❌ State mismatch in OAuth callback.") + apologize_and_exit() + code = handler_class.code + console.print(code) + # Exchange code for token + token_url = f"{BASE_URL}codeflash/auth/oauth/token" + data = { + "grant_type": "authorization_code", + "code": code, + "code_verifier": code_verifier, + "redirect_uri": redirect_uri, + "client_id": "cf_vscode_app" + } + try: + resp = requests.post( + token_url, + headers={"Content-Type": "application/json"}, + data=json.dumps(data), + timeout=10, +) + resp.raise_for_status() + token_json = resp.json() + api_key = token_json.get("api_key") or token_json.get("access_token") + if not api_key: + click.echo("❌ Could not retrieve API key from response.") + apologize_and_exit() + result = save_api_key_to_rc(api_key) + if is_successful(result): + click.echo(result.unwrap()) + click.echo("βœ… Signed in successfully and API key saved!") + else: + click.echo(result.failure()) + click.pause() + os.environ["CODEFLASH_API_KEY"] = api_key + ph("cli-new-api-key-entered") + return True + except Exception as e: + click.echo(f"❌ Failed to exchange code for API key: {e}") + apologize_and_exit() def enter_api_key_and_save_to_rc() -> None: From 6082650cd8f18cd68e18a79e9837f6b32248bf86 Mon Sep 17 00:00:00 2001 From: HeshamHM28 Date: Mon, 17 Nov 2025 22:01:35 +0200 Subject: [PATCH 02/11] add login flow --- codeflash/cli_cmds/cmd_init.py | 176 +++-------- codeflash/code_utils/oauth_handler.py | 409 ++++++++++++++++++++++++++ 2 files changed, 445 insertions(+), 140 deletions(-) create mode 100644 codeflash/code_utils/oauth_handler.py diff --git a/codeflash/cli_cmds/cmd_init.py b/codeflash/cli_cmds/cmd_init.py index 192ee52e2..af45e38c6 100644 --- a/codeflash/cli_cmds/cmd_init.py +++ b/codeflash/cli_cmds/cmd_init.py @@ -31,6 +31,7 @@ from codeflash.code_utils.env_utils import check_formatter_installed, get_codeflash_api_key from codeflash.code_utils.git_utils import get_git_remotes, get_repo_owner_and_name from codeflash.code_utils.github_utils import get_github_secrets_page_url +from codeflash.code_utils.oauth_handler import perform_oauth_signin from codeflash.code_utils.shell_utils import get_shell_rc_path, save_api_key_to_rc from codeflash.either import is_successful from codeflash.lsp.helpers import is_LSP_enabled @@ -1149,25 +1150,14 @@ def convert(self, value: str, param: click.Parameter | None, ctx: click.Context # Returns True if the user entered a new API key, False if they used an existing one def prompt_api_key() -> bool: - import threading - import socket - import http.server - import urllib.parse - import random - import string - import base64 - import hashlib - import time - import json - import webbrowser - import requests - - BASE_URL = "https://app.codeflash.ai/" + """Prompt user for API key via OAuth or manual entry""" + # Check for existing API key try: existing_api_key = get_codeflash_api_key() except OSError: existing_api_key = None + if existing_api_key: display_key = f"{existing_api_key[:3]}****{existing_api_key[-4:]}" api_key_panel = Panel( @@ -1183,15 +1173,17 @@ def prompt_api_key() -> bool: console.print(api_key_panel) console.print() return False + + # Prompt for authentication method auth_choices = [ - "πŸ” Sign in", - "πŸ”‘ Enter Api key" + "πŸ” Login in with Codeflash", + "πŸ”‘ Use Codeflash API key" ] - name="auth_method" + questions = [ inquirer.List( - name, - message="How would you like to sign in?", + "auth_method", + message="How would you like to authenticate?", choices=auth_choices, default=auth_choices[0], carousel=True, @@ -1201,133 +1193,37 @@ def prompt_api_key() -> bool: answers = inquirer.prompt(questions, theme=CodeflashTheme()) if not answers: apologize_and_exit() - method = answers[name] - if method == "πŸ”‘ Enter Api key": + + method = answers["auth_method"] + + if method == auth_choices[1]: enter_api_key_and_save_to_rc() ph("cli-new-api-key-entered") return True - # OAuth PKCE Flow for "πŸ” Sign in" - # 1. Start a local server on available port - class OAuthCallbackHandler(http.server.BaseHTTPRequestHandler): - server_version = "CFHTTP" - code = None - state = None - error = None - def do_GET(self): - parsed = urllib.parse.urlparse(self.path) - if parsed.path != "/callback": - self.send_response(404) - self.end_headers() - return - params = urllib.parse.parse_qs(parsed.query) - OAuthCallbackHandler.code = params.get("code", [None])[0] - OAuthCallbackHandler.state = params.get("state", [None])[0] - OAuthCallbackHandler.error = params.get("error", [None])[0] - self.send_response(200) - self.send_header("Content-type", "text/html") - self.end_headers() - if OAuthCallbackHandler.code: - self.wfile.write(b"

Sign-in successful!

You may close this window.") - elif OAuthCallbackHandler.error: - self.wfile.write(b"

Sign-in failed.

") - else: - self.wfile.write(b"

Missing code.

") - - def log_message(self, format, *args): - # Silence HTTP logs - pass - - # Find a free port - def get_free_port(): - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(("", 0)) - return s.getsockname()[1] - - port = get_free_port() - redirect_uri = f"http://localhost:{port}/callback" - # PKCE code_verifier and code_challenge - def random_string(length=64): - return ''.join(random.choices(string.ascii_letters + string.digits + "-._~", k=length)) - code_verifier = random_string(64) - code_challenge = base64.urlsafe_b64encode( - hashlib.sha256(code_verifier.encode()).digest() - ).rstrip(b'=').decode() - state = random_string(16) - - # Compose auth URL - auth_url = ( - f"{BASE_URL}codeflash/auth?" - f"response_type=code" - f"&client_id=cf_vscode_app" - f"&redirect_uri={urllib.parse.quote(redirect_uri)}" - f"&code_challenge={code_challenge}" - f"&code_challenge_method=sha256" - f"&state={state}" - ) - # Start HTTP server in thread - handler_class = OAuthCallbackHandler - httpd = http.server.HTTPServer(("localhost", port), handler_class) - server_thread = threading.Thread(target=httpd.handle_request) - server_thread.daemon = True - server_thread.start() - click.echo(f"🌐 Opening browser to sign in to Codeflash…") - webbrowser.open(auth_url) - click.echo(f"If your browser did not open, visit:\n {auth_url}") - # Wait for callback (with timeout) - max_wait = 120 # seconds - waited = 0 - while handler_class.code is None and handler_class.error is None and waited < max_wait: - time.sleep(0.5) - waited += 0.5 - httpd.server_close() - if handler_class.error: - click.echo(f"❌ Sign-in failed: {handler_class.error}") - apologize_and_exit() - if not handler_class.code or not handler_class.state: - click.echo("❌ Did not receive code from sign-in. Please try again.") - apologize_and_exit() - if handler_class.state != state: - click.echo("❌ State mismatch in OAuth callback.") - apologize_and_exit() - code = handler_class.code - console.print(code) - # Exchange code for token - token_url = f"{BASE_URL}codeflash/auth/oauth/token" - data = { - "grant_type": "authorization_code", - "code": code, - "code_verifier": code_verifier, - "redirect_uri": redirect_uri, - "client_id": "cf_vscode_app" - } - try: - resp = requests.post( - token_url, - headers={"Content-Type": "application/json"}, - data=json.dumps(data), - timeout=10, -) - resp.raise_for_status() - token_json = resp.json() - api_key = token_json.get("api_key") or token_json.get("access_token") - if not api_key: - click.echo("❌ Could not retrieve API key from response.") - apologize_and_exit() - result = save_api_key_to_rc(api_key) - if is_successful(result): - click.echo(result.unwrap()) - click.echo("βœ… Signed in successfully and API key saved!") - else: - click.echo(result.failure()) - click.pause() - os.environ["CODEFLASH_API_KEY"] = api_key - ph("cli-new-api-key-entered") - return True - except Exception as e: - click.echo(f"❌ Failed to exchange code for API key: {e}") + # Perform OAuth sign-in + api_key = perform_oauth_signin() + + if not api_key: apologize_and_exit() + # Save API key + shell_rc_path = get_shell_rc_path() + if not shell_rc_path.exists() and os.name == "nt": + shell_rc_path.touch() + click.echo(f"βœ… Created {shell_rc_path}") + + result = save_api_key_to_rc(api_key) + if is_successful(result): + click.echo(result.unwrap()) + click.echo("βœ… Signed in successfully and API key saved!") + else: + click.echo(result.failure()) + click.pause() + + os.environ["CODEFLASH_API_KEY"] = api_key + ph("cli-oauth-signin-completed") + return True def enter_api_key_and_save_to_rc() -> None: browser_launched = False diff --git a/codeflash/code_utils/oauth_handler.py b/codeflash/code_utils/oauth_handler.py new file mode 100644 index 000000000..33e28693a --- /dev/null +++ b/codeflash/code_utils/oauth_handler.py @@ -0,0 +1,409 @@ +import threading +import socket +import http.server +import urllib.parse +import random +import string +import base64 +import hashlib +import time +import json +import webbrowser +import requests +from typing import Optional, Tuple +import click + +from codeflash.api.cfapi import get_cfapi_base_urls + + +class OAuthHandler: + """Handles OAuth PKCE flow for CodeFlash authentication""" + + def __init__(self): + self.code: Optional[str] = None + self.state: Optional[str] = None + self.error: Optional[str] = None + self.is_complete = False + self.token_error: Optional[str] = None + + def create_callback_handler(self): + """Creates HTTP handler for OAuth callback""" + oauth_handler = self + + class CallbackHandler(http.server.BaseHTTPRequestHandler): + server_version = "CFHTTP" + + def do_GET(self): + parsed = urllib.parse.urlparse(self.path) + + if parsed.path == "/status": + self.send_response(200) + self.send_header("Content-type", "application/json") + self.send_header("Access-Control-Allow-Origin", "*") + self.end_headers() + + status = { + "success": oauth_handler.token_error is None and oauth_handler.code is not None, + "error": oauth_handler.token_error + } + self.wfile.write(json.dumps(status).encode()) + return + + if parsed.path != "/callback": + self.send_response(404) + self.end_headers() + return + + params = urllib.parse.parse_qs(parsed.query) + oauth_handler.code = params.get("code", [None])[0] + oauth_handler.state = params.get("state", [None])[0] + oauth_handler.error = params.get("error", [None])[0] + + # Send HTML response + self.send_response(200) + self.send_header("Content-type", "text/html") + self.end_headers() + + html_content = self._get_html_response() + self.wfile.write(html_content.encode()) + + oauth_handler.is_complete = True + + def _get_html_response(self): + """Returns simple HTML response""" + if oauth_handler.error: + return self._get_error_html(oauth_handler.error) + elif oauth_handler.code: + return self._get_loading_html() + else: + return self._get_error_html("unauthorized") + + @staticmethod + def _get_loading_html(): + """Loading state while exchanging token""" + return """ + + + + + + CodeFlash Authentication + + + +
+
+

Authenticating

+

Please wait...

+
+ + + + + """ + + @staticmethod + def _get_error_html(self, error_message: str): + """Error state HTML""" + return f""" + + + + + + CodeFlash Authentication + + + +
+

Authentication Failed

+
{error_message}
+
+ + + """ + + def log_message(self, format, *args): + pass + + return CallbackHandler + + @staticmethod + def get_free_port() -> int: + """Find an available port""" + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("", 0)) + return s.getsockname()[1] + + @staticmethod + def generate_pkce_pair() -> Tuple[str, str]: + """Generate PKCE code verifier and challenge""" + code_verifier = ''.join( + random.choices(string.ascii_letters + string.digits + "-._~", k=64) + ) + code_challenge = base64.urlsafe_b64encode( + hashlib.sha256(code_verifier.encode()).digest() + ).rstrip(b'=').decode() + return code_verifier, code_challenge + + def start_local_server(self, port: int) -> http.server.HTTPServer: + """Start local HTTP server for OAuth callback""" + handler_class = self.create_callback_handler() + httpd = http.server.HTTPServer(("localhost", port), handler_class) + + def serve_forever_wrapper(): + httpd.serve_forever() + + server_thread = threading.Thread(target=serve_forever_wrapper) + server_thread.daemon = True + server_thread.start() + + return httpd + + def wait_for_callback(self, httpd: http.server.HTTPServer, timeout: int = 120) -> bool: + """Wait for OAuth callback with timeout""" + waited = 0 + while not self.is_complete and waited < timeout: + time.sleep(0.5) + waited += 0.5 + + return self.is_complete + + def exchange_code_for_token( + self, + code: str, + code_verifier: str, + redirect_uri: str + ) -> Optional[str]: + """Exchange authorization code for API token""" + token_url = f"{get_cfapi_base_urls().cfwebapp_base_url}/codeflash/auth/oauth/token" + data = { + "grant_type": "authorization_code", + "code": code, + "code_verifier": code_verifier, + "redirect_uri": redirect_uri, + "client_id": "cf_vscode_app" + } + + try: + resp = requests.post( + token_url, + headers={"Content-Type": "application/json"}, + data=json.dumps(data), + timeout=10, + ) + resp.raise_for_status() + token_json = resp.json() + api_key = token_json.get("access_token") + + if not api_key: + self.token_error = "No access token in response" + return None + + return api_key + except requests.exceptions.HTTPError as e: + error_msg = f"HTTP {e.response.status_code}" + try: + error_data = e.response.json() + error_msg = error_data.get("error_description", error_data.get("error", error_msg)) + except: + pass + self.token_error = "Unauthorized" + click.echo(f"❌ {self.token_error}") + return None + except Exception as e: + self.token_error = "Unauthorized" + click.echo(f"❌ {self.token_error}") + return None + + +def perform_oauth_signin() -> Optional[str]: + """ + Perform OAuth PKCE flow and return API key if successful. + Returns None if failed. + """ + oauth = OAuthHandler() + + # Setup PKCE + port = oauth.get_free_port() + redirect_uri = f"http://localhost:{port}/callback" + code_verifier, code_challenge = oauth.generate_pkce_pair() + state = ''.join(random.choices(string.ascii_letters + string.digits, k=16)) + + # Build authorization URL + auth_url = ( + f"{get_cfapi_base_urls().cfwebapp_base_url}/codeflash/auth?" + f"response_type=code" + f"&client_id=cf_vscode_app" + f"&redirect_uri={urllib.parse.quote(redirect_uri)}" + f"&code_challenge={code_challenge}" + f"&code_challenge_method=sha256" + f"&state={state}" + ) + + # Start local server + httpd = oauth.start_local_server(port) + + # Open browser + click.echo("🌐 Opening browser to sign in to CodeFlash…") + webbrowser.open(auth_url) + + click.echo(f"\nπŸ“‹ If your browser didn't open, visit: {auth_url}\n") + + # Wait for callback + click.echo("⏳ Waiting for authentication...") + success = oauth.wait_for_callback(httpd, timeout=120) + + if not success: + httpd.shutdown() + click.echo("❌ Authentication timed out. Please try again.") + return None + + if oauth.error: + httpd.shutdown() + click.echo(f"❌ Authentication failed:") + return None + + if not oauth.code or not oauth.state: + httpd.shutdown() + click.echo("❌ Unauthorized.") + return None + + if oauth.state != state: + httpd.shutdown() + click.echo("❌ Unauthorized.") + return None + + api_key = oauth.exchange_code_for_token(oauth.code, code_verifier, redirect_uri) + + # Wait for browser to poll status + time.sleep(3) + + # Shutdown server + httpd.shutdown() + + return api_key From 8d7bc7f044b95b9269e51cc7d461e6fa94f6d43a Mon Sep 17 00:00:00 2001 From: HeshamHM28 Date: Tue, 18 Nov 2025 00:18:35 +0200 Subject: [PATCH 03/11] Enhance ui/ux --- codeflash/code_utils/oauth_handler.py | 541 +++++++++++++++++++++----- 1 file changed, 438 insertions(+), 103 deletions(-) diff --git a/codeflash/code_utils/oauth_handler.py b/codeflash/code_utils/oauth_handler.py index 33e28693a..32e74c7b5 100644 --- a/codeflash/code_utils/oauth_handler.py +++ b/codeflash/code_utils/oauth_handler.py @@ -23,6 +23,7 @@ def __init__(self): self.code: Optional[str] = None self.state: Optional[str] = None self.error: Optional[str] = None + self.theme: Optional[str] = None self.is_complete = False self.token_error: Optional[str] = None @@ -58,6 +59,7 @@ def do_GET(self): oauth_handler.code = params.get("code", [None])[0] oauth_handler.state = params.get("state", [None])[0] oauth_handler.error = params.get("error", [None])[0] + oauth_handler.theme = params.get("theme", ["light"])[0] # Send HTML response self.send_response(200) @@ -71,123 +73,305 @@ def do_GET(self): def _get_html_response(self): """Returns simple HTML response""" + theme = oauth_handler.theme or "light" if oauth_handler.error: - return self._get_error_html(oauth_handler.error) + return self._get_error_html(oauth_handler.error, theme) elif oauth_handler.code: - return self._get_loading_html() + return self._get_loading_html(theme) else: - return self._get_error_html("unauthorized") + return self._get_error_html("unauthorized", theme) @staticmethod - def _get_loading_html(): + def _get_loading_html(theme: str = "light"): """Loading state while exchanging token""" - return """ + theme_class = "dark" if theme == "dark" else "" + return f""" - + CodeFlash Authentication -
-
-

Authenticating

-

Please wait...

+
+
+ + +
+
+
+
+
+

Authenticating

+

Please wait while we verify your credentials...

+
@@ -196,52 +380,203 @@ def _get_loading_html(): """ @staticmethod - def _get_error_html(self, error_message: str): + def _get_error_html(error_message: str, theme: str = "light"): """Error state HTML""" + theme_class = "dark" if theme == "dark" else "" return f""" - + CodeFlash Authentication -
-

Authentication Failed

-
{error_message}
+
+
+ + +
+
+
+ + + + + +
+

Authentication Failed

+
{error_message}
+
From 0fcd1028412fffdcbc04494452ffd9f39bb3db5a Mon Sep 17 00:00:00 2001 From: HeshamHM28 Date: Tue, 18 Nov 2025 00:45:12 +0200 Subject: [PATCH 04/11] Fix formatting --- codeflash/cli_cmds/cmd_init.py | 9 +- codeflash/code_utils/oauth_handler.py | 129 ++++++++++++-------------- 2 files changed, 61 insertions(+), 77 deletions(-) diff --git a/codeflash/cli_cmds/cmd_init.py b/codeflash/cli_cmds/cmd_init.py index 1b3667942..fcbfb8942 100644 --- a/codeflash/cli_cmds/cmd_init.py +++ b/codeflash/cli_cmds/cmd_init.py @@ -1167,8 +1167,7 @@ def convert(self, value: str, param: click.Parameter | None, ctx: click.Context # Returns True if the user entered a new API key, False if they used an existing one def prompt_api_key() -> bool: - """Prompt user for API key via OAuth or manual entry""" - + """Prompt user for API key via OAuth or manual entry.""" # Check for existing API key try: existing_api_key = get_codeflash_api_key() @@ -1192,10 +1191,7 @@ def prompt_api_key() -> bool: return False # Prompt for authentication method - auth_choices = [ - "πŸ” Login in with Codeflash", - "πŸ”‘ Use Codeflash API key" - ] + auth_choices = ["πŸ” Login in with Codeflash", "πŸ”‘ Use Codeflash API key"] questions = [ inquirer.List( @@ -1242,6 +1238,7 @@ def prompt_api_key() -> bool: ph("cli-oauth-signin-completed") return True + def enter_api_key_and_save_to_rc() -> None: browser_launched = False api_key = "" diff --git a/codeflash/code_utils/oauth_handler.py b/codeflash/code_utils/oauth_handler.py index 32e74c7b5..f1fd478f1 100644 --- a/codeflash/code_utils/oauth_handler.py +++ b/codeflash/code_utils/oauth_handler.py @@ -1,40 +1,41 @@ -import threading -import socket -import http.server -import urllib.parse -import random -import string +from __future__ import annotations + import base64 import hashlib -import time +import http.server import json +import secrets +import socket +import threading +import time +import urllib.parse import webbrowser -import requests -from typing import Optional, Tuple + import click +import requests from codeflash.api.cfapi import get_cfapi_base_urls class OAuthHandler: - """Handles OAuth PKCE flow for CodeFlash authentication""" + """Handle OAuth PKCE flow for CodeFlash authentication.""" - def __init__(self): - self.code: Optional[str] = None - self.state: Optional[str] = None - self.error: Optional[str] = None - self.theme: Optional[str] = None + def __init__(self) -> None: + self.code: str | None = None + self.state: str | None = None + self.error: str | None = None + self.theme: str | None = None self.is_complete = False - self.token_error: Optional[str] = None + self.token_error: str | None = None - def create_callback_handler(self): - """Creates HTTP handler for OAuth callback""" + def create_callback_handler(self) -> type[http.server.BaseHTTPRequestHandler]: + """Create HTTP handler for OAuth callback.""" oauth_handler = self class CallbackHandler(http.server.BaseHTTPRequestHandler): server_version = "CFHTTP" - def do_GET(self): + def do_GET(self) -> None: parsed = urllib.parse.urlparse(self.path) if parsed.path == "/status": @@ -45,7 +46,7 @@ def do_GET(self): status = { "success": oauth_handler.token_error is None and oauth_handler.code is not None, - "error": oauth_handler.token_error + "error": oauth_handler.token_error, } self.wfile.write(json.dumps(status).encode()) return @@ -71,19 +72,18 @@ def do_GET(self): oauth_handler.is_complete = True - def _get_html_response(self): - """Returns simple HTML response""" + def _get_html_response(self) -> str: + """Return simple HTML response.""" theme = oauth_handler.theme or "light" if oauth_handler.error: return self._get_error_html(oauth_handler.error, theme) - elif oauth_handler.code: + if oauth_handler.code: return self._get_loading_html(theme) - else: - return self._get_error_html("unauthorized", theme) + return self._get_error_html("unauthorized", theme) @staticmethod - def _get_loading_html(theme: str = "light"): - """Loading state while exchanging token""" + def _get_loading_html(theme: str = "light") -> str: + """Return loading state while exchanging token.""" theme_class = "dark" if theme == "dark" else "" return f""" @@ -142,7 +142,6 @@ def _get_loading_html(theme: str = "light"): position: relative; }} - /* Background gradient effect */ body::before {{ content: ''; position: fixed; @@ -155,7 +154,6 @@ def _get_loading_html(theme: str = "light"): z-index: 0; }} - /* Grid pattern */ body::after {{ content: ''; position: fixed; @@ -380,8 +378,8 @@ def _get_loading_html(theme: str = "light"): """ @staticmethod - def _get_error_html(error_message: str, theme: str = "light"): - """Error state HTML""" + def _get_error_html(error_message: str, theme: str = "light") -> str: + """Return error state HTML.""" theme_class = "dark" if theme == "dark" else "" return f""" @@ -434,7 +432,6 @@ def _get_error_html(error_message: str, theme: str = "light"): position: relative; }}}} - /* Background gradient effect */ body::before {{{{ content: ''; position: fixed; @@ -447,7 +444,6 @@ def _get_error_html(error_message: str, theme: str = "light"): z-index: 0; }}}} - /* Grid pattern */ body::after {{{{ content: ''; position: fixed; @@ -582,35 +578,33 @@ def _get_error_html(error_message: str, theme: str = "light"): """ - def log_message(self, format, *args): - pass + def log_message(self, fmt: str, *args: object) -> None: + """Suppress log messages.""" return CallbackHandler @staticmethod def get_free_port() -> int: - """Find an available port""" + """Find an available port.""" with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(("", 0)) return s.getsockname()[1] @staticmethod - def generate_pkce_pair() -> Tuple[str, str]: - """Generate PKCE code verifier and challenge""" - code_verifier = ''.join( - random.choices(string.ascii_letters + string.digits + "-._~", k=64) + def generate_pkce_pair() -> tuple[str, str]: + """Generate PKCE code verifier and challenge.""" + code_verifier = "".join( + secrets.choice("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-._~") for _ in range(64) ) - code_challenge = base64.urlsafe_b64encode( - hashlib.sha256(code_verifier.encode()).digest() - ).rstrip(b'=').decode() + code_challenge = base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode()).digest()).rstrip(b"=").decode() return code_verifier, code_challenge def start_local_server(self, port: int) -> http.server.HTTPServer: - """Start local HTTP server for OAuth callback""" + """Start local HTTP server for OAuth callback.""" handler_class = self.create_callback_handler() httpd = http.server.HTTPServer(("localhost", port), handler_class) - def serve_forever_wrapper(): + def serve_forever_wrapper() -> None: httpd.serve_forever() server_thread = threading.Thread(target=serve_forever_wrapper) @@ -619,8 +613,8 @@ def serve_forever_wrapper(): return httpd - def wait_for_callback(self, httpd: http.server.HTTPServer, timeout: int = 120) -> bool: - """Wait for OAuth callback with timeout""" + def wait_for_callback(self, httpd: http.server.HTTPServer, timeout: int = 120) -> bool: # noqa: ARG002 + """Wait for OAuth callback with timeout.""" waited = 0 while not self.is_complete and waited < timeout: time.sleep(0.5) @@ -628,57 +622,50 @@ def wait_for_callback(self, httpd: http.server.HTTPServer, timeout: int = 120) - return self.is_complete - def exchange_code_for_token( - self, - code: str, - code_verifier: str, - redirect_uri: str - ) -> Optional[str]: - """Exchange authorization code for API token""" + def exchange_code_for_token(self, code: str, code_verifier: str, redirect_uri: str) -> str | None: + """Exchange authorization code for API token.""" token_url = f"{get_cfapi_base_urls().cfwebapp_base_url}/codeflash/auth/oauth/token" data = { "grant_type": "authorization_code", "code": code, "code_verifier": code_verifier, "redirect_uri": redirect_uri, - "client_id": "cf_vscode_app" + "client_id": "cf_vscode_app", } try: resp = requests.post( - token_url, - headers={"Content-Type": "application/json"}, - data=json.dumps(data), - timeout=10, + token_url, headers={"Content-Type": "application/json"}, data=json.dumps(data), timeout=10 ) resp.raise_for_status() token_json = resp.json() api_key = token_json.get("access_token") if not api_key: - self.token_error = "No access token in response" + self.token_error = "No access token in response" # noqa: S105 return None - return api_key except requests.exceptions.HTTPError as e: error_msg = f"HTTP {e.response.status_code}" try: error_data = e.response.json() error_msg = error_data.get("error_description", error_data.get("error", error_msg)) - except: + except Exception: # noqa: S110 pass - self.token_error = "Unauthorized" + self.token_error = "Unauthorized" # noqa: S105 click.echo(f"❌ {self.token_error}") return None - except Exception as e: - self.token_error = "Unauthorized" + except Exception: + self.token_error = "Unauthorized" # noqa: S105 click.echo(f"❌ {self.token_error}") return None + else: + return api_key -def perform_oauth_signin() -> Optional[str]: - """ - Perform OAuth PKCE flow and return API key if successful. +def perform_oauth_signin() -> str | None: + """Perform OAuth PKCE flow and return API key if successful. + Returns None if failed. """ oauth = OAuthHandler() @@ -687,7 +674,7 @@ def perform_oauth_signin() -> Optional[str]: port = oauth.get_free_port() redirect_uri = f"http://localhost:{port}/callback" code_verifier, code_challenge = oauth.generate_pkce_pair() - state = ''.join(random.choices(string.ascii_letters + string.digits, k=16)) + state = "".join(secrets.choice("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789") for _ in range(16)) # Build authorization URL auth_url = ( @@ -711,7 +698,7 @@ def perform_oauth_signin() -> Optional[str]: # Wait for callback click.echo("⏳ Waiting for authentication...") - success = oauth.wait_for_callback(httpd, timeout=120) + success = oauth.wait_for_callback(httpd, timeout=180) if not success: httpd.shutdown() @@ -720,7 +707,7 @@ def perform_oauth_signin() -> Optional[str]: if oauth.error: httpd.shutdown() - click.echo(f"❌ Authentication failed:") + click.echo("❌ Authentication failed:") return None if not oauth.code or not oauth.state: From 76089d2376438834186a226ee4e83439298b49f4 Mon Sep 17 00:00:00 2001 From: HeshamHM28 Date: Tue, 18 Nov 2025 16:46:40 +0200 Subject: [PATCH 05/11] change client id --- codeflash/code_utils/oauth_handler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/codeflash/code_utils/oauth_handler.py b/codeflash/code_utils/oauth_handler.py index f1fd478f1..ec78d9a8a 100644 --- a/codeflash/code_utils/oauth_handler.py +++ b/codeflash/code_utils/oauth_handler.py @@ -630,7 +630,7 @@ def exchange_code_for_token(self, code: str, code_verifier: str, redirect_uri: s "code": code, "code_verifier": code_verifier, "redirect_uri": redirect_uri, - "client_id": "cf_vscode_app", + "client_id": "cf-cli-app", } try: @@ -680,7 +680,7 @@ def perform_oauth_signin() -> str | None: auth_url = ( f"{get_cfapi_base_urls().cfwebapp_base_url}/codeflash/auth?" f"response_type=code" - f"&client_id=cf_vscode_app" + f"&client_id=cf-cli-app" f"&redirect_uri={urllib.parse.quote(redirect_uri)}" f"&code_challenge={code_challenge}" f"&code_challenge_method=sha256" From 6d1b3b85a6d3a7b94bd02540872c422b6468e64c Mon Sep 17 00:00:00 2001 From: HeshamHM28 Date: Tue, 18 Nov 2025 23:46:11 +0200 Subject: [PATCH 06/11] Add remote-machine logic. --- codeflash/code_utils/oauth_handler.py | 122 +++++++++++++++++--------- 1 file changed, 81 insertions(+), 41 deletions(-) diff --git a/codeflash/code_utils/oauth_handler.py b/codeflash/code_utils/oauth_handler.py index ec78d9a8a..a5c559926 100644 --- a/codeflash/code_utils/oauth_handler.py +++ b/codeflash/code_utils/oauth_handler.py @@ -663,6 +663,62 @@ def exchange_code_for_token(self, code: str, code_verifier: str, redirect_uri: s return api_key +def _handle_local_oauth_flow( + oauth: OAuthHandler, + httpd: http.server.HTTPServer, + state: str, + code_verifier: str, + local_redirect_uri: str, + local_auth_url: str, +) -> str | None: + """Handle local OAuth flow with browser and server.""" + click.echo(f"\nπŸ“‹ If your browser didn't open, visit: {local_auth_url}\n") + click.echo("⏳ Waiting for authentication...") + + success = oauth.wait_for_callback(httpd, timeout=180) + + if not success: + httpd.shutdown() + click.echo("❌ Authentication timed out. Please try again.") + return None + + if oauth.error or not oauth.code or not oauth.state or oauth.state != state: + httpd.shutdown() + click.echo("❌ Unauthorized.") + return None + + api_key = oauth.exchange_code_for_token(oauth.code, code_verifier, local_redirect_uri) + + # Wait for browser to poll status + time.sleep(3) + httpd.shutdown() + + return api_key + + +def _handle_remote_oauth_flow(code_verifier: str, remote_redirect_uri: str, remote_auth_url: str) -> str | None: + """Handle remote OAuth flow with manual code entry.""" + oauth = OAuthHandler() + click.echo("⚠️ Browser could not be opened automatically.") + click.echo("\nπŸ“‹ Please visit this URL to authenticate:") + click.echo(f"\n{remote_auth_url}\n") + + # Prompt user to paste the code + code = click.prompt("Paste the authorization code here", type=str).strip() + + if not code: + click.echo("❌ No code provided.") + return None + + # Exchange code for token + api_key = oauth.exchange_code_for_token(code, code_verifier, remote_redirect_uri) + + if api_key: + click.echo("βœ… Authentication successful!") + + return api_key + + def perform_oauth_signin() -> str | None: """Perform OAuth PKCE flow and return API key if successful. @@ -672,60 +728,44 @@ def perform_oauth_signin() -> str | None: # Setup PKCE port = oauth.get_free_port() - redirect_uri = f"http://localhost:{port}/callback" code_verifier, code_challenge = oauth.generate_pkce_pair() state = "".join(secrets.choice("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789") for _ in range(16)) - # Build authorization URL - auth_url = ( + # Build authorization URLs for both local and remote + local_redirect_uri = f"http://localhost:{port}/callback" + remote_redirect_uri = f"{get_cfapi_base_urls().cfwebapp_base_url}/codeflash/auth/callback" + + local_auth_url = ( f"{get_cfapi_base_urls().cfwebapp_base_url}/codeflash/auth?" f"response_type=code" f"&client_id=cf-cli-app" - f"&redirect_uri={urllib.parse.quote(redirect_uri)}" + f"&redirect_uri={urllib.parse.quote(local_redirect_uri)}" f"&code_challenge={code_challenge}" f"&code_challenge_method=sha256" f"&state={state}" ) - # Start local server - httpd = oauth.start_local_server(port) + remote_auth_url = ( + f"{get_cfapi_base_urls().cfwebapp_base_url}/codeflash/auth?" + f"response_type=code" + f"&client_id=cf-cli-app" + f"&redirect_uri={urllib.parse.quote(remote_redirect_uri)}" + f"&code_challenge={code_challenge}" + f"&code_challenge_method=sha256" + f"&state={state}" + ) - # Open browser + # Try to open browser click.echo("🌐 Opening browser to sign in to CodeFlash…") - webbrowser.open(auth_url) - - click.echo(f"\nπŸ“‹ If your browser didn't open, visit: {auth_url}\n") - - # Wait for callback - click.echo("⏳ Waiting for authentication...") - success = oauth.wait_for_callback(httpd, timeout=180) - - if not success: - httpd.shutdown() - click.echo("❌ Authentication timed out. Please try again.") - return None - if oauth.error: - httpd.shutdown() - click.echo("❌ Authentication failed:") - return None - - if not oauth.code or not oauth.state: - httpd.shutdown() - click.echo("❌ Unauthorized.") - return None - - if oauth.state != state: - httpd.shutdown() - click.echo("❌ Unauthorized.") - return None - - api_key = oauth.exchange_code_for_token(oauth.code, code_verifier, redirect_uri) - - # Wait for browser to poll status - time.sleep(3) + try: + # Start local server first + httpd = oauth.start_local_server(port) + browser_opened = webbrowser.open(local_auth_url) + except Exception: + browser_opened = False - # Shutdown server + if browser_opened: + return _handle_local_oauth_flow(oauth, httpd, state, code_verifier, local_redirect_uri, local_auth_url) httpd.shutdown() - - return api_key + return _handle_remote_oauth_flow(code_verifier, remote_redirect_uri, remote_auth_url) From b4e87a07862dab4ec8eb20d2bc6ead9d7acdbeb7 Mon Sep 17 00:00:00 2001 From: HeshamHM28 Date: Tue, 18 Nov 2025 14:10:15 -0800 Subject: [PATCH 07/11] Update codeflash/code_utils/oauth_handler.py Co-authored-by: codeflash-ai[bot] <148906541+codeflash-ai[bot]@users.noreply.github.com> --- codeflash/code_utils/oauth_handler.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/codeflash/code_utils/oauth_handler.py b/codeflash/code_utils/oauth_handler.py index a5c559926..683f40b7f 100644 --- a/codeflash/code_utils/oauth_handler.py +++ b/codeflash/code_utils/oauth_handler.py @@ -634,9 +634,7 @@ def exchange_code_for_token(self, code: str, code_verifier: str, redirect_uri: s } try: - resp = requests.post( - token_url, headers={"Content-Type": "application/json"}, data=json.dumps(data), timeout=10 - ) + resp = requests.post(token_url, json=data, timeout=10) resp.raise_for_status() token_json = resp.json() api_key = token_json.get("access_token") From ebaa5ce0ae6b4b432622d72f575f3d8e0813bd37 Mon Sep 17 00:00:00 2001 From: HeshamHM28 Date: Wed, 19 Nov 2025 01:24:47 +0200 Subject: [PATCH 08/11] Change the logic related to the remote machine. --- codeflash/code_utils/oauth_handler.py | 171 ++++++++++++-------------- 1 file changed, 76 insertions(+), 95 deletions(-) diff --git a/codeflash/code_utils/oauth_handler.py b/codeflash/code_utils/oauth_handler.py index 683f40b7f..dc518da67 100644 --- a/codeflash/code_utils/oauth_handler.py +++ b/codeflash/code_utils/oauth_handler.py @@ -1,6 +1,7 @@ from __future__ import annotations import base64 +import contextlib import hashlib import http.server import json @@ -27,6 +28,8 @@ def __init__(self) -> None: self.theme: str | None = None self.is_complete = False self.token_error: str | None = None + self.manual_code: str | None = None + self.lock = threading.Lock() def create_callback_handler(self) -> type[http.server.BaseHTTPRequestHandler]: """Create HTTP handler for OAuth callback.""" @@ -57,10 +60,14 @@ def do_GET(self) -> None: return params = urllib.parse.parse_qs(parsed.query) - oauth_handler.code = params.get("code", [None])[0] - oauth_handler.state = params.get("state", [None])[0] - oauth_handler.error = params.get("error", [None])[0] - oauth_handler.theme = params.get("theme", ["light"])[0] + + with oauth_handler.lock: + if not oauth_handler.is_complete: + oauth_handler.code = params.get("code", [None])[0] + oauth_handler.state = params.get("state", [None])[0] + oauth_handler.error = params.get("error", [None])[0] + oauth_handler.theme = params.get("theme", ["light"])[0] + oauth_handler.is_complete = True # Send HTML response self.send_response(200) @@ -70,8 +77,6 @@ def do_GET(self) -> None: html_content = self._get_html_response() self.wfile.write(html_content.encode()) - oauth_handler.is_complete = True - def _get_html_response(self) -> str: """Return simple HTML response.""" theme = oauth_handler.theme or "light" @@ -613,15 +618,6 @@ def serve_forever_wrapper() -> None: return httpd - def wait_for_callback(self, httpd: http.server.HTTPServer, timeout: int = 120) -> bool: # noqa: ARG002 - """Wait for OAuth callback with timeout.""" - waited = 0 - while not self.is_complete and waited < timeout: - time.sleep(0.5) - waited += 0.5 - - return self.is_complete - def exchange_code_for_token(self, code: str, code_verifier: str, redirect_uri: str) -> str | None: """Exchange authorization code for API token.""" token_url = f"{get_cfapi_base_urls().cfwebapp_base_url}/codeflash/auth/oauth/token" @@ -648,73 +644,26 @@ def exchange_code_for_token(self, code: str, code_verifier: str, redirect_uri: s try: error_data = e.response.json() error_msg = error_data.get("error_description", error_data.get("error", error_msg)) - except Exception: # noqa: S110 - pass - self.token_error = "Unauthorized" # noqa: S105 - click.echo(f"❌ {self.token_error}") + except Exception: + self.token_error = "Unauthorized" # noqa: S105 return None except Exception: self.token_error = "Unauthorized" # noqa: S105 - click.echo(f"❌ {self.token_error}") return None else: return api_key -def _handle_local_oauth_flow( - oauth: OAuthHandler, - httpd: http.server.HTTPServer, - state: str, - code_verifier: str, - local_redirect_uri: str, - local_auth_url: str, -) -> str | None: - """Handle local OAuth flow with browser and server.""" - click.echo(f"\nπŸ“‹ If your browser didn't open, visit: {local_auth_url}\n") - click.echo("⏳ Waiting for authentication...") - - success = oauth.wait_for_callback(httpd, timeout=180) - - if not success: - httpd.shutdown() - click.echo("❌ Authentication timed out. Please try again.") - return None - - if oauth.error or not oauth.code or not oauth.state or oauth.state != state: - httpd.shutdown() - click.echo("❌ Unauthorized.") - return None - - api_key = oauth.exchange_code_for_token(oauth.code, code_verifier, local_redirect_uri) - - # Wait for browser to poll status - time.sleep(3) - httpd.shutdown() - - return api_key - - -def _handle_remote_oauth_flow(code_verifier: str, remote_redirect_uri: str, remote_auth_url: str) -> str | None: - """Handle remote OAuth flow with manual code entry.""" - oauth = OAuthHandler() - click.echo("⚠️ Browser could not be opened automatically.") - click.echo("\nπŸ“‹ Please visit this URL to authenticate:") - click.echo(f"\n{remote_auth_url}\n") - - # Prompt user to paste the code - code = click.prompt("Paste the authorization code here", type=str).strip() - - if not code: - click.echo("❌ No code provided.") - return None - - # Exchange code for token - api_key = oauth.exchange_code_for_token(code, code_verifier, remote_redirect_uri) - - if api_key: - click.echo("βœ… Authentication successful!") - - return api_key +def _wait_for_manual_code_input(oauth: OAuthHandler) -> None: + """Thread function to wait for manual code input.""" + try: + code = input() + with oauth.lock: + if not oauth.is_complete: + oauth.manual_code = code.strip() + oauth.is_complete = True + except Exception: # noqa: S110 + pass def perform_oauth_signin() -> str | None: @@ -733,37 +682,69 @@ def perform_oauth_signin() -> str | None: local_redirect_uri = f"http://localhost:{port}/callback" remote_redirect_uri = f"{get_cfapi_base_urls().cfwebapp_base_url}/codeflash/auth/callback" - local_auth_url = ( - f"{get_cfapi_base_urls().cfwebapp_base_url}/codeflash/auth?" + base_url = f"{get_cfapi_base_urls().cfwebapp_base_url}/codeflash/auth" + params = ( f"response_type=code" f"&client_id=cf-cli-app" - f"&redirect_uri={urllib.parse.quote(local_redirect_uri)}" f"&code_challenge={code_challenge}" f"&code_challenge_method=sha256" f"&state={state}" ) + local_auth_url = f"{base_url}?{params}&redirect_uri={urllib.parse.quote(local_redirect_uri)}" + remote_auth_url = f"{base_url}?{params}&redirect_uri={urllib.parse.quote(remote_redirect_uri)}" - remote_auth_url = ( - f"{get_cfapi_base_urls().cfwebapp_base_url}/codeflash/auth?" - f"response_type=code" - f"&client_id=cf-cli-app" - f"&redirect_uri={urllib.parse.quote(remote_redirect_uri)}" - f"&code_challenge={code_challenge}" - f"&code_challenge_method=sha256" - f"&state={state}" - ) + # Start local server + try: + httpd = oauth.start_local_server(port) + except Exception: + click.echo("❌ Failed to start local server.") + return None # Try to open browser click.echo("🌐 Opening browser to sign in to CodeFlash…") + with contextlib.suppress(Exception): + webbrowser.open(local_auth_url) - try: - # Start local server first - httpd = oauth.start_local_server(port) - browser_opened = webbrowser.open(local_auth_url) - except Exception: - browser_opened = False + # Show remote URL and start input thread + click.echo("\nπŸ“‹ If browser didn't open, visit this URL:") + click.echo(f"\n{remote_auth_url}\n") + click.echo("Paste code here if prompted > ", nl=False) + + # Start thread to wait for manual input + input_thread = threading.Thread(target=_wait_for_manual_code_input, args=(oauth,)) + input_thread.daemon = True + input_thread.start() + + waited = 0 + while not oauth.is_complete and waited < 180: + time.sleep(0.5) + waited += 0.5 + + if not oauth.is_complete: + httpd.shutdown() + click.echo("\n❌ Authentication timed out.") + return None - if browser_opened: - return _handle_local_oauth_flow(oauth, httpd, state, code_verifier, local_redirect_uri, local_auth_url) + # Check which method completed + api_key = None + + if oauth.manual_code: + # Manual code was entered + api_key = oauth.exchange_code_for_token(oauth.manual_code, code_verifier, remote_redirect_uri) + elif oauth.code: + # Browser callback received + if oauth.error or not oauth.state or oauth.state != state: + httpd.shutdown() + click.echo("\n❌ Unauthorized.") + return None + + api_key = oauth.exchange_code_for_token(oauth.code, code_verifier, local_redirect_uri) + + # Cleanup + time.sleep(3) httpd.shutdown() - return _handle_remote_oauth_flow(code_verifier, remote_redirect_uri, remote_auth_url) + + if not api_key: + click.echo("❌ Authentication failed.") + + return api_key From 3f49a502b50b85d8252d170f75fdef5dee227c83 Mon Sep 17 00:00:00 2001 From: HeshamHM28 Date: Wed, 19 Nov 2025 02:32:28 +0200 Subject: [PATCH 09/11] check for gui browser --- codeflash/code_utils/oauth_handler.py | 33 ++++++++++++++++----------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/codeflash/code_utils/oauth_handler.py b/codeflash/code_utils/oauth_handler.py index dc518da67..0acdb9d13 100644 --- a/codeflash/code_utils/oauth_handler.py +++ b/codeflash/code_utils/oauth_handler.py @@ -639,21 +639,27 @@ def exchange_code_for_token(self, code: str, code_verifier: str, redirect_uri: s self.token_error = "No access token in response" # noqa: S105 return None - except requests.exceptions.HTTPError as e: - error_msg = f"HTTP {e.response.status_code}" - try: - error_data = e.response.json() - error_msg = error_data.get("error_description", error_data.get("error", error_msg)) - except Exception: - self.token_error = "Unauthorized" # noqa: S105 - return None - except Exception: + except requests.exceptions.HTTPError: self.token_error = "Unauthorized" # noqa: S105 return None else: return api_key +def _is_graphical_browser() -> bool: + text_browsers = {"lynx", "links", "w3m", "elinks", "links2"} + + try: + # Get the default browser + browser = webbrowser.get() + browser_name = getattr(browser, "name", "").lower() + + # Check if it's a known text browser + return all(text_browser not in browser_name for text_browser in text_browsers) + except Exception: + return True + + def _wait_for_manual_code_input(oauth: OAuthHandler) -> None: """Thread function to wait for manual code input.""" try: @@ -700,10 +706,11 @@ def perform_oauth_signin() -> str | None: click.echo("❌ Failed to start local server.") return None - # Try to open browser - click.echo("🌐 Opening browser to sign in to CodeFlash…") - with contextlib.suppress(Exception): - webbrowser.open(local_auth_url) + if _is_graphical_browser(): + # Try to open browser + click.echo("🌐 Opening browser to sign in to CodeFlash…") + with contextlib.suppress(Exception): + webbrowser.open(local_auth_url) # Show remote URL and start input thread click.echo("\nπŸ“‹ If browser didn't open, visit this URL:") From f58e685570f0e4a8bdafa87ba4d059b3b3d302bc Mon Sep 17 00:00:00 2001 From: HeshamHM28 Date: Thu, 20 Nov 2025 21:41:28 +0200 Subject: [PATCH 10/11] Enhance the logic by adding a better function to detect whether we can open the browser. --- codeflash/code_utils/oauth_handler.py | 56 +++++++++++++++++++++------ 1 file changed, 45 insertions(+), 11 deletions(-) diff --git a/codeflash/code_utils/oauth_handler.py b/codeflash/code_utils/oauth_handler.py index 0acdb9d13..8ab0f127b 100644 --- a/codeflash/code_utils/oauth_handler.py +++ b/codeflash/code_utils/oauth_handler.py @@ -5,8 +5,10 @@ import hashlib import http.server import json +import os import secrets import socket +import sys import threading import time import urllib.parse @@ -646,18 +648,50 @@ def exchange_code_for_token(self, code: str, code_verifier: str, redirect_uri: s return api_key -def _is_graphical_browser() -> bool: - text_browsers = {"lynx", "links", "w3m", "elinks", "links2"} - +def get_browser_name_fallback() -> str | None: try: - # Get the default browser - browser = webbrowser.get() - browser_name = getattr(browser, "name", "").lower() - - # Check if it's a known text browser - return all(text_browser not in browser_name for text_browser in text_browsers) + controller = webbrowser.get() + # controller.name exists for most browser controllers + return getattr(controller, "name", None) except Exception: - return True + return None + + +def should_attempt_browser_launch() -> bool: + # A list of browser names that indicate we should not attempt to open a + # web browser for the user. + browser_blocklist = ["www-browser", "lynx", "links", "w3m", "elinks", "links2"] + browser_env = os.environ.get("BROWSER") or get_browser_name_fallback() + if browser_env and browser_env in browser_blocklist: + return False + + # Common environment variables used in CI/CD or other non-interactive shells. + if os.environ.get("CI") or os.environ.get("DEBIAN_FRONTEND") == "noninteractive": + return False + + # The presence of SSH_CONNECTION indicates a remote session. + # We should not attempt to launch a browser unless a display is explicitly available + # (checked below for Linux). + is_ssh = bool(os.environ.get("SSH_CONNECTION")) + + # On Linux, the presence of a display server is a strong indicator of a GUI. + if sys.platform == "linux": + # These are environment variables that can indicate a running compositor on + # Linux. + display_variables = ["DISPLAY", "WAYLAND_DISPLAY", "MIR_SOCKET"] + has_display = any(os.environ.get(v) for v in display_variables) + if not has_display: + return False + + # If in an SSH session on a non-Linux OS (e.g., macOS), don't launch browser. + # The Linux case is handled above (it's allowed if DISPLAY is set). + if is_ssh and sys.platform != "linux": + return False + + # For non-Linux OSes, we generally assume a GUI is available + # unless other signals (like SSH) suggest otherwise. + # The `open` command's error handling will catch final edge cases. + return True def _wait_for_manual_code_input(oauth: OAuthHandler) -> None: @@ -706,7 +740,7 @@ def perform_oauth_signin() -> str | None: click.echo("❌ Failed to start local server.") return None - if _is_graphical_browser(): + if should_attempt_browser_launch(): # Try to open browser click.echo("🌐 Opening browser to sign in to CodeFlash…") with contextlib.suppress(Exception): From c4651751cdeadc107ae012ba6b7fee75f7df097c Mon Sep 17 00:00:00 2001 From: HeshamHM28 Date: Thu, 20 Nov 2025 23:01:25 +0200 Subject: [PATCH 11/11] add new line --- codeflash/code_utils/oauth_handler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/codeflash/code_utils/oauth_handler.py b/codeflash/code_utils/oauth_handler.py index 8ab0f127b..65e9f1341 100644 --- a/codeflash/code_utils/oauth_handler.py +++ b/codeflash/code_utils/oauth_handler.py @@ -786,6 +786,6 @@ def perform_oauth_signin() -> str | None: httpd.shutdown() if not api_key: - click.echo("❌ Authentication failed.") - + click.echo("\n❌ Authentication failed.") + click.echo("\n") return api_key