Skip to content

Commit cdc3723

Browse files
committed
Implement unique numbers for pipeline runs
1 parent 7c411ee commit cdc3723

File tree

15 files changed

+221
-28
lines changed

15 files changed

+221
-28
lines changed

.github/workflows/integration-test-fast.yml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,7 @@ jobs:
122122
remove-android: 'true'
123123
remove-haskell: 'true'
124124
build-mount-path: /var/lib/docker/
125-
if: inputs.os == 'ubuntu-latest' && (contains(inputs.test_environment, 'docker')
126-
|| contains(inputs.test_environment, 'kubeflow') || contains(inputs.test_environment,
127-
'airflow') || contains(inputs.test_environment, 'kubernetes'))
125+
if: inputs.os == 'ubuntu-latest'
128126
- name: Reload Docker
129127
run: sudo systemctl restart docker
130128
if: inputs.os == 'ubuntu-latest' && (contains(inputs.test_environment, 'docker')

.github/workflows/integration-test-slow.yml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,9 +120,7 @@ jobs:
120120
remove-android: 'true'
121121
remove-haskell: 'true'
122122
build-mount-path: /var/lib/docker/
123-
if: inputs.os == 'ubuntu-latest' && (contains(inputs.test_environment, 'docker')
124-
|| contains(inputs.test_environment, 'kubeflow') || contains(inputs.test_environment,
125-
'airflow') || contains(inputs.test_environment, 'kubernetes'))
123+
if: inputs.os == 'ubuntu-latest'
126124
- name: Reload Docker
127125
run: sudo systemctl restart docker
128126
if: inputs.os == 'ubuntu-latest' && (contains(inputs.test_environment, 'docker')

src/zenml/cli/pipeline.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -740,7 +740,15 @@ def runs() -> None:
740740
@runs.command("list", help="List all registered pipeline runs.")
741741
@list_options(
742742
PipelineRunFilter,
743-
default_columns=["id", "run_name", "pipeline", "status", "stack", "owner"],
743+
default_columns=[
744+
"id",
745+
"index",
746+
"run_name",
747+
"pipeline",
748+
"status",
749+
"stack",
750+
"owner",
751+
],
744752
)
745753
def list_pipeline_runs(
746754
columns: str, output_format: OutputFormat, **kwargs: Any

src/zenml/client.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4741,6 +4741,7 @@ def list_pipeline_runs(
47414741
linked_to_model_version_id: Optional[Union[str, UUID]] = None,
47424742
orchestrator_run_id: Optional[str] = None,
47434743
status: Optional[str] = None,
4744+
index: Optional[int] = None,
47444745
start_time: Optional[Union[datetime, str]] = None,
47454746
end_time: Optional[Union[datetime, str]] = None,
47464747
unlisted: Optional[bool] = None,
@@ -4790,6 +4791,7 @@ def list_pipeline_runs(
47904791
orchestrator_run_id: The run id of the orchestrator to filter by.
47914792
name: The name of the run to filter by.
47924793
status: The status of the pipeline run
4794+
index: The index of the pipeline run
47934795
start_time: The start_time for the pipeline run
47944796
end_time: The end_time for the pipeline run
47954797
unlisted: If the runs should be unlisted or not.
@@ -4839,6 +4841,7 @@ def list_pipeline_runs(
48394841
orchestrator_run_id=orchestrator_run_id,
48404842
stack_id=stack_id,
48414843
status=status,
4844+
index=index,
48424845
start_time=start_time,
48434846
end_time=end_time,
48444847
tag=tag,

src/zenml/models/v2/core/pipeline_run.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,6 @@ class PipelineRunRequest(ProjectScopedRequest):
101101
snapshot: UUID = Field(
102102
title="The snapshot associated with the pipeline run."
103103
)
104-
pipeline: Optional[UUID] = Field(
105-
title="The pipeline associated with the pipeline run.",
106-
default=None,
107-
)
108104
orchestrator_run_id: Optional[str] = Field(
109105
title="The orchestrator run ID.",
110106
max_length=STR_FIELD_MAX_LENGTH,
@@ -214,6 +210,9 @@ class PipelineRunResponseBody(ProjectScopedResponseBody):
214210
default=None,
215211
title="The reason for the status of the pipeline run.",
216212
)
213+
index: int = Field(
214+
title="The unique index of the run within the pipeline."
215+
)
217216

218217
model_config = ConfigDict(protected_namespaces=())
219218

@@ -391,6 +390,15 @@ def status(self) -> ExecutionStatus:
391390
"""
392391
return self.get_body().status
393392

393+
@property
394+
def index(self) -> int:
395+
"""The `index` property.
396+
397+
Returns:
398+
the value of the property.
399+
"""
400+
return self.get_body().index
401+
394402
@property
395403
def run_metadata(self) -> Dict[str, MetadataType]:
396404
"""The `run_metadata` property.
@@ -672,6 +680,10 @@ class PipelineRunFilter(
672680
default=None,
673681
description="Name of the Pipeline Run",
674682
)
683+
index: Optional[int] = Field(
684+
default=None,
685+
description="The unique index of the run within the pipeline.",
686+
)
675687
orchestrator_run_id: Optional[str] = Field(
676688
default=None,
677689
description="Name of the Pipeline Run within the orchestrator",

src/zenml/orchestrators/step_launcher.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -406,9 +406,6 @@ def _create_or_reuse_run(self) -> Tuple[PipelineRunResponse, bool]:
406406
orchestrator_run_id=self._orchestrator_run_id,
407407
project=client.active_project.id,
408408
snapshot=self._snapshot.id,
409-
pipeline=(
410-
self._snapshot.pipeline.id if self._snapshot.pipeline else None
411-
),
412409
status=ExecutionStatus.RUNNING,
413410
orchestrator_environment=get_run_environment_dict(),
414411
start_time=start_time,

src/zenml/pipelines/run_utils.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,6 @@ def create_placeholder_run(
108108
orchestrator_run_id=orchestrator_run_id,
109109
project=snapshot.project_id,
110110
snapshot=snapshot.id,
111-
pipeline=snapshot.pipeline.id if snapshot.pipeline else None,
112111
status=ExecutionStatus.INITIALIZING,
113112
tags=snapshot.pipeline_configuration.tags,
114113
logs=logs,
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
"""Unique run index [6e4eb89f632d].
2+
3+
Revision ID: 6e4eb89f632d
4+
Revises: 0.92.0
5+
Create Date: 2025-12-03 17:27:32.828004
6+
7+
"""
8+
9+
from typing import Any
10+
11+
import sqlalchemy as sa
12+
from alembic import op
13+
14+
# revision identifiers, used by Alembic.
15+
revision = "6e4eb89f632d"
16+
down_revision = "0.92.0"
17+
branch_labels = None
18+
depends_on = None
19+
20+
21+
def upgrade() -> None:
22+
"""Upgrade database schema and/or data, creating a new revision."""
23+
with op.batch_alter_table("pipeline", schema=None) as batch_op:
24+
batch_op.add_column(
25+
sa.Column("run_count", sa.Integer(), nullable=True)
26+
)
27+
28+
with op.batch_alter_table("pipeline_run", schema=None) as batch_op:
29+
batch_op.add_column(sa.Column("index", sa.Integer(), nullable=True))
30+
31+
connection = op.get_bind()
32+
meta = sa.MetaData()
33+
meta.reflect(bind=connection, only=("pipeline_run", "pipeline"))
34+
run_table = sa.Table("pipeline_run", meta)
35+
pipeline_table = sa.Table("pipeline", meta)
36+
37+
# Some very old runs (version <0.34.0) might not have a pipeline ID in the
38+
# rare case where the associated pipeline was deleted after the migration
39+
# with revision `8ad841ad9bfe`.
40+
connection.execute(
41+
sa.update(run_table)
42+
.where(run_table.c.pipeline_id.is_(None))
43+
.values(index=0)
44+
)
45+
46+
result = connection.execute(
47+
sa.select(
48+
run_table.c.id,
49+
run_table.c.pipeline_id,
50+
run_table.c.created,
51+
)
52+
.where(run_table.c.pipeline_id.is_not(None))
53+
.order_by(run_table.c.pipeline_id, run_table.c.created, run_table.c.id)
54+
).fetchall()
55+
56+
current_pipeline_id = None
57+
index_within_pipeline = 0
58+
run_updates: list[dict[str, Any]] = []
59+
run_counts: dict[str, int] = {}
60+
for row in result:
61+
pipeline_id = row.pipeline_id
62+
if pipeline_id != current_pipeline_id:
63+
current_pipeline_id = pipeline_id
64+
index_within_pipeline = 1
65+
else:
66+
index_within_pipeline += 1
67+
run_updates.append({"id_": row.id, "index": index_within_pipeline})
68+
run_counts[pipeline_id] = index_within_pipeline
69+
70+
if run_updates:
71+
connection.execute(
72+
sa.update(run_table)
73+
.where(run_table.c.id == sa.bindparam("id_"))
74+
.values(index=sa.bindparam("index")),
75+
run_updates,
76+
)
77+
78+
if run_counts:
79+
pipeline_updates = [
80+
{"id_": pipeline_id, "run_count": run_count}
81+
for pipeline_id, run_count in run_counts.items()
82+
]
83+
connection.execute(
84+
sa.update(pipeline_table)
85+
.where(pipeline_table.c.id == sa.bindparam("id_"))
86+
.values(run_count=sa.bindparam("run_count")),
87+
pipeline_updates,
88+
)
89+
90+
# Step 3: Make columns non-nullable
91+
with op.batch_alter_table("pipeline_run", schema=None) as batch_op:
92+
batch_op.alter_column(
93+
"index", existing_type=sa.Integer(), nullable=False
94+
)
95+
96+
with op.batch_alter_table("pipeline", schema=None) as batch_op:
97+
batch_op.alter_column(
98+
"run_count", existing_type=sa.Integer(), nullable=False
99+
)
100+
101+
102+
def downgrade() -> None:
103+
"""Downgrade database schema and/or data back to the previous revision."""
104+
with op.batch_alter_table("pipeline_run", schema=None) as batch_op:
105+
batch_op.drop_column("number")
106+
107+
with op.batch_alter_table("pipeline", schema=None) as batch_op:
108+
batch_op.drop_column("run_count")

src/zenml/zen_stores/schemas/pipeline_run_schemas.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ class PipelineRunSchema(NamedSchema, RunMetadataInterface, table=True):
117117
orchestrator_environment: Optional[str] = Field(
118118
sa_column=Column(TEXT, nullable=True)
119119
)
120+
index: int = Field(nullable=False)
120121

121122
# Foreign keys
122123
snapshot_id: Optional[UUID] = build_foreign_key_field(
@@ -343,12 +344,14 @@ def get_query_options(
343344

344345
@classmethod
345346
def from_request(
346-
cls, request: "PipelineRunRequest"
347+
cls, request: "PipelineRunRequest", pipeline_id: UUID, index: int
347348
) -> "PipelineRunSchema":
348349
"""Convert a `PipelineRunRequest` to a `PipelineRunSchema`.
349350
350351
Args:
351352
request: The request to convert.
353+
pipeline_id: The ID of the pipeline.
354+
index: The index of the pipeline run.
352355
353356
Returns:
354357
The created `PipelineRunSchema`.
@@ -379,9 +382,10 @@ def from_request(
379382
orchestrator_environment=orchestrator_environment,
380383
start_time=request.start_time,
381384
status=request.status.value,
385+
index=index,
382386
in_progress=not request.status.is_finished,
383387
status_reason=request.status_reason,
384-
pipeline_id=request.pipeline,
388+
pipeline_id=pipeline_id,
385389
snapshot_id=request.snapshot,
386390
trigger_execution_id=request.trigger_execution_id,
387391
triggered_by=triggered_by,
@@ -547,6 +551,7 @@ def to_model(
547551
created=self.created,
548552
updated=self.updated,
549553
in_progress=self.in_progress,
554+
index=self.index,
550555
)
551556
metadata = None
552557
if include_metadata:
@@ -771,11 +776,10 @@ def update_placeholder(
771776

772777
if (
773778
self.snapshot_id != request.snapshot
774-
or self.pipeline_id != request.pipeline
775779
or self.project_id != request.project
776780
):
777781
raise ValueError(
778-
"Snapshot, project or pipeline ID of placeholder run "
782+
"Snapshot or project ID of placeholder run "
779783
"do not match the IDs of the run request."
780784
)
781785

src/zenml/zen_stores/schemas/pipeline_schemas.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ class PipelineSchema(NamedSchema, table=True):
8383
ondelete="SET NULL",
8484
nullable=True,
8585
)
86+
run_count: int = Field(nullable=False)
8687

8788
# Relationships
8889
user: Optional["UserSchema"] = Relationship(back_populates="pipelines")
@@ -198,6 +199,7 @@ def from_request(
198199
description=pipeline_request.description,
199200
project_id=pipeline_request.project,
200201
user_id=pipeline_request.user,
202+
run_count=0,
201203
)
202204

203205
def to_model(

0 commit comments

Comments
 (0)