Skip to content

Commit

Permalink
Add a per-worker type configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
jinroh committed Jan 3, 2018
1 parent fa0a221 commit b58e15a
Show file tree
Hide file tree
Showing 24 changed files with 319 additions and 134 deletions.
4 changes: 0 additions & 4 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"os/signal"
"path"
"path/filepath"
"runtime"
"strings"
"time"

Expand Down Expand Up @@ -164,9 +163,6 @@ func init() {
flags.String("downloads-url", "", "URL for the download secret storage, redis or in-memory")
checkNoErr(viper.BindPFlag("downloads.url", flags.Lookup("downloads-url")))

flags.Int("jobs-workers", runtime.NumCPU(), "Number of parallel workers (0 to disable the processing of jobs)")
checkNoErr(viper.BindPFlag("jobs.workers", flags.Lookup("jobs-workers")))

flags.String("jobs-url", "", "URL for the jobs system synchronization, redis or in-memory")
checkNoErr(viper.BindPFlag("jobs.url", flags.Lookup("jobs-url")))

Expand Down
46 changes: 43 additions & 3 deletions cozy.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,49 @@ couchdb:

# jobs parameters to configure the job system
jobs:
# workers define the number of concurrent workers running on the system. if
# this parameter is set to 0, no worker is running
workers: 1
# workers individual configrations.
#
# For each worker type it is possible to configure the following fields:
# - concurrency: the maximum number of jobs executed in parallel. when set
# to zero, the worker is deactivated
# - max_exec_count: the maximum number of retries for one job in case of an
# error
# - timeout: the maximum amount of time allowed for one execution of a job
#
# List of available workers:
#
# - "konnector": launching konnectors
# - "push": sending push notifications
# - "sendmail": sending mails
# - "service": launching services
# - "sharedata": data sharing between cozies
# - "sharingupdates": update of sharings data
# - "thumbnail": creatings and deleting thumbnails for images
# - "unzip": unzipping tarball
#
# When no configuration is given for a worker, a default configuration is
# used. When a false boolean value is given, the worker is deactivated.
#
# To deactivate all workers, the workers field can be set to "false" or
# "none".
workers:
# thumbnail:
# concurrency: {{mul .NumCPU 4}}
# max_exec_count: 2
# timeout: 15s

# konnector:
# concurrency: {{.NumCPU}}
# max_exec_count: 2
# timeout: 200s

# service:
# concurrency: {{.NumCPU}}
# max_exec_count: 2
# timeout: 200s

# sharedata: false
# sharingupdates: false

# konnectors execution parameters for executing external processes.
konnectors:
Expand Down
74 changes: 67 additions & 7 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ import (
"os"
"path"
"path/filepath"
"runtime"
"strconv"
"strings"
"text/template"
"time"

"github.com/Masterminds/sprig"
"github.com/cozy/cozy-stack/pkg/logger"
"github.com/cozy/cozy-stack/pkg/utils"
"github.com/cozy/gomail"
Expand Down Expand Up @@ -128,7 +131,10 @@ type CouchDB struct {
// synchronization
type Jobs struct {
RedisConfig
Workers int
NoWorkers bool
Workers []Worker
// XXX for retro-compatibility
NbWorkers int
}

// Konnectors contains the configuration values for the konnectors
Expand All @@ -155,6 +161,14 @@ type Notifications struct {
IOSTeamID string
}

// Worker contains the configuration fields for a specific worker type.
type Worker struct {
WorkerType string
Concurrency *int
MaxExecCount *int
Timeout *time.Duration
}

// RedisConfig contains the configuration values for a redis system
type RedisConfig struct {
cli redis.UniversalClient
Expand Down Expand Up @@ -278,14 +292,20 @@ func Setup(cfgFile string) (err error) {

tmpl := template.New(filepath.Base(cfgFile))
tmpl = tmpl.Option("missingkey=zero")
tmpl, err = tmpl.ParseFiles(cfgFile)
tmpl, err = tmpl.Funcs(sprig.TxtFuncMap()).ParseFiles(cfgFile)
if err != nil {
return fmt.Errorf("Unable to open and parse configuration file "+
"template %s: %s", cfgFile, err)
}

dest := new(bytes.Buffer)
ctxt := &struct{ Env map[string]string }{Env: envMap()}
ctxt := &struct {
Env map[string]string
NumCPU int
}{
Env: envMap(),
NumCPU: runtime.NumCPU(),
}
err = tmpl.ExecuteTemplate(dest, filepath.Base(cfgFile), ctxt)
if err != nil {
return fmt.Errorf("Template error for config file %s: %s", cfgFile, err)
Expand Down Expand Up @@ -421,6 +441,49 @@ func UseViper(v *viper.Viper) error {
adminSecretFile = defaultAdminSecretFileName
}

jobs := Jobs{RedisConfig: jobsRedis}
{
if nbWorkers := v.GetInt("jobs.workers"); nbWorkers > 0 {
jobs.NbWorkers = nbWorkers
} else if ws := v.GetString("jobs.workers"); ws == "false" || ws == "none" {
jobs.NoWorkers = true
} else if workersMap := v.GetStringMap("jobs.workers"); len(workersMap) > 0 {
workers := make([]Worker, 0, len(workersMap))

for workerType, mapInterface := range workersMap {
w := Worker{WorkerType: workerType}

if enabled, ok := mapInterface.(bool); ok {
if !enabled {
zero := 0
w.Concurrency = &zero
}
} else if m, ok := mapInterface.(map[string]interface{}); ok {
if concurrency, ok := m["concurrency"].(int); ok {
w.Concurrency = &concurrency
}
if maxExecCount, ok := m["max_exec_count"].(int); ok {
w.MaxExecCount = &maxExecCount
}
if timeout, ok := m["timeout"].(string); ok {
d, err := time.ParseDuration(timeout)
if err != nil {
return fmt.Errorf("config: could not parse timeout duration for worker %q: %s",
workerType, err)
}
w.Timeout = &d
}
} else {
return fmt.Errorf("config: expecting a map in the key %q",
"jobs.workers."+workerType)
}

workers = append(workers, w)
}
jobs.Workers = workers
}
}

config = &Config{
Host: v.GetString("host"),
Port: v.GetInt("port"),
Expand All @@ -440,10 +503,7 @@ func UseViper(v *viper.Viper) error {
Auth: couchAuth,
URL: couchURL,
},
Jobs: Jobs{
Workers: v.GetInt("jobs.workers"),
RedisConfig: jobsRedis,
},
Jobs: jobs,
Konnectors: Konnectors{
Cmd: v.GetString("konnectors.cmd"),
},
Expand Down
49 changes: 28 additions & 21 deletions pkg/jobs/mem_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"sync/atomic"

"github.com/cozy/cozy-stack/pkg/config"
multierror "github.com/hashicorp/go-multierror"
)

Expand All @@ -23,11 +24,10 @@ type (

// memBroker is an in-memory broker implementation of the Broker interface.
memBroker struct {
nbWorkers int
queues map[string]*memQueue
workers []*Worker
running uint32
closed chan struct{}
queues map[string]*memQueue
workers []*Worker
running uint32
closed chan struct{}
}
)

Expand Down Expand Up @@ -77,43 +77,50 @@ func (q *memQueue) Len() int {
//
// The in-memory implementation of the job system has the specifity that
// workers are actually launched by the broker at its creation.
func NewMemBroker(nbWorkers int) Broker {
func NewMemBroker() Broker {
return &memBroker{
nbWorkers: nbWorkers,
queues: make(map[string]*memQueue),
closed: make(chan struct{}),
queues: make(map[string]*memQueue),
closed: make(chan struct{}),
}
}

func (b *memBroker) Start(ws WorkersList) error {
if !atomic.CompareAndSwapUint32(&b.running, 0, 1) {
return ErrClosed
}
if b.nbWorkers <= 0 {
return nil
}
joblog.Infof("Starting in-memory broker with %d workers", b.nbWorkers)
setNbSlots(b.nbWorkers)
for workerType, conf := range ws {
q := newMemQueue(workerType)
w := &Worker{
Type: workerType,
Conf: conf,

for _, conf := range ws {
if conf.Concurrency <= 0 {
continue
}
b.queues[workerType] = q
q := newMemQueue(conf.WorkerType)
w := NewWorker(conf)
b.queues[conf.WorkerType] = q
b.workers = append(b.workers, w)
if err := w.Start(q.Jobs); err != nil {
return err
}
}

if len(b.workers) > 0 {
joblog.Infof("Started in-memory broker for %d workers type", len(b.workers))
}

// XXX for retro-compat
if slots := config.GetConfig().Jobs.NbWorkers; len(b.workers) > 0 && slots > 0 {
joblog.Warnf("Limiting the number of total concurrent workers to %d", slots)
joblog.Warnf("Please update your configuration file to avoid a hard limit")
setNbSlots(slots)
}

return nil
}

func (b *memBroker) Shutdown(ctx context.Context) error {
if !atomic.CompareAndSwapUint32(&b.running, 1, 0) {
return ErrClosed
}
if b.nbWorkers <= 0 {
if len(b.workers) == 0 {
return nil
}
fmt.Print(" shutting down in-memory broker...")
Expand Down
34 changes: 20 additions & 14 deletions pkg/jobs/mem_jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ func TestInMemoryJobs(t *testing.T) {
var w sync.WaitGroup

var workersTestList = WorkersList{
"test": {
{
WorkerType: "test",
Concurrency: 4,
WorkerFunc: func(ctx *WorkerContext) error {
var msg string
Expand All @@ -90,8 +91,8 @@ func TestInMemoryJobs(t *testing.T) {
},
}

broker1 := NewMemBroker(4)
broker2 := NewMemBroker(4)
broker1 := NewMemBroker()
broker2 := NewMemBroker()
broker1.Start(workersTestList)
broker2.Start(workersTestList)
w.Add(2)
Expand Down Expand Up @@ -130,7 +131,7 @@ func TestInMemoryJobs(t *testing.T) {
}

func TestUnknownWorkerError(t *testing.T) {
broker := NewMemBroker(1)
broker := NewMemBroker()
broker.Start(WorkersList{})
_, err := broker.PushJob(&JobRequest{
Domain: "cozy.local",
Expand All @@ -144,9 +145,10 @@ func TestUnknownWorkerError(t *testing.T) {
func TestUnknownMessageType(t *testing.T) {
var w sync.WaitGroup

broker := NewMemBroker(4)
broker := NewMemBroker()
broker.Start(WorkersList{
"test": {
{
WorkerType: "test",
Concurrency: 4,
WorkerFunc: func(ctx *WorkerContext) error {
var msg string
Expand All @@ -173,9 +175,10 @@ func TestUnknownMessageType(t *testing.T) {
func TestTimeout(t *testing.T) {
var w sync.WaitGroup

broker := NewMemBroker(1)
broker := NewMemBroker()
broker.Start(WorkersList{
"timeout": {
{
WorkerType: "timeout",
Concurrency: 1,
MaxExecCount: 1,
Timeout: 1 * time.Millisecond,
Expand Down Expand Up @@ -204,9 +207,10 @@ func TestRetry(t *testing.T) {
maxExecCount := 4

var count int
broker := NewMemBroker(1)
broker := NewMemBroker()
broker.Start(WorkersList{
"test": {
{
WorkerType: "test",
Concurrency: 1,
MaxExecCount: maxExecCount,
Timeout: 1 * time.Millisecond,
Expand Down Expand Up @@ -239,9 +243,10 @@ func TestPanicRetried(t *testing.T) {

maxExecCount := 4

broker := NewMemBroker(1)
broker := NewMemBroker()
broker.Start(WorkersList{
"panic": {
{
WorkerType: "panic",
Concurrency: 1,
MaxExecCount: maxExecCount,
RetryDelay: 1 * time.Millisecond,
Expand Down Expand Up @@ -269,9 +274,10 @@ func TestPanic(t *testing.T) {
even, _ := NewMessage(0)
odd, _ := NewMessage(1)

broker := NewMemBroker(1)
broker := NewMemBroker()
broker.Start(WorkersList{
"panic2": {
{
WorkerType: "panic2",
Concurrency: 1,
MaxExecCount: 1,
RetryDelay: 1 * time.Millisecond,
Expand Down
Loading

0 comments on commit b58e15a

Please sign in to comment.