Skip to content

Commit 7ec43e1

Browse files
align state checking with Thrift implementation
Signed-off-by: varun-edachali-dbx <varun.edachali@databricks.com>
1 parent 398909c commit 7ec43e1

File tree

1 file changed

+37
-22
lines changed

1 file changed

+37
-22
lines changed

src/databricks/sql/backend/sea/backend.py

Lines changed: 37 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
BackendType,
2525
ExecuteResponse,
2626
)
27-
from databricks.sql.exc import ServerOperationError
27+
from databricks.sql.exc import DatabaseError, ServerOperationError
2828
from databricks.sql.backend.sea.utils.http_client import SeaHttpClient
2929
from databricks.sql.types import SSLOptions
3030

@@ -85,9 +85,6 @@ class SeaDatabricksClient(DatabricksClient):
8585
STATEMENT_PATH_WITH_ID = STATEMENT_PATH + "/{}"
8686
CANCEL_STATEMENT_PATH_WITH_ID = STATEMENT_PATH + "/{}/cancel"
8787

88-
# SEA API constants
89-
POLLING_INTERVAL_SECONDS = 0.5
90-
9188
def __init__(
9289
self,
9390
server_hostname: str,
@@ -356,6 +353,41 @@ def _results_message_to_execute_response(
356353

357354
return execute_response
358355

356+
def _check_command_not_in_failed_or_closed_state(
357+
self, state: CommandState, command_id: CommandId
358+
) -> None:
359+
if state == CommandState.CLOSED:
360+
raise DatabaseError(
361+
"Command {} unexpectedly closed server side".format(command_id),
362+
{
363+
"operation-id": command_id,
364+
},
365+
)
366+
if state == CommandState.FAILED:
367+
raise ServerOperationError(
368+
"Command {} failed".format(command_id),
369+
{
370+
"operation-id": command_id,
371+
},
372+
)
373+
374+
def _wait_until_command_done(
375+
self, response: ExecuteStatementResponse
376+
) -> CommandState:
377+
"""
378+
Wait until a command is done.
379+
"""
380+
381+
state = response.status.state
382+
command_id = CommandId.from_sea_statement_id(response.statement_id)
383+
384+
while state in [CommandState.PENDING, CommandState.RUNNING]:
385+
state = self.get_query_state(command_id)
386+
387+
self._check_command_not_in_failed_or_closed_state(state, command_id)
388+
389+
return state
390+
359391
def execute_command(
360392
self,
361393
operation: str,
@@ -453,24 +485,7 @@ def execute_command(
453485
if async_op:
454486
return None
455487

456-
# For synchronous operation, wait for the statement to complete
457-
status = response.status
458-
state = status.state
459-
460-
# Keep polling until we reach a terminal state
461-
while state in [CommandState.PENDING, CommandState.RUNNING]:
462-
time.sleep(self.POLLING_INTERVAL_SECONDS)
463-
state = self.get_query_state(command_id)
464-
465-
if state != CommandState.SUCCEEDED:
466-
raise ServerOperationError(
467-
f"Statement execution did not succeed: {status.error.message if status.error else 'Unknown error'}",
468-
{
469-
"operation-id": command_id.to_sea_statement_id(),
470-
"diagnostic-info": None,
471-
},
472-
)
473-
488+
self._wait_until_command_done(response)
474489
return self.get_execution_result(command_id, cursor)
475490

476491
def cancel_command(self, command_id: CommandId) -> None:

0 commit comments

Comments
 (0)