Skip to content

Commit

Permalink
Move task deadline compute logic to processor
Browse files Browse the repository at this point in the history
  • Loading branch information
hibiken committed Feb 19, 2022
1 parent d865d89 commit bca6247
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 1 deletion.
23 changes: 22 additions & 1 deletion processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package asynq
import (
"context"
"fmt"
"math"
"math/rand"
"runtime"
"runtime/debug"
Expand All @@ -19,12 +20,14 @@ import (
asynqcontext "github.com/hibiken/asynq/internal/context"
"github.com/hibiken/asynq/internal/errors"
"github.com/hibiken/asynq/internal/log"
"github.com/hibiken/asynq/internal/timeutil"
"golang.org/x/time/rate"
)

type processor struct {
logger *log.Logger
broker base.Broker
clock timeutil.Clock

handler Handler
baseCtxFn func() context.Context
Expand Down Expand Up @@ -97,6 +100,7 @@ func newProcessor(params processorParams) *processor {
logger: params.logger,
broker: params.broker,
baseCtxFn: params.baseCtxFn,
clock: timeutil.NewRealClock(),
queueConfig: queues,
orderedQueues: orderedQueues,
retryDelayFunc: params.retryDelayFunc,
Expand Down Expand Up @@ -167,7 +171,7 @@ func (p *processor) exec() {
return
case p.sema <- struct{}{}: // acquire token
qnames := p.queues()
msg, deadline, err := p.broker.Dequeue(qnames...)
msg, err := p.broker.Dequeue(qnames...)
switch {
case errors.Is(err, errors.ErrNoProcessableTask):
p.logger.Debug("All queues are empty")
Expand All @@ -186,6 +190,7 @@ func (p *processor) exec() {
return
}

deadline := p.computeDeadline(msg)
p.starting <- &workerInfo{msg, time.Now(), deadline}
go func() {
defer func() {
Expand Down Expand Up @@ -486,3 +491,19 @@ func gcd(xs ...int) int {
}
return res
}

// computeDeadline returns the given task's deadline,
func (p *processor) computeDeadline(msg *base.TaskMessage) time.Time {
if msg.Timeout == 0 && msg.Deadline == 0 {
p.logger.Errorf("asynq: internal error: both timeout and deadline are not set for the task message: %s", msg.ID)
return p.clock.Now().Add(defaultTimeout)
}
if msg.Timeout != 0 && msg.Deadline != 0 {
deadlineUnix := math.Min(float64(p.clock.Now().Unix()+msg.Timeout), float64(msg.Deadline))
return time.Unix(int64(deadlineUnix), 0)
}
if msg.Timeout != 0 {
return p.clock.Now().Add(time.Duration(msg.Timeout) * time.Second)
}
return time.Unix(msg.Deadline, 0)
}
68 changes: 68 additions & 0 deletions processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ import (
"github.com/google/go-cmp/cmp/cmpopts"
h "github.com/hibiken/asynq/internal/asynqtest"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log"
"github.com/hibiken/asynq/internal/rdb"
"github.com/hibiken/asynq/internal/timeutil"
)

var taskCmpOpts = []cmp.Option{
Expand Down Expand Up @@ -754,3 +756,69 @@ func TestNormalizeQueues(t *testing.T) {
}
}
}

func TestProcessorComputeDeadline(t *testing.T) {
now := time.Now()
p := processor{
logger: log.NewLogger(nil),
clock: timeutil.NewSimulatedClock(now),
}

tests := []struct {
desc string
msg *base.TaskMessage
want time.Time
}{
{
desc: "message with only timeout specified",
msg: &base.TaskMessage{
Timeout: int64((30 * time.Minute).Seconds()),
},
want: now.Add(30 * time.Minute),
},
{
desc: "message with only deadline specified",
msg: &base.TaskMessage{
Deadline: now.Add(24 * time.Hour).Unix(),
},
want: now.Add(24 * time.Hour),
},
{
desc: "message with both timeout and deadline set (now+timeout < deadline)",
msg: &base.TaskMessage{
Deadline: now.Add(24 * time.Hour).Unix(),
Timeout: int64((30 * time.Minute).Seconds()),
},
want: now.Add(30 * time.Minute),
},
{
desc: "message with both timeout and deadline set (now+timeout > deadline)",
msg: &base.TaskMessage{
Deadline: now.Add(10 * time.Minute).Unix(),
Timeout: int64((30 * time.Minute).Seconds()),
},
want: now.Add(10 * time.Minute),
},
{
desc: "message with both timeout and deadline set (now+timeout == deadline)",
msg: &base.TaskMessage{
Deadline: now.Add(30 * time.Minute).Unix(),
Timeout: int64((30 * time.Minute).Seconds()),
},
want: now.Add(30 * time.Minute),
},
{
desc: "message without timeout and deadline",
msg: &base.TaskMessage{},
want: now.Add(defaultTimeout),
},
}

for _, tc := range tests {
got := p.computeDeadline(tc.msg)
// Compare the Unix epoch with seconds granularity
if got.Unix() != tc.want.Unix() {
t.Errorf("%s: got=%v, want=%v", tc.desc, got.Unix(), tc.want.Unix())
}
}
}

0 comments on commit bca6247

Please sign in to comment.