Skip to content

Commit cba3da7

Browse files
more fixes
Signed-off-by: Vikrant Puppala <vikrant.puppala@databricks.com>
1 parent 000d3a3 commit cba3da7

File tree

9 files changed

68 additions and 65 deletions (lines changed)

9 files changed

68 additions and 65 deletions (lines changed)

src/databricks/sql/auth/retry.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -294,7 +294,7 @@ def sleep_for_retry(self, response: BaseHTTPResponse) -> bool:
294294
else:
295295
proposed_wait = self.get_backoff_time()
296296

297-
proposed_wait = min(proposed_wait, self.delay_max)
297+
proposed_wait = max(proposed_wait, self.delay_max)
298298
self.check_proposed_wait(proposed_wait)
299299
logger.debug(f"Retrying after {proposed_wait} seconds")
300300
time.sleep(proposed_wait)

src/databricks/sql/backend/thrift_backend.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -105,7 +105,7 @@ def __init__(
105105
http_headers,
106106
auth_provider: AuthProvider,
107107
ssl_options: SSLOptions,
108-
http_client=None,
108+
http_client,
109109
**kwargs,
110110
):
111111
# Internal arguments in **kwargs:

src/databricks/sql/client.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -277,7 +277,7 @@ def read(self) -> Optional[OAuthToken]:
277277
host_url=server_hostname,
278278
http_path=http_path,
279279
port=kwargs.get("_port", 443),
280-
http_client=http_client,
280+
client_context=client_context,
281281
user_agent=self.session.useragent_header
282282
if hasattr(self, "session")
283283
else None,
@@ -299,7 +299,7 @@ def read(self) -> Optional[OAuthToken]:
299299
auth_provider=self.session.auth_provider,
300300
host_url=self.session.host,
301301
batch_size=self.telemetry_batch_size,
302-
http_client=self.session.http_client,
302+
client_context=client_context,
303303
)
304304

