Skip to content

Commit edbc642

Browse files
committed
Merge branch 'main' into telemetry-batching
Signed-off-by: Sai Shree Pradhan <saishree.pradhan@databricks.com>
2 parents 2777681 + 59d28b0 commit edbc642

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+7252
-485
lines changed
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
"""
2+
Main script to run all SEA connector tests.
3+
4+
This script runs all the individual test modules and displays
5+
a summary of test results with visual indicators.
6+
7+
In order to run the script, the following environment variables need to be set:
8+
- DATABRICKS_SERVER_HOSTNAME: The hostname of the Databricks server
9+
- DATABRICKS_HTTP_PATH: The HTTP path of the Databricks server
10+
- DATABRICKS_TOKEN: The token to use for authentication
11+
"""
12+
13+
import os
14+
import sys
15+
import logging
16+
import subprocess
17+
from typing import List, Tuple
18+
19+
logging.basicConfig(level=logging.DEBUG)
20+
logger = logging.getLogger(__name__)
21+
22+
TEST_MODULES = [
23+
"test_sea_session",
24+
"test_sea_sync_query",
25+
"test_sea_async_query",
26+
"test_sea_metadata",
27+
]
28+
29+
30+
def run_test_module(module_name: str) -> bool:
31+
"""Run a test module and return success status."""
32+
module_path = os.path.join(
33+
os.path.dirname(os.path.abspath(__file__)), "tests", f"{module_name}.py"
34+
)
35+
36+
# Simply run the module as a script - each module handles its own test execution
37+
result = subprocess.run(
38+
[sys.executable, module_path], capture_output=True, text=True
39+
)
40+
41+
# Log the output from the test module
42+
if result.stdout:
43+
for line in result.stdout.strip().split("\n"):
44+
logger.info(line)
45+
46+
if result.stderr:
47+
for line in result.stderr.strip().split("\n"):
48+
logger.error(line)
49+
50+
return result.returncode == 0
51+
52+
53+
def run_tests() -> List[Tuple[str, bool]]:
54+
"""Run all tests and return results."""
55+
results = []
56+
57+
for module_name in TEST_MODULES:
58+
try:
59+
logger.info(f"\n{'=' * 50}")
60+
logger.info(f"Running test: {module_name}")
61+
logger.info(f"{'-' * 50}")
62+
63+
success = run_test_module(module_name)
64+
results.append((module_name, success))
65+
66+
status = "✅ PASSED" if success else "❌ FAILED"
67+
logger.info(f"Test {module_name}: {status}")
68+
69+
except Exception as e:
70+
logger.error(f"Error loading or running test {module_name}: {str(e)}")
71+
import traceback
72+
73+
logger.error(traceback.format_exc())
74+
results.append((module_name, False))
75+
76+
return results
77+
78+
79+
def print_summary(results: List[Tuple[str, bool]]) -> None:
80+
"""Print a summary of test results."""
81+
logger.info(f"\n{'=' * 50}")
82+
logger.info("TEST SUMMARY")
83+
logger.info(f"{'-' * 50}")
84+
85+
passed = sum(1 for _, success in results if success)
86+
total = len(results)
87+
88+
for module_name, success in results:
89+
status = "✅ PASSED" if success else "❌ FAILED"
90+
logger.info(f"{status} - {module_name}")
91+
92+
logger.info(f"{'-' * 50}")
93+
logger.info(f"Total: {total} | Passed: {passed} | Failed: {total - passed}")
94+
logger.info(f"{'=' * 50}")
95+
96+
97+
if __name__ == "__main__":
98+
# Check if required environment variables are set
99+
required_vars = [
100+
"DATABRICKS_SERVER_HOSTNAME",
101+
"DATABRICKS_HTTP_PATH",
102+
"DATABRICKS_TOKEN",
103+
]
104+
missing_vars = [var for var in required_vars if not os.environ.get(var)]
105+
106+
if missing_vars:
107+
logger.error(
108+
f"Missing required environment variables: {', '.join(missing_vars)}"
109+
)
110+
logger.error("Please set these variables before running the tests.")
111+
sys.exit(1)
112+
113+
# Run all tests
114+
results = run_tests()
115+
116+
# Print summary
117+
print_summary(results)
118+
119+
# Exit with appropriate status code
120+
all_passed = all(success for _, success in results)
121+
sys.exit(0 if all_passed else 1)

