Skip to content

Commit e67a27b

Browse files
dondaumpotiuk
authored andcommitted
feat: add async pagerduty notifier
1 parent e75657a commit e67a27b

File tree

6 files changed

+338
-59
lines changed

6 files changed

+338
-59
lines changed

dev/breeze/tests/test_selective_checks.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -687,7 +687,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
687687
"providers/http/tests/file.py",
688688
),
689689
{
690-
"selected-providers-list-as-string": "amazon apache.livy dbt.cloud dingding discord google http",
690+
"selected-providers-list-as-string": "amazon apache.livy dbt.cloud dingding discord google http pagerduty",
691691
"all-python-versions": f"['{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}']",
692692
"all-python-versions-list-as-string": DEFAULT_PYTHON_MAJOR_MINOR_VERSION,
693693
"python-versions": f"['{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}']",
@@ -708,7 +708,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
708708
[
709709
{
710710
"description": "amazon...google",
711-
"test_types": "Providers[amazon] Providers[apache.livy,dbt.cloud,dingding,discord,http] Providers[google]",
711+
"test_types": "Providers[amazon] Providers[apache.livy,dbt.cloud,dingding,discord,http,pagerduty] Providers[google]",
712712
}
713713
]
714714
),
@@ -722,9 +722,12 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
722722
"description": "dbt.cloud...dingding",
723723
"test_types": "Providers[dbt.cloud] Providers[dingding]",
724724
},
725-
{"description": "discord", "test_types": "Providers[discord]"},
726-
{"description": "google", "test_types": "Providers[google]"},
725+
{
726+
"description": "discord...google",
727+
"test_types": "Providers[discord] Providers[google]",
728+
},
727729
{"description": "http", "test_types": "Providers[http]"},
730+
{"description": "pagerduty", "test_types": "Providers[pagerduty]"},
728731
]
729732
),
730733
"run-mypy": "true",

providers/pagerduty/pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ requires-python = ">=3.10"
5959
dependencies = [
6060
"apache-airflow>=2.10.0",
6161
"apache-airflow-providers-common-compat>=1.6.1",
62+
"apache-airflow-providers-http",
6263
"pagerduty>=2.3.0",
6364
]
6465

@@ -68,6 +69,7 @@ dev = [
6869
"apache-airflow-task-sdk",
6970
"apache-airflow-devel-common",
7071
"apache-airflow-providers-common-compat",
72+
"apache-airflow-providers-http",
7173
# Additional devel dependencies (do not remove this line and add extra development dependencies)
7274
]
7375

providers/pagerduty/src/airflow/providers/pagerduty/hooks/pagerduty_events.py

Lines changed: 190 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,68 @@
2121

2222
from typing import TYPE_CHECKING, Any
2323

24+
import aiohttp
2425
import pagerduty
26+
from asgiref.sync import sync_to_async
2527

2628
from airflow.exceptions import AirflowException
29+
from airflow.providers.http.hooks.http import HttpAsyncHook
2730
from airflow.providers.pagerduty.version_compat import BaseHook
2831

2932
if TYPE_CHECKING:
3033
from datetime import datetime
3134

3235

36+
def prepare_event_data(
37+
summary: str,
38+
severity: str,
39+
source: str = "airflow",
40+
custom_details: Any | None = None,
41+
component: str | None = None,
42+
group: str | None = None,
43+
class_type: str | None = None,
44+
action: str = "trigger",
45+
dedup_key: str | None = None,
46+
images: list[Any] | None = None,
47+
links: list[Any] | None = None,
48+
action_key_name: str = "action",
49+
) -> dict[str, Any]:
50+
"""Prepare event data for send_event / post('/v2/enqueue') method."""
51+
payload = {
52+
"summary": summary,
53+
"severity": severity,
54+
"source": source,
55+
}
56+
if custom_details is not None:
57+
payload["custom_details"] = custom_details
58+
if component:
59+
payload["component"] = component
60+
if group:
61+
payload["group"] = group
62+
if class_type:
63+
payload["class"] = class_type
64+
65+
actions = ("trigger", "acknowledge", "resolve")
66+
if action not in actions:
67+
raise ValueError(f"Event action must be one of: {', '.join(actions)}")
68+
data = {
69+
action_key_name: action,
70+
"payload": payload,
71+
}
72+
if dedup_key:
73+
data["dedup_key"] = dedup_key
74+
elif action != "trigger":
75+
raise ValueError(
76+
f"The dedup_key property is required for {action_key_name}={action} events,"
77+
f" and it must be a string."
78+
)
79+
if images is not None:
80+
data["images"] = images
81+
if links is not None:
82+
data["links"] = links
83+
return data
84+
85+
3386
class PagerdutyEventsHook(BaseHook):
3487
"""
3588
This class can be used to interact with the Pagerduty Events API.
@@ -120,7 +173,7 @@ def send_event(
120173
link's text.
121174
:return: PagerDuty Events API v2 response.
122175
"""
123-
data = PagerdutyEventsHook.prepare_event_data(
176+
data = prepare_event_data(
124177
summary=summary,
125178
severity=severity,
126179
source=source,
@@ -137,57 +190,6 @@ def send_event(
137190
client = pagerduty.EventsApiV2Client(self.integration_key)
138191
return client.send_event(**data)
139192

140-
@staticmethod
141-
def prepare_event_data(
142-
summary,
143-
severity,
144-
source,
145-
custom_details,
146-
component,
147-
group,
148-
class_type,
149-
action,
150-
dedup_key,
151-
images,
152-
links,
153-
action_key_name: str = "action",
154-
) -> dict:
155-
"""Prepare event data for send_event / post('/v2/enqueue') method."""
156-
payload = {
157-
"summary": summary,
158-
"severity": severity,
159-
"source": source,
160-
}
161-
if custom_details is not None:
162-
payload["custom_details"] = custom_details
163-
if component:
164-
payload["component"] = component
165-
if group:
166-
payload["group"] = group
167-
if class_type:
168-
payload["class"] = class_type
169-
170-
actions = ("trigger", "acknowledge", "resolve")
171-
if action not in actions:
172-
raise ValueError(f"Event action must be one of: {', '.join(actions)}")
173-
data = {
174-
action_key_name: action,
175-
"payload": payload,
176-
}
177-
if dedup_key:
178-
data["dedup_key"] = dedup_key
179-
elif action != "trigger":
180-
raise ValueError(
181-
f"The dedup_key property is required for {action_key_name}={action} events,"
182-
f" and it must be a string."
183-
)
184-
if images is not None:
185-
data["images"] = images
186-
if links is not None:
187-
data["links"] = links
188-
189-
return data
190-
191193
def create_change_event(
192194
self,
193195
summary: str,
@@ -237,3 +239,139 @@ def test_connection(self):
237239
except Exception:
238240
return False, "connection test failed, invalid routing key"
239241
return True, "connection tested successfully"
242+
243+
244+
class PagerdutyEventsAsyncHook(HttpAsyncHook):
245+
"""
246+
This class can be used to interact with the Pagerduty Events API via async http.
247+
248+
Documentation on how to use the PagerDuty Events API can be found at:
249+
https://developer.pagerduty.com/docs/events-api-v2-overview
250+
251+
It takes both an Events API token and a PagerDuty connection with the Events API token
252+
(i.e. Integration key) as the password/Pagerduty API token. If both supplied, the token will be used.
253+
254+
:param integration_key: PagerDuty Events API token
255+
:param pagerduty_events_conn_id: connection that has PagerDuty integration key in the Pagerduty
256+
API token field
257+
:param api_version: api version to use
258+
"""
259+
260+
default_headers = {
261+
"Content-Type": "application/json",
262+
"Accept": "application/json",
263+
}
264+
conn_name_attr = "pagerduty_events_conn_id"
265+
default_conn_name = "pagerduty_events_default"
266+
conn_type = "pagerduty_events"
267+
hook_name = "Async Pagerduty Events"
268+
269+
def __init__(
270+
self,
271+
integration_key: str | None = None,
272+
pagerduty_events_conn_id: str | None = None,
273+
api_version: str = "v2",
274+
) -> None:
275+
super().__init__()
276+
self.integration_key = integration_key
277+
self.pagerduty_events_conn_id = pagerduty_events_conn_id
278+
self.api_version = api_version
279+
self.method = "POST"
280+
self.base_url: str = "https://events.pagerduty.com"
281+
self.http_conn_id = ""
282+
283+
async def get_integration_key(self) -> str:
284+
"""Get integration key from the connection."""
285+
if self.integration_key:
286+
return self.integration_key
287+
288+
if self.pagerduty_events_conn_id is not None:
289+
conn = await sync_to_async(self.get_connection)(self.pagerduty_events_conn_id)
290+
self.integration_key = conn.password
291+
if self.integration_key:
292+
return self.integration_key
293+
294+
raise AirflowException(
295+
"Cannot get integration key: No valid integration key nor pagerduty_events_conn_id supplied."
296+
)
297+
298+
async def send_event(
299+
self,
300+
summary: str,
301+
severity: str,
302+
source: str = "airflow",
303+
action: str = "trigger",
304+
dedup_key: str | None = None,
305+
custom_details: Any | None = None,
306+
group: str | None = None,
307+
component: str | None = None,
308+
class_type: str | None = None,
309+
images: list[Any] | None = None,
310+
links: list[Any] | None = None,
311+
) -> str:
312+
"""
313+
Create event for service integration.
314+
315+
:param summary: Summary for the event
316+
:param severity: Severity for the event, needs to be one of: info, warning, error, critical
317+
:param source: Specific human-readable unique identifier, such as a
318+
hostname, for the system having the problem.
319+
:param action: Event action, needs to be one of: trigger, acknowledge,
320+
resolve. Default to trigger if not specified.
321+
:param dedup_key: A string which identifies the alert triggered for the given event.
322+
Required for the actions acknowledge and resolve.
323+
:param custom_details: Free-form details from the event. Can be a dictionary or a string.
324+
If a dictionary is passed it will show up in PagerDuty as a table.
325+
:param group: A cluster or grouping of sources. For example, sources
326+
"prod-datapipe-02" and "prod-datapipe-03" might both be part of "prod-datapipe"
327+
:param component: The part or component of the affected system that is broken.
328+
:param class_type: The class/type of the event.
329+
:param images: List of images to include. Each dictionary in the list accepts the following keys:
330+
`src`: The source (URL) of the image being attached to the incident. This image must be served via
331+
HTTPS.
332+
`href`: [Optional] URL to make the image a clickable link.
333+
`alt`: [Optional] Alternative text for the image.
334+
:param links: List of links to include. Each dictionary in the list accepts the following keys:
335+
`href`: URL of the link to be attached.
336+
`text`: [Optional] Plain text that describes the purpose of the link, and can be used as the
337+
link's text.
338+
:return: PagerDuty Events API response.
339+
"""
340+
event = {"event_action": action}
341+
342+
integration_key = await self.get_integration_key()
343+
# add routing key
344+
event["routing_key"] = integration_key
345+
346+
data = prepare_event_data(
347+
summary=summary,
348+
severity=severity,
349+
source=source,
350+
custom_details=custom_details,
351+
component=component,
352+
group=group,
353+
class_type=class_type,
354+
action=action,
355+
dedup_key=dedup_key,
356+
images=images,
357+
links=links,
358+
)
359+
360+
event.update(data)
361+
362+
if isinstance(dedup_key, str):
363+
event["dedup_key"] = dedup_key
364+
elif not action == "trigger":
365+
raise ValueError(
366+
f"The dedup_key property is required forevent_action={action} events, and it must be a string."
367+
)
368+
369+
async with aiohttp.ClientSession() as session:
370+
res = await super().run(
371+
session=session,
372+
endpoint=f"{self.api_version}/enqueue",
373+
json=event,
374+
headers=self.default_headers,
375+
)
376+
res_body = await res.json()
377+
return res_body["dedup_key"]

providers/pagerduty/src/airflow/providers/pagerduty/notifications/pagerduty.py

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@
2121
from typing import Any
2222

2323
from airflow.providers.common.compat.notifier import BaseNotifier
24-
from airflow.providers.pagerduty.hooks.pagerduty_events import PagerdutyEventsHook
24+
from airflow.providers.pagerduty.hooks.pagerduty_events import PagerdutyEventsAsyncHook, PagerdutyEventsHook
25+
from airflow.providers.pagerduty.version_compat import AIRFLOW_V_3_1_PLUS
2526

2627

2728
class PagerdutyNotifier(BaseNotifier):
@@ -86,8 +87,13 @@ def __init__(
8687
links: list[Any] | None = None,
8788
pagerduty_events_conn_id: str | None = "pagerduty_events_default",
8889
integration_key: str | None = None,
90+
**kwargs,
8991
):
90-
super().__init__()
92+
if AIRFLOW_V_3_1_PLUS:
93+
# Support for passing context was added in 3.1.0
94+
super().__init__(**kwargs)
95+
else:
96+
super().__init__()
9197
self.pagerduty_events_conn_id = pagerduty_events_conn_id
9298
self.integration_key = integration_key
9399
self.summary = summary
@@ -109,6 +115,13 @@ def hook(self) -> PagerdutyEventsHook:
109115
pagerduty_events_conn_id=self.pagerduty_events_conn_id, integration_key=self.integration_key
110116
)
111117

118+
@cached_property
119+
def hook_async(self) -> PagerdutyEventsAsyncHook:
120+
"""Pagerduty Events Async Hook."""
121+
return PagerdutyEventsAsyncHook(
122+
pagerduty_events_conn_id=self.pagerduty_events_conn_id, integration_key=self.integration_key
123+
)
124+
112125
def notify(self, context):
113126
"""Send a alert to a pagerduty event v2 API."""
114127
self.hook.send_event(
@@ -125,5 +138,21 @@ def notify(self, context):
125138
links=self.links,
126139
)
127140

141+
async def async_notify(self, context) -> None:
142+
"""Send a alert to a pagerduty event v2 API using async HTTP."""
143+
await self.hook_async.send_event(
144+
summary=self.summary,
145+
severity=self.severity,
146+
source=self.source,
147+
action=self.action,
148+
dedup_key=self.dedup_key,
149+
custom_details=self.custom_details,
150+
group=self.group,
151+
component=self.component,
152+
class_type=self.class_type,
153+
images=self.images,
154+
links=self.links,
155+
)
156+
128157

129158
send_pagerduty_notification = PagerdutyNotifier

0 commit comments

Comments
 (0)