Skip to content

Commit 8cf118f

Browse files
Merge branch 'exec-resp-norm' into cloudfetch-sea
2 parents b404af7 + 73bc282 commit 8cf118f

File tree

6 files changed

+87
-122
lines changed

6 files changed

+87
-122
lines changed

src/databricks/sql/backend/thrift_backend.py

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -756,11 +756,13 @@ def _results_message_to_execute_response(self, resp, operation_state):
756756
)
757757
direct_results = resp.directResults
758758
has_been_closed_server_side = direct_results and direct_results.closeOperation
759+
759760
has_more_rows = (
760761
(not direct_results)
761762
or (not direct_results.resultSet)
762763
or direct_results.resultSet.hasMoreRows
763764
)
765+
764766
description = self._hive_schema_to_description(
765767
t_result_set_metadata_resp.schema
766768
)
@@ -782,18 +784,19 @@ def _results_message_to_execute_response(self, resp, operation_state):
782784
if status is None:
783785
raise ValueError(f"Unknown command state: {operation_state}")
784786

785-
return ExecuteResponse(
787+
execute_response = ExecuteResponse(
786788
command_id=command_id,
787789
status=status,
788790
description=description,
789-
has_more_rows=has_more_rows,
790791
has_been_closed_server_side=has_been_closed_server_side,
791792
lz4_compressed=lz4_compressed,
792793
is_staging_operation=t_result_set_metadata_resp.isStagingOperation,
793794
arrow_schema_bytes=schema_bytes,
794795
result_format=t_result_set_metadata_resp.resultFormat,
795796
)
796797

798+
return execute_response, has_more_rows
799+
797800
def get_execution_result(
798801
self, command_id: CommandId, cursor: "Cursor"
799802
) -> "ResultSet":
@@ -842,7 +845,6 @@ def get_execution_result(
842845
command_id=command_id,
843846
status=status,
844847
description=description,
845-
has_more_rows=has_more_rows,
846848
has_been_closed_server_side=False,
847849
lz4_compressed=lz4_compressed,
848850
is_staging_operation=is_staging_operation,
@@ -860,6 +862,7 @@ def get_execution_result(
860862
t_row_set=resp.results,
861863
max_download_threads=self.max_download_threads,
862864
ssl_options=self._ssl_options,
865+
has_more_rows=has_more_rows,
863866
)
864867

865868
def _wait_until_command_done(self, op_handle, initial_operation_status_resp):
@@ -972,7 +975,9 @@ def execute_command(
972975
self._handle_execute_response_async(resp, cursor)
973976
return None
974977
else:
975-
execute_response = self._handle_execute_response(resp, cursor)
978+
execute_response, has_more_rows = self._handle_execute_response(
979+
resp, cursor
980+
)
976981

977982
t_row_set = None
978983
if resp.directResults and resp.directResults.resultSet:
@@ -988,6 +993,7 @@ def execute_command(
988993
t_row_set=t_row_set,
989994
max_download_threads=self.max_download_threads,
990995
ssl_options=self._ssl_options,
996+
has_more_rows=has_more_rows,
991997
)
992998

993999
def get_catalogs(
@@ -1009,7 +1015,7 @@ def get_catalogs(
10091015
)
10101016
resp = self.make_request(self._client.GetCatalogs, req)
10111017

1012-
execute_response = self._handle_execute_response(resp, cursor)
1018+
execute_response, has_more_rows = self._handle_execute_response(resp, cursor)
10131019

10141020
t_row_set = None
10151021
if resp.directResults and resp.directResults.resultSet:
@@ -1025,6 +1031,7 @@ def get_catalogs(
10251031
t_row_set=t_row_set,
10261032
max_download_threads=self.max_download_threads,
10271033
ssl_options=self._ssl_options,
1034+
has_more_rows=has_more_rows,
10281035
)
10291036

10301037
def get_schemas(
@@ -1050,7 +1057,7 @@ def get_schemas(
10501057
)
10511058
resp = self.make_request(self._client.GetSchemas, req)
10521059

1053-
execute_response = self._handle_execute_response(resp, cursor)
1060+
execute_response, has_more_rows = self._handle_execute_response(resp, cursor)
10541061

10551062
t_row_set = None
10561063
if resp.directResults and resp.directResults.resultSet:
@@ -1066,6 +1073,7 @@ def get_schemas(
10661073
t_row_set=t_row_set,
10671074
max_download_threads=self.max_download_threads,
10681075
ssl_options=self._ssl_options,
1076+
has_more_rows=has_more_rows,
10691077
)
10701078

10711079
def get_tables(
@@ -1095,7 +1103,7 @@ def get_tables(
10951103
)
10961104
resp = self.make_request(self._client.GetTables, req)
10971105

1098-
execute_response = self._handle_execute_response(resp, cursor)
1106+
execute_response, has_more_rows = self._handle_execute_response(resp, cursor)
10991107

11001108
t_row_set = None
11011109
if resp.directResults and resp.directResults.resultSet:
@@ -1111,6 +1119,7 @@ def get_tables(
11111119
t_row_set=t_row_set,
11121120
max_download_threads=self.max_download_threads,
11131121
ssl_options=self._ssl_options,
1122+
has_more_rows=has_more_rows,
11141123
)
11151124

11161125
def get_columns(
@@ -1140,7 +1149,7 @@ def get_columns(
11401149
)
11411150
resp = self.make_request(self._client.GetColumns, req)
11421151

1143-
execute_response = self._handle_execute_response(resp, cursor)
1152+
execute_response, has_more_rows = self._handle_execute_response(resp, cursor)
11441153

11451154
t_row_set = None
11461155
if resp.directResults and resp.directResults.resultSet:
@@ -1156,6 +1165,7 @@ def get_columns(
11561165
t_row_set=t_row_set,
11571166
max_download_threads=self.max_download_threads,
11581167
ssl_options=self._ssl_options,
1168+
has_more_rows=has_more_rows,
11591169
)
11601170

11611171
def _handle_execute_response(self, resp, cursor):

src/databricks/sql/backend/types.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -402,7 +402,6 @@ class ExecuteResponse:
402402
command_id: CommandId
403403
status: CommandState
404404
description: Optional[List[Tuple]] = None
405-
has_more_rows: bool = False
406405
has_been_closed_server_side: bool = False
407406
lz4_compressed: bool = True
408407
is_staging_operation: bool = False

src/databricks/sql/result_set.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ def __init__(
163163
t_row_set=None,
164164
max_download_threads: int = 10,
165165
ssl_options=None,
166+
has_more_rows: bool = True,
166167
):
167168
"""
168169
Initialize a ThriftResultSet with direct access to the ThriftDatabricksClient.
@@ -177,6 +178,7 @@ def __init__(
177178
t_row_set: The TRowSet containing result data (if available)
178179
max_download_threads: Maximum number of download threads for cloud fetch
179180
ssl_options: SSL options for cloud fetch
181+
has_more_rows: Whether there are more rows to fetch
180182
"""
181183
# Initialize ThriftResultSet-specific attributes
182184
self._arrow_schema_bytes = execute_response.arrow_schema_bytes
@@ -208,7 +210,7 @@ def __init__(
208210
command_id=execute_response.command_id,
209211
status=execute_response.status,
210212
has_been_closed_server_side=execute_response.has_been_closed_server_side,
211-
has_more_rows=execute_response.has_more_rows,
213+
has_more_rows=has_more_rows,
212214
results_queue=results_queue,
213215
description=execute_response.description,
214216
is_staging_operation=execute_response.is_staging_operation,

tests/unit/test_client.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,6 @@ def test_closing_result_set_hard_closes_commands(self):
213213
type(mock_connection).session = PropertyMock(return_value=mock_session)
214214

215215
mock_thrift_backend.fetch_results.return_value = (Mock(), False)
216-
217216
result_set = ThriftResultSet(
218217
mock_connection, mock_results_response, mock_thrift_backend
219218
)
@@ -479,7 +478,6 @@ def make_fake_row_slice(n_rows):
479478
mock_aq = Mock()
480479
mock_aq.next_n_rows.side_effect = make_fake_row_slice
481480
mock_thrift_backend.execute_command.return_value.arrow_queue = mock_aq
482-
mock_thrift_backend.fetch_results.return_value = (mock_aq, True)
483481

484482
cursor = client.Cursor(Mock(), mock_thrift_backend)
485483
cursor.execute("foo")

tests/unit/test_fetches.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,17 +57,13 @@ def make_dummy_result_set_from_initial_results(initial_results):
5757
command_id=None,
5858
status=None,
5959
has_been_closed_server_side=True,
60-
has_more_rows=False,
6160
description=description,
6261
lz4_compressed=True,
6362
is_staging_operation=False,
6463
),
6564
thrift_client=mock_thrift_backend,
6665
t_row_set=None,
6766
)
68-
69-
# Replace the results queue with our arrow_queue
70-
rs.results = arrow_queue
7167
return rs
7268

7369
@staticmethod
@@ -105,7 +101,6 @@ def fetch_results(
105101
command_id=None,
106102
status=None,
107103
has_been_closed_server_side=False,
108-
has_more_rows=True,
109104
description=description,
110105
lz4_compressed=True,
111106
is_staging_operation=False,

0 commit comments

Comments
 (0)