"""
Test for SEA multi-chunk responses.

This script tests the SEA connector's ability to handle multi-chunk responses correctly.
It runs a query that generates large rows to force multiple chunks and verifies that
the correct number of rows are returned.
"""
import os
import sys
import logging
import time
import json
import csv
from pathlib import Path
from databricks.sql.client import Connection

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def test_sea_multi_chunk_with_cloud_fetch(requested_row_count=5000):
    """
    Test executing a query that generates multiple chunks using cloud fetch.

    Args:
        requested_row_count: Number of rows to request in the query

    Returns:
        bool: True if the test passed, False otherwise
    """
    server_hostname = os.environ.get("DATABRICKS_SERVER_HOSTNAME")
    http_path = os.environ.get("DATABRICKS_HTTP_PATH")
    access_token = os.environ.get("DATABRICKS_TOKEN")
    catalog = os.environ.get("DATABRICKS_CATALOG")

    # Create output directory for test results
    output_dir = Path("test_results")
    output_dir.mkdir(exist_ok=True)

    # Files to store results
    rows_file = output_dir / "cloud_fetch_rows.csv"
    stats_file = output_dir / "cloud_fetch_stats.json"

    if not all([server_hostname, http_path, access_token]):
        logger.error("Missing required environment variables.")
        logger.error(
            "Please set DATABRICKS_SERVER_HOSTNAME, DATABRICKS_HTTP_PATH, and DATABRICKS_TOKEN."
        )
        return False

    try:
        # Create connection with cloud fetch enabled
        logger.info(
            "Creating connection for query execution with cloud fetch enabled"
        )
        connection = Connection(
            server_hostname=server_hostname,
            http_path=http_path,
            access_token=access_token,
            catalog=catalog,
            schema="default",
            use_sea=True,
            user_agent_entry="SEA-Test-Client",
            use_cloud_fetch=True,
            enable_query_result_lz4_compression=False,
        )

        logger.info(
            f"Successfully opened SEA session with ID: {connection.get_session_id_hex()}"
        )

        cursor = connection.cursor()
        query = f"""
        SELECT
            id,
            concat('value_', repeat('a', 10000)) as test_value
        FROM range(1, {requested_row_count} + 1) AS t(id)
        """
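
        # Sizing note: each row carries a ~10 KB string, so the default 5000
        # rows is roughly 50 MB of result data -- enough, in practice, to push
        # the server to split the result across multiple cloud-fetch chunks.
        # (The exact chunk threshold is server-side and not asserted here;
        # this is a sizing heuristic, not a documented limit.)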

        logger.info(f"Executing query with cloud fetch to generate {requested_row_count} rows")
        start_time = time.time()
        cursor.execute(query)

        # Fetch all rows
        rows = cursor.fetchall()
        actual_row_count = len(rows)
        end_time = time.time()
        execution_time = end_time - start_time

        logger.info(f"Query executed in {execution_time:.2f} seconds")
        logger.info(f"Requested {requested_row_count} rows, received {actual_row_count} rows")

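        # Rough effective fetch rate, handy for eyeballing regressions
        # (assumes ~10 KB per row, per the query above):
        # logger.info(f"~{actual_row_count * 10_000 / execution_time / 1e6:.1f} MB/s")
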
        # Write rows to CSV file for inspection
        logger.info(f"Writing rows to {rows_file}")
        with open(rows_file, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(['id', 'value_length'])  # Header

            # Extract IDs to check for duplicates and missing values
            row_ids = []
            for row in rows:
                row_id = row[0]
                value_length = len(row[1])
                writer.writerow([row_id, value_length])
                row_ids.append(row_id)

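        # Each CSV data row is (id, value_length); with the query above the
        # length is always 10006 characters: len('value_') == 6 plus 10000 'a's.
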
        # Verify row count
        success = actual_row_count == requested_row_count

        # Check for duplicate IDs
        unique_ids = set(row_ids)
        duplicate_count = len(row_ids) - len(unique_ids)

        # Check for missing IDs
        expected_ids = set(range(1, requested_row_count + 1))
        missing_ids = expected_ids - unique_ids
        extra_ids = unique_ids - expected_ids

        # Write statistics to JSON file
        stats = {
            "requested_row_count": requested_row_count,
            "actual_row_count": actual_row_count,
            "execution_time_seconds": execution_time,
            "duplicate_count": duplicate_count,
            "missing_ids_count": len(missing_ids),
            "extra_ids_count": len(extra_ids),
            # Limit the ID lists to the first 100 entries for readability
            "missing_ids": sorted(missing_ids)[:100],
            "extra_ids": sorted(extra_ids)[:100],
            "success": (
                success
                and duplicate_count == 0
                and not missing_ids
                and not extra_ids
            ),
        }

        with open(stats_file, 'w') as f:
            json.dump(stats, f, indent=2)

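        # Illustrative shape of cloud_fetch_stats.json (example values only):
        # {
        #   "requested_row_count": 5000,
        #   "actual_row_count": 5000,
        #   "execution_time_seconds": 12.34,
        #   "duplicate_count": 0,
        #   "missing_ids_count": 0,
        #   "extra_ids_count": 0,
        #   "missing_ids": [],
        #   "extra_ids": [],
        #   "success": true
        # }
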
        # Log detailed results
        if duplicate_count > 0:
            logger.error(f"❌ FAILED: Found {duplicate_count} duplicate row IDs")
            success = False
        else:
            logger.info("✅ PASSED: No duplicate row IDs found")

        if missing_ids:
            logger.error(f"❌ FAILED: Missing {len(missing_ids)} expected row IDs")
            if len(missing_ids) <= 10:
                logger.error(f"Missing IDs: {sorted(missing_ids)}")
            success = False
        else:
            logger.info("✅ PASSED: All expected row IDs present")

        if extra_ids:
            logger.error(f"❌ FAILED: Found {len(extra_ids)} unexpected row IDs")
            if len(extra_ids) <= 10:
                logger.error(f"Extra IDs: {sorted(extra_ids)}")
            success = False
        else:
            logger.info("✅ PASSED: No unexpected row IDs found")

        if actual_row_count == requested_row_count:
            logger.info("✅ PASSED: Row count matches requested count")
        else:
            logger.error(
                f"❌ FAILED: Row count mismatch. Expected {requested_row_count}, got {actual_row_count}"
            )
            success = False

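        # At this point 'success' mirrors stats["success"]: both require a
        # matching row count and no duplicate, missing, or extra IDs.
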
        # Close resources
        cursor.close()
        connection.close()
        logger.info("Successfully closed SEA session")

        logger.info(f"Test results written to {rows_file} and {stats_file}")
        return success

    except Exception as e:
        logger.error(
            f"Error during SEA multi-chunk test with cloud fetch: {str(e)}"
        )
        import traceback
        logger.error(traceback.format_exc())
        return False


def main():
    # Check for required environment variables before running the test
    required_vars = [
        "DATABRICKS_SERVER_HOSTNAME",
        "DATABRICKS_HTTP_PATH",
        "DATABRICKS_TOKEN",
    ]
    missing_vars = [var for var in required_vars if not os.environ.get(var)]
    if missing_vars:
        logger.error(
            f"Missing required environment variables: {', '.join(missing_vars)}"
        )
        logger.error("Please set these variables before running the tests.")
        sys.exit(1)

    # Get row count from command line or use default
    requested_row_count = 5000

    if len(sys.argv) > 1:
        try:
            requested_row_count = int(sys.argv[1])
        except ValueError:
            logger.error(f"Invalid row count: {sys.argv[1]}")
            logger.error("Please provide a valid integer for row count.")
            sys.exit(1)
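
    # Example invocations (script filename assumed for illustration):
    #   python test_sea_multi_chunk.py          # default: 5000 rows
    #   python test_sea_multi_chunk.py 20000    # request 20000 rows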

    logger.info(f"Testing with {requested_row_count} rows")

    # Run the multi-chunk test with cloud fetch
    success = test_sea_multi_chunk_with_cloud_fetch(requested_row_count)

    # Report results
    if success:
        logger.info("✅ TEST PASSED: Multi-chunk cloud fetch test completed successfully")
        sys.exit(0)
    else:
        logger.error("❌ TEST FAILED: Multi-chunk cloud fetch test encountered errors")
        sys.exit(1)


if __name__ == "__main__":
    main()