diff --git a/go.sum b/go.sum index adfcbe0b44..038454d052 100644 --- a/go.sum +++ b/go.sum @@ -469,6 +469,7 @@ github.com/google/s2a-go v0.1.9 h1:LGD7gtMgezd8a/Xak7mEWL0PjoTQFvpRudN895yqKW0= github.com/google/s2a-go v0.1.9/go.mod h1:YA0Ei2ZQL3acow2O62kdp9UlnvMmU7kA6Eutn0dXayM= github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4= github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ= +github.com/google/subcommands v1.2.0 h1:vWQspBTo2nEqTUFita5/KeEWlUL8kQObDFbub/EN9oE= github.com/google/subcommands v1.2.0/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= diff --git a/internal/sql/repository/pipelineConfig/CiWorkflowRepository.go b/internal/sql/repository/pipelineConfig/CiWorkflowRepository.go index 411662fcc6..ef43684d21 100644 --- a/internal/sql/repository/pipelineConfig/CiWorkflowRepository.go +++ b/internal/sql/repository/pipelineConfig/CiWorkflowRepository.go @@ -40,6 +40,7 @@ type CiWorkflowRepository interface { FindByName(name string) (*CiWorkflow, error) FindLastTriggeredWorkflowByCiIds(pipelineId []int) (ciWorkflow []*CiWorkflow, err error) + FindWorkflowsByCiWorkflowIds(ciWorkflowIds []int) (ciWorkflow []*CiWorkflow, err error) FindLastTriggeredWorkflowByArtifactId(ciArtifactId int) (ciWorkflow *CiWorkflow, err error) FindAllLastTriggeredWorkflowByArtifactId(ciArtifactId []int) (ciWorkflow []*CiWorkflow, err error) FindAllTriggeredWorkflowCountInLast24Hour() (ciWorkflowCount int, err error) @@ -48,6 +49,7 @@ type CiWorkflowRepository interface { ExistsByStatus(status string) (bool, error) FindBuildTypeAndStatusDataOfLast1Day() ([]*BuildTypeCount, error) FIndCiWorkflowStatusesByAppId(appId int) ([]*CiWorkflowStatus, error) + FindCiPipelineIdsByAppId(appId int) ([]int, error) MigrateIsArtifactUploaded(wfId int, isArtifactUploaded bool) MigrateCiArtifactLocation(wfId int, artifactLocation string) @@ -290,6 +292,19 @@ func (impl *CiWorkflowRepositoryImpl) FindLastTriggeredWorkflowByCiIds(pipelineI return ciWorkflow, err } +// FindWorkflowsByCiWorkflowIds fetches workflows by their workflow IDs (simple query) +func (impl *CiWorkflowRepositoryImpl) FindWorkflowsByCiWorkflowIds(ciWorkflowIds []int) (ciWorkflow []*CiWorkflow, err error) { + if len(ciWorkflowIds) == 0 { + return ciWorkflow, nil + } + + err = impl.dbConnection.Model(&ciWorkflow). + Column("ci_workflow.*", "CiPipeline"). + Where("ci_workflow.id IN (?)", pg.In(ciWorkflowIds)). + Select() + return ciWorkflow, err +} + func (impl *CiWorkflowRepositoryImpl) FindLastTriggeredWorkflowByArtifactId(ciArtifactId int) (ciWorkflow *CiWorkflow, err error) { workflow := &CiWorkflow{} err = impl.dbConnection.Model(workflow). @@ -379,6 +394,16 @@ func (impl *CiWorkflowRepositoryImpl) FIndCiWorkflowStatusesByAppId(appId int) ( return ciworkflowStatuses, err } +// FindCiPipelineIdsByAppId gets all CI pipeline IDs for an app (simple query) +func (impl *CiWorkflowRepositoryImpl) FindCiPipelineIdsByAppId(appId int) ([]int, error) { + var ciPipelineIds []int + err := impl.dbConnection.Model((*CiPipeline)(nil)). + Column("id"). + Where("app_id = ? AND deleted = false", appId). + Select(&ciPipelineIds) + return ciPipelineIds, err +} + func (impl *CiWorkflowRepositoryImpl) MigrateIsArtifactUploaded(wfId int, isArtifactUploaded bool) { _, err := impl.dbConnection.Model((*CiWorkflow)(nil)). Set("is_artifact_uploaded = ?", workflow.GetArtifactUploadedType(isArtifactUploaded)). diff --git a/internal/sql/repository/pipelineConfig/WorkflowStatusLatestRepository.go b/internal/sql/repository/pipelineConfig/WorkflowStatusLatestRepository.go new file mode 100644 index 0000000000..e9c78e0866 --- /dev/null +++ b/internal/sql/repository/pipelineConfig/WorkflowStatusLatestRepository.go @@ -0,0 +1,193 @@ +/* + * Copyright (c) 2024. Devtron Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pipelineConfig + +import ( + "github.com/devtron-labs/devtron/pkg/sql" + "github.com/go-pg/pg" + "github.com/go-pg/pg/orm" + "go.uber.org/zap" +) + +type WorkflowStatusLatestRepository interface { + // CI Workflow Status Latest methods + SaveCiWorkflowStatusLatest(tx *pg.Tx, model *CiWorkflowStatusLatest) error + UpdateCiWorkflowStatusLatest(tx *pg.Tx, model *CiWorkflowStatusLatest) error + GetCiWorkflowStatusLatestByPipelineId(pipelineId int) (*CiWorkflowStatusLatest, error) + GetCiWorkflowStatusLatestByPipelineIds(pipelineIds []int) ([]*CiWorkflowStatusLatest, error) + + // CD Workflow Status Latest methods + SaveCdWorkflowStatusLatest(tx *pg.Tx, model *CdWorkflowStatusLatest) error + UpdateCdWorkflowStatusLatest(tx *pg.Tx, model *CdWorkflowStatusLatest) error + GetCdWorkflowStatusLatestByPipelineIdAndWorkflowType(tx *pg.Tx, pipelineId int, workflowType string) (*CdWorkflowStatusLatest, error) + GetCdWorkflowStatusLatestByPipelineIds(pipelineIds []int) ([]*CdWorkflowStatusLatest, error) +} + +type WorkflowStatusLatestRepositoryImpl struct { + dbConnection *pg.DB + logger *zap.SugaredLogger +} + +func NewWorkflowStatusLatestRepositoryImpl(dbConnection *pg.DB, logger *zap.SugaredLogger) *WorkflowStatusLatestRepositoryImpl { + return &WorkflowStatusLatestRepositoryImpl{ + dbConnection: dbConnection, + logger: logger, + } +} + +// CI Workflow Status Latest model +type CiWorkflowStatusLatest struct { + tableName struct{} `sql:"ci_workflow_status_latest" pg:",discard_unknown_columns"` + Id int `sql:"id,pk"` + PipelineId int `sql:"pipeline_id"` + AppId int `sql:"app_id"` + CiWorkflowId int `sql:"ci_workflow_id"` + sql.AuditLog +} + +// CD Workflow Status Latest model +type CdWorkflowStatusLatest struct { + tableName struct{} `sql:"cd_workflow_status_latest" pg:",discard_unknown_columns"` + Id int `sql:"id,pk"` + PipelineId int `sql:"pipeline_id"` + AppId int `sql:"app_id"` + EnvironmentId int `sql:"environment_id"` + WorkflowType string `sql:"workflow_type"` + WorkflowRunnerId int `sql:"workflow_runner_id"` + sql.AuditLog +} + +// CI Workflow Status Latest methods implementation +func (impl *WorkflowStatusLatestRepositoryImpl) SaveCiWorkflowStatusLatest(tx *pg.Tx, model *CiWorkflowStatusLatest) error { + var connection orm.DB + if tx != nil { + connection = tx + } else { + connection = impl.dbConnection + } + err := connection.Insert(model) + if err != nil { + impl.logger.Errorw("error in saving ci workflow status latest", "err", err, "model", model) + return err + } + return nil +} + +func (impl *WorkflowStatusLatestRepositoryImpl) GetCiWorkflowStatusLatestByPipelineIds(pipelineIds []int) ([]*CiWorkflowStatusLatest, error) { + if len(pipelineIds) == 0 { + return []*CiWorkflowStatusLatest{}, nil + } + + var models []*CiWorkflowStatusLatest + err := impl.dbConnection.Model(&models). + Where("pipeline_id IN (?)", pg.In(pipelineIds)). + Select() + if err != nil { + impl.logger.Errorw("error in getting ci workflow status latest by pipeline ids", "err", err, "pipelineIds", pipelineIds) + return nil, err + } + return models, nil +} + +func (impl *WorkflowStatusLatestRepositoryImpl) UpdateCiWorkflowStatusLatest(tx *pg.Tx, model *CiWorkflowStatusLatest) error { + var connection orm.DB + if tx != nil { + connection = tx + } else { + connection = impl.dbConnection + } + _, err := connection.Model(model).WherePK().Update() + if err != nil { + impl.logger.Errorw("error in updating ci workflow status latest", "err", err, "model", model) + return err + } + return nil +} + +func (impl *WorkflowStatusLatestRepositoryImpl) GetCiWorkflowStatusLatestByPipelineId(pipelineId int) (*CiWorkflowStatusLatest, error) { + var model CiWorkflowStatusLatest + err := impl.dbConnection.Model(&model). + Where("pipeline_id = ?", pipelineId). + Select() + if err != nil { + impl.logger.Errorw("error in getting ci workflow status latest by pipeline id", "err", err, "pipelineId", pipelineId) + return nil, err + } + return &model, nil +} + +// CD Workflow Status Latest methods implementation +func (impl *WorkflowStatusLatestRepositoryImpl) SaveCdWorkflowStatusLatest(tx *pg.Tx, model *CdWorkflowStatusLatest) error { + var connection orm.DB + if tx != nil { + connection = tx + } else { + connection = impl.dbConnection + } + err := connection.Insert(model) + if err != nil { + impl.logger.Errorw("error in saving cd workflow status latest", "err", err, "model", model) + return err + } + return nil +} + +func (impl *WorkflowStatusLatestRepositoryImpl) GetCdWorkflowStatusLatestByPipelineIds(pipelineIds []int) ([]*CdWorkflowStatusLatest, error) { + var models []*CdWorkflowStatusLatest + err := impl.dbConnection.Model(&models). + Where("pipeline_id IN (?)", pg.In(pipelineIds)). + Select() + if err != nil { + impl.logger.Errorw("error in getting cd workflow status latest by pipeline ids", "err", err, "pipelineIds", pipelineIds) + return nil, err + } + return models, nil +} + +func (impl *WorkflowStatusLatestRepositoryImpl) UpdateCdWorkflowStatusLatest(tx *pg.Tx, model *CdWorkflowStatusLatest) error { + var connection orm.DB + if tx != nil { + connection = tx + } else { + connection = impl.dbConnection + } + _, err := connection.Model(model).WherePK().Update() + if err != nil { + impl.logger.Errorw("error in updating cd workflow status latest", "err", err, "model", model) + return err + } + return nil +} + +func (impl *WorkflowStatusLatestRepositoryImpl) GetCdWorkflowStatusLatestByPipelineIdAndWorkflowType(tx *pg.Tx, pipelineId int, workflowType string) (*CdWorkflowStatusLatest, error) { + var connection orm.DB + if tx != nil { + connection = tx + } else { + connection = impl.dbConnection + } + + var model CdWorkflowStatusLatest + err := connection.Model(&model). + Where("pipeline_id = ? AND workflow_type = ?", pipelineId, workflowType). + Select() + if err != nil { + impl.logger.Errorw("error in getting cd workflow status latest by pipeline id and workflow type", "err", err, "pipelineId", pipelineId, "workflowType", workflowType) + return nil, err + } + return &model, nil +} diff --git a/pkg/deployment/trigger/devtronApps/HandlerService.go b/pkg/deployment/trigger/devtronApps/HandlerService.go index 28ff4ca715..7a50014808 100644 --- a/pkg/deployment/trigger/devtronApps/HandlerService.go +++ b/pkg/deployment/trigger/devtronApps/HandlerService.go @@ -20,8 +20,9 @@ import ( "bufio" "context" "github.com/devtron-labs/common-lib/async" - service2 "github.com/devtron-labs/devtron/pkg/workflow/trigger/audit/service" "github.com/devtron-labs/devtron/client/fluxcd" + service2 "github.com/devtron-labs/devtron/pkg/workflow/trigger/audit/service" + "github.com/devtron-labs/devtron/pkg/workflow/workflowStatusLatest" "os" "time" @@ -174,6 +175,7 @@ type HandlerServiceImpl struct { asyncRunnable *async.Runnable workflowTriggerAuditService service2.WorkflowTriggerAuditService fluxCdDeploymentService fluxcd.DeploymentService + workflowStatusLatestService workflowStatusLatest.WorkflowStatusLatestService } func NewHandlerServiceImpl(logger *zap.SugaredLogger, @@ -238,7 +240,8 @@ func NewHandlerServiceImpl(logger *zap.SugaredLogger, deploymentEventHandler app.DeploymentEventHandler, asyncRunnable *async.Runnable, workflowTriggerAuditService service2.WorkflowTriggerAuditService, - fluxCdDeploymentService fluxcd.DeploymentService) (*HandlerServiceImpl, error) { + fluxCdDeploymentService fluxcd.DeploymentService, + workflowStatusLatestService workflowStatusLatest.WorkflowStatusLatestService) (*HandlerServiceImpl, error) { impl := &HandlerServiceImpl{ logger: logger, cdWorkflowCommonService: cdWorkflowCommonService, @@ -307,7 +310,8 @@ func NewHandlerServiceImpl(logger *zap.SugaredLogger, deploymentEventHandler: deploymentEventHandler, asyncRunnable: asyncRunnable, workflowTriggerAuditService: workflowTriggerAuditService, - fluxCdDeploymentService: fluxCdDeploymentService, + fluxCdDeploymentService: fluxCdDeploymentService, + workflowStatusLatestService: workflowStatusLatestService, } config, err := types.GetCdConfig() if err != nil { diff --git a/pkg/deployment/trigger/devtronApps/deployStageHandlerCode.go b/pkg/deployment/trigger/devtronApps/deployStageHandlerCode.go index 0b082f38c8..40f7e374b9 100644 --- a/pkg/deployment/trigger/devtronApps/deployStageHandlerCode.go +++ b/pkg/deployment/trigger/devtronApps/deployStageHandlerCode.go @@ -269,7 +269,20 @@ func (impl *HandlerServiceImpl) ManualCdTrigger(triggerContext bean.TriggerConte } cdWorkflowId = cdWf.Id } - + tx, err := impl.cdWorkflowRepository.GetConnection().Begin() + if err != nil { + impl.logger.Errorw("error in getting connection for cdWorkflowRepository, ManualCdTrigger", "err", err) + return 0, "", nil, err + } + isRollbackNeeded := true + defer func() { + if isRollbackNeeded { + err = tx.Rollback() + if err != nil { + impl.logger.Errorw("error in rolling back transaction for cdWorkflowRunner, ManualCdTrigger", "cdWorkflowId", cdWorkflowId, "err", err) + } + } + }() runner := &pipelineConfig.CdWorkflowRunner{ Name: cdPipeline.Name, WorkflowType: bean3.CD_WORKFLOW_TYPE_DEPLOY, @@ -282,11 +295,26 @@ func (impl *HandlerServiceImpl) ManualCdTrigger(triggerContext bean.TriggerConte AuditLog: sql.AuditLog{CreatedOn: triggeredAt, CreatedBy: overrideRequest.UserId, UpdatedOn: triggeredAt, UpdatedBy: overrideRequest.UserId}, ReferenceId: triggerContext.ReferenceId, } - err = impl.cdWorkflowRunnerService.SaveWfr(nil, runner) + + err = impl.cdWorkflowRunnerService.SaveWfr(tx, runner) if err != nil { impl.logger.Errorw("err in creating cdWorkflowRunner, ManualCdTrigger", "cdWorkflowId", cdWorkflowId, "err", err) return 0, "", nil, err } + + err = impl.workflowStatusLatestService.SaveCdWorkflowStatusLatest(tx, overrideRequest.PipelineId, overrideRequest.AppId, overrideRequest.EnvId, runner.Id, runner.WorkflowType.String(), overrideRequest.UserId) + if err != nil { + impl.logger.Errorw("error in updating workflow status latest, ManualCdTrigger", "runnerId", overrideRequest.WfrId, "err", err) + return 0, "", nil, err + } + + isRollbackNeeded = false + err = tx.Commit() + if err != nil { + impl.logger.Errorw("error in committing transaction for cdWorkflowRunner, ManualCdTrigger", "cdWorkflowId", cdWorkflowId, "err", err) + return 0, "", nil, err + } + runner.CdWorkflow = &pipelineConfig.CdWorkflow{ Pipeline: cdPipeline, } @@ -407,6 +435,22 @@ func (impl *HandlerServiceImpl) TriggerAutomaticDeployment(request bean.CdTrigge } } + tx, err := impl.cdWorkflowRepository.GetConnection().Begin() + if err != nil { + impl.logger.Errorw("error in getting connection for cdWorkflowRepository, ManualCdTrigger", "err", err) + return err + } + + isRollbackNeeded := true + defer func() { + if isRollbackNeeded { + err = tx.Rollback() + if err != nil { + impl.logger.Errorw("error in rolling back transaction for cdWorkflowRunner, ManualCdTrigger", "cdWorkflowId", cdWf.Id, "err", err) + } + } + }() + runner := &pipelineConfig.CdWorkflowRunner{ Name: pipeline.Name, WorkflowType: bean3.CD_WORKFLOW_TYPE_DEPLOY, @@ -419,10 +463,23 @@ func (impl *HandlerServiceImpl) TriggerAutomaticDeployment(request bean.CdTrigge AuditLog: sql.AuditLog{CreatedOn: triggeredAt, CreatedBy: triggeredBy, UpdatedOn: triggeredAt, UpdatedBy: triggeredBy}, ReferenceId: request.TriggerContext.ReferenceId, } - err := impl.cdWorkflowRunnerService.SaveWfr(nil, runner) + err = impl.cdWorkflowRunnerService.SaveWfr(tx, runner) + if err != nil { + return err + } + + err = impl.workflowStatusLatestService.SaveCdWorkflowStatusLatest(tx, pipeline.Id, pipeline.AppId, pipeline.EnvironmentId, runner.Id, runner.WorkflowType.String(), request.TriggeredBy) + if err != nil { + impl.logger.Errorw("error in updating workflow status latest, ManualCdTrigger", "runnerId", runner.Id, "err", err) + return err + } + + err = tx.Commit() if err != nil { + impl.logger.Errorw("error in committing transaction for cdWorkflowRunner, ManualCdTrigger", "cdWorkflowId", cdWf.Id, "err", err) return err } + isRollbackNeeded = false runner.CdWorkflow = &pipelineConfig.CdWorkflow{ Pipeline: pipeline, } diff --git a/pkg/deployment/trigger/devtronApps/preStageHandlerCode.go b/pkg/deployment/trigger/devtronApps/preStageHandlerCode.go index ae8594cdc0..e91f9e67b9 100644 --- a/pkg/deployment/trigger/devtronApps/preStageHandlerCode.go +++ b/pkg/deployment/trigger/devtronApps/preStageHandlerCode.go @@ -312,7 +312,7 @@ func (impl *HandlerServiceImpl) createStartingWfAndRunner(request bean.CdTrigger ReferenceId: request.TriggerContext.ReferenceId, } _, span := otel.Tracer("orchestrator").Start(ctx, "cdWorkflowRepository.SaveWorkFlowRunner") - _, err = impl.cdWorkflowRunnerService.SaveCDWorkflowRunnerWithStage(runner) + _, err = impl.cdWorkflowRunnerService.SaveCDWorkflowRunnerWithStage(runner, cdWf, pipeline) span.End() if err != nil { return nil, nil, err diff --git a/pkg/pipeline/CdHandler.go b/pkg/pipeline/CdHandler.go index c0080b0dd0..ec4089c2eb 100644 --- a/pkg/pipeline/CdHandler.go +++ b/pkg/pipeline/CdHandler.go @@ -28,9 +28,12 @@ import ( repository3 "github.com/devtron-labs/devtron/pkg/cluster/environment/repository" common2 "github.com/devtron-labs/devtron/pkg/deployment/common" eventProcessorBean "github.com/devtron-labs/devtron/pkg/eventProcessor/bean" + repository2 "github.com/devtron-labs/devtron/pkg/pipeline/repository" "github.com/devtron-labs/devtron/pkg/pipeline/workflowStatus" bean5 "github.com/devtron-labs/devtron/pkg/pipeline/workflowStatus/bean" "github.com/devtron-labs/devtron/pkg/workflow/cd" + "github.com/devtron-labs/devtron/pkg/workflow/cd/read" + "github.com/devtron-labs/devtron/pkg/workflow/workflowStatusLatest" "slices" "strconv" "strings" @@ -89,6 +92,9 @@ type CdHandlerImpl struct { deploymentConfigService common2.DeploymentConfigService workflowStageStatusService workflowStatus.WorkFlowStageStatusService cdWorkflowRunnerService cd.CdWorkflowRunnerService + WorkflowStatusLatestService workflowStatusLatest.WorkflowStatusLatestService + pipelineStageRepository repository2.PipelineStageRepository + cdWorkflowRunnerReadService read.CdWorkflowRunnerReadService } func NewCdHandlerImpl(Logger *zap.SugaredLogger, userService user.UserService, @@ -103,6 +109,9 @@ func NewCdHandlerImpl(Logger *zap.SugaredLogger, userService user.UserService, deploymentConfigService common2.DeploymentConfigService, workflowStageStatusService workflowStatus.WorkFlowStageStatusService, cdWorkflowRunnerService cd.CdWorkflowRunnerService, + WorkflowStatusLatestService workflowStatusLatest.WorkflowStatusLatestService, + pipelineStageRepository repository2.PipelineStageRepository, + cdWorkflowRunnerReadService read.CdWorkflowRunnerReadService, ) *CdHandlerImpl { cdh := &CdHandlerImpl{ Logger: Logger, @@ -121,6 +130,9 @@ func NewCdHandlerImpl(Logger *zap.SugaredLogger, userService user.UserService, deploymentConfigService: deploymentConfigService, workflowStageStatusService: workflowStageStatusService, cdWorkflowRunnerService: cdWorkflowRunnerService, + WorkflowStatusLatestService: WorkflowStatusLatestService, + pipelineStageRepository: pipelineStageRepository, + cdWorkflowRunnerReadService: cdWorkflowRunnerReadService, } config, err := types.GetCdConfig() if err != nil { @@ -594,16 +606,18 @@ func (impl *CdHandlerImpl) FetchAppWorkflowStatusForTriggerView(appId int) ([]*p return cdWorkflowStatus, nil } - cdMap := make(map[int]*pipelineConfig.CdWorkflowStatus) - result, err := impl.cdWorkflowRepository.FetchAllCdStagesLatestEntity(pipelineIds) + result, err := impl.cdWorkflowRunnerReadService.GetWfrStatusForLatestRunners(pipelineIds, pipelines) if err != nil { + impl.Logger.Errorw("error in fetching wfrIds", "pipelineIds", pipelineIds, "err", err) return cdWorkflowStatus, err } + var wfrIds []int for _, item := range result { wfrIds = append(wfrIds, item.WfrId) } + var cdMap = make(map[int]*pipelineConfig.CdWorkflowStatus) statusMap := make(map[int]string) if len(wfrIds) > 0 { wfrList, err := impl.cdWorkflowRepository.FetchAllCdStagesLatestEntityStatus(wfrIds) @@ -753,11 +767,15 @@ func (impl *CdHandlerImpl) FetchAppWorkflowStatusForTriggerViewForEnvironment(re if len(pipelineIds) == 0 { return cdWorkflowStatus, nil } + cdMap := make(map[int]*pipelineConfig.CdWorkflowStatus) - wfrStatus, err := impl.cdWorkflowRepository.FetchAllCdStagesLatestEntity(pipelineIds) + + wfrStatus, err := impl.cdWorkflowRunnerReadService.GetWfrStatusForLatestRunners(pipelineIds, pipelines) if err != nil { + impl.Logger.Errorw("error in fetching wfrIds", "pipelineIds", pipelineIds, "err", err) return cdWorkflowStatus, err } + var wfrIds []int for _, item := range wfrStatus { wfrIds = append(wfrIds, item.WfrId) @@ -904,7 +922,7 @@ func (impl *CdHandlerImpl) FetchAppDeploymentStatusForEnvironments(request resou return deploymentStatuses, nil } _, span = otel.Tracer("orchestrator").Start(request.Ctx, "pipelineBuilder.FetchAllCdStagesLatestEntity") - result, err := impl.cdWorkflowRepository.FetchAllCdStagesLatestEntity(pipelineIds) + result, err := impl.cdWorkflowRunnerReadService.GetWfrStatusForLatestRunners(pipelineIds, cdPipelines) span.End() if err != nil { return deploymentStatuses, err diff --git a/pkg/pipeline/CiHandler.go b/pkg/pipeline/CiHandler.go index e636f67d44..3bc8ab1c6f 100644 --- a/pkg/pipeline/CiHandler.go +++ b/pkg/pipeline/CiHandler.go @@ -27,8 +27,10 @@ import ( buildBean "github.com/devtron-labs/devtron/pkg/build/pipeline/bean" repository2 "github.com/devtron-labs/devtron/pkg/cluster/environment/repository" eventProcessorBean "github.com/devtron-labs/devtron/pkg/eventProcessor/bean" + "github.com/devtron-labs/devtron/pkg/pipeline/adapter" "github.com/devtron-labs/devtron/pkg/pipeline/constants" "github.com/devtron-labs/devtron/pkg/pipeline/workflowStatus" + "github.com/devtron-labs/devtron/pkg/workflow/workflowStatusLatest" "regexp" "slices" "strconv" @@ -97,6 +99,7 @@ type CiHandlerImpl struct { config *types.CiConfig k8sCommonService k8sPkg.K8sCommonService workFlowStageStatusService workflowStatus.WorkFlowStageStatusService + workflowStatusLatestService workflowStatusLatest.WorkflowStatusLatestService } func NewCiHandlerImpl(Logger *zap.SugaredLogger, ciService CiService, ciPipelineMaterialRepository pipelineConfig.CiPipelineMaterialRepository, gitSensorClient gitSensor.Client, ciWorkflowRepository pipelineConfig.CiWorkflowRepository, @@ -104,6 +107,7 @@ func NewCiHandlerImpl(Logger *zap.SugaredLogger, ciService CiService, ciPipeline appListingRepository repository.AppListingRepository, cdPipelineRepository pipelineConfig.PipelineRepository, enforcerUtil rbac.EnforcerUtil, resourceGroupService resourceGroup.ResourceGroupService, envRepository repository2.EnvironmentRepository, imageTaggingService imageTagging.ImageTaggingService, k8sCommonService k8sPkg.K8sCommonService, appWorkflowRepository appWorkflow.AppWorkflowRepository, customTagService CustomTagService, workFlowStageStatusService workflowStatus.WorkFlowStageStatusService, + workflowStatusLatestService workflowStatusLatest.WorkflowStatusLatestService, ) *CiHandlerImpl { cih := &CiHandlerImpl{ Logger: Logger, @@ -126,6 +130,7 @@ func NewCiHandlerImpl(Logger *zap.SugaredLogger, ciService CiService, ciPipeline appWorkflowRepository: appWorkflowRepository, k8sCommonService: k8sCommonService, workFlowStageStatusService: workFlowStageStatusService, + workflowStatusLatestService: workflowStatusLatestService, } config, err := types.GetCiConfig() if err != nil { @@ -597,6 +602,7 @@ func (impl *CiHandlerImpl) UpdateWorkflow(workflowStatus eventProcessorBean.CiCd impl.Logger.Error("update wf failed for id " + strconv.Itoa(savedWorkflow.Id)) return savedWorkflow.Id, true, err } + impl.sendCIFailEvent(savedWorkflow, status, message) return savedWorkflow.Id, true, nil } @@ -644,13 +650,155 @@ func (impl *CiHandlerImpl) stateChanged(status string, podStatus string, msg str } func (impl *CiHandlerImpl) FetchCiStatusForTriggerViewV1(appId int) ([]*pipelineConfig.CiWorkflowStatus, error) { - ciWorkflowStatuses, err := impl.ciWorkflowRepository.FIndCiWorkflowStatusesByAppId(appId) - if err != nil && !util.IsErrNoRows(err) { - impl.Logger.Errorw("err in fetching ciWorkflowStatuses from ciWorkflowRepository", "appId", appId, "err", err) - return ciWorkflowStatuses, err + allPipelineIds, err := impl.ciWorkflowRepository.FindCiPipelineIdsByAppId(appId) + if err != nil { + impl.Logger.Errorw("error in getting ci pipeline ids for app, falling back to old method", "appId", appId, "err", err) + return impl.ciWorkflowRepository.FIndCiWorkflowStatusesByAppId(appId) + } + + if len(allPipelineIds) == 0 { + return []*pipelineConfig.CiWorkflowStatus{}, nil + } + + latestStatusEntries, err := impl.workflowStatusLatestService.GetCiWorkflowStatusLatestByPipelineIds(allPipelineIds) + if err != nil { + impl.Logger.Errorw("error in checking latest status table, falling back to old method", "appId", appId, "err", err) + return impl.ciWorkflowRepository.FIndCiWorkflowStatusesByAppId(appId) + } + + var allStatuses []*pipelineConfig.CiWorkflowStatus + + if len(latestStatusEntries) > 0 { + statusesFromLatestTable, err := impl.fetchCiWorkflowStatusFromLatestEntries(latestStatusEntries) + if err != nil { + impl.Logger.Errorw("error in fetching ci workflow status from latest ci workflow entries ", "latestStatusEntries", latestStatusEntries, "err", err) + return nil, err + } else { + allStatuses = append(allStatuses, statusesFromLatestTable...) + } + } + + pipelinesNotInLatestTable := impl.getPipelineIdsNotInLatestTable(allPipelineIds, latestStatusEntries) + + if len(pipelinesNotInLatestTable) > 0 { + statusesFromOldQuery, err := impl.fetchCiStatusUsingFallbackMethod(pipelinesNotInLatestTable) + if err != nil { + impl.Logger.Errorw("error in fetching using fallback method by pipelineIds", "pipelineIds", pipelinesNotInLatestTable, "err", err) + return nil, err + } else { + allStatuses = append(allStatuses, statusesFromOldQuery...) + } + } + + return allStatuses, nil +} + +// fetchCiWorkflowStatusFromLatestEntries fetches CI status from ci_workflow_status_latest table +func (impl *CiHandlerImpl) fetchCiWorkflowStatusFromLatestEntries(latestCiWorkflowStatusEntries []*pipelineConfig.CiWorkflowStatusLatest) ([]*pipelineConfig.CiWorkflowStatus, error) { + var workflowIds []int + for _, entry := range latestCiWorkflowStatusEntries { + workflowIds = append(workflowIds, entry.CiWorkflowId) } - return ciWorkflowStatuses, err + workflows, err := impl.ciWorkflowRepository.FindWorkflowsByCiWorkflowIds(workflowIds) + if err != nil { + impl.Logger.Errorw("error in fetching ci workflows by ci workflow ids", "workflowIds", workflowIds, "err", err) + return nil, err + } + + var statuses []*pipelineConfig.CiWorkflowStatus + for _, workflow := range workflows { + status := adapter.GetCiWorkflowStatusFromCiWorkflow(workflow) + statuses = append(statuses, status) + } + + return statuses, nil +} + +// fetchCiStatusUsingFallbackMethod fetches CI status directly from ci_workflow table +func (impl *CiHandlerImpl) fetchCiStatusUsingFallbackMethod(pipelineIds []int) ([]*pipelineConfig.CiWorkflowStatus, error) { + workflows, err := impl.ciWorkflowRepository.FindLastTriggeredWorkflowByCiIds(pipelineIds) + if err != nil { + impl.Logger.Errorw("error in fetching ci workflows by ci ids", "pipelineIds", pipelineIds, "err", err) + return nil, err + } + + var statuses []*pipelineConfig.CiWorkflowStatus + for _, workflow := range workflows { + status := adapter.GetCiWorkflowStatusFromCiWorkflow(workflow) + statuses = append(statuses, status) + } + + return statuses, nil +} + +func (impl *CiHandlerImpl) fetchWorkflowsFromLatestTable(latestStatusEntries []*pipelineConfig.CiWorkflowStatusLatest) ([]*pipelineConfig.CiWorkflow, error) { + var workflowIds []int + for _, entry := range latestStatusEntries { + workflowIds = append(workflowIds, entry.CiWorkflowId) + } + + return impl.ciWorkflowRepository.FindWorkflowsByCiWorkflowIds(workflowIds) +} + +// fetchLastTriggeredWorkflowsHybrid implements hybrid approach for workflow fetching +// Uses latest status table for available pipelines, fallback to complex query for missing pipelines +func (impl *CiHandlerImpl) fetchLastTriggeredWorkflowsHybrid(pipelineIds []int) ([]*pipelineConfig.CiWorkflow, error) { + if len(pipelineIds) == 0 { + return []*pipelineConfig.CiWorkflow{}, nil + } + + latestStatusEntries, err := impl.workflowStatusLatestService.GetCiWorkflowStatusLatestByPipelineIds(pipelineIds) + if err != nil { + impl.Logger.Errorw("error in checking latest status table, falling back to complex query", "pipelineIds", pipelineIds, "err", err) + return impl.ciWorkflowRepository.FindLastTriggeredWorkflowByCiIds(pipelineIds) + } + + var allWorkflows []*pipelineConfig.CiWorkflow + + if len(latestStatusEntries) > 0 { + workflowsFromLatestTable, err := impl.fetchWorkflowsFromLatestTable(latestStatusEntries) + if err != nil { + impl.Logger.Errorw("error in fetching from latest status table", "latestStatusEntries", latestStatusEntries, "err", err) + return nil, err + } else { + allWorkflows = append(allWorkflows, workflowsFromLatestTable...) + } + } + + pipelinesNotInLatestTable := impl.getPipelineIdsNotInLatestTable(pipelineIds, latestStatusEntries) + + if len(pipelinesNotInLatestTable) > 0 { + workflowsFromOldQuery, err := impl.ciWorkflowRepository.FindLastTriggeredWorkflowByCiIds(pipelinesNotInLatestTable) + if err != nil { + impl.Logger.Errorw("error in fetching using old query by pipeline ids", "pipelineIds", pipelinesNotInLatestTable, "err", err) + return nil, err + } else { + allWorkflows = append(allWorkflows, workflowsFromOldQuery...) + } + } + + return allWorkflows, nil +} + +// getPipelineIdsNotInLatestTable finds pipeline IDs that are NOT in the latest status table +func (impl *CiHandlerImpl) getPipelineIdsNotInLatestTable(allPipelineIds []int, latestStatusEntries []*pipelineConfig.CiWorkflowStatusLatest) []int { + var pipelinesInLatestTable []int + for _, entry := range latestStatusEntries { + pipelinesInLatestTable = append(pipelinesInLatestTable, entry.PipelineId) + } + pipelineIdMap := make(map[int]bool) + for _, id := range pipelinesInLatestTable { + pipelineIdMap[id] = true + } + + var missingPipelineIds []int + for _, id := range allPipelineIds { + if !pipelineIdMap[id] { + missingPipelineIds = append(missingPipelineIds, id) + } + } + return missingPipelineIds } func (impl *CiHandlerImpl) FetchCiStatusForTriggerView(appId int) ([]*pipelineConfig.CiWorkflowStatus, error) { @@ -861,9 +1009,9 @@ func (impl *CiHandlerImpl) FetchCiStatusForTriggerViewForEnvironment(request res if len(ciPipelineIds) == 0 { return ciWorkflowStatuses, nil } - latestCiWorkflows, err := impl.ciWorkflowRepository.FindLastTriggeredWorkflowByCiIds(ciPipelineIds) + latestCiWorkflows, err := impl.fetchLastTriggeredWorkflowsHybrid(ciPipelineIds) if err != nil && !util.IsErrNoRows(err) { - impl.Logger.Errorw("err", "ciPipelineIds", ciPipelineIds, "err", err) + impl.Logger.Errorw("err in hybrid ci workflow fetch", "ciPipelineIds", ciPipelineIds, "err", err) return ciWorkflowStatuses, err } diff --git a/pkg/pipeline/CiService.go b/pkg/pipeline/CiService.go index 55fb2e1c20..e610af74e0 100644 --- a/pkg/pipeline/CiService.go +++ b/pkg/pipeline/CiService.go @@ -26,6 +26,7 @@ import ( "github.com/devtron-labs/devtron/pkg/pipeline/types" "github.com/devtron-labs/devtron/pkg/pipeline/workflowStatus" "github.com/devtron-labs/devtron/pkg/sql" + "github.com/devtron-labs/devtron/pkg/workflow/workflowStatusLatest" util3 "github.com/devtron-labs/devtron/util" util2 "github.com/devtron-labs/devtron/util/event" "go.uber.org/zap" @@ -39,28 +40,34 @@ type CiService interface { } type CiServiceImpl struct { - Logger *zap.SugaredLogger - workflowStageStatusService workflowStatus.WorkFlowStageStatusService - eventClient client.EventClient - eventFactory client.EventFactory - config *types.CiConfig - ciWorkflowRepository pipelineConfig.CiWorkflowRepository - transactionManager sql.TransactionWrapper + Logger *zap.SugaredLogger + workflowStageStatusService workflowStatus.WorkFlowStageStatusService + eventClient client.EventClient + eventFactory client.EventFactory + config *types.CiConfig + ciWorkflowRepository pipelineConfig.CiWorkflowRepository + ciPipelineRepository pipelineConfig.CiPipelineRepository + transactionManager sql.TransactionWrapper + workflowStatusLatestService workflowStatusLatest.WorkflowStatusLatestService } func NewCiServiceImpl(Logger *zap.SugaredLogger, workflowStageStatusService workflowStatus.WorkFlowStageStatusService, eventClient client.EventClient, eventFactory client.EventFactory, ciWorkflowRepository pipelineConfig.CiWorkflowRepository, + ciPipelineRepository pipelineConfig.CiPipelineRepository, transactionManager sql.TransactionWrapper, + workflowStatusLatestService workflowStatusLatest.WorkflowStatusLatestService, ) *CiServiceImpl { cis := &CiServiceImpl{ - Logger: Logger, - workflowStageStatusService: workflowStageStatusService, - eventClient: eventClient, - eventFactory: eventFactory, - ciWorkflowRepository: ciWorkflowRepository, - transactionManager: transactionManager, + Logger: Logger, + workflowStageStatusService: workflowStageStatusService, + eventClient: eventClient, + eventFactory: eventFactory, + ciWorkflowRepository: ciWorkflowRepository, + ciPipelineRepository: ciPipelineRepository, + transactionManager: transactionManager, + workflowStatusLatestService: workflowStatusLatestService, } config, err := types.GetCiConfig() if err != nil { @@ -131,11 +138,26 @@ func (impl *CiServiceImpl) SaveCiWorkflowWithStage(wf *pipelineConfig.CiWorkflow return err } + // Get appId from CI pipeline (not from workflow to avoid transaction issues) + ciPipeline, err := impl.ciPipelineRepository.FindById(wf.CiPipelineId) + if err != nil { + impl.Logger.Errorw("error in fetching ci pipeline for appId", "err", err, "ciPipelineId", wf.CiPipelineId) + return err + } + appId := ciPipeline.AppId + + err = impl.workflowStatusLatestService.SaveCiWorkflowStatusLatest(tx, wf.CiPipelineId, appId, wf.Id, wf.TriggeredBy) + if err != nil { + impl.Logger.Errorw("error in saving ci workflow status latest", "err", err, "pipelineId", wf.CiPipelineId, "workflowId", wf.Id) + return err + } + err = impl.transactionManager.CommitTx(tx) if err != nil { impl.Logger.Errorw("error in committing transaction", "workflowName", wf.Name, "error", err) return err } + return nil } @@ -172,6 +194,7 @@ func (impl *CiServiceImpl) UpdateCiWorkflowWithStage(wf *pipelineConfig.CiWorkfl impl.Logger.Errorw("error in committing transaction", "workflowName", wf.Name, "error", err) return err } + return nil } diff --git a/pkg/pipeline/adapter/adapter.go b/pkg/pipeline/adapter/adapter.go index 3ac8b35f7a..c4a8ddd5e4 100644 --- a/pkg/pipeline/adapter/adapter.go +++ b/pkg/pipeline/adapter/adapter.go @@ -406,3 +406,13 @@ func NewMigrateExternalAppValidationRequest(pipeline *bean.CDPipelineConfigObjec } return request } + +func GetCiWorkflowStatusFromCiWorkflow(ciWorkflow *pipelineConfig.CiWorkflow) *pipelineConfig.CiWorkflowStatus { + return &pipelineConfig.CiWorkflowStatus{ + CiPipelineId: ciWorkflow.CiPipelineId, + CiPipelineName: ciWorkflow.CiPipeline.Name, + CiStatus: ciWorkflow.Status, + StorageConfigured: ciWorkflow.BlobStorageEnabled, + CiWorkflowId: ciWorkflow.Id, + } +} diff --git a/pkg/workflow/cd/CdWorkflowRunnerService.go b/pkg/workflow/cd/CdWorkflowRunnerService.go index b27fd7a8cc..4546dd6053 100644 --- a/pkg/workflow/cd/CdWorkflowRunnerService.go +++ b/pkg/workflow/cd/CdWorkflowRunnerService.go @@ -28,6 +28,7 @@ import ( "github.com/devtron-labs/devtron/pkg/sql" "github.com/devtron-labs/devtron/pkg/workflow/cd/adapter" "github.com/devtron-labs/devtron/pkg/workflow/cd/bean" + "github.com/devtron-labs/devtron/pkg/workflow/workflowStatusLatest" "github.com/devtron-labs/devtron/util" "github.com/go-pg/pg" "go.uber.org/zap" @@ -37,28 +38,31 @@ type CdWorkflowRunnerService interface { UpdateWfr(dto *bean.CdWorkflowRunnerDto, updatedBy int) error SaveWfr(tx *pg.Tx, wfr *pipelineConfig.CdWorkflowRunner) error UpdateIsArtifactUploaded(wfrId int, isArtifactUploaded bool) error - SaveCDWorkflowRunnerWithStage(wfr *pipelineConfig.CdWorkflowRunner) (*pipelineConfig.CdWorkflowRunner, error) + SaveCDWorkflowRunnerWithStage(wfr *pipelineConfig.CdWorkflowRunner, cdWf *pipelineConfig.CdWorkflow, pipeline *pipelineConfig.Pipeline) (*pipelineConfig.CdWorkflowRunner, error) UpdateCdWorkflowRunnerWithStage(wfr *pipelineConfig.CdWorkflowRunner) error GetPrePostWorkflowStagesByWorkflowRunnerIdsList(wfIdWfTypeMap map[int]bean4.CdWorkflowWithArtifact) (map[int]map[string][]*bean3.WorkflowStageDto, error) } type CdWorkflowRunnerServiceImpl struct { - logger *zap.SugaredLogger - cdWorkflowRepository pipelineConfig.CdWorkflowRepository - workflowStageService workflowStatus.WorkFlowStageStatusService - transactionManager sql.TransactionWrapper - config *types.CiConfig + logger *zap.SugaredLogger + cdWorkflowRepository pipelineConfig.CdWorkflowRepository + workflowStageService workflowStatus.WorkFlowStageStatusService + transactionManager sql.TransactionWrapper + config *types.CiConfig + workflowStatusLatestService workflowStatusLatest.WorkflowStatusLatestService } func NewCdWorkflowRunnerServiceImpl(logger *zap.SugaredLogger, cdWorkflowRepository pipelineConfig.CdWorkflowRepository, workflowStageService workflowStatus.WorkFlowStageStatusService, - transactionManager sql.TransactionWrapper) *CdWorkflowRunnerServiceImpl { + transactionManager sql.TransactionWrapper, + workflowStatusLatestService workflowStatusLatest.WorkflowStatusLatestService) *CdWorkflowRunnerServiceImpl { impl := &CdWorkflowRunnerServiceImpl{ - logger: logger, - cdWorkflowRepository: cdWorkflowRepository, - workflowStageService: workflowStageService, - transactionManager: transactionManager, + logger: logger, + cdWorkflowRepository: cdWorkflowRepository, + workflowStageService: workflowStageService, + transactionManager: transactionManager, + workflowStatusLatestService: workflowStatusLatestService, } ciConfig, err := types.GetCiConfig() if err != nil { @@ -88,7 +92,7 @@ func (impl *CdWorkflowRunnerServiceImpl) UpdateIsArtifactUploaded(wfrId int, isA return nil } -func (impl *CdWorkflowRunnerServiceImpl) SaveCDWorkflowRunnerWithStage(wfr *pipelineConfig.CdWorkflowRunner) (*pipelineConfig.CdWorkflowRunner, error) { +func (impl *CdWorkflowRunnerServiceImpl) SaveCDWorkflowRunnerWithStage(wfr *pipelineConfig.CdWorkflowRunner, cdWf *pipelineConfig.CdWorkflow, pipeline *pipelineConfig.Pipeline) (*pipelineConfig.CdWorkflowRunner, error) { // implementation tx, err := impl.transactionManager.StartTx() if err != nil { @@ -117,6 +121,12 @@ func (impl *CdWorkflowRunnerServiceImpl) SaveCDWorkflowRunnerWithStage(wfr *pipe return wfr, err } + err = impl.workflowStatusLatestService.SaveCdWorkflowStatusLatest(tx, cdWf.PipelineId, pipeline.AppId, pipeline.EnvironmentId, wfr.Id, wfr.WorkflowType.String(), wfr.CreatedBy) + if err != nil { + impl.logger.Errorw("error in updating workflow status latest, ManualCdTrigger", "runnerId", wfr.CreatedBy, "err", err) + return wfr, err + } + err = impl.transactionManager.CommitTx(tx) if err != nil { impl.logger.Errorw("error in committing transaction", "workflowName", wfr.Name, "error", err) @@ -159,6 +169,7 @@ func (impl *CdWorkflowRunnerServiceImpl) UpdateCdWorkflowRunnerWithStage(wfr *pi impl.logger.Errorw("error in committing transaction", "workflowName", wfr.Name, "error", err) return err } + return nil } diff --git a/pkg/workflow/cd/read/CdWorkflowRunnerReadService.go b/pkg/workflow/cd/read/CdWorkflowRunnerReadService.go index e9b1fefb4c..ab44f45757 100644 --- a/pkg/workflow/cd/read/CdWorkflowRunnerReadService.go +++ b/pkg/workflow/cd/read/CdWorkflowRunnerReadService.go @@ -1,9 +1,12 @@ package read import ( + bean2 "github.com/devtron-labs/devtron/api/bean" "github.com/devtron-labs/devtron/internal/sql/repository/pipelineConfig" + repository2 "github.com/devtron-labs/devtron/pkg/pipeline/repository" "github.com/devtron-labs/devtron/pkg/workflow/cd/adapter" "github.com/devtron-labs/devtron/pkg/workflow/cd/bean" + "github.com/devtron-labs/devtron/pkg/workflow/workflowStatusLatest" "github.com/go-pg/pg" "go.uber.org/zap" ) @@ -11,18 +14,25 @@ import ( type CdWorkflowRunnerReadService interface { FindWorkflowRunnerById(wfrId int) (*bean.CdWorkflowRunnerDto, error) CheckIfWfrLatest(wfrId, pipelineId int) (isLatest bool, err error) + GetWfrStatusForLatestRunners(pipelineIds []int, pipelines []*pipelineConfig.Pipeline) ([]*pipelineConfig.CdWorkflowStatus, error) } type CdWorkflowRunnerReadServiceImpl struct { - logger *zap.SugaredLogger - cdWorkflowRepository pipelineConfig.CdWorkflowRepository + logger *zap.SugaredLogger + cdWorkflowRepository pipelineConfig.CdWorkflowRepository + WorkflowStatusLatestService workflowStatusLatest.WorkflowStatusLatestService + pipelineStageRepository repository2.PipelineStageRepository } func NewCdWorkflowRunnerReadServiceImpl(logger *zap.SugaredLogger, - cdWorkflowRepository pipelineConfig.CdWorkflowRepository) *CdWorkflowRunnerReadServiceImpl { + cdWorkflowRepository pipelineConfig.CdWorkflowRepository, + WorkflowStatusLatestService workflowStatusLatest.WorkflowStatusLatestService, + pipelineStageRepository repository2.PipelineStageRepository) *CdWorkflowRunnerReadServiceImpl { return &CdWorkflowRunnerReadServiceImpl{ - logger: logger, - cdWorkflowRepository: cdWorkflowRepository, + logger: logger, + cdWorkflowRepository: cdWorkflowRepository, + WorkflowStatusLatestService: WorkflowStatusLatestService, + pipelineStageRepository: pipelineStageRepository, } } @@ -44,3 +54,95 @@ func (impl *CdWorkflowRunnerReadServiceImpl) CheckIfWfrLatest(wfrId, pipelineId } return isLatest, nil } + +func (impl *CdWorkflowRunnerReadServiceImpl) GetWfrStatusForLatestRunners(pipelineIds []int, pipelines []*pipelineConfig.Pipeline) ([]*pipelineConfig.CdWorkflowStatus, error) { + // fetching the latest pipeline from the index table - cdWorkflowLatest + var result []*pipelineConfig.CdWorkflowStatus + cdWorkflowLatest, err := impl.WorkflowStatusLatestService.GetCdWorkflowLatestByPipelineIds(pipelineIds) + if err != nil { + impl.logger.Errorw("error in getting latest by pipelineId", "pipelineId", pipelineIds, "err", err) + return nil, err + } + + pipelineIdToCiPipelineIdMap := make(map[int]int) + for _, item := range pipelines { + pipelineIdToCiPipelineIdMap[item.Id] = item.CiPipelineId + } + + for _, item := range cdWorkflowLatest { + result = append(result, &pipelineConfig.CdWorkflowStatus{ + CiPipelineId: pipelineIdToCiPipelineIdMap[item.PipelineId], + PipelineId: item.PipelineId, + WorkflowType: item.WorkflowType, + WfrId: item.WorkflowRunnerId, + }) + } + + cdWorfklowLatestMap := make(map[int][]bean2.WorkflowType) + for _, item := range cdWorkflowLatest { + if _, ok := cdWorfklowLatestMap[item.PipelineId]; !ok { + cdWorfklowLatestMap[item.PipelineId] = make([]bean2.WorkflowType, 0) + } + cdWorfklowLatestMap[item.PipelineId] = append(cdWorfklowLatestMap[item.PipelineId], bean2.WorkflowType(item.WorkflowType)) + } + + pipelineStage, err := impl.pipelineStageRepository.GetAllCdStagesByCdPipelineIds(pipelineIds) + if err != nil { + impl.logger.Errorw("error in fetching pipeline stages", "pipelineId", pipelineIds, "err", err) + return nil, err + } + pipelineStageMap := make(map[int][]bean2.WorkflowType) + for _, item := range pipelineStage { + if _, ok := pipelineStageMap[item.CdPipelineId]; !ok { + pipelineStageMap[item.CdPipelineId] = make([]bean2.WorkflowType, 0) + } + if item.Type == repository2.PIPELINE_STAGE_TYPE_PRE_CD { + pipelineStageMap[item.CdPipelineId] = append(pipelineStageMap[item.CdPipelineId], bean2.CD_WORKFLOW_TYPE_PRE) + } else if item.Type == repository2.PIPELINE_STAGE_TYPE_POST_CD { + pipelineStageMap[item.CdPipelineId] = append(pipelineStageMap[item.CdPipelineId], bean2.CD_WORKFLOW_TYPE_POST) + } + } + + // calculating all the pipelines not present in the index table cdWorkflowLatest + absentPipelineIds := make([]int, 0) + for _, item := range pipelines { + var isPreCDConfigured, isPostCDConfigured bool + if configuredStages, ok := pipelineStageMap[item.Id]; ok { + for _, stage := range configuredStages { + if stage == bean2.CD_WORKFLOW_TYPE_PRE { + isPreCDConfigured = true + } else if stage == bean2.CD_WORKFLOW_TYPE_POST { + isPostCDConfigured = true + } + } + } + + if _, ok := cdWorfklowLatestMap[item.Id]; !ok { + absentPipelineIds = append(absentPipelineIds, item.Id) + } else { + isPreCDStageAbsent, isPostCdStageAbsent, isDeployStageAbsent := true, true, true + for _, stage := range cdWorfklowLatestMap[item.Id] { + switch stage { + case bean2.CD_WORKFLOW_TYPE_PRE: + isPreCDStageAbsent = false + case bean2.CD_WORKFLOW_TYPE_POST: + isPostCdStageAbsent = false + case bean2.CD_WORKFLOW_TYPE_DEPLOY: + isDeployStageAbsent = false + } + } + if isDeployStageAbsent || (isPreCDConfigured && isPreCDStageAbsent) || (isPostCDConfigured && isPostCdStageAbsent) { + absentPipelineIds = append(absentPipelineIds, item.Id) + } + } + } + if len(absentPipelineIds) > 0 { + remainingRunners, err := impl.cdWorkflowRepository.FetchAllCdStagesLatestEntity(absentPipelineIds) + if err != nil { + impl.logger.Errorw("error in fetching all cd stages latest entity", "pipelineIds", absentPipelineIds, "err", err) + return nil, err + } + result = append(result, remainingRunners...) + } + return result, nil +} diff --git a/pkg/workflow/dag/WorkflowDagExecutor.go b/pkg/workflow/dag/WorkflowDagExecutor.go index d705aafb0b..a6e09f5295 100644 --- a/pkg/workflow/dag/WorkflowDagExecutor.go +++ b/pkg/workflow/dag/WorkflowDagExecutor.go @@ -21,6 +21,11 @@ import ( "context" "encoding/json" "fmt" + "net/http" + "strings" + "sync" + "time" + "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" "github.com/devtron-labs/common-lib/async" "github.com/devtron-labs/common-lib/utils" @@ -71,10 +76,6 @@ import ( util2 "github.com/devtron-labs/devtron/util/event" errors2 "k8s.io/apimachinery/pkg/api/errors" "k8s.io/client-go/rest" - "net/http" - "strings" - "sync" - "time" "github.com/devtron-labs/devtron/api/bean" "github.com/devtron-labs/devtron/internal/sql/models" @@ -158,7 +159,7 @@ type WorkflowDagExecutorImpl struct { workflowService executor.WorkflowService ciHandlerService trigger.HandlerService workflowTriggerAuditService auditService.WorkflowTriggerAuditService - fluxApplicationService fluxApplication.FluxApplicationService + fluxApplicationService fluxApplication.FluxApplicationService } func NewWorkflowDagExecutorImpl(Logger *zap.SugaredLogger, pipelineRepository pipelineConfig.PipelineRepository, @@ -230,8 +231,7 @@ func NewWorkflowDagExecutorImpl(Logger *zap.SugaredLogger, pipelineRepository pi workflowService: workflowService, ciHandlerService: ciHandlerService, workflowTriggerAuditService: workflowTriggerAuditService, - fluxApplicationService: fluxApplicationService, - } + fluxApplicationService: fluxApplicationService} config, err := types.GetCdConfig() if err != nil { return nil @@ -939,7 +939,6 @@ func (impl *WorkflowDagExecutorImpl) UpdateCiWorkflowForCiSuccess(request *bean2 impl.logger.Errorw("update wf failed for id ", "err", err) return err } - return nil } diff --git a/pkg/workflow/wire_workflow.go b/pkg/workflow/wire_workflow.go index 3b48d0ae5e..54ca23bdad 100644 --- a/pkg/workflow/wire_workflow.go +++ b/pkg/workflow/wire_workflow.go @@ -22,12 +22,14 @@ import ( "github.com/devtron-labs/devtron/pkg/workflow/trigger/audit/hook" "github.com/devtron-labs/devtron/pkg/workflow/trigger/audit/repository" "github.com/devtron-labs/devtron/pkg/workflow/trigger/audit/service" + "github.com/devtron-labs/devtron/pkg/workflow/workflowStatusLatest" "github.com/google/wire" ) var WorkflowWireSet = wire.NewSet( cd.CdWorkflowWireSet, status.WorkflowStatusWireSet, + workflowStatusLatest.WorkflowStatusLatestWireSet, hook.NewTriggerAuditHookImpl, wire.Bind(new(hook.TriggerAuditHook), new(*hook.TriggerAuditHookImpl)), service.NewWorkflowTriggerAuditServiceImpl, diff --git a/pkg/workflow/workflowStatusLatest/WorkflowStatusLatestService.go b/pkg/workflow/workflowStatusLatest/WorkflowStatusLatestService.go new file mode 100644 index 0000000000..e2c318e5b4 --- /dev/null +++ b/pkg/workflow/workflowStatusLatest/WorkflowStatusLatestService.go @@ -0,0 +1,180 @@ +/* + * Copyright (c) 2024. Devtron Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package workflowStatusLatest + +import ( + "fmt" + util2 "github.com/devtron-labs/devtron/internal/util" + "time" + + "github.com/devtron-labs/devtron/internal/sql/repository/pipelineConfig" + "github.com/go-pg/pg" + "go.uber.org/zap" +) + +type WorkflowStatusLatestService interface { + // CI Workflow Status Latest methods + SaveCiWorkflowStatusLatest(tx *pg.Tx, pipelineId, appId, ciWorkflowId int, userId int32) error + GetCiWorkflowStatusLatestByPipelineIds(pipelineIds []int) ([]*pipelineConfig.CiWorkflowStatusLatest, error) + + // CD Workflow Status Latest methods + SaveCdWorkflowStatusLatest(tx *pg.Tx, pipelineId, appId, environmentId, workflowRunnerId int, workflowType string, userId int32) error + GetCdWorkflowLatestByPipelineIds(pipelineIds []int) ([]*CdWorkflowStatusLatest, error) +} + +type WorkflowStatusLatestServiceImpl struct { + logger *zap.SugaredLogger + workflowStatusLatestRepository pipelineConfig.WorkflowStatusLatestRepository + ciWorkflowRepository pipelineConfig.CiWorkflowRepository + cdWorkflowRepository pipelineConfig.CdWorkflowRepository + ciPipelineRepository pipelineConfig.CiPipelineRepository +} + +func NewWorkflowStatusLatestServiceImpl( + logger *zap.SugaredLogger, + workflowStatusLatestRepository pipelineConfig.WorkflowStatusLatestRepository, + ciWorkflowRepository pipelineConfig.CiWorkflowRepository, + cdWorkflowRepository pipelineConfig.CdWorkflowRepository, + ciPipelineRepository pipelineConfig.CiPipelineRepository, +) *WorkflowStatusLatestServiceImpl { + return &WorkflowStatusLatestServiceImpl{ + logger: logger, + workflowStatusLatestRepository: workflowStatusLatestRepository, + ciWorkflowRepository: ciWorkflowRepository, + cdWorkflowRepository: cdWorkflowRepository, + ciPipelineRepository: ciPipelineRepository, + } +} + +type CiWorkflowStatusLatest struct { + PipelineId int `json:"pipelineId"` + AppId int `json:"appId"` + CiWorkflowId int `json:"ciWorkflowId"` + Status string `json:"status"` // Derived from ci_workflow table + StorageConfigured bool `json:"storageConfigured"` +} + +type CdWorkflowStatusLatest struct { + PipelineId int `json:"pipelineId"` + AppId int `json:"appId"` + EnvironmentId int `json:"environmentId"` + WorkflowType string `json:"workflowType"` + WorkflowRunnerId int `json:"workflowRunnerId"` + Status string `json:"status"` // Derived from cd_workflow_runner table +} + +func (impl *WorkflowStatusLatestServiceImpl) SaveCiWorkflowStatusLatest(tx *pg.Tx, pipelineId, appId, ciWorkflowId int, userId int32) error { + // Validate required parameters + if ciWorkflowId <= 0 { + impl.logger.Errorw("invalid ciWorkflowId provided", "ciWorkflowId", ciWorkflowId) + return fmt.Errorf("invalid ciWorkflowId: %d", ciWorkflowId) + } + + // Check if entry exists + existingEntry, err := impl.workflowStatusLatestRepository.GetCiWorkflowStatusLatestByPipelineId(pipelineId) + if err != nil && !util2.IsErrNoRows(err) { + impl.logger.Errorw("error in getting ci workflow status latest", "err", err, "pipelineId", pipelineId) + return err + } + + now := time.Now() + if util2.IsErrNoRows(err) { + // Create new entry + model := &pipelineConfig.CiWorkflowStatusLatest{ + PipelineId: pipelineId, + AppId: appId, + CiWorkflowId: ciWorkflowId, + } + model.CreatedBy = userId + model.CreatedOn = now + model.UpdatedBy = userId + model.UpdatedOn = now + + return impl.workflowStatusLatestRepository.SaveCiWorkflowStatusLatest(tx, model) + } else { + // Update existing entry with latest workflow ID + existingEntry.CiWorkflowId = ciWorkflowId + existingEntry.UpdatedBy = userId + existingEntry.UpdatedOn = now + + return impl.workflowStatusLatestRepository.UpdateCiWorkflowStatusLatest(tx, existingEntry) + } +} + +func (impl *WorkflowStatusLatestServiceImpl) GetCiWorkflowStatusLatestByPipelineIds(pipelineIds []int) ([]*pipelineConfig.CiWorkflowStatusLatest, error) { + return impl.workflowStatusLatestRepository.GetCiWorkflowStatusLatestByPipelineIds(pipelineIds) +} + +// CD Workflow Status Latest methods implementation +func (impl *WorkflowStatusLatestServiceImpl) SaveCdWorkflowStatusLatest(tx *pg.Tx, pipelineId, appId, environmentId, workflowRunnerId int, workflowType string, userId int32) error { + // Validate required parameters + if workflowRunnerId <= 0 { + impl.logger.Errorw("invalid workflowRunnerId provided", "workflowRunnerId", workflowRunnerId) + return fmt.Errorf("invalid workflowRunnerId: %d", workflowRunnerId) + } + + // Check if entry exists + existingEntry, err := impl.workflowStatusLatestRepository.GetCdWorkflowStatusLatestByPipelineIdAndWorkflowType(tx, pipelineId, workflowType) + if err != nil && err != pg.ErrNoRows { + impl.logger.Errorw("error in getting cd workflow status latest", "err", err, "pipelineId", pipelineId, "workflowType", workflowType) + return err + } + + now := time.Now() + if err == pg.ErrNoRows { + // Create new entry + model := &pipelineConfig.CdWorkflowStatusLatest{ + PipelineId: pipelineId, + AppId: appId, + EnvironmentId: environmentId, + WorkflowType: workflowType, + WorkflowRunnerId: workflowRunnerId, + } + model.CreatedBy = userId + model.CreatedOn = now + model.UpdatedBy = userId + model.UpdatedOn = now + + return impl.workflowStatusLatestRepository.SaveCdWorkflowStatusLatest(tx, model) + } else { + // Update existing entry with latest workflow runner ID + existingEntry.WorkflowRunnerId = workflowRunnerId + existingEntry.UpdatedBy = userId + existingEntry.UpdatedOn = now + + return impl.workflowStatusLatestRepository.UpdateCdWorkflowStatusLatest(tx, existingEntry) + } +} + +func (impl *WorkflowStatusLatestServiceImpl) GetCdWorkflowLatestByPipelineIds(pipelineIds []int) ([]*CdWorkflowStatusLatest, error) { + cdWorkflowStatusLatest, err := impl.workflowStatusLatestRepository.GetCdWorkflowStatusLatestByPipelineIds(pipelineIds) + if err != nil { + impl.logger.Errorw("error in getting cd workflow status latest by pipeline ids", "pipelineIds", pipelineIds, "err", err) + return nil, err + } + var result []*CdWorkflowStatusLatest + for _, model := range cdWorkflowStatusLatest { + result = append(result, &CdWorkflowStatusLatest{ + PipelineId: model.PipelineId, + AppId: model.AppId, + EnvironmentId: model.EnvironmentId, + WorkflowType: model.WorkflowType, + WorkflowRunnerId: model.WorkflowRunnerId, + }) + } + return result, nil +} diff --git a/pkg/workflow/workflowStatusLatest/wire_workflow_status_latest.go b/pkg/workflow/workflowStatusLatest/wire_workflow_status_latest.go new file mode 100644 index 0000000000..aebe5f18ce --- /dev/null +++ b/pkg/workflow/workflowStatusLatest/wire_workflow_status_latest.go @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2024. Devtron Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package workflowStatusLatest + +import ( + "github.com/devtron-labs/devtron/internal/sql/repository/pipelineConfig" + "github.com/google/wire" +) + +var WorkflowStatusLatestWireSet = wire.NewSet( + pipelineConfig.NewWorkflowStatusLatestRepositoryImpl, + wire.Bind(new(pipelineConfig.WorkflowStatusLatestRepository), new(*pipelineConfig.WorkflowStatusLatestRepositoryImpl)), + NewWorkflowStatusLatestServiceImpl, + wire.Bind(new(WorkflowStatusLatestService), new(*WorkflowStatusLatestServiceImpl)), +) diff --git a/scripts/sql/34203900_workflow_status_latest_tables.down.sql b/scripts/sql/34203900_workflow_status_latest_tables.down.sql new file mode 100644 index 0000000000..26ce9546a2 --- /dev/null +++ b/scripts/sql/34203900_workflow_status_latest_tables.down.sql @@ -0,0 +1,15 @@ +BEGIN; + +-- Drop cd_workflow_status_latest table +DROP TABLE IF EXISTS "public"."cd_workflow_status_latest"; + +-- Drop sequence for cd_workflow_status_latest +DROP SEQUENCE IF EXISTS id_seq_cd_workflow_status_latest; + +-- Drop ci_workflow_status_latest table +DROP TABLE IF EXISTS "public"."ci_workflow_status_latest"; + +-- Drop sequence for ci_workflow_status_latest +DROP SEQUENCE IF EXISTS id_seq_ci_workflow_status_latest; + +COMMIT; diff --git a/scripts/sql/34203900_workflow_status_latest_tables.up.sql b/scripts/sql/34203900_workflow_status_latest_tables.up.sql new file mode 100644 index 0000000000..6c87a7ec1d --- /dev/null +++ b/scripts/sql/34203900_workflow_status_latest_tables.up.sql @@ -0,0 +1,39 @@ +BEGIN; + +-- Create Sequence for ci_workflow_status_latest +CREATE SEQUENCE IF NOT EXISTS id_seq_ci_workflow_status_latest; + +-- Create ci_workflow_status_latest table +CREATE TABLE IF NOT EXISTS "public"."ci_workflow_status_latest" ( + "id" int4 NOT NULL DEFAULT nextval('id_seq_ci_workflow_status_latest'::regclass), + "pipeline_id" int4 NOT NULL, + "app_id" int4 NOT NULL, + "ci_workflow_id" int4 NOT NULL, + "created_on" timestamptz NOT NULL, + "created_by" int4 NOT NULL, + "updated_on" timestamptz NOT NULL, + "updated_by" int4 NOT NULL, + PRIMARY KEY ("id"), + UNIQUE ("pipeline_id") +); + +-- Create Sequence for cd_workflow_status_latest +CREATE SEQUENCE IF NOT EXISTS id_seq_cd_workflow_status_latest; + +-- Create cd_workflow_status_latest table +CREATE TABLE IF NOT EXISTS "public"."cd_workflow_status_latest" ( + "id" int4 NOT NULL DEFAULT nextval('id_seq_cd_workflow_status_latest'::regclass), + "pipeline_id" int4 NOT NULL, + "app_id" int4 NOT NULL, + "environment_id" int4 NOT NULL, + "workflow_type" varchar(20) NOT NULL, -- PRE, DEPLOY, POST + "workflow_runner_id" int4 NOT NULL, + "created_on" timestamptz NOT NULL, + "created_by" int4 NOT NULL, + "updated_on" timestamptz NOT NULL, + "updated_by" int4 NOT NULL, + PRIMARY KEY ("id"), + UNIQUE ("pipeline_id", "workflow_type") +); + +COMMIT; diff --git a/wire_gen.go b/wire_gen.go index 6127386c26..330781f817 100644 --- a/wire_gen.go +++ b/wire_gen.go @@ -185,7 +185,7 @@ import ( "github.com/devtron-labs/devtron/pkg/deployment/gitOps/validation" "github.com/devtron-labs/devtron/pkg/deployment/manifest" "github.com/devtron-labs/devtron/pkg/deployment/manifest/configMapAndSecret" - read19 "github.com/devtron-labs/devtron/pkg/deployment/manifest/configMapAndSecret/read" + read20 "github.com/devtron-labs/devtron/pkg/deployment/manifest/configMapAndSecret/read" "github.com/devtron-labs/devtron/pkg/deployment/manifest/deployedAppMetrics" repository17 "github.com/devtron-labs/devtron/pkg/deployment/manifest/deployedAppMetrics/repository" "github.com/devtron-labs/devtron/pkg/deployment/manifest/deploymentTemplate" @@ -248,7 +248,7 @@ import ( "github.com/devtron-labs/devtron/pkg/plugin" repository21 "github.com/devtron-labs/devtron/pkg/plugin/repository" "github.com/devtron-labs/devtron/pkg/policyGovernance/security/imageScanning" - read18 "github.com/devtron-labs/devtron/pkg/policyGovernance/security/imageScanning/read" + read19 "github.com/devtron-labs/devtron/pkg/policyGovernance/security/imageScanning/read" repository25 "github.com/devtron-labs/devtron/pkg/policyGovernance/security/imageScanning/repository" "github.com/devtron-labs/devtron/pkg/policyGovernance/security/scanTool" repository16 "github.com/devtron-labs/devtron/pkg/policyGovernance/security/scanTool/repository" @@ -270,12 +270,13 @@ import ( repository13 "github.com/devtron-labs/devtron/pkg/variables/repository" "github.com/devtron-labs/devtron/pkg/webhook/helm" "github.com/devtron-labs/devtron/pkg/workflow/cd" - read20 "github.com/devtron-labs/devtron/pkg/workflow/cd/read" + read18 "github.com/devtron-labs/devtron/pkg/workflow/cd/read" "github.com/devtron-labs/devtron/pkg/workflow/dag" status2 "github.com/devtron-labs/devtron/pkg/workflow/status" "github.com/devtron-labs/devtron/pkg/workflow/trigger/audit/hook" repository19 "github.com/devtron-labs/devtron/pkg/workflow/trigger/audit/repository" service3 "github.com/devtron-labs/devtron/pkg/workflow/trigger/audit/service" + "github.com/devtron-labs/devtron/pkg/workflow/workflowStatusLatest" util2 "github.com/devtron-labs/devtron/util" "github.com/devtron-labs/devtron/util/commonEnforcementFunctionsUtil" "github.com/devtron-labs/devtron/util/cron" @@ -623,7 +624,9 @@ func InitializeApp() (*App, error) { appListingServiceImpl := app2.NewAppListingServiceImpl(sugaredLogger, appListingRepositoryImpl, appDetailsReadServiceImpl, appRepositoryImpl, appListingViewBuilderImpl, pipelineRepositoryImpl, linkoutsRepositoryImpl, cdWorkflowRepositoryImpl, pipelineOverrideRepositoryImpl, environmentRepositoryImpl, chartRepositoryImpl, ciPipelineRepositoryImpl, dockerRegistryIpsConfigServiceImpl, userRepositoryImpl, deployedAppMetricsServiceImpl, ciArtifactRepositoryImpl, envConfigOverrideReadServiceImpl, ciPipelineConfigReadServiceImpl) workflowStageRepositoryImpl := repository18.NewWorkflowStageRepositoryImpl(sugaredLogger, db) workFlowStageStatusServiceImpl := workflowStatus.NewWorkflowStageFlowStatusServiceImpl(sugaredLogger, workflowStageRepositoryImpl, ciWorkflowRepositoryImpl, cdWorkflowRepositoryImpl, environmentRepositoryImpl, transactionUtilImpl) - cdWorkflowRunnerServiceImpl := cd.NewCdWorkflowRunnerServiceImpl(sugaredLogger, cdWorkflowRepositoryImpl, workFlowStageStatusServiceImpl, transactionUtilImpl) + workflowStatusLatestRepositoryImpl := pipelineConfig.NewWorkflowStatusLatestRepositoryImpl(db, sugaredLogger) + workflowStatusLatestServiceImpl := workflowStatusLatest.NewWorkflowStatusLatestServiceImpl(sugaredLogger, workflowStatusLatestRepositoryImpl, ciWorkflowRepositoryImpl, cdWorkflowRepositoryImpl, ciPipelineRepositoryImpl) + cdWorkflowRunnerServiceImpl := cd.NewCdWorkflowRunnerServiceImpl(sugaredLogger, cdWorkflowRepositoryImpl, workFlowStageStatusServiceImpl, transactionUtilImpl, workflowStatusLatestServiceImpl) deploymentEventHandlerImpl := app2.NewDeploymentEventHandlerImpl(sugaredLogger, eventRESTClientImpl, eventSimpleFactoryImpl, runnable) appServiceImpl := app2.NewAppService(pipelineOverrideRepositoryImpl, utilMergeUtil, sugaredLogger, pipelineRepositoryImpl, eventRESTClientImpl, eventSimpleFactoryImpl, appRepositoryImpl, configMapRepositoryImpl, chartRepositoryImpl, cdWorkflowRepositoryImpl, commonServiceImpl, chartTemplateServiceImpl, pipelineStatusTimelineRepositoryImpl, pipelineStatusTimelineResourcesServiceImpl, pipelineStatusSyncDetailServiceImpl, pipelineStatusTimelineServiceImpl, appServiceConfig, appStatusServiceImpl, installedAppReadServiceImpl, installedAppVersionHistoryRepositoryImpl, scopedVariableCMCSManagerImpl, acdConfig, gitOpsConfigReadServiceImpl, gitOperationServiceImpl, deploymentTemplateServiceImpl, appListingServiceImpl, deploymentConfigServiceImpl, envConfigOverrideReadServiceImpl, cdWorkflowRunnerServiceImpl, deploymentEventHandlerImpl) scopedVariableManagerImpl, err := variables.NewScopedVariableManagerImpl(sugaredLogger, scopedVariableServiceImpl, variableEntityMappingServiceImpl, variableSnapshotHistoryServiceImpl, variableTemplateParserImpl) @@ -688,7 +691,7 @@ func InitializeApp() (*App, error) { chartServiceImpl := chart.NewChartServiceImpl(chartRepositoryImpl, sugaredLogger, chartTemplateServiceImpl, chartRepoRepositoryImpl, appRepositoryImpl, mergeUtil, envConfigOverrideRepositoryImpl, pipelineConfigRepositoryImpl, environmentRepositoryImpl, deploymentTemplateHistoryServiceImpl, scopedVariableManagerImpl, deployedAppMetricsServiceImpl, chartRefServiceImpl, gitOpsConfigReadServiceImpl, deploymentConfigServiceImpl, envConfigOverrideReadServiceImpl, chartReadServiceImpl) ciCdPipelineOrchestratorImpl := pipeline.NewCiCdPipelineOrchestrator(appRepositoryImpl, sugaredLogger, materialRepositoryImpl, pipelineRepositoryImpl, ciPipelineRepositoryImpl, ciPipelineMaterialRepositoryImpl, cdWorkflowRepositoryImpl, clientImpl, ciCdConfig, appWorkflowRepositoryImpl, environmentRepositoryImpl, attributesServiceImpl, appCrudOperationServiceImpl, userAuthServiceImpl, prePostCdScriptHistoryServiceImpl, pipelineStageServiceImpl, gitMaterialHistoryServiceImpl, ciPipelineHistoryServiceImpl, ciTemplateReadServiceImpl, ciTemplateServiceImpl, dockerArtifactStoreRepositoryImpl, ciArtifactRepositoryImpl, configMapServiceImpl, customTagServiceImpl, genericNoteServiceImpl, chartServiceImpl, transactionUtilImpl, gitOpsConfigReadServiceImpl, deploymentConfigServiceImpl, deploymentConfigReadServiceImpl, chartReadServiceImpl) pluginInputVariableParserImpl := pipeline.NewPluginInputVariableParserImpl(sugaredLogger, dockerRegistryConfigImpl, customTagServiceImpl) - ciServiceImpl := pipeline.NewCiServiceImpl(sugaredLogger, workFlowStageStatusServiceImpl, eventRESTClientImpl, eventSimpleFactoryImpl, ciWorkflowRepositoryImpl, transactionUtilImpl) + ciServiceImpl := pipeline.NewCiServiceImpl(sugaredLogger, workFlowStageStatusServiceImpl, eventRESTClientImpl, eventSimpleFactoryImpl, ciWorkflowRepositoryImpl, ciPipelineRepositoryImpl, transactionUtilImpl, workflowStatusLatestServiceImpl) ciLogServiceImpl, err := pipeline.NewCiLogServiceImpl(sugaredLogger, k8sServiceImpl) if err != nil { return nil, err @@ -748,8 +751,9 @@ func InitializeApp() (*App, error) { deploymentTemplateValidationServiceEntImpl := validator.NewDeploymentTemplateValidationServiceEntImpl() deploymentTemplateValidationServiceImpl := validator.NewDeploymentTemplateValidationServiceImpl(sugaredLogger, chartRefServiceImpl, scopedVariableManagerImpl, deployedAppMetricsServiceImpl, deploymentTemplateValidationServiceEntImpl) devtronAppGitOpConfigServiceImpl := gitOpsConfig.NewDevtronAppGitOpConfigServiceImpl(sugaredLogger, chartRepositoryImpl, chartServiceImpl, gitOpsConfigReadServiceImpl, gitOpsValidationServiceImpl, argoClientWrapperServiceImpl, deploymentConfigServiceImpl, chartReadServiceImpl) - ciHandlerImpl := pipeline.NewCiHandlerImpl(sugaredLogger, ciServiceImpl, ciPipelineMaterialRepositoryImpl, clientImpl, ciWorkflowRepositoryImpl, ciArtifactRepositoryImpl, userServiceImpl, eventRESTClientImpl, eventSimpleFactoryImpl, ciPipelineRepositoryImpl, appListingRepositoryImpl, pipelineRepositoryImpl, enforcerUtilImpl, resourceGroupServiceImpl, environmentRepositoryImpl, imageTaggingServiceImpl, k8sCommonServiceImpl, appWorkflowRepositoryImpl, customTagServiceImpl, workFlowStageStatusServiceImpl) - cdHandlerImpl := pipeline.NewCdHandlerImpl(sugaredLogger, userServiceImpl, cdWorkflowRepositoryImpl, ciArtifactRepositoryImpl, ciPipelineMaterialRepositoryImpl, pipelineRepositoryImpl, environmentRepositoryImpl, ciWorkflowRepositoryImpl, enforcerUtilImpl, resourceGroupServiceImpl, imageTaggingServiceImpl, k8sServiceImpl, customTagServiceImpl, deploymentConfigServiceImpl, workFlowStageStatusServiceImpl, cdWorkflowRunnerServiceImpl) + ciHandlerImpl := pipeline.NewCiHandlerImpl(sugaredLogger, ciServiceImpl, ciPipelineMaterialRepositoryImpl, clientImpl, ciWorkflowRepositoryImpl, ciArtifactRepositoryImpl, userServiceImpl, eventRESTClientImpl, eventSimpleFactoryImpl, ciPipelineRepositoryImpl, appListingRepositoryImpl, pipelineRepositoryImpl, enforcerUtilImpl, resourceGroupServiceImpl, environmentRepositoryImpl, imageTaggingServiceImpl, k8sCommonServiceImpl, appWorkflowRepositoryImpl, customTagServiceImpl, workFlowStageStatusServiceImpl, workflowStatusLatestServiceImpl) + cdWorkflowRunnerReadServiceImpl := read18.NewCdWorkflowRunnerReadServiceImpl(sugaredLogger, cdWorkflowRepositoryImpl, workflowStatusLatestServiceImpl, pipelineStageRepositoryImpl) + cdHandlerImpl := pipeline.NewCdHandlerImpl(sugaredLogger, userServiceImpl, cdWorkflowRepositoryImpl, ciArtifactRepositoryImpl, ciPipelineMaterialRepositoryImpl, pipelineRepositoryImpl, environmentRepositoryImpl, ciWorkflowRepositoryImpl, enforcerUtilImpl, resourceGroupServiceImpl, imageTaggingServiceImpl, k8sServiceImpl, customTagServiceImpl, deploymentConfigServiceImpl, workFlowStageStatusServiceImpl, cdWorkflowRunnerServiceImpl, workflowStatusLatestServiceImpl, pipelineStageRepositoryImpl, cdWorkflowRunnerReadServiceImpl) appWorkflowServiceImpl := appWorkflow2.NewAppWorkflowServiceImpl(sugaredLogger, appWorkflowRepositoryImpl, ciCdPipelineOrchestratorImpl, ciPipelineRepositoryImpl, pipelineRepositoryImpl, enforcerUtilImpl, resourceGroupServiceImpl, appRepositoryImpl, userAuthServiceImpl, chartServiceImpl, deploymentConfigServiceImpl, pipelineBuilderImpl) appCloneServiceImpl := appClone.NewAppCloneServiceImpl(sugaredLogger, pipelineBuilderImpl, attributesServiceImpl, chartServiceImpl, configMapServiceImpl, appWorkflowServiceImpl, appListingServiceImpl, propertiesConfigServiceImpl, pipelineStageServiceImpl, ciTemplateReadServiceImpl, appRepositoryImpl, ciPipelineRepositoryImpl, pipelineRepositoryImpl, ciPipelineConfigServiceImpl, gitOpsConfigReadServiceImpl, chartReadServiceImpl) deploymentTemplateRepositoryImpl := repository2.NewDeploymentTemplateRepositoryImpl(db, sugaredLogger) @@ -763,24 +767,24 @@ func InitializeApp() (*App, error) { imageScanDeployInfoRepositoryImpl := repository25.NewImageScanDeployInfoRepositoryImpl(db, sugaredLogger) imageScanObjectMetaRepositoryImpl := repository25.NewImageScanObjectMetaRepositoryImpl(db, sugaredLogger) imageScanHistoryRepositoryImpl := repository25.NewImageScanHistoryRepositoryImpl(db, sugaredLogger) - imageScanHistoryReadServiceImpl := read18.NewImageScanHistoryReadService(sugaredLogger, imageScanHistoryRepositoryImpl) + imageScanHistoryReadServiceImpl := read19.NewImageScanHistoryReadService(sugaredLogger, imageScanHistoryRepositoryImpl) cveStoreRepositoryImpl := repository25.NewCveStoreRepositoryImpl(db, sugaredLogger) policyServiceImpl := imageScanning.NewPolicyServiceImpl(environmentServiceImpl, sugaredLogger, appRepositoryImpl, pipelineOverrideRepositoryImpl, cvePolicyRepositoryImpl, clusterServiceImplExtended, pipelineRepositoryImpl, imageScanResultRepositoryImpl, imageScanDeployInfoRepositoryImpl, imageScanObjectMetaRepositoryImpl, httpClient, ciArtifactRepositoryImpl, ciCdConfig, imageScanHistoryReadServiceImpl, cveStoreRepositoryImpl, ciTemplateRepositoryImpl, clusterReadServiceImpl, transactionUtilImpl) - imageScanResultReadServiceImpl := read18.NewImageScanResultReadServiceImpl(sugaredLogger, imageScanResultRepositoryImpl) + imageScanResultReadServiceImpl := read19.NewImageScanResultReadServiceImpl(sugaredLogger, imageScanResultRepositoryImpl) draftAwareConfigServiceImpl := draftAwareConfigService.NewDraftAwareResourceServiceImpl(sugaredLogger, configMapServiceImpl, chartServiceImpl, propertiesConfigServiceImpl) gitOpsManifestPushServiceImpl := publish.NewGitOpsManifestPushServiceImpl(sugaredLogger, pipelineStatusTimelineServiceImpl, pipelineOverrideRepositoryImpl, acdConfig, chartRefServiceImpl, gitOpsConfigReadServiceImpl, chartServiceImpl, gitOperationServiceImpl, argoClientWrapperServiceImpl, transactionUtilImpl, deploymentConfigServiceImpl, chartTemplateServiceImpl) manifestCreationServiceImpl := manifest.NewManifestCreationServiceImpl(sugaredLogger, dockerRegistryIpsConfigServiceImpl, chartRefServiceImpl, scopedVariableCMCSManagerImpl, k8sCommonServiceImpl, deployedAppMetricsServiceImpl, imageDigestPolicyServiceImpl, utilMergeUtil, appCrudOperationServiceImpl, deploymentTemplateServiceImpl, argoClientWrapperServiceImpl, configMapHistoryRepositoryImpl, configMapRepositoryImpl, chartRepositoryImpl, envConfigOverrideRepositoryImpl, environmentRepositoryImpl, pipelineRepositoryImpl, ciArtifactRepositoryImpl, pipelineOverrideRepositoryImpl, pipelineStrategyHistoryRepositoryImpl, pipelineConfigRepositoryImpl, deploymentTemplateHistoryRepositoryImpl, deploymentConfigServiceImpl, envConfigOverrideReadServiceImpl) - configMapHistoryReadServiceImpl := read19.NewConfigMapHistoryReadService(sugaredLogger, configMapHistoryRepositoryImpl, scopedVariableCMCSManagerImpl) + configMapHistoryReadServiceImpl := read20.NewConfigMapHistoryReadService(sugaredLogger, configMapHistoryRepositoryImpl, scopedVariableCMCSManagerImpl) deployedConfigurationHistoryServiceImpl := history.NewDeployedConfigurationHistoryServiceImpl(sugaredLogger, userServiceImpl, deploymentTemplateHistoryServiceImpl, pipelineStrategyHistoryServiceImpl, configMapHistoryServiceImpl, cdWorkflowRepositoryImpl, scopedVariableCMCSManagerImpl, deploymentTemplateHistoryReadServiceImpl, configMapHistoryReadServiceImpl) userDeploymentRequestRepositoryImpl := repository26.NewUserDeploymentRequestRepositoryImpl(db, transactionUtilImpl) userDeploymentRequestServiceImpl := service4.NewUserDeploymentRequestServiceImpl(sugaredLogger, userDeploymentRequestRepositoryImpl) - imageScanDeployInfoReadServiceImpl := read18.NewImageScanDeployInfoReadService(sugaredLogger, imageScanDeployInfoRepositoryImpl) + imageScanDeployInfoReadServiceImpl := read19.NewImageScanDeployInfoReadService(sugaredLogger, imageScanDeployInfoRepositoryImpl) imageScanDeployInfoServiceImpl := imageScanning.NewImageScanDeployInfoService(sugaredLogger, imageScanDeployInfoRepositoryImpl) manifestPushConfigRepositoryImpl := repository20.NewManifestPushConfigRepository(sugaredLogger, db) scanToolExecutionHistoryMappingRepositoryImpl := repository25.NewScanToolExecutionHistoryMappingRepositoryImpl(db, sugaredLogger) - cdWorkflowReadServiceImpl := read20.NewCdWorkflowReadServiceImpl(sugaredLogger, cdWorkflowRepositoryImpl) + cdWorkflowReadServiceImpl := read18.NewCdWorkflowReadServiceImpl(sugaredLogger, cdWorkflowRepositoryImpl) imageScanServiceImpl := imageScanning.NewImageScanServiceImpl(sugaredLogger, imageScanHistoryRepositoryImpl, imageScanResultRepositoryImpl, imageScanObjectMetaRepositoryImpl, cveStoreRepositoryImpl, imageScanDeployInfoRepositoryImpl, userServiceImpl, appRepositoryImpl, environmentServiceImpl, ciArtifactRepositoryImpl, policyServiceImpl, pipelineRepositoryImpl, ciPipelineRepositoryImpl, scanToolMetadataRepositoryImpl, scanToolExecutionHistoryMappingRepositoryImpl, cvePolicyRepositoryImpl, cdWorkflowReadServiceImpl) - devtronAppsHandlerServiceImpl, err := devtronApps.NewHandlerServiceImpl(sugaredLogger, cdWorkflowCommonServiceImpl, gitOpsManifestPushServiceImpl, gitOpsConfigReadServiceImpl, argoK8sClientImpl, acdConfig, argoClientWrapperServiceImpl, pipelineStatusTimelineServiceImpl, chartTemplateServiceImpl, workflowEventPublishServiceImpl, manifestCreationServiceImpl, deployedConfigurationHistoryServiceImpl, pipelineStageServiceImpl, globalPluginServiceImpl, customTagServiceImpl, pluginInputVariableParserImpl, prePostCdScriptHistoryServiceImpl, scopedVariableCMCSManagerImpl, imageDigestPolicyServiceImpl, userServiceImpl, helmAppServiceImpl, enforcerUtilImpl, userDeploymentRequestServiceImpl, helmAppClientImpl, eventSimpleFactoryImpl, eventRESTClientImpl, environmentVariables, appRepositoryImpl, ciPipelineMaterialRepositoryImpl, imageScanHistoryReadServiceImpl, imageScanDeployInfoReadServiceImpl, imageScanDeployInfoServiceImpl, pipelineRepositoryImpl, pipelineOverrideRepositoryImpl, manifestPushConfigRepositoryImpl, chartRepositoryImpl, environmentRepositoryImpl, cdWorkflowRepositoryImpl, ciWorkflowRepositoryImpl, ciArtifactRepositoryImpl, ciTemplateReadServiceImpl, gitMaterialReadServiceImpl, appLabelRepositoryImpl, ciPipelineRepositoryImpl, appWorkflowRepositoryImpl, dockerArtifactStoreRepositoryImpl, imageScanServiceImpl, k8sServiceImpl, transactionUtilImpl, deploymentConfigServiceImpl, ciCdPipelineOrchestratorImpl, gitOperationServiceImpl, attributesServiceImpl, clusterRepositoryImpl, cdWorkflowRunnerServiceImpl, clusterServiceImplExtended, ciLogServiceImpl, workflowServiceImpl, blobStorageConfigServiceImpl, deploymentEventHandlerImpl, runnable, workflowTriggerAuditServiceImpl, deploymentServiceImpl) + devtronAppsHandlerServiceImpl, err := devtronApps.NewHandlerServiceImpl(sugaredLogger, cdWorkflowCommonServiceImpl, gitOpsManifestPushServiceImpl, gitOpsConfigReadServiceImpl, argoK8sClientImpl, acdConfig, argoClientWrapperServiceImpl, pipelineStatusTimelineServiceImpl, chartTemplateServiceImpl, workflowEventPublishServiceImpl, manifestCreationServiceImpl, deployedConfigurationHistoryServiceImpl, pipelineStageServiceImpl, globalPluginServiceImpl, customTagServiceImpl, pluginInputVariableParserImpl, prePostCdScriptHistoryServiceImpl, scopedVariableCMCSManagerImpl, imageDigestPolicyServiceImpl, userServiceImpl, helmAppServiceImpl, enforcerUtilImpl, userDeploymentRequestServiceImpl, helmAppClientImpl, eventSimpleFactoryImpl, eventRESTClientImpl, environmentVariables, appRepositoryImpl, ciPipelineMaterialRepositoryImpl, imageScanHistoryReadServiceImpl, imageScanDeployInfoReadServiceImpl, imageScanDeployInfoServiceImpl, pipelineRepositoryImpl, pipelineOverrideRepositoryImpl, manifestPushConfigRepositoryImpl, chartRepositoryImpl, environmentRepositoryImpl, cdWorkflowRepositoryImpl, ciWorkflowRepositoryImpl, ciArtifactRepositoryImpl, ciTemplateReadServiceImpl, gitMaterialReadServiceImpl, appLabelRepositoryImpl, ciPipelineRepositoryImpl, appWorkflowRepositoryImpl, dockerArtifactStoreRepositoryImpl, imageScanServiceImpl, k8sServiceImpl, transactionUtilImpl, deploymentConfigServiceImpl, ciCdPipelineOrchestratorImpl, gitOperationServiceImpl, attributesServiceImpl, clusterRepositoryImpl, cdWorkflowRunnerServiceImpl, clusterServiceImplExtended, ciLogServiceImpl, workflowServiceImpl, blobStorageConfigServiceImpl, deploymentEventHandlerImpl, runnable, workflowTriggerAuditServiceImpl, deploymentServiceImpl, workflowStatusLatestServiceImpl) if err != nil { return nil, err } @@ -1099,7 +1103,6 @@ func InitializeApp() (*App, error) { muxRouter := router.NewMuxRouter(sugaredLogger, environmentRouterImpl, clusterRouterImpl, webhookRouterImpl, userAuthRouterImpl, gitProviderRouterImpl, gitHostRouterImpl, dockerRegRouterImpl, notificationRouterImpl, teamRouterImpl, userRouterImpl, chartRefRouterImpl, configMapRouterImpl, appStoreRouterImpl, chartRepositoryRouterImpl, releaseMetricsRouterImpl, deploymentGroupRouterImpl, batchOperationRouterImpl, chartGroupRouterImpl, imageScanRouterImpl, policyRouterImpl, gitOpsConfigRouterImpl, dashboardRouterImpl, attributesRouterImpl, userAttributesRouterImpl, commonRouterImpl, grafanaRouterImpl, ssoLoginRouterImpl, telemetryRouterImpl, telemetryEventClientImplExtended, bulkUpdateRouterImpl, webhookListenerRouterImpl, appRouterImpl, coreAppRouterImpl, helmAppRouterImpl, k8sApplicationRouterImpl, pProfRouterImpl, deploymentConfigRouterImpl, dashboardTelemetryRouterImpl, commonDeploymentRouterImpl, externalLinkRouterImpl, globalPluginRouterImpl, moduleRouterImpl, serverRouterImpl, apiTokenRouterImpl, cdApplicationStatusUpdateHandlerImpl, k8sCapacityRouterImpl, webhookHelmRouterImpl, globalCMCSRouterImpl, userTerminalAccessRouterImpl, jobRouterImpl, ciStatusUpdateCronImpl, resourceGroupingRouterImpl, rbacRoleRouterImpl, scopedVariableRouterImpl, ciTriggerCronImpl, proxyRouterImpl, deploymentConfigurationRouterImpl, infraConfigRouterImpl, argoApplicationRouterImpl, devtronResourceRouterImpl, fluxApplicationRouterImpl, scanningResultRouterImpl, routerImpl) loggingMiddlewareImpl := util4.NewLoggingMiddlewareImpl(userServiceImpl) cdWorkflowServiceImpl := cd.NewCdWorkflowServiceImpl(sugaredLogger, cdWorkflowRepositoryImpl) - cdWorkflowRunnerReadServiceImpl := read20.NewCdWorkflowRunnerReadServiceImpl(sugaredLogger, cdWorkflowRepositoryImpl) webhookServiceImpl := pipeline.NewWebhookServiceImpl(ciArtifactRepositoryImpl, sugaredLogger, ciPipelineRepositoryImpl, ciWorkflowRepositoryImpl, cdWorkflowCommonServiceImpl, workFlowStageStatusServiceImpl, ciServiceImpl) workflowEventProcessorImpl, err := in.NewWorkflowEventProcessorImpl(sugaredLogger, pubSubClientServiceImpl, cdWorkflowServiceImpl, cdWorkflowReadServiceImpl, cdWorkflowRunnerServiceImpl, cdWorkflowRunnerReadServiceImpl, workflowDagExecutorImpl, ciHandlerImpl, cdHandlerImpl, eventSimpleFactoryImpl, eventRESTClientImpl, devtronAppsHandlerServiceImpl, deployedAppServiceImpl, webhookServiceImpl, validate, environmentVariables, cdWorkflowCommonServiceImpl, cdPipelineConfigServiceImpl, userDeploymentRequestServiceImpl, serviceImpl, pipelineRepositoryImpl, ciArtifactRepositoryImpl, cdWorkflowRepositoryImpl, deploymentConfigServiceImpl, handlerServiceImpl, runnable) if err != nil {