Skip to content

Commit b3fb5fa

Browse files
Revert "simplify HTTP client using requests.session"
This reverts commit 25596e7.
1 parent 36daee6 commit b3fb5fa

File tree

1 file changed

+130
-52
lines changed

1 file changed

+130
-52
lines changed

src/databricks/sql/backend/sea/utils/http_client.py

Lines changed: 130 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
1+
import json
12
import logging
3+
import ssl
4+
import urllib.parse
5+
import urllib.request
26
from typing import Dict, Any, Optional, List, Tuple, Union
7+
from urllib.parse import urljoin
38

4-
import requests
5-
from requests.adapters import HTTPAdapter
9+
from urllib3 import HTTPConnectionPool, HTTPSConnectionPool, ProxyManager
10+
from urllib3.util import make_headers
611
from urllib3.exceptions import MaxRetryError
712

813
from databricks.sql.auth.authenticators import AuthProvider
@@ -22,12 +27,16 @@ class SeaHttpClient:
2227
"""
2328
HTTP client for Statement Execution API (SEA).
2429
25-
This client uses requests.Session for HTTP communication with retry policies
26-
and connection pooling.
30+
This client uses urllib3 for robust HTTP communication with retry policies
31+
and connection pooling, similar to the Thrift HTTP client but simplified.
2732
"""
2833

2934
retry_policy: Union[DatabricksRetryPolicy, int]
30-
session: requests.Session
35+
_pool: Optional[Union[HTTPConnectionPool, HTTPSConnectionPool]]
36+
proxy_uri: Optional[str]
37+
realhost: Optional[str]
38+
realport: Optional[int]
39+
proxy_auth: Optional[Dict[str, str]]
3140

