Skip to content

Commit

Permalink
Merge pull request goharbor#4773 from steven-zou/fix_js_restarting_issue
Browse files Browse the repository at this point in the history
Fix the issue of intermittent restarting of job service
  • Loading branch information
steven-zou authored Apr 26, 2018
2 parents eedbc83 + e1b509e commit 2dd5b01
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 22 deletions.
4 changes: 3 additions & 1 deletion src/jobservice/core/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,9 @@ func createJobReq(kind string, isUnique bool, withHook bool) models.JobRequest {

type fakePool struct{}

func (f *fakePool) Start() {}
func (f *fakePool) Start() error {
return nil
}

func (f *fakePool) RegisterJob(name string, job interface{}) error {
return nil
Expand Down
5 changes: 4 additions & 1 deletion src/jobservice/pool/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import "github.com/vmware/harbor/src/jobservice/models"
//More like a driver to transparent the lower queue.
type Interface interface {
//Start to serve
Start()
//
//Return:
// error if failed to start
Start() error

//Register job to the pool.
//
Expand Down
17 changes: 10 additions & 7 deletions src/jobservice/pool/message_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
"time"

Expand All @@ -18,6 +19,10 @@ import (
"github.com/vmware/harbor/src/jobservice/utils"
)

const (
msgServerRetryTimes = 5
)

//MessageServer implements the sub/pub mechanism via redis to do async message exchanging.
type MessageServer struct {
context context.Context
Expand All @@ -42,14 +47,13 @@ func (ms *MessageServer) Start() error {
logger.Info("Message server is stopped")
}()

//As we get one connection from the pool, don't try to close it.
conn := ms.redisPool.Get()
defer conn.Close()

conn := ms.redisPool.Get() //Get one backend connection!
psc := redis.PubSubConn{
Conn: conn,
}
defer psc.Close()

//Subscribe channel
err := psc.Subscribe(redis.Args{}.AddFlat(utils.KeyPeriodicNotification(ms.namespace))...)
if err != nil {
return err
Expand All @@ -60,8 +64,7 @@ func (ms *MessageServer) Start() error {
for {
switch res := psc.Receive().(type) {
case error:
done <- res
return
done <- fmt.Errorf("error occurred when receiving from pub/sub channel of message server: %s", res.(error).Error())
case redis.Message:
m := &models.Message{}
if err := json.Unmarshal(res.Data, m); err != nil {
Expand Down Expand Up @@ -131,12 +134,12 @@ func (ms *MessageServer) Start() error {
case <-ms.context.Done():
err = errors.New("context exit")
case err = <-done:
return err
}
}

//Unsubscribe all
psc.Unsubscribe()

return <-done
}

Expand Down
42 changes: 39 additions & 3 deletions src/jobservice/pool/redis_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package pool
import (
"errors"
"fmt"
"math"
"time"

"github.com/garyburd/redigo/redis"
Expand All @@ -29,6 +30,8 @@ const (

//Copy from period.enqueuer
periodicEnqueuerHorizon = 4 * time.Minute

pingRedisMaxTimes = 10
)

//GoCraftWorkPool is the pool implementation based on gocraft/work powered by redis.
Expand Down Expand Up @@ -80,13 +83,17 @@ func NewGoCraftWorkPool(ctx *env.Context, namespace string, workerCount uint, re

//Start to serve
//Unblock action
func (gcwp *GoCraftWorkPool) Start() {
func (gcwp *GoCraftWorkPool) Start() error {
if gcwp.redisPool == nil ||
gcwp.pool == nil ||
gcwp.context.SystemContext == nil {
//report and exit
gcwp.context.ErrorChan <- errors.New("Redis worker pool can not start as it's not correctly configured")
return
return errors.New("Redis worker pool can not start as it's not correctly configured")
}

//Test the redis connection
if err := gcwp.ping(); err != nil {
return err
}

done := make(chan interface{}, 1)
Expand Down Expand Up @@ -130,8 +137,18 @@ func (gcwp *GoCraftWorkPool) Start() {
return
}

startTimes := 0
START_MSG_SERVER:
//Start message server
if err = gcwp.messageServer.Start(); err != nil {
logger.Errorf("Message server exits with error: %s\n", err.Error())
if startTimes < msgServerRetryTimes {
startTimes++
time.Sleep(time.Duration((int)(math.Pow(2, (float64)(startTimes)))+5) * time.Second)
logger.Infof("Restart message server (%d times)\n", startTimes)
goto START_MSG_SERVER
}

return
}
}()
Expand Down Expand Up @@ -177,6 +194,8 @@ func (gcwp *GoCraftWorkPool) Start() {

gcwp.pool.Stop()
}()

return nil
}

//RegisterJob is used to register the job to the pool.
Expand Down Expand Up @@ -593,6 +612,23 @@ func (rpc *RedisPoolContext) logJob(job *work.Job, next work.NextMiddlewareFunc)
return next()
}

//Ping the redis server
func (gcwp *GoCraftWorkPool) ping() error {
conn := gcwp.redisPool.Get()
defer conn.Close()

var err error
for count := 1; count <= pingRedisMaxTimes; count++ {
if _, err = conn.Do("ping"); err == nil {
return nil
}

time.Sleep(time.Duration(count+4) * time.Second)
}

return fmt.Errorf("connect to redis server timeout: %s", err.Error())
}

//generate the job stats data
func generateResult(j *work.Job, jobKind string, isUnique bool) models.JobStats {
if j == nil {
Expand Down
27 changes: 17 additions & 10 deletions src/jobservice/runtime/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,21 @@ func (bs *Bootstrap) LoadAndRun() {
}

//Start the pool
var backendPool pool.Interface
var (
backendPool pool.Interface
wpErr error
)
if config.DefaultConfig.PoolConfig.Backend == config.JobServicePoolBackendRedis {
backendPool = bs.loadAndRunRedisWorkerPool(rootContext, config.DefaultConfig)
backendPool, wpErr = bs.loadAndRunRedisWorkerPool(rootContext, config.DefaultConfig)
if wpErr != nil {
logger.Fatalf("Failed to load and run worker pool: %s\n", wpErr.Error())
}
} else {
logger.Fatalf("Worker pool backend '%s' is not supported", config.DefaultConfig.PoolConfig.Backend)
}

//Initialize controller
ctl := core.NewController(backendPool)

//Start the API server
apiServer := bs.loadAndRunAPIServer(rootContext, config.DefaultConfig, ctl)
logger.Infof("Server is started at %s:%d with %s", "", config.DefaultConfig.Port, config.DefaultConfig.Protocol)
Expand Down Expand Up @@ -144,7 +151,7 @@ func (bs *Bootstrap) loadAndRunAPIServer(ctx *env.Context, cfg *config.Configura
}

//Load and run the worker pool
func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg *config.Configuration) pool.Interface {
func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg *config.Configuration) (pool.Interface, error) {
redisPool := &redis.Pool{
MaxActive: 6,
MaxIdle: 6,
Expand All @@ -166,8 +173,7 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg *config.Con
//Register jobs here
if err := redisWorkerPool.RegisterJob(impl.KnownJobDemo, (*impl.DemoJob)(nil)); err != nil {
//exit
ctx.ErrorChan <- err
return redisWorkerPool //avoid nil pointer issue
return nil, err
}
if err := redisWorkerPool.RegisterJobs(
map[string]interface{}{
Expand All @@ -177,11 +183,12 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool(ctx *env.Context, cfg *config.Con
job.ImageReplicate: (*replication.Replicator)(nil),
}); err != nil {
//exit
ctx.ErrorChan <- err
return redisWorkerPool //avoid nil pointer issue
return nil, err
}

redisWorkerPool.Start()
if err := redisWorkerPool.Start(); err != nil {
return nil, err
}

return redisWorkerPool
return redisWorkerPool, nil
}

0 comments on commit 2dd5b01

Please sign in to comment.