Skip to content

Commit 1367f69

Browse files
authored
Merge branch 'sea-migration' into chunk-latency
2 parents 6d122c4 + 922c448 commit 1367f69

File tree

4 files changed

+22
-0
lines changed

4 files changed

+22
-0
lines changed

src/databricks/sql/backend/sea/queue.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,6 @@ def remaining_rows(self) -> List[List[str]]:
6969
slice = self.data_array[self.cur_row_index :]
7070
self.cur_row_index += len(slice)
7171
return slice
72+
73+
def close(self):
74+
return

src/databricks/sql/result_set.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ def close(self) -> None:
170170
been closed on the server for some other reason, issue a request to the server to close it.
171171
"""
172172
try:
173+
self.results.close()
173174
if (
174175
self.status != CommandState.CLOSED
175176
and not self.has_been_closed_server_side

src/databricks/sql/utils.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ def next_n_rows(self, num_rows: int):
5151
def remaining_rows(self):
5252
pass
5353

54+
@abstractmethod
55+
def close(self):
56+
pass
57+
5458

5559
class ThriftResultSetQueueFactory(ABC):
5660
@staticmethod
@@ -171,6 +175,9 @@ def remaining_rows(self):
171175
self.cur_row_index += slice.num_rows
172176
return slice
173177

178+
def close(self):
179+
return
180+
174181

175182
class ArrowQueue(ResultSetQueue):
176183
def __init__(
@@ -208,6 +215,9 @@ def remaining_rows(self) -> "pyarrow.Table":
208215
self.cur_row_index += slice.num_rows
209216
return slice
210217

218+
def close(self):
219+
return
220+
211221

212222
class CloudFetchQueue(ResultSetQueue):
213223
def __init__(
@@ -372,6 +382,9 @@ def _create_empty_table(self) -> "pyarrow.Table":
372382
# Create a 0-row table with just the schema bytes
373383
return create_arrow_table_from_arrow_file(self.schema_bytes, self.description)
374384

385+
def close(self):
386+
self.download_manager._shutdown_manager()
387+
375388

376389
def _bound(min_x, max_x, x):
377390
"""Bound x by [min_x, max_x]

tests/unit/test_client.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,8 @@ def test_closing_result_set_with_closed_connection_soft_closes_commands(self):
199199
session_id_hex=Mock(),
200200
statement_type=Mock(),
201201
)
202+
result_set.results = mock_results
203+
202204
# Setup session mock on the mock_connection
203205
mock_session = Mock()
204206
mock_session.open = False
@@ -208,12 +210,14 @@ def test_closing_result_set_with_closed_connection_soft_closes_commands(self):
208210

209211
self.assertFalse(mock_backend.close_command.called)
210212
self.assertTrue(result_set.has_been_closed_server_side)
213+
mock_results.close.assert_called_once()
211214

212215
def test_closing_result_set_hard_closes_commands(self):
213216
mock_results_response = Mock()
214217
mock_results_response.has_been_closed_server_side = False
215218
mock_connection = Mock()
216219
mock_thrift_backend = Mock()
220+
mock_results = Mock()
217221
# Setup session mock on the mock_connection
218222
mock_session = Mock()
219223
mock_session.open = True
@@ -228,6 +232,7 @@ def test_closing_result_set_hard_closes_commands(self):
228232
mock_thrift_backend.close_command.assert_called_once_with(
229233
mock_results_response.command_id
230234
)
235+
mock_results.close.assert_called_once()
231236

232237
def test_executing_multiple_commands_uses_the_most_recent_command(self):
233238
mock_result_sets = [Mock(), Mock()]

0 commit comments

Comments
 (0)