Skip to content

Commit b0c1aa3

Browse files
committed
Implement unique numbers for pipeline runs
1 parent 99be873 commit b0c1aa3

File tree

10 files changed

+176
-14
lines changed

10 files changed

+176
-14
lines changed
Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
1+
---
12
# Requires PRs to have either 'release-notes' or 'no-release-notes' label
23
# This ensures release notes are considered for every PR before merging.
34
# The check is enforced via branch protection rules on develop.
45
name: Require Release Label
5-
66
on:
77
pull_request:
88
types: [opened, labeled, unlabeled, synchronize]
9-
109
jobs:
1110
check-label:
1211
if: github.repository == 'zenml-io/zenml'
@@ -17,8 +16,8 @@ jobs:
1716
with:
1817
mode: exactly
1918
count: 1
20-
labels: "release-notes, no-release-notes"
21-
message: |
19+
labels: release-notes, no-release-notes
20+
message: |-
2221
This PR is missing a release label. Please add one of:
2322
- `release-notes` - if this PR has user-facing changes that should appear in the changelog
2423
- `no-release-notes` - if this is an internal change (refactoring, tests, CI, etc.)

src/zenml/cli/pipeline.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -740,7 +740,15 @@ def runs() -> None:
740740
@runs.command("list", help="List all registered pipeline runs.")
741741
@list_options(
742742
PipelineRunFilter,
743-
default_columns=["id", "run_name", "pipeline", "status", "stack", "owner"],
743+
default_columns=[
744+
"id",
745+
"index",
746+
"run_name",
747+
"pipeline",
748+
"status",
749+
"stack",
750+
"owner",
751+
],
744752
)
745753
def list_pipeline_runs(
746754
columns: str, output_format: OutputFormat, **kwargs: Any

src/zenml/client.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4741,6 +4741,7 @@ def list_pipeline_runs(
47414741
linked_to_model_version_id: Optional[Union[str, UUID]] = None,
47424742
orchestrator_run_id: Optional[str] = None,
47434743
status: Optional[str] = None,
4744+
index: Optional[int] = None,
47444745
start_time: Optional[Union[datetime, str]] = None,
47454746
end_time: Optional[Union[datetime, str]] = None,
47464747
unlisted: Optional[bool] = None,
@@ -4790,6 +4791,7 @@ def list_pipeline_runs(
47904791
orchestrator_run_id: The run id of the orchestrator to filter by.
47914792
name: The name of the run to filter by.
47924793
status: The status of the pipeline run
4794+
index: The index of the pipeline run
47934795
start_time: The start_time for the pipeline run
47944796
end_time: The end_time for the pipeline run
47954797
unlisted: If the runs should be unlisted or not.
@@ -4839,6 +4841,7 @@ def list_pipeline_runs(
48394841
orchestrator_run_id=orchestrator_run_id,
48404842
stack_id=stack_id,
48414843
status=status,
4844+
index=index,
48424845
start_time=start_time,
48434846
end_time=end_time,
48444847
tag=tag,

src/zenml/models/v2/core/pipeline_run.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,8 @@ class PipelineRunRequest(ProjectScopedRequest):
101101
snapshot: UUID = Field(
102102
title="The snapshot associated with the pipeline run."
103103
)
104-
pipeline: Optional[UUID] = Field(
104+
pipeline: UUID = Field(
105105
title="The pipeline associated with the pipeline run.",
106-
default=None,
107106
)
108107
orchestrator_run_id: Optional[str] = Field(
109108
title="The orchestrator run ID.",
@@ -214,6 +213,9 @@ class PipelineRunResponseBody(ProjectScopedResponseBody):
214213
default=None,
215214
title="The reason for the status of the pipeline run.",
216215
)
216+
index: int = Field(
217+
title="The unique index of the run within the pipeline."
218+
)
217219

218220
model_config = ConfigDict(protected_namespaces=())
219221

@@ -391,6 +393,15 @@ def status(self) -> ExecutionStatus:
391393
"""
392394
return self.get_body().status
393395

396+
@property
397+
def index(self) -> int:
398+
"""The `index` property.
399+
400+
Returns:
401+
the value of the property.
402+
"""
403+
return self.get_body().index
404+
394405
@property
395406
def run_metadata(self) -> Dict[str, MetadataType]:
396407
"""The `run_metadata` property.
@@ -672,6 +683,10 @@ class PipelineRunFilter(
672683
default=None,
673684
description="Name of the Pipeline Run",
674685
)
686+
index: Optional[int] = Field(
687+
default=None,
688+
description="The unique index of the run within the pipeline.",
689+
)
675690
orchestrator_run_id: Optional[str] = Field(
676691
default=None,
677692
description="Name of the Pipeline Run within the orchestrator",

src/zenml/orchestrators/step_launcher.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -406,9 +406,7 @@ def _create_or_reuse_run(self) -> Tuple[PipelineRunResponse, bool]:
406406
orchestrator_run_id=self._orchestrator_run_id,
407407
project=client.active_project.id,
408408
snapshot=self._snapshot.id,
409-
pipeline=(
410-
self._snapshot.pipeline.id if self._snapshot.pipeline else None
411-
),
409+
pipeline=self._snapshot.pipeline.id,
412410
status=ExecutionStatus.RUNNING,
413411
orchestrator_environment=get_run_environment_dict(),
414412
start_time=start_time,

src/zenml/pipelines/run_utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ def create_placeholder_run(
108108
orchestrator_run_id=orchestrator_run_id,
109109
project=snapshot.project_id,
110110
snapshot=snapshot.id,
111-
pipeline=snapshot.pipeline.id if snapshot.pipeline else None,
111+
pipeline=snapshot.pipeline.id,
112112
status=ExecutionStatus.INITIALIZING,
113113
tags=snapshot.pipeline_configuration.tags,
114114
logs=logs,
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
"""Unique run index [6e4eb89f632d].
2+
3+
Revision ID: 6e4eb89f632d
4+
Revises: 0.92.0
5+
Create Date: 2025-12-03 17:27:32.828004
6+
7+
"""
8+
9+
import sqlalchemy as sa
10+
from alembic import op
11+
12+
# revision identifiers, used by Alembic.
13+
revision = "6e4eb89f632d"
14+
down_revision = "0.92.0"
15+
branch_labels = None
16+
depends_on = None
17+
18+
19+
def upgrade() -> None:
20+
"""Upgrade database schema and/or data, creating a new revision."""
21+
with op.batch_alter_table("pipeline", schema=None) as batch_op:
22+
batch_op.add_column(
23+
sa.Column("run_count", sa.Integer(), nullable=True)
24+
)
25+
26+
with op.batch_alter_table("pipeline_run", schema=None) as batch_op:
27+
batch_op.add_column(sa.Column("index", sa.Integer(), nullable=True))
28+
29+
connection = op.get_bind()
30+
meta = sa.MetaData()
31+
meta.reflect(bind=connection, only=("pipeline_run", "pipeline"))
32+
run_table = sa.Table("pipeline_run", meta)
33+
pipeline_table = sa.Table("pipeline", meta)
34+
35+
# These runs shouldn't exist, but just in case
36+
connection.execute(
37+
sa.update(run_table)
38+
.where(run_table.c.pipeline_id.is_(None))
39+
.values(index=0)
40+
)
41+
42+
result = connection.execute(
43+
sa.select(
44+
run_table.c.id,
45+
run_table.c.pipeline_id,
46+
run_table.c.created,
47+
)
48+
.where(run_table.c.pipeline_id.is_not(None))
49+
.order_by(run_table.c.pipeline_id, run_table.c.created, run_table.c.id)
50+
).fetchall()
51+
52+
current_pipeline_id = None
53+
index_within_pipeline = 0
54+
run_updates: list[dict] = []
55+
run_counts: dict = {}
56+
for row in result:
57+
pipeline_id = row.pipeline_id
58+
if pipeline_id != current_pipeline_id:
59+
current_pipeline_id = pipeline_id
60+
index_within_pipeline = 1
61+
else:
62+
index_within_pipeline += 1
63+
run_updates.append({"id": row.id, "index": index_within_pipeline})
64+
run_counts[pipeline_id] = index_within_pipeline
65+
66+
if run_updates:
67+
connection.execute(
68+
sa.update(run_table)
69+
.where(run_table.c.id == sa.bindparam("id"))
70+
.values(index=sa.bindparam("index")),
71+
run_updates,
72+
)
73+
74+
if run_counts:
75+
pipeline_updates = [
76+
{"id": pipeline_id, "run_count": run_count}
77+
for pipeline_id, run_count in run_counts.items()
78+
]
79+
connection.execute(
80+
sa.update(pipeline_table)
81+
.where(pipeline_table.c.id == sa.bindparam("id"))
82+
.values(run_count=sa.bindparam("run_count")),
83+
pipeline_updates,
84+
)
85+
86+
# Step 3: Make columns non-nullable
87+
with op.batch_alter_table("pipeline_run", schema=None) as batch_op:
88+
batch_op.alter_column(
89+
"index", existing_type=sa.Integer(), nullable=False
90+
)
91+
92+
with op.batch_alter_table("pipeline", schema=None) as batch_op:
93+
batch_op.alter_column(
94+
"run_count", existing_type=sa.Integer(), nullable=False
95+
)
96+
97+
98+
def downgrade() -> None:
99+
"""Downgrade database schema and/or data back to the previous revision."""
100+
with op.batch_alter_table("pipeline_run", schema=None) as batch_op:
101+
batch_op.drop_column("number")
102+
103+
with op.batch_alter_table("pipeline", schema=None) as batch_op:
104+
batch_op.drop_column("run_count")

src/zenml/zen_stores/schemas/pipeline_run_schemas.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ class PipelineRunSchema(NamedSchema, RunMetadataInterface, table=True):
117117
orchestrator_environment: Optional[str] = Field(
118118
sa_column=Column(TEXT, nullable=True)
119119
)
120+
index: int = Field(nullable=False)
120121

121122
# Foreign keys
122123
snapshot_id: Optional[UUID] = build_foreign_key_field(
@@ -343,12 +344,13 @@ def get_query_options(
343344

344345
@classmethod
345346
def from_request(
346-
cls, request: "PipelineRunRequest"
347+
cls, request: "PipelineRunRequest", index: int
347348
) -> "PipelineRunSchema":
348349
"""Convert a `PipelineRunRequest` to a `PipelineRunSchema`.
349350
350351
Args:
351352
request: The request to convert.
353+
index: The index of the pipeline run.
352354
353355
Returns:
354356
The created `PipelineRunSchema`.
@@ -379,6 +381,7 @@ def from_request(
379381
orchestrator_environment=orchestrator_environment,
380382
start_time=request.start_time,
381383
status=request.status.value,
384+
index=index,
382385
in_progress=not request.status.is_finished,
383386
status_reason=request.status_reason,
384387
pipeline_id=request.pipeline,
@@ -547,6 +550,7 @@ def to_model(
547550
created=self.created,
548551
updated=self.updated,
549552
in_progress=self.in_progress,
553+
index=self.index,
550554
)
551555
metadata = None
552556
if include_metadata:

src/zenml/zen_stores/schemas/pipeline_schemas.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ class PipelineSchema(NamedSchema, table=True):
8383
ondelete="SET NULL",
8484
nullable=True,
8585
)
86+
run_count: int = Field(nullable=False)
8687

8788
# Relationships
8889
user: Optional["UserSchema"] = Relationship(back_populates="pipelines")
@@ -198,6 +199,7 @@ def from_request(
198199
description=pipeline_request.description,
199200
project_id=pipeline_request.project,
200201
user_id=pipeline_request.user,
202+
run_count=0,
201203
)
202204

203205
def to_model(

src/zenml/zen_stores/sql_zen_store.py

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6504,6 +6504,32 @@ def _get_duplicate_run_name_error_message(
65046504
f"For more information on run naming, see: https://docs.zenml.io/concepts/steps_and_pipelines/yaml_configuration#run-name"
65056505
)
65066506

6507+
def _get_next_run_index(self, pipeline_id: UUID, session: Session) -> int:
6508+
"""Get the next run index for a pipeline.
6509+
6510+
Args:
6511+
pipeline_id: The ID of the pipeline to get the next run index for.
6512+
session: SQLAlchemy session.
6513+
6514+
Returns:
6515+
The next run index for the pipeline.
6516+
"""
6517+
# Commit before acquiring the exclusive lock on the pipeline
6518+
session.commit()
6519+
current_run_index = session.exec(
6520+
select(PipelineSchema.run_count)
6521+
.where(PipelineSchema.id == pipeline_id)
6522+
.with_for_update()
6523+
).one()
6524+
new_run_index = current_run_index + 1
6525+
session.execute(
6526+
update(PipelineSchema)
6527+
.where(col(PipelineSchema.id) == pipeline_id)
6528+
.values(run_count=new_run_index)
6529+
)
6530+
session.commit()
6531+
return new_run_index
6532+
65076533
def _create_run(
65086534
self, pipeline_run: PipelineRunRequest, session: Session
65096535
) -> PipelineRunResponse:
@@ -6524,7 +6550,7 @@ def _create_run(
65246550
can not be created.
65256551
"""
65266552
self._set_request_user_id(request_model=pipeline_run, session=session)
6527-
self._get_reference_schema_by_id(
6553+
snapshot = self._get_reference_schema_by_id(
65286554
resource=pipeline_run,
65296555
reference_schema=PipelineSnapshotSchema,
65306556
reference_id=pipeline_run.snapshot,
@@ -6538,7 +6564,10 @@ def _create_run(
65386564
session=session,
65396565
)
65406566

6541-
new_run = PipelineRunSchema.from_request(pipeline_run)
6567+
index = self._get_next_run_index(
6568+
pipeline_id=snapshot.pipeline_id, session=session
6569+
)
6570+
new_run = PipelineRunSchema.from_request(pipeline_run, index=index)
65426571

65436572
session.add(new_run)
65446573

0 commit comments

Comments
 (0)