Skip to content

Commit

Permalink
Update RDB methods to work with lease
Browse files Browse the repository at this point in the history
  • Loading branch information
hibiken committed Feb 19, 2022
1 parent b9943de commit dfae863
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 69 deletions.
10 changes: 5 additions & 5 deletions internal/base/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,13 +684,13 @@ type Broker interface {
Enqueue(ctx context.Context, msg *TaskMessage) error
EnqueueUnique(ctx context.Context, msg *TaskMessage, ttl time.Duration) error
Dequeue(qnames ...string) (*TaskMessage, time.Time, error)
Done(msg *TaskMessage) error
MarkAsComplete(msg *TaskMessage) error
Requeue(msg *TaskMessage) error
Done(ctx context.Context, msg *TaskMessage) error
MarkAsComplete(ctx context.Context, msg *TaskMessage) error
Requeue(ctx context.Context, msg *TaskMessage) error
Schedule(ctx context.Context, msg *TaskMessage, processAt time.Time) error
ScheduleUnique(ctx context.Context, msg *TaskMessage, processAt time.Time, ttl time.Duration) error
Retry(msg *TaskMessage, processAt time.Time, errMsg string, isFailure bool) error
Archive(msg *TaskMessage, errMsg string) error
Retry(ctx context.Context, msg *TaskMessage, processAt time.Time, errMsg string, isFailure bool) error
Archive(ctx context.Context, msg *TaskMessage, errMsg string) error
ForwardIfReady(qnames ...string) error
DeleteExpiredCompletedTasks(qname string) error
ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*TaskMessage, error)
Expand Down
16 changes: 10 additions & 6 deletions internal/rdb/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func BenchmarkDequeueSingleQueue(b *testing.B) {
}
b.StartTimer()

if _, err := r.Dequeue(base.DefaultQueueName); err != nil {
if _, _, err := r.Dequeue(base.DefaultQueueName); err != nil {
b.Fatalf("Dequeue failed: %v", err)
}
}
Expand All @@ -139,7 +139,7 @@ func BenchmarkDequeueMultipleQueues(b *testing.B) {
}
b.StartTimer()

if _, err := r.Dequeue(qnames...); err != nil {
if _, _, err := r.Dequeue(qnames...); err != nil {
b.Fatalf("Dequeue failed: %v", err)
}
}
Expand All @@ -156,6 +156,7 @@ func BenchmarkDone(b *testing.B) {
{Message: m2, Score: time.Now().Add(20 * time.Second).Unix()},
{Message: m3, Score: time.Now().Add(30 * time.Second).Unix()},
}
ctx := context.Background()
b.ResetTimer()

