Skip to content

feat: optimize ci cd workflow #6744

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 36 commits into from
Jul 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
87521d5
Add workflow status latest service with CI/CD status optimization
prakash100198 Jul 21, 2025
b97e7e4
Decouple workflow status management by deriving `status` from respect…
prakash100198 Jul 21, 2025
bc7e274
Refactor `WorkflowStatusLatestService` to derive `status` from workfl…
prakash100198 Jul 21, 2025
604baa8
Remove `WorkflowStatusLatestService` usage and fallback logic from `C…
prakash100198 Jul 21, 2025
17c1dae
Refactor workflow status handling: rename `WorkflowStatusLatestServic…
prakash100198 Jul 21, 2025
cb6ce49
Integrate optimized CI workflow fetching in `CiHandlerImpl` using `Fi…
prakash100198 Jul 21, 2025
96815d3
Add `StorageConfigured` field to `CiWorkflowStatusLatest` and update …
prakash100198 Jul 22, 2025
4a0bc3a
wip: adding code for cd
iamayushm Jul 22, 2025
fc4c6aa
Implement hybrid CI workflow status fetching in `CiHandlerImpl` using…
prakash100198 Jul 22, 2025
7f327f2
wip
iamayushm Jul 23, 2025
19f6886
Rename `GetCachedPipelineIds` to `GetByPipelineIds` in `WorkflowStatu…
prakash100198 Jul 23, 2025
01b8bc6
Refactor CI status fetching in `CiHandlerImpl` by renaming methods fo…
prakash100198 Jul 23, 2025
3bc8e3d
Refactor method names and variable references in `CiHandlerImpl` for …
prakash100198 Jul 23, 2025
02fd2b4
Remove unused `GetByPipelineIds` method from `WorkflowStatusLatestRep…
prakash100198 Jul 23, 2025
b3e96fc
Refactor `WorkflowStatusLatestService` package imports and CI status …
prakash100198 Jul 23, 2025
083fd43
Merge remote-tracking branch 'origin/optimize-ci-cd-workflow' into cd…
iamayushm Jul 23, 2025
03f78d4
circular import fix
iamayushm Jul 23, 2025
677f569
Merge pull request #6748 from devtron-labs/cd-get-optimised
iamayushm Jul 23, 2025
48468ce
refactoring
iamayushm Jul 23, 2025
8cd66fe
Merge remote-tracking branch 'origin/optimize-ci-cd-workflow' into cd…
iamayushm Jul 23, 2025
61a1c74
opt (#6747)
prkhrkat Jul 23, 2025
1368559
removed extra code
prkhrkat Jul 23, 2025
20dc988
dev testing fixes
iamayushm Jul 23, 2025
b6156f0
dev testing changes
iamayushm Jul 23, 2025
4443e57
opt
iamayushm Jul 23, 2025
714107a
Remove redundant workflow status update logic for CI/CD workflows and…
prakash100198 Jul 24, 2025
ca578cf
changing slow query
iamayushm Jul 24, 2025
df5fb33
Remove `WorkflowStatusUpdateService` and update references to streaml…
prakash100198 Jul 24, 2025
58895c6
Merge remote-tracking branch 'origin/optimize-ci-cd-workflow' into op…
prakash100198 Jul 24, 2025
6ecb09b
Remove legacy CI/CD workflow status fetching methods and streamline i…
prakash100198 Jul 24, 2025
aae32f6
Remove unused methods from `WorkflowStatusLatestRepository` to stream…
prakash100198 Jul 24, 2025
ee5672e
Update unique constraints in workflow status tables to use `ci_workfl…
prakash100198 Jul 24, 2025
b86f98f
Update `WorkflowStatusLatestService` to handle both create and update…
prakash100198 Jul 24, 2025
197a9dc
fix: refactored unit conversion for cluster details (#6768)
SATYAsasini Jul 28, 2025
35f4e38
develop merge
iamayushm Jul 29, 2025
797bc2c
develop merge
iamayushm Jul 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ github.com/google/s2a-go v0.1.9 h1:LGD7gtMgezd8a/Xak7mEWL0PjoTQFvpRudN895yqKW0=
github.com/google/s2a-go v0.1.9/go.mod h1:YA0Ei2ZQL3acow2O62kdp9UlnvMmU7kA6Eutn0dXayM=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ=
github.com/google/subcommands v1.2.0 h1:vWQspBTo2nEqTUFita5/KeEWlUL8kQObDFbub/EN9oE=
github.com/google/subcommands v1.2.0/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
Expand Down
25 changes: 25 additions & 0 deletions internal/sql/repository/pipelineConfig/CiWorkflowRepository.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type CiWorkflowRepository interface {
FindByName(name string) (*CiWorkflow, error)

FindLastTriggeredWorkflowByCiIds(pipelineId []int) (ciWorkflow []*CiWorkflow, err error)
FindWorkflowsByCiWorkflowIds(ciWorkflowIds []int) (ciWorkflow []*CiWorkflow, err error)
FindLastTriggeredWorkflowByArtifactId(ciArtifactId int) (ciWorkflow *CiWorkflow, err error)
FindAllLastTriggeredWorkflowByArtifactId(ciArtifactId []int) (ciWorkflow []*CiWorkflow, err error)
FindAllTriggeredWorkflowCountInLast24Hour() (ciWorkflowCount int, err error)
Expand All @@ -48,6 +49,7 @@ type CiWorkflowRepository interface {
ExistsByStatus(status string) (bool, error)
FindBuildTypeAndStatusDataOfLast1Day() ([]*BuildTypeCount, error)
FIndCiWorkflowStatusesByAppId(appId int) ([]*CiWorkflowStatus, error)
FindCiPipelineIdsByAppId(appId int) ([]int, error)

MigrateIsArtifactUploaded(wfId int, isArtifactUploaded bool)
MigrateCiArtifactLocation(wfId int, artifactLocation string)
Expand Down Expand Up @@ -290,6 +292,19 @@ func (impl *CiWorkflowRepositoryImpl) FindLastTriggeredWorkflowByCiIds(pipelineI
return ciWorkflow, err
}

// FindWorkflowsByCiWorkflowIds fetches workflows by their workflow IDs (simple query)
func (impl *CiWorkflowRepositoryImpl) FindWorkflowsByCiWorkflowIds(ciWorkflowIds []int) (ciWorkflow []*CiWorkflow, err error) {
if len(ciWorkflowIds) == 0 {
return ciWorkflow, nil
}

err = impl.dbConnection.Model(&ciWorkflow).
Column("ci_workflow.*", "CiPipeline").
Where("ci_workflow.id IN (?)", pg.In(ciWorkflowIds)).
Select()
return ciWorkflow, err
}

func (impl *CiWorkflowRepositoryImpl) FindLastTriggeredWorkflowByArtifactId(ciArtifactId int) (ciWorkflow *CiWorkflow, err error) {
workflow := &CiWorkflow{}
err = impl.dbConnection.Model(workflow).
Expand Down Expand Up @@ -379,6 +394,16 @@ func (impl *CiWorkflowRepositoryImpl) FIndCiWorkflowStatusesByAppId(appId int) (
return ciworkflowStatuses, err
}

// FindCiPipelineIdsByAppId gets all CI pipeline IDs for an app (simple query)
func (impl *CiWorkflowRepositoryImpl) FindCiPipelineIdsByAppId(appId int) ([]int, error) {
var ciPipelineIds []int
err := impl.dbConnection.Model((*CiPipeline)(nil)).
Column("id").
Where("app_id = ? AND deleted = false", appId).
Select(&ciPipelineIds)
return ciPipelineIds, err
}

func (impl *CiWorkflowRepositoryImpl) MigrateIsArtifactUploaded(wfId int, isArtifactUploaded bool) {
_, err := impl.dbConnection.Model((*CiWorkflow)(nil)).
Set("is_artifact_uploaded = ?", workflow.GetArtifactUploadedType(isArtifactUploaded)).
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/*
* Copyright (c) 2024. Devtron Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package pipelineConfig

import (
"github.com/devtron-labs/devtron/pkg/sql"
"github.com/go-pg/pg"
"github.com/go-pg/pg/orm"
"go.uber.org/zap"
)

type WorkflowStatusLatestRepository interface {
// CI Workflow Status Latest methods
SaveCiWorkflowStatusLatest(tx *pg.Tx, model *CiWorkflowStatusLatest) error
UpdateCiWorkflowStatusLatest(tx *pg.Tx, model *CiWorkflowStatusLatest) error
GetCiWorkflowStatusLatestByPipelineId(pipelineId int) (*CiWorkflowStatusLatest, error)
GetCiWorkflowStatusLatestByPipelineIds(pipelineIds []int) ([]*CiWorkflowStatusLatest, error)

// CD Workflow Status Latest methods
SaveCdWorkflowStatusLatest(tx *pg.Tx, model *CdWorkflowStatusLatest) error
UpdateCdWorkflowStatusLatest(tx *pg.Tx, model *CdWorkflowStatusLatest) error
GetCdWorkflowStatusLatestByPipelineIdAndWorkflowType(tx *pg.Tx, pipelineId int, workflowType string) (*CdWorkflowStatusLatest, error)
GetCdWorkflowStatusLatestByPipelineIds(pipelineIds []int) ([]*CdWorkflowStatusLatest, error)
}

type WorkflowStatusLatestRepositoryImpl struct {
dbConnection *pg.DB
logger *zap.SugaredLogger
}

func NewWorkflowStatusLatestRepositoryImpl(dbConnection *pg.DB, logger *zap.SugaredLogger) *WorkflowStatusLatestRepositoryImpl {
return &WorkflowStatusLatestRepositoryImpl{
dbConnection: dbConnection,
logger: logger,
}
}

// CI Workflow Status Latest model
type CiWorkflowStatusLatest struct {
tableName struct{} `sql:"ci_workflow_status_latest" pg:",discard_unknown_columns"`
Id int `sql:"id,pk"`
PipelineId int `sql:"pipeline_id"`
AppId int `sql:"app_id"`
CiWorkflowId int `sql:"ci_workflow_id"`
sql.AuditLog
}

// CD Workflow Status Latest model
type CdWorkflowStatusLatest struct {
tableName struct{} `sql:"cd_workflow_status_latest" pg:",discard_unknown_columns"`
Id int `sql:"id,pk"`
PipelineId int `sql:"pipeline_id"`
AppId int `sql:"app_id"`
EnvironmentId int `sql:"environment_id"`
WorkflowType string `sql:"workflow_type"`
WorkflowRunnerId int `sql:"workflow_runner_id"`
sql.AuditLog
}

// CI Workflow Status Latest methods implementation
func (impl *WorkflowStatusLatestRepositoryImpl) SaveCiWorkflowStatusLatest(tx *pg.Tx, model *CiWorkflowStatusLatest) error {
var connection orm.DB
if tx != nil {
connection = tx
} else {
connection = impl.dbConnection
}
err := connection.Insert(model)
if err != nil {
impl.logger.Errorw("error in saving ci workflow status latest", "err", err, "model", model)
return err
}
return nil
}

func (impl *WorkflowStatusLatestRepositoryImpl) GetCiWorkflowStatusLatestByPipelineIds(pipelineIds []int) ([]*CiWorkflowStatusLatest, error) {
if len(pipelineIds) == 0 {
return []*CiWorkflowStatusLatest{}, nil
}

var models []*CiWorkflowStatusLatest
err := impl.dbConnection.Model(&models).
Where("pipeline_id IN (?)", pg.In(pipelineIds)).
Select()
if err != nil {
impl.logger.Errorw("error in getting ci workflow status latest by pipeline ids", "err", err, "pipelineIds", pipelineIds)
return nil, err
}
return models, nil
}

func (impl *WorkflowStatusLatestRepositoryImpl) UpdateCiWorkflowStatusLatest(tx *pg.Tx, model *CiWorkflowStatusLatest) error {
var connection orm.DB
if tx != nil {
connection = tx
} else {
connection = impl.dbConnection
}
_, err := connection.Model(model).WherePK().Update()
if err != nil {
impl.logger.Errorw("error in updating ci workflow status latest", "err", err, "model", model)
return err
}
return nil
}

func (impl *WorkflowStatusLatestRepositoryImpl) GetCiWorkflowStatusLatestByPipelineId(pipelineId int) (*CiWorkflowStatusLatest, error) {
var model CiWorkflowStatusLatest
err := impl.dbConnection.Model(&model).
Where("pipeline_id = ?", pipelineId).
Select()
if err != nil {
impl.logger.Errorw("error in getting ci workflow status latest by pipeline id", "err", err, "pipelineId", pipelineId)
return nil, err
}
return &model, nil
}

// CD Workflow Status Latest methods implementation
func (impl *WorkflowStatusLatestRepositoryImpl) SaveCdWorkflowStatusLatest(tx *pg.Tx, model *CdWorkflowStatusLatest) error {
var connection orm.DB
if tx != nil {
connection = tx
} else {
connection = impl.dbConnection
}
err := connection.Insert(model)
if err != nil {
impl.logger.Errorw("error in saving cd workflow status latest", "err", err, "model", model)
return err
}
return nil
}

func (impl *WorkflowStatusLatestRepositoryImpl) GetCdWorkflowStatusLatestByPipelineIds(pipelineIds []int) ([]*CdWorkflowStatusLatest, error) {
var models []*CdWorkflowStatusLatest
err := impl.dbConnection.Model(&models).
Where("pipeline_id IN (?)", pg.In(pipelineIds)).
Select()
if err != nil {
impl.logger.Errorw("error in getting cd workflow status latest by pipeline ids", "err", err, "pipelineIds", pipelineIds)
return nil, err
}
return models, nil
}

func (impl *WorkflowStatusLatestRepositoryImpl) UpdateCdWorkflowStatusLatest(tx *pg.Tx, model *CdWorkflowStatusLatest) error {
var connection orm.DB
if tx != nil {
connection = tx
} else {
connection = impl.dbConnection
}
_, err := connection.Model(model).WherePK().Update()
if err != nil {
impl.logger.Errorw("error in updating cd workflow status latest", "err", err, "model", model)
return err
}
return nil
}

func (impl *WorkflowStatusLatestRepositoryImpl) GetCdWorkflowStatusLatestByPipelineIdAndWorkflowType(tx *pg.Tx, pipelineId int, workflowType string) (*CdWorkflowStatusLatest, error) {
var connection orm.DB
if tx != nil {
connection = tx
} else {
connection = impl.dbConnection
}

var model CdWorkflowStatusLatest
err := connection.Model(&model).
Where("pipeline_id = ? AND workflow_type = ?", pipelineId, workflowType).
Select()
if err != nil {
impl.logger.Errorw("error in getting cd workflow status latest by pipeline id and workflow type", "err", err, "pipelineId", pipelineId, "workflowType", workflowType)
return nil, err
}
return &model, nil
}
10 changes: 7 additions & 3 deletions pkg/deployment/trigger/devtronApps/HandlerService.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ import (
"bufio"
"context"
"github.com/devtron-labs/common-lib/async"
service2 "github.com/devtron-labs/devtron/pkg/workflow/trigger/audit/service"
"github.com/devtron-labs/devtron/client/fluxcd"
service2 "github.com/devtron-labs/devtron/pkg/workflow/trigger/audit/service"
"github.com/devtron-labs/devtron/pkg/workflow/workflowStatusLatest"
"os"
"time"

Expand Down Expand Up @@ -174,6 +175,7 @@ type HandlerServiceImpl struct {
asyncRunnable *async.Runnable
workflowTriggerAuditService service2.WorkflowTriggerAuditService
fluxCdDeploymentService fluxcd.DeploymentService
workflowStatusLatestService workflowStatusLatest.WorkflowStatusLatestService
}

func NewHandlerServiceImpl(logger *zap.SugaredLogger,
Expand Down Expand Up @@ -238,7 +240,8 @@ func NewHandlerServiceImpl(logger *zap.SugaredLogger,
deploymentEventHandler app.DeploymentEventHandler,
asyncRunnable *async.Runnable,
workflowTriggerAuditService service2.WorkflowTriggerAuditService,
fluxCdDeploymentService fluxcd.DeploymentService) (*HandlerServiceImpl, error) {
fluxCdDeploymentService fluxcd.DeploymentService,
workflowStatusLatestService workflowStatusLatest.WorkflowStatusLatestService) (*HandlerServiceImpl, error) {
impl := &HandlerServiceImpl{
logger: logger,
cdWorkflowCommonService: cdWorkflowCommonService,
Expand Down Expand Up @@ -307,7 +310,8 @@ func NewHandlerServiceImpl(logger *zap.SugaredLogger,
deploymentEventHandler: deploymentEventHandler,
asyncRunnable: asyncRunnable,
workflowTriggerAuditService: workflowTriggerAuditService,
fluxCdDeploymentService: fluxCdDeploymentService,
fluxCdDeploymentService: fluxCdDeploymentService,
workflowStatusLatestService: workflowStatusLatestService,
}
config, err := types.GetCdConfig()
if err != nil {
Expand Down
63 changes: 60 additions & 3 deletions pkg/deployment/trigger/devtronApps/deployStageHandlerCode.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,20 @@ func (impl *HandlerServiceImpl) ManualCdTrigger(triggerContext bean.TriggerConte
}
cdWorkflowId = cdWf.Id
}

tx, err := impl.cdWorkflowRepository.GetConnection().Begin()
if err != nil {
impl.logger.Errorw("error in getting connection for cdWorkflowRepository, ManualCdTrigger", "err", err)
return 0, "", nil, err
}
isRollbackNeeded := true
defer func() {
if isRollbackNeeded {
err = tx.Rollback()
if err != nil {
impl.logger.Errorw("error in rolling back transaction for cdWorkflowRunner, ManualCdTrigger", "cdWorkflowId", cdWorkflowId, "err", err)
}
}
}()
runner := &pipelineConfig.CdWorkflowRunner{
Name: cdPipeline.Name,
WorkflowType: bean3.CD_WORKFLOW_TYPE_DEPLOY,
Expand All @@ -282,11 +295,26 @@ func (impl *HandlerServiceImpl) ManualCdTrigger(triggerContext bean.TriggerConte
AuditLog: sql.AuditLog{CreatedOn: triggeredAt, CreatedBy: overrideRequest.UserId, UpdatedOn: triggeredAt, UpdatedBy: overrideRequest.UserId},
ReferenceId: triggerContext.ReferenceId,
}
err = impl.cdWorkflowRunnerService.SaveWfr(nil, runner)

err = impl.cdWorkflowRunnerService.SaveWfr(tx, runner)
if err != nil {
impl.logger.Errorw("err in creating cdWorkflowRunner, ManualCdTrigger", "cdWorkflowId", cdWorkflowId, "err", err)
return 0, "", nil, err
}

err = impl.workflowStatusLatestService.SaveCdWorkflowStatusLatest(tx, overrideRequest.PipelineId, overrideRequest.AppId, overrideRequest.EnvId, runner.Id, runner.WorkflowType.String(), overrideRequest.UserId)
if err != nil {
impl.logger.Errorw("error in updating workflow status latest, ManualCdTrigger", "runnerId", overrideRequest.WfrId, "err", err)
return 0, "", nil, err
}

isRollbackNeeded = false
err = tx.Commit()
if err != nil {
impl.logger.Errorw("error in committing transaction for cdWorkflowRunner, ManualCdTrigger", "cdWorkflowId", cdWorkflowId, "err", err)
return 0, "", nil, err
}

runner.CdWorkflow = &pipelineConfig.CdWorkflow{
Pipeline: cdPipeline,
}
Expand Down Expand Up @@ -407,6 +435,22 @@ func (impl *HandlerServiceImpl) TriggerAutomaticDeployment(request bean.CdTrigge
}
}

tx, err := impl.cdWorkflowRepository.GetConnection().Begin()
if err != nil {
impl.logger.Errorw("error in getting connection for cdWorkflowRepository, ManualCdTrigger", "err", err)
return err
}

isRollbackNeeded := true
defer func() {
if isRollbackNeeded {
err = tx.Rollback()
if err != nil {
impl.logger.Errorw("error in rolling back transaction for cdWorkflowRunner, ManualCdTrigger", "cdWorkflowId", cdWf.Id, "err", err)
}
}
}()

runner := &pipelineConfig.CdWorkflowRunner{
Name: pipeline.Name,
WorkflowType: bean3.CD_WORKFLOW_TYPE_DEPLOY,
Expand All @@ -419,10 +463,23 @@ func (impl *HandlerServiceImpl) TriggerAutomaticDeployment(request bean.CdTrigge
AuditLog: sql.AuditLog{CreatedOn: triggeredAt, CreatedBy: triggeredBy, UpdatedOn: triggeredAt, UpdatedBy: triggeredBy},
ReferenceId: request.TriggerContext.ReferenceId,
}
err := impl.cdWorkflowRunnerService.SaveWfr(nil, runner)
err = impl.cdWorkflowRunnerService.SaveWfr(tx, runner)
if err != nil {
return err
}

err = impl.workflowStatusLatestService.SaveCdWorkflowStatusLatest(tx, pipeline.Id, pipeline.AppId, pipeline.EnvironmentId, runner.Id, runner.WorkflowType.String(), request.TriggeredBy)
if err != nil {
impl.logger.Errorw("error in updating workflow status latest, ManualCdTrigger", "runnerId", runner.Id, "err", err)
return err
}

err = tx.Commit()
if err != nil {
impl.logger.Errorw("error in committing transaction for cdWorkflowRunner, ManualCdTrigger", "cdWorkflowId", cdWf.Id, "err", err)
return err
}
isRollbackNeeded = false
runner.CdWorkflow = &pipelineConfig.CdWorkflow{
Pipeline: pipeline,
}
Expand Down
Loading
Loading