Skip to content

Commit ca18672

Browse files
committed
modify cancel-in-progress behavior
1 parent a4b76b6 commit ca18672

File tree

4 files changed

+237
-144
lines changed

4 files changed

+237
-144
lines changed

models/actions/run.go

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -401,39 +401,54 @@ func ShouldBlockRunByConcurrency(ctx context.Context, actionRun *ActionRun) (boo
401401
return false, nil
402402
}
403403

404-
concurrentRuns, err := db.Find[ActionRun](ctx, &FindRunOptions{
405-
RepoID: actionRun.RepoID,
406-
ConcurrencyGroup: actionRun.ConcurrencyGroup,
407-
Status: []Status{StatusRunning},
404+
runs, jobs, err := GetConcurrentRunsAndJobs(ctx, actionRun.RepoID, actionRun.ConcurrencyGroup, []Status{StatusRunning})
405+
if err != nil {
406+
return false, fmt.Errorf("find concurrent runs and jobs: %w", err)
407+
}
408+
409+
return len(runs) > 0 || len(jobs) > 0, nil
410+
}
411+
412+
func GetConcurrentRunsAndJobs(ctx context.Context, repoID int64, concurrencyGroup string, status []Status) ([]*ActionRun, []*ActionRunJob, error) {
413+
runs, err := db.Find[ActionRun](ctx, &FindRunOptions{
414+
RepoID: repoID,
415+
ConcurrencyGroup: concurrencyGroup,
416+
Status: status,
417+
})
418+
if err != nil {
419+
return nil, nil, fmt.Errorf("find runs: %w", err)
420+
}
421+
422+
jobs, err := db.Find[ActionRunJob](ctx, &FindRunJobOptions{
423+
RepoID: repoID,
424+
ConcurrencyGroup: concurrencyGroup,
425+
Statuses: status,
408426
})
409427
if err != nil {
410-
return false, fmt.Errorf("find running and waiting runs: %w", err)
428+
return nil, nil, fmt.Errorf("find jobs: %w", err)
411429
}
412-
previousRuns := slices.DeleteFunc(concurrentRuns, func(r *ActionRun) bool { return r.ID == actionRun.ID })
413430

414-
return len(previousRuns) > 0, nil
431+
return runs, jobs, nil
415432
}
416433

417434
func CancelPreviousJobsByRunConcurrency(ctx context.Context, actionRun *ActionRun) ([]*ActionRunJob, error) {
418435
if actionRun.ConcurrencyGroup == "" {
419436
return nil, nil
420437
}
421438

422-
var cancelledJobs []*ActionRunJob
439+
var jobsToCancel []*ActionRunJob
423440

424441
statusFindOption := []Status{StatusWaiting, StatusBlocked}
425442
if actionRun.ConcurrencyCancel {
426443
statusFindOption = append(statusFindOption, StatusRunning)
427444
}
428-
// cancel previous runs in the same concurrency group
429-
runs, err := db.Find[ActionRun](ctx, &FindRunOptions{
430-
RepoID: actionRun.RepoID,
431-
ConcurrencyGroup: actionRun.ConcurrencyGroup,
432-
Status: statusFindOption,
433-
})
445+
runs, jobs, err := GetConcurrentRunsAndJobs(ctx, actionRun.RepoID, actionRun.ConcurrencyGroup, statusFindOption)
434446
if err != nil {
435-
return cancelledJobs, fmt.Errorf("find runs: %w", err)
447+
return nil, fmt.Errorf("find concurrent runs and jobs: %w", err)
436448
}
449+
jobsToCancel = append(jobsToCancel, jobs...)
450+
451+
// cancel runs in the same concurrency group
437452
for _, run := range runs {
438453
if run.ID == actionRun.ID {
439454
continue
@@ -442,14 +457,10 @@ func CancelPreviousJobsByRunConcurrency(ctx context.Context, actionRun *ActionRu
442457
RunID: run.ID,
443458
})
444459
if err != nil {
445-
return cancelledJobs, fmt.Errorf("find run %d jobs: %w", run.ID, err)
460+
return nil, fmt.Errorf("find run %d jobs: %w", run.ID, err)
446461
}
447-
cjs, err := CancelJobs(ctx, jobs)
448-
if err != nil {
449-
return cancelledJobs, fmt.Errorf("cancel run %d jobs: %w", run.ID, err)
450-
}
451-
cancelledJobs = append(cancelledJobs, cjs...)
462+
jobsToCancel = append(jobsToCancel, jobs...)
452463
}
453464

454-
return cancelledJobs, nil
465+
return CancelJobs(ctx, jobsToCancel)
455466
}

models/actions/run_job.go

Lines changed: 27 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -219,61 +219,54 @@ func ShouldBlockJobByConcurrency(ctx context.Context, job *ActionRunJob) (bool,
219219
return false, nil
220220
}
221221

222-
concurrentJobsNum, err := db.Count[ActionRunJob](ctx, FindRunJobOptions{
223-
RepoID: job.RepoID,
224-
ConcurrencyGroup: job.ConcurrencyGroup,
225-
Statuses: []Status{StatusRunning},
226-
})
222+
runs, jobs, err := GetConcurrentRunsAndJobs(ctx, job.RepoID, job.ConcurrencyGroup, []Status{StatusRunning})
227223
if err != nil {
228-
return false, fmt.Errorf("count running and waiting jobs: %w", err)
229-
}
230-
if concurrentJobsNum > 0 {
231-
return true, nil
232-
}
233-
234-
if err := job.LoadRun(ctx); err != nil {
235-
return false, fmt.Errorf("load run: %w", err)
224+
return false, fmt.Errorf("find concurrent runs and jobs: %w", err)
236225
}
237226

238-
return ShouldBlockRunByConcurrency(ctx, job.Run)
227+
return len(runs) > 0 || len(jobs) > 0, nil
239228
}
240229