for i := 0; i < b.N; i++ {
Expand All @@ -165,7 +166,7 @@ func BenchmarkDone(b *testing.B) {
asynqtest.SeedDeadlines(b, r.client, zs, base.DefaultQueueName)
b.StartTimer()

if err := r.Done(msgs[0]); err != nil {
if err := r.Done(ctx, msgs[0]); err != nil {
b.Fatalf("Done failed: %v", err)
}
}
Expand All @@ -182,6 +183,7 @@ func BenchmarkRetry(b *testing.B) {
{Message: m2, Score: time.Now().Add(20 * time.Second).Unix()},
{Message: m3, Score: time.Now().Add(30 * time.Second).Unix()},
}
ctx := context.Background()
b.ResetTimer()

for i := 0; i < b.N; i++ {
Expand All @@ -191,7 +193,7 @@ func BenchmarkRetry(b *testing.B) {
asynqtest.SeedDeadlines(b, r.client, zs, base.DefaultQueueName)
b.StartTimer()

if err := r.Retry(msgs[0], time.Now().Add(1*time.Minute), "error", true /*isFailure*/); err != nil {
if err := r.Retry(ctx, msgs[0], time.Now().Add(1*time.Minute), "error", true /*isFailure*/); err != nil {
b.Fatalf("Retry failed: %v", err)
}
}
Expand All @@ -208,6 +210,7 @@ func BenchmarkArchive(b *testing.B) {
{Message: m2, Score: time.Now().Add(20 * time.Second).Unix()},
{Message: m3, Score: time.Now().Add(30 * time.Second).Unix()},
}
ctx := context.Background()
b.ResetTimer()

for i := 0; i < b.N; i++ {
Expand All @@ -217,7 +220,7 @@ func BenchmarkArchive(b *testing.B) {
asynqtest.SeedDeadlines(b, r.client, zs, base.DefaultQueueName)
b.StartTimer()

if err := r.Archive(msgs[0], "error"); err != nil {
if err := r.Archive(ctx, msgs[0], "error"); err != nil {
b.Fatalf("Archive failed: %v", err)
}
}
Expand All @@ -234,6 +237,7 @@ func BenchmarkRequeue(b *testing.B) {
{Message: m2, Score: time.Now().Add(20 * time.Second).Unix()},
{Message: m3, Score: time.Now().Add(30 * time.Second).Unix()},
}
ctx := context.Background()
b.ResetTimer()

for i := 0; i < b.N; i++ {
Expand All @@ -243,7 +247,7 @@ func BenchmarkRequeue(b *testing.B) {
asynqtest.SeedDeadlines(b, r.client, zs, base.DefaultQueueName)
b.StartTimer()

if err := r.Requeue(msgs[0]); err != nil {
if err := r.Requeue(ctx, msgs[0]); err != nil {
b.Fatalf("Requeue failed: %v", err)
}
}
Expand Down
41 changes: 21 additions & 20 deletions internal/rdb/rdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,10 @@ end
return nil`)

// Dequeue queries given queues in order and pops a task message
// off a queue if one exists and returns the message.
// off a queue if one exists and returns the message and its lease expiration time.
// Dequeue skips a queue if the queue is paused.
// If all queues are empty, ErrNoProcessableTask error is returned.
func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, err error) {
func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, leaseExpirationTime time.Time, err error) {
var op errors.Op = "rdb.Dequeue"
for _, qname := range qnames {
keys := []string{
Expand All @@ -253,26 +253,27 @@ func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, err error) {
base.ActiveKey(qname),
base.LeaseKey(qname),
}
leaseExpirationTime = r.clock.Now().Add(LeaseDuration)
argv := []interface{}{
r.clock.Now().Add(LeaseDuration).Unix(),
leaseExpirationTime.Unix(),
base.TaskKeyPrefix(qname),
}
res, err := dequeueCmd.Run(context.Background(), r.client, keys, argv...).Result()
if err == redis.Nil {
continue
} else if err != nil {
return nil, errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err))
return nil, time.Time{}, errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err))
}
encoded, err := cast.ToStringE(res)
if err != nil {
return nil, errors.E(op, errors.Internal, fmt.Sprintf("cast error: unexpected return value from Lua script: %v", res))
return nil, time.Time{}, errors.E(op, errors.Internal, fmt.Sprintf("cast error: unexpected return value from Lua script: %v", res))
}
if msg, err = base.DecodeMessage([]byte(encoded)); err != nil {
return nil, errors.E(op, errors.Internal, fmt.Sprintf("cannot decode message: %v", err))
return nil, time.Time{}, errors.E(op, errors.Internal, fmt.Sprintf("cannot decode message: %v", err))
}
return msg, nil
return msg, leaseExpirationTime, nil
}
return nil, errors.E(op, errors.NotFound, errors.ErrNoProcessableTask)
return nil, time.Time{}, errors.E(op, errors.NotFound, errors.ErrNoProcessableTask)
}

// KEYS[1] -> asynq:{<qname>}:active
Expand Down Expand Up @@ -345,9 +346,8 @@ return redis.status_reply("OK")

// Done removes the task from active queue and deletes the task.
// It removes a uniqueness lock acquired by the task, if any.
func (r *RDB) Done(msg *base.TaskMessage) error {
func (r *RDB) Done(ctx context.Context, msg *base.TaskMessage) error {
var op errors.Op = "rdb.Done"
ctx := context.Background()
now := r.clock.Now()
expireAt := now.Add(statsTTL)
keys := []string{
Expand Down Expand Up @@ -448,9 +448,8 @@ return redis.status_reply("OK")

// MarkAsComplete removes the task from active queue to mark the task as completed.
// It removes a uniqueness lock acquired by the task, if any.
func (r *RDB) MarkAsComplete(msg *base.TaskMessage) error {
func (r *RDB) MarkAsComplete(ctx context.Context, msg *base.TaskMessage) error {
var op errors.Op = "rdb.MarkAsComplete"
ctx := context.Background()
now := r.clock.Now()
statsExpireAt := now.Add(statsTTL)
msg.CompletedAt = now.Unix()
Expand Down Expand Up @@ -499,9 +498,8 @@ redis.call("HSET", KEYS[4], "state", "pending")
return redis.status_reply("OK")`)

// Requeue moves the task from active queue to the specified queue.
func (r *RDB) Requeue(msg *base.TaskMessage) error {
func (r *RDB) Requeue(ctx context.Context, msg *base.TaskMessage) error {
var op errors.Op = "rdb.Requeue"
ctx := context.Background()
keys := []string{
base.ActiveKey(msg.Queue),
base.LeaseKey(msg.Queue),
Expand Down Expand Up @@ -682,9 +680,8 @@ return redis.status_reply("OK")`)
// Retry moves the task from active to retry queue.
// It also annotates the message with the given error message and
// if isFailure is true increments the retried counter.
func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string, isFailure bool) error {
func (r *RDB) Retry(ctx context.Context, msg *base.TaskMessage, processAt time.Time, errMsg string, isFailure bool) error {
var op errors.Op = "rdb.Retry"
ctx := context.Background()
now := r.clock.Now()
modified := *msg
if isFailure {
Expand Down Expand Up @@ -770,9 +767,8 @@ return redis.status_reply("OK")`)

// Archive sends the given task to archive, attaching the error message to the task.
// It also trims the archive by timestamp and set size.
func (r *RDB) Archive(msg *base.TaskMessage, errMsg string) error {
func (r *RDB) Archive(ctx context.Context, msg *base.TaskMessage, errMsg string) error {
var op errors.Op = "rdb.Archive"
ctx := context.Background()
now := r.clock.Now()
modified := *msg
modified.ErrorMsg = errMsg
Expand Down Expand Up @@ -959,14 +955,19 @@ func (r *RDB) ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*base.Task
}

// ExtendLease extends the lease for the given tasks by LeaseDuration (30s).
func (r *RDB) ExtendLease(qname string, ids ...string) error {
// It returns a new expiration time if the operation was successful.
func (r *RDB) ExtendLease(qname string, ids ...string) (expirationTime time.Time, err error) {
expireAt := r.clock.Now().Add(LeaseDuration)
var zs []redis.Z
for _, id := range ids {
zs = append(zs, redis.Z{Member: id, Score: float64(expireAt.Unix())})
}
// Use XX option to only update elements that already exist; Don't add new elements
return r.client.ZAddArgs(context.Background(), base.LeaseKey(qname), redis.ZAddArgs{XX: true, GT: true, Members: zs}).Err()
err = r.client.ZAddArgs(context.Background(), base.LeaseKey(qname), redis.ZAddArgs{XX: true, GT: true, Members: zs}).Err()
if err != nil {
return time.Time{}, err
}
return expireAt, nil
}

// KEYS[1] -> asynq:servers:{<host:pid:sid>}
Expand Down
Loading

0 comments on commit dfae863

Please sign in to comment.