Skip to content

Commit b8fcbb5

Browse files
saponifi3d authored and andrewshie-sentry committed
fix(workflow_engine): Add logs for Evaluation (#103270)
# Description Try two for this, see #103190 for initial implementation. This PR will fix the error, `'msg' in LogRecord`: https://sentry-st.sentry.io/issues/7021864045/?project=1513938 by changing `msg` to `debug_msg` in the log.
1 parent c4b9f72 commit b8fcbb5

File tree

5 files changed

+181
-41
lines changed

5 files changed

+181
-41
lines changed

src/sentry/workflow_engine/processors/workflow.py

Lines changed: 48 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,11 @@
3333
)
3434
from sentry.workflow_engine.processors.detector import get_detector_by_event
3535
from sentry.workflow_engine.processors.workflow_fire_history import create_workflow_fire_histories
36-
from sentry.workflow_engine.types import WorkflowEventData
36+
from sentry.workflow_engine.types import (
37+
WorkflowEvaluation,
38+
WorkflowEvaluationData,
39+
WorkflowEventData,
40+
)
3741
from sentry.workflow_engine.utils import log_context, scopedstats
3842
from sentry.workflow_engine.utils.metrics import metrics_incr
3943

@@ -464,7 +468,7 @@ def process_workflows(
464468
event_data: WorkflowEventData,
465469
event_start_time: datetime,
466470
detector: Detector | None = None,
467-
) -> set[Workflow]:
471+
) -> WorkflowEvaluation:
468472
"""
469473
This method will get the detector based on the event, and then gather the associated workflows.
470474
Next, it will evaluate the "when" (or trigger) conditions for each workflow, if the conditions are met,
@@ -478,6 +482,8 @@ def process_workflows(
478482
fire_actions,
479483
)
480484

485+
workflow_evaluation_data = WorkflowEvaluationData(group_event=event_data.event)
486+
481487
try:
482488
if detector is None and isinstance(event_data.event, GroupEvent):
483489
detector = get_detector_by_event(event_data)
@@ -496,7 +502,13 @@ def process_workflows(
496502
)
497503
)
498504
except Detector.DoesNotExist:
499-
return set()
505+
return WorkflowEvaluation(
506+
tainted=True,
507+
msg="No Detectors associated with the issue were found",
508+
data=workflow_evaluation_data,
509+
)
510+
511+
workflow_evaluation_data.associated_detector = detector
500512

501513
try:
502514
environment = get_environment_by_event(event_data)
@@ -510,33 +522,58 @@ def process_workflows(
510522
)
511523
)
512524
except Environment.DoesNotExist:
513-
return set()
525+
return WorkflowEvaluation(
526+
tainted=True,
527+
msg="Environment for event not found",
528+
data=workflow_evaluation_data,
529+
)
514530

515531
if features.has("organizations:workflow-engine-process-workflows-logs", organization):
516532
log_context.set_verbose(True)
517533

518534
workflows = _get_associated_workflows(detector, environment, event_data)
535+
workflow_evaluation_data.workflows = workflows
536+
519537
if not workflows:
520-
# If there aren't any workflows, there's nothing to evaluate
521-
return set()
538+
return WorkflowEvaluation(
539+
tainted=True,
540+
msg="No workflows are associated with the detector in the event",
541+
data=workflow_evaluation_data,
542+
)
522543

523544
triggered_workflows, queue_items_by_workflow_id = evaluate_workflow_triggers(
524545
workflows, event_data, event_start_time
525546
)
547+
548+
workflow_evaluation_data.triggered_workflows = triggered_workflows
549+
526550
if not triggered_workflows and not queue_items_by_workflow_id:
527-
# if there aren't any triggered workflows, there's no action filters to evaluate
528-
return set()
551+
return WorkflowEvaluation(
552+
tainted=True,
553+
msg="No items were triggered or queued for slow evaluation",
554+
data=workflow_evaluation_data,
555+
)
529556

557+
# TODO - we should probably return here and have the rest from here be
558+
# `process_actions`, this will take a list of "triggered_workflows"
530559
actions_to_trigger, queue_items_by_workflow_id = evaluate_workflows_action_filters(
531560
triggered_workflows, event_data, queue_items_by_workflow_id, event_start_time
532561
)
562+
533563
enqueue_workflows(batch_client, queue_items_by_workflow_id)
564+
534565
actions = filter_recently_fired_workflow_actions(actions_to_trigger, event_data)
535566
sentry_sdk.set_tag("workflow_engine.triggered_actions", len(actions))
536567

568+
workflow_evaluation_data.action_groups = actions_to_trigger
569+
workflow_evaluation_data.triggered_actions = set(actions)
570+
537571
if not actions:
538-
# If there aren't any actions on the associated workflows, there's nothing to trigger
539-
return triggered_workflows
572+
return WorkflowEvaluation(
573+
tainted=True,
574+
msg="No actions to evaluate; filtered or not triggered",
575+
data=workflow_evaluation_data,
576+
)
540577

541578
should_trigger_actions = should_fire_workflow_actions(organization, event_data.group.type)
542579
create_workflow_fire_histories(
@@ -549,5 +586,4 @@ def process_workflows(
549586
)
550587

551588
fire_actions(actions, detector, event_data)
552-
553-
return triggered_workflows
589+
return WorkflowEvaluation(tainted=False, msg=None, data=workflow_evaluation_data)

src/sentry/workflow_engine/tasks/workflows.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from sentry.utils import metrics
1919
from sentry.utils.exceptions import quiet_redis_noise
2020
from sentry.utils.locking import UnableToAcquireLock
21+
from sentry.workflow_engine.buffer.batch_client import DelayedWorkflowClient
2122
from sentry.workflow_engine.models import DataConditionGroup, Detector
2223
from sentry.workflow_engine.tasks.utils import (
2324
EventNotFoundError,
@@ -44,7 +45,6 @@ def process_workflow_activity(activity_id: int, group_id: int, detector_id: int)
4445
The task will get the Activity from the database, create a WorkflowEventData object,
4546
and then process the data in `process_workflows`.
4647
"""
47-
from sentry.workflow_engine.buffer.batch_client import DelayedWorkflowClient
4848
from sentry.workflow_engine.processors.workflow import process_workflows
4949