241230
func CancelPreviousJobsByJobConcurrency(ctx context.Context, job *ActionRunJob) ([]*ActionRunJob, error) {
242231
if job.RawConcurrencyGroup == "" {
243232
return nil, nil
244233
}
245234

246-
var cancelledJobs []*ActionRunJob
235+
var jobsToCancel []*ActionRunJob
247236

248237
if !job.IsConcurrencyEvaluated {
249-
return cancelledJobs, ErrUnevaluatedConcurrency{
238+
return nil, ErrUnevaluatedConcurrency{
250239
Group: job.RawConcurrencyGroup,
251240
CancelInProgress: job.RawConcurrencyCancel,
252241
}
253242
}
254-
if job.ConcurrencyGroup != "" {
255-
statusFindOption := []Status{StatusWaiting, StatusBlocked}
256-
if job.ConcurrencyCancel {
257-
statusFindOption = append(statusFindOption, StatusRunning)
258-
}
259-
// cancel previous jobs in the same concurrency group
260-
previousJobs, err := db.Find[ActionRunJob](ctx, &FindRunJobOptions{
261-
RepoID: job.RepoID,
262-
ConcurrencyGroup: job.ConcurrencyGroup,
263-
Statuses: statusFindOption,
243+
if job.ConcurrencyGroup == "" {
244+
return nil, nil
245+
}
246+
247+
statusFindOption := []Status{StatusWaiting, StatusBlocked}
248+
if job.ConcurrencyCancel {
249+
statusFindOption = append(statusFindOption, StatusRunning)
250+
}
251+
runs, jobs, err := GetConcurrentRunsAndJobs(ctx, job.RepoID, job.ConcurrencyGroup, statusFindOption)
252+
if err != nil {
253+
return nil, fmt.Errorf("find concurrent runs and jobs: %w", err)
254+
}
255+
jobs = slices.DeleteFunc(jobs, func(j *ActionRunJob) bool { return j.ID == job.ID })
256+
jobsToCancel = append(jobsToCancel, jobs...)
257+
258+
// cancel runs in the same concurrency group
259+
for _, run := range runs {
260+
jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{
261+
RunID: run.ID,
264262
})
265263
if err != nil {
266-
return cancelledJobs, fmt.Errorf("find previous jobs: %w", err)
267-
}
268-
previousJobs = slices.DeleteFunc(previousJobs, func(j *ActionRunJob) bool { return j.ID == job.ID })
269-
cjs, err := CancelJobs(ctx, previousJobs)
270-
if err != nil {
271-
return cancelledJobs, fmt.Errorf("cancel previous jobs: %w", err)
264+
return nil, fmt.Errorf("find run %d jobs: %w", run.ID, err)
272265
}
273-
cancelledJobs = append(cancelledJobs, cjs...)
266+
jobsToCancel = append(jobsToCancel, jobs...)
274267
}
275268

276-
return cancelledJobs, nil
269+
return CancelJobs(ctx, jobsToCancel)
277270
}
278271

279272
type ErrUnevaluatedConcurrency struct {

services/actions/job_emitter.go

Lines changed: 77 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -62,95 +62,11 @@ func checkJobsByRunID(ctx context.Context, runID int64) error {
6262
jobs = append(jobs, js...)
6363
updatedJobs = append(updatedJobs, ujs...)
6464
}
65-
// check run (workflow-level) concurrency
66-
concurrentRunIDs := make(container.Set[int64])
67-
concurrentRunIDs.Add(run.ID)
68-
if run.ConcurrencyGroup != "" {
69-
concurrentRuns, err := db.Find[actions_model.ActionRun](ctx, actions_model.FindRunOptions{
70-
RepoID: run.RepoID,
71-
ConcurrencyGroup: run.ConcurrencyGroup,
72-
Status: []actions_model.Status{actions_model.StatusBlocked},
73-
})
74-
if err != nil {
75-
return err
76-
}
77-
for _, concurrentRun := range concurrentRuns {
78-
if concurrentRunIDs.Contains(concurrentRun.ID) {
79-
continue
80-
}
81-
concurrentRunIDs.Add(concurrentRun.ID)
82-
if concurrentRun.NeedApproval {
83-
continue
84-
}
85-
if js, ujs, err := checkJobsOfRun(ctx, concurrentRun); err != nil {
86-
return err
87-
} else {
88-
jobs = append(jobs, js...)
89-
updatedJobs = append(updatedJobs, ujs...)
90-
}
91-
updatedRun, err := actions_model.GetRunByID(ctx, concurrentRun.ID)
92-
if err != nil {
93-
return err
94-
}
95-
if updatedRun.Status == actions_model.StatusWaiting {
96-
// only run one blocked action run in the same concurrency group
97-
break
98-
}
99-
}
100-
}
101-
102-
// check job concurrency
103-
runJobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: run.ID})
104-
if err != nil {
65+
if js, ujs, err := checkRunConcurrency(ctx, run); err != nil {
10566
return err
106-
}
107-
for _, job := range runJobs {
108-
if job.Status.IsDone() && job.ConcurrencyGroup != "" {
109-
waitingConcurrentJobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{
110-
RepoID: job.RepoID,
111-
ConcurrencyGroup: job.ConcurrencyGroup,
112-
Statuses: []actions_model.Status{actions_model.StatusWaiting},
113-
})
114-
if err != nil {
115-
return err
116-
}
117-
if len(waitingConcurrentJobs) == 0 {
118-
blockedConcurrentJobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{
119-
RepoID: job.RepoID,
120-
ConcurrencyGroup: job.ConcurrencyGroup,
121-
Statuses: []actions_model.Status{actions_model.StatusBlocked},
122-
})
123-
if err != nil {
124-
return err
125-
}
126-
for _, concurrentJob := range blockedConcurrentJobs {
127-
if concurrentRunIDs.Contains(concurrentJob.RunID) {
128-
continue
129-
}
130-
concurrentRunIDs.Add(concurrentJob.RunID)
131-
concurrentRun, err := actions_model.GetRunByID(ctx, concurrentJob.RunID)
132-
if err != nil {
133-
return err
134-
}
135-
if concurrentRun.NeedApproval {
136-
continue
137-
}
138-
if js, ujs, err := checkJobsOfRun(ctx, concurrentRun); err != nil {
139-
return err
140-
} else {
141-
jobs = append(jobs, js...)
142-
updatedJobs = append(updatedJobs, ujs...)
143-
}
144-
updatedJob, err := actions_model.GetRunJobByID(ctx, concurrentJob.ID)
145-
if err != nil {
146-
return err
147-
}
148-
if updatedJob.Status == actions_model.StatusWaiting {
149-
break
150-
}
151-
}
152-
}
153-
}
67+
} else {
68+
jobs = append(jobs, js...)
69+
updatedJobs = append(updatedJobs, ujs...)
15470
}
15571
return nil
15672
}); err != nil {
@@ -176,6 +92,79 @@ func checkJobsByRunID(ctx context.Context, runID int64) error {
17692
return nil
17793
}
17894

95+
func findBlockedRunByConcurrency(ctx context.Context, repoID int64, concurrencyGroup string) (*actions_model.ActionRun, bool, error) {
96+
if concurrencyGroup == "" {
97+
return nil, false, nil
98+
}
99+
cRuns, cJobs, err := actions_model.GetConcurrentRunsAndJobs(ctx, repoID, concurrencyGroup, []actions_model.Status{actions_model.StatusBlocked})
100+
if err != nil {
101+
return nil, false, fmt.Errorf("find concurrent runs and jobs: %w", err)
102+
}
103+
104+
// There can be at most one blocked run or job
105+
var concurrentRun *actions_model.ActionRun
106+
if len(cRuns) > 0 {
107+
concurrentRun = cRuns[0]
108+
} else if len(cJobs) > 0 {
109+
jobRun, err := actions_model.GetRunByID(ctx, cJobs[0].RunID)
110+
if err != nil {
111+
return nil, false, fmt.Errorf("get run by job %d: %w", cJobs[0].ID, err)
112+
}
113+
concurrentRun = jobRun
114+
}
115+
116+
return concurrentRun, concurrentRun != nil, nil
117+
}
118+
119+
func checkRunConcurrency(ctx context.Context, run *actions_model.ActionRun) (jobs, updatedJobs []*actions_model.ActionRunJob, err error) {
120+
checkedConcurrencyGroup := make(container.Set[string])
121+
122+
// check run (workflow-level) concurrency
123+
if run.ConcurrencyGroup != "" {
124+
concurrentRun, found, err := findBlockedRunByConcurrency(ctx, run.RepoID, run.ConcurrencyGroup)
125+
if err != nil {
126+
return nil, nil, fmt.Errorf("find blocked run by concurrency: %w", err)
127+
}
128+
if found && !concurrentRun.NeedApproval {
129+
if js, ujs, err := checkJobsOfRun(ctx, concurrentRun); err != nil {
130+
return nil, nil, err
131+
} else {
132+
jobs = append(jobs, js...)
133+
updatedJobs = append(updatedJobs, ujs...)
134+
}
135+
}
136+
checkedConcurrencyGroup.Add(run.ConcurrencyGroup)
137+
}
138+
139+
// check job concurrency
140+
runJobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: run.ID})
141+
if err != nil {
142+
return nil, nil, fmt.Errorf("find run %d jobs: %w", run.ID, err)
143+
}
144+
for _, job := range runJobs {
145+
if !job.Status.IsDone() {
146+
continue
147+
}
148+
if job.ConcurrencyGroup == "" && checkedConcurrencyGroup.Contains(job.ConcurrencyGroup) {
149+
continue
150+
}
151+
concurrentRun, found, err := findBlockedRunByConcurrency(ctx, job.RepoID, job.ConcurrencyGroup)
152+
if err != nil {
153+
return nil, nil, fmt.Errorf("find blocked run by concurrency: %w", err)
154+
}
155+
if found && !concurrentRun.NeedApproval {
156+
if js, ujs, err := checkJobsOfRun(ctx, concurrentRun); err != nil {
157+
return nil, nil, err
158+
} else {
159+
jobs = append(jobs, js...)
160+
updatedJobs = append(updatedJobs, ujs...)
161+
}
162+
}
163+
checkedConcurrencyGroup.Add(job.ConcurrencyGroup)
164+
}
165+
return
166+
}
167+
179168
func checkJobsOfRun(ctx context.Context, run *actions_model.ActionRun) (jobs, updatedJobs []*actions_model.ActionRunJob, err error) {
180169
jobs, err = db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: run.ID})
181170
if err != nil {

0 commit comments

Comments
 (0)