Skip to content

Commit da38d13

Browse files
committed
Feat!: cancel submitted BigQuery jobs on keyboard interrupts
1 parent 9eba5c1 commit da38d13

File tree

2 files changed

+88
-4
lines changed

2 files changed

+88
-4
lines changed

sqlmesh/core/engine_adapter/bigquery.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1029,10 +1029,22 @@ def _execute(
10291029
self._query_job.job_id,
10301030
)
10311031

1032-
results = self._db_call(
1033-
self._query_job.result,
1034-
timeout=self._extra_config.get("job_execution_timeout_seconds"), # type: ignore
1035-
)
1032+
try:
1033+
results = self._db_call(
1034+
self._query_job.result,
1035+
timeout=self._extra_config.get("job_execution_timeout_seconds"), # type: ignore
1036+
)
1037+
except KeyboardInterrupt:
1038+
# Wrapping this in another try-except to ensure the subsequent db calls don't change
1039+
# the original exception type.
1040+
try:
1041+
if not self._db_call(self._query_job.done):
1042+
self._db_call(self._query_job.cancel)
1043+
except:
1044+
pass
1045+
1046+
raise
1047+
10361048
self._query_data = iter(results) if results.total_rows else iter([])
10371049
query_results = self._query_job._query_results
10381050
self.cursor._set_rowcount(query_results)

tests/core/engine_adapter/test_bigquery.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1072,3 +1072,75 @@ def test_get_alter_expressions_includes_catalog(
10721072
assert schema.db == "bar"
10731073
assert schema.sql(dialect="bigquery") == "catalog2.bar"
10741074
assert tables == {"bing"}
1075+
1076+
1077+
def test_job_cancellation_on_keyboard_interrupt_job_still_running(mocker: MockerFixture):
1078+
# Create a mock connection
1079+
connection_mock = mocker.NonCallableMock()
1080+
cursor_mock = mocker.Mock()
1081+
cursor_mock.connection = connection_mock
1082+
connection_mock.cursor.return_value = cursor_mock
1083+
1084+
# Mock the query job
1085+
mock_job = mocker.Mock()
1086+
mock_job.project = "test-project"
1087+
mock_job.location = "us-central1"
1088+
mock_job.job_id = "test-job-123"
1089+
mock_job.done.return_value = False # Job is still running
1090+
mock_job.result.side_effect = KeyboardInterrupt()
1091+
mock_job._query_results = mocker.Mock()
1092+
mock_job._query_results.total_rows = 0
1093+
mock_job._query_results.schema = []
1094+
1095+
# Set up the client to return our mock job
1096+
connection_mock._client.query.return_value = mock_job
1097+
1098+
# Create adapter with the mocked connection
1099+
adapter = BigQueryEngineAdapter(lambda: connection_mock, job_retries=0)
1100+
1101+
# Execute a query and expect KeyboardInterrupt
1102+
with pytest.raises(KeyboardInterrupt):
1103+
adapter.execute("SELECT 1")
1104+
1105+
# Verify the job was created
1106+
connection_mock._client.query.assert_called_once()
1107+
1108+
# Verify job status was checked and cancellation was called
1109+
mock_job.done.assert_called_once()
1110+
mock_job.cancel.assert_called_once()
1111+
1112+
1113+
def test_job_cancellation_on_keyboard_interrupt_job_already_done(mocker: MockerFixture):
1114+
# Create a mock connection
1115+
connection_mock = mocker.NonCallableMock()
1116+
cursor_mock = mocker.Mock()
1117+
cursor_mock.connection = connection_mock
1118+
connection_mock.cursor.return_value = cursor_mock
1119+
1120+
# Mock the query job
1121+
mock_job = mocker.Mock()
1122+
mock_job.project = "test-project"
1123+
mock_job.location = "us-central1"
1124+
mock_job.job_id = "test-job-456"
1125+
mock_job.done.return_value = True # Job is already done
1126+
mock_job.result.side_effect = KeyboardInterrupt()
1127+
mock_job._query_results = mocker.Mock()
1128+
mock_job._query_results.total_rows = 0
1129+
mock_job._query_results.schema = []
1130+
1131+
# Set up the client to return our mock job
1132+
connection_mock._client.query.return_value = mock_job
1133+
1134+
# Create adapter with the mocked connection
1135+
adapter = BigQueryEngineAdapter(lambda: connection_mock, job_retries=0)
1136+
1137+
# Execute a query and expect KeyboardInterrupt
1138+
with pytest.raises(KeyboardInterrupt):
1139+
adapter.execute("SELECT 1")
1140+
1141+
# Verify the job was created
1142+
connection_mock._client.query.assert_called_once()
1143+
1144+
# Verify job status was checked but cancellation was NOT called
1145+
mock_job.done.assert_called_once()
1146+
mock_job.cancel.assert_not_called()

0 commit comments

Comments
 (0)