Skip to content

Commit f10c336

Browse files
init sea backend
Signed-off-by: varun-edachali-dbx <varun.edachali@databricks.com>
1 parent 00d9aeb commit f10c336

File tree

6 files changed

+516
-11
lines changed

6 files changed

+516
-11
lines changed

src/databricks/sql/client.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ def __init__(
8181
catalog: Optional[str] = None,
8282
schema: Optional[str] = None,
8383
_use_arrow_native_complex_types: Optional[bool] = True,
84+
use_sea: Optional[bool] = False,
8485
**kwargs,
8586
) -> None:
8687
"""
@@ -106,6 +107,7 @@ def __init__(
106107
Execute the SQL command `SET -v` to get a full list of available commands.
107108
:param catalog: An optional initial catalog to use. Requires DBR version 9.0+
108109
:param schema: An optional initial schema to use. Requires DBR version 9.0+
110+
:param use_sea: `bool`, optional (default is False). Alternative to thrift backend.
109111
110112
Other Parameters:
111113
use_inline_params: `boolean` | str, optional (default is False)
@@ -240,6 +242,7 @@ def read(self) -> Optional[OAuthToken]:
240242
catalog,
241243
schema,
242244
_use_arrow_native_complex_types,
245+
use_sea=kwargs.get("use_sea", False),
243246
**kwargs,
244247
)
245248
self.session.open()

src/databricks/sql/sea/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
"""
2+
Statement Execution API (SEA) implementation for the Databricks SQL Python Connector.
3+
"""
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
"""
2+
Constants for the Statement Execution API.
3+
"""
4+
5+
# API Paths
6+
BASE_PATH = "/api/2.0/sql"
7+
SESSION_PATH = f"{BASE_PATH}/sessions"
8+
SESSION_PATH_WITH_ID = f"{SESSION_PATH}/{{session_id}}"
9+
STATEMENT_PATH = f"{BASE_PATH}/statements"
10+
STATEMENT_PATH_WITH_ID = f"{STATEMENT_PATH}/{{statement_id}}"
11+
CANCEL_STATEMENT_PATH = f"{STATEMENT_PATH}/{{statement_id}}/cancel"
12+
RESULT_CHUNK_PATH = f"{STATEMENT_PATH}/{{statement_id}}/result/chunks/{{chunk_index}}"
13+
14+
# Disposition types
15+
DISPOSITION_INLINE = "INLINE"
16+
DISPOSITION_EXTERNAL_LINKS = "EXTERNAL_LINKS"
17+
DISPOSITION_INLINE_OR_EXTERNAL_LINKS = "INLINE_OR_EXTERNAL_LINKS"
18+
19+
# Format types
20+
FORMAT_JSON_ARRAY = "JSON_ARRAY"
21+
FORMAT_ARROW_STREAM = "ARROW_STREAM"
22+
23+
# Compression types
24+
COMPRESSION_NONE = "NONE"
25+
COMPRESSION_SNAPPY = "SNAPPY"
26+
COMPRESSION_LZ4 = "LZ4"
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
"""
2+
HTTP client for the Statement Execution API.
3+
"""
4+
5+
import json
6+
import logging
7+
from typing import Dict, Optional, Any, Union, List, Tuple
8+
import requests
9+
from requests.adapters import HTTPAdapter
10+
from urllib3.util.retry import Retry
11+
12+
from databricks.sql.auth.auth import AuthProvider
13+
from databricks.sql.types import SSLOptions
14+
15+
logger = logging.getLogger(__name__)
16+
17+
class SEAHttpClient:
18+
"""
19+
Custom HTTP client for making requests to the Statement Execution API.
20+
"""
21+
22+
def __init__(
23+
self,
24+
host: str,
25+
port: int,
26+
http_path: str,
27+
http_headers: List[Tuple[str, str]],
28+
auth_provider: AuthProvider,
29+
ssl_options: SSLOptions,
30+
**kwargs
31+
):
32+
"""
33+
Initialize the SEA HTTP client.
34+
35+
Args:
36+
host: The hostname of the Databricks deployment
37+
port: The port to connect to
38+
http_path: The HTTP path for the endpoint
39+
http_headers: Additional HTTP headers to include in requests
40+
auth_provider: The authentication provider to use
41+
ssl_options: SSL configuration options
42+
**kwargs: Additional arguments
43+
"""
44+
self.host = host
45+
self.port = port
46+
self.http_path = http_path
47+
self.http_headers = dict(http_headers) if http_headers else {}
48+
self.auth_provider = auth_provider
49+
self.ssl_options = ssl_options
50+
51+
# Configure retry strategy
52+
retry_strategy = Retry(
53+
total=kwargs.get('_retry_total', 3),
54+
backoff_factor=kwargs.get('_retry_backoff_factor', 0.5),
55+
status_forcelist=[429, 500, 502, 503, 504],
56+
allowed_methods=["HEAD", "GET", "OPTIONS", "POST", "PUT", "DELETE"]
57+
)
58+
59+
# Create session with retry strategy
60+
self.session = requests.Session()
61+
self.session.mount("https://", HTTPAdapter(max_retries=retry_strategy))
62+
63+
# Configure SSL verification
64+
if ssl_options.tls_verify:
65+
self.session.verify = True
66+
if ssl_options.tls_trusted_ca_file:
67+
self.session.verify = ssl_options.tls_trusted_ca_file
68+
else:
69+
self.session.verify = False
70+
71+
# Configure client certificates if provided
72+
if ssl_options.tls_client_cert_file:
73+
client_cert = ssl_options.tls_client_cert_file
74+
client_key = ssl_options.tls_client_cert_key_file
75+
76+
if client_key:
77+
self.session.cert = (client_cert, client_key)
78+
else:
79+
self.session.cert = client_cert
80+
81+
# Base URL for API requests
82+
self.base_url = f"https://{self.host}:{self.port}"
83+
84+
def _prepare_headers(self, additional_headers: Optional[Dict[str, str]] = None) -> Dict[str, str]:
85+
"""
86+
Prepare headers for a request, including authentication.
87+
88+
Args:
89+
additional_headers: Additional headers to include in the request
90+
91+
Returns:
92+
Dict[str, str]: The complete set of headers for the request
93+
"""
94+
headers = {
95+
"Content-Type": "application/json",
96+
"Accept": "application/json",
97+
**self.http_headers
98+
}
99+
100+
if additional_headers:
101+
headers.update(additional_headers)
102+
103+
# Add authentication headers
104+
auth_headers = self.auth_provider.authentication_headers(self.host)
105+
headers.update(auth_headers)
106+
107+
return headers
108+
109+
def request(
110+
self,
111+
method: str,
112+
path: str,
113+
data: Optional[Dict[str, Any]] = None,
114+
params: Optional[Dict[str, Any]] = None,
115+
additional_headers: Optional[Dict[str, str]] = None
116+
) -> Dict[str, Any]:
117+
"""
118+
Make an HTTP request to the SEA API.
119+
120+
Args:
121+
method: The HTTP method to use (GET, POST, etc.)
122+
path: The API path to request
123+
data: The request body data (for POST/PUT)
124+
params: Query parameters to include in the URL
125+
additional_headers: Additional headers to include in the request
126+
127+
Returns:
128+
Dict[str, Any]: The parsed JSON response
129+
130+
Raises:
131+
requests.exceptions.RequestException: If the request fails
132+
"""
133+
url = f"{self.base_url}{path}"
134+
headers = self._prepare_headers(additional_headers)
135+
136+
logger.debug(f"Making {method} request to {url}")
137+
138+
if data:
139+
data_str = json.dumps(data)
140+
response = self.session.request(
141+
method=method,
142+
url=url,
143+
data=data_str,
144+
params=params,
145+
headers=headers
146+
)
147+
else:
148+
response = self.session.request(
149+
method=method,
150+
url=url,
151+
params=params,
152+
headers=headers
153+
)
154+
155+
response.raise_for_status()
156+
157+
if response.content:
158+
return response.json()
159+
return {}

0 commit comments

Comments
 (0)