305305
self._telemetry_client = TelemetryClientFactory.get_telemetry_client(

src/databricks/sql/cloudfetch/downloader.py

Lines changed: 0 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -13,9 +13,6 @@
1313

1414
logger = logging.getLogger(__name__)
1515

16-
# TODO: Ideally, we should use a common retry policy (DatabricksRetryPolicy) for all the requests across the library.
17-
# But DatabricksRetryPolicy should be updated first - currently it can work only with Thrift requests
18-
1916

2017
@dataclass
2118
class DownloadedFile:

src/databricks/sql/common/unified_http_client.py

Lines changed: 0 additions & 43 deletions
Original file line number | Diff line number | Diff line change
@@ -193,38 +193,6 @@ def request(
193193
response._body = response.data
194194
return response
195195

196-
def upload_file(
197-
self, url: str, file_path: str, headers: Optional[Dict[str, str]] = None
198-
) -> urllib3.HTTPResponse:
199-
"""
200-
Upload a file using PUT method.
201-
202-
Args:
203-
url: URL to upload to
204-
file_path: Path to the file to upload
205-
headers: Optional headers
206-
207-
Returns:
208-
urllib3.HTTPResponse: The response from the server
209-
"""
210-
with open(file_path, "rb") as file_obj:
211-
return self.request("PUT", url, body=file_obj.read(), headers=headers)
212-
213-
def download_file(
214-
self, url: str, file_path: str, headers: Optional[Dict[str, str]] = None
215-
) -> None:
216-
"""
217-
Download a file using GET method.
218-
219-
Args:
220-
url: URL to download from
221-
file_path: Path where to save the downloaded file
222-
headers: Optional headers
223-
"""
224-
response = self.request("GET", url, headers=headers)
225-
with open(file_path, "wb") as file_obj:
226-
file_obj.write(response.data)
227-
228196
def close(self):
229197
"""Close the underlying connection pools."""
230198
if self._pool_manager:
@@ -236,14 +204,3 @@ def __enter__(self):
236204

237205
def __exit__(self, exc_type, exc_val, exc_tb):
238206
self.close()
239-
240-
241-
# Compatibility class to maintain requests-like interface for OAuth
242-
class IgnoreNetrcAuth:
243-
"""
244-
Compatibility class for OAuth code that expects requests.auth.AuthBase interface.
245-
This is a no-op auth handler since OAuth handles auth differently.
246-
"""
247-
248-
def __call__(self, request):
249-
return request

src/databricks/sql/telemetry/telemetry_client.py

Lines changed: 61 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -1,8 +1,11 @@
11
import threading
22
import time
33
import logging
4+
import json
45
from concurrent.futures import ThreadPoolExecutor
5-
from typing import Dict, Optional, TYPE_CHECKING
6+
from concurrent.futures import Future
7+
from datetime import datetime, timezone
8+
from typing import List, Dict, Any, Optional, TYPE_CHECKING
69
from databricks.sql.telemetry.models.event import (
710
TelemetryEvent,
811
DriverSystemConfiguration,
@@ -36,8 +39,7 @@
3639
import locale
3740
from databricks.sql.telemetry.utils import BaseTelemetryClient
3841
from databricks.sql.common.feature_flag import FeatureFlagsContextFactory
39-
40-
from src.databricks.sql.common.unified_http_client import UnifiedHttpClient
42+
from databricks.sql.common.unified_http_client import UnifiedHttpClient
4143

4244
if TYPE_CHECKING:
4345
from databricks.sql.client import Connection
@@ -151,6 +153,44 @@ def _flush(self):
151153
pass
152154

153155

156+
class TelemetryHttpClientSingleton:
157+
"""
158+
Singleton HTTP client for telemetry operations.
159+
160+
This ensures that telemetry has its own dedicated HTTP client that
161+
is independent of individual connection lifecycles.
162+
"""
163+
164+
_instance = None
165+
_lock = threading.RLock()
166+
167+
def __new__(cls):
168+
if cls._instance is None:
169+
with cls._lock:
170+
if cls._instance is None:
171+
cls._instance = super().__new__(cls)
172+
cls._instance._http_client = None
173+
cls._instance._initialized = False
174+
return cls._instance
175+
176+
def get_http_client(self, client_context):
177+
"""Get or create the singleton HTTP client."""
178+
if not self._initialized and client_context:
179+
with self._lock:
180+
if not self._initialized:
181+
self._http_client = UnifiedHttpClient(client_context)
182+
self._initialized = True
183+
return self._http_client
184+
185+
def close(self):
186+
"""Close the singleton HTTP client."""
187+
with self._lock:
188+
if self._http_client:
189+
self._http_client.close()
190+
self._http_client = None
191+
self._initialized = False
192+
193+
154194
class TelemetryClient(BaseTelemetryClient):
155195
"""
156196
Telemetry client class that handles sending telemetry events in batches to the server.
@@ -169,7 +209,7 @@ def __init__(
169209
host_url,
170210
executor,
171211
batch_size,
172-
http_client,
212+
client_context,
173213
):
174214
logger.debug("Initializing TelemetryClient for connection: %s", session_id_hex)
175215
self._telemetry_enabled = telemetry_enabled
@@ -182,7 +222,10 @@ def __init__(
182222
self._driver_connection_params = None
183223
self._host_url = host_url
184224
self._executor = executor
185-
self._http_client = http_client
225+
226+
# Use singleton HTTP client for telemetry instead of connection-specific client
227+
self._http_client_singleton = TelemetryHttpClientSingleton()
228+
self._http_client = self._http_client_singleton.get_http_client(client_context)
186229

187230
def _export_event(self, event):
188231
"""Add an event to the batch queue and flush if batch is full"""
@@ -246,17 +289,24 @@ def _send_telemetry(self, events):
246289
except Exception as e:
247290
logger.debug("Failed to submit telemetry request: %s", e)
248291

249-
def _send_with_unified_client(self, url, data, headers):
292+
def _send_with_unified_client(self, url, data, headers, timeout=900):
250293
"""Helper method to send telemetry using the unified HTTP client."""
251294
try:
252295
response = self._http_client.request(
253-
"POST", url, body=data, headers=headers, timeout=900
296+
"POST", url, body=data, headers=headers, timeout=timeout
254297
)
255298
# Convert urllib3 response to requests-like response for compatibility
256299
response.status_code = response.status
300+
response.ok = 200 <= response.status < 300
257301
response.json = (
258302
lambda: json.loads(response.data.decode()) if response.data else {}
259303
)
304+
# Add raise_for_status method
305+
def raise_for_status():
306+
if not response.ok:
307+
raise Exception(f"HTTP {response.status_code}")
308+
309+
response.raise_for_status = raise_for_status
260310
return response
261311
except Exception as e:
262312
logger.error("Failed to send telemetry with unified client: %s", e)
@@ -452,7 +502,7 @@ def initialize_telemetry_client(
452502
auth_provider,
453503
host_url,
454504
batch_size,
455-
http_client,
505+
client_context,
456506
):
457507
"""Initialize a telemetry client for a specific connection if telemetry is enabled"""
458508
try:
@@ -475,7 +525,7 @@ def initialize_telemetry_client(
475525
host_url=host_url,
476526
executor=TelemetryClientFactory._executor,
477527
batch_size=batch_size,
478-
http_client=http_client,
528+
client_context=client_context,
479529
)
480530
else:
481531
TelemetryClientFactory._clients[
@@ -528,7 +578,7 @@ def connection_failure_log(
528578
host_url: str,
529579
http_path: str,
530580
port: int,
531-
http_client: UnifiedHttpClient,
581+
client_context,
532582
user_agent: Optional[str] = None,
533583
):
534584
"""Send error telemetry when connection creation fails, without requiring a session"""
@@ -541,7 +591,7 @@ def connection_failure_log(
541591
auth_provider=None,
542592
host_url=host_url,
543593
batch_size=TelemetryClientFactory.DEFAULT_BATCH_SIZE,
544-
http_client=http_client,
594+
client_context=client_context,
545595
)
546596

547597
telemetry_client = TelemetryClientFactory.get_telemetry_client(

tests/e2e/common/retry_test_mixins.py

Lines changed: 1 addition & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -247,7 +247,6 @@ def test_retry_exponential_backoff(self, mock_send_telemetry, extra_params):
247247
"""
248248
retry_policy = self._retry_policy.copy()
249249
retry_policy["_retry_delay_min"] = 1
250-
retry_policy["_retry_delay_max"] = 10
251250

252251
time_start = time.time()
253252
with mocked_server_response(
@@ -283,11 +282,9 @@ def test_retry_max_duration_not_exceeded(self, extra_params):
283282
WHEN the server sends a Retry-After header of 60 seconds
284283
THEN the connector raises a MaxRetryDurationError
285284
"""
286-
retry_policy = self._retry_policy.copy()
287-
retry_policy["_retry_delay_max"] = 60
288285
with mocked_server_response(status=429, headers={"Retry-After": "60"}):
289286
with pytest.raises(RequestError) as cm:
290-
extra_params = {**extra_params, **retry_policy}
287+
extra_params = {**extra_params, **self._retry_policy}
291288
with self.connection(extra_params=extra_params) as conn:
292289
pass
293290
assert isinstance(cm.value.args[1], MaxRetryDurationError)

tests/e2e/common/staging_ingestion_tests.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ def test_staging_ingestion_life_cycle(self, ingestion_user):
7272
extra_params = {
7373
"staging_allowed_local_path": "/",
7474
"_retry_stop_after_attempts_count": 1,
75+
"_retry_delay_max": 10,
7576
}
7677
with self.connection(extra_params=extra_params) as conn:
7778
cursor = conn.cursor()

tests/e2e/common/uc_volume_tests.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ def test_uc_volume_life_cycle(self, catalog, schema):
7272
extra_params = {
7373
"staging_allowed_local_path": "/",
7474
"_retry_stop_after_attempts_count": 1,
75+
"_retry_delay_max": 10,
7576
}
7677
with self.connection(extra_params=extra_params) as conn:
7778
cursor = conn.cursor()

0 commit comments

Comments
 (0)