
Commit d426655

move to new test structure
Signed-off-by: varun-edachali-dbx <varun.edachali@databricks.com>
1 parent 936713c commit d426655

File tree

7 files changed: +856 -716 lines


examples/experimental/sea_connector_test.py

Lines changed: 90 additions & 716 deletions
Large diffs are not rendered by default.
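
Since the slimmed-down sea_connector_test.py diff is not rendered, here is a minimal sketch of how a thin runner could dispatch the new per-module tests. The module path "tests.test_sea_async_query" and the single registry entry are assumptions for illustration; the actual file may wire things up differently.

# Hypothetical runner sketch -- the module path and registry below are assumed,
# not taken from the commit.
import sys
import logging

from tests.test_sea_async_query import test_sea_async_query_exec  # assumed module path

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Registry of (label, callable) pairs; extend with the other per-module entry points.
TESTS = [
    ("async query execution", test_sea_async_query_exec),
]

if __name__ == "__main__":
    results = {label: fn() for label, fn in TESTS}
    for label, ok in results.items():
        logger.info(f"{label}: {'PASSED' if ok else 'FAILED'}")
    sys.exit(0 if all(results.values()) else 1)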

examples/experimental/tests/__init__.py

Whitespace-only changes.

Lines changed: 195 additions & 0 deletions
@@ -0,0 +1,195 @@
"""
Test for SEA asynchronous query execution functionality.
"""
import os
import sys
import logging
import time
from databricks.sql.client import Connection
from databricks.sql.backend.types import CommandState

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def test_sea_async_query_with_cloud_fetch():
    """
    Test executing a query asynchronously using the SEA backend with cloud fetch enabled.

    This function connects to a Databricks SQL endpoint using the SEA backend,
    executes a simple query asynchronously with cloud fetch enabled, and verifies
    that execution completes successfully.
    """
    server_hostname = os.environ.get("DATABRICKS_SERVER_HOSTNAME")
    http_path = os.environ.get("DATABRICKS_HTTP_PATH")
    access_token = os.environ.get("DATABRICKS_TOKEN")
    catalog = os.environ.get("DATABRICKS_CATALOG")

    if not all([server_hostname, http_path, access_token]):
        logger.error("Missing required environment variables.")
        logger.error(
            "Please set DATABRICKS_SERVER_HOSTNAME, DATABRICKS_HTTP_PATH, and DATABRICKS_TOKEN."
        )
        return False

    try:
        # Create connection with cloud fetch enabled
        logger.info(
            "Creating connection for asynchronous query execution with cloud fetch enabled"
        )
        connection = Connection(
            server_hostname=server_hostname,
            http_path=http_path,
            access_token=access_token,
            catalog=catalog,
            schema="default",
            use_sea=True,
            user_agent_entry="SEA-Test-Client",
            use_cloud_fetch=True,
        )

        logger.info(
            f"Successfully opened SEA session with ID: {connection.get_session_id_hex()}"
        )

        # Execute a query that returns 100 rows asynchronously
        cursor = connection.cursor()
        logger.info("Executing asynchronous query with cloud fetch: SELECT 100 rows")
        cursor.execute_async(
            "SELECT id, 'test_value_' || CAST(id as STRING) as test_value FROM range(1, 101)"
        )
        logger.info(
            "Asynchronous query submitted successfully with cloud fetch enabled"
        )

        # Check query state
        logger.info("Checking query state...")
        while cursor.is_query_pending():
            logger.info("Query is still pending, waiting...")
            time.sleep(1)

        logger.info("Query is no longer pending, getting results...")
        cursor.get_async_execution_result()
        rows = cursor.fetchall()
        logger.info(f"Retrieved rows: {rows}")
        logger.info(
            "Successfully retrieved asynchronous query results with cloud fetch enabled"
        )

        # Close resources
        cursor.close()
        connection.close()
        logger.info("Successfully closed SEA session")

        return True

    except Exception as e:
        logger.error(
            f"Error during SEA asynchronous query execution test with cloud fetch: {str(e)}"
        )
        import traceback

        logger.error(traceback.format_exc())
        return False


def test_sea_async_query_without_cloud_fetch():
    """
    Test executing a query asynchronously using the SEA backend with cloud fetch disabled.

    This function connects to a Databricks SQL endpoint using the SEA backend,
    executes a simple query asynchronously with cloud fetch disabled, and verifies
    that execution completes successfully.
    """
    server_hostname = os.environ.get("DATABRICKS_SERVER_HOSTNAME")
    http_path = os.environ.get("DATABRICKS_HTTP_PATH")
    access_token = os.environ.get("DATABRICKS_TOKEN")
    catalog = os.environ.get("DATABRICKS_CATALOG")

    if not all([server_hostname, http_path, access_token]):
        logger.error("Missing required environment variables.")
        logger.error(
            "Please set DATABRICKS_SERVER_HOSTNAME, DATABRICKS_HTTP_PATH, and DATABRICKS_TOKEN."
        )
        return False

    try:
        # Create connection with cloud fetch disabled
        logger.info(
            "Creating connection for asynchronous query execution with cloud fetch disabled"
        )
        connection = Connection(
            server_hostname=server_hostname,
            http_path=http_path,
            access_token=access_token,
            catalog=catalog,
            schema="default",
            use_sea=True,
            user_agent_entry="SEA-Test-Client",
            use_cloud_fetch=False,
            enable_query_result_lz4_compression=False,
        )

        logger.info(
            f"Successfully opened SEA session with ID: {connection.get_session_id_hex()}"
        )

        # Execute a query that returns 100 rows asynchronously
        cursor = connection.cursor()
        logger.info("Executing asynchronous query without cloud fetch: SELECT 100 rows")
        cursor.execute_async(
            "SELECT id, 'test_value_' || CAST(id as STRING) as test_value FROM range(1, 101)"
        )
        logger.info(
            "Asynchronous query submitted successfully with cloud fetch disabled"
        )

        # Check query state
        logger.info("Checking query state...")
        while cursor.is_query_pending():
            logger.info("Query is still pending, waiting...")
            time.sleep(1)

        logger.info("Query is no longer pending, getting results...")
        cursor.get_async_execution_result()
        rows = cursor.fetchall()
        logger.info(f"Retrieved rows: {rows}")
        logger.info(
            "Successfully retrieved asynchronous query results with cloud fetch disabled"
        )

        # Close resources
        cursor.close()
        connection.close()
        logger.info("Successfully closed SEA session")

        return True

    except Exception as e:
        logger.error(
            f"Error during SEA asynchronous query execution test without cloud fetch: {str(e)}"
        )
        import traceback

        logger.error(traceback.format_exc())
        return False


def test_sea_async_query_exec():
    """
    Run both asynchronous query tests and return overall success.
    """
    with_cloud_fetch_success = test_sea_async_query_with_cloud_fetch()
    logger.info(
        f"Asynchronous query with cloud fetch: {'✅ PASSED' if with_cloud_fetch_success else '❌ FAILED'}"
    )

    without_cloud_fetch_success = test_sea_async_query_without_cloud_fetch()
    logger.info(
        f"Asynchronous query without cloud fetch: {'✅ PASSED' if without_cloud_fetch_success else '❌ FAILED'}"
    )

    return with_cloud_fetch_success and without_cloud_fetch_success


if __name__ == "__main__":
    success = test_sea_async_query_exec()
    sys.exit(0 if success else 1)
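
The async tests above are driven entirely by environment variables; a minimal sketch of the expected setup follows (all values are placeholders, not real credentials):

# Placeholder environment setup assumed by the tests above; replace with real values.
import os

os.environ["DATABRICKS_SERVER_HOSTNAME"] = "<workspace-host>.cloud.databricks.com"
os.environ["DATABRICKS_HTTP_PATH"] = "/sql/1.0/warehouses/<warehouse-id>"
os.environ["DATABRICKS_TOKEN"] = "<personal-access-token>"
os.environ["DATABRICKS_CATALOG"] = "<catalog-name>"  # optional: not required by the all([...]) check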
Lines changed: 199 additions & 0 deletions
@@ -0,0 +1,199 @@
"""
Test specifically for the duplicate rows issue in SEA.
"""
import os
import sys
import logging
from databricks.sql.client import Connection

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def test_sea_duplicate_rows():
    """
    Test to identify duplicate rows in SEA results.

    This test executes a query that should return a specific number of rows,
    and checks if the actual number of rows matches the expected count.
    """
    server_hostname = os.environ.get("DATABRICKS_SERVER_HOSTNAME")
    http_path = os.environ.get("DATABRICKS_HTTP_PATH")
    access_token = os.environ.get("DATABRICKS_TOKEN")
    catalog = os.environ.get("DATABRICKS_CATALOG")

    if not all([server_hostname, http_path, access_token]):
        logger.error("Missing required environment variables.")
        logger.error(
            "Please set DATABRICKS_SERVER_HOSTNAME, DATABRICKS_HTTP_PATH, and DATABRICKS_TOKEN."
        )
        return False

    try:
        # Test with cloud fetch enabled
        logger.info("=== Testing with cloud fetch ENABLED ===")
        test_with_cloud_fetch(server_hostname, http_path, access_token, catalog)

        return True

    except Exception as e:
        logger.error(f"Error during SEA duplicate rows test: {str(e)}")
        import traceback

        logger.error(traceback.format_exc())
        return False


def test_with_cloud_fetch(server_hostname, http_path, access_token, catalog):
    """Test for duplicate rows with cloud fetch enabled."""
    # Create connection with cloud fetch enabled
    connection = Connection(
        server_hostname=server_hostname,
        http_path=http_path,
        access_token=access_token,
        catalog=catalog,
        schema="default",
        use_sea=True,
        user_agent_entry="SEA-Test-Client",
        use_cloud_fetch=True,
        arraysize=10000,  # Set a large arraysize to see if it affects the results
    )

    logger.info(
        f"Successfully opened SEA session with ID: {connection.get_session_id_hex()}"
    )

    # Focus on the problematic case
    count = 10000
    cursor = connection.cursor()
    # Using the same query as in the original test that showed the issue
    query = f"""
    SELECT
        id,
        concat('value_', repeat('a', 10000)) as test_value
    FROM range(1, {count} + 1) AS t(id)
    """

    logger.info(f"Executing query for {count} rows with cloud fetch enabled")
    cursor.execute(query)

    rows = cursor.fetchall()
    logger.info(f"Requested {count} rows, retrieved {len(rows)} rows")

    # Check for duplicate rows
    row_ids = [row[0] for row in rows]
    unique_ids = set(row_ids)
    logger.info(f"Number of unique IDs: {len(unique_ids)}")

    if len(rows) != len(unique_ids):
        logger.info("DUPLICATE ROWS DETECTED!")

        # Analyze the distribution of IDs
        logger.info("Analyzing ID distribution...")

        # Create a sorted list of all IDs to see if there are patterns
        sorted_ids = sorted(row_ids)

        # Check for missing IDs
        expected_ids = set(range(1, count + 1))
        missing_ids = expected_ids - set(sorted_ids)
        logger.info(f"Missing IDs count: {len(missing_ids)}")
        if missing_ids:
            logger.info(f"Sample of missing IDs: {sorted(list(missing_ids))[:20]}")

        # Find duplicates
        id_counts = {}
        for id_val in row_ids:
            if id_val in id_counts:
                id_counts[id_val] += 1
            else:
                id_counts[id_val] = 1

        duplicates = {id_val: count for id_val, count in id_counts.items() if count > 1}
        logger.info(f"Number of duplicate IDs: {len(duplicates)}")
        logger.info(f"Sample of duplicate IDs: {list(duplicates.items())[:20]}")

        # Check if duplicates occur at specific positions
        logger.info("Checking positions of duplicates...")

        # Find the positions of each ID in the result set
        id_positions = {}
        for i, row_id in enumerate(row_ids):
            if row_id in id_positions:
                id_positions[row_id].append(i)
            else:
                id_positions[row_id] = [i]

        # Look at positions of duplicates
        duplicate_positions = {
            id_val: positions
            for id_val, positions in id_positions.items()
            if len(positions) > 1
        }

        # Check the distance between duplicates
        logger.info("Analyzing distance between duplicate occurrences...")
        distances = []
        for id_val, positions in duplicate_positions.items():
            for i in range(1, len(positions)):
                distances.append(positions[i] - positions[i - 1])

        if distances:
            avg_distance = sum(distances) / len(distances)
            logger.info(f"Average distance between duplicates: {avg_distance}")
            logger.info(f"Min distance: {min(distances)}, Max distance: {max(distances)}")

            # Count distances by frequency
            distance_counts = {}
            for d in distances:
                if d in distance_counts:
                    distance_counts[d] += 1
                else:
                    distance_counts[d] = 1

            # Show most common distances
            sorted_distances = sorted(
                distance_counts.items(), key=lambda x: x[1], reverse=True
            )
            logger.info(f"Most common distances: {sorted_distances[:10]}")

        # Look for patterns in chunk boundaries
        logger.info("Checking for patterns at chunk boundaries...")

        # Get the approximate chunk sizes from the logs
        chunk_sizes = [2187, 2187, 2188, 2187, 2187]  # Based on previous logs

        chunk_start = 0
        for i, size in enumerate(chunk_sizes):
            chunk_end = chunk_start + size
            chunk_ids = row_ids[chunk_start:chunk_end]
            unique_chunk_ids = set(chunk_ids)

            logger.info(
                f"Chunk {i}: rows {chunk_start}-{chunk_end - 1}, size {size}, "
                f"unique IDs: {len(unique_chunk_ids)}"
            )

            # Check for overlap with next chunk
            if i < len(chunk_sizes) - 1:
                next_chunk_start = chunk_end
                next_chunk_end = next_chunk_start + chunk_sizes[i + 1]
                if next_chunk_end > len(row_ids):
                    next_chunk_end = len(row_ids)

                next_chunk_ids = row_ids[next_chunk_start:next_chunk_end]

                # Check for IDs that appear in both chunks
                current_ids_set = set(chunk_ids)
                next_ids_set = set(next_chunk_ids)
                overlap = current_ids_set.intersection(next_ids_set)

                if overlap:
                    logger.info(
                        f"Overlap between chunks {i} and {i + 1}: {len(overlap)} IDs"
                    )
                    logger.info(f"Sample overlapping IDs: {sorted(list(overlap))[:10]}")

                    # Check if the overlapping IDs are at the boundaries
                    end_of_current = chunk_ids[-10:]
                    start_of_next = next_chunk_ids[:10]

                    logger.info(f"End of chunk {i}: {end_of_current}")
                    logger.info(f"Start of chunk {i + 1}: {start_of_next}")

            chunk_start = chunk_end

    cursor.close()
    connection.close()
    logger.info("Successfully closed SEA session with cloud fetch enabled")


if __name__ == "__main__":
    success = test_sea_duplicate_rows()
    sys.exit(0 if success else 1)
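
For reference, the duplicate/missing-ID bookkeeping in the test above can be expressed more compactly with collections.Counter; a minimal sketch follows (the helper name is illustrative and not part of the committed file):

# Illustrative helper, not part of the commit: summarizes duplicates and gaps in row IDs.
from collections import Counter

def summarize_row_ids(row_ids, expected_count):
    counts = Counter(row_ids)
    duplicates = {id_val: n for id_val, n in counts.items() if n > 1}
    missing = sorted(set(range(1, expected_count + 1)) - set(row_ids))
    return {
        "total_rows": len(row_ids),
        "unique_rows": len(counts),
        "duplicate_ids": duplicates,
        "missing_ids": missing,
    }

# Usage against the fetched rows: summarize_row_ids([row[0] for row in rows], count)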
