Skip to content

Commit eb17e24

Browse files
authored
Feat!: print auto-restatement triggers in debug console (#4980)
1 parent 014fe6a commit eb17e24

File tree

6 files changed

+279
-41
lines changed

6 files changed

+279
-41
lines changed

sqlmesh/core/console.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,7 @@ def update_snapshot_evaluation_progress(
428428
num_audits_passed: int,
429429
num_audits_failed: int,
430430
audit_only: bool = False,
431+
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
431432
) -> None:
432433
"""Updates the snapshot evaluation progress."""
433434

@@ -575,6 +576,7 @@ def update_snapshot_evaluation_progress(
575576
num_audits_passed: int,
576577
num_audits_failed: int,
577578
audit_only: bool = False,
579+
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
578580
) -> None:
579581
pass
580582

@@ -1056,6 +1058,7 @@ def update_snapshot_evaluation_progress(
10561058
num_audits_passed: int,
10571059
num_audits_failed: int,
10581060
audit_only: bool = False,
1061+
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
10591062
) -> None:
10601063
"""Update the snapshot evaluation progress."""
10611064
if (
@@ -3639,6 +3642,7 @@ def update_snapshot_evaluation_progress(
36393642
num_audits_passed: int,
36403643
num_audits_failed: int,
36413644
audit_only: bool = False,
3645+
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
36423646
) -> None:
36433647
view_name, loaded_batches = self.evaluation_batch_progress[snapshot.snapshot_id]
36443648

@@ -3808,11 +3812,15 @@ def update_snapshot_evaluation_progress(
38083812
num_audits_passed: int,
38093813
num_audits_failed: int,
38103814
audit_only: bool = False,
3815+
auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
38113816
) -> None:
3812-
message = f"Evaluating {snapshot.name} | batch={batch_idx} | duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}"
3817+
message = f"Evaluated {snapshot.name} | batch={batch_idx} | duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}"
3818+
3819+
if auto_restatement_triggers:
3820+
message += f" | auto_restatement_triggers=[{', '.join(trigger.name for trigger in auto_restatement_triggers)}]"
38133821

38143822
if audit_only:
3815-
message = f"Auditing {snapshot.name} duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}"
3823+
message = f"Audited {snapshot.name} | duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}"
38163824

38173825
self._write(message)
38183826

sqlmesh/core/scheduler.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -415,6 +415,7 @@ def run_merged_intervals(
415415
selected_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None,
416416
run_environment_statements: bool = False,
417417
audit_only: bool = False,
418+
auto_restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {},
418419
) -> t.Tuple[t.List[NodeExecutionFailedError[SchedulingUnit]], t.List[SchedulingUnit]]:
419420
"""Runs precomputed batches of missing intervals.
420421
@@ -531,6 +532,9 @@ def run_node(node: SchedulingUnit) -> None:
531532
evaluation_duration_ms,
532533
num_audits - num_audits_failed,
533534
num_audits_failed,
535+
auto_restatement_triggers=auto_restatement_triggers.get(
536+
snapshot.snapshot_id
537+
),
534538
)
535539
elif isinstance(node, CreateNode):
536540
self.snapshot_evaluator.create_snapshot(
@@ -736,8 +740,11 @@ def _run_or_audit(
736740
for s_id, interval in (remove_intervals or {}).items():
737741
self.snapshots[s_id].remove_interval(interval)
738742

743+
all_auto_restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {}
739744
if auto_restatement_enabled:
740-
auto_restated_intervals = apply_auto_restatements(self.snapshots, execution_time)
745+
auto_restated_intervals, all_auto_restatement_triggers = apply_auto_restatements(
746+
self.snapshots, execution_time
747+
)
741748
self.state_sync.add_snapshots_intervals(auto_restated_intervals)
742749
self.state_sync.update_auto_restatements(
743750
{s.name_version: s.next_auto_restatement_ts for s in self.snapshots.values()}
@@ -758,6 +765,14 @@ def _run_or_audit(
758765
if not merged_intervals:
759766
return CompletionStatus.NOTHING_TO_DO
760767

768+
auto_restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {}
769+
if all_auto_restatement_triggers:
770+
merged_intervals_snapshots = {snapshot.snapshot_id for snapshot in merged_intervals}
771+
auto_restatement_triggers = {
772+
s_id: all_auto_restatement_triggers.get(s_id, [])
773+
for s_id in merged_intervals_snapshots
774+
}
775+
761776
errors, _ = self.run_merged_intervals(
762777
merged_intervals=merged_intervals,
763778
deployability_index=deployability_index,
@@ -768,6 +783,7 @@ def _run_or_audit(
768783
end=end,
769784
run_environment_statements=run_environment_statements,
770785
audit_only=audit_only,
786+
auto_restatement_triggers=auto_restatement_triggers,
771787
)
772788

773789
return CompletionStatus.FAILURE if errors else CompletionStatus.SUCCESS

sqlmesh/core/snapshot/definition.py

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from sqlmesh.core.model import Model, ModelKindMixin, ModelKindName, ViewKind, CustomKind
2222
from sqlmesh.core.model.definition import _Model
2323
from sqlmesh.core.node import IntervalUnit, NodeType
24-
from sqlmesh.utils import sanitize_name
24+
from sqlmesh.utils import sanitize_name, unique
2525
from sqlmesh.utils.dag import DAG
2626
from sqlmesh.utils.date import (
2727
TimeLike,
@@ -2180,7 +2180,7 @@ def snapshots_to_dag(snapshots: t.Collection[Snapshot]) -> DAG[SnapshotId]:
21802180

21812181
def apply_auto_restatements(
21822182
snapshots: t.Dict[SnapshotId, Snapshot], execution_time: TimeLike
2183-
) -> t.List[SnapshotIntervals]:
2183+
) -> t.Tuple[t.List[SnapshotIntervals], t.Dict[SnapshotId, t.List[SnapshotId]]]:
21842184
"""Applies auto restatements to the snapshots.
21852185
21862186
This operation results in the removal of intervals for snapshots that are ready to be restated based
@@ -2195,6 +2195,7 @@ def apply_auto_restatements(
21952195
A list of SnapshotIntervals with **new** intervals that need to be restated.
21962196
"""
21972197
dag = snapshots_to_dag(snapshots.values())
2198+
auto_restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {}
21982199
auto_restated_intervals_per_snapshot: t.Dict[SnapshotId, Interval] = {}
21992200
for s_id in dag:
22002201
if s_id not in snapshots:
@@ -2209,6 +2210,7 @@ def apply_auto_restatements(
22092210
for parent_s_id in snapshot.parents
22102211
if parent_s_id in auto_restated_intervals_per_snapshot
22112212
]
2213+
upstream_triggers = []
22122214
if next_auto_restated_interval:
22132215
logger.info(
22142216
"Calculated the next auto restated interval (%s, %s) for snapshot %s",
@@ -2218,6 +2220,18 @@ def apply_auto_restatements(
22182220
)
22192221
auto_restated_intervals.append(next_auto_restated_interval)
22202222

2223+
# auto-restated snapshot is its own trigger
2224+
upstream_triggers = [s_id]
2225+
else:
2226+
# inherit each parent's auto-restatement triggers (if any)
2227+
for parent_s_id in snapshot.parents:
2228+
if parent_s_id in auto_restatement_triggers:
2229+
upstream_triggers.extend(auto_restatement_triggers[parent_s_id])
2230+
2231+
# remove duplicate triggers, retaining order and keeping first seen of duplicates
2232+
if upstream_triggers:
2233+
auto_restatement_triggers[s_id] = unique(upstream_triggers)
2234+
22212235
if auto_restated_intervals:
22222236
auto_restated_interval_start = sys.maxsize
22232237
auto_restated_interval_end = -sys.maxsize
@@ -2247,20 +2261,22 @@ def apply_auto_restatements(
22472261

22482262
snapshot.apply_pending_restatement_intervals()
22492263
snapshot.update_next_auto_restatement_ts(execution_time)
2250-
2251-
return [
2252-
SnapshotIntervals(
2253-
name=snapshots[s_id].name,
2254-
identifier=None,
2255-
version=snapshots[s_id].version,
2256-
dev_version=None,
2257-
intervals=[],
2258-
dev_intervals=[],
2259-
pending_restatement_intervals=[interval],
2260-
)
2261-
for s_id, interval in auto_restated_intervals_per_snapshot.items()
2262-
if s_id in snapshots
2263-
]
2264+
return (
2265+
[
2266+
SnapshotIntervals(
2267+
name=snapshots[s_id].name,
2268+
identifier=None,
2269+
version=snapshots[s_id].version,
2270+
dev_version=None,
2271+
intervals=[],
2272+
dev_intervals=[],
2273+
pending_restatement_intervals=[interval],
2274+
)
2275+
for s_id, interval in auto_restated_intervals_per_snapshot.items()
2276+
if s_id in snapshots
2277+
],
2278+
auto_restatement_triggers,
2279+
)
22642280

22652281

22662282
def parent_snapshots_by_name(

0 commit comments

Comments
 (0)