Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Queues: Move Notification to use a Queue #9902

Closed
wants to merge 13 commits into from
1 change: 1 addition & 0 deletions integrations/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ func initIntegrationTest() {
defer db.Close()
}
routers.GlobalInit(graceful.GetManager().HammerContext())
NotifierListenerInit()
}

func prepareTestEnv(t testing.TB, skip ...int) func() {
Expand Down
7 changes: 7 additions & 0 deletions integrations/mssql.ini.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,10 @@ LEVEL = Debug
INSTALL_LOCK = true
SECRET_KEY = 9pCviYTWSb
INTERNAL_TOKEN = eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJuYmYiOjE0OTU1NTE2MTh9.hhSVGOANkaKk3vfCd2jDOIww4pUk0xtg9JRde5UogyQ

[queue]
TYPE=channel

[queue.test-notifier]
BATCH_LENGTH=1
LENGTH=20
7 changes: 7 additions & 0 deletions integrations/mysql.ini.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,10 @@ LEVEL = Debug
INSTALL_LOCK = true
SECRET_KEY = 9pCviYTWSb
INTERNAL_TOKEN = eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJuYmYiOjE0OTU1NTE2MTh9.hhSVGOANkaKk3vfCd2jDOIww4pUk0xtg9JRde5UogyQ

[queue]
TYPE=channel

[queue.test-notifier]
BATCH_LENGTH=1
LENGTH=20
7 changes: 7 additions & 0 deletions integrations/mysql8.ini.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,10 @@ LEVEL = Debug
INSTALL_LOCK = true
SECRET_KEY = 9pCviYTWSb
INTERNAL_TOKEN = eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJuYmYiOjE0OTU1NTE2MTh9.hhSVGOANkaKk3vfCd2jDOIww4pUk0xtg9JRde5UogyQ

[queue]
TYPE=channel

[queue.test-notifier]
BATCH_LENGTH=1
LENGTH=20
117 changes: 117 additions & 0 deletions integrations/notification_helper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright 2020 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package integrations

import (
"encoding/json"
"reflect"
"sync"
"testing"

"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/notification"
"code.gitea.io/gitea/modules/notification/base"
"code.gitea.io/gitea/modules/queue"
)

var notifierListener *NotifierListener

var once = sync.Once{}

type NotifierListener struct {
lock sync.RWMutex
callbacks map[string][]*func(string, [][]byte)
notifier base.Notifier
}

func NotifierListenerInit() {
once.Do(func() {
notifierListener = &NotifierListener{
callbacks: map[string][]*func(string, [][]byte){},
}
notifierListener.notifier = base.NewQueueNotifierWithHandle("test-notifier", notifierListener.handle)
notification.RegisterNotifier(notifierListener.notifier)
})
}

// Register will register a callback with the provided notifier function
func (n *NotifierListener) Register(functionName string, callback *func(string, [][]byte)) {
n.lock.Lock()
n.callbacks[functionName] = append(n.callbacks[functionName], callback)
n.lock.Unlock()
}

// Deregister will remove the provided callback from the provided notifier function
func (n *NotifierListener) Deregister(functionName string, callback *func(string, [][]byte)) {
n.lock.Lock()
defer n.lock.Unlock()
for i, callbackPtr := range n.callbacks[functionName] {
if callbackPtr == callback {
n.callbacks[functionName] = append(n.callbacks[functionName][0:i], n.callbacks[functionName][i+1:]...)
return
}
}
}

// RegisterChannel will return a registered channel with function name and return a function to deregister it and close the channel at the end
func (n *NotifierListener) RegisterChannel(name string, argNumber int, exemplar interface{}) (<-chan interface{}, func()) {
t := reflect.TypeOf(exemplar)
channel := make(chan interface{}, 10)
callback := func(_ string, args [][]byte) {
n := reflect.New(t).Elem()
err := json.Unmarshal(args[argNumber], n.Addr().Interface())
if err != nil {
log.Error("Wrong Argument passed to register channel: %v ", err)
}
channel <- n.Interface()
}
n.Register(name, &callback)

return channel, func() {
n.Deregister(name, &callback)
close(channel)
}
}

func (n *NotifierListener) handle(data ...queue.Data) {
n.lock.RLock()
defer n.lock.RUnlock()
for _, datum := range data {
call := datum.(*base.FunctionCall)
callbacks, ok := n.callbacks[call.Name]
if ok && len(callbacks) > 0 {
for _, callback := range callbacks {
(*callback)(call.Name, call.Args)
}
}
}
}

func TestNotifierListener(t *testing.T) {
defer prepareTestEnv(t)()

createPullNotified, deregister := notifierListener.RegisterChannel("NotifyNewPullRequest", 0, &models.PullRequest{})

bs, _ := json.Marshal(&models.PullRequest{})
notifierListener.handle(&base.FunctionCall{
Name: "NotifyNewPullRequest",
Args: [][]byte{
bs,
},
})
<-createPullNotified

notifierListener.notifier.NotifyNewPullRequest(&models.PullRequest{})
<-createPullNotified

notification.NotifyNewPullRequest(&models.PullRequest{})
<-createPullNotified

deregister()

notification.NotifyNewPullRequest(&models.PullRequest{})
// would panic if not deregistered
}
7 changes: 7 additions & 0 deletions integrations/pgsql.ini.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,10 @@ LEVEL = Debug
INSTALL_LOCK = true
SECRET_KEY = 9pCviYTWSb
INTERNAL_TOKEN = eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJuYmYiOjE0OTU1NTE2MTh9.hhSVGOANkaKk3vfCd2jDOIww4pUk0xtg9JRde5UogyQ

[queue]
TYPE=channel

[queue.test-notifier]
BATCH_LENGTH=1
LENGTH=20
93 changes: 65 additions & 28 deletions integrations/pull_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,33 +59,67 @@ func testPullCleanUp(t *testing.T, session *TestSession, user, repo, pullnum str
return resp
}

func checkChannelWithTimeout(c <-chan interface{}, timeout time.Duration, callback func(interface{}) bool) bool {
timer := time.NewTimer(500 * time.Millisecond)
for {
select {
case received := <-c:
if callback(received) {
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
return true
}
case <-timer.C:
return false
}
}
}

func TestPullMerge(t *testing.T) {
onGiteaRun(t, func(t *testing.T, giteaURL *url.URL) {
hookTasks, err := models.HookTasks(1, 1) //Retrieve previous hook number
assert.NoError(t, err)
hookTasksLenBefore := len(hookTasks)
createPullNotified, unregisterNewPull := notifierListener.RegisterChannel("NotifyNewPullRequest", 0, &models.PullRequest{})
defer unregisterNewPull()

mergePullNotified, unregisterMergePull := notifierListener.RegisterChannel("NotifyMergePullRequest", 0, &models.PullRequest{})
defer unregisterMergePull()

session := loginUser(t, "user1")
testRepoFork(t, session, "user2", "repo1", "user1", "repo1")
testEditFile(t, session, "user1", "repo1", "master", "README.md", "Hello, World (Edited)\n")
testRepoFork(t, session, "user2", "repo1", "user1", "repo1-TestPullMerge")
testEditFile(t, session, "user1", "repo1-TestPullMerge", "master", "README.md", "Hello, World (Edited)\n")

resp := testPullCreate(t, session, "user1", "repo1", "master", "This is a pull title")
resp := testPullCreate(t, session, "user1", "repo1-TestPullMerge", "master", "This is a pull title")

isOurPR := func(received interface{}) bool {
pr := received.(*models.PullRequest)
pr.LoadBaseRepo()
pr.LoadHeadRepo()
return pr.BaseRepo.FullName() == "user2/repo1" &&
pr.BaseBranch == "master" &&
pr.HeadRepo.FullName() == "user1/repo1-TestPullMerge" &&
pr.HeadBranch == "master"
}

assert.True(t, checkChannelWithTimeout(createPullNotified, 500*time.Millisecond, isOurPR), "Failed to be notified pull created")

elem := strings.Split(test.RedirectURL(resp), "/")
assert.EqualValues(t, "pulls", elem[3])

testPullMerge(t, session, elem[1], elem[2], elem[4], models.MergeStyleMerge)
assert.True(t, checkChannelWithTimeout(mergePullNotified, 500*time.Millisecond, isOurPR), "Failed to be notified pull merged")

hookTasks, err = models.HookTasks(1, 1)
assert.NoError(t, err)
assert.Len(t, hookTasks, hookTasksLenBefore+1)
assert.False(t, checkChannelWithTimeout(createPullNotified, 100*time.Millisecond, isOurPR), "Duplicate notified pull created")
assert.False(t, checkChannelWithTimeout(mergePullNotified, 100*time.Millisecond, isOurPR), "Duplicate notified pull merged")
})
}

func TestPullRebase(t *testing.T) {
onGiteaRun(t, func(t *testing.T, giteaURL *url.URL) {
hookTasks, err := models.HookTasks(1, 1) //Retrieve previous hook number
assert.NoError(t, err)
hookTasksLenBefore := len(hookTasks)
mergePullNotified, deferable := notifierListener.RegisterChannel("NotifyMergePullRequest", 0, &models.PullRequest{})
defer deferable()

session := loginUser(t, "user1")
testRepoFork(t, session, "user2", "repo1", "user1", "repo1")
Expand All @@ -96,18 +130,18 @@ func TestPullRebase(t *testing.T) {
elem := strings.Split(test.RedirectURL(resp), "/")
assert.EqualValues(t, "pulls", elem[3])
testPullMerge(t, session, elem[1], elem[2], elem[4], models.MergeStyleRebase)

hookTasks, err = models.HookTasks(1, 1)
assert.NoError(t, err)
assert.Len(t, hookTasks, hookTasksLenBefore+1)
select {
case <-mergePullNotified:
case <-time.After(500 * time.Millisecond):
assert.Fail(t, "Took too long to notify!")
}
})
}

func TestPullRebaseMerge(t *testing.T) {
onGiteaRun(t, func(t *testing.T, giteaURL *url.URL) {
hookTasks, err := models.HookTasks(1, 1) //Retrieve previous hook number
assert.NoError(t, err)
hookTasksLenBefore := len(hookTasks)
mergePullNotified, deferable := notifierListener.RegisterChannel("NotifyMergePullRequest", 0, &models.PullRequest{})
defer deferable()

session := loginUser(t, "user1")
testRepoFork(t, session, "user2", "repo1", "user1", "repo1")
Expand All @@ -119,17 +153,18 @@ func TestPullRebaseMerge(t *testing.T) {
assert.EqualValues(t, "pulls", elem[3])
testPullMerge(t, session, elem[1], elem[2], elem[4], models.MergeStyleRebaseMerge)

hookTasks, err = models.HookTasks(1, 1)
assert.NoError(t, err)
assert.Len(t, hookTasks, hookTasksLenBefore+1)
select {
case <-mergePullNotified:
case <-time.After(500 * time.Millisecond):
assert.Fail(t, "Took too long to notify!")
}
})
}

func TestPullSquash(t *testing.T) {
onGiteaRun(t, func(t *testing.T, giteaURL *url.URL) {
hookTasks, err := models.HookTasks(1, 1) //Retrieve previous hook number
assert.NoError(t, err)
hookTasksLenBefore := len(hookTasks)
mergePullNotified, deferable := notifierListener.RegisterChannel("NotifyMergePullRequest", 0, &models.PullRequest{})
defer deferable()

session := loginUser(t, "user1")
testRepoFork(t, session, "user2", "repo1", "user1", "repo1")
Expand All @@ -142,9 +177,11 @@ func TestPullSquash(t *testing.T) {
assert.EqualValues(t, "pulls", elem[3])
testPullMerge(t, session, elem[1], elem[2], elem[4], models.MergeStyleSquash)

hookTasks, err = models.HookTasks(1, 1)
assert.NoError(t, err)
assert.Len(t, hookTasks, hookTasksLenBefore+1)
select {
case <-mergePullNotified:
case <-time.After(500 * time.Millisecond):
assert.Fail(t, "Took too long to notify!")
}
})
}

Expand Down
8 changes: 8 additions & 0 deletions integrations/sqlite.ini.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,11 @@ INTERNAL_TOKEN = eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJuYmYiOjE0OTI3OTU5ODN9.O

[oauth2]
JWT_SECRET = KZb_QLUd4fYVyxetjxC4eZkrBgWM2SndOOWDNtgUUko

[queue]
TYPE=channel

[queue.test-notifier]
BATCH_LENGTH=1
LENGTH=20

2 changes: 1 addition & 1 deletion models/issue.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type Issue struct {
IsClosed bool `xorm:"INDEX"`
IsRead bool `xorm:"-"`
IsPull bool `xorm:"INDEX"` // Indicates whether is a pull request or not.
PullRequest *PullRequest `xorm:"-"`
PullRequest *PullRequest `xorm:"-" json:"-"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering whether attachments, comments, reactions, assignees, etc. should be skipped too. Because there are reference fields in those structs as well (isn't there an Issue property in Comment?). Sounds like recursions can cause some headaches. On the other hand, the serialized data is a snapshot of the object state that protects the notifier from other processes changing the data, so it makes some sense to keep it as is. Decisions, decisions... 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup. I decided to bridge the problem as and when we see it. The trouble is under the default testing case you might not see the issue as we don't necessarily try to jsonify these things. (Hmm maybe I should add a jsontestqueue that will always json.Marshal stuff.)

As we pass into notifier pull requests and issues which definitely have a cyclic reference and therefore MarshalJSON doesn't work - we need to break the cycle. Every PR has an issue. Not all issues have PRs - so breaking the cycle at issue seemed best.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not particularly worried about cyclic references; those should stand out soon enough. But I think the overhead of serializing the whole bunch might be excessive. What I'm thinking is to use the opposite strategy: pass on only what is not derived, unless there's a reason to.

On the other hand, passing data which is tightly-coupled with the running Gitea version poses a problem for upgrading if any queues contain data during the process (e.g. file-system backed queues retained across shutdowns). One possible solution to this problem is to prevent upgrading if any queues contain data, but that may require a migration step being aware of the queue configuration and it would be very difficult to implement. May be we could drop a record on the database when shutting down if all queues are empty and the shutdown was orderly? A migration step could refuse to continue if that record is missing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The trouble with not passing "derived" data is that the notification you may get may be different from what was done.

You have to pass and parse all the data that makes the notification because you can't be certain what's going to change in future.

If you migrate and you can't deserialize mostly the notification will be dropped or default values added.

As you say we need a flush mechanism. That may mean opening a non listening Gitea to run through its queues.

NumComments int
Ref string

Expand Down
Loading