Skip to content

Commit 42a0d08

Browse files
init sea backend
Signed-off-by: varun-edachali-dbx <varun.edachali@databricks.com>
1 parent c91bc37 commit 42a0d08

File tree

5 files changed

+555
-10
lines changed

5 files changed

+555
-10
lines changed

examples/sea_connector_test.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
#!/usr/bin/env python
2+
3+
import os
4+
import sys
5+
import logging
6+
from databricks.sql.client import Connection
7+
8+
# Configure logging
9+
logging.basicConfig(level=logging.DEBUG)
10+
logger = logging.getLogger(__name__)
11+
12+
def test_sea_session():
13+
"""
14+
Test opening and closing a SEA session using the connector.
15+
16+
This function connects to a Databricks SQL endpoint using the SEA backend,
17+
opens a session, and then closes it.
18+
19+
Required environment variables:
20+
- DATABRICKS_SERVER_HOSTNAME: Databricks server hostname
21+
- DATABRICKS_HTTP_PATH: HTTP path for the SQL endpoint
22+
- DATABRICKS_TOKEN: Personal access token for authentication
23+
"""
24+
# Get connection parameters from environment variables
25+
server_hostname = os.environ.get("DATABRICKS_SERVER_HOSTNAME")
26+
http_path = os.environ.get("DATABRICKS_HTTP_PATH")
27+
access_token = os.environ.get("DATABRICKS_TOKEN")
28+
catalog = os.environ.get("DATABRICKS_CATALOG")
29+
30+
if not all([server_hostname, http_path, access_token]):
31+
logger.error("Missing required environment variables.")
32+
logger.error("Please set DATABRICKS_SERVER_HOSTNAME, DATABRICKS_HTTP_PATH, and DATABRICKS_TOKEN.")
33+
sys.exit(1)
34+
35+
# Print connection info (partially masked)
36+
logger.info(f"Connecting to {server_hostname}")
37+
logger.info(f"HTTP Path: {http_path}")
38+
if catalog:
39+
logger.info(f"Using catalog: {catalog}")
40+
41+
try:
42+
# Create connection with SEA backend
43+
logger.info("Creating connection with SEA backend...")
44+
connection = Connection(
45+
server_hostname=server_hostname,
46+
http_path=http_path,
47+
access_token=access_token,
48+
catalog=catalog,
49+
use_sea=True, # Enable SEA backend
50+
user_agent_entry="SEA-Test-Client" # Add custom user agent
51+
)
52+
53+
logger.info(f"Successfully opened SEA session with ID: {connection.get_session_id_hex()}")
54+
55+
# Close the connection
56+
logger.info("Closing the SEA session...")
57+
connection.close()
58+
logger.info("Successfully closed SEA session")
59+
60+
except Exception as e:
61+
logger.error(f"Error testing SEA session: {str(e)}")
62+
import traceback
63+
logger.error(traceback.format_exc())
64+
sys.exit(1)
65+
66+
logger.info("SEA session test completed successfully")
67+
68+
if __name__ == "__main__":
69+
test_sea_session()

