Skip to content

withseid/delayq

Repository files navigation

延迟队列 delayq

业务背景

在工作时,遇到以下业务需求

  • 某个资源过期后,在过期后的第 30 天后会自动删除,但是在自动删除前,需要在在过期第 23 天,28天,29天发短息通知用户,该过期资源将被删除

  • 用户要删除某个重要资源时,需要留 24 小时的缓冲时间给用户考虑是否真的要删掉,若 24 小时内用户没有撤销删除操作,则 24 小时后将该资源删除

怎么去解决以上的业务需求呢? 一开始最容易想到的就是后台一直去轮询 mysql 的表,看看是否该删除空间/发短信通知用户。这种做法存在的问题是: mysql 的性能比较容易到达瓶颈,一直去扫表,mysql 的压力会很大。因此这个方案是不可行的。

后来,我们采用的方案是延迟队列,一开始考虑的是用 beanstalkd , 但是 beanstalkd 存在一个问题是,只支持入队操作,不支持出队操作。因此 beanstalkd 也被抛弃了。因为在我们的需求中,是需要有出队操作的。

例如,当用户删除某个资源时,将该准备删除的资源加入延迟队列,24小时后自动删除该资源,如果在 24 小时内,用户点击了撤销操作,此时需要将该资源的信息从延迟队列中删除。

后面参考了有赞的延迟队列设计 的文章,实现了一个延迟队列。在延迟队列设计中,最重要的就是以下三个组件

  • Job Pool:存放 Job 的元信息
  • Delay Queue: 存放暂不可被消费的 jobID
  • Ready Queue: 存放着可以被消费的 jobID

将一个延迟任务入队的流程如下:

  • 将 Job 的元信息添加到 Job Pool 中
  • 将 Job ID 和 到期时间存入到 Delay Queue
  • Timer 每隔 1 秒钟去扫描 Delay Queue ,如果发现当前时间大于等于某个 Job 的到期时间,将 Job ID 添加到 Ready Queue 中,并且把 Delay Queue 中的对应的 Job ID 删除
  • Ready Queue 中的 Job ID 会被消费者消费掉

将一个非延迟任务队列入队流程如下:

  • 将 Job 的元信息添加到 Job Pool 中
  • 将 Job ID 存入到 Ready Queue
  • Ready Queue 中的 Job ID 会被消费者消费掉

将一个延迟队列出队流程如下:

  • 根据 JobID 将 Delay Queue 中对应的数据删掉
  • 根据 JobID 将 Job Pool 中的元信息删除

Quickstart

通过 go get 安装 delayq

go get -u github.com/withseid/delayq

初始化客户端

package main

import (
	"encoding/json"
	"time"

	"github.com/withseid/delayq"
	"github.com/withseid/delayq/example/model"
)

func main() {
	config := delayq.RedisConfiguration{
		Host: "192.168.89.160",
		Port: "6379",
	}
	client := delayq.NewClient(config)




}

延迟任务入队可选 Option

  • ProcessAt: 在某个时间点执行任务
  • ProcessIn: 从当前时间算起,延迟多久后再执行任务
  • Retry:重试次数,此 Option 不配置时,默认无限重试,重试时间每次递增,从 21 -> 212 s

在可选 Option 中,若 ProcessAt 和 ProcessIn 都不选,则该任务立即执行。若 ProcessAt 和 ProcessIn 都选了,则只有最后一个会生效。

可选 Option ProcessAt, 例如 delayq.ProcessAt(time.Now().AddDate(0, 0, 1)) 表示将在第二天的这个时间执行某个任务

	space1 := model.DeletedSpace{
		SpaceID: "space1",
	}
	data, err := json.Marshal(space1)
	if err != nil {
		panic(err)
	}

	// 假设当前时间是 2022-08-16 18:12
	// delayq.ProcessAt(time.Now().AddDate(0, 0, 1)) 表示将在 2022-08-17 18:12 执行该任务
	client.Enqueue(space1.Topic(), space1.SpaceID, data, delayq.ProcessAt(time.Now().AddDate(0, 0, 1)))

可选 Option ProcessIn,例如 delayq.ProcessIn(time.Second*10) 表示 10s 后执行某个任务,

	space2 := model.DeletedSpace{
		SpaceID: "space2",
	}
	data, err := json.Marshal(space2)
	if err != nil {
		panic(err)
	}
	// delayq.ProcessIn(time.Second*24) 表示将在当前时间的基础上,延迟 10s 后执行
	client.Enqueue(space2.Topic(), space2.SpaceID, data, delayq.ProcessIn(time.Second*10))

可选 Option Retry, 例如 delayq.Retry(6), 表示重试 6 次,若重试 6 次都失败,则自动丢弃任务,默认为无限重试

	space3 := model.DeletedSpace{
		SpaceID: "space3",
	}
	data, err := json.Marshal(space3)
	if err != nil {
		panic(err)
	}
	client.Enqueue(space3.Topic(), space3.SpaceID, data, delayq.Retry(6))

出队

	// 将 JobID 为 space2 的延迟任务出队
	client.Dequeue(space2.Topic(), space2.SpaceID)

新建一个 Server,负责消费延迟队列中的消息并且执行延迟延迟

package main

import (
	"context"

	"github.com/withseid/delayq"
	"github.com/withseid/delayq/example/model"
)

func main() {
	config := delayq.RedisConfiguration{
		Host: "192.168.89.160",
		Port: "6379",
	}
	server := delayq.NewServer(config)
	ds := model.DeletedSpace{}

	server.HandlerFunc(ds.Topic(), &ds)
	server.Run(context.TODO())
}

DeletedSpace 类型的延迟任务的处理逻辑

package model

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/withseid/delayq"
)

// const DeletedSpaceTopic = "deleted_space"

type DeletedSpace struct {
	SpaceID string
}

func (d *DeletedSpace) Topic() string {
	return "deleted_space"
}

func (d *DeletedSpace) Execute(ctx context.Context, job *delayq.Job) error {
	ds := DeletedSpace{}
	err := json.Unmarshal(job.Boday, &ds)
	if err != nil {
		return err
	}

	fmt.Println("DeletedSpace: ", ds)
	return nil
}

About

delay queue in Go

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages