Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions dev/breeze/tests/test_selective_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
"providers/http/tests/file.py",
),
{
"selected-providers-list-as-string": "amazon apache.livy dbt.cloud dingding discord google http",
"selected-providers-list-as-string": "amazon apache.livy dbt.cloud dingding discord google http pagerduty",
"all-python-versions": f"['{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}']",
"all-python-versions-list-as-string": DEFAULT_PYTHON_MAJOR_MINOR_VERSION,
"python-versions": f"['{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}']",
Expand All @@ -708,7 +708,7 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
[
{
"description": "amazon...google",
"test_types": "Providers[amazon] Providers[apache.livy,dbt.cloud,dingding,discord,http] Providers[google]",
"test_types": "Providers[amazon] Providers[apache.livy,dbt.cloud,dingding,discord,http,pagerduty] Providers[google]",
}
]
),
Expand All @@ -722,9 +722,12 @@ def assert_outputs_are_printed(expected_outputs: dict[str, str], stderr: str):
"description": "dbt.cloud...dingding",
"test_types": "Providers[dbt.cloud] Providers[dingding]",
},
{"description": "discord", "test_types": "Providers[discord]"},
{"description": "google", "test_types": "Providers[google]"},
{
"description": "discord...google",
"test_types": "Providers[discord] Providers[google]",
},
{"description": "http", "test_types": "Providers[http]"},
{"description": "pagerduty", "test_types": "Providers[pagerduty]"},
]
),
"run-mypy": "true",
Expand Down
2 changes: 2 additions & 0 deletions providers/pagerduty/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ requires-python = ">=3.10"
dependencies = [
"apache-airflow>=2.10.0",
"apache-airflow-providers-common-compat>=1.6.1",
"apache-airflow-providers-http",
"pagerduty>=2.3.0",
]

Expand All @@ -68,6 +69,7 @@ dev = [
"apache-airflow-task-sdk",
"apache-airflow-devel-common",
"apache-airflow-providers-common-compat",
"apache-airflow-providers-http",
# Additional devel dependencies (do not remove this line and add extra development dependencies)
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,68 @@

from typing import TYPE_CHECKING, Any

import aiohttp
import pagerduty
from asgiref.sync import sync_to_async

from airflow.exceptions import AirflowException
from airflow.providers.http.hooks.http import HttpAsyncHook
from airflow.providers.pagerduty.version_compat import BaseHook

if TYPE_CHECKING:
from datetime import datetime


def prepare_event_data(
summary: str,
severity: str,
source: str = "airflow",
custom_details: Any | None = None,
component: str | None = None,
group: str | None = None,
class_type: str | None = None,
action: str = "trigger",
dedup_key: str | None = None,
images: list[Any] | None = None,
links: list[Any] | None = None,
action_key_name: str = "action",
) -> dict[str, Any]:
"""Prepare event data for send_event / post('/v2/enqueue') method."""
payload = {
"summary": summary,
"severity": severity,
"source": source,
}
if custom_details is not None:
payload["custom_details"] = custom_details
if component:
payload["component"] = component
if group:
payload["group"] = group
if class_type:
payload["class"] = class_type

actions = ("trigger", "acknowledge", "resolve")
if action not in actions:
raise ValueError(f"Event action must be one of: {', '.join(actions)}")
data = {
action_key_name: action,
"payload": payload,
}
if dedup_key:
data["dedup_key"] = dedup_key
elif action != "trigger":
raise ValueError(
f"The dedup_key property is required for {action_key_name}={action} events,"
f" and it must be a string."
)
if images is not None:
data["images"] = images
if links is not None:
data["links"] = links
return data


class PagerdutyEventsHook(BaseHook):
"""
This class can be used to interact with the Pagerduty Events API.
Expand Down Expand Up @@ -120,7 +173,7 @@ def send_event(
link's text.
:return: PagerDuty Events API v2 response.
"""
data = PagerdutyEventsHook.prepare_event_data(
data = prepare_event_data(
summary=summary,
severity=severity,
source=source,
Expand All @@ -137,57 +190,6 @@ def send_event(
client = pagerduty.EventsApiV2Client(self.integration_key)
return client.send_event(**data)

@staticmethod
def prepare_event_data(
summary,
severity,
source,
custom_details,
component,
group,
class_type,
action,
dedup_key,
images,
links,
action_key_name: str = "action",
) -> dict:
"""Prepare event data for send_event / post('/v2/enqueue') method."""
payload = {
"summary": summary,
"severity": severity,
"source": source,
}
if custom_details is not None:
payload["custom_details"] = custom_details
if component:
payload["component"] = component
if group:
payload["group"] = group
if class_type:
payload["class"] = class_type

actions = ("trigger", "acknowledge", "resolve")
if action not in actions:
raise ValueError(f"Event action must be one of: {', '.join(actions)}")
data = {
action_key_name: action,
"payload": payload,
}
if dedup_key:
data["dedup_key"] = dedup_key
elif action != "trigger":
raise ValueError(
f"The dedup_key property is required for {action_key_name}={action} events,"
f" and it must be a string."
)
if images is not None:
data["images"] = images
if links is not None:
data["links"] = links

return data

def create_change_event(
self,
summary: str,
Expand Down Expand Up @@ -237,3 +239,139 @@ def test_connection(self):
except Exception:
return False, "connection test failed, invalid routing key"
return True, "connection tested successfully"


class PagerdutyEventsAsyncHook(HttpAsyncHook):
"""
This class can be used to interact with the Pagerduty Events API via async http.

Documentation on how to use the PagerDuty Events API can be found at:
https://developer.pagerduty.com/docs/events-api-v2-overview

It takes both an Events API token and a PagerDuty connection with the Events API token
(i.e. Integration key) as the password/Pagerduty API token. If both supplied, the token will be used.

:param integration_key: PagerDuty Events API token
:param pagerduty_events_conn_id: connection that has PagerDuty integration key in the Pagerduty
API token field
:param api_version: api version to use
"""

default_headers = {
"Content-Type": "application/json",
"Accept": "application/json",
}
conn_name_attr = "pagerduty_events_conn_id"
default_conn_name = "pagerduty_events_default"
conn_type = "pagerduty_events"
hook_name = "Async Pagerduty Events"

def __init__(
self,
integration_key: str | None = None,
pagerduty_events_conn_id: str | None = None,
api_version: str = "v2",
) -> None:
super().__init__()
self.integration_key = integration_key
self.pagerduty_events_conn_id = pagerduty_events_conn_id
self.api_version = api_version
self.method = "POST"
self.base_url: str = "https://events.pagerduty.com"
self.http_conn_id = ""

async def get_integration_key(self) -> str:
"""Get integration key from the connection."""
if self.integration_key:
return self.integration_key

if self.pagerduty_events_conn_id is not None:
conn = await sync_to_async(self.get_connection)(self.pagerduty_events_conn_id)
self.integration_key = conn.password
if self.integration_key:
return self.integration_key

raise AirflowException(
"Cannot get integration key: No valid integration key nor pagerduty_events_conn_id supplied."
)

async def send_event(
self,
summary: str,
severity: str,
source: str = "airflow",
action: str = "trigger",
dedup_key: str | None = None,
custom_details: Any | None = None,
group: str | None = None,
component: str | None = None,
class_type: str | None = None,
images: list[Any] | None = None,
links: list[Any] | None = None,
) -> str:
"""
Create event for service integration.

:param summary: Summary for the event
:param severity: Severity for the event, needs to be one of: info, warning, error, critical
:param source: Specific human-readable unique identifier, such as a
hostname, for the system having the problem.
:param action: Event action, needs to be one of: trigger, acknowledge,
resolve. Default to trigger if not specified.
:param dedup_key: A string which identifies the alert triggered for the given event.
Required for the actions acknowledge and resolve.
:param custom_details: Free-form details from the event. Can be a dictionary or a string.
If a dictionary is passed it will show up in PagerDuty as a table.
:param group: A cluster or grouping of sources. For example, sources
"prod-datapipe-02" and "prod-datapipe-03" might both be part of "prod-datapipe"
:param component: The part or component of the affected system that is broken.
:param class_type: The class/type of the event.
:param images: List of images to include. Each dictionary in the list accepts the following keys:
`src`: The source (URL) of the image being attached to the incident. This image must be served via
HTTPS.
`href`: [Optional] URL to make the image a clickable link.
`alt`: [Optional] Alternative text for the image.
:param links: List of links to include. Each dictionary in the list accepts the following keys:
`href`: URL of the link to be attached.
`text`: [Optional] Plain text that describes the purpose of the link, and can be used as the
link's text.
:return: PagerDuty Events API response.
"""
event = {"event_action": action}

integration_key = await self.get_integration_key()
# add routing key
event["routing_key"] = integration_key

data = prepare_event_data(
summary=summary,
severity=severity,
source=source,
custom_details=custom_details,
component=component,
group=group,
class_type=class_type,
action=action,
dedup_key=dedup_key,
images=images,
links=links,
)

event.update(data)

if isinstance(dedup_key, str):
event["dedup_key"] = dedup_key
elif not action == "trigger":
raise ValueError(
f"The dedup_key property is required forevent_action={action} events, and it must be a string."
)

async with aiohttp.ClientSession() as session:
res = await super().run(
session=session,
endpoint=f"{self.api_version}/enqueue",
json=event,
headers=self.default_headers,
)
res_body = await res.json()
return res_body["dedup_key"]
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
from typing import Any

from airflow.providers.common.compat.notifier import BaseNotifier
from airflow.providers.pagerduty.hooks.pagerduty_events import PagerdutyEventsHook
from airflow.providers.pagerduty.hooks.pagerduty_events import PagerdutyEventsAsyncHook, PagerdutyEventsHook
from airflow.providers.pagerduty.version_compat import AIRFLOW_V_3_1_PLUS


class PagerdutyNotifier(BaseNotifier):
Expand Down Expand Up @@ -86,8 +87,13 @@ def __init__(
links: list[Any] | None = None,
pagerduty_events_conn_id: str | None = "pagerduty_events_default",
integration_key: str | None = None,
**kwargs,
):
super().__init__()
if AIRFLOW_V_3_1_PLUS:
# Support for passing context was added in 3.1.0
super().__init__(**kwargs)
else:
super().__init__()
self.pagerduty_events_conn_id = pagerduty_events_conn_id
self.integration_key = integration_key
self.summary = summary
Expand All @@ -109,6 +115,13 @@ def hook(self) -> PagerdutyEventsHook:
pagerduty_events_conn_id=self.pagerduty_events_conn_id, integration_key=self.integration_key
)

@cached_property
def hook_async(self) -> PagerdutyEventsAsyncHook:
"""Pagerduty Events Async Hook."""
return PagerdutyEventsAsyncHook(
pagerduty_events_conn_id=self.pagerduty_events_conn_id, integration_key=self.integration_key
)

def notify(self, context):
"""Send a alert to a pagerduty event v2 API."""
self.hook.send_event(
Expand All @@ -125,5 +138,21 @@ def notify(self, context):
links=self.links,
)

async def async_notify(self, context) -> None:
"""Send a alert to a pagerduty event v2 API using async HTTP."""
await self.hook_async.send_event(
summary=self.summary,
severity=self.severity,
source=self.source,
action=self.action,
dedup_key=self.dedup_key,
custom_details=self.custom_details,
group=self.group,
component=self.component,
class_type=self.class_type,
images=self.images,
links=self.links,
)


send_pagerduty_notification = PagerdutyNotifier
Loading
Loading