-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
lilh
committed
Aug 12, 2022
1 parent
aed064b
commit b83e16b
Showing
9 changed files
with
373 additions
and
35 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
package delayq | ||
|
||
import ( | ||
"time" | ||
|
||
s "github.com/deckarep/golang-set" | ||
) | ||
|
||
type Client struct { | ||
storage Storage | ||
jobPool map[string]*Job | ||
|
||
readyQueue map[string]s.Set | ||
} | ||
|
||
func NewClient(config RedisConfiguration) Client { | ||
storage, err := NewStorage(config) | ||
if err != nil { | ||
panic(err) | ||
} | ||
|
||
client := Client{ | ||
storage: storage, | ||
jobPool: make(map[string]*Job), | ||
readyQueue: make(map[string]s.Set), | ||
} | ||
return client | ||
} | ||
|
||
func (c *Client) Dequeue(topic string, jobID string) { | ||
for k := range c.jobPool { | ||
if k == jobID { | ||
delete(c.jobPool, jobID) | ||
} | ||
} | ||
|
||
} | ||
|
||
func (c *Client) Enqueue(payload []byte, topic string, jobID string, opts ...Option) error { | ||
job := &Job{ | ||
Topic: topic, | ||
ID: jobID, | ||
Boday: payload, | ||
} | ||
|
||
for _, opt := range opts { | ||
switch opt := opt.(type) { | ||
case timeoutOption: | ||
job.TTR = time.Duration(opt).Milliseconds() | ||
case processAtOption: | ||
delay := time.Time(opt).UnixNano() | ||
job.Delay = delay | ||
case processInOption: | ||
delay := time.Now().Add(time.Duration(opt)).UnixNano() | ||
job.Delay = delay | ||
default: | ||
|
||
} | ||
} | ||
if job.Delay == 0 { | ||
job.State = JobReady | ||
} else { | ||
job.State = JobDelay | ||
} | ||
|
||
// 如果不是延迟任务,马上加入就绪队列 | ||
if job.State == JobReady { | ||
queue, ok := c.readyQueue[topic] | ||
if !ok { | ||
queue = s.NewSet() | ||
} | ||
queue.Add(jobID) | ||
|
||
c.readyQueue[topic] = queue | ||
} else { | ||
c.storage.ZAdd(DelayBucket, job) | ||
} | ||
|
||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,27 @@ | ||
package delayq | ||
|
||
type JobState int | ||
|
||
const ( | ||
JobReady JobState = iota | ||
JobDelay | ||
JobReserved | ||
JobDeleted | ||
) | ||
|
||
type Job struct { | ||
Topic string | ||
ID string | ||
Delay int64 | ||
TTR int64 | ||
Boday []byte | ||
State JobState | ||
} | ||
|
||
type JobPool struct { | ||
pool []Job | ||
} | ||
|
||
// Delay Bucket => zset | ||
|
||
// Ready Queue => redis list |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,111 @@ | ||
package delayq | ||
|
||
import ( | ||
"encoding/json" | ||
"testing" | ||
"time" | ||
|
||
"github.com/go-redis/redis/v8" | ||
) | ||
|
||
func TestInitRedis(t *testing.T) { | ||
InitRedis() | ||
func TestNewClient(t *testing.T) { | ||
config := RedisConfiguration{ | ||
Host: "192.168.89.160", | ||
Port: "6379", | ||
} | ||
client := NewClient(config) | ||
a := []int{1, 2, 3} | ||
b, err := json.Marshal(a) | ||
if err != nil { | ||
panic(err) | ||
} | ||
client.Enqueue(b, "aaa", "job_id", ProcessAt(time.Now().AddDate(1, 0, 0))) | ||
|
||
} | ||
|
||
// func TestInitRedis(t *testing.T) { | ||
|
||
// InitRedis() | ||
|
||
// for i := 0; i < 10; i++ { | ||
// s := time.Duration(time.Second).Milliseconds() * int64(i) | ||
// delay := time.Duration(time.Now().UnixNano()).Milliseconds() + s | ||
// RedisCli.ZAdd(context.Background(), "zadd_test", &redis.Z{ | ||
// Score: float64(delay), | ||
// Member: fmt.Sprintf("lilh-%d", i), | ||
// }) | ||
// } | ||
|
||
// } | ||
|
||
// func TestZset(t *testing.T) { | ||
// InitRedis() | ||
|
||
// script := redis.NewScript(` | ||
// local vals = redis.call("zrangebyscore",KEYS[1],"-inf",ARGV[1],"limit",0,20) | ||
// if (next(vals) ~= nil) then | ||
// redis.call("zremrangebyrank",KEYS[1], 0, #vals -1) | ||
// end | ||
// return #vals | ||
// `) | ||
// nums, err := script.Run(context.Background(), RedisCli, []string{"zadd1"}, time.Now().UnixNano()).Result() | ||
// if err != nil { | ||
// panic(err) | ||
// } | ||
// t.Log("nums: ", nums) | ||
// } | ||
|
||
// func TestLuaIncrBy(t *testing.T) { | ||
// InitRedis() | ||
|
||
// keys := []string{"test1"} | ||
// values := []interface{}{5} | ||
// num, err := incrBy.Run(context.Background(), RedisCli, keys, values...).Int() | ||
// if err != nil { | ||
// panic(err) | ||
// } | ||
// t.Log("num: ", num) | ||
|
||
// } | ||
|
||
// func TestLuaSum(t *testing.T) { | ||
// InitRedis() | ||
// keys := []string{"testsum"} | ||
// values := []interface{}{1, 2, 3, 4} | ||
// num, err := sum.Run(context.Background(), RedisCli, keys, values...).Int() | ||
// if err != nil { | ||
// panic(err) | ||
// } | ||
// t.Log("num: ", num) | ||
// } | ||
|
||
var sum = redis.NewScript(` | ||
local key = KEYS[1] | ||
local sum = redis.call("GET",key) | ||
if not sum then | ||
sum = 0 | ||
end | ||
local num_arg = #ARGV | ||
for i = 1, num_arg do | ||
sum = sum + ARGV[i] | ||
end | ||
redis.call("SET",key,sum) | ||
return sum | ||
`) | ||
|
||
var incrBy = redis.NewScript(` | ||
local key = KEYS[1] | ||
local change = ARGV[1] | ||
local value = redis.call("GET",key) | ||
if not value then | ||
value = 0 | ||
end | ||
value = value + change | ||
redis.call("SET",key,value) | ||
return value | ||
`) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,3 +4,5 @@ type RedisConfiguration struct { | |
Host string | ||
Port string | ||
} | ||
|
||
var DelayBucket = "delay_bucket" |
Oops, something went wrong.