Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
// The returne TriggerResponse will indicate whether the Job was successfully
// triggered, the trigger failed, or the Job need to be put into the staging
// queue.
type TriggerFunction func(context.Context, *TriggerRequest) *TriggerResponse
type TriggerFunction func(*TriggerRequest, func(*TriggerResponse))

// Interface is a cron interface. It schedules and manages job which are stored
// and informed from ETCD. It uses a trigger function to call when a job is
Expand Down
22 changes: 22 additions & 0 deletions cron/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,17 @@ type Options struct {
// queue unless read from.
// Channel will be closed on final shutdown.
ConsumerSink chan<- *api.InformerEvent

// Workers is the number of workers that handle job events. The higher the
// number the more go routines will be spawned, each working over a partition
// of the total job space. The higher the number, the higher the number of
// jobs which can be concurrently executed at a time. Increasing this number,
// increases the number of go routines for this instance. This number should
// be tuned to the bottleneck of job execution, i.e., CPU, I/O, memory, etc.
// Defaults to 64.
// Every instance will have this number of workers.
// Must be larger than 0.
Workers *uint32
}

// cron is the implementation of the cron interface.
Expand All @@ -76,6 +87,7 @@ type cron struct {
triggerFn api.TriggerFunction
replicaData *anypb.Any
consumerSink chan<- *api.InformerEvent
workers uint32

api *retry.Retry
elected atomic.Bool
Expand All @@ -94,6 +106,14 @@ func New(opts Options) (api.Interface, error) {
return nil, errors.New("client is required")
}

var workers uint32 = 64
if opts.Workers != nil {
workers = *opts.Workers
}
if workers == 0 {
return nil, errors.New("workers must be larger than 0")
}

key, err := key.New(key.Options{
Namespace: opts.Namespace,
ID: opts.ID,
Expand Down Expand Up @@ -127,6 +147,7 @@ func New(opts Options) (api.Interface, error) {
wleaderCh: opts.WatchLeadership,
api: retry.New(retry.Options{Log: log}),
consumerSink: opts.ConsumerSink,
workers: workers,
}, nil
}

Expand Down Expand Up @@ -242,6 +263,7 @@ func (c *cron) runEngine(ctx context.Context, elected *elector.Elected) error {
Client: c.client,
TriggerFn: c.triggerFn,
ConsumerSink: c.consumerSink,
Workers: c.workers,
})
if err != nil {
return err
Expand Down
8 changes: 4 additions & 4 deletions cron/cron_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ func Test_Run(t *testing.T) {
Client: client,
Namespace: "abc",
ID: "0",
TriggerFn: func(context.Context, *api.TriggerRequest) *api.TriggerResponse {
TriggerFn: func(_ *api.TriggerRequest, fn func(*api.TriggerResponse)) {
triggered.Add(1)
return &api.TriggerResponse{Result: api.TriggerResponseResult_SUCCESS}
fn(&api.TriggerResponse{Result: api.TriggerResponseResult_SUCCESS})
},
ReplicaData: replicaData,
})
Expand Down Expand Up @@ -90,8 +90,8 @@ func Test_Run(t *testing.T) {
Client: client,
Namespace: "abc",
ID: "0",
TriggerFn: func(context.Context, *api.TriggerRequest) *api.TriggerResponse {
return &api.TriggerResponse{Result: api.TriggerResponseResult_SUCCESS}
TriggerFn: func(_ *api.TriggerRequest, fn func(*api.TriggerResponse)) {
fn(&api.TriggerResponse{Result: api.TriggerResponseResult_SUCCESS})
},
ReplicaData: replicaData,
})
Expand Down
4 changes: 2 additions & 2 deletions example/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/diagridio/go-etcd-cron/example

go 1.24.3
go 1.24.10

replace github.com/diagridio/go-etcd-cron => ../

Expand All @@ -17,7 +17,7 @@ require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/coreos/go-semver v0.3.1 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/dapr/kit v0.15.4 // indirect
github.com/dapr/kit v0.16.2-0.20251124175541-3ac186dff64d // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
Expand Down
6 changes: 2 additions & 4 deletions example/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ github.com/coreos/go-semver v0.3.1 h1:yi21YpKnrx1gt5R+la8n5WgS0kCrsPp33dmEyHReZr
github.com/coreos/go-semver v0.3.1/go.mod h1:irMmmIw/7yzSRPWryHsK7EYSg09caPQL03VsM8rvUec=
github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/dapr/kit v0.15.4 h1:29DezCR22OuZhXX4yPEc+lqcOf/PNaeAuIEx9nGv394=
github.com/dapr/kit v0.15.4/go.mod h1:HwFsBKEbcyLanWlDZE7u/jnaDCD/tU+n3pkFNUctQNw=
github.com/dapr/kit v0.16.2-0.20251124175541-3ac186dff64d h1:csljij9d1IO6u9nqbg+TuSRmTZ+OXT8G49yh6zie1yI=
github.com/dapr/kit v0.16.2-0.20251124175541-3ac186dff64d/go.mod h1:40ZWs5P6xfYf7O59XgwqZkIyDldTIXlhTQhGop8QoSM=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
Expand Down Expand Up @@ -191,8 +191,6 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM=
golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI=
golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
Expand Down
6 changes: 3 additions & 3 deletions example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,18 @@ func main() {
Client: client,
Namespace: "abc",
ID: "helloworld",
TriggerFn: func(context.Context, *api.TriggerRequest) *api.TriggerResponse {
TriggerFn: func(_ *api.TriggerRequest, fn func(*api.TriggerResponse)) {
// Do something with your trigger here.
// Return SUCCESS if the trigger was successful, FAILED if the trigger
// failed and should be subject to the FailurePolicy, or UNDELIVERABLE if
// the job is currently undeliverable and should be moved to the staging
// queue. Use `cron.DeliverablePrefixes` elsewhere to mark jobs with the
// given prefixes as now deliverable.
return &api.TriggerResponse{
fn(&api.TriggerResponse{
Result: api.TriggerResponseResult_SUCCESS,
// Result: api.TriggerResponseResult_FAILED,
// Result: api.TriggerResponseResult_UNDELIVERABLE,
}
})
},
})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/diagridio/go-etcd-cron

go 1.24.3
go 1.24.10

require (
github.com/dapr/kit v0.16.2-0.20251124175541-3ac186dff64d
Expand Down
34 changes: 21 additions & 13 deletions internal/counter/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package counter

import (
"context"
"sync"
"time"

"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -55,6 +56,12 @@ type Interface interface {
TriggerFailed(ctx context.Context) (bool, error)
}

var CounterCache = sync.Pool{
New: func() any {
return new(counter)
},
}

// counter is the implementation of the counter interface.
type counter struct {
name string
Expand All @@ -79,20 +86,21 @@ func New(ctx context.Context, opts Options) (Interface, bool, error) {
return nil, false, err
}

c := &counter{
name: opts.Name,
counterKey: counterKey,
jobKey: jobKey,
client: opts.Client,
schedule: opts.Schedule,
job: opts.Job,
modRevision: opts.JobModRevision,
triggerRequest: &api.TriggerRequest{
Name: opts.Name,
Metadata: opts.Job.GetJob().GetMetadata(),
Payload: opts.Job.GetJob().GetPayload(),
},
c := CounterCache.Get().(*counter)
c.name = opts.Name
c.jobKey = jobKey
c.counterKey = counterKey
c.client = opts.Client
c.schedule = opts.Schedule
c.job = opts.Job
c.count = nil
c.next = time.Time{}
c.triggerRequest = &api.TriggerRequest{
Name: opts.Name,
Metadata: opts.Job.GetJob().GetMetadata(),
Payload: opts.Job.GetJob().GetPayload(),
}
c.modRevision = opts.JobModRevision

if res.Count == 0 {
c.count = &stored.Counter{JobPartitionId: opts.Job.GetPartitionId()}
Expand Down
3 changes: 3 additions & 0 deletions internal/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type Options struct {

// ConsumerSink is an optional sink to receive informer events.
ConsumerSink chan<- *api.InformerEvent

Workers uint32
}

type Interface interface {
Expand Down Expand Up @@ -78,6 +80,7 @@ func New(opts Options) (Interface, error) {
SchedulerBuilder: schedBuilder,
TriggerFn: opts.TriggerFn,
ConsumerSink: opts.ConsumerSink,
Workers: opts.Workers,
})

handler := handler.New(handler.Options{
Expand Down
16 changes: 5 additions & 11 deletions internal/engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ func Test_New(t *testing.T) {
Key: key,
Partitioner: part,
Client: client,
TriggerFn: func(context.Context, *api.TriggerRequest) *api.TriggerResponse {
return nil
},
TriggerFn: func(*api.TriggerRequest, func(*api.TriggerResponse)) {},
})

require.NoError(t, err)
Expand Down Expand Up @@ -76,8 +74,8 @@ func Test_Run(t *testing.T) {
Key: key,
Partitioner: part,
Client: client,
TriggerFn: func(context.Context, *api.TriggerRequest) *api.TriggerResponse {
return nil
TriggerFn: func(_ *api.TriggerRequest, fn func(*api.TriggerResponse)) {
fn(&api.TriggerResponse{Result: api.TriggerResponseResult_SUCCESS})
},
})

Expand Down Expand Up @@ -116,9 +114,7 @@ func Test_Run(t *testing.T) {
Key: key,
Partitioner: part,
Client: client,
TriggerFn: func(context.Context, *api.TriggerRequest) *api.TriggerResponse {
return nil
},
TriggerFn: func(_ *api.TriggerRequest, fn func(*api.TriggerResponse)) {},
})

require.NoError(t, err)
Expand Down Expand Up @@ -156,9 +152,7 @@ func Test_API(t *testing.T) {
Key: key,
Partitioner: part,
Client: client,
TriggerFn: func(context.Context, *api.TriggerRequest) *api.TriggerResponse {
return nil
},
TriggerFn: func(*api.TriggerRequest, func(*api.TriggerResponse)) {},
})

require.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions internal/engine/handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,8 @@ func newAPINotReady(t *testing.T) *handler {
Client: client,
Key: key,
SchedulerBuilder: schedulerBuilder,
TriggerFn: func(context.Context, *cronapi.TriggerRequest) *cronapi.TriggerResponse {
return &cronapi.TriggerResponse{Result: cronapi.TriggerResponseResult_SUCCESS}
TriggerFn: func(_ *cronapi.TriggerRequest, fn func(*cronapi.TriggerResponse)) {
fn(&cronapi.TriggerResponse{Result: cronapi.TriggerResponseResult_SUCCESS})
},
})

Expand Down
3 changes: 1 addition & 2 deletions internal/engine/informer/informer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,7 @@ func Test_Run(t *testing.T) {
select {
case ev := <-ch:
assert.True(t, ev.IsPut)
//nolint:govet
assert.Equal(t, jobs[i], *ev.Job)
assert.True(t, proto.Equal(&jobs[i], ev.Job))
case <-time.After(time.Second):
t.Fatalf("timed out waiting for event %d", i)
}
Expand Down
6 changes: 3 additions & 3 deletions internal/engine/queue/actioner/actioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type Interface interface {
Enqueue(counter.Interface)
Deschedule(counter.Interface)
RemoveConsumer(counter.Interface)
Trigger(context.Context, *api.TriggerRequest) *api.TriggerResponse
Trigger(*api.TriggerRequest, func(*api.TriggerResponse))
AddToControlLoop(*queue.ControlEvent)
DeliverablePrefixes(...string) []string
UnDeliverablePrefixes(...string)
Expand Down Expand Up @@ -108,8 +108,8 @@ func (a *actioner) RemoveConsumer(counter counter.Interface) {
a.consumer.Delete(counter.JobName(), counter.Job())
}

func (a *actioner) Trigger(ctx context.Context, req *api.TriggerRequest) *api.TriggerResponse {
return a.triggerFn(ctx, req)
func (a *actioner) Trigger(req *api.TriggerRequest, fn func(*api.TriggerResponse)) {
a.triggerFn(req, fn)
}

func (a *actioner) AddToControlLoop(event *queue.ControlEvent) {
Expand Down
10 changes: 5 additions & 5 deletions internal/engine/queue/actioner/fake/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type Fake struct {
enqueueFn func(counter.Interface)
descheduleFn func(counter.Interface)
removeConsumerFn func(counter.Interface)
triggerFn func(context.Context, *api.TriggerRequest) *api.TriggerResponse
triggerFn func(*api.TriggerRequest, func(*api.TriggerResponse))
addToControlLoopFn func(*queue.ControlEvent)
deliverablePrefixesFn func(...string) []string
unDeliverablePrefixesFn func(...string)
Expand All @@ -32,7 +32,7 @@ func New() *Fake {
scheduleFn: func(context.Context, string, int64, *stored.Job) (counter.Interface, error) { return nil, nil },
enqueueFn: func(counter.Interface) {},
descheduleFn: func(counter.Interface) {},
triggerFn: func(context.Context, *api.TriggerRequest) *api.TriggerResponse { return nil },
triggerFn: func(*api.TriggerRequest, func(*api.TriggerResponse)) {},
removeConsumerFn: func(counter.Interface) {},
addToControlLoopFn: func(*queue.ControlEvent) {},
deliverablePrefixesFn: func(...string) []string { return nil },
Expand All @@ -57,7 +57,7 @@ func (f *Fake) WithDeschedule(fn func(counter.Interface)) *Fake {
return f
}

func (f *Fake) WithTrigger(fn func(context.Context, *api.TriggerRequest) *api.TriggerResponse) *Fake {
func (f *Fake) WithTrigger(fn func(*api.TriggerRequest, func(*api.TriggerResponse))) *Fake {
f.triggerFn = fn
return f
}
Expand Down Expand Up @@ -104,8 +104,8 @@ func (f *Fake) Deschedule(c counter.Interface) {
f.descheduleFn(c)
}

func (f *Fake) Trigger(ctx context.Context, req *api.TriggerRequest) *api.TriggerResponse {
return f.triggerFn(ctx, req)
func (f *Fake) Trigger(req *api.TriggerRequest, fn func(*api.TriggerResponse)) {
f.triggerFn(req, fn)
}

func (f *Fake) AddToControlLoop(event *queue.ControlEvent) {
Expand Down
Loading