Skip to content

Commit 39718b2

Browse files
authored
Merge branch 'main' into feature/webhook-payload-optimization
Signed-off-by: Kerwin Bryant <kerwin612@qq.com>
2 parents 5472d66 + 54fe47f commit 39718b2

File tree

115 files changed

+2498
-5625
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

115 files changed

+2498
-5625
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ require (
131131
mvdan.cc/xurls/v2 v2.6.0
132132
strk.kbt.io/projects/go/libravatar v0.0.0-20191008002943-06d1c002b251
133133
xorm.io/builder v0.3.13
134-
xorm.io/xorm v1.3.9
134+
xorm.io/xorm v1.3.10
135135
)
136136

137137
require (

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -955,5 +955,5 @@ strk.kbt.io/projects/go/libravatar v0.0.0-20191008002943-06d1c002b251 h1:mUcz5b3
955955
strk.kbt.io/projects/go/libravatar v0.0.0-20191008002943-06d1c002b251/go.mod h1:FJGmPh3vz9jSos1L/F91iAgnC/aejc0wIIrF2ZwJxdY=
956956
xorm.io/builder v0.3.13 h1:a3jmiVVL19psGeXx8GIurTp7p0IIgqeDmwhcR6BAOAo=
957957
xorm.io/builder v0.3.13/go.mod h1:aUW0S9eb9VCaPohFCH3j7czOx1PMW3i1HrSzbLYGBSE=
958-
xorm.io/xorm v1.3.9 h1:TUovzS0ko+IQ1XnNLfs5dqK1cJl1H5uHpWbWqAQ04nU=
959-
xorm.io/xorm v1.3.9/go.mod h1:LsCCffeeYp63ssk0pKumP6l96WZcHix7ChpurcLNuMw=
958+
xorm.io/xorm v1.3.10 h1:yR83hTT4mKIPyA/lvWFTzS35xjLwkiYnwdw0Qupeh0o=
959+
xorm.io/xorm v1.3.10/go.mod h1:Lo7hmsFF0F0GbDE7ubX5ZKa+eCf0eCuiJAUG3oI5cxQ=

models/actions/run.go

Lines changed: 55 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -282,77 +282,72 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin
282282
// InsertRun inserts a run
283283
// The title will be cut off at 255 characters if it's longer than 255 characters.
284284
func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWorkflow) error {
285-
ctx, committer, err := db.TxContext(ctx)
286-
if err != nil {
287-
return err
288-
}
289-
defer committer.Close()
290-
291-
index, err := db.GetNextResourceIndex(ctx, "action_run_index", run.RepoID)
292-
if err != nil {
293-
return err
294-
}
295-
run.Index = index
296-
run.Title = util.EllipsisDisplayString(run.Title, 255)
297-
298-
if err := db.Insert(ctx, run); err != nil {
299-
return err
300-
}
301-
302-
if run.Repo == nil {
303-
repo, err := repo_model.GetRepositoryByID(ctx, run.RepoID)
285+
return db.WithTx(ctx, func(ctx context.Context) error {
286+
index, err := db.GetNextResourceIndex(ctx, "action_run_index", run.RepoID)
304287
if err != nil {
305288
return err
306289
}
307-
run.Repo = repo
308-
}
290+
run.Index = index
291+
run.Title = util.EllipsisDisplayString(run.Title, 255)
309292

310-
if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil {
311-
return err
312-
}
313-
314-
runJobs := make([]*ActionRunJob, 0, len(jobs))
315-
var hasWaiting bool
316-
for _, v := range jobs {
317-
id, job := v.Job()
318-
needs := job.Needs()
319-
if err := v.SetJob(id, job.EraseNeeds()); err != nil {
293+
if err := db.Insert(ctx, run); err != nil {
320294
return err
321295
}
322-
payload, _ := v.Marshal()
323-
status := StatusWaiting
324-
if len(needs) > 0 || run.NeedApproval {
325-
status = StatusBlocked
326-
} else {
327-
hasWaiting = true
296+
297+
if run.Repo == nil {
298+
repo, err := repo_model.GetRepositoryByID(ctx, run.RepoID)
299+
if err != nil {
300+
return err
301+
}
302+
run.Repo = repo
328303
}
329-
job.Name = util.EllipsisDisplayString(job.Name, 255)
330-
runJobs = append(runJobs, &ActionRunJob{
331-
RunID: run.ID,
332-
RepoID: run.RepoID,
333-
OwnerID: run.OwnerID,
334-
CommitSHA: run.CommitSHA,
335-
IsForkPullRequest: run.IsForkPullRequest,
336-
Name: job.Name,
337-
WorkflowPayload: payload,
338-
JobID: id,
339-
Needs: needs,
340-
RunsOn: job.RunsOn(),
341-
Status: status,
342-
})
343-
}
344-
if err := db.Insert(ctx, runJobs); err != nil {
345-
return err
346-
}
347304

348-
// if there is a job in the waiting status, increase tasks version.
349-
if hasWaiting {
350-
if err := IncreaseTaskVersion(ctx, run.OwnerID, run.RepoID); err != nil {
305+
if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil {
351306
return err
352307
}
353-
}
354308

355-
return committer.Commit()
309+
runJobs := make([]*ActionRunJob, 0, len(jobs))
310+
var hasWaiting bool
311+
for _, v := range jobs {
312+
id, job := v.Job()
313+
needs := job.Needs()
314+
if err := v.SetJob(id, job.EraseNeeds()); err != nil {
315+
return err
316+
}
317+
payload, _ := v.Marshal()
318+
status := StatusWaiting
319+
if len(needs) > 0 || run.NeedApproval {
320+
status = StatusBlocked
321+
} else {
322+
hasWaiting = true
323+
}
324+
job.Name = util.EllipsisDisplayString(job.Name, 255)
325+
runJobs = append(runJobs, &ActionRunJob{
326+
RunID: run.ID,
327+
RepoID: run.RepoID,
328+
OwnerID: run.OwnerID,
329+
CommitSHA: run.CommitSHA,
330+
IsForkPullRequest: run.IsForkPullRequest,
331+
Name: job.Name,
332+
WorkflowPayload: payload,
333+
JobID: id,
334+
Needs: needs,
335+
RunsOn: job.RunsOn(),
336+
Status: status,
337+
})
338+
}
339+
if err := db.Insert(ctx, runJobs); err != nil {
340+
return err
341+
}
342+
343+
// if there is a job in the waiting status, increase tasks version.
344+
if hasWaiting {
345+
if err := IncreaseTaskVersion(ctx, run.OwnerID, run.RepoID); err != nil {
346+
return err
347+
}
348+
}
349+
return nil
350+
})
356351
}
357352

358353
func GetRunByRepoAndID(ctx context.Context, repoID, runID int64) (*ActionRun, error) {

models/actions/schedule.go

Lines changed: 39 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -56,65 +56,54 @@ func CreateScheduleTask(ctx context.Context, rows []*ActionSchedule) error {
5656
return nil
5757
}
5858

59-
// Begin transaction
60-
ctx, committer, err := db.TxContext(ctx)
61-
if err != nil {
62-
return err
63-
}
64-
defer committer.Close()
65-
66-
// Loop through each schedule row
67-
for _, row := range rows {
68-
row.Title = util.EllipsisDisplayString(row.Title, 255)
69-
// Create new schedule row
70-
if err = db.Insert(ctx, row); err != nil {
71-
return err
72-
}
73-
74-
// Loop through each schedule spec and create a new spec row
75-
now := time.Now()
76-
77-
for _, spec := range row.Specs {
78-
specRow := &ActionScheduleSpec{
79-
RepoID: row.RepoID,
80-
ScheduleID: row.ID,
81-
Spec: spec,
82-
}
83-
// Parse the spec and check for errors
84-
schedule, err := specRow.Parse()
85-
if err != nil {
86-
continue // skip to the next spec if there's an error
59+
return db.WithTx(ctx, func(ctx context.Context) error {
60+
// Loop through each schedule row
61+
for _, row := range rows {
62+
row.Title = util.EllipsisDisplayString(row.Title, 255)
63+
// Create new schedule row
64+
if err := db.Insert(ctx, row); err != nil {
65+
return err
8766
}
8867

89-
specRow.Next = timeutil.TimeStamp(schedule.Next(now).Unix())
90-
91-
// Insert the new schedule spec row
92-
if err = db.Insert(ctx, specRow); err != nil {
93-
return err
68+
// Loop through each schedule spec and create a new spec row
69+
now := time.Now()
70+
71+
for _, spec := range row.Specs {
72+
specRow := &ActionScheduleSpec{
73+
RepoID: row.RepoID,
74+
ScheduleID: row.ID,
75+
Spec: spec,
76+
}
77+
// Parse the spec and check for errors
78+
schedule, err := specRow.Parse()
79+
if err != nil {
80+
continue // skip to the next spec if there's an error
81+
}
82+
83+
specRow.Next = timeutil.TimeStamp(schedule.Next(now).Unix())
84+
85+
// Insert the new schedule spec row
86+
if err = db.Insert(ctx, specRow); err != nil {
87+
return err
88+
}
9489
}
9590
}
96-
}
97-
98-
// Commit transaction
99-
return committer.Commit()
91+
return nil
92+
})
10093
}
10194

10295
func DeleteScheduleTaskByRepo(ctx context.Context, id int64) error {
103-
ctx, committer, err := db.TxContext(ctx)
104-
if err != nil {
105-
return err
106-
}
107-
defer committer.Close()
108-
109-
if _, err := db.GetEngine(ctx).Delete(&ActionSchedule{RepoID: id}); err != nil {
110-
return err
111-
}
96+
return db.WithTx(ctx, func(ctx context.Context) error {
97+
if _, err := db.GetEngine(ctx).Delete(&ActionSchedule{RepoID: id}); err != nil {
98+
return err
99+
}
112100

113-
if _, err := db.GetEngine(ctx).Delete(&ActionScheduleSpec{RepoID: id}); err != nil {
114-
return err
115-
}
101+
if _, err := db.GetEngine(ctx).Delete(&ActionScheduleSpec{RepoID: id}); err != nil {
102+
return err
103+
}
116104

117-
return committer.Commit()
105+
return nil
106+
})
118107
}
119108

120109
func CleanRepoScheduleTasks(ctx context.Context, repo *repo_model.Repository) ([]*ActionRunJob, error) {

models/actions/task.go

Lines changed: 54 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -352,78 +352,70 @@ func UpdateTaskByState(ctx context.Context, runnerID int64, state *runnerv1.Task
352352
stepStates[v.Id] = v
353353
}
354354

355-
ctx, committer, err := db.TxContext(ctx)
356-
if err != nil {
357-
return nil, err
358-
}
359-
defer committer.Close()
360-
361-
e := db.GetEngine(ctx)
362-
363-
task := &ActionTask{}
364-
if has, err := e.ID(state.Id).Get(task); err != nil {
365-
return nil, err
366-
} else if !has {
367-
return nil, util.ErrNotExist
368-
} else if runnerID != task.RunnerID {
369-
return nil, errors.New("invalid runner for task")
370-
}
371-
372-
if task.Status.IsDone() {
373-
// the state is final, do nothing
374-
return task, nil
375-
}
355+
return db.WithTx2(ctx, func(ctx context.Context) (*ActionTask, error) {
356+
e := db.GetEngine(ctx)
376357

377-
// state.Result is not unspecified means the task is finished
378-
if state.Result != runnerv1.Result_RESULT_UNSPECIFIED {
379-
task.Status = Status(state.Result)
380-
task.Stopped = timeutil.TimeStamp(state.StoppedAt.AsTime().Unix())
381-
if err := UpdateTask(ctx, task, "status", "stopped"); err != nil {
358+
task := &ActionTask{}
359+
if has, err := e.ID(state.Id).Get(task); err != nil {
382360
return nil, err
361+
} else if !has {
362+
return nil, util.ErrNotExist
363+
} else if runnerID != task.RunnerID {
364+
return nil, errors.New("invalid runner for task")
383365
}
384-
if _, err := UpdateRunJob(ctx, &ActionRunJob{
385-
ID: task.JobID,
386-
Status: task.Status,
387-
Stopped: task.Stopped,
388-
}, nil); err != nil {
389-
return nil, err
390-
}
391-
} else {
392-
// Force update ActionTask.Updated to avoid the task being judged as a zombie task
393-
task.Updated = timeutil.TimeStampNow()
394-
if err := UpdateTask(ctx, task, "updated"); err != nil {
395-
return nil, err
396-
}
397-
}
398-
399-
if err := task.LoadAttributes(ctx); err != nil {
400-
return nil, err
401-
}
402366

403-
for _, step := range task.Steps {
404-
var result runnerv1.Result
405-
if v, ok := stepStates[step.Index]; ok {
406-
result = v.Result
407-
step.LogIndex = v.LogIndex
408-
step.LogLength = v.LogLength
409-
step.Started = convertTimestamp(v.StartedAt)
410-
step.Stopped = convertTimestamp(v.StoppedAt)
367+
if task.Status.IsDone() {
368+
// the state is final, do nothing
369+
return task, nil
411370
}
412-
if result != runnerv1.Result_RESULT_UNSPECIFIED {
413-
step.Status = Status(result)
414-
} else if step.Started != 0 {
415-
step.Status = StatusRunning
371+
372+
// state.Result is not unspecified means the task is finished
373+
if state.Result != runnerv1.Result_RESULT_UNSPECIFIED {
374+
task.Status = Status(state.Result)
375+
task.Stopped = timeutil.TimeStamp(state.StoppedAt.AsTime().Unix())
376+
if err := UpdateTask(ctx, task, "status", "stopped"); err != nil {
377+
return nil, err
378+
}
379+
if _, err := UpdateRunJob(ctx, &ActionRunJob{
380+
ID: task.JobID,
381+
Status: task.Status,
382+
Stopped: task.Stopped,
383+
}, nil); err != nil {
384+
return nil, err
385+
}
386+
} else {
387+
// Force update ActionTask.Updated to avoid the task being judged as a zombie task
388+
task.Updated = timeutil.TimeStampNow()
389+
if err := UpdateTask(ctx, task, "updated"); err != nil {
390+
return nil, err
391+
}
416392
}
417-
if _, err := e.ID(step.ID).Update(step); err != nil {
393+
394+
if err := task.LoadAttributes(ctx); err != nil {
418395
return nil, err
419396
}
420-
}
421397

422-
if err := committer.Commit(); err != nil {
423-
return nil, err
424-
}
398+
for _, step := range task.Steps {
399+
var result runnerv1.Result
400+
if v, ok := stepStates[step.Index]; ok {
401+
result = v.Result
402+
step.LogIndex = v.LogIndex
403+
step.LogLength = v.LogLength
404+
step.Started = convertTimestamp(v.StartedAt)
405+
step.Stopped = convertTimestamp(v.StoppedAt)
406+
}
407+
if result != runnerv1.Result_RESULT_UNSPECIFIED {
408+
step.Status = Status(result)
409+
} else if step.Started != 0 {
410+
step.Status = StatusRunning
411+
}
412+
if _, err := e.ID(step.ID).Update(step); err != nil {
413+
return nil, err
414+
}
415+
}
425416

426-
return task, nil
417+
return task, nil
418+
})
427419
}
428420

429421
func StopTask(ctx context.Context, taskID int64, status Status) error {

0 commit comments

Comments
 (0)