
Commit 971969a

Merge branch 'sea-migration' into sea-http-client
2 parents: 2d451b6 + 80692e3

File tree: 9 files changed (+14 lines, −22 lines)

src/databricks/sql/backend/sea/backend.py

Lines changed: 1 addition & 1 deletion
@@ -361,7 +361,7 @@ def _results_message_to_execute_response(

        # Check for compression
        lz4_compressed = (
-            response.manifest.result_compression == ResultCompression.LZ4_FRAME
+            response.manifest.result_compression == ResultCompression.LZ4_FRAME.value
        )

        execute_response = ExecuteResponse(
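
Note on the fix: manifest.result_compression presumably arrives as a plain string, so comparing it to a plain Enum member was always False and compression went undetected; comparing against the member's .value matches the serialized form. A minimal sketch of the pitfall, with a stand-in ResultCompression enum (the real one lives in the SEA models):

from enum import Enum

class ResultCompression(Enum):
    LZ4_FRAME = "LZ4_FRAME"
    NONE = "NONE"

# Assumption for illustration: the manifest carries the codec as a string.
result_compression = "LZ4_FRAME"

# A string never equals a plain (non-str) Enum member:
assert (result_compression == ResultCompression.LZ4_FRAME) is False
# Comparing against .value matches the serialized form:
assert result_compression == ResultCompression.LZ4_FRAME.value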

src/databricks/sql/backend/sea/queue.py

Lines changed: 1 addition & 0 deletions
@@ -72,6 +72,7 @@ def build_queue(
            return JsonQueue(result_data.data)
        elif manifest.format == ResultFormat.ARROW_STREAM.value:
            if result_data.attachment is not None:
+                # direct results from Hybrid disposition
                arrow_file = (
                    ResultSetDownloadHandler._decompress_data(result_data.attachment)
                    if lz4_compressed
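
The attachment branch above handles results inlined directly in the response; when lz4_compressed is set they are LZ4-frame encoded. A hedged sketch of what a helper like ResultSetDownloadHandler._decompress_data might reduce to, assuming the lz4 package (not the library's actual implementation):

import lz4.frame

def _decompress_data(compressed: bytes) -> bytes:
    # Assumption: inline attachments use the LZ4 frame format when
    # lz4_compressed is set, as the cloud-fetch download path does.
    return lz4.frame.decompress(compressed)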

src/databricks/sql/backend/sea/utils/filters.py

Lines changed: 1 addition & 1 deletion
@@ -53,7 +53,7 @@ def _filter_sea_result_set(
        # Reuse the command_id from the original result set
        command_id = result_set.command_id

-        # Create an ExecuteResponse with the filtered data
+        # Create an ExecuteResponse for the filtered data
        execute_response = ExecuteResponse(
            command_id=command_id,
            status=result_set.status,

src/databricks/sql/result_set.py

Lines changed: 3 additions & 3 deletions
@@ -55,7 +55,7 @@ def __init__(

        Parameters:
        :param connection: The parent connection that was used to execute this command
-        :param backend: The backend specialised backend client to be invoked in the fetch phase
+        :param backend: The specialised backend client to be invoked in the fetch phase
        :param arraysize: The max number of rows to fetch at a time (PEP-249)
        :param buffer_size_bytes: The size (in bytes) of the internal buffer + max fetch
        :param command_id: The command ID
@@ -334,7 +334,7 @@ def fetchmany_arrow(self, size: int) -> "pyarrow.Table":
            n_remaining_rows -= partial_results.num_rows
            self._next_row_index += partial_results.num_rows

-        return pyarrow.concat_tables(partial_result_chunks)
+        return pyarrow.concat_tables(partial_result_chunks, use_threads=True)

    def fetchmany_columnar(self, size: int):
        """
@@ -385,7 +385,7 @@ def fetchall_arrow(self) -> "pyarrow.Table":
                for name, col in zip(results.column_names, results.column_table)
            }
            return pyarrow.Table.from_pydict(data)
-        return pyarrow.concat_tables(partial_result_chunks)
+        return pyarrow.concat_tables(partial_result_chunks, use_threads=True)

    def fetchall_columnar(self):
        """Fetch all (remaining) rows of a query result, returning them as a Columnar table."""

src/databricks/sql/session.py

Lines changed: 2 additions & 1 deletion
@@ -126,9 +126,10 @@ def open(self):
            catalog=self.catalog,
            schema=self.schema,
        )
+
        self.protocol_version = self.get_protocol_version(self._session_id)
        self.is_open = True
-        logger.info("Successfully opened session " + str(self.guid_hex))
+        logger.info("Successfully opened session %s", str(self.guid_hex))

    @staticmethod
    def get_protocol_version(session_id: SessionId):
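
The logging change swaps eager string concatenation for lazy %-style arguments: the standard library defers formatting until a handler actually emits the record, so the cost disappears when INFO is filtered out. For illustration (the session id value is invented):

import logging

logger = logging.getLogger("databricks.sql.session")
guid_hex = "01ef0123-abcd"  # invented value for illustration

# Eager: the concatenation runs even if INFO is disabled.
logger.info("Successfully opened session " + guid_hex)
# Lazy: formatting happens only when the record is emitted.
logger.info("Successfully opened session %s", guid_hex)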

src/databricks/sql/types.py

Lines changed: 0 additions & 3 deletions
@@ -187,7 +187,6 @@ def __contains__(self, item: Any) -> bool:
    # let object acts like class
    def __call__(self, *args: Any) -> "Row":
        """create new Row object"""
-
        if len(args) > len(self):
            raise ValueError(
                "Can not create Row with fields %s, expected %d values "
@@ -230,15 +229,13 @@ def __reduce__(
        self,
    ) -> Union[str, Tuple[Any, ...]]:
        """Returns a tuple so Python knows how to pickle Row."""
-
        if hasattr(self, "__fields__"):
            return (_create_row, (self.__fields__, tuple(self)))
        else:
            return tuple.__reduce__(self)

    def __repr__(self) -> str:
        """Printable representation of Row used in Python REPL."""
-
        if hasattr(self, "__fields__"):
            return "Row(%s)" % ", ".join(
                "%s=%r" % (k, v) for k, v in zip(self.__fields__, tuple(self))

src/databricks/sql/utils.py

Lines changed: 1 addition & 0 deletions
@@ -740,6 +740,7 @@ def convert_decimals_in_arrow_table(table, description) -> "pyarrow.Table":


def convert_to_assigned_datatypes_in_column_table(column_table, description):
+
    converted_column_table = []
    for i, col in enumerate(column_table):
        if description[i][1] == "decimal":
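
For orientation, description[i][1] is the PEP-249 type_code of column i, and the helper re-casts columns whose code is "decimal". A hedged sketch of that idea (the helper name and the string-to-Decimal assumption are illustrative, not the library's implementation):

from decimal import Decimal

def convert_decimal_column(col, type_code):
    # Illustrative assumption: decimal values arrive as strings and are
    # re-cast to Decimal; other type codes pass through unchanged.
    if type_code == "decimal":
        return [Decimal(v) if v is not None else None for v in col]
    return col

assert convert_decimal_column(["1.50", None], "decimal") == [Decimal("1.50"), None]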

tests/unit/test_client.py

Lines changed: 5 additions & 11 deletions
@@ -105,17 +105,13 @@ def test_closing_connection_closes_commands(self, mock_thrift_client_class):

        # Mock the execute response with controlled state
        mock_execute_response = Mock(spec=ExecuteResponse)
-
-        mock_execute_response.command_id = Mock(spec=CommandId)
-        mock_execute_response.status = (
-            CommandState.SUCCEEDED if not closed else CommandState.CLOSED
-        )
+        mock_execute_response.status = initial_state
        mock_execute_response.has_been_closed_server_side = closed
        mock_execute_response.is_staging_operation = False
        mock_execute_response.command_id = Mock(spec=CommandId)
        mock_execute_response.description = []

-        # Mock the backend that will be used by the real ThriftResultSet
+        # Mock the backend that will be used
        mock_backend = Mock(spec=ThriftDatabricksClient)
        mock_backend.staging_allowed_local_path = None
        mock_backend.fetch_results.return_value = (Mock(), False, 0)
@@ -137,15 +133,13 @@ def test_closing_connection_closes_commands(self, mock_thrift_client_class):
        # Mock execute_command to return our real result set
        cursor.backend.execute_command = Mock(return_value=real_result_set)

-        # Execute a command - this should set cursor.active_result_set to our real result set
+        # Execute a command
        cursor.execute("SELECT 1")

-        # Close the connection - this should trigger the real close chain:
-        # connection.close() -> cursor.close() -> result_set.close()
+        # Close the connection
        connection.close()

-        # Verify the REAL close logic worked through the chain:
-        # 1. has_been_closed_server_side should always be True after close()
+        # Verify the close logic worked:
        assert real_result_set.has_been_closed_server_side is True

        # 2. op_state should always be CLOSED after close()
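
The test builds its fakes with Mock(spec=...), which constrains the mock to the real class's attribute surface so a misspelled attribute read fails loudly instead of silently returning a child mock. A minimal sketch (the ExecuteResponse stand-in is invented):

from unittest.mock import Mock

class ExecuteResponse:  # invented stand-in for the real response class
    status = None
    has_been_closed_server_side = None

mock_execute_response = Mock(spec=ExecuteResponse)
mock_execute_response.status = "SUCCEEDED"
assert mock_execute_response.status == "SUCCEEDED"

# Reading an attribute the spec class lacks raises AttributeError:
try:
    mock_execute_response.staus  # typo caught at test time
except AttributeError:
    pass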

tests/unit/test_cloud_fetch_queue.py

Lines changed: 0 additions & 2 deletions
@@ -162,8 +162,6 @@ def test_next_n_rows_0_rows(self, mock_create_next_table):
        result = queue.next_n_rows(0)
        assert result.num_rows == 0
        assert queue.table_row_index == 0
-        # Instead of comparing tables directly, just check the row count
-        # This avoids issues with empty table schema differences

    @patch("databricks.sql.utils.ThriftCloudFetchQueue._create_next_table")
    def test_next_n_rows_partial_table(self, mock_create_next_table):
