Skip to content

Commit 254bd56

Browse files
committed
Refactor: cancel jobs on close()
1 parent da38d13 commit 254bd56

File tree

2 files changed

+29
-14
lines changed

2 files changed

+29
-14
lines changed

sqlmesh/core/engine_adapter/bigquery.py

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,14 @@ def query_factory() -> Query:
186186
)
187187
]
188188

189+
def close(self) -> t.Any:
190+
# Cancel all pending query jobs to avoid them becoming orphan, e.g., due to interrupts
191+
for query_job in self._query_jobs:
192+
if not self._db_call(query_job.done):
193+
self._db_call(query_job.cancel)
194+
195+
return super().close()
196+
189197
def _begin_session(self, properties: SessionProperties) -> None:
190198
from google.cloud.bigquery import QueryJobConfig
191199

@@ -1021,6 +1029,7 @@ def _execute(
10211029
job_config=job_config,
10221030
timeout=self._extra_config.get("job_creation_timeout_seconds"),
10231031
)
1032+
self._query_jobs.add(self._query_job)
10241033

10251034
logger.debug(
10261035
"BigQuery job created: https://console.cloud.google.com/bigquery?project=%s&j=bq:%s:%s",
@@ -1029,21 +1038,12 @@ def _execute(
10291038
self._query_job.job_id,
10301039
)
10311040

1032-
try:
1033-
results = self._db_call(
1034-
self._query_job.result,
1035-
timeout=self._extra_config.get("job_execution_timeout_seconds"), # type: ignore
1036-
)
1037-
except KeyboardInterrupt:
1038-
# Wrapping this in another try-except to ensure the subsequent db calls don't change
1039-
# the original exception type.
1040-
try:
1041-
if not self._db_call(self._query_job.done):
1042-
self._db_call(self._query_job.cancel)
1043-
except:
1044-
pass
1041+
results = self._db_call(
1042+
self._query_job.result,
1043+
timeout=self._extra_config.get("job_execution_timeout_seconds"), # type: ignore
1044+
)
10451045

1046-
raise
1046+
self._query_jobs.remove(self._query_job)
10471047

10481048
self._query_data = iter(results) if results.total_rows else iter([])
10491049
query_results = self._query_job._query_results
@@ -1212,6 +1212,15 @@ def _query_data(self) -> t.Any:
12121212
def _query_data(self, value: t.Any) -> None:
12131213
return self._connection_pool.set_attribute("query_data", value)
12141214

1215+
@property
1216+
def _query_jobs(self) -> t.Any:
1217+
query_jobs = self._connection_pool.get_attribute("query_jobs")
1218+
if not isinstance(query_jobs, set):
1219+
query_jobs = set()
1220+
self._connection_pool.set_attribute("query_jobs", query_jobs)
1221+
1222+
return query_jobs
1223+
12151224
@property
12161225
def _query_job(self) -> t.Any:
12171226
return self._connection_pool.get_attribute("query_job")

tests/core/engine_adapter/test_bigquery.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1102,6 +1102,9 @@ def test_job_cancellation_on_keyboard_interrupt_job_still_running(mocker: Mocker
11021102
with pytest.raises(KeyboardInterrupt):
11031103
adapter.execute("SELECT 1")
11041104

1105+
# Ensure the adapter's closed, so that the job can be aborted
1106+
adapter.close()
1107+
11051108
# Verify the job was created
11061109
connection_mock._client.query.assert_called_once()
11071110

@@ -1138,6 +1141,9 @@ def test_job_cancellation_on_keyboard_interrupt_job_already_done(mocker: MockerF
11381141
with pytest.raises(KeyboardInterrupt):
11391142
adapter.execute("SELECT 1")
11401143

1144+
# Ensure the adapter's closed, so that the job can be aborted
1145+
adapter.close()
1146+
11411147
# Verify the job was created
11421148
connection_mock._client.query.assert_called_once()
11431149

0 commit comments

Comments
 (0)