examples/experimental/tests/__init__.py

Whitespace-only changes.
Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
"""
2+
Test for SEA asynchronous query execution functionality.
3+
"""
4+
import os
5+
import sys
6+
import logging
7+
import time
8+
from databricks.sql.client import Connection
9+
from databricks.sql.backend.types import CommandState
10+
11+
logging.basicConfig(level=logging.INFO)
12+
logger = logging.getLogger(__name__)
13+
14+
15+
def test_sea_async_query_with_cloud_fetch():
16+
"""
17+
Test executing a query asynchronously using the SEA backend with cloud fetch enabled.
18+
19+
This function connects to a Databricks SQL endpoint using the SEA backend,
20+
executes a simple query asynchronously with cloud fetch enabled, and verifies that execution completes successfully.
21+
"""
22+
server_hostname = os.environ.get("DATABRICKS_SERVER_HOSTNAME")
23+
http_path = os.environ.get("DATABRICKS_HTTP_PATH")
24+
access_token = os.environ.get("DATABRICKS_TOKEN")
25+
catalog = os.environ.get("DATABRICKS_CATALOG")
26+
27+
if not all([server_hostname, http_path, access_token]):
28+
logger.error("Missing required environment variables.")
29+
logger.error(
30+
"Please set DATABRICKS_SERVER_HOSTNAME, DATABRICKS_HTTP_PATH, and DATABRICKS_TOKEN."
31+
)
32+
return False
33+
34+
try:
35+
# Create connection with cloud fetch enabled
36+
logger.info(
37+
"Creating connection for asynchronous query execution with cloud fetch enabled"
38+
)
39+
connection = Connection(
40+
server_hostname=server_hostname,
41+
http_path=http_path,
42+
access_token=access_token,
43+
catalog=catalog,
44+
schema="default",
45+
use_sea=True,
46+
user_agent_entry="SEA-Test-Client",
47+
use_cloud_fetch=True,
48+
enable_query_result_lz4_compression=False,
49+
)
50+
51+
logger.info(
52+
f"Successfully opened SEA session with ID: {connection.get_session_id_hex()}"
53+
)
54+
55+
# Execute a query that generates large rows to force multiple chunks
56+
requested_row_count = 5000
57+
cursor = connection.cursor()
58+
query = f"""
59+
SELECT
60+
id,
61+
concat('value_', repeat('a', 10000)) as test_value
62+
FROM range(1, {requested_row_count} + 1) AS t(id)
63+
"""
64+
65+
logger.info(
66+
f"Executing asynchronous query with cloud fetch to generate {requested_row_count} rows"
67+
)
68+
cursor.execute_async(query)
69+
logger.info(
70+
"Asynchronous query submitted successfully with cloud fetch enabled"
71+
)
72+
73+
# Check query state
74+
logger.info("Checking query state...")
75+
while cursor.is_query_pending():
76+
logger.info("Query is still pending, waiting...")
77+
time.sleep(1)
78+
79+
logger.info("Query is no longer pending, getting results...")
80+
cursor.get_async_execution_result()
81+
82+
results = [cursor.fetchone()]
83+
results.extend(cursor.fetchmany(10))
84+
results.extend(cursor.fetchall())
85+
actual_row_count = len(results)
86+
87+
logger.info(
88+
f"Requested {requested_row_count} rows, received {actual_row_count} rows"
89+
)
90+
91+
# Verify total row count
92+
if actual_row_count != requested_row_count:
93+
logger.error(
94+
f"FAIL: Row count mismatch. Expected {requested_row_count}, got {actual_row_count}"
95+
)
96+
return False
97+
98+
logger.info(
99+
"PASS: Received correct number of rows with cloud fetch and all fetch methods work correctly"
100+
)
101+
102+
# Close resources
103+
cursor.close()
104+
connection.close()
105+
logger.info("Successfully closed SEA session")
106+
107+
return True
108+
109+
except Exception as e:
110+
logger.error(
111+
f"Error during SEA asynchronous query execution test with cloud fetch: {str(e)}"
112+
)
113+
import traceback
114+
115+
logger.error(traceback.format_exc())
116+
return False
117+
118+
119+
def test_sea_async_query_without_cloud_fetch():
120+
"""
121+
Test executing a query asynchronously using the SEA backend with cloud fetch disabled.
122+
123+
This function connects to a Databricks SQL endpoint using the SEA backend,
124+
executes a simple query asynchronously with cloud fetch disabled, and verifies that execution completes successfully.
125+
"""
126+
server_hostname = os.environ.get("DATABRICKS_SERVER_HOSTNAME")
127+
http_path = os.environ.get("DATABRICKS_HTTP_PATH")
128+
access_token = os.environ.get("DATABRICKS_TOKEN")
129+
catalog = os.environ.get("DATABRICKS_CATALOG")
130+
131+
if not all([server_hostname, http_path, access_token]):
132+
logger.error("Missing required environment variables.")
133+
logger.error(
134+
"Please set DATABRICKS_SERVER_HOSTNAME, DATABRICKS_HTTP_PATH, and DATABRICKS_TOKEN."
135+
)
136+
return False
137+
138+
try:
139+
# Create connection with cloud fetch disabled
140+
logger.info(
141+
"Creating connection for asynchronous query execution with cloud fetch disabled"
142+
)
143+
connection = Connection(
144+
server_hostname=server_hostname,
145+
http_path=http_path,
146+
access_token=access_token,
147+
catalog=catalog,
148+
schema="default",
149+
use_sea=True,
150+
user_agent_entry="SEA-Test-Client",
151+
use_cloud_fetch=False,
152+
enable_query_result_lz4_compression=False,
153+
)
154+
155+
logger.info(
156+
f"Successfully opened SEA session with ID: {connection.get_session_id_hex()}"
157+
)
158+
159+
# For non-cloud fetch, use a smaller row count to avoid exceeding inline limits
160+
requested_row_count = 100
161+
cursor = connection.cursor()
162+
query = f"""
163+
SELECT
164+
id,
165+
concat('value_', repeat('a', 100)) as test_value
166+
FROM range(1, {requested_row_count} + 1) AS t(id)
167+
"""
168+
169+
logger.info(
170+
f"Executing asynchronous query without cloud fetch to generate {requested_row_count} rows"
171+
)
172+
cursor.execute_async(query)
173+
logger.info(
174+
"Asynchronous query submitted successfully with cloud fetch disabled"
175+
)
176+
177+
# Check query state
178+
logger.info("Checking query state...")
179+
while cursor.is_query_pending():
180+
logger.info("Query is still pending, waiting...")
181+
time.sleep(1)
182+
183+
logger.info("Query is no longer pending, getting results...")
184+
cursor.get_async_execution_result()
185+
results = [cursor.fetchone()]
186+
results.extend(cursor.fetchmany(10))
187+
results.extend(cursor.fetchall())
188+
actual_row_count = len(results)
189+
190+
logger.info(
191+
f"Requested {requested_row_count} rows, received {actual_row_count} rows"
192+
)
193+
194+
# Verify total row count
195+
if actual_row_count != requested_row_count:
196+
logger.error(
197+
f"FAIL: Row count mismatch. Expected {requested_row_count}, got {actual_row_count}"
198+
)
199+
return False
200+
201+
logger.info(
202+
"PASS: Received correct number of rows without cloud fetch and all fetch methods work correctly"
203+
)
204+
205+
# Close resources
206+
cursor.close()
207+
connection.close()
208+
logger.info("Successfully closed SEA session")
209+
210+
return True
211+
212+
except Exception as e:
213+
logger.error(
214+
f"Error during SEA asynchronous query execution test without cloud fetch: {str(e)}"
215+
)
216+
import traceback
217+
218+
logger.error(traceback.format_exc())
219+
return False
220+
221+
222+
def test_sea_async_query_exec():
223+
"""
224+
Run both asynchronous query tests and return overall success.
225+
"""
226+
with_cloud_fetch_success = test_sea_async_query_with_cloud_fetch()
227+
logger.info(
228+
f"Asynchronous query with cloud fetch: {'✅ PASSED' if with_cloud_fetch_success else '❌ FAILED'}"
229+
)
230+
231+
without_cloud_fetch_success = test_sea_async_query_without_cloud_fetch()
232+
logger.info(
233+
f"Asynchronous query without cloud fetch: {'✅ PASSED' if without_cloud_fetch_success else '❌ FAILED'}"
234+
)
235+
236+
return with_cloud_fetch_success and without_cloud_fetch_success
237+
238+
239+
if __name__ == "__main__":
240+
success = test_sea_async_query_exec()
241+
sys.exit(0 if success else 1)

0 commit comments

Comments
 (0)