From b61e83b2a415fd28d42f8d5724d38ec5a2987b9b Mon Sep 17 00:00:00 2001 From: Josh Callender <1569818+saponifi3d@users.noreply.github.com> Date: Wed, 12 Nov 2025 13:37:21 -0800 Subject: [PATCH] Revert "ref(workflow_engine): Add logging for process_workflows (#103190)" This reverts commit 49b7c139a91f7618213bba2297e21119780cec63. --- .../workflow_engine/processors/workflow.py | 60 ++++--------------- src/sentry/workflow_engine/tasks/workflows.py | 16 ++--- src/sentry/workflow_engine/types.py | 55 +---------------- .../processors/test_workflow.py | 42 ++++++------- tests/sentry/workflow_engine/test_task.py | 49 +-------------- 5 files changed, 41 insertions(+), 181 deletions(-) diff --git a/src/sentry/workflow_engine/processors/workflow.py b/src/sentry/workflow_engine/processors/workflow.py index e5536a05df7f06..9983c49164e9fe 100644 --- a/src/sentry/workflow_engine/processors/workflow.py +++ b/src/sentry/workflow_engine/processors/workflow.py @@ -33,11 +33,7 @@ ) from sentry.workflow_engine.processors.detector import get_detector_by_event from sentry.workflow_engine.processors.workflow_fire_history import create_workflow_fire_histories -from sentry.workflow_engine.types import ( - WorkflowEvaluation, - WorkflowEvaluationData, - WorkflowEventData, -) +from sentry.workflow_engine.types import WorkflowEventData from sentry.workflow_engine.utils import log_context, scopedstats from sentry.workflow_engine.utils.metrics import metrics_incr @@ -468,7 +464,7 @@ def process_workflows( event_data: WorkflowEventData, event_start_time: datetime, detector: Detector | None = None, -) -> WorkflowEvaluation: +) -> set[Workflow]: """ This method will get the detector based on the event, and then gather the associated workflows. Next, it will evaluate the "when" (or trigger) conditions for each workflow, if the conditions are met, @@ -482,8 +478,6 @@ def process_workflows( fire_actions, ) - workflow_evaluation_data = WorkflowEvaluationData(group_event=event_data.event) - try: if detector is None and isinstance(event_data.event, GroupEvent): detector = get_detector_by_event(event_data) @@ -502,13 +496,7 @@ def process_workflows( ) ) except Detector.DoesNotExist: - return WorkflowEvaluation( - tainted=True, - msg="No Detectors associated with the issue were found", - data=workflow_evaluation_data, - ) - - workflow_evaluation_data.associated_detector = detector + return set() try: environment = get_environment_by_event(event_data) @@ -522,58 +510,33 @@ def process_workflows( ) ) except Environment.DoesNotExist: - return WorkflowEvaluation( - tainted=True, - msg="Environment for event not found", - data=workflow_evaluation_data, - ) + return set() if features.has("organizations:workflow-engine-process-workflows-logs", organization): log_context.set_verbose(True) workflows = _get_associated_workflows(detector, environment, event_data) - workflow_evaluation_data.workflows = workflows - if not workflows: - return WorkflowEvaluation( - tainted=True, - msg="No workflows are associated with the detector in the event", - data=workflow_evaluation_data, - ) + # If there aren't any workflows, there's nothing to evaluate + return set() triggered_workflows, queue_items_by_workflow_id = evaluate_workflow_triggers( workflows, event_data, event_start_time ) - - workflow_evaluation_data.triggered_workflows = triggered_workflows - if not triggered_workflows and not queue_items_by_workflow_id: - return WorkflowEvaluation( - tainted=True, - msg="No items were triggered or queued for slow evaluation", - 
data=workflow_evaluation_data, - ) + # if there aren't any triggered workflows, there's no action filters to evaluate + return set() - # TODO - we should probably return here and have the rest from here be - # `process_actions`, this will take a list of "triggered_workflows" actions_to_trigger, queue_items_by_workflow_id = evaluate_workflows_action_filters( triggered_workflows, event_data, queue_items_by_workflow_id, event_start_time ) - enqueue_workflows(batch_client, queue_items_by_workflow_id) - actions = filter_recently_fired_workflow_actions(actions_to_trigger, event_data) sentry_sdk.set_tag("workflow_engine.triggered_actions", len(actions)) - workflow_evaluation_data.action_groups = actions_to_trigger - workflow_evaluation_data.triggered_actions = set(actions) - if not actions: - return WorkflowEvaluation( - tainted=True, - msg="No actions to evaluate; filtered or not triggered", - data=workflow_evaluation_data, - ) + # If there aren't any actions on the associated workflows, there's nothing to trigger + return triggered_workflows should_trigger_actions = should_fire_workflow_actions(organization, event_data.group.type) create_workflow_fire_histories( @@ -586,4 +549,5 @@ def process_workflows( ) fire_actions(actions, detector, event_data) - return WorkflowEvaluation(tainted=False, msg=None, data=workflow_evaluation_data) + + return triggered_workflows diff --git a/src/sentry/workflow_engine/tasks/workflows.py b/src/sentry/workflow_engine/tasks/workflows.py index da507826f2735b..c0fe693f8a264d 100644 --- a/src/sentry/workflow_engine/tasks/workflows.py +++ b/src/sentry/workflow_engine/tasks/workflows.py @@ -18,7 +18,6 @@ from sentry.utils import metrics from sentry.utils.exceptions import quiet_redis_noise from sentry.utils.locking import UnableToAcquireLock -from sentry.workflow_engine.buffer.batch_client import DelayedWorkflowClient from sentry.workflow_engine.models import DataConditionGroup, Detector from sentry.workflow_engine.tasks.utils import ( EventNotFoundError, @@ -45,6 +44,7 @@ def process_workflow_activity(activity_id: int, group_id: int, detector_id: int) The task will get the Activity from the database, create a WorkflowEventData object, and then process the data in `process_workflows`. 
""" + from sentry.workflow_engine.buffer.batch_client import DelayedWorkflowClient from sentry.workflow_engine.processors.workflow import process_workflows with transaction.atomic(router.db_for_write(Detector)): @@ -69,12 +69,9 @@ def process_workflow_activity(activity_id: int, group_id: int, detector_id: int) ) with quiet_redis_noise(): batch_client = DelayedWorkflowClient() - evaluation = process_workflows( + process_workflows( batch_client, event_data, event_start_time=activity.datetime, detector=detector ) - - evaluation.to_log(logger) - metrics.incr( "workflow_engine.tasks.process_workflows.activity_update.executed", tags={"activity_type": activity.type, "detector_type": detector.type}, @@ -106,11 +103,11 @@ def process_workflows_event( start_timestamp_seconds: float | None = None, **kwargs: dict[str, Any], ) -> None: + from sentry.workflow_engine.buffer.batch_client import DelayedWorkflowClient from sentry.workflow_engine.processors.workflow import process_workflows recorder = scopedstats.Recorder() start_time = time.time() - with recorder.record(): try: event_data = build_workflow_event_data_from_event( @@ -134,11 +131,7 @@ def process_workflows_event( ) with quiet_redis_noise(): batch_client = DelayedWorkflowClient() - evaluation = process_workflows( - batch_client, event_data, event_start_time=event_start_time - ) - - evaluation.to_log(logger) + process_workflows(batch_client, event_data, event_start_time=event_start_time) duration = time.time() - start_time is_slow = duration > 1.0 # We want full coverage for particularly slow cases, plus a random sampling. @@ -165,6 +158,7 @@ def schedule_delayed_workflows(**kwargs: Any) -> None: """ Schedule delayed workflow buffers in a batch. """ + from sentry.workflow_engine.buffer.batch_client import DelayedWorkflowClient from sentry.workflow_engine.processors.schedule import process_buffered_workflows lock_name = "schedule_delayed_workflows" diff --git a/src/sentry/workflow_engine/types.py b/src/sentry/workflow_engine/types.py index f300c1de7b208a..760f65de4a362e 100644 --- a/src/sentry/workflow_engine/types.py +++ b/src/sentry/workflow_engine/types.py @@ -1,9 +1,8 @@ from __future__ import annotations from abc import ABC, abstractmethod -from dataclasses import asdict, dataclass, field +from dataclasses import dataclass, field from enum import IntEnum, StrEnum -from logging import Logger from typing import TYPE_CHECKING, Any, ClassVar, Generic, TypedDict, TypeVar from sentry.types.group import PriorityLevel @@ -21,7 +20,7 @@ from sentry.snuba.models import SnubaQueryEventType from sentry.workflow_engine.endpoints.validators.base import BaseDetectorTypeValidator from sentry.workflow_engine.handlers.detector import DetectorHandler - from sentry.workflow_engine.models import Action, DataConditionGroup, Detector, Workflow + from sentry.workflow_engine.models import Action, Detector from sentry.workflow_engine.models.data_condition import Condition T = TypeVar("T") @@ -73,56 +72,6 @@ class WorkflowEventData: workflow_env: Environment | None = None -@dataclass -class WorkflowEvaluationData: - group_event: GroupEvent | Activity - action_groups: set[DataConditionGroup] | None = None - workflows: set[Workflow] | None = None - triggered_actions: set[Action] | None = None - triggered_workflows: set[Workflow] | None = None - associated_detector: Detector | None = None - - -@dataclass(frozen=True) -class WorkflowEvaluation: - """ - This is the result of `process_workflows`, and is used to - encapsulate different stages of completion for the method. 
- - The `tainted` flag is used to indicate whether or not actions - have been triggered during the workflows evaluation. - - The `msg` field is used for debug information during the evaluation. - - The `data` attribute will include all the data used to evaluate the - workflows, and determine if an action should be triggered. - """ - - tainted: bool - msg: str | None - data: WorkflowEvaluationData - - def to_log(self, logger: Logger) -> None: - """ - Determines how far in the process the evaluation got to - and creates a structured log string to quickly find. - - Then this will return the that log string, and the - relevant processing data to be logged. - """ - log_str = "workflow_engine.process_workflows.evaluation" - - if self.tainted: - if self.data.triggered_workflows is None: - log_str = f"{log_str}.workflows.not_triggered" - else: - log_str = f"{log_str}.workflows.triggered" - else: - log_str = f"{log_str}.actions.triggered" - - logger.info(log_str, extra={**asdict(self.data), "msg": self.msg}) - - class ConfigTransformer(ABC): """ A ConfigTransformer is used to transform the config between API and internal representations. diff --git a/tests/sentry/workflow_engine/processors/test_workflow.py b/tests/sentry/workflow_engine/processors/test_workflow.py index 3e22bd6855ae57..5c50475803764a 100644 --- a/tests/sentry/workflow_engine/processors/test_workflow.py +++ b/tests/sentry/workflow_engine/processors/test_workflow.py @@ -88,12 +88,12 @@ def test_skips_disabled_workflows(self) -> None: workflow=workflow, ) - result = process_workflows(self.batch_client, self.event_data, FROZEN_TIME) - assert result.data.triggered_workflows == {self.error_workflow} + triggered_workflows = process_workflows(self.batch_client, self.event_data, FROZEN_TIME) + assert triggered_workflows == {self.error_workflow} def test_error_event(self) -> None: - result = process_workflows(self.batch_client, self.event_data, FROZEN_TIME) - assert result.data.triggered_workflows == {self.error_workflow} + triggered_workflows = process_workflows(self.batch_client, self.event_data, FROZEN_TIME) + assert triggered_workflows == {self.error_workflow} @patch("sentry.workflow_engine.processors.action.fire_actions") def test_process_workflows_event(self, mock_fire_actions: MagicMock) -> None: @@ -161,9 +161,9 @@ def test_populate_workflow_env_for_filters(self, mock_filter: MagicMock) -> None assert self.event_data.group_state self.event_data.group_state["is_new"] = True - result = process_workflows(self.batch_client, self.event_data, FROZEN_TIME) + process_workflows(self.batch_client, self.event_data, FROZEN_TIME) + mock_filter.assert_called_with({workflow_filters}, self.event_data) - assert result.tainted is False def test_same_environment_only(self) -> None: env = self.create_environment(project=self.project) @@ -208,15 +208,15 @@ def test_same_environment_only(self) -> None: workflow=mismatched_env_workflow, ) - result = process_workflows(self.batch_client, self.event_data, FROZEN_TIME) - assert result.data.triggered_workflows == {self.error_workflow, matching_env_workflow} + triggered_workflows = process_workflows(self.batch_client, self.event_data, FROZEN_TIME) + assert triggered_workflows == {self.error_workflow, matching_env_workflow} def test_issue_occurrence_event(self) -> None: issue_occurrence = self.build_occurrence(evidence_data={"detector_id": self.detector.id}) self.group_event.occurrence = issue_occurrence - result = process_workflows(self.batch_client, self.event_data, FROZEN_TIME) - assert 
result.data.triggered_workflows == {self.workflow} + triggered_workflows = process_workflows(self.batch_client, self.event_data, FROZEN_TIME) + assert triggered_workflows == {self.workflow} def test_regressed_event(self) -> None: dcg = self.create_data_condition_group() @@ -233,16 +233,17 @@ def test_regressed_event(self) -> None: workflow=workflow, ) - result = process_workflows(self.batch_client, self.event_data, FROZEN_TIME) - assert result.data.triggered_workflows == {self.error_workflow, workflow} + triggered_workflows = process_workflows(self.batch_client, self.event_data, FROZEN_TIME) + assert triggered_workflows == {self.error_workflow, workflow} @patch("sentry.utils.metrics.incr") @patch("sentry.workflow_engine.processors.detector.logger") def test_no_detector(self, mock_logger: MagicMock, mock_incr: MagicMock) -> None: self.group_event.occurrence = self.build_occurrence(evidence_data={}) - result = process_workflows(self.batch_client, self.event_data, FROZEN_TIME) - assert result.msg == "No Detectors associated with the issue were found" + triggered_workflows = process_workflows(self.batch_client, self.event_data, FROZEN_TIME) + + assert not triggered_workflows mock_incr.assert_called_once_with("workflow_engine.detectors.error") mock_logger.exception.assert_called_once_with( @@ -259,10 +260,9 @@ def test_no_detector(self, mock_logger: MagicMock, mock_incr: MagicMock) -> None def test_no_environment(self, mock_logger: MagicMock, mock_incr: MagicMock) -> None: Environment.objects.all().delete() cache.clear() - result = process_workflows(self.batch_client, self.event_data, FROZEN_TIME) + triggered_workflows = process_workflows(self.batch_client, self.event_data, FROZEN_TIME) - assert not result.data.triggered_workflows - assert result.msg == "Environment for event not found" + assert not triggered_workflows mock_incr.assert_called_once_with( "workflow_engine.process_workflows.error", 1, tags={"detector_type": "error"} @@ -338,12 +338,8 @@ def test_defaults_to_error_workflows(self) -> None: self.group_event.occurrence = issue_occurrence self.group.update(type=issue_occurrence.type.type_id) - result = process_workflows(self.batch_client, self.event_data, FROZEN_TIME) - - assert result.tainted is True - assert result.data.triggered_workflows == {self.error_workflow} - assert result.data.triggered_actions is not None - assert len(result.data.triggered_actions) == 0 + triggered_workflows = process_workflows(self.batch_client, self.event_data, FROZEN_TIME) + assert triggered_workflows == {self.error_workflow} class TestEvaluateWorkflowTriggers(BaseWorkflowTest): diff --git a/tests/sentry/workflow_engine/test_task.py b/tests/sentry/workflow_engine/test_task.py index a51374306fda38..df2476d3d17862 100644 --- a/tests/sentry/workflow_engine/test_task.py +++ b/tests/sentry/workflow_engine/test_task.py @@ -137,8 +137,7 @@ def setUp(self) -> None: self.activity.save() self.detector = self.create_detector(type=MetricIssue.slug) - @mock.patch("sentry.workflow_engine.tasks.workflows.logger") - def test_process_workflow_activity__no_workflows(self, mock_logger) -> None: + def test_process_workflow_activity__no_workflows(self) -> None: with mock.patch( "sentry.workflow_engine.processors.workflow.evaluate_workflow_triggers", return_value=set(), @@ -151,19 +150,6 @@ def test_process_workflow_activity__no_workflows(self, mock_logger) -> None: # Short-circuit evaluation, no workflows associated assert mock_evaluate.call_count == 0 - mock_logger.info.assert_called_once_with( - 
"workflow_engine.process_workflows.evaluation.workflows.not_triggered", - extra={ - "msg": "No workflows are associated with the detector in the event", - "group_event": self.activity, - "action_groups": None, - "triggered_actions": None, - "workflows": set(), - "triggered_workflows": None, - "associated_detector": self.detector, - }, - ) - @mock.patch( "sentry.workflow_engine.processors.workflow.evaluate_workflow_triggers", return_value=(set(), {}), @@ -172,9 +158,8 @@ def test_process_workflow_activity__no_workflows(self, mock_logger) -> None: "sentry.workflow_engine.processors.workflow.evaluate_workflows_action_filters", return_value=set(), ) - @mock.patch("sentry.workflow_engine.tasks.workflows.logger") def test_process_workflow_activity__workflows__no_actions( - self, mock_logger, mock_eval_actions, mock_evaluate + self, mock_eval_actions, mock_evaluate ): self.workflow = self.create_workflow(organization=self.organization) self.create_detector_workflow( @@ -196,24 +181,8 @@ def test_process_workflow_activity__workflows__no_actions( mock_evaluate.assert_called_once_with({self.workflow}, event_data, mock.ANY) assert mock_eval_actions.call_count == 0 - mock_logger.info.assert_called_once_with( - "workflow_engine.process_workflows.evaluation.workflows.triggered", - extra={ - "msg": "No items were triggered or queued for slow evaluation", - "group_event": self.activity, - "action_groups": None, - "triggered_actions": None, - "workflows": {self.workflow}, - "triggered_workflows": set(), # from the mock - "associated_detector": self.detector, - }, - ) - @mock.patch("sentry.workflow_engine.processors.action.filter_recently_fired_workflow_actions") - @mock.patch("sentry.workflow_engine.tasks.workflows.logger") - def test_process_workflow_activity( - self, mock_logger, mock_filter_actions: mock.MagicMock - ) -> None: + def test_process_workflow_activity(self, mock_filter_actions: mock.MagicMock) -> None: self.workflow = self.create_workflow(organization=self.organization) self.action_group = self.create_data_condition_group(logic_type="any-short") @@ -241,18 +210,6 @@ def test_process_workflow_activity( ) mock_filter_actions.assert_called_once_with({self.action_group}, expected_event_data) - mock_logger.info.assert_called_once_with( - "workflow_engine.process_workflows.evaluation.actions.triggered", - extra={ - "msg": None, - "group_event": self.activity, - "action_groups": {self.action_group}, - "triggered_actions": set(), - "workflows": {self.workflow}, - "triggered_workflows": {self.workflow}, - "associated_detector": self.detector, - }, - ) @mock.patch( "sentry.workflow_engine.models.incident_groupopenperiod.update_incident_based_on_open_period_status_change"