5050
with transaction.atomic(router.db_for_write(Detector)):
@@ -69,9 +69,12 @@ def process_workflow_activity(activity_id: int, group_id: int, detector_id: int)
6969
)
7070
with quiet_redis_noise():
7171
batch_client = DelayedWorkflowClient()
72-
process_workflows(
72+
evaluation = process_workflows(
7373
batch_client, event_data, event_start_time=activity.datetime, detector=detector
7474
)
75+
76+
evaluation.to_log(logger)
77+
7578
metrics.incr(
7679
"workflow_engine.tasks.process_workflows.activity_update.executed",
7780
tags={"activity_type": activity.type, "detector_type": detector.type},
@@ -103,11 +106,11 @@ def process_workflows_event(
103106
start_timestamp_seconds: float | None = None,
104107
**kwargs: dict[str, Any],
105108
) -> None:
106-
from sentry.workflow_engine.buffer.batch_client import DelayedWorkflowClient
107109
from sentry.workflow_engine.processors.workflow import process_workflows
108110

109111
recorder = scopedstats.Recorder()
110112
start_time = time.time()
113+
111114
with recorder.record():
112115
try:
113116
event_data = build_workflow_event_data_from_event(
@@ -131,7 +134,11 @@ def process_workflows_event(
131134
)
132135
with quiet_redis_noise():
133136
batch_client = DelayedWorkflowClient()
134-
process_workflows(batch_client, event_data, event_start_time=event_start_time)
137+
evaluation = process_workflows(
138+
batch_client, event_data, event_start_time=event_start_time
139+
)
140+
141+
evaluation.to_log(logger)
135142
duration = time.time() - start_time
136143
is_slow = duration > 1.0
137144
# We want full coverage for particularly slow cases, plus a random sampling.
@@ -158,7 +165,6 @@ def schedule_delayed_workflows(**kwargs: Any) -> None:
158165
"""
159166
Schedule delayed workflow buffers in a batch.
160167
"""
161-
from sentry.workflow_engine.buffer.batch_client import DelayedWorkflowClient
162168
from sentry.workflow_engine.processors.schedule import process_buffered_workflows
163169

164170
lock_name = "schedule_delayed_workflows"

src/sentry/workflow_engine/types.py

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
from __future__ import annotations
22

33
from abc import ABC, abstractmethod
4-
from dataclasses import dataclass, field
4+
from dataclasses import asdict, dataclass, field
55
from enum import IntEnum, StrEnum
6+
from logging import Logger
67
from typing import TYPE_CHECKING, Any, ClassVar, Generic, TypedDict, TypeVar
78

89
from sentry.types.group import PriorityLevel
@@ -20,7 +21,7 @@
2021
from sentry.snuba.models import SnubaQueryEventType
2122
from sentry.workflow_engine.endpoints.validators.base import BaseDetectorTypeValidator
2223
from sentry.workflow_engine.handlers.detector import DetectorHandler
23-
from sentry.workflow_engine.models import Action, Detector
24+
from sentry.workflow_engine.models import Action, DataConditionGroup, Detector, Workflow
2425
from sentry.workflow_engine.models.data_condition import Condition
2526

2627
T = TypeVar("T")
@@ -72,6 +73,56 @@ class WorkflowEventData:
7273
workflow_env: Environment | None = None
7374

7475

76+
@dataclass
77+
class WorkflowEvaluationData:
78+
group_event: GroupEvent | Activity
79+
action_groups: set[DataConditionGroup] | None = None
80+
workflows: set[Workflow] | None = None
81+
triggered_actions: set[Action] | None = None
82+
triggered_workflows: set[Workflow] | None = None
83+
associated_detector: Detector | None = None
84+
85+
86+
@dataclass(frozen=True)
87+
class WorkflowEvaluation:
88+
"""
89+
This is the result of `process_workflows`, and is used to
90+
encapsulate different stages of completion for the method.
91+
92+
The `tainted` flag is used to indicate whether or not actions
93+
have been triggered during the workflows evaluation.
94+
95+
The `msg` field is used for debug information during the evaluation.
96+
97+
The `data` attribute will include all the data used to evaluate the
98+
workflows, and determine if an action should be triggered.
99+
"""
100+
101+
tainted: bool
102+
msg: str | None
103+
data: WorkflowEvaluationData
104+
105+
def to_log(self, logger: Logger) -> None:
106+
"""
107+
Determines how far in the process the evaluation got to
108+
and creates a structured log string to quickly find.
109+
110+
Then this will return that log string, and the
111+
relevant processing data to be logged.
112+
"""
113+
log_str = "workflow_engine.process_workflows.evaluation"
114+
115+
if self.tainted:
116+
if self.data.triggered_workflows is None:
117+
log_str = f"{log_str}.workflows.not_triggered"
118+
else:
119+
log_str = f"{log_str}.workflows.triggered"
120+
else:
121+
log_str = f"{log_str}.actions.triggered"
122+
123+
logger.info(log_str, extra={**asdict(self.data), "debug_msg": self.msg})
124+
125+
75126
class ConfigTransformer(ABC):
76127
"""
77128
A ConfigTransformer is used to transform the config between API and internal representations.

tests/sentry/workflow_engine/processors/test_workflow.py

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -88,12 +88,12 @@ def test_skips_disabled_workflows(self) -> None:
8888
workflow=workflow,
8989
)
9090

91-
triggered_workflows = process_workflows(self.batch_client, self.event_data, FROZEN_TIME)
92-
assert triggered_workflows == {self.error_workflow}
91+
result = process_workflows(self.batch_client, self.event_data, FROZEN_TIME)
92+
assert result.data.triggered_workflows == {self.error_workflow}
9393

9494
def test_error_event(self) -> None:
95-
triggered_workflows = process_workflows(self.batch_client, self.event_data, FROZEN_TIME)
96-
assert triggered_workflows == {self.error_workflow}
95+
result = process_workflows(self.batch_client, self.event_data, FROZEN_TIME)
96+
assert result.data.triggered_workflows == {self.error_workflow}
9797

9898
@patch("sentry.workflow_engine.processors.action.fire_actions")
9999
def test_process_workflows_event(self, mock_fire_actions: MagicMock) -> None:
@@ -161,9 +161,9 @@ def test_populate_workflow_env_for_filters(self, mock_filter: MagicMock) -> None
161161
assert self.event_data.group_state
162162
self.event_data.group_state["is_new"] = True
163163

164-
process_workflows(self.batch_client, self.event_data, FROZEN_TIME)
165-
164+
result = process_workflows(self.batch_client, self.event_data, FROZEN_TIME)
166165
mock_filter.assert_called_with({workflow_filters}, self.event_data)
166+
assert result.tainted is False
167167

168168
def test_same_environment_only(self) -> None:
169169
env = self.create_environment(project=self.project)
@@ -208,15 +208,15 @@ def test_same_environment_only(self) -> None:
208208
workflow=mismatched_env_workflow,
209209
)
210210

211-
triggered_workflows = process_workflows(self.batch_client, self.event_data, FROZEN_TIME)
212-
assert triggered_workflows == {self.error_workflow, matching_env_workflow}
211+
result = process_workflows(self.batch_client, self.event_data, FROZEN_TIME)
212+
assert result.data.triggered_workflows == {self.error_workflow, matching_env_workflow}
213213

214214
def test_issue_occurrence_event(self) -> None:
215215
issue_occurrence = self.build_occurrence(evidence_data={"detector_id": self.detector.id})
216216
self.group_event.occurrence = issue_occurrence
217217

218-
triggered_workflows = process_workflows(self.batch_client, self.event_data, FROZEN_TIME)
219-
assert triggered_workflows == {self.workflow}
218+
result = process_workflows(self.batch_client, self.event_data, FROZEN_TIME)
219+
assert result.data.triggered_workflows == {self.workflow}
220220

221221
def test_regressed_event(self) -> None:
222222
dcg = self.create_data_condition_group()
@@ -233,17 +233,16 @@ def test_regressed_event(self) -> None:
233233
workflow=workflow,
234234
)
235235

236-
triggered_workflows = process_workflows(self.batch_client, self.event_data, FROZEN_TIME)
237-
assert triggered_workflows == {self.error_workflow, workflow}
236+
result = process_workflows(self.batch_client, self.event_data, FROZEN_TIME)
237+
assert result.data.triggered_workflows == {self.error_workflow, workflow}
238238

239239
@patch("sentry.utils.metrics.incr")
240240
@patch("sentry.workflow_engine.processors.detector.logger")
241241
def test_no_detector(self, mock_logger: MagicMock, mock_incr: MagicMock) -> None:
242242
self.group_event.occurrence = self.build_occurrence(evidence_data={})
243243

244-
triggered_workflows = process_workflows(self.batch_client, self.event_data, FROZEN_TIME)
245-
246-
assert not triggered_workflows
244+
result = process_workflows(self.batch_client, self.event_data, FROZEN_TIME)
245+
assert result.msg == "No Detectors associated with the issue were found"
247246

248247
mock_incr.assert_called_once_with("workflow_engine.detectors.error")
249248
mock_logger.exception.assert_called_once_with(
@@ -260,9 +259,10 @@ def test_no_detector(self, mock_logger: MagicMock, mock_incr: MagicMock) -> None
260259
def test_no_environment(self, mock_logger: MagicMock, mock_incr: MagicMock) -> None:
261260
Environment.objects.all().delete()
262261
cache.clear()
263-
triggered_workflows = process_workflows(self.batch_client, self.event_data, FROZEN_TIME)
262+
result = process_workflows(self.batch_client, self.event_data, FROZEN_TIME)
264263

265-
assert not triggered_workflows
264+
assert not result.data.triggered_workflows
265+
assert result.msg == "Environment for event not found"
266266

267267
mock_incr.assert_called_once_with(
268268
"workflow_engine.process_workflows.error", 1, tags={"detector_type": "error"}
@@ -338,8 +338,12 @@ def test_defaults_to_error_workflows(self) -> None:
338338
self.group_event.occurrence = issue_occurrence
339339
self.group.update(type=issue_occurrence.type.type_id)
340340

341-
triggered_workflows = process_workflows(self.batch_client, self.event_data, FROZEN_TIME)
342-
assert triggered_workflows == {self.error_workflow}
341+
result = process_workflows(self.batch_client, self.event_data, FROZEN_TIME)
342+
343+
assert result.tainted is True
344+
assert result.data.triggered_workflows == {self.error_workflow}
345+
assert result.data.triggered_actions is not None
346+
assert len(result.data.triggered_actions) == 0
343347

344348

345349
class TestEvaluateWorkflowTriggers(BaseWorkflowTest):

0 commit comments

Comments
 (0)