diff --git a/src/sentry/workflow_engine/processors/workflow.py b/src/sentry/workflow_engine/processors/workflow.py
index 9983c49164e9fe..e5536a05df7f06 100644
--- a/src/sentry/workflow_engine/processors/workflow.py
+++ b/src/sentry/workflow_engine/processors/workflow.py
@@ -33,7 +33,11 @@
 )
 from sentry.workflow_engine.processors.detector import get_detector_by_event
 from sentry.workflow_engine.processors.workflow_fire_history import create_workflow_fire_histories
-from sentry.workflow_engine.types import WorkflowEventData
+from sentry.workflow_engine.types import (
+    WorkflowEvaluation,
+    WorkflowEvaluationData,
+    WorkflowEventData,
+)
 from sentry.workflow_engine.utils import log_context, scopedstats
 from sentry.workflow_engine.utils.metrics import metrics_incr
 
@@ -464,7 +468,7 @@ def process_workflows(
     event_data: WorkflowEventData,
     event_start_time: datetime,
     detector: Detector | None = None,
-) -> set[Workflow]:
+) -> WorkflowEvaluation:
     """
     This method will get the detector based on the event, and then gather the associated workflows.
     Next, it will evaluate the "when" (or trigger) conditions for each workflow, if the conditions are met,
@@ -478,6 +482,8 @@ def process_workflows(
         fire_actions,
     )
 
+    workflow_evaluation_data = WorkflowEvaluationData(group_event=event_data.event)
+
     try:
         if detector is None and isinstance(event_data.event, GroupEvent):
             detector = get_detector_by_event(event_data)
@@ -496,7 +502,13 @@ def process_workflows(
             )
         )
     except Detector.DoesNotExist:
-        return set()
+        return WorkflowEvaluation(
+            tainted=True,
+            msg="No Detectors associated with the issue were found",
+            data=workflow_evaluation_data,
+        )
+
+    workflow_evaluation_data.associated_detector = detector
 
     try:
         environment = get_environment_by_event(event_data)
@@ -510,33 +522,58 @@ def process_workflows(
             )
         )
     except Environment.DoesNotExist:
-        return set()
+        return WorkflowEvaluation(
+            tainted=True,
+            msg="Environment for event not found",
+            data=workflow_evaluation_data,
+        )
 
     if features.has("organizations:workflow-engine-process-workflows-logs", organization):
         log_context.set_verbose(True)
 
     workflows = _get_associated_workflows(detector, environment, event_data)
+
+    workflow_evaluation_data.workflows = workflows
+
     if not workflows:
-        # If there aren't any workflows, there's nothing to evaluate
-        return set()
+        return WorkflowEvaluation(
+            tainted=True,
+            msg="No workflows are associated with the detector in the event",
+            data=workflow_evaluation_data,
+        )
 
     triggered_workflows, queue_items_by_workflow_id = evaluate_workflow_triggers(
         workflows, event_data, event_start_time
     )
+
+    workflow_evaluation_data.triggered_workflows = triggered_workflows
+
     if not triggered_workflows and not queue_items_by_workflow_id:
-        # if there aren't any triggered workflows, there's no action filters to evaluate
-        return set()
+        return WorkflowEvaluation(
+            tainted=True,
+            msg="No items were triggered or queued for slow evaluation",
+            data=workflow_evaluation_data,
+        )
 
+    # TODO - we should probably return here and move the rest of this function
+    # into `process_actions`, which would take the list of triggered workflows
     actions_to_trigger, queue_items_by_workflow_id = evaluate_workflows_action_filters(
         triggered_workflows, event_data, queue_items_by_workflow_id, event_start_time
    )
+
     enqueue_workflows(batch_client, queue_items_by_workflow_id)
+
     actions = filter_recently_fired_workflow_actions(actions_to_trigger, event_data)
     sentry_sdk.set_tag("workflow_engine.triggered_actions", len(actions))
 
+    workflow_evaluation_data.action_groups = actions_to_trigger
+    workflow_evaluation_data.triggered_actions = set(actions)
+
     if not actions:
-        # If there aren't any actions on the associated workflows, there's nothing to trigger
-        return triggered_workflows
+        return WorkflowEvaluation(
+            tainted=True,
+            msg="No actions to evaluate; filtered or not triggered",
+            data=workflow_evaluation_data,
+        )
 
     should_trigger_actions = should_fire_workflow_actions(organization, event_data.group.type)
     create_workflow_fire_histories(
@@ -549,5 +586,4 @@ def process_workflows(
     )
 
     fire_actions(actions, detector, event_data)
-
-    return triggered_workflows
+    return WorkflowEvaluation(tainted=False, msg=None, data=workflow_evaluation_data)
diff --git a/src/sentry/workflow_engine/tasks/workflows.py b/src/sentry/workflow_engine/tasks/workflows.py
index c0fe693f8a264d..da507826f2735b 100644
--- a/src/sentry/workflow_engine/tasks/workflows.py
+++ b/src/sentry/workflow_engine/tasks/workflows.py
@@ -18,6 +18,7 @@
 from sentry.utils import metrics
 from sentry.utils.exceptions import quiet_redis_noise
 from sentry.utils.locking import UnableToAcquireLock
+from sentry.workflow_engine.buffer.batch_client import DelayedWorkflowClient
 from sentry.workflow_engine.models import DataConditionGroup, Detector
 from sentry.workflow_engine.tasks.utils import (
     EventNotFoundError,
@@ -44,7 +45,6 @@ def process_workflow_activity(activity_id: int, group_id: int, detector_id: int)
     The task will get the Activity from the database, create a WorkflowEventData object,
     and then process the data in `process_workflows`.
     """
