Skip to content

feat: optimize ci cd workflow #6744

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 36 commits into from
Jul 30, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
87521d5
Add workflow status latest service with CI/CD status optimization
prakash100198 Jul 21, 2025
b97e7e4
Decouple workflow status management by deriving `status` from respect…
prakash100198 Jul 21, 2025
bc7e274
Refactor `WorkflowStatusLatestService` to derive `status` from workfl…
prakash100198 Jul 21, 2025
604baa8
Remove `WorkflowStatusLatestService` usage and fallback logic from `C…
prakash100198 Jul 21, 2025
17c1dae
Refactor workflow status handling: rename `WorkflowStatusLatestServic…
prakash100198 Jul 21, 2025
cb6ce49
Integrate optimized CI workflow fetching in `CiHandlerImpl` using `Fi…
prakash100198 Jul 21, 2025
96815d3
Add `StorageConfigured` field to `CiWorkflowStatusLatest` and update …
prakash100198 Jul 22, 2025
4a0bc3a
wip: adding code for cd
iamayushm Jul 22, 2025
fc4c6aa
Implement hybrid CI workflow status fetching in `CiHandlerImpl` using…
prakash100198 Jul 22, 2025
7f327f2
wip
iamayushm Jul 23, 2025
19f6886
Rename `GetCachedPipelineIds` to `GetByPipelineIds` in `WorkflowStatu…
prakash100198 Jul 23, 2025
01b8bc6
Refactor CI status fetching in `CiHandlerImpl` by renaming methods fo…
prakash100198 Jul 23, 2025
3bc8e3d
Refactor method names and variable references in `CiHandlerImpl` for …
prakash100198 Jul 23, 2025
02fd2b4
Remove unused `GetByPipelineIds` method from `WorkflowStatusLatestRep…
prakash100198 Jul 23, 2025
b3e96fc
Refactor `WorkflowStatusLatestService` package imports and CI status …
prakash100198 Jul 23, 2025
083fd43
Merge remote-tracking branch 'origin/optimize-ci-cd-workflow' into cd…
iamayushm Jul 23, 2025
03f78d4
circular import fix
iamayushm Jul 23, 2025
677f569
Merge pull request #6748 from devtron-labs/cd-get-optimised
iamayushm Jul 23, 2025
48468ce
refactoring
iamayushm Jul 23, 2025
8cd66fe
Merge remote-tracking branch 'origin/optimize-ci-cd-workflow' into cd…
iamayushm Jul 23, 2025
61a1c74
opt (#6747)
prkhrkat Jul 23, 2025
1368559
removed extra code
prkhrkat Jul 23, 2025
20dc988
dev testing fixes
iamayushm Jul 23, 2025
b6156f0
dev testing changes
iamayushm Jul 23, 2025
4443e57
opt
iamayushm Jul 23, 2025
714107a
Remove redundant workflow status update logic for CI/CD workflows and…
prakash100198 Jul 24, 2025
ca578cf
changing slow query
iamayushm Jul 24, 2025
df5fb33
Remove `WorkflowStatusUpdateService` and update references to streaml…
prakash100198 Jul 24, 2025
58895c6
Merge remote-tracking branch 'origin/optimize-ci-cd-workflow' into op…
prakash100198 Jul 24, 2025
6ecb09b
Remove legacy CI/CD workflow status fetching methods and streamline i…
prakash100198 Jul 24, 2025
aae32f6
Remove unused methods from `WorkflowStatusLatestRepository` to stream…
prakash100198 Jul 24, 2025
ee5672e
Update unique constraints in workflow status tables to use `ci_workfl…
prakash100198 Jul 24, 2025
b86f98f
Update `WorkflowStatusLatestService` to handle both create and update…
prakash100198 Jul 24, 2025
197a9dc
fix: refactored unit conversion for cluster details (#6768)
SATYAsasini Jul 28, 2025
35f4e38
develop merge
iamayushm Jul 29, 2025
797bc2c
develop merge
iamayushm Jul 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions internal/sql/repository/pipelineConfig/CiWorkflowRepository.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type CiWorkflowRepository interface {
FindByName(name string) (*CiWorkflow, error)

FindLastTriggeredWorkflowByCiIds(pipelineId []int) (ciWorkflow []*CiWorkflow, err error)
FindLastTriggeredWorkflowByCiIdsOptimized(pipelineId []int) (ciWorkflow []*CiWorkflow, err error)
FindLastTriggeredWorkflowByArtifactId(ciArtifactId int) (ciWorkflow *CiWorkflow, err error)
FindAllLastTriggeredWorkflowByArtifactId(ciArtifactId []int) (ciWorkflow []*CiWorkflow, err error)
FindAllTriggeredWorkflowCountInLast24Hour() (ciWorkflowCount int, err error)
Expand Down Expand Up @@ -290,6 +291,23 @@ func (impl *CiWorkflowRepositoryImpl) FindLastTriggeredWorkflowByCiIds(pipelineI
return ciWorkflow, err
}

// FindLastTriggeredWorkflowByCiIdsOptimized uses the ci_workflow_status_latest table for better performance
func (impl *CiWorkflowRepositoryImpl) FindLastTriggeredWorkflowByCiIdsOptimized(pipelineId []int) (ciWorkflow []*CiWorkflow, err error) {
err = impl.dbConnection.Model(&ciWorkflow).
Column("ci_workflow.*", "CiPipeline").
Join("INNER JOIN ci_workflow_status_latest cwsl ON cwsl.ci_workflow_id = ci_workflow.id").
Where("cwsl.pipeline_id IN (?)", pg.In(pipelineId)).
Select()

if err != nil {
impl.logger.Errorw("error in optimized query, falling back to old method", "err", err, "pipelineIds", pipelineId)
// Fallback to the old method if optimized query fails
return impl.FindLastTriggeredWorkflowByCiIds(pipelineId)
}

return ciWorkflow, err
}

func (impl *CiWorkflowRepositoryImpl) FindLastTriggeredWorkflowByArtifactId(ciArtifactId int) (ciWorkflow *CiWorkflow, err error) {
workflow := &CiWorkflow{}
err = impl.dbConnection.Model(workflow).
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/*
* Copyright (c) 2024. Devtron Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package pipelineConfig

import (
"github.com/devtron-labs/devtron/pkg/sql"
"github.com/go-pg/pg"
"go.uber.org/zap"
)

type WorkflowStatusLatestRepository interface {
// CI Workflow Status Latest methods
SaveCiWorkflowStatusLatest(model *CiWorkflowStatusLatest) error
UpdateCiWorkflowStatusLatest(model *CiWorkflowStatusLatest) error
GetCiWorkflowStatusLatestByPipelineId(pipelineId int) (*CiWorkflowStatusLatest, error)
GetCiWorkflowStatusLatestByAppId(appId int) ([]*CiWorkflowStatusLatest, error)
DeleteCiWorkflowStatusLatestByPipelineId(pipelineId int) error

// CD Workflow Status Latest methods
SaveCdWorkflowStatusLatest(model *CdWorkflowStatusLatest) error
UpdateCdWorkflowStatusLatest(model *CdWorkflowStatusLatest) error
GetCdWorkflowStatusLatestByPipelineIdAndWorkflowType(pipelineId int, workflowType string) (*CdWorkflowStatusLatest, error)
GetCdWorkflowStatusLatestByAppId(appId int) ([]*CdWorkflowStatusLatest, error)
GetCdWorkflowStatusLatestByPipelineId(pipelineId int) ([]*CdWorkflowStatusLatest, error)
DeleteCdWorkflowStatusLatestByPipelineId(pipelineId int) error
}

type WorkflowStatusLatestRepositoryImpl struct {
dbConnection *pg.DB
logger *zap.SugaredLogger
}

func NewWorkflowStatusLatestRepositoryImpl(dbConnection *pg.DB, logger *zap.SugaredLogger) *WorkflowStatusLatestRepositoryImpl {
return &WorkflowStatusLatestRepositoryImpl{
dbConnection: dbConnection,
logger: logger,
}
}

// CI Workflow Status Latest model
type CiWorkflowStatusLatest struct {
tableName struct{} `sql:"ci_workflow_status_latest" pg:",discard_unknown_columns"`
Id int `sql:"id,pk"`
PipelineId int `sql:"pipeline_id"`
AppId int `sql:"app_id"`
CiWorkflowId int `sql:"ci_workflow_id"`
sql.AuditLog
}

// CD Workflow Status Latest model
type CdWorkflowStatusLatest struct {
tableName struct{} `sql:"cd_workflow_status_latest" pg:",discard_unknown_columns"`
Id int `sql:"id,pk"`
PipelineId int `sql:"pipeline_id"`
AppId int `sql:"app_id"`
EnvironmentId int `sql:"environment_id"`
WorkflowType string `sql:"workflow_type"`
WorkflowRunnerId int `sql:"workflow_runner_id"`
sql.AuditLog
}

// CI Workflow Status Latest methods implementation
func (impl *WorkflowStatusLatestRepositoryImpl) SaveCiWorkflowStatusLatest(model *CiWorkflowStatusLatest) error {
err := impl.dbConnection.Insert(model)
if err != nil {
impl.logger.Errorw("error in saving ci workflow status latest", "err", err, "model", model)
return err
}
return nil
}

func (impl *WorkflowStatusLatestRepositoryImpl) UpdateCiWorkflowStatusLatest(model *CiWorkflowStatusLatest) error {
_, err := impl.dbConnection.Model(model).WherePK().UpdateNotNull()
if err != nil {
impl.logger.Errorw("error in updating ci workflow status latest", "err", err, "model", model)
return err
}
return nil
}

func (impl *WorkflowStatusLatestRepositoryImpl) GetCiWorkflowStatusLatestByPipelineId(pipelineId int) (*CiWorkflowStatusLatest, error) {
model := &CiWorkflowStatusLatest{}
err := impl.dbConnection.Model(model).
Where("pipeline_id = ?", pipelineId).
Select()
if err != nil {
impl.logger.Errorw("error in getting ci workflow status latest by pipeline id", "err", err, "pipelineId", pipelineId)
return nil, err
}
return model, nil
}

func (impl *WorkflowStatusLatestRepositoryImpl) GetCiWorkflowStatusLatestByAppId(appId int) ([]*CiWorkflowStatusLatest, error) {
var models []*CiWorkflowStatusLatest
err := impl.dbConnection.Model(&models).
Where("app_id = ?", appId).
Select()
if err != nil {
impl.logger.Errorw("error in getting ci workflow status latest by app id", "err", err, "appId", appId)
return nil, err
}
return models, nil
}

func (impl *WorkflowStatusLatestRepositoryImpl) DeleteCiWorkflowStatusLatestByPipelineId(pipelineId int) error {
_, err := impl.dbConnection.Model(&CiWorkflowStatusLatest{}).
Where("pipeline_id = ?", pipelineId).
Delete()
if err != nil {
impl.logger.Errorw("error in deleting ci workflow status latest by pipeline id", "err", err, "pipelineId", pipelineId)
return err
}
return nil
}

// CD Workflow Status Latest methods implementation
func (impl *WorkflowStatusLatestRepositoryImpl) SaveCdWorkflowStatusLatest(model *CdWorkflowStatusLatest) error {
err := impl.dbConnection.Insert(model)
if err != nil {
impl.logger.Errorw("error in saving cd workflow status latest", "err", err, "model", model)
return err
}
return nil
}

func (impl *WorkflowStatusLatestRepositoryImpl) UpdateCdWorkflowStatusLatest(model *CdWorkflowStatusLatest) error {
_, err := impl.dbConnection.Model(model).WherePK().UpdateNotNull()
if err != nil {
impl.logger.Errorw("error in updating cd workflow status latest", "err", err, "model", model)
return err
}
return nil
}

func (impl *WorkflowStatusLatestRepositoryImpl) GetCdWorkflowStatusLatestByPipelineIdAndWorkflowType(pipelineId int, workflowType string) (*CdWorkflowStatusLatest, error) {
model := &CdWorkflowStatusLatest{}
err := impl.dbConnection.Model(model).
Where("pipeline_id = ?", pipelineId).
Where("workflow_type = ?", workflowType).
Select()
if err != nil {
impl.logger.Errorw("error in getting cd workflow status latest by pipeline id and workflow type", "err", err, "pipelineId", pipelineId, "workflowType", workflowType)
return nil, err
}
return model, nil
}

func (impl *WorkflowStatusLatestRepositoryImpl) GetCdWorkflowStatusLatestByAppId(appId int) ([]*CdWorkflowStatusLatest, error) {
var models []*CdWorkflowStatusLatest
err := impl.dbConnection.Model(&models).
Where("app_id = ?", appId).
Select()
if err != nil {
impl.logger.Errorw("error in getting cd workflow status latest by app id", "err", err, "appId", appId)
return nil, err
}
return models, nil
}

func (impl *WorkflowStatusLatestRepositoryImpl) GetCdWorkflowStatusLatestByPipelineId(pipelineId int) ([]*CdWorkflowStatusLatest, error) {
var models []*CdWorkflowStatusLatest
err := impl.dbConnection.Model(&models).
Where("pipeline_id = ?", pipelineId).
Select()
if err != nil {
impl.logger.Errorw("error in getting cd workflow status latest by pipeline id", "err", err, "pipelineId", pipelineId)
return nil, err
}
return models, nil
}

func (impl *WorkflowStatusLatestRepositoryImpl) DeleteCdWorkflowStatusLatestByPipelineId(pipelineId int) error {
_, err := impl.dbConnection.Model(&CdWorkflowStatusLatest{}).
Where("pipeline_id = ?", pipelineId).
Delete()
if err != nil {
impl.logger.Errorw("error in deleting cd workflow status latest by pipeline id", "err", err, "pipelineId", pipelineId)
return err
}
return nil
}
21 changes: 15 additions & 6 deletions pkg/pipeline/CiHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
eventProcessorBean "github.com/devtron-labs/devtron/pkg/eventProcessor/bean"
"github.com/devtron-labs/devtron/pkg/pipeline/constants"
"github.com/devtron-labs/devtron/pkg/pipeline/workflowStatus"
"github.com/devtron-labs/devtron/pkg/workflow/status/workflowStatusLatest"
"regexp"
"slices"
"strconv"
Expand Down Expand Up @@ -97,13 +98,15 @@ type CiHandlerImpl struct {
config *types.CiConfig
k8sCommonService k8sPkg.K8sCommonService
workFlowStageStatusService workflowStatus.WorkFlowStageStatusService
workflowStatusUpdateService workflowStatusLatest.WorkflowStatusUpdateService
}

func NewCiHandlerImpl(Logger *zap.SugaredLogger, ciService CiService, ciPipelineMaterialRepository pipelineConfig.CiPipelineMaterialRepository, gitSensorClient gitSensor.Client, ciWorkflowRepository pipelineConfig.CiWorkflowRepository,
ciArtifactRepository repository.CiArtifactRepository, userService user.UserService, eventClient client.EventClient, eventFactory client.EventFactory, ciPipelineRepository pipelineConfig.CiPipelineRepository,
appListingRepository repository.AppListingRepository, cdPipelineRepository pipelineConfig.PipelineRepository, enforcerUtil rbac.EnforcerUtil, resourceGroupService resourceGroup.ResourceGroupService, envRepository repository2.EnvironmentRepository,
imageTaggingService imageTagging.ImageTaggingService, k8sCommonService k8sPkg.K8sCommonService, appWorkflowRepository appWorkflow.AppWorkflowRepository, customTagService CustomTagService,
workFlowStageStatusService workflowStatus.WorkFlowStageStatusService,
workflowStatusUpdateService workflowStatusLatest.WorkflowStatusUpdateService,
) *CiHandlerImpl {
cih := &CiHandlerImpl{
Logger: Logger,
Expand All @@ -126,6 +129,7 @@ func NewCiHandlerImpl(Logger *zap.SugaredLogger, ciService CiService, ciPipeline
appWorkflowRepository: appWorkflowRepository,
k8sCommonService: k8sCommonService,
workFlowStageStatusService: workFlowStageStatusService,
workflowStatusUpdateService: workflowStatusUpdateService,
}
config, err := types.GetCiConfig()
if err != nil {
Expand Down Expand Up @@ -644,10 +648,15 @@ func (impl *CiHandlerImpl) stateChanged(status string, podStatus string, msg str
}

func (impl *CiHandlerImpl) FetchCiStatusForTriggerViewV1(appId int) ([]*pipelineConfig.CiWorkflowStatus, error) {
ciWorkflowStatuses, err := impl.ciWorkflowRepository.FIndCiWorkflowStatusesByAppId(appId)
if err != nil && !util.IsErrNoRows(err) {
impl.Logger.Errorw("err in fetching ciWorkflowStatuses from ciWorkflowRepository", "appId", appId, "err", err)
return ciWorkflowStatuses, err
ciWorkflowStatuses, err := impl.workflowStatusUpdateService.FetchCiStatusForTriggerViewOptimized(appId)
if err != nil {
impl.Logger.Errorw("error in fetching ci status from optimized service, falling back to old method", "appId", appId, "err", err)
// Fallback to old method if optimized service fails
ciWorkflowStatuses, err = impl.ciWorkflowRepository.FIndCiWorkflowStatusesByAppId(appId)
if err != nil && !util.IsErrNoRows(err) {
impl.Logger.Errorw("err in fetching ciWorkflowStatuses from ciWorkflowRepository", "appId", appId, "err", err)
return ciWorkflowStatuses, err
}
}

return ciWorkflowStatuses, err
Expand Down Expand Up @@ -861,9 +870,9 @@ func (impl *CiHandlerImpl) FetchCiStatusForTriggerViewForEnvironment(request res
if len(ciPipelineIds) == 0 {
return ciWorkflowStatuses, nil
}
latestCiWorkflows, err := impl.ciWorkflowRepository.FindLastTriggeredWorkflowByCiIds(ciPipelineIds)
latestCiWorkflows, err := impl.ciWorkflowRepository.FindLastTriggeredWorkflowByCiIdsOptimized(ciPipelineIds)
if err != nil && !util.IsErrNoRows(err) {
impl.Logger.Errorw("err", "ciPipelineIds", ciPipelineIds, "err", err)
impl.Logger.Errorw("err in optimized ci workflow fetch", "ciPipelineIds", ciPipelineIds, "err", err)
return ciWorkflowStatuses, err
}

Expand Down
Loading
Loading