Add pipeline run index attribute #4288
New Alembic migration (revision `6e4eb89f632d`), which adds a `run_count` column to `pipeline` and an `index` column to `pipeline_run`, then backfills both for existing rows:

```python
"""Unique run index [6e4eb89f632d].

Revision ID: 6e4eb89f632d
Revises: 5c0a1c787128
Create Date: 2025-12-03 17:27:32.828004

"""

from typing import Any

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "6e4eb89f632d"
down_revision = "5c0a1c787128"
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Upgrade database schema and/or data, creating a new revision."""
    with op.batch_alter_table("pipeline", schema=None) as batch_op:
        batch_op.add_column(
            sa.Column("run_count", sa.Integer(), nullable=True)
        )

    with op.batch_alter_table("pipeline_run", schema=None) as batch_op:
        batch_op.add_column(sa.Column("index", sa.Integer(), nullable=True))

    connection = op.get_bind()
    meta = sa.MetaData()
    meta.reflect(bind=connection, only=("pipeline_run", "pipeline"))
    run_table = sa.Table("pipeline_run", meta)
    pipeline_table = sa.Table("pipeline", meta)

    # Some very old runs (version <0.34.0) might not have a pipeline ID in the
    # rare case where the associated pipeline was deleted after the migration
    # with revision `8ad841ad9bfe`.
    connection.execute(
        sa.update(run_table)
        .where(run_table.c.pipeline_id.is_(None))
        .values(index=0)
    )

    result = connection.execute(
        sa.select(
            run_table.c.id,
            run_table.c.pipeline_id,
        )
        .where(run_table.c.pipeline_id.is_not(None))
        .order_by(run_table.c.pipeline_id, run_table.c.created, run_table.c.id)
    ).fetchall()

    current_pipeline_id = None
    index_within_pipeline = 0
    run_updates: list[dict[str, Any]] = []
    run_counts: dict[str, int] = {}
    for row in result:
```
Contributor: Normally I would advocate executing data population for optional values in a post-deployment or post-migration step. Let's think about the worst-case scenario in terms of execution time.

Contributor (author): Actually, the index should be non-optional, and the column type is changed at the end of the DB migration to reflect that, which is why we need some value for each existing row. I guess we could populate it with a placeholder and later replace it with real values, but I'm not sure it's worth the effort here.

Contributor: Nah, let's not change the migration logic; let's just estimate the execution time for the backfill (worst-case scenario). The general rule is that migrations should complete quickly.

Contributor: @Json-Andriopoulos I'm curious, why is it not acceptable to do this sort of change in the Alembic migration script? What are the pros/cons?
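On the worst-case estimate: a throwaway sketch along these lines (not part of the PR; the table shape, row count, and SQLite URL are made-up placeholders) could be used to ballpark the cost of the batched backfill on a given backend:

```python
# Standalone timing sketch for the batched bindparam UPDATE pattern used by
# the backfill. Numbers only become meaningful when pointed at the real
# database backend instead of in-memory SQLite.
import time

import sqlalchemy as sa

engine = sa.create_engine("sqlite://")  # placeholder; use a MySQL URL for a realistic estimate
meta = sa.MetaData()
runs = sa.Table(
    "pipeline_run",
    meta,
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("index", sa.Integer, nullable=True),
)
meta.create_all(engine)

n_rows = 100_000  # assumed worst case: 100k historical runs
with engine.begin() as conn:
    conn.execute(sa.insert(runs), [{"id": i} for i in range(n_rows)])

run_updates = [{"id_": i, "index": i % 1_000} for i in range(n_rows)]
update_statement = (
    sa.update(runs)
    .where(runs.c.id == sa.bindparam("id_"))
    .values(index=sa.bindparam("index"))
)

start = time.perf_counter()
with engine.begin() as conn:
    for offset in range(0, len(run_updates), 10_000):
        conn.execute(update_statement, run_updates[offset : offset + 10_000])
print(f"backfilled {n_rows} rows in {time.perf_counter() - start:.2f}s")
```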
The migration then continues:

```python
        pipeline_id = row.pipeline_id
        if pipeline_id != current_pipeline_id:
            current_pipeline_id = pipeline_id
            index_within_pipeline = 1
        else:
            index_within_pipeline += 1
        run_updates.append({"id_": row.id, "index": index_within_pipeline})
        run_counts[pipeline_id] = index_within_pipeline

    update_batch_size = 10000
    if run_updates:
        update_statement = (
            sa.update(run_table)
            .where(run_table.c.id == sa.bindparam("id_"))
            .values(index=sa.bindparam("index"))
        )

        for start in range(0, len(run_updates), update_batch_size):
            batch = run_updates[start : start + update_batch_size]
            if batch:
                connection.execute(update_statement, batch)

    if run_counts:
        pipeline_updates = [
            {"id_": pipeline_id, "run_count": run_count}
            for pipeline_id, run_count in run_counts.items()
        ]
        update_statement = (
            sa.update(pipeline_table)
            .where(pipeline_table.c.id == sa.bindparam("id_"))
            .values(run_count=sa.bindparam("run_count"))
        )
        for start in range(0, len(pipeline_updates), update_batch_size):
            batch = pipeline_updates[start : start + update_batch_size]
            if batch:
                connection.execute(update_statement, batch)

    with op.batch_alter_table("pipeline_run", schema=None) as batch_op:
        batch_op.alter_column(
            "index", existing_type=sa.Integer(), nullable=False
        )

    with op.batch_alter_table("pipeline", schema=None) as batch_op:
        batch_op.alter_column(
            "run_count", existing_type=sa.Integer(), nullable=False
        )


def downgrade() -> None:
    """Downgrade database schema and/or data back to the previous revision."""
    with op.batch_alter_table("pipeline_run", schema=None) as batch_op:
        batch_op.drop_column("index")

    with op.batch_alter_table("pipeline", schema=None) as batch_op:
        batch_op.drop_column("run_count")
```
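Not part of the PR, but a post-upgrade sanity check along these lines (database URL is a placeholder) could confirm that the backfill produced unique per-pipeline indices and consistent `run_count` values:

```python
# Hypothetical post-migration check: look for duplicate (pipeline_id, index)
# pairs and for pipelines whose run_count disagrees with the highest index.
import sqlalchemy as sa

engine = sa.create_engine("sqlite:///zenml.db")  # placeholder URL
meta = sa.MetaData()
meta.reflect(bind=engine, only=("pipeline", "pipeline_run"))
pipeline, run = meta.tables["pipeline"], meta.tables["pipeline_run"]

with engine.connect() as conn:
    duplicates = conn.execute(
        sa.select(run.c.pipeline_id, run.c["index"], sa.func.count())
        .where(run.c.pipeline_id.is_not(None))
        .group_by(run.c.pipeline_id, run.c["index"])
        .having(sa.func.count() > 1)
    ).fetchall()
    mismatches = conn.execute(
        sa.select(pipeline.c.id, pipeline.c.run_count, sa.func.max(run.c["index"]))
        .join_from(pipeline, run, run.c.pipeline_id == pipeline.c.id)
        .group_by(pipeline.c.id, pipeline.c.run_count)
        .having(pipeline.c.run_count != sa.func.max(run.c["index"]))
    ).fetchall()
    print("duplicate (pipeline_id, index) pairs:", duplicates)
    print("pipelines with run_count != max(index):", mismatches)
```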
The remaining hunks thread the new attribute through the SQL store implementation:

```diff
@@ -6102,6 +6102,7 @@ def get_pipeline_run_dag(self, pipeline_run_id: UUID) -> PipelineRunDAG:
                     jl_arg(PipelineRunSchema.start_time),
                     jl_arg(PipelineRunSchema.end_time),
                     jl_arg(PipelineRunSchema.status),
+                    jl_arg(PipelineRunSchema.index),
                 ),
             ],
         )
```

```diff
@@ -6336,6 +6337,7 @@ def _get_regular_output_artifact_node(
         for triggered_run in step_run.triggered_runs:
             triggered_run_metadata: Dict[str, Any] = {
                 "status": triggered_run.status,
+                "index": triggered_run.index,
             }
 
             if triggered_run.start_time:
```
A helper that atomically reserves the next index is added:

```diff
@@ -6504,6 +6506,31 @@ def _get_duplicate_run_name_error_message(
             f"For more information on run naming, see: https://docs.zenml.io/concepts/steps_and_pipelines/yaml_configuration#run-name"
         )
 
+    def _get_next_run_index(self, pipeline_id: UUID, session: Session) -> int:
+        """Get the next run index for a pipeline.
+
+        Args:
+            pipeline_id: The ID of the pipeline to get the next run index for.
+            session: SQLAlchemy session.
+
+        Returns:
+            The next run index for the pipeline.
+        """
+        # Commit before acquiring the exclusive lock on the pipeline
+        session.commit()
+        session.execute(
+            update(PipelineSchema)
+            .where(col(PipelineSchema.id) == pipeline_id)
+            .values(run_count=col(PipelineSchema.run_count) + 1)
```
Contributor: Good one! I think this should be OK with the default isolation level in MySQL.

Contributor (author): Yep, I think so! It also seemed to work just fine in my tests. I also didn't add a unique constraint on …
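As a reference for the pattern being discussed, here is a minimal sketch in plain SQLAlchemy Core (assumed table layout, not the PR's code; it presumes SQLAlchemy 2.x and a backend with row-level write locks such as InnoDB):

```python
import sqlalchemy as sa


def next_run_index(
    conn: sa.Connection, pipelines: sa.Table, pipeline_id: str
) -> int:
    # Atomically bump the per-pipeline counter; the row stays locked for
    # other writers until this transaction commits.
    conn.execute(
        sa.update(pipelines)
        .where(pipelines.c.id == pipeline_id)
        .values(run_count=pipelines.c.run_count + 1)
    )
    # The same transaction sees its own write, so this read returns the
    # index reserved for the new run; concurrent callers serialize on the
    # row lock and therefore cannot get the same value.
    return conn.execute(
        sa.select(pipelines.c.run_count).where(pipelines.c.id == pipeline_id)
    ).scalar_one()
```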
The helper then continues:

```diff
+        )
+        index = session.exec(
+            select(PipelineSchema.run_count).where(
+                col(PipelineSchema.id) == pipeline_id
+            )
+        ).one()
+        session.commit()
+        return index
+
     def _create_run(
         self, pipeline_run: PipelineRunRequest, session: Session
     ) -> PipelineRunResponse:
```
Finally, `_create_run` resolves the pipeline via the snapshot and assigns the reserved index:

```diff
@@ -6524,21 +6551,19 @@ def _create_run(
                 can not be created.
         """
         self._set_request_user_id(request_model=pipeline_run, session=session)
-        self._get_reference_schema_by_id(
+        snapshot = self._get_reference_schema_by_id(
             resource=pipeline_run,
             reference_schema=PipelineSnapshotSchema,
             reference_id=pipeline_run.snapshot,
             session=session,
         )
 
-        self._get_reference_schema_by_id(
-            resource=pipeline_run,
-            reference_schema=PipelineSchema,
-            reference_id=pipeline_run.pipeline,
-            session=session,
+        index = self._get_next_run_index(
+            pipeline_id=snapshot.pipeline_id, session=session
         )
-
-        new_run = PipelineRunSchema.from_request(pipeline_run)
+        new_run = PipelineRunSchema.from_request(
+            pipeline_run, pipeline_id=snapshot.pipeline_id, index=index
+        )
 
         session.add(new_run)
 
```