Skip to content

Commit 9c44a32

Browse files
committed
Report auto-restatement triggers only
1 parent 5f72a94 commit 9c44a32

File tree

7 files changed

+90
-131
lines changed

7 files changed

+90
-131
lines changed

sqlmesh/core/console.py

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
Interval,
4242
Intervals,
4343
SnapshotTableInfo,
44-
SnapshotEvaluationTriggers,
4544
)
4645
from sqlmesh.core.test import ModelTest
4746
from sqlmesh.utils import rich as srich
@@ -433,7 +432,7 @@ def update_snapshot_evaluation_progress(
433432
num_audits_passed: int,
434433
num_audits_failed: int,
435434
audit_only: bool = False,
436-
snapshot_evaluation_triggers: t.Optional[SnapshotEvaluationTriggers] = None,
435+
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
437436
) -> None:
438437
"""Updates the snapshot evaluation progress."""
439438

@@ -581,7 +580,7 @@ def update_snapshot_evaluation_progress(
581580
num_audits_passed: int,
582581
num_audits_failed: int,
583582
audit_only: bool = False,
584-
snapshot_evaluation_triggers: t.Optional[SnapshotEvaluationTriggers] = None,
583+
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
585584
) -> None:
586585
pass
587586

@@ -1063,7 +1062,7 @@ def update_snapshot_evaluation_progress(
10631062
num_audits_passed: int,
10641063
num_audits_failed: int,
10651064
audit_only: bool = False,
1066-
snapshot_evaluation_triggers: t.Optional[SnapshotEvaluationTriggers] = None,
1065+
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
10671066
) -> None:
10681067
"""Update the snapshot evaluation progress."""
10691068
if (
@@ -3647,7 +3646,7 @@ def update_snapshot_evaluation_progress(
36473646
num_audits_passed: int,
36483647
num_audits_failed: int,
36493648
audit_only: bool = False,
3650-
snapshot_evaluation_triggers: t.Optional[SnapshotEvaluationTriggers] = None,
3649+
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
36513650
) -> None:
36523651
view_name, loaded_batches = self.evaluation_batch_progress[snapshot.snapshot_id]
36533652

@@ -3817,22 +3816,15 @@ def update_snapshot_evaluation_progress(
38173816
num_audits_passed: int,
38183817
num_audits_failed: int,
38193818
audit_only: bool = False,
3820-
snapshot_evaluation_triggers: t.Optional[SnapshotEvaluationTriggers] = None,
3819+
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
38213820
) -> None:
38223821
message = f"Evaluated {snapshot.name} | batch={batch_idx} | duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}"
38233822

3824-
if snapshot_evaluation_triggers:
3825-
if snapshot_evaluation_triggers.ignore_cron_flag is not None:
3826-
message += f" | ignore_cron_flag={snapshot_evaluation_triggers.ignore_cron_flag}"
3827-
if snapshot_evaluation_triggers.cron_ready is not None:
3828-
message += f" | cron_ready={snapshot_evaluation_triggers.cron_ready}"
3829-
if snapshot_evaluation_triggers.auto_restatement_triggers:
3830-
message += f" | auto_restatement_triggers={','.join(trigger.name for trigger in snapshot_evaluation_triggers.auto_restatement_triggers)}"
3831-
if snapshot_evaluation_triggers.select_snapshot_triggers:
3832-
message += f" | select_snapshot_triggers={','.join(trigger.name for trigger in snapshot_evaluation_triggers.select_snapshot_triggers)}"
3823+
if auto_restatement_triggers:
3824+
message += f" | Auto-restatement triggers {', '.join(trigger.name for trigger in auto_restatement_triggers)}"
38333825

38343826
if audit_only:
3835-
message = f"Audited {snapshot.name} duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}"
3827+
message = f"Audited {snapshot.name} | duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}"
38363828

38373829
self._write(message)
38383830

sqlmesh/core/context.py

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2296,9 +2296,11 @@ def check_intervals(
22962296
}
22972297

22982298
if select_models:
2299-
selected, _ = self._select_models_for_run(select_models, True, snapshots.values())
2299+
selected: t.Collection[str] = self._select_models_for_run(
2300+
select_models, True, snapshots.values()
2301+
)
23002302
else:
2301-
selected = set(snapshots.keys())
2303+
selected = snapshots.keys()
23022304

23032305
results = {}
23042306
execution_context = self.execution_context(snapshots=snapshots)
@@ -2448,9 +2450,8 @@ def _run(
24482450
scheduler = self.scheduler(environment=environment)
24492451
snapshots = scheduler.snapshots
24502452

2451-
select_models_auto_upstream = None
24522453
if select_models is not None:
2453-
select_models, select_models_auto_upstream = self._select_models_for_run(
2454+
select_models = self._select_models_for_run(
24542455
select_models, no_auto_upstream, snapshots.values()
24552456
)
24562457

@@ -2462,7 +2463,6 @@ def _run(
24622463
ignore_cron=ignore_cron,
24632464
circuit_breaker=circuit_breaker,
24642465
selected_snapshots=select_models,
2465-
selected_snapshots_auto_upstream=select_models_auto_upstream,
24662466
auto_restatement_enabled=environment.lower() == c.PROD,
24672467
run_environment_statements=True,
24682468
)
@@ -2878,7 +2878,7 @@ def _select_models_for_run(
28782878
select_models: t.Collection[str],
28792879
no_auto_upstream: bool,
28802880
snapshots: t.Collection[Snapshot],
2881-
) -> t.Tuple[t.Set[str], t.Set[str]]:
2881+
) -> t.Set[str]:
28822882
models: UniqueKeyDict[str, Model] = UniqueKeyDict(
28832883
"models", **{s.name: s.model for s in snapshots if s.is_model}
28842884
)
@@ -2887,10 +2887,9 @@ def _select_models_for_run(
28872887
dag.add(fqn, model.depends_on)
28882888
model_selector = self._new_selector(models=models, dag=dag)
28892889
result = set(model_selector.expand_model_selections(select_models))
2890-
if no_auto_upstream:
2891-
return result, set()
2892-
result_with_upstream = set(dag.subdag(*result))
2893-
return result_with_upstream, result_with_upstream - result
2890+
if not no_auto_upstream:
2891+
result = set(dag.subdag(*result))
2892+
return result
28942893

28952894
@cached_property
28962895
def _project_type(self) -> str:

sqlmesh/core/scheduler.py

Lines changed: 5 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
)
2929
from sqlmesh.core.snapshot.definition import (
3030
Interval,
31-
SnapshotEvaluationTriggers,
3231
SnapshotIntervals,
3332
check_ready_intervals,
3433
expand_range,
@@ -228,7 +227,6 @@ def run(
228227
ignore_cron: bool = False,
229228
end_bounded: bool = False,
230229
selected_snapshots: t.Optional[t.Set[str]] = None,
231-
selected_snapshots_auto_upstream: t.Optional[t.Set[str]] = None,
232230
circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
233231
deployability_index: t.Optional[DeployabilityIndex] = None,
234232
auto_restatement_enabled: bool = False,
@@ -245,7 +243,6 @@ def run(
245243
ignore_cron=ignore_cron,
246244
end_bounded=end_bounded,
247245
selected_snapshots=selected_snapshots,
248-
selected_snapshots_auto_upstream=selected_snapshots_auto_upstream,
249246
circuit_breaker=circuit_breaker,
250247
deployability_index=deployability_index,
251248
auto_restatement_enabled=auto_restatement_enabled,
@@ -381,7 +378,7 @@ def run_merged_intervals(
381378
run_environment_statements: bool = False,
382379
audit_only: bool = False,
383380
restatements: t.Optional[t.Dict[SnapshotId, Interval]] = None,
384-
snapshot_evaluation_triggers: t.Dict[SnapshotId, SnapshotEvaluationTriggers] = {},
381+
auto_restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {},
385382
) -> t.Tuple[t.List[NodeExecutionFailedError[SchedulingUnit]], t.List[SchedulingUnit]]:
386383
"""Runs precomputed batches of missing intervals.
387384
@@ -484,9 +481,7 @@ def evaluate_node(node: SchedulingUnit) -> None:
484481
evaluation_duration_ms,
485482
num_audits - num_audits_failed,
486483
num_audits_failed,
487-
snapshot_evaluation_triggers=snapshot_evaluation_triggers.get(
488-
snapshot.snapshot_id
489-
),
484+
auto_restatement_triggers=auto_restatement_triggers.get(snapshot.snapshot_id),
490485
)
491486

492487
try:
@@ -597,7 +592,6 @@ def _run_or_audit(
597592
ignore_cron: bool = False,
598593
end_bounded: bool = False,
599594
selected_snapshots: t.Optional[t.Set[str]] = None,
600-
selected_snapshots_auto_upstream: t.Optional[t.Set[str]] = None,
601595
circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
602596
deployability_index: t.Optional[DeployabilityIndex] = None,
603597
auto_restatement_enabled: bool = False,
@@ -621,7 +615,6 @@ def _run_or_audit(
621615
end_bounded: If set to true, the evaluated intervals will be bounded by the target end date, disregarding lookback,
622616
allow_partials, and other attributes that could cause the intervals to exceed the target end date.
623617
selected_snapshots: A set of snapshot names to run. If not provided, all snapshots will be run.
624-
selected_snapshots_auto_upstream: The set of selected_snapshots that were automatically added because they're upstream of a selected snapshot.
625618
circuit_breaker: An optional handler which checks if the run should be aborted.
626619
deployability_index: Determines snapshots that are deployable in the context of this render.
627620
auto_restatement_enabled: Whether to enable auto restatements.
@@ -680,38 +673,9 @@ def _run_or_audit(
680673
return CompletionStatus.NOTHING_TO_DO
681674

682675
merged_intervals_snapshots = {snapshot.snapshot_id for snapshot in merged_intervals}
683-
select_snapshot_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {}
684-
if selected_snapshots and selected_snapshots_auto_upstream:
685-
# actually selected snapshots are their own triggers
686-
selected_snapshots_no_auto_upstream = (
687-
selected_snapshots - selected_snapshots_auto_upstream
688-
)
689-
select_snapshot_triggers = {
690-
s_id: [s_id]
691-
for s_id in [
692-
snapshot_id
693-
for snapshot_id in merged_intervals_snapshots
694-
if snapshot_id.name in selected_snapshots_no_auto_upstream
695-
]
696-
}
697676

698-
# trace upstream by walking downstream on reversed dag
699-
reversed_dag = snapshots_to_dag(self.snapshots.values()).reversed
700-
for s_id in reversed_dag:
701-
if s_id in merged_intervals_snapshots:
702-
triggers = select_snapshot_triggers.get(s_id, [])
703-
for parent_s_id in reversed_dag.graph.get(s_id, set()):
704-
triggers.extend(select_snapshot_triggers.get(parent_s_id, []))
705-
select_snapshot_triggers[s_id] = list(dict.fromkeys(triggers))
706-
707-
all_snapshot_triggers: t.Dict[SnapshotId, SnapshotEvaluationTriggers] = {
708-
s_id: SnapshotEvaluationTriggers(
709-
ignore_cron_flag=ignore_cron,
710-
cron_ready=s_id not in auto_restated_snapshots,
711-
auto_restatement_triggers=auto_restatement_triggers.get(s_id, []),
712-
select_snapshot_triggers=select_snapshot_triggers.get(s_id, []),
713-
)
714-
for s_id in merged_intervals_snapshots
677+
auto_restatement_triggers_dict: t.Dict[SnapshotId, t.List[SnapshotId]] = {
678+
s_id: auto_restatement_triggers.get(s_id, []) for s_id in merged_intervals_snapshots
715679
}
716680

717681
errors, _ = self.run_merged_intervals(
@@ -725,7 +689,7 @@ def _run_or_audit(
725689
run_environment_statements=run_environment_statements,
726690
audit_only=audit_only,
727691
restatements=remove_intervals,
728-
snapshot_evaluation_triggers=all_snapshot_triggers,
692+
auto_restatement_triggers=auto_restatement_triggers_dict,
729693
)
730694

731695
return CompletionStatus.FAILURE if errors else CompletionStatus.SUCCESS

sqlmesh/core/snapshot/definition.py

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from sqlmesh.core.model import Model, ModelKindMixin, ModelKindName, ViewKind, CustomKind
2222
from sqlmesh.core.model.definition import _Model
2323
from sqlmesh.core.node import IntervalUnit, NodeType
24-
from sqlmesh.utils import sanitize_name
24+
from sqlmesh.utils import sanitize_name, unique
2525
from sqlmesh.utils.dag import DAG
2626
from sqlmesh.utils.date import (
2727
TimeLike,
@@ -326,13 +326,6 @@ def table_name_for_environment(
326326
return table
327327

328328

329-
class SnapshotEvaluationTriggers(PydanticModel):
330-
ignore_cron_flag: t.Optional[bool] = None
331-
cron_ready: t.Optional[bool] = None
332-
auto_restatement_triggers: t.List[SnapshotId] = []
333-
select_snapshot_triggers: t.List[SnapshotId] = []
334-
335-
336329
class SnapshotInfoMixin(ModelKindMixin):
337330
name: str
338331
dev_version_: t.Optional[str]
@@ -2207,14 +2200,15 @@ def apply_auto_restatements(
22072200

22082201
# auto-restated snapshot is its own trigger
22092202
upstream_triggers = [s_id]
2203+
else:
2204+
# inherit each parent's auto-restatement triggers (if any)
2205+
for parent_s_id in snapshot.parents:
2206+
if parent_s_id in auto_restatement_triggers:
2207+
upstream_triggers.extend(auto_restatement_triggers[parent_s_id])
22102208

2211-
for parent_s_id in snapshot.parents:
2212-
if parent_s_id in auto_restatement_triggers:
2213-
upstream_triggers.extend(auto_restatement_triggers[parent_s_id])
2214-
2215-
# remove duplicate triggers
2209+
# remove duplicate triggers, retaining order and keeping first seen of duplicates
22162210
if upstream_triggers:
2217-
auto_restatement_triggers[s_id] = list(dict.fromkeys(upstream_triggers))
2211+
auto_restatement_triggers[s_id] = unique(upstream_triggers)
22182212

22192213
if auto_restated_intervals:
22202214
auto_restated_interval_start = sys.maxsize

0 commit comments

Comments
 (0)