Skip to content

Commit 29a2985

Browse files
Merge remote-tracking branch 'origin/sea-migration' into backend-interface
2 parents a888dd6 + 5bf5d4c commit 29a2985

File tree

1 file changed

+70
-0
lines changed

1 file changed

+70
-0
lines changed

src/databricks/sql/backend/thrift_backend.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -875,6 +875,66 @@ def get_execution_result(self, command_id: CommandId, cursor):
875875
arrow_schema_bytes=schema_bytes,
876876
)
877877

878+
def get_execution_result(self, op_handle, cursor):
879+
880+
assert op_handle is not None
881+
882+
req = ttypes.TFetchResultsReq(
883+
operationHandle=ttypes.TOperationHandle(
884+
op_handle.operationId,
885+
op_handle.operationType,
886+
False,
887+
op_handle.modifiedRowCount,
888+
),
889+
maxRows=cursor.arraysize,
890+
maxBytes=cursor.buffer_size_bytes,
891+
orientation=ttypes.TFetchOrientation.FETCH_NEXT,
892+
includeResultSetMetadata=True,
893+
)
894+
895+
resp = self.make_request(self._client.FetchResults, req)
896+
897+
t_result_set_metadata_resp = resp.resultSetMetadata
898+
899+
lz4_compressed = t_result_set_metadata_resp.lz4Compressed
900+
is_staging_operation = t_result_set_metadata_resp.isStagingOperation
901+
has_more_rows = resp.hasMoreRows
902+
description = self._hive_schema_to_description(
903+
t_result_set_metadata_resp.schema
904+
)
905+
906+
if pyarrow:
907+
schema_bytes = (
908+
t_result_set_metadata_resp.arrowSchema
909+
or self._hive_schema_to_arrow_schema(t_result_set_metadata_resp.schema)
910+
.serialize()
911+
.to_pybytes()
912+
)
913+
else:
914+
schema_bytes = None
915+
916+
queue = ResultSetQueueFactory.build_queue(
917+
row_set_type=resp.resultSetMetadata.resultFormat,
918+
t_row_set=resp.results,
919+
arrow_schema_bytes=schema_bytes,
920+
max_download_threads=self.max_download_threads,
921+
lz4_compressed=lz4_compressed,
922+
description=description,
923+
ssl_options=self._ssl_options,
924+
)
925+
926+
return ExecuteResponse(
927+
arrow_queue=queue,
928+
status=resp.status,
929+
has_been_closed_server_side=False,
930+
has_more_rows=has_more_rows,
931+
lz4_compressed=lz4_compressed,
932+
is_staging_operation=is_staging_operation,
933+
command_handle=op_handle,
934+
description=description,
935+
arrow_schema_bytes=schema_bytes,
936+
)
937+
878938
def _wait_until_command_done(self, op_handle, initial_operation_status_resp):
879939
if initial_operation_status_resp:
880940
self._check_command_not_in_error_or_closed_state(
@@ -946,6 +1006,12 @@ def execute_command(
9461006
thrift_handle,
9471007
)
9481008

1009+
logger.debug(
1010+
"ThriftBackend.execute_command(operation=%s, session_handle=%s)",
1011+
operation,
1012+
session_handle,
1013+
)
1014+
9491015
spark_arrow_types = ttypes.TSparkArrowTypes(
9501016
timestampAsArrow=self._use_arrow_native_timestamps,
9511017
decimalAsArrow=self._use_arrow_native_decimals,
@@ -1105,6 +1171,10 @@ def _handle_execute_response_async(self, resp, cursor):
11051171
cursor.active_command_id = command_id
11061172
self._check_direct_results_for_error(resp.directResults)
11071173

1174+
def _handle_execute_response_async(self, resp, cursor):
1175+
cursor.active_op_handle = resp.operationHandle
1176+
self._check_direct_results_for_error(resp.directResults)
1177+
11081178
def fetch_results(
11091179
self,
11101180
command_id: CommandId,

0 commit comments

Comments
 (0)