Skip to content

Commit c2b6739

Browse files
committed
Feat!: cancel submitted BigQuery jobs on keyboard interrupts
1 parent 4fa9992 commit c2b6739

File tree

2 files changed

+88
-4
lines changed

2 files changed

+88
-4
lines changed

sqlmesh/core/engine_adapter/bigquery.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1017,10 +1017,22 @@ def _execute(
10171017
self._query_job.job_id,
10181018
)
10191019

1020-
results = self._db_call(
1021-
self._query_job.result,
1022-
timeout=self._extra_config.get("job_execution_timeout_seconds"), # type: ignore
1023-
)
1020+
try:
1021+
results = self._db_call(
1022+
self._query_job.result,
1023+
timeout=self._extra_config.get("job_execution_timeout_seconds"), # type: ignore
1024+
)
1025+
except KeyboardInterrupt:
1026+
# Wrapping this in another try-except to ensure the subsequent db calls don't change
1027+
# the original exception type.
1028+
try:
1029+
if not self._db_call(self._query_job.done):
1030+
self._db_call(self._query_job.cancel)
1031+
except:
1032+
pass
1033+
1034+
raise
1035+
10241036
self._query_data = iter(results) if results.total_rows else iter([])
10251037
query_results = self._query_job._query_results
10261038
self.cursor._set_rowcount(query_results)

tests/core/engine_adapter/test_bigquery.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1072,3 +1072,75 @@ def test_get_alter_expressions_includes_catalog(
10721072
assert schema.db == "bar"
10731073
assert schema.sql(dialect="bigquery") == "catalog2.bar"
10741074
assert tables == {"bing"}
1075+
1076+
1077+
def test_job_cancellation_on_keyboard_interrupt_job_still_running(mocker: MockerFixture):
1078+
# Create a mock connection
1079+
connection_mock = mocker.NonCallableMock()
1080+
cursor_mock = mocker.Mock()
1081+
cursor_mock.connection = connection_mock
1082+
connection_mock.cursor.return_value = cursor_mock
1083+
1084+
# Mock the query job
1085+
mock_job = mocker.Mock()
1086+
mock_job.project = "test-project"
1087+
mock_job.location = "us-central1"
1088+
mock_job.job_id = "test-job-123"
1089+
mock_job.done.return_value = False # Job is still running
1090+
mock_job.result.side_effect = KeyboardInterrupt()
1091+
mock_job._query_results = mocker.Mock()
1092+
mock_job._query_results.total_rows = 0
1093+
mock_job._query_results.schema = []
1094+
1095+
# Set up the client to return our mock job
1096+
connection_mock._client.query.return_value = mock_job
1097+
1098+
# Create adapter with the mocked connection
1099+
adapter = BigQueryEngineAdapter(lambda: connection_mock, job_retries=0)
1100+
1101+
# Execute a query and expect KeyboardInterrupt
1102+
with pytest.raises(KeyboardInterrupt):
1103+
adapter.execute("SELECT 1")
1104+
1105+
# Verify the job was created
1106+
connection_mock._client.query.assert_called_once()
1107+
1108+
# Verify job status was checked and cancellation was called
1109+
mock_job.done.assert_called_once()
1110+
mock_job.cancel.assert_called_once()
1111+
1112+
1113+
def test_job_cancellation_on_keyboard_interrupt_job_already_done(mocker: MockerFixture):
1114+
# Create a mock connection
1115+
connection_mock = mocker.NonCallableMock()
1116+
cursor_mock = mocker.Mock()
1117+
cursor_mock.connection = connection_mock
1118+
connection_mock.cursor.return_value = cursor_mock
1119+
1120+
# Mock the query job
1121+
mock_job = mocker.Mock()
1122+
mock_job.project = "test-project"
1123+
mock_job.location = "us-central1"
1124+
mock_job.job_id = "test-job-456"
1125+
mock_job.done.return_value = True # Job is already done
1126+
mock_job.result.side_effect = KeyboardInterrupt()
1127+
mock_job._query_results = mocker.Mock()
1128+
mock_job._query_results.total_rows = 0
1129+
mock_job._query_results.schema = []
1130+
1131+
# Set up the client to return our mock job
1132+
connection_mock._client.query.return_value = mock_job
1133+
1134+
# Create adapter with the mocked connection
1135+
adapter = BigQueryEngineAdapter(lambda: connection_mock, job_retries=0)
1136+
1137+
# Execute a query and expect KeyboardInterrupt
1138+
with pytest.raises(KeyboardInterrupt):
1139+
adapter.execute("SELECT 1")
1140+
1141+
# Verify the job was created
1142+
connection_mock._client.query.assert_called_once()
1143+
1144+
# Verify job status was checked but cancellation was NOT called
1145+
mock_job.done.assert_called_once()
1146+
mock_job.cancel.assert_not_called()

0 commit comments

Comments
 (0)