subsume workerpool into the queues and create a flushable interface
zeripath committed Jan 28, 2020
commit 1f031cb901b30a2b39b56db6e741caf879f843b4
86 changes: 56 additions & 30 deletions modules/queue/manager.go
@@ -37,22 +37,37 @@ type ManagedQueue struct {
 	Name          string
 	Configuration interface{}
 	ExemplarType  string
-	Pool          ManagedPool
+	Managed       interface{}
 	counter       int64
 	PoolWorkers   map[int64]*PoolWorkers
 }
 
+// Flushable represents a pool or queue that is flushable
+type Flushable interface {
+	// Flush will add a flush worker to the pool
+	Flush(time.Duration) error
+	// IsEmpty returns whether the managed pool is empty and has no work
+	IsEmpty() bool
+}
+
 // ManagedPool is a simple interface to get certain details from a worker pool
 type ManagedPool interface {
+	// AddWorkers adds a number of workers as a group to the pool with the provided timeout. A CancelFunc is provided to cancel the group
 	AddWorkers(number int, timeout time.Duration) context.CancelFunc
+	// NumberOfWorkers returns the total number of workers in the pool
 	NumberOfWorkers() int
+	// MaxNumberOfWorkers returns the maximum number of workers the pool can dynamically grow to
 	MaxNumberOfWorkers() int
+	// SetMaxNumberOfWorkers sets the maximum number of workers the pool can dynamically grow to
 	SetMaxNumberOfWorkers(int)
+	// BoostTimeout returns the current timeout for worker groups created during a boost
 	BoostTimeout() time.Duration
+	// BlockTimeout returns the timeout the internal channel can block for before a boost would occur
 	BlockTimeout() time.Duration
+	// BoostWorkers returns the number of workers to be created during a boost
 	BoostWorkers() int
-	SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
-	Flush(time.Duration) error
+	// SetPoolSettings sets the user updatable settings for the pool
+	SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
 }
 
 // ManagedQueueList implements the sort.Interface
@@ -66,7 +81,7 @@ type PoolWorkers struct {
 	Timeout    time.Time
 	HasTimeout bool
 	Cancel     context.CancelFunc
-	IsFlush    bool
+	IsFlusher  bool
 }
 
 // PoolWorkersList implements the sort.Interface for PoolWorkers
@@ -87,26 +102,28 @@ func GetManager() *Manager {
 }
 
 // Add adds a queue to this manager
-func (m *Manager) Add(name string,
+func (m *Manager) Add(managed interface{},
 	t Type,
 	configuration,
-	exemplar interface{},
-	pool ManagedPool) int64 {
+	exemplar interface{}) int64 {
 
 	cfg, _ := json.Marshal(configuration)
 	mq := &ManagedQueue{
 		Type:          t,
 		Configuration: string(cfg),
 		ExemplarType:  reflect.TypeOf(exemplar).String(),
 		PoolWorkers:   make(map[int64]*PoolWorkers),
-		Pool:          pool,
+		Managed:       managed,
 	}
 	m.mutex.Lock()
 	m.counter++
 	mq.QID = m.counter
 	mq.Name = fmt.Sprintf("queue-%d", mq.QID)
-	if len(name) > 0 {
-		mq.Name = name
+	if named, ok := managed.(Named); ok {
+		name := named.Name()
+		if len(name) > 0 {
+			mq.Name = name
+		}
 	}
 	m.Queues[mq.QID] = mq
 	m.mutex.Unlock()
@@ -155,7 +172,7 @@ func (q *ManagedQueue) Workers() []*PoolWorkers {
 }
 
 // RegisterWorkers registers workers to this queue
-func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, cancel context.CancelFunc, isFlush bool) int64 {
+func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, cancel context.CancelFunc, isFlusher bool) int64 {
 	q.mutex.Lock()
 	defer q.mutex.Unlock()
 	q.counter++
@@ -166,7 +183,7 @@ func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout b
 		Timeout:    timeout,
 		HasTimeout: hasTimeout,
 		Cancel:     cancel,
-		IsFlush:    isFlush,
+		IsFlusher:  isFlusher,
 	}
 	return q.counter
 }
@@ -195,65 +212,74 @@ func (q *ManagedQueue) RemoveWorkers(pid int64) {
 
 // AddWorkers adds workers to the queue if the managed object is a worker pool
 func (q *ManagedQueue) AddWorkers(number int, timeout time.Duration) context.CancelFunc {
-	if q.Pool != nil {
+	if pool, ok := q.Managed.(ManagedPool); ok {
 		// the cancel will be added to the pool workers description above
-		return q.Pool.AddWorkers(number, timeout)
+		return pool.AddWorkers(number, timeout)
 	}
 	return nil
 }
 
 // Flush flushes the queue with a timeout
 func (q *ManagedQueue) Flush(timeout time.Duration) error {
-	if q.Pool != nil {
-		return q.Pool.Flush(timeout)
+	if flushable, ok := q.Managed.(Flushable); ok {
+		// Flush the managed pool or queue
+		return flushable.Flush(timeout)
 	}
 	return nil
 }
 
+// IsEmpty returns whether the queue is empty
+func (q *ManagedQueue) IsEmpty() bool {
+	if flushable, ok := q.Managed.(Flushable); ok {
+		return flushable.IsEmpty()
+	}
+	return true
+}
+
 // NumberOfWorkers returns the number of workers in the queue
 func (q *ManagedQueue) NumberOfWorkers() int {
-	if q.Pool != nil {
-		return q.Pool.NumberOfWorkers()
+	if pool, ok := q.Managed.(ManagedPool); ok {
+		return pool.NumberOfWorkers()
 	}
 	return -1
 }
 
 // MaxNumberOfWorkers returns the maximum number of workers for the pool
 func (q *ManagedQueue) MaxNumberOfWorkers() int {
-	if q.Pool != nil {
-		return q.Pool.MaxNumberOfWorkers()
+	if pool, ok := q.Managed.(ManagedPool); ok {
		return pool.MaxNumberOfWorkers()
 	}
 	return 0
 }
 
 // BoostWorkers returns the number of workers for a boost
 func (q *ManagedQueue) BoostWorkers() int {
-	if q.Pool != nil {
-		return q.Pool.BoostWorkers()
+	if pool, ok := q.Managed.(ManagedPool); ok {
+		return pool.BoostWorkers()
 	}
 	return -1
 }
 
 // BoostTimeout returns the timeout of the next boost
 func (q *ManagedQueue) BoostTimeout() time.Duration {
-	if q.Pool != nil {
-		return q.Pool.BoostTimeout()
+	if pool, ok := q.Managed.(ManagedPool); ok {
+		return pool.BoostTimeout()
 	}
 	return 0
 }
 
 // BlockTimeout returns the timeout until the next boost
 func (q *ManagedQueue) BlockTimeout() time.Duration {
-	if q.Pool != nil {
-		return q.Pool.BlockTimeout()
+	if pool, ok := q.Managed.(ManagedPool); ok {
+		return pool.BlockTimeout()
 	}
 	return 0
 }
 
-// SetSettings sets the setable boost values
-func (q *ManagedQueue) SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) {
-	if q.Pool != nil {
-		q.Pool.SetSettings(maxNumberOfWorkers, boostWorkers, timeout)
+// SetPoolSettings sets the settable boost values
+func (q *ManagedQueue) SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) {
+	if pool, ok := q.Managed.(ManagedPool); ok {
+		pool.SetPoolSettings(maxNumberOfWorkers, boostWorkers, timeout)
 	}
 }

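Taken together, the manager.go changes replace the typed `Pool ManagedPool` field with an opaque `Managed interface{}` and discover capabilities with type assertions at call time. Below is a minimal, self-contained sketch of that probing pattern; `toyQueue` and the trimmed `Flushable` here are illustrative stand-ins, not the real Gitea types:

```go
package main

import (
	"fmt"
	"time"
)

// Flushable mirrors the capability interface introduced above.
type Flushable interface {
	Flush(time.Duration) error
	IsEmpty() bool
}

// toyQueue is a hypothetical queue type used only for illustration.
type toyQueue struct{ items []string }

// Flush drains synchronously here; a real pool adds a flush worker instead.
func (q *toyQueue) Flush(timeout time.Duration) error {
	q.items = q.items[:0]
	return nil
}

func (q *toyQueue) IsEmpty() bool { return len(q.items) == 0 }

func main() {
	// The manager stores only interface{}; capabilities are discovered at
	// call time with a type assertion, exactly as ManagedQueue.Flush does.
	var managed interface{} = &toyQueue{items: []string{"a", "b"}}

	if flushable, ok := managed.(Flushable); ok {
		_ = flushable.Flush(time.Second)
		fmt.Println("empty after flush:", flushable.IsEmpty())
	} else {
		fmt.Println("not flushable; skipped")
	}
}
```

The benefit is that a queue type opts into management features simply by implementing the matching method set; anything that cannot flush degrades to a harmless no-op, which is exactly how ManagedQueue.Flush and IsEmpty behave above.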
2 changes: 1 addition & 1 deletion modules/queue/queue_channel.go
@@ -48,7 +48,7 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro
 		workers: config.Workers,
 		name:    config.Name,
 	}
-	queue.qid = GetManager().Add(config.Name, ChannelQueueType, config, exemplar, queue)
+	queue.qid = GetManager().Add(queue, ChannelQueueType, config, exemplar)
 	return queue, nil
 }
 
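This call site shows the new registration shape: the queue itself is passed, and the manager derives the display name by probing for a Named interface defined elsewhere in the queue package. A sketch of that fallback naming logic, under the assumption that Named is just a `Name() string` method; `deriveName` and `namedQueue` are invented for illustration:

```go
package main

import "fmt"

// Named is the optional naming interface Add probes for; its exact
// definition lives elsewhere in the queue package (assumed here).
type Named interface {
	Name() string
}

// namedQueue is a hypothetical queue that can report its own name.
type namedQueue struct{ name string }

func (q *namedQueue) Name() string { return q.name }

// deriveName mimics what Manager.Add now does: fall back to a generated
// name unless the managed object can name itself.
func deriveName(managed interface{}, qid int64) string {
	name := fmt.Sprintf("queue-%d", qid)
	if named, ok := managed.(Named); ok && len(named.Name()) > 0 {
		name = named.Name()
	}
	return name
}

func main() {
	fmt.Println(deriveName(&namedQueue{name: "mail"}, 7)) // mail
	fmt.Println(deriveName(struct{}{}, 8))                // queue-8
}
```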
33 changes: 18 additions & 15 deletions modules/queue/queue_disk.go
@@ -9,6 +9,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"code.gitea.io/gitea/modules/log"
@@ -29,7 +30,7 @@ type LevelQueueConfiguration struct {
 
 // LevelQueue implements a disk library queue
 type LevelQueue struct {
-	pool       *WorkerPool
+	*WorkerPool
 	queue      *levelqueue.Queue
 	closed     chan struct{}
 	terminated chan struct{}
@@ -53,15 +54,15 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
 	}
 
 	queue := &LevelQueue{
-		pool:       NewWorkerPool(handle, config.WorkerPoolConfiguration),
+		WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
 		queue:      internal,
 		exemplar:   exemplar,
 		closed:     make(chan struct{}),
 		terminated: make(chan struct{}),
 		workers:    config.Workers,
 		name:       config.Name,
 	}
-	queue.pool.qid = GetManager().Add(config.Name, LevelQueueType, config, exemplar, queue.pool)
+	queue.qid = GetManager().Add(queue, LevelQueueType, config, exemplar)
 	return queue, nil
 }
 
@@ -72,7 +73,7 @@ func (l *LevelQueue) Run(atShutdown, atTerminate func(context.Context, func()))
 	log.Debug("LevelQueue: %s Starting", l.name)
 
 	go func() {
-		_ = l.pool.AddWorkers(l.workers, 0)
+		_ = l.AddWorkers(l.workers, 0)
 	}()
 
 	go l.readToChan()
@@ -81,12 +82,12 @@ func (l *LevelQueue) Run(atShutdown, atTerminate func(context.Context, func()))
 	<-l.closed
 
 	log.Trace("LevelQueue: %s Waiting til done", l.name)
-	l.pool.Wait()
+	l.Wait()
 
 	log.Trace("LevelQueue: %s Waiting til cleaned", l.name)
 	ctx, cancel := context.WithCancel(context.Background())
 	atTerminate(ctx, cancel)
-	l.pool.CleanUp(ctx)
+	l.CleanUp(ctx)
 	cancel()
 	log.Trace("LevelQueue: %s Cleaned", l.name)
 
@@ -97,33 +98,37 @@ func (l *LevelQueue) readToChan() {
 		select {
 		case <-l.closed:
 			// tell the pool to shutdown.
-			l.pool.cancel()
+			l.cancel()
 			return
 		default:
+			atomic.AddInt64(&l.numInQueue, 1)
 			bs, err := l.queue.RPop()
 			if err != nil {
 				if err != levelqueue.ErrNotFound {
 					log.Error("LevelQueue: %s Error on RPop: %v", l.name, err)
 				}
+				atomic.AddInt64(&l.numInQueue, -1)
 				time.Sleep(time.Millisecond * 100)
 				continue
 			}
 
 			if len(bs) == 0 {
+				atomic.AddInt64(&l.numInQueue, -1)
 				time.Sleep(time.Millisecond * 100)
 				continue
 			}
 
 			data, err := unmarshalAs(bs, l.exemplar)
 			if err != nil {
 				log.Error("LevelQueue: %s Failed to unmarshal with error: %v", l.name, err)
+				atomic.AddInt64(&l.numInQueue, -1)
 				time.Sleep(time.Millisecond * 100)
 				continue
 			}
 
 			log.Trace("LevelQueue %s: Task found: %#v", l.name, data)
-			l.pool.Push(data)
-
+			l.WorkerPool.Push(data)
+			atomic.AddInt64(&l.numInQueue, -1)
 		}
 	}
 }
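The reworked loop brackets every pop with an atomic in-flight counter (`numInQueue`, presumably a field added to the embedded WorkerPool elsewhere in this commit) and decrements it on every early-exit path, so an emptiness check cannot report empty while an item is mid-hand-off. The pattern in isolation, with invented names:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// counter mimics the numInQueue bookkeeping: increment before popping,
// decrement on every exit path, so emptiness checks see in-flight work.
type counter struct{ n int64 }

func (c *counter) handleOne(pop func() (string, bool), push func(string)) {
	atomic.AddInt64(&c.n, 1)
	item, ok := pop()
	if !ok {
		atomic.AddInt64(&c.n, -1) // early exits must balance the counter
		return
	}
	push(item)
	atomic.AddInt64(&c.n, -1)
}

func (c *counter) empty() bool { return atomic.LoadInt64(&c.n) == 0 }

func main() {
	c := &counter{}
	c.handleOne(
		func() (string, bool) { return "task", true },
		func(string) {},
	)
	fmt.Println("empty:", c.empty()) // true: the hand-off completed
}
```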
Expand All @@ -140,14 +145,9 @@ func (l *LevelQueue) Push(data Data) error {
return l.queue.LPush(bs)
}

// Flush flushes the queue and blocks till the queue is empty
func (l *LevelQueue) Flush(timeout time.Duration) error {
return l.pool.Flush(timeout)
}

// IsEmpty checks whether the queue is empty
func (l *LevelQueue) IsEmpty() bool {
if !l.pool.IsEmpty() {
if !l.WorkerPool.IsEmpty() {
return false
}
return l.queue.Len() == 0
@@ -177,6 +177,9 @@ func (l *LevelQueue) Terminate() {
 	default:
 		close(l.terminated)
 		l.lock.Unlock()
+		if log.IsDebug() {
+			log.Debug("LevelQueue: %s Closing with %d tasks left in queue", l.name, l.queue.Len())
+		}
 		if err := l.queue.Close(); err != nil && err.Error() != "leveldb: closed" {
 			log.Error("Error whilst closing internal queue in %s: %v", l.name, err)
 		}
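Swapping the `pool` field for an embedded `*WorkerPool` is the "subsume" in the commit title: the pool's methods are promoted onto LevelQueue, so the queue itself now satisfies the manager's ManagedPool and Flushable checks without hand-written delegation, while IsEmpty is still redefined to also require the on-disk store to be drained. A toy sketch of promotion plus selective override; `workerPool` and `diskQueue` are illustrative, not the real types:

```go
package main

import "fmt"

// workerPool stands in for the real WorkerPool and supplies shared methods.
type workerPool struct{ inFlight int64 }

func (p *workerPool) IsEmpty() bool { return p.inFlight == 0 }

// diskQueue stands in for LevelQueue: embedding promotes workerPool's
// methods onto it, so it satisfies pool-shaped interfaces directly.
type diskQueue struct {
	*workerPool
	onDisk int
}

// IsEmpty shadows the promoted method but can still call it explicitly,
// mirroring LevelQueue.IsEmpty checking both the pool and the backing store.
func (q *diskQueue) IsEmpty() bool {
	if !q.workerPool.IsEmpty() {
		return false
	}
	return q.onDisk == 0
}

func main() {
	q := &diskQueue{workerPool: &workerPool{}, onDisk: 3}
	fmt.Println(q.IsEmpty()) // false: items remain on disk
	q.onDisk = 0
	fmt.Println(q.IsEmpty()) // true
}
```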