Skip to content

Commit

Permalink
fix: goroutine test and context cancel
Browse files Browse the repository at this point in the history
Signed-off-by: richardlt <richard.le.terrier@gmail.com>
  • Loading branch information
richardlt committed Dec 22, 2023
1 parent e1418f7 commit 20e872b
Show file tree
Hide file tree
Showing 10 changed files with 87 additions and 87 deletions.
11 changes: 6 additions & 5 deletions engine/api/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,17 +159,18 @@ func Subscribe(ch chan<- sdk.Event) {
// DequeueEvent runs in a goroutine and dequeue event from cache
func DequeueEvent(ctx context.Context, db *gorp.DbMap) {
for {
if err := ctx.Err(); err != nil {
ctx := sdk.ContextWithStacktrace(ctx, err)
log.Error(ctx, "Exiting event.DequeueEvent : %v", err)
return
}

e := sdk.Event{}
if err := store.DequeueWithContext(ctx, "events", 250*time.Millisecond, &e); err != nil {
ctx := sdk.ContextWithStacktrace(ctx, err)
log.Error(ctx, "Event.DequeueEvent> store.DequeueWithContext err: %v", err)
continue
}
if err := ctx.Err(); err != nil {
ctx := sdk.ContextWithStacktrace(ctx, err)
log.Error(ctx, "Exiting event.DequeueEvent : %v", err)
return
}

// Filter "EventJobSummary" for globalKafka Broker
if e.EventType != "sdk.EventJobSummary" {
Expand Down
13 changes: 6 additions & 7 deletions engine/api/event_v2/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,17 @@ func publish(ctx context.Context, store cache.Store, event sdk.EventV2) {
log.Error(ctx, "EventV2.publish: %s", err)
return
}
return
}

// Dequeue runs in a goroutine and dequeue event from cache
func Dequeue(ctx context.Context, db *gorp.DbMap, store cache.Store, goroutines *sdk.GoRoutines) {
for {
if err := ctx.Err(); err != nil {
ctx := sdk.ContextWithStacktrace(ctx, err)
log.Error(ctx, "EventV2.DequeueEvent> Exiting: %v", err)
return
}

e := sdk.EventV2{}
if err := store.DequeueWithContext(ctx, eventQueue, 50*time.Millisecond, &e); err != nil {
ctx := sdk.ContextWithStacktrace(ctx, err)
Expand Down Expand Up @@ -75,12 +80,6 @@ func Dequeue(ctx context.Context, db *gorp.DbMap, store cache.Store, goroutines
})

wg.Wait()

if err := ctx.Err(); err != nil {
ctx := sdk.ContextWithStacktrace(ctx, err)
log.Error(ctx, "EventV2.DequeueEvent> Exiting : %v", err)
continue
}
}
}

Expand Down
13 changes: 7 additions & 6 deletions engine/api/repositoriesmanager/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,19 @@ import (
"github.com/ovh/cds/sdk"
)

//ReceiveEvents has to be launched as a goroutine.
// ReceiveEvents has to be launched as a goroutine.
func ReceiveEvents(ctx context.Context, DBFunc func() *gorp.DbMap, store cache.Store) {
for {
if err := ctx.Err(); err != nil {
log.Error(ctx, "repositoriesmanager.ReceiveEvents> exiting: %v", err)
return
}

e := sdk.Event{}
if err := store.DequeueWithContext(ctx, "events_repositoriesmanager", 250*time.Millisecond, &e); err != nil {
log.Error(ctx, "repositoriesmanager.ReceiveEvents > store.DequeueWithContext err: %v", err)
continue
}
if err := ctx.Err(); err != nil {
log.Error(ctx, "Exiting repositoriesmanager.ReceiveEvents: %v", err)
return
}

db := DBFunc()
if db != nil {
Expand All @@ -50,7 +51,7 @@ func ReceiveEvents(ctx context.Context, DBFunc func() *gorp.DbMap, store cache.S
}
}

//RetryEvent retries the events
// RetryEvent retries the events
func RetryEvent(e *sdk.Event, err error, store cache.Store) error {
e.Attempts++
if e.Attempts > 2 {
Expand Down
20 changes: 10 additions & 10 deletions engine/api/v2_workflow_run_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,20 @@ func (api *API) V2WorkflowRunEngineChan(ctx context.Context) {

func (api *API) V2WorkflowRunEngineDequeue(ctx context.Context) {
for {
if err := ctx.Err(); err != nil {
ctx := sdk.ContextWithStacktrace(ctx, err)
log.Error(ctx, "V2WorkflowRunEngine> Exiting: %v", err)
return
}

var wrEnqueue sdk.V2WorkflowRunEnqueue
if err := api.Cache.DequeueWithContext(ctx, workflow_v2.WorkflowEngineKey, 250*time.Millisecond, &wrEnqueue); err != nil {
log.Error(ctx, "V2WorkflowRunEngine > DequeueWithContext err: %v", err)
log.Error(ctx, "V2WorkflowRunEngine> DequeueWithContext err: %v", err)
continue
}
if err := api.workflowRunV2Trigger(ctx, wrEnqueue); err != nil {
log.ErrorWithStackTrace(ctx, err)
}
if ctx.Err() != nil {
if ctx.Err() != nil {
log.Error(ctx, "%v", ctx.Err())
}
return
}
}
}

Expand Down Expand Up @@ -115,10 +115,10 @@ func (api *API) workflowRunV2Trigger(ctx context.Context, wrEnqueue sdk.V2Workfl

// Load run by id
run, err := workflow_v2.LoadRunByID(ctx, api.mustDB(), wrEnqueue.RunID, workflow_v2.WithRunResults)
if sdk.ErrorIs(err, sdk.ErrNotFound) {
return nil
}
if err != nil {
if sdk.ErrorIs(err, sdk.ErrNotFound) {
return nil
}
return sdk.WrapError(err, "unable to load workflow run %s", wrEnqueue.RunID)
}

Expand Down
2 changes: 1 addition & 1 deletion engine/cache/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func (s *RedisStore) DequeueWithContext(c context.Context, queueName string, wai
break
}
case <-c.Done():
return nil
return c.Err()
}
}
if elem != "" {
Expand Down
3 changes: 3 additions & 0 deletions engine/hatchery/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ func (c *Common) Init(ctx context.Context, h hatchery.Interface) error {
var cfg sdk.CDNConfig
var err error
for {
if err := ctx.Err(); err != nil {
return err
}
cfg, err = c.Client.ConfigCDN()
if err == nil {
break
Expand Down
6 changes: 3 additions & 3 deletions engine/repositories/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import (

func (s *Service) processor(ctx context.Context) error {
for {
if ctx.Err() != nil {
return ctx.Err()
}
var uuid string
if err := s.dao.store.DequeueWithContext(ctx, processorKey, 250*time.Millisecond, &uuid); err != nil {
log.Error(ctx, "repositories > processor > store.DequeueWithContext err: %v", err)
Expand All @@ -36,9 +39,6 @@ func (s *Service) processor(ctx context.Context) error {
}
}
}
if ctx.Err() != nil {
return ctx.Err()
}
}
}

Expand Down
98 changes: 48 additions & 50 deletions sdk/goroutine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,78 +3,76 @@ package sdk
import (
"bytes"
"context"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func Test_GoroutineTools(t *testing.T) {
t.Run("GoroutineID()", func(t *testing.T) {
id := GoroutineID()
var zero uint64
assert.NotEqual(t, zero, id)
require.NotEqual(t, uint64(0), GoroutineID())
})

t.Run("writeGoroutineStacks(...)", func(t *testing.T) {
ctx := context.Background()
var wg = new(sync.WaitGroup)
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
NewGoRoutines(ctx).Exec(ctx, "test_goroutine", func(ctx context.Context) {
wg.Add(1)
<-ctx.Done()
wg.Done()
})

t.Run("GoRoutineStacks(...)", func(t *testing.T) {
var w = new(bytes.Buffer)
err := writeGoroutineStacks(w)
assert.NoError(t, err)
t.Log(w.String())
wg.Wait()
require.NoError(t, writeGoroutineStacks(w))
_, err := parseGoRoutineStacks(w, nil)
require.NoError(t, err)
})

t.Run("parseGoRoutineStacks(...)", func(t *testing.T) {
ctx := context.Background()
var wg = new(sync.WaitGroup)
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
t.Run("GoRoutineRun", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
t.Cleanup(cancel)
m := NewGoRoutines(ctx)

NewGoRoutines(ctx).Exec(ctx, "test_goroutine", func(ctx context.Context) {
wg.Add(1)
<-ctx.Done()
wg.Done()
m.Run(context.TODO(), "test_goroutine_run", func(ctx context.Context) {
time.Sleep(1 * time.Second)
})

var w = new(bytes.Buffer)
err := writeGoroutineStacks(w)
assert.NoError(t, err)
s := m.GoRoutine("test_goroutine_run")
require.NotNil(t, s)
require.True(t, s.Active)
require.Len(t, m.GetStatus(), 1)

_, err = parseGoRoutineStacks(w, nil)
assert.NoError(t, err)
wg.Wait()
})
time.Sleep(1 * time.Second)

t.Run("GoRoutineLoop", func(t *testing.T) {
ctx := context.Background()
var wg = new(sync.WaitGroup)
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
s = m.GoRoutine("test_goroutine_run")
require.NotNil(t, s)
require.False(t, s.Active)
})

t.Run("GoRoutineRunCancel", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
t.Cleanup(cancel)
m := NewGoRoutines(ctx)
m.Run(ctx, "test_goroutine_loop", func(ctx context.Context) {
wg.Add(1)
s := m.GoRoutine("test_goroutine_loop")
require.NotNil(t, s)
require.True(t, s.Active)

ctxToCancelled, cancelRoutine := context.WithTimeout(context.TODO(), 5*time.Second)
var cancelled bool
m.Run(context.TODO(), "test_goroutine_run_cancel", func(ctx context.Context) {
<-ctx.Done()
wg.Done()
cancelled = true
cancelRoutine()
})

s := m.GoRoutine("test_goroutine_loop")
require.NotNil(t, s)
require.Equal(t, 1, len(m.GetStatus()))
require.False(t, cancelled)
m.Stop("test_goroutine_run_cancel")
<-ctxToCancelled.Done()
require.True(t, cancelled)
})

t.Run("GoRoutineRunWithRestart", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.TODO(), 15*time.Second)
t.Cleanup(cancel)
m := NewGoRoutines(ctx)

var count int
m.RunWithRestart(context.TODO(), "test_goroutine_run_with_restart", func(ctx context.Context) {
count++
})

// the routine should have restart 1 time
<-ctx.Done()
require.Equal(t, 2, count)
})
}
5 changes: 0 additions & 5 deletions sdk/namesgenerator/namesgenerator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,10 @@ package namesgenerator
import (
"fmt"
"math/rand"
"time"

"github.com/ovh/cds/sdk/slug"
)

func init() {
rand.Seed(time.Now().UTC().UnixNano())
}

var (
left = [...]string{
"admiring",
Expand Down
3 changes: 3 additions & 0 deletions sdk/vcs/git/git_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func Test_gitCloneOverHTTPS(t *testing.T) {
}
for _, tt := range tests {
os.RemoveAll(test.GetTestName(t))
t.Cleanup(func() { os.RemoveAll(test.GetTestName(t)) })
out := new(bytes.Buffer)
err := new(bytes.Buffer)
tt.args.output = &OutputOpts{
Expand Down Expand Up @@ -142,6 +143,7 @@ func Test_gitCloneOverSSH(t *testing.T) {
}

os.RemoveAll(test.GetTestName(t))
t.Cleanup(func() { os.RemoveAll(test.GetTestName(t)) })
out := new(bytes.Buffer)
err := new(bytes.Buffer)
tt.args.output = &OutputOpts{
Expand Down Expand Up @@ -231,6 +233,7 @@ func Test_gitCommand(t *testing.T) {
}
for _, tt := range tests {
os.RemoveAll(test.GetTestName(t))
t.Cleanup(func() { os.RemoveAll(test.GetTestName(t)) })
os.MkdirAll(test.GetTestName(t), os.FileMode(0755))
if _, got, _ := prepareGitCloneCommands(tt.args.repo, test.GetTestName(t), tt.args.path, tt.args.opts); !reflect.DeepEqual(got.Strings(), tt.want) {
t.Errorf("%q. gitCloneCommand() = %v, want %v", tt.name, got, tt.want)
Expand Down

0 comments on commit 20e872b

Please sign in to comment.