src/databricks/sql/sea/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
"""
2+
Statement Execution API (SEA) module for the Databricks SQL Connector.
3+
4+
This module provides classes and utilities for interacting with the
5+
Databricks SQL service using the Statement Execution API.
6+
"""
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
import json
2+
import logging
3+
import requests
4+
from typing import Dict, Any, Optional, Union, List
5+
from urllib.parse import urljoin
6+
7+
from databricks.sql.auth.authenticators import AuthProvider
8+
from databricks.sql.types import SSLOptions
9+
10+
logger = logging.getLogger(__name__)
11+
12+
class SEAHttpClient:
13+
"""
14+
HTTP client for Statement Execution API (SEA).
15+
16+
This client handles the HTTP communication with the SEA endpoints,
17+
including authentication, request formatting, and response parsing.
18+
"""
19+
20+
def __init__(
21+
self,
22+
server_hostname: str,
23+
port: int,
24+
http_path: str,
25+
http_headers: List[tuple],
26+
auth_provider: AuthProvider,
27+
ssl_options: SSLOptions,
28+
**kwargs
29+
):
30+
"""
31+
Initialize the SEA HTTP client.
32+
33+
Args:
34+
server_hostname: Hostname of the Databricks server
35+
port: Port number for the connection
36+
http_path: HTTP path for the connection
37+
http_headers: List of HTTP headers to include in requests
38+
auth_provider: Authentication provider
39+
ssl_options: SSL configuration options
40+
**kwargs: Additional keyword arguments
41+
"""
42+
self.server_hostname = server_hostname
43+
self.port = port
44+
self.http_path = http_path
45+
self.auth_provider = auth_provider
46+
self.ssl_options = ssl_options
47+
48+
# Base URL for API requests
49+
self.base_url = f"https://{server_hostname}:{port}"
50+
51+
# Convert headers list to dictionary
52+
self.headers = dict(http_headers)
53+
self.headers.update({"Content-Type": "application/json"})
54+
55+
# Session retry configuration
56+
self.max_retries = kwargs.get("_retry_stop_after_attempts_count", 30)
57+
58+
# Create a session for connection pooling
59+
self.session = requests.Session()
60+
61+
# Configure SSL verification
62+
if ssl_options.tls_verify:
63+
self.session.verify = ssl_options.tls_trusted_ca_file or True
64+
else:
65+
self.session.verify = False
66+
67+
# Configure client certificates if provided
68+
if ssl_options.tls_client_cert_file:
69+
client_cert = ssl_options.tls_client_cert_file
70+
client_key = ssl_options.tls_client_cert_key_file
71+
client_key_password = ssl_options.tls_client_cert_key_password
72+
73+
if client_key:
74+
self.session.cert = (client_cert, client_key)
75+
else:
76+
self.session.cert = client_cert
77+
78+
if client_key_password:
79+
# Note: requests doesn't directly support key passwords
80+
# This would require more complex handling with libraries like pyOpenSSL
81+
logger.warning("Client key password provided but not supported by requests library")
82+
83+
def _get_auth_headers(self) -> Dict[str, str]:
84+
"""Get authentication headers from the auth provider."""
85+
headers = {}
86+
self.auth_provider.add_headers(headers)
87+
return headers
88+
89+
def _make_request(self, method: str, path: str, data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
90+
"""
91+
Make an HTTP request to the SEA endpoint.
92+
93+
Args:
94+
method: HTTP method (GET, POST, DELETE)
95+
path: API endpoint path
96+
data: Request payload data
97+
98+
Returns:
99+
Dict[str, Any]: Response data parsed from JSON
100+
101+
Raises:
102+
RequestError: If the request fails
103+
"""
104+
url = urljoin(self.base_url, path)
105+
headers = {**self.headers, **self._get_auth_headers()}
106+
107+
# Log request details (without sensitive information)
108+
logger.debug(f"Making {method} request to {url}")
109+
logger.debug(f"Headers: {[k for k in headers.keys()]}")
110+
if data:
111+
# Don't log sensitive data like access tokens
112+
safe_data = {k: v for k, v in data.items() if k not in ["access_token", "token"]}
113+
logger.debug(f"Request data: {safe_data}")
114+
115+
try:
116+
if method.upper() == "GET":
117+
response = self.session.get(url, headers=headers, params=data)
118+
elif method.upper() == "POST":
119+
response = self.session.post(url, headers=headers, json=data)
120+
elif method.upper() == "DELETE":
121+
# For DELETE requests, use params for data (query parameters)
122+
response = self.session.delete(url, headers=headers, params=data)
123+
else:
124+
raise ValueError(f"Unsupported HTTP method: {method}")
125+
126+
# Check for HTTP errors
127+
response.raise_for_status()
128+
129+
# Log response details
130+
logger.debug(f"Response status: {response.status_code}")
131+
logger.debug(f"Response headers: {dict(response.headers)}")
132+
133+
# Parse JSON response
134+
if response.content:
135+
result = response.json()
136+
# Log response content (but limit it for large responses)
137+
content_str = json.dumps(result)
138+
if len(content_str) > 1000:
139+
logger.debug(f"Response content (truncated): {content_str[:1000]}...")
140+
else:
141+
logger.debug(f"Response content: {content_str}")
142+
return result
143+
return {}
144+
145+
except requests.exceptions.RequestException as e:
146+
# Handle request errors
147+
error_message = f"SEA HTTP request failed: {str(e)}"
148+
logger.error(error_message)
149+
150+
# Extract error details from response if available
151+
if hasattr(e, "response") and e.response is not None:
152+
try:
153+
error_details = e.response.json()
154+
error_message = f"{error_message}: {error_details.get('message', '')}"
155+
logger.error(f"Response status: {e.response.status_code}, Error details: {error_details}")
156+
except (ValueError, KeyError):
157+
# If we can't parse the JSON, just log the raw content
158+
logger.error(f"Response status: {e.response.status_code}, Raw content: {e.response.content}")
159+
pass
160+
161+
# Re-raise as a RequestError
162+
from databricks.sql.exc import RequestError
163+
raise RequestError(error_message, e)

0 commit comments

Comments
 (0)