Skip to content

Support Actions concurrency syntax #32751

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 30 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
d6a95ce
support concurrency
Zettat123 Dec 7, 2024
f713c68
Merge branch 'main' of https://github.com/go-gitea/gitea into pr/Zett…
ChristopherHX Jul 8, 2025
76f58ef
fix: lint
ChristopherHX Jul 8, 2025
249e315
fix: assert correct status code
ChristopherHX Jul 8, 2025
b012126
fix: rerun all jobs did not respect concurrency
ChristopherHX Jul 9, 2025
91fccb7
Fix and Test rerun single jobs
ChristopherHX Jul 9, 2025
2e6032a
Remove CancelJobsByJobConcurrency in PickTask
ChristopherHX Jul 13, 2025
1a08b87
Merge branch 'main' of https://github.com/go-gitea/gitea into pr/Zett…
ChristopherHX Jul 13, 2025
9bba25e
remove code
ChristopherHX Jul 15, 2025
e6d25e2
Merge branch 'main' of https://github.com/go-gitea/gitea into pr/Zett…
ChristopherHX Jul 15, 2025
dc86086
call EmitJobsIfReady when cancelling a run
Zettat123 Jul 19, 2025
8721155
add TestCancelConcurrentRun
Zettat123 Jul 19, 2025
c5444e7
improve concurrency index
Zettat123 Jul 21, 2025
63eaf15
modify cancel-in-progress behavior
Zettat123 Jul 22, 2025
96d6938
fix bugs and tests
Zettat123 Jul 22, 2025
ea2bc92
Merge branch 'main' into support-actions-concurrency
Zettat123 Jul 22, 2025
a4b76b6
update InsertRun
Zettat123 Jul 22, 2025
ca18672
modify cancel-in-progress behavior
Zettat123 Jul 23, 2025
dc003e4
fix lint
Zettat123 Jul 23, 2025
e45ca56
improve handling cancellation
Zettat123 Jul 25, 2025
5e2059a
rename migration
Zettat123 Jul 25, 2025
4ba788d
Merge branch 'main' into support-actions-concurrency
Zettat123 Jul 25, 2025
2481e83
fix migration
Zettat123 Jul 25, 2025
f585f90
remove blockRunByConcurrency todo for jobs
ChristopherHX Jul 25, 2025
5e43aa2
fix migration to use same index as model
ChristopherHX Jul 25, 2025
d5f6c44
implement evaluate concurrency expression again, vars may change afte…
ChristopherHX Jul 25, 2025
796d2c1
Use a single RawConcurrency in db
ChristopherHX Jul 25, 2025
6a9eb9e
fix fmt
ChristopherHX Jul 25, 2025
42c2ca2
add TestAbandonConcurrentRun
Zettat123 Jul 25, 2025
5a45b76
update to act dev version add new tests
ChristopherHX Jul 29, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ require (

replace github.com/hashicorp/go-version => github.com/6543/go-version v1.3.1

replace github.com/nektos/act => gitea.com/gitea/act v0.261.6
replace github.com/nektos/act => gitea.com/gitea/act v0.261.7-0.20250729091447-39509e9ad05c

// TODO: the only difference is in `PutObject`: the fork doesn't use `NewVerifyingReader(r, sha256.New(), oid, expectedSize)`, need to figure out why
replace github.com/charmbracelet/git-lfs-transfer => gitea.com/gitea/git-lfs-transfer v0.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ dario.cat/mergo v1.0.1 h1:Ra4+bf83h2ztPIQYNP99R6m+Y7KfnARDfID+a+vLl4s=
dario.cat/mergo v1.0.1/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
gitea.com/gitea/act v0.261.6 h1:CjZwKOyejonNFDmsXOw3wGm5Vet573hHM6VMLsxtvPY=
gitea.com/gitea/act v0.261.6/go.mod h1:Pg5C9kQY1CEA3QjthjhlrqOC/QOT5NyWNjOjRHw23Ok=
gitea.com/gitea/act v0.261.7-0.20250729091447-39509e9ad05c h1:RnH1R929Bab0g86vou4qR3EtY4llbSn34OYGaU0MaD4=
gitea.com/gitea/act v0.261.7-0.20250729091447-39509e9ad05c/go.mod h1:Pg5C9kQY1CEA3QjthjhlrqOC/QOT5NyWNjOjRHw23Ok=
gitea.com/gitea/git-lfs-transfer v0.2.0 h1:baHaNoBSRaeq/xKayEXwiDQtlIjps4Ac/Ll4KqLMB40=
gitea.com/gitea/git-lfs-transfer v0.2.0/go.mod h1:UrXUCm3xLQkq15fu7qlXHUMlrhdlXHoi13KH2Dfiits=
gitea.com/gitea/go-xsd-duration v0.0.0-20220703122237-02e73435a078 h1:BAFmdZpRW7zMQZQDClaCWobRj9uL1MR3MzpCVJvc5s4=
Expand Down
220 changes: 124 additions & 96 deletions models/actions/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@ import (
"code.gitea.io/gitea/modules/util"
webhook_module "code.gitea.io/gitea/modules/webhook"

"github.com/nektos/act/pkg/jobparser"
"xorm.io/builder"
)

// ActionRun represents a run of a workflow file
type ActionRun struct {
ID int64
Title string
RepoID int64 `xorm:"index unique(repo_index)"`
RepoID int64 `xorm:"index unique(repo_index) index(repo_concurrency)"`
Repo *repo_model.Repository `xorm:"-"`
OwnerID int64 `xorm:"index"`
WorkflowID string `xorm:"index"` // the name of workflow file
Expand All @@ -49,6 +48,9 @@ type ActionRun struct {
TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow
Status Status `xorm:"index"`
Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed
RawConcurrency string // raw concurrency
ConcurrencyGroup string `xorm:"index(repo_concurrency)"`
ConcurrencyCancel bool
// Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0
Started timeutil.TimeStamp
Stopped timeutil.TimeStamp
Expand Down Expand Up @@ -181,7 +183,7 @@ func (run *ActionRun) IsSchedule() bool {
return run.ScheduleID > 0
}

func updateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error {
func UpdateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error {
_, err := db.GetEngine(ctx).ID(repo.ID).
NoAutoTime().
SetExpr("num_action_runs",
Expand Down Expand Up @@ -238,116 +240,73 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin
return cancelledJobs, err
}

// Iterate over each job and attempt to cancel it.
for _, job := range jobs {
// Skip jobs that are already in a terminal state (completed, cancelled, etc.).
status := job.Status
if status.IsDone() {
continue
}

// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
if job.TaskID == 0 {
job.Status = StatusCancelled
job.Stopped = timeutil.TimeStampNow()

// Update the job's status and stopped time in the database.
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
if err != nil {
return cancelledJobs, err
}

// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
if n == 0 {
return cancelledJobs, errors.New("job has changed, try again")
}

cancelledJobs = append(cancelledJobs, job)
// Continue with the next job.
continue
}

// If the job has an associated task, try to stop the task, effectively cancelling the job.
if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
return cancelledJobs, err
}
cancelledJobs = append(cancelledJobs, job)
cjs, err := CancelJobs(ctx, jobs)
if err != nil {
return cancelledJobs, err
}
cancelledJobs = append(cancelledJobs, cjs...)
}

// Return nil to indicate successful cancellation of all running and waiting jobs.
return cancelledJobs, nil
}

// InsertRun inserts a run
// The title will be cut off at 255 characters if it's longer than 255 characters.
func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWorkflow) error {
return db.WithTx(ctx, func(ctx context.Context) error {
index, err := db.GetNextResourceIndex(ctx, "action_run_index", run.RepoID)
if err != nil {
return err
func CancelJobs(ctx context.Context, jobs []*ActionRunJob) ([]*ActionRunJob, error) {
cancelledJobs := make([]*ActionRunJob, 0, len(jobs))
// Iterate over each job and attempt to cancel it.
for _, job := range jobs {
// Skip jobs that are already in a terminal state (completed, cancelled, etc.).
status := job.Status
if status.IsDone() {
continue
}
run.Index = index
run.Title = util.EllipsisDisplayString(run.Title, 255)

if err := db.Insert(ctx, run); err != nil {
return err
}
// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
if job.TaskID == 0 {
job.Status = StatusCancelled
job.Stopped = timeutil.TimeStampNow()

if run.Repo == nil {
repo, err := repo_model.GetRepositoryByID(ctx, run.RepoID)
// Update the job's status and stopped time in the database.
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
if err != nil {
return err
return cancelledJobs, err
}
run.Repo = repo
}

if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil {
return err
// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
if n == 0 {
return cancelledJobs, errors.New("job has changed, try again")
}

cancelledJobs = append(cancelledJobs, job)
// Continue with the next job.
continue
}

runJobs := make([]*ActionRunJob, 0, len(jobs))
var hasWaiting bool
for _, v := range jobs {
id, job := v.Job()
needs := job.Needs()
if err := v.SetJob(id, job.EraseNeeds()); err != nil {
return err
}
payload, _ := v.Marshal()
status := StatusWaiting
if len(needs) > 0 || run.NeedApproval {
status = StatusBlocked
} else {
hasWaiting = true
}
job.Name = util.EllipsisDisplayString(job.Name, 255)
runJobs = append(runJobs, &ActionRunJob{
RunID: run.ID,
RepoID: run.RepoID,
OwnerID: run.OwnerID,
CommitSHA: run.CommitSHA,
IsForkPullRequest: run.IsForkPullRequest,
Name: job.Name,
WorkflowPayload: payload,
JobID: id,
Needs: needs,
RunsOn: job.RunsOn(),
Status: status,
})
// If the job has an associated task, try to stop the task, effectively cancelling the job.
if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
return cancelledJobs, err
}
if err := db.Insert(ctx, runJobs); err != nil {
return err
updatedJob, err := GetRunJobByID(ctx, job.ID)
if err != nil {
return cancelledJobs, fmt.Errorf("get job: %w", err)
}
cancelledJobs = append(cancelledJobs, updatedJob)
}

// if there is a job in the waiting status, increase tasks version.
if hasWaiting {
if err := IncreaseTaskVersion(ctx, run.OwnerID, run.RepoID); err != nil {
return err
}
}
return nil
})
// Return nil to indicate successful cancellation of all running and waiting jobs.
return cancelledJobs, nil
}

func GetRunByID(ctx context.Context, id int64) (*ActionRun, error) {
var run ActionRun
has, err := db.GetEngine(ctx).Where("id=?", id).Get(&run)
if err != nil {
return nil, err
} else if !has {
return nil, fmt.Errorf("run with id %d: %w", id, util.ErrNotExist)
}

return &run, nil
}

func GetRunByRepoAndID(ctx context.Context, repoID, runID int64) (*ActionRun, error) {
Expand Down Expand Up @@ -432,7 +391,7 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
if err = run.LoadRepo(ctx); err != nil {
return err
}
if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil {
if err := UpdateRepoRunsNumbers(ctx, run.Repo); err != nil {
return err
}
}
Expand All @@ -441,3 +400,72 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
}

type ActionRunIndex db.ResourceIndex

func ShouldBlockRunByConcurrency(ctx context.Context, actionRun *ActionRun) (bool, error) {
if actionRun.ConcurrencyGroup == "" || actionRun.ConcurrencyCancel {
return false, nil
}

runs, jobs, err := GetConcurrentRunsAndJobs(ctx, actionRun.RepoID, actionRun.ConcurrencyGroup, []Status{StatusRunning})
if err != nil {
return false, fmt.Errorf("find concurrent runs and jobs: %w", err)
}

return len(runs) > 0 || len(jobs) > 0, nil
}

func GetConcurrentRunsAndJobs(ctx context.Context, repoID int64, concurrencyGroup string, status []Status) ([]*ActionRun, []*ActionRunJob, error) {
runs, err := db.Find[ActionRun](ctx, &FindRunOptions{
RepoID: repoID,
ConcurrencyGroup: concurrencyGroup,
Status: status,
})
if err != nil {
return nil, nil, fmt.Errorf("find runs: %w", err)
}

jobs, err := db.Find[ActionRunJob](ctx, &FindRunJobOptions{
RepoID: repoID,
ConcurrencyGroup: concurrencyGroup,
Statuses: status,
})
if err != nil {
return nil, nil, fmt.Errorf("find jobs: %w", err)
}

return runs, jobs, nil
}

func CancelPreviousJobsByRunConcurrency(ctx context.Context, actionRun *ActionRun) ([]*ActionRunJob, error) {
if actionRun.ConcurrencyGroup == "" {
return nil, nil
}

var jobsToCancel []*ActionRunJob

statusFindOption := []Status{StatusWaiting, StatusBlocked}
if actionRun.ConcurrencyCancel {
statusFindOption = append(statusFindOption, StatusRunning)
}
runs, jobs, err := GetConcurrentRunsAndJobs(ctx, actionRun.RepoID, actionRun.ConcurrencyGroup, statusFindOption)
if err != nil {
return nil, fmt.Errorf("find concurrent runs and jobs: %w", err)
}
jobsToCancel = append(jobsToCancel, jobs...)

// cancel runs in the same concurrency group
for _, run := range runs {
if run.ID == actionRun.ID {
continue
}
jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{
RunID: run.ID,
})
if err != nil {
return nil, fmt.Errorf("find run %d jobs: %w", run.ID, err)
}
jobsToCancel = append(jobsToCancel, jobs...)
}

return CancelJobs(ctx, jobsToCancel)
}
Loading