@@ -10,31 +10,40 @@ import (
1010 "fmt"
1111 "io/ioutil"
1212 "os"
13+ "strconv"
1314 "strings"
1415
1516 "code.gitea.io/gitea/models"
1617 "code.gitea.io/gitea/modules/git"
1718 "code.gitea.io/gitea/modules/graceful"
1819 "code.gitea.io/gitea/modules/log"
1920 "code.gitea.io/gitea/modules/notification"
20- "code.gitea.io/gitea/modules/setting"
21- "code.gitea.io/gitea/modules/sync"
21+ "code.gitea.io/gitea/modules/queue"
2222 "code.gitea.io/gitea/modules/timeutil"
2323
2424 "github.com/unknwon/com"
2525)
2626
27- // pullRequestQueue represents a queue to handle update pull request tests
28- var pullRequestQueue = sync . NewUniqueQueue ( setting . Repository . PullRequestQueueLength )
27+ // prQueue represents a queue to handle update pull request tests
28+ var prQueue queue. UniqueQueue
2929
3030// AddToTaskQueue adds itself to pull request test task queue.
3131func AddToTaskQueue (pr * models.PullRequest ) {
32- go pullRequestQueue .AddFunc (pr .ID , func () {
33- pr .Status = models .PullRequestStatusChecking
34- if err := pr .UpdateCols ("status" ); err != nil {
35- log .Error ("AddToTaskQueue.UpdateCols[%d].(add to queue): %v" , pr .ID , err )
32+ go func () {
33+ err := prQueue .PushFunc (strconv .FormatInt (pr .ID , 10 ), func () error {
34+ pr .Status = models .PullRequestStatusChecking
35+ err := pr .UpdateCols ("status" )
36+ if err != nil {
37+ log .Error ("AddToTaskQueue.UpdateCols[%d].(add to queue): %v" , pr .ID , err )
38+ } else {
39+ log .Trace ("Adding PR ID: %d to the test pull requests queue" , pr .ID )
40+ }
41+ return err
42+ })
43+ if err != nil && err != queue .ErrAlreadyInQueue {
44+ log .Error ("Error adding prID %d to the test pull requests queue: %v" , pr .ID , err )
3645 }
37- })
46+ }( )
3847}
3948
4049// checkAndUpdateStatus checks if pull request is possible to leaving checking status,
@@ -46,7 +55,12 @@ func checkAndUpdateStatus(pr *models.PullRequest) {
4655 }
4756
4857 // Make sure there is no waiting test to process before leaving the checking status.
49- if ! pullRequestQueue .Exist (pr .ID ) {
58+ has , err := prQueue .Has (strconv .FormatInt (pr .ID , 10 ))
59+ if err != nil {
60+ log .Error ("Unable to check if the queue is waiting to reprocess pr.ID %d. Error: %v" , pr .ID , err )
61+ }
62+
63+ if ! has {
5064 if err := pr .UpdateCols ("status, conflicted_files" ); err != nil {
5165 log .Error ("Update[%d]: %v" , pr .ID , err )
5266 }
@@ -155,61 +169,65 @@ func manuallyMerged(pr *models.PullRequest) bool {
155169 return false
156170}
157171
158- // TestPullRequests checks and tests untested patches of pull requests.
159- // TODO: test more pull requests at same time.
160- func TestPullRequests (ctx context.Context ) {
161-
162- go func () {
163- prs , err := models .GetPullRequestIDsByCheckStatus (models .PullRequestStatusChecking )
164- if err != nil {
165- log .Error ("Find Checking PRs: %v" , err )
172+ // InitializePullRequests checks and tests untested patches of pull requests.
173+ func InitializePullRequests (ctx context.Context ) {
174+ prs , err := models .GetPullRequestIDsByCheckStatus (models .PullRequestStatusChecking )
175+ if err != nil {
176+ log .Error ("Find Checking PRs: %v" , err )
177+ return
178+ }
179+ for _ , prID := range prs {
180+ select {
181+ case <- ctx .Done ():
166182 return
167- }
168- for _ , prID := range prs {
169- select {
170- case <- ctx .Done ():
171- return
172- default :
173- pullRequestQueue .Add (prID )
183+ default :
184+ if err := prQueue .PushFunc (strconv .FormatInt (prID , 10 ), func () error {
185+ log .Trace ("Adding PR ID: %d to the test pull requests queue" , prID )
186+ return nil
187+ }); err != nil {
188+ log .Error ("Error adding prID: %s to the test pull requests queue %v" , prID , err )
174189 }
175190 }
176- }()
191+ }
192+ }
177193
178- // Start listening on new test requests.
179- for {
180- select {
181- case prID := <- pullRequestQueue .Queue ():
182- log .Trace ("TestPullRequests[%v]: processing test task" , prID )
183- pullRequestQueue .Remove (prID )
194+ // handle passed PR IDs and test the PRs
195+ func handle (data ... queue.Data ) {
196+ for _ , datum := range data {
197+ prID := datum .(string )
198+ id := com .StrTo (prID ).MustInt64 ()
184199
185- id := com . StrTo ( prID ). MustInt64 ( )
200+ log . Trace ( "Testing PR ID %d from the test pull requests queue" , id )
186201
187- pr , err := models .GetPullRequestByID (id )
188- if err != nil {
189- log .Error ("GetPullRequestByID[%s]: %v" , prID , err )
190- continue
191- } else if pr .Status != models .PullRequestStatusChecking {
192- continue
193- } else if manuallyMerged (pr ) {
194- continue
195- } else if err = TestPatch (pr ); err != nil {
196- log .Error ("testPatch[%d]: %v" , pr .ID , err )
197- pr .Status = models .PullRequestStatusError
198- if err := pr .UpdateCols ("status" ); err != nil {
199- log .Error ("update pr [%d] status to PullRequestStatusError failed: %v" , pr .ID , err )
200- }
201- continue
202+ pr , err := models .GetPullRequestByID (id )
203+ if err != nil {
204+ log .Error ("GetPullRequestByID[%s]: %v" , prID , err )
205+ continue
206+ } else if pr .Status != models .PullRequestStatusChecking {
207+ continue
208+ } else if manuallyMerged (pr ) {
209+ continue
210+ } else if err = TestPatch (pr ); err != nil {
211+ log .Error ("testPatch[%d]: %v" , pr .ID , err )
212+ pr .Status = models .PullRequestStatusError
213+ if err := pr .UpdateCols ("status" ); err != nil {
214+ log .Error ("update pr [%d] status to PullRequestStatusError failed: %v" , pr .ID , err )
202215 }
203- checkAndUpdateStatus (pr )
204- case <- ctx .Done ():
205- pullRequestQueue .Close ()
206- log .Info ("PID: %d Pull Request testing shutdown" , os .Getpid ())
207- return
216+ continue
208217 }
218+ checkAndUpdateStatus (pr )
209219 }
210220}
211221
212222// Init runs the task queue to test all the checking status pull requests
213- func Init () {
214- go graceful .GetManager ().RunWithShutdownContext (TestPullRequests )
223+ func Init () error {
224+ prQueue = queue .CreateUniqueQueue ("test_pull_requests" , handle , "" ).(queue.UniqueQueue )
225+
226+ if prQueue == nil {
227+ return fmt .Errorf ("Unable to create test_pull_requests Queue" )
228+ }
229+
230+ go graceful .GetManager ().RunWithShutdownFns (prQueue .Run )
231+ go graceful .GetManager ().RunWithShutdownContext (InitializePullRequests )
232+ return nil
215233}
0 commit comments