3241
def __init__(
3342
self,
@@ -55,13 +64,22 @@ def __init__(
5564
self.server_hostname = server_hostname
5665
self.port = port or 443
5766
self.http_path = http_path
58-
self.http_headers = http_headers
5967
self.auth_provider = auth_provider
6068
self.ssl_options = ssl_options
6169

6270
# Build base URL
6371
self.base_url = f"https://{server_hostname}:{self.port}"
6472

73+
# Parse URL for proxy handling
74+
parsed_url = urllib.parse.urlparse(self.base_url)
75+
self.scheme = parsed_url.scheme
76+
self.host = parsed_url.hostname
77+
self.port = parsed_url.port or (443 if self.scheme == "https" else 80)
78+
79+
# Setup headers
80+
self.headers: Dict[str, str] = dict(http_headers)
81+
self.headers.update({"Content-Type": "application/json"})
82+
6583
# Extract retry policy settings
6684
self._retry_delay_min = kwargs.get("_retry_delay_min", 1.0)
6785
self._retry_delay_max = kwargs.get("_retry_delay_max", 60.0)
@@ -103,44 +121,83 @@ def __init__(
103121
# Legacy behavior - no automatic retries
104122
self.retry_policy = 0
105123

106-
# Create session and configure it
107-
self.session = requests.Session()
108-
self._configure_session()
109-
110-
def _configure_session(self):
111-
"""Configure the requests session with headers, SSL, and retry policy."""
112-
# Setup headers
113-
self.session.headers.update(dict(self.http_headers))
114-
self.session.headers.update({"Content-Type": "application/json"})
115-
116-
# Configure SSL
117-
if not self.ssl_options.tls_verify:
118-
self.session.verify = False
119-
elif self.ssl_options.tls_trusted_ca_file:
120-
self.session.verify = self.ssl_options.tls_trusted_ca_file
121-
122-
if self.ssl_options.tls_client_cert_file:
123-
if self.ssl_options.tls_client_cert_key_file:
124-
self.session.cert = (
125-
self.ssl_options.tls_client_cert_file,
126-
self.ssl_options.tls_client_cert_key_file,
127-
)
128-
else:
129-
self.session.cert = self.ssl_options.tls_client_cert_file
130-
131-
# Configure retry adapter
132-
adapter = HTTPAdapter(
133-
pool_connections=self.max_connections,
134-
pool_maxsize=self.max_connections,
135-
max_retries=self.retry_policy,
136-
)
124+
# Handle proxy settings
125+
try:
126+
proxy = urllib.request.getproxies().get(self.scheme)
127+
except (KeyError, AttributeError):
128+
proxy = None
129+
else:
130+
if self.host and urllib.request.proxy_bypass(self.host):
131+
proxy = None
132+
133+
if proxy:
134+
parsed_proxy = urllib.parse.urlparse(proxy)
135+
self.realhost = self.host
136+
self.realport = self.port
137+
self.proxy_uri = proxy
138+
self.host = parsed_proxy.hostname
139+
self.port = parsed_proxy.port or (443 if self.scheme == "https" else 80)
140+
self.proxy_auth = self._basic_proxy_auth_headers(parsed_proxy)
141+
else:
142+
self.realhost = None
143+
self.realport = None
144+
self.proxy_auth = None
145+
self.proxy_uri = None
146+
147+
# Initialize connection pool
148+
self._pool = None
149+
self._open()
150+
151+
def _basic_proxy_auth_headers(self, proxy_parsed) -> Optional[Dict[str, str]]:
152+
"""Create basic auth headers for proxy if credentials are provided."""
153+
if proxy_parsed is None or not proxy_parsed.username:
154+
return None
155+
ap = f"{urllib.parse.unquote(proxy_parsed.username)}:{urllib.parse.unquote(proxy_parsed.password)}"
156+
return make_headers(proxy_basic_auth=ap)
157+
158+
def _open(self):
159+
"""Initialize the connection pool."""
160+
pool_kwargs = {"maxsize": self.max_connections}
161+
162+
if self.scheme == "http":
163+
pool_class = HTTPConnectionPool
164+
else: # https
165+
pool_class = HTTPSConnectionPool
166+
pool_kwargs.update(
167+
{
168+
"cert_reqs": ssl.CERT_REQUIRED
169+
if self.ssl_options.tls_verify
170+
else ssl.CERT_NONE,
171+
"ca_certs": self.ssl_options.tls_trusted_ca_file,
172+
"cert_file": self.ssl_options.tls_client_cert_file,
173+
"key_file": self.ssl_options.tls_client_cert_key_file,
174+
"key_password": self.ssl_options.tls_client_cert_key_password,
175+
}
176+
)
137177

138-
self.session.mount("https://", adapter)
139-
self.session.mount("http://", adapter)
178+
if self.using_proxy():
179+
proxy_manager = ProxyManager(
180+
self.proxy_uri,
181+
num_pools=1,
182+
proxy_headers=self.proxy_auth,
183+
)
184+
self._pool = proxy_manager.connection_from_host(
185+
host=self.realhost,
186+
port=self.realport,
187+
scheme=self.scheme,
188+
pool_kwargs=pool_kwargs,
189+
)
190+
else:
191+
self._pool = pool_class(self.host, self.port, **pool_kwargs)
140192

141193
def close(self):
142-
"""Close the session."""
143-
self.session.close()
194+
"""Close the connection pool."""
195+
if self._pool:
196+
self._pool.clear()
197+
198+
def using_proxy(self) -> bool:
199+
"""Check if proxy is being used (for compatibility with Thrift client)."""
200+
return self.realhost is not None
144201

145202
def set_retry_command_type(self, command_type: CommandType):
146203
"""Set the command type for retry policy decision making."""
@@ -179,36 +236,46 @@ def _make_request(
179236
RequestError: If the request fails after retries
180237
"""
181238

239+
# Prepare headers
240+
headers = {**self.headers, **self._get_auth_headers()}
241+
242+
# Prepare request body
243+
body = json.dumps(data).encode("utf-8") if data else b""
244+
if body:
245+
headers["Content-Length"] = str(len(body))
246+
182247
# Set command type for retry policy
183248
command_type = self._get_command_type_from_path(path, method)
184249
self.set_retry_command_type(command_type)
185250
self.start_retry_timer()
186251

187-
# Prepare headers
188-
headers = self._get_auth_headers()
189-
190252
logger.debug(f"Making {method} request to {path}")
191253

254+
# When v3 retries are enabled, urllib3 handles retries internally via DatabricksRetryPolicy
255+
# When disabled, we let exceptions bubble up (similar to Thrift backend approach)
256+
if self._pool is None:
257+
raise RequestError("Connection pool not initialized", None)
258+
192259
try:
193-
response = self.session.request(
260+
response = self._pool.request(
194261
method=method.upper(),
195-
url=f"{self.base_url}{path}",
196-
json=data, # requests handles JSON encoding automatically
262+
url=path,
263+
body=body,
197264
headers=headers,
265+
preload_content=False,
266+
retries=self.retry_policy,
198267
)
199-
200-
response.raise_for_status() # This will raise an HTTPError for non-2xx responses
201-
return response.json()
202-
203268
except MaxRetryDurationError as e:
204269
# MaxRetryDurationError is raised directly by DatabricksRetryPolicy
205-
# when duration limits are exceeded
270+
# when duration limits are exceeded (like in test_retry_exponential_backoff)
206271
error_message = f"Request failed due to retry duration limit: {e}"
272+
# Construct RequestError with message, context, and specific error
207273
raise RequestError(error_message, None, e)
208274
except (SessionAlreadyClosedError, CursorAlreadyClosedError) as e:
209275
# These exceptions are raised by DatabricksRetryPolicy when detecting
210276
# "already closed" scenarios (404 responses with retry history)
211277
error_message = f"Request failed: {e}"
278+
# Construct RequestError with proper 3-argument format (message, context, error)
212279
raise RequestError(error_message, None, e)
213280
except MaxRetryError as e:
214281
# urllib3 MaxRetryError should bubble up for redirect tests to catch
@@ -217,8 +284,19 @@ def _make_request(
217284
except Exception as e:
218285
logger.error(f"SEA HTTP request failed with exception: {e}")
219286
error_message = f"Error during request to server. {e}"
287+
# Construct RequestError with proper 3-argument format (message, context, error)
220288
raise RequestError(error_message, None, e)
221289

290+
logger.debug(f"Response status: {response.status}")
291+
292+
# Handle successful responses
293+
if 200 <= response.status < 300:
294+
return response.json()
295+
296+
error_message = f"SEA HTTP request failed with status {response.status}"
297+
298+
raise RequestError(error_message, None)
299+
222300
def _get_command_type_from_path(self, path: str, method: str) -> CommandType:
223301
"""
224302
Determine the command type based on the API path and method.

0 commit comments

Comments
 (0)