4 changes: 1 addition & 3 deletions .github/workflows/integration-test-fast.yml
@@ -122,9 +122,7 @@ jobs:
remove-android: 'true'
remove-haskell: 'true'
build-mount-path: /var/lib/docker/
if: inputs.os == 'ubuntu-latest' && (contains(inputs.test_environment, 'docker')
|| contains(inputs.test_environment, 'kubeflow') || contains(inputs.test_environment,
'airflow') || contains(inputs.test_environment, 'kubernetes'))
if: inputs.os == 'ubuntu-latest'
- name: Reload Docker
run: sudo systemctl restart docker
if: inputs.os == 'ubuntu-latest' && (contains(inputs.test_environment, 'docker')
4 changes: 1 addition & 3 deletions .github/workflows/integration-test-slow.yml
@@ -120,9 +120,7 @@ jobs:
remove-android: 'true'
remove-haskell: 'true'
build-mount-path: /var/lib/docker/
if: inputs.os == 'ubuntu-latest' && (contains(inputs.test_environment, 'docker')
|| contains(inputs.test_environment, 'kubeflow') || contains(inputs.test_environment,
'airflow') || contains(inputs.test_environment, 'kubernetes'))
if: inputs.os == 'ubuntu-latest'
- name: Reload Docker
run: sudo systemctl restart docker
if: inputs.os == 'ubuntu-latest' && (contains(inputs.test_environment, 'docker')
10 changes: 9 additions & 1 deletion src/zenml/cli/pipeline.py
@@ -740,7 +740,15 @@ def runs() -> None:
@runs.command("list", help="List all registered pipeline runs.")
@list_options(
PipelineRunFilter,
default_columns=["id", "run_name", "pipeline", "status", "stack", "owner"],
default_columns=[
"id",
"index",
"run_name",
"pipeline",
"status",
"stack",
"owner",
],
)
def list_pipeline_runs(
columns: str, output_format: OutputFormat, **kwargs: Any
3 changes: 3 additions & 0 deletions src/zenml/client.py
@@ -4741,6 +4741,7 @@ def list_pipeline_runs(
linked_to_model_version_id: Optional[Union[str, UUID]] = None,
orchestrator_run_id: Optional[str] = None,
status: Optional[str] = None,
index: Optional[int] = None,
start_time: Optional[Union[datetime, str]] = None,
end_time: Optional[Union[datetime, str]] = None,
unlisted: Optional[bool] = None,
@@ -4790,6 +4791,7 @@
orchestrator_run_id: The run id of the orchestrator to filter by.
name: The name of the run to filter by.
status: The status of the pipeline run
index: The index of the pipeline run
start_time: The start_time for the pipeline run
end_time: The end_time for the pipeline run
unlisted: If the runs should be unlisted or not.
@@ -4839,6 +4841,7 @@
orchestrator_run_id=orchestrator_run_id,
stack_id=stack_id,
status=status,
index=index,
start_time=start_time,
end_time=end_time,
tag=tag,
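As an illustrative aside, here is a minimal usage sketch of the new filter argument, assuming the client API shown in this diff (the iteration over the returned page and the `name`/`status` attributes follow existing ZenML behavior):

    from zenml.client import Client

    # Hedged sketch: list runs that were the third run of their pipeline,
    # using the per-pipeline `index` filter added in this PR.
    runs = Client().list_pipeline_runs(index=3)
    for run in runs:
        print(run.name, run.index, run.status)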
20 changes: 16 additions & 4 deletions src/zenml/models/v2/core/pipeline_run.py
@@ -101,10 +101,6 @@ class PipelineRunRequest(ProjectScopedRequest):
snapshot: UUID = Field(
title="The snapshot associated with the pipeline run."
)
pipeline: Optional[UUID] = Field(
title="The pipeline associated with the pipeline run.",
default=None,
)
orchestrator_run_id: Optional[str] = Field(
title="The orchestrator run ID.",
max_length=STR_FIELD_MAX_LENGTH,
@@ -214,6 +210,9 @@ class PipelineRunResponseBody(ProjectScopedResponseBody):
default=None,
title="The reason for the status of the pipeline run.",
)
index: int = Field(
title="The unique index of the run within the pipeline."
)

model_config = ConfigDict(protected_namespaces=())

@@ -391,6 +390,15 @@ def status(self) -> ExecutionStatus:
"""
return self.get_body().status

@property
def index(self) -> int:
"""The `index` property.

Returns:
the value of the property.
"""
return self.get_body().index

@property
def run_metadata(self) -> Dict[str, MetadataType]:
"""The `run_metadata` property.
@@ -672,6 +680,10 @@ class PipelineRunFilter(
default=None,
description="Name of the Pipeline Run",
)
index: Optional[int] = Field(
default=None,
description="The unique index of the run within the pipeline.",
)
orchestrator_run_id: Optional[str] = Field(
default=None,
description="Name of the Pipeline Run within the orchestrator",
3 changes: 0 additions & 3 deletions src/zenml/orchestrators/step_launcher.py
@@ -376,9 +376,6 @@ def _create_or_reuse_run(self) -> Tuple[PipelineRunResponse, bool]:
orchestrator_run_id=self._orchestrator_run_id,
project=client.active_project.id,
snapshot=self._snapshot.id,
pipeline=(
self._snapshot.pipeline.id if self._snapshot.pipeline else None
),
status=ExecutionStatus.RUNNING,
orchestrator_environment=get_run_environment_dict(),
start_time=start_time,
1 change: 0 additions & 1 deletion src/zenml/pipelines/run_utils.py
@@ -108,7 +108,6 @@ def create_placeholder_run(
orchestrator_run_id=orchestrator_run_id,
project=snapshot.project_id,
snapshot=snapshot.id,
pipeline=snapshot.pipeline.id if snapshot.pipeline else None,
status=ExecutionStatus.INITIALIZING,
tags=snapshot.pipeline_configuration.tags,
logs=logs,
@@ -0,0 +1,114 @@
"""Unique run index [6e4eb89f632d].

Revision ID: 6e4eb89f632d
Revises: 5c0a1c787128
Create Date: 2025-12-03 17:27:32.828004

"""

from typing import Any

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "6e4eb89f632d"
down_revision = "5c0a1c787128"
branch_labels = None
depends_on = None


def upgrade() -> None:
"""Upgrade database schema and/or data, creating a new revision."""
with op.batch_alter_table("pipeline", schema=None) as batch_op:
batch_op.add_column(
sa.Column("run_count", sa.Integer(), nullable=True)
)

with op.batch_alter_table("pipeline_run", schema=None) as batch_op:
batch_op.add_column(sa.Column("index", sa.Integer(), nullable=True))

connection = op.get_bind()
meta = sa.MetaData()
meta.reflect(bind=connection, only=("pipeline_run", "pipeline"))
run_table = sa.Table("pipeline_run", meta)
pipeline_table = sa.Table("pipeline", meta)

# Some very old runs (version <0.34.0) might not have a pipeline ID in the
# rare case where the associated pipeline was deleted after the migration
# with revision `8ad841ad9bfe`.
connection.execute(
sa.update(run_table)
.where(run_table.c.pipeline_id.is_(None))
.values(index=0)
)

result = connection.execute(
sa.select(
run_table.c.id,
run_table.c.pipeline_id,
)
.where(run_table.c.pipeline_id.is_not(None))
.order_by(run_table.c.pipeline_id, run_table.c.created, run_table.c.id)
).fetchall()

current_pipeline_id = None
index_within_pipeline = 0
run_updates: list[dict[str, Any]] = []
run_counts: dict[str, int] = {}
for row in result:
Contributor:
Normally I would advocate executing data population for optional values in a post-deployment or post-migration step. Let's think about the worst-case scenario in terms of execution time.

Contributor (author):
Actually, the index should be non-optional, and the column type is changed at the end of the DB migration to reflect that, which is why we need some value for each existing row. I guess we could populate it with a placeholder and replace it with real values later on, but I'm not sure it's worth the effort here.

Contributor:
Nah, let's not change the migration logic; let's just estimate the execution time for the backfill (worst-case scenario). The general rule is that migrations should complete quickly.

Contributor:
"Normally I would advocate to execute data population for optional values in a post-deployment or post-migration step"

@Json-Andriopoulos I'm curious, why is it not acceptable to do this sort of change in the Alembic migration script? What are the pros/cons?

pipeline_id = row.pipeline_id
if pipeline_id != current_pipeline_id:
current_pipeline_id = pipeline_id
index_within_pipeline = 1
else:
index_within_pipeline += 1
run_updates.append({"id_": row.id, "index": index_within_pipeline})
run_counts[pipeline_id] = index_within_pipeline

update_batch_size = 10000
if run_updates:
update_statement = (
sa.update(run_table)
.where(run_table.c.id == sa.bindparam("id_"))
.values(index=sa.bindparam("index"))
)

for start in range(0, len(run_updates), update_batch_size):
batch = run_updates[start : start + update_batch_size]
if batch:
connection.execute(update_statement, batch)

if run_counts:
pipeline_updates = [
{"id_": pipeline_id, "run_count": run_count}
for pipeline_id, run_count in run_counts.items()
]
update_statement = (
sa.update(pipeline_table)
.where(pipeline_table.c.id == sa.bindparam("id_"))
.values(run_count=sa.bindparam("run_count"))
)
for start in range(0, len(pipeline_updates), update_batch_size):
batch = pipeline_updates[start : start + update_batch_size]
if batch:
connection.execute(update_statement, batch)

with op.batch_alter_table("pipeline_run", schema=None) as batch_op:
batch_op.alter_column(
"index", existing_type=sa.Integer(), nullable=False
)

with op.batch_alter_table("pipeline", schema=None) as batch_op:
batch_op.alter_column(
"run_count", existing_type=sa.Integer(), nullable=False
)


def downgrade() -> None:
"""Downgrade database schema and/or data back to the previous revision."""
with op.batch_alter_table("pipeline_run", schema=None) as batch_op:
        batch_op.drop_column("index")

with op.batch_alter_table("pipeline", schema=None) as batch_op:
batch_op.drop_column("run_count")
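
On the reviewer's question above about worst-case backfill time, a rough arithmetic sketch (the row counts are assumptions, not measurements): the migration performs one full-table select over pipeline runs, then batched updates of 10,000 rows each.

    import math

    # Assumed worst-case workload; not measured numbers.
    total_runs = 1_000_000
    total_pipelines = 5_000
    batch_size = 10_000  # matches `update_batch_size` in the migration

    run_batches = math.ceil(total_runs / batch_size)            # 100
    pipeline_batches = math.ceil(total_pipelines / batch_size)  # 1
    print(run_batches + pipeline_batches, "batched UPDATE calls in total")
    # Wall-clock time is then dominated by the database applying ~1M
    # single-row updates via executemany, plus the initial SELECT.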
12 changes: 8 additions & 4 deletions src/zenml/zen_stores/schemas/pipeline_run_schemas.py
@@ -117,6 +117,7 @@ class PipelineRunSchema(NamedSchema, RunMetadataInterface, table=True):
orchestrator_environment: Optional[str] = Field(
sa_column=Column(TEXT, nullable=True)
)
index: int = Field(nullable=False)

# Foreign keys
snapshot_id: Optional[UUID] = build_foreign_key_field(
@@ -343,12 +344,14 @@ def get_query_options(

@classmethod
def from_request(
cls, request: "PipelineRunRequest"
cls, request: "PipelineRunRequest", pipeline_id: UUID, index: int
) -> "PipelineRunSchema":
"""Convert a `PipelineRunRequest` to a `PipelineRunSchema`.

Args:
request: The request to convert.
pipeline_id: The ID of the pipeline.
index: The index of the pipeline run.

Returns:
The created `PipelineRunSchema`.
@@ -379,9 +382,10 @@ def from_request(
orchestrator_environment=orchestrator_environment,
start_time=request.start_time,
status=request.status.value,
index=index,
in_progress=not request.status.is_finished,
status_reason=request.status_reason,
pipeline_id=request.pipeline,
pipeline_id=pipeline_id,
snapshot_id=request.snapshot,
trigger_execution_id=request.trigger_execution_id,
triggered_by=triggered_by,
@@ -547,6 +551,7 @@ def to_model(
created=self.created,
updated=self.updated,
in_progress=self.in_progress,
index=self.index,
)
metadata = None
if include_metadata:
@@ -771,11 +776,10 @@ def update_placeholder(

if (
self.snapshot_id != request.snapshot
or self.pipeline_id != request.pipeline
or self.project_id != request.project
):
raise ValueError(
"Snapshot, project or pipeline ID of placeholder run "
"Snapshot or project ID of placeholder run "
"do not match the IDs of the run request."
)

2 changes: 2 additions & 0 deletions src/zenml/zen_stores/schemas/pipeline_schemas.py
@@ -83,6 +83,7 @@ class PipelineSchema(NamedSchema, table=True):
ondelete="SET NULL",
nullable=True,
)
run_count: int = Field(nullable=False)

# Relationships
user: Optional["UserSchema"] = Relationship(back_populates="pipelines")
@@ -198,6 +199,7 @@ def from_request(
description=pipeline_request.description,
project_id=pipeline_request.project,
user_id=pipeline_request.user,
run_count=0,
)

def to_model(
41 changes: 33 additions & 8 deletions src/zenml/zen_stores/sql_zen_store.py
@@ -6102,6 +6102,7 @@ def get_pipeline_run_dag(self, pipeline_run_id: UUID) -> PipelineRunDAG:
jl_arg(PipelineRunSchema.start_time),
jl_arg(PipelineRunSchema.end_time),
jl_arg(PipelineRunSchema.status),
jl_arg(PipelineRunSchema.index),
),
],
)
@@ -6336,6 +6337,7 @@ def _get_regular_output_artifact_node(
for triggered_run in step_run.triggered_runs:
triggered_run_metadata: Dict[str, Any] = {
"status": triggered_run.status,
"index": triggered_run.index,
}

if triggered_run.start_time:
@@ -6504,6 +6506,31 @@ def _get_duplicate_run_name_error_message(
f"For more information on run naming, see: https://docs.zenml.io/concepts/steps_and_pipelines/yaml_configuration#run-name"
)

def _get_next_run_index(self, pipeline_id: UUID, session: Session) -> int:
"""Get the next run index for a pipeline.

Args:
pipeline_id: The ID of the pipeline to get the next run index for.
session: SQLAlchemy session.

Returns:
The next run index for the pipeline.
"""
# Commit before acquiring the exclusive lock on the pipeline
session.commit()
session.execute(
update(PipelineSchema)
.where(col(PipelineSchema.id) == pipeline_id)
.values(run_count=col(PipelineSchema.run_count) + 1)
Contributor:
Good one! I think this should be ok with the default isolation level in MySQL.

Contributor (author):
Yep, I think so! In my tests it also seemed to work just fine. I also didn't add a unique constraint on (pipeline_id, index), so even if this somehow fails to generate unique values, we won't fail to create a run but will only end up with a duplicate index.

)
index = session.exec(
select(PipelineSchema.run_count).where(
col(PipelineSchema.id) == pipeline_id
)
).one()
session.commit()
return index
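
A hedged sketch of the concurrency reasoning from the thread above, assuming MySQL/InnoDB's default REPEATABLE READ isolation (an assumption, not something verified in this PR):

    # Two transactions T1 and T2 creating runs for the same pipeline:
    #
    #   T1: UPDATE pipeline SET run_count = run_count + 1 WHERE id = :pid
    #       -> takes the row lock, run_count becomes N + 1
    #   T2: same UPDATE -> blocks on T1's row lock
    #   T1: SELECT run_count -> sees its own write, returns N + 1; COMMIT
    #   T2: UPDATE proceeds against the committed value, run_count = N + 2
    #   T2: SELECT run_count -> returns N + 2; COMMIT
    #
    # Each creator reads the value it just wrote, so indices stay unique as
    # long as the SELECT runs in the same session/transaction as the UPDATE.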

def _create_run(
self, pipeline_run: PipelineRunRequest, session: Session
) -> PipelineRunResponse:
@@ -6524,21 +6551,19 @@ def _create_run(
can not be created.
"""
self._set_request_user_id(request_model=pipeline_run, session=session)
self._get_reference_schema_by_id(
snapshot = self._get_reference_schema_by_id(
resource=pipeline_run,
reference_schema=PipelineSnapshotSchema,
reference_id=pipeline_run.snapshot,
session=session,
)

self._get_reference_schema_by_id(
resource=pipeline_run,
reference_schema=PipelineSchema,
reference_id=pipeline_run.pipeline,
session=session,
index = self._get_next_run_index(
pipeline_id=snapshot.pipeline_id, session=session
)
new_run = PipelineRunSchema.from_request(
pipeline_run, pipeline_id=snapshot.pipeline_id, index=index
)

new_run = PipelineRunSchema.from_request(pipeline_run)

session.add(new_run)