-    from sentry.workflow_engine.buffer.batch_client import DelayedWorkflowClient
     from sentry.workflow_engine.processors.workflow import process_workflows
 
     with transaction.atomic(router.db_for_write(Detector)):
@@ -69,9 +69,12 @@ def process_workflow_activity(activity_id: int, group_id: int, detector_id: int)
         )
         with quiet_redis_noise():
             batch_client = DelayedWorkflowClient()
-            process_workflows(
+            evaluation = process_workflows(
                 batch_client, event_data, event_start_time=activity.datetime, detector=detector
             )
+
+    evaluation.to_log(logger)
+
     metrics.incr(
         "workflow_engine.tasks.process_workflows.activity_update.executed",
         tags={"activity_type": activity.type, "detector_type": detector.type},
@@ -103,11 +106,11 @@ def process_workflows_event(
     start_timestamp_seconds: float | None = None,
     **kwargs: dict[str, Any],
 ) -> None:
-    from sentry.workflow_engine.buffer.batch_client import DelayedWorkflowClient
     from sentry.workflow_engine.processors.workflow import process_workflows
 
     recorder = scopedstats.Recorder()
     start_time = time.time()
+
     with recorder.record():
         try:
             event_data = build_workflow_event_data_from_event(
@@ -131,7 +134,11 @@ def process_workflows_event(
         )
         with quiet_redis_noise():
             batch_client = DelayedWorkflowClient()
-            process_workflows(batch_client, event_data, event_start_time=event_start_time)
+            evaluation = process_workflows(
+                batch_client, event_data, event_start_time=event_start_time
+            )
+
+    evaluation.to_log(logger)
     duration = time.time() - start_time
     is_slow = duration > 1.0
     # We want full coverage for particularly slow cases, plus a random sampling.
@@ -158,7 +165,6 @@ def schedule_delayed_workflows(**kwargs: Any) -> None:
     """
     Schedule delayed workflow buffers in a batch.
""" - from sentry.workflow_engine.buffer.batch_client import DelayedWorkflowClient from sentry.workflow_engine.processors.schedule import process_buffered_workflows lock_name = "schedule_delayed_workflows" diff --git a/src/sentry/workflow_engine/types.py b/src/sentry/workflow_engine/types.py index 760f65de4a362e..ac0847a8e85679 100644 --- a/src/sentry/workflow_engine/types.py +++ b/src/sentry/workflow_engine/types.py @@ -1,8 +1,9 @@ from __future__ import annotations from abc import ABC, abstractmethod -from dataclasses import dataclass, field +from dataclasses import asdict, dataclass, field from enum import IntEnum, StrEnum +from logging import Logger from typing import TYPE_CHECKING, Any, ClassVar, Generic, TypedDict, TypeVar from sentry.types.group import PriorityLevel @@ -20,7 +21,7 @@ from sentry.snuba.models import SnubaQueryEventType from sentry.workflow_engine.endpoints.validators.base import BaseDetectorTypeValidator from sentry.workflow_engine.handlers.detector import DetectorHandler - from sentry.workflow_engine.models import Action, Detector + from sentry.workflow_engine.models import Action, DataConditionGroup, Detector, Workflow from sentry.workflow_engine.models.data_condition import Condition T = TypeVar("T") @@ -72,6 +73,56 @@ class WorkflowEventData: workflow_env: Environment | None = None +@dataclass +class WorkflowEvaluationData: + group_event: GroupEvent | Activity + action_groups: set[DataConditionGroup] | None = None + workflows: set[Workflow] | None = None + triggered_actions: set[Action] | None = None + triggered_workflows: set[Workflow] | None = None + associated_detector: Detector | None = None + + +@dataclass(frozen=True) +class WorkflowEvaluation: + """ + This is the result of `process_workflows`, and is used to + encapsulate different stages of completion for the method. + + The `tainted` flag is used to indicate whether or not actions + have been triggered during the workflows evaluation. + + The `msg` field is used for debug information during the evaluation. + + The `data` attribute will include all the data used to evaluate the + workflows, and determine if an action should be triggered. + """ + + tainted: bool + msg: str | None + data: WorkflowEvaluationData + + def to_log(self, logger: Logger) -> None: + """ + Determines how far in the process the evaluation got to + and creates a structured log string to quickly find. + + Then this will return the that log string, and the + relevant processing data to be logged. + """ + log_str = "workflow_engine.process_workflows.evaluation" + + if self.tainted: + if self.data.triggered_workflows is None: + log_str = f"{log_str}.workflows.not_triggered" + else: + log_str = f"{log_str}.workflows.triggered" + else: + log_str = f"{log_str}.actions.triggered" + + logger.info(log_str, extra={**asdict(self.data), "debug_msg": self.msg}) + + class ConfigTransformer(ABC): """ A ConfigTransformer is used to transform the config between API and internal representations. 
diff --git a/tests/sentry/workflow_engine/processors/test_workflow.py b/tests/sentry/workflow_engine/processors/test_workflow.py index 5c50475803764a..3e22bd6855ae57 100644 --- a/tests/sentry/workflow_engine/processors/test_workflow.py +++ b/tests/sentry/workflow_engine/processors/test_workflow.py @@ -88,12 +88,12 @@ def test_skips_disabled_workflows(self) -> None: workflow=workflow, ) - triggered_workflows = process_workflows(self.batch_client, self.event_data, FROZEN_TIME) - assert triggered_workflows == {self.error_workflow} + result = process_workflows(self.batch_client, self.event_data, FROZEN_TIME) + assert result.data.triggered_workflows == {self.error_workflow} def test_error_event(self) -> None: - triggered_workflows = process_workflows(self.batch_client, self.event_data, FROZEN_TIME) - assert triggered_workflows == {self.error_workflow} + result = process_workflows(self.batch_client, self.event_data, FROZEN_TIME) + assert result.data.triggered_workflows == {self.error_workflow} @patch("sentry.workflow_engine.processors.action.fire_actions") def test_process_workflows_event(self, mock_fire_actions: MagicMock) -> None: @@ -161,9 +161,9 @@ def test_populate_workflow_env_for_filters(self, mock_filter: MagicMock) -> None assert self.event_data.group_state self.event_data.group_state["is_new"] = True - process_workflows(self.batch_client, self.event_data, FROZEN_TIME) - + result = process_workflows(self.batch_client, self.event_data, FROZEN_TIME) mock_filter.assert_called_with({workflow_filters}, self.event_data) + assert result.tainted is False def test_same_environment_only(self) -> None: env = self.create_environment(project=self.project) @@ -208,15 +208,15 @@ def test_same_environment_only(self) -> None: workflow=mismatched_env_workflow, ) - triggered_workflows = process_workflows(self.batch_client, self.event_data, FROZEN_TIME) - assert triggered_workflows == {self.error_workflow, matching_env_workflow} + result = process_workflows(self.batch_client, self.event_data, FROZEN_TIME) + assert result.data.triggered_workflows == {self.error_workflow, matching_env_workflow} def test_issue_occurrence_event(self) -> None: issue_occurrence = self.build_occurrence(evidence_data={"detector_id": self.detector.id}) self.group_event.occurrence = issue_occurrence - triggered_workflows = process_workflows(self.batch_client, self.event_data, FROZEN_TIME) - assert triggered_workflows == {self.workflow} + result = process_workflows(self.batch_client, self.event_data, FROZEN_TIME) + assert result.data.triggered_workflows == {self.workflow} def test_regressed_event(self) -> None: dcg = self.create_data_condition_group() @@ -233,17 +233,16 @@ def test_regressed_event(self) -> None: workflow=workflow, ) - triggered_workflows = process_workflows(self.batch_client, self.event_data, FROZEN_TIME) - assert triggered_workflows == {self.error_workflow, workflow} + result = process_workflows(self.batch_client, self.event_data, FROZEN_TIME) + assert result.data.triggered_workflows == {self.error_workflow, workflow} @patch("sentry.utils.metrics.incr") @patch("sentry.workflow_engine.processors.detector.logger") def test_no_detector(self, mock_logger: MagicMock, mock_incr: MagicMock) -> None: self.group_event.occurrence = self.build_occurrence(evidence_data={}) - triggered_workflows = process_workflows(self.batch_client, self.event_data, FROZEN_TIME) - - assert not triggered_workflows + result = process_workflows(self.batch_client, self.event_data, FROZEN_TIME) + assert result.msg == "No Detectors associated with 
the issue were found" mock_incr.assert_called_once_with("workflow_engine.detectors.error") mock_logger.exception.assert_called_once_with( @@ -260,9 +259,10 @@ def test_no_detector(self, mock_logger: MagicMock, mock_incr: MagicMock) -> None def test_no_environment(self, mock_logger: MagicMock, mock_incr: MagicMock) -> None: Environment.objects.all().delete() cache.clear() - triggered_workflows = process_workflows(self.batch_client, self.event_data, FROZEN_TIME) + result = process_workflows(self.batch_client, self.event_data, FROZEN_TIME) - assert not triggered_workflows + assert not result.data.triggered_workflows + assert result.msg == "Environment for event not found" mock_incr.assert_called_once_with( "workflow_engine.process_workflows.error", 1, tags={"detector_type": "error"} @@ -338,8 +338,12 @@ def test_defaults_to_error_workflows(self) -> None: self.group_event.occurrence = issue_occurrence self.group.update(type=issue_occurrence.type.type_id) - triggered_workflows = process_workflows(self.batch_client, self.event_data, FROZEN_TIME) - assert triggered_workflows == {self.error_workflow} + result = process_workflows(self.batch_client, self.event_data, FROZEN_TIME) + + assert result.tainted is True + assert result.data.triggered_workflows == {self.error_workflow} + assert result.data.triggered_actions is not None + assert len(result.data.triggered_actions) == 0 class TestEvaluateWorkflowTriggers(BaseWorkflowTest): diff --git a/tests/sentry/workflow_engine/test_task.py b/tests/sentry/workflow_engine/test_task.py index df2476d3d17862..c633741dcc245d 100644 --- a/tests/sentry/workflow_engine/test_task.py +++ b/tests/sentry/workflow_engine/test_task.py @@ -137,7 +137,8 @@ def setUp(self) -> None: self.activity.save() self.detector = self.create_detector(type=MetricIssue.slug) - def test_process_workflow_activity__no_workflows(self) -> None: + @mock.patch("sentry.workflow_engine.tasks.workflows.logger") + def test_process_workflow_activity__no_workflows(self, mock_logger) -> None: with mock.patch( "sentry.workflow_engine.processors.workflow.evaluate_workflow_triggers", return_value=set(), @@ -150,6 +151,19 @@ def test_process_workflow_activity__no_workflows(self) -> None: # Short-circuit evaluation, no workflows associated assert mock_evaluate.call_count == 0 + mock_logger.info.assert_called_once_with( + "workflow_engine.process_workflows.evaluation.workflows.not_triggered", + extra={ + "debug_msg": "No workflows are associated with the detector in the event", + "group_event": self.activity, + "action_groups": None, + "triggered_actions": None, + "workflows": set(), + "triggered_workflows": None, + "associated_detector": self.detector, + }, + ) + @mock.patch( "sentry.workflow_engine.processors.workflow.evaluate_workflow_triggers", return_value=(set(), {}), @@ -158,8 +172,9 @@ def test_process_workflow_activity__no_workflows(self) -> None: "sentry.workflow_engine.processors.workflow.evaluate_workflows_action_filters", return_value=set(), ) + @mock.patch("sentry.workflow_engine.tasks.workflows.logger") def test_process_workflow_activity__workflows__no_actions( - self, mock_eval_actions, mock_evaluate + self, mock_logger, mock_eval_actions, mock_evaluate ): self.workflow = self.create_workflow(organization=self.organization) self.create_detector_workflow( @@ -181,8 +196,24 @@ def test_process_workflow_activity__workflows__no_actions( mock_evaluate.assert_called_once_with({self.workflow}, event_data, mock.ANY) assert mock_eval_actions.call_count == 0 + mock_logger.info.assert_called_once_with( 
+ "workflow_engine.process_workflows.evaluation.workflows.triggered", + extra={ + "debug_msg": "No items were triggered or queued for slow evaluation", + "group_event": self.activity, + "action_groups": None, + "triggered_actions": None, + "workflows": {self.workflow}, + "triggered_workflows": set(), # from the mock + "associated_detector": self.detector, + }, + ) + @mock.patch("sentry.workflow_engine.processors.action.filter_recently_fired_workflow_actions") - def test_process_workflow_activity(self, mock_filter_actions: mock.MagicMock) -> None: + @mock.patch("sentry.workflow_engine.tasks.workflows.logger") + def test_process_workflow_activity( + self, mock_logger, mock_filter_actions: mock.MagicMock + ) -> None: self.workflow = self.create_workflow(organization=self.organization) self.action_group = self.create_data_condition_group(logic_type="any-short") @@ -210,6 +241,18 @@ def test_process_workflow_activity(self, mock_filter_actions: mock.MagicMock) -> ) mock_filter_actions.assert_called_once_with({self.action_group}, expected_event_data) + mock_logger.info.assert_called_once_with( + "workflow_engine.process_workflows.evaluation.actions.triggered", + extra={ + "debug_msg": None, + "group_event": self.activity, + "action_groups": {self.action_group}, + "triggered_actions": set(), + "workflows": {self.workflow}, + "triggered_workflows": {self.workflow}, + "associated_detector": self.detector, + }, + ) @mock.patch( "sentry.workflow_engine.models.incident_groupopenperiod.update_incident_based_on_open_period_status_change"