Skip to content

Commit 611d79f

Browse files
result compression
Signed-off-by: varun-edachali-dbx <varun.edachali@databricks.com>
1 parent b6d7c0c commit 611d79f

File tree

3 files changed: 47 additions and 17 deletions.

examples/experimental/sea_connector_test.py

Lines changed: 40 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,13 @@
77
logger = logging.getLogger(__name__)
88

99

10-
def test_sea_query_execution():
10+
def test_sea_query_execution_with_compression():
1111
"""
12-
Test executing a query using the SEA backend.
12+
Test executing a query using the SEA backend with result compression.
1313
1414
This function connects to a Databricks SQL endpoint using the SEA backend,
15-
executes a simple query, and verifies that execution completes successfully.
15+
executes a simple query with result compression enabled and disabled,
16+
and verifies that execution completes successfully.
1617
"""
1718
server_hostname = os.environ.get("DATABRICKS_SERVER_HOSTNAME")
1819
http_path = os.environ.get("DATABRICKS_HTTP_PATH")
@@ -27,7 +28,8 @@ def test_sea_query_execution():
2728
sys.exit(1)
2829

2930
try:
30-
# Create connection with SEA backend
31+
# Test with compression enabled
32+
logger.info("Creating connection with LZ4 compression enabled")
3133
connection = Connection(
3234
server_hostname=server_hostname,
3335
http_path=http_path,
@@ -36,27 +38,50 @@ def test_sea_query_execution():
3638
schema="default",
3739
use_sea=True,
3840
user_agent_entry="SEA-Test-Client",
39-
use_cloud_fetch=False,
41+
use_cloud_fetch=True, # Enable cloud fetch to use compression
42+
enable_query_result_lz4_compression=True, # Enable LZ4 compression
4043
)
4144

4245
logger.info(
4346
f"Successfully opened SEA session with ID: {connection.get_session_id_hex()}"
4447
)
4548
logger.info(f"backend type: {type(connection.session.backend)}")
4649

47-
# Create a cursor and execute a simple query
50+
# Execute a simple query with compression enabled
4851
cursor = connection.cursor(arraysize=0, buffer_size_bytes=0)
49-
50-
logger.info("Executing query: SELECT 1 as test_value")
52+
logger.info("Executing query with LZ4 compression: SELECT 1 as test_value")
5153
cursor.execute("SELECT 1 as test_value")
54+
logger.info("Query with compression executed successfully")
55+
cursor.close()
56+
connection.close()
57+
logger.info("Successfully closed SEA session with compression enabled")
5258

53-
# We don't fetch results yet since we haven't implemented the fetch functionality
54-
logger.info("Query executed successfully")
59+
# Test with compression disabled
60+
logger.info("Creating connection with LZ4 compression disabled")
61+
connection = Connection(
62+
server_hostname=server_hostname,
63+
http_path=http_path,
64+
access_token=access_token,
65+
catalog=catalog,
66+
schema="default",
67+
use_sea=True,
68+
user_agent_entry="SEA-Test-Client",
69+
use_cloud_fetch=False, # Disable cloud fetch
70+
enable_query_result_lz4_compression=False, # Disable LZ4 compression
71+
)
5572

56-
# Close cursor and connection
73+
logger.info(
74+
f"Successfully opened SEA session with ID: {connection.get_session_id_hex()}"
75+
)
76+
77+
# Execute a simple query with compression disabled
78+
cursor = connection.cursor(arraysize=0, buffer_size_bytes=0)
79+
logger.info("Executing query without compression: SELECT 1 as test_value")
80+
cursor.execute("SELECT 1 as test_value")
81+
logger.info("Query without compression executed successfully")
5782
cursor.close()
5883
connection.close()
59-
logger.info("Successfully closed SEA session")
84+
logger.info("Successfully closed SEA session with compression disabled")
6085

6186
except Exception as e:
6287
logger.error(f"Error during SEA query execution test: {str(e)}")
@@ -65,7 +90,7 @@ def test_sea_query_execution():
6590
logger.error(traceback.format_exc())
6691
sys.exit(1)
6792

68-
logger.info("SEA query execution test completed successfully")
93+
logger.info("SEA query execution test with compression completed successfully")
6994

7095

7196
def test_sea_session():
@@ -133,5 +158,5 @@ def test_sea_session():
133158
# Test session management
134159
test_sea_session()
135160

136-
# Test query execution
137-
test_sea_query_execution()
161+
# Test query execution with compression
162+
test_sea_query_execution_with_compression()

src/databricks/sql/backend/models/requests.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
from typing import Dict, List, Any, Optional, Union
88
from dataclasses import dataclass, field
99

10-
1110
@dataclass
1211
class StatementParameter:
1312
"""Parameter for a SQL statement."""
@@ -21,7 +20,6 @@ class StatementParameter:
2120
class ExecuteStatementRequest:
2221
"""Request to execute a SQL statement."""
2322

24-
# TODO: result_compression key
2523
warehouse_id: str
2624
statement: str
2725
session_id: str
@@ -34,6 +32,7 @@ class ExecuteStatementRequest:
3432
parameters: Optional[List[StatementParameter]] = None
3533
catalog: Optional[str] = None
3634
schema: Optional[str] = None
35+
result_compression: Optional[str] = None
3736

3837
def to_dict(self) -> Dict[str, Any]:
3938
"""Convert the request to a dictionary for JSON serialization."""
@@ -58,6 +57,9 @@ def to_dict(self) -> Dict[str, Any]:
5857

5958
if self.schema:
6059
result["schema"] = self.schema
60+
61+
if self.result_compression:
62+
result["result_compression"] = self.result_compression
6163

6264
if self.parameters:
6365
result["parameters"] = [

src/databricks/sql/backend/sea_backend.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,8 @@ def execute_command(
277277

278278
format = "ARROW_STREAM" if use_cloud_fetch else "JSON_ARRAY"
279279
disposition = "EXTERNAL_LINKS" if use_cloud_fetch else "INLINE"
280+
result_compression = "LZ4_FRAME" if lz4_compression else "NONE"
281+
280282
request = ExecuteStatementRequest(
281283
warehouse_id=self.warehouse_id,
282284
session_id=sea_session_id,
@@ -288,6 +290,7 @@ def execute_command(
288290
row_limit=max_rows if max_rows > 0 else None,
289291
byte_limit=max_bytes if max_bytes > 0 else None,
290292
parameters=sea_parameters if sea_parameters else None,
293+
result_compression=result_compression,
291294
)
292295

293296
response_data = self.http_client._make_request(

0 commit comments

Comments
 (0)