Skip to content

Commit f0ccada

Browse files
authored
Merge pull request #81 from LamaAni/throw_general_errors
Error while throwing general connection errors through the event service
2 parents 4604e4a + 90f6c2d commit f0ccada

File tree

1 file changed

+11
-7
lines changed
  • airflow_kubernetes_job_operator/kube_api

1 file changed

+11
-7
lines changed

airflow_kubernetes_job_operator/kube_api/client.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ def __init__(
9797
"""
9898
assert use_asyncio is not True, NotImplementedError("AsyncIO not yet implemented.")
9999
super().__init__(
100-
self._exdcute_query,
100+
self._execute_query,
101101
use_async_loop=use_asyncio or KubeApiRestQuery.default_use_asyncio,
102102
use_daemon_thread=True,
103103
thread_name=f"{self.__class__.__name__} {id(self)}",
@@ -221,18 +221,20 @@ def on_reconnect(self, client: "KubeApiRestClient"):
221221
"""
222222
return True
223223

224-
def _exdcute_query(self, client: "KubeApiRestClient"):
224+
def _execute_query(self, client: "KubeApiRestClient"):
225225
self._set_connection_state(KubeApiRestQueryConnectionState.Disconnected, False)
226226
self.emit(self.query_started_event_name, self, client)
227227
self.pre_request(client)
228228

229229
try:
230230
self.query_loop(client)
231+
self.post_request(client)
232+
except Exception as ex:
233+
self.emit_error(ex)
234+
raise ex
231235
finally:
232236
self._set_connection_state(KubeApiRestQueryConnectionState.Disconnected)
233-
234-
self.post_request(client)
235-
self.emit(self.query_ended_event_name, self, client)
237+
self.emit(self.query_ended_event_name, self, client)
236238

237239
def query_loop(self, client: "KubeApiRestClient"):
238240
"""Overridable. The main query loop. Called to execute the query.
@@ -605,16 +607,18 @@ def _create_query_handler(self, queries: List[KubeApiRestQuery]) -> EventHandler
605607

606608
pending = set(queries)
607609

608-
def remove_from_pending(q):
610+
def remove_from_pending(q, ex: Exception = None):
609611
if q in pending:
610612
pending.remove(q)
613+
if ex:
614+
handler.emit_error(ex)
611615
if len(pending) == 0:
612616
handler.stop_all_streams()
613617

614618
for q in queries:
615619
self._active_queries.add(q)
616620
q.on(q.query_ended_event_name, lambda query, client: remove_from_pending(query))
617-
q.on(q.error_event_name, lambda query, err: remove_from_pending(query))
621+
q.on(q.error_event_name, lambda query, err: remove_from_pending(query, err))
618622
q.pipe(handler)
619623

620624
return handler

0 commit comments

Comments
 (0)