Skip to content

Commit 1f24c6f

Browse files
author
Vic Shóstak
committed
first commit
0 parents  commit 1f24c6f

File tree

12 files changed

+334
-0
lines changed

12 files changed

+334
-0
lines changed

.editorconfig

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
root = true
2+
3+
[*]
4+
indent_style = space
5+
indent_size = 2
6+
charset = utf-8
7+
trim_trailing_whitespace = true
8+
insert_final_newline = true
9+
10+
[{go.mod,go.sum,*.go}]
11+
indent_style = tab
12+
indent_size = 4
13+
14+
[{Makefile,Dockerfile,*.yml,*.yaml}]
15+
indent_style = tab
16+
indent_size = 2

.gitattributes

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
go.sum merge=union

.gitignore

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
# macOS
2+
**/.DS_Store
3+
4+
# Dev build
5+
build/
6+
tmp/
7+
8+
# Test
9+
*.out
10+
11+
# Environment variables
12+
.env

LICENSE

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
MIT License
2+
3+
Copyright (c) 2021 Vic Shóstak <truewebartisans@gmail.com> (https://1wa.co)
4+
5+
Permission is hereby granted, free of charge, to any person obtaining a copy
6+
of this software and associated documentation files (the "Software"), to deal
7+
in the Software without restriction, including without limitation the rights
8+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
copies of the Software, and to permit persons to whom the Software is
10+
furnished to do so, subject to the following conditions:
11+
12+
The above copyright notice and this permission notice shall be included in all
13+
copies or substantial portions of the Software.
14+
15+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
SOFTWARE.

Makefile

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
.PHONY: worker client
2+
3+
worker:
4+
go run ./worker/server.go
5+
6+
stop:
7+
go run ./client/main.go

README.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# 📖 Tutorial: Asynq. Simple, reliable & efficient distributed task queue for your next Go project
2+
3+
👉 The full article is published on **April 7, 2021**, on Dev.to:
4+
5+
## Quick start
6+
7+
1. Clone this repository and go to it root folder.
8+
2. Start a Redis server (by Docker or locally).
9+
3. Start Asynq worker server:
10+
11+
```console
12+
make worker
13+
```
14+
15+
4. Start generating tasks by Asynq client:
16+
17+
```console
18+
make client
19+
```
20+
21+
5. Install [Asynqmon](https://github.com/hibiken/asynqmon) (Asynq web UI) to your system.
22+
6. Go to [localhost:8080](http://localhost:8080) and see:
23+
24+
![Screenshot](https://user-images.githubusercontent.com/11155743/113557216-57af2b80-9606-11eb-8ab6-df023b14e5c1.png)
25+
26+
## ⚠️ License
27+
28+
MIT &copy; [Vic Shóstak](https://shostak.dev/) & [True web artisans](https://1wa.co/).

client/main.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package main
2+
3+
import (
4+
"log"
5+
"math/rand"
6+
"time"
7+
8+
"tutorial-go-asynq/tasks"
9+
10+
"github.com/hibiken/asynq"
11+
)
12+
13+
func main() {
14+
// Create a new Redis connection for the client.
15+
redisConnection := asynq.RedisClientOpt{
16+
Addr: "localhost:6379", // Redis server address
17+
}
18+
19+
// Create a new Asynq client.
20+
client := asynq.NewClient(redisConnection)
21+
defer client.Close()
22+
23+
// Infinite loop to create tasks as Asynq client.
24+
for {
25+
// Generate a random user ID.
26+
userID := rand.Intn(1000) + 10
27+
28+
// Set a delay duration to 2 minutes.
29+
delay := 2 * time.Minute
30+
31+
// Define tasks.
32+
task1 := tasks.NewWelcomeEmailTask(userID)
33+
task2 := tasks.NewReminderEmailTask(userID, time.Now().Add(delay))
34+
35+
// Process the task immediately in critical queue.
36+
if _, err := client.Enqueue(
37+
task1, // task payload
38+
asynq.Queue("critical"), // set queue for task
39+
); err != nil {
40+
log.Fatal(err)
41+
}
42+
43+
// Process the task 2 minutes later in low queue.
44+
if _, err := client.Enqueue(
45+
task2, // task payload
46+
asynq.Queue("low"), // set queue for task
47+
asynq.ProcessIn(delay), // set time to process task
48+
); err != nil {
49+
log.Fatal(err)
50+
}
51+
}
52+
}

go.mod

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
module tutorial-go-asynq
2+
3+
go 1.16
4+
5+
require github.com/hibiken/asynq v0.17.1

go.sum

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
2+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3+
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
4+
github.com/go-redis/redis/v7 v7.4.0 h1:7obg6wUoj05T0EpY0o8B59S9w5yeMWql7sw2kwNW1x4=
5+
github.com/go-redis/redis/v7 v7.4.0/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg=
6+
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
7+
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
8+
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
9+
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
10+
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
11+
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
12+
github.com/hibiken/asynq v0.17.1 h1:vNnK+wbnzjzs9E6WN0Im8M6a6oxXaRKK5wue1KsEC30=
13+
github.com/hibiken/asynq v0.17.1/go.mod h1:yfQUmjFqSBSUIVxTK0WyW4LPj4gpr283UpWb6hKYaqE=
14+
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
15+
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
16+
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
17+
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
18+
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
19+
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
20+
github.com/onsi/ginkgo v1.10.1 h1:q/mM8GF/n0shIN8SaAZ0V+jnLPzen6WIVZdiwrRlMlo=
21+
github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
22+
github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
23+
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
24+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
25+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
26+
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
27+
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
28+
github.com/spf13/cast v1.3.1 h1:nFm6S0SMdyzrzcmThSipiEubIDy8WEXKNZ0UOgiRpng=
29+
github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
30+
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
31+
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
32+
go.uber.org/goleak v0.10.0 h1:G3eWbSNIskeRqtsN/1uI5B+eP73y3JUuBsv9AZjehb4=
33+
go.uber.org/goleak v0.10.0/go.mod h1:VCZuO8V8mFPlL0F5J5GK1rtHV3DrFcQ1R8ryq7FK0aI=
34+
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
35+
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
36+
golang.org/x/net v0.0.0-20190923162816-aa69164e4478 h1:l5EDrHhldLYb3ZRHDUhXF7Om7MvYXnkV9/iQNo1lX6g=
37+
golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
38+
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
39+
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
40+
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
41+
golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
42+
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e h1:9vRrk9YW2BTzLP0VCB9ZDjU4cPqkg+IDWL7XgxA1yxQ=
43+
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
44+
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
45+
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
46+
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
47+
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ=
48+
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
49+
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
50+
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
51+
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
52+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
53+
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
54+
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
55+
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
56+
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
57+
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
58+
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
59+
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
60+
gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo=
61+
gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

tasks/handlers.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package tasks
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/hibiken/asynq"
8+
)
9+
10+
// HandleWelcomeEmailTask handler for welcome email task.
11+
func HandleWelcomeEmailTask(c context.Context, t *asynq.Task) error {
12+
// Get user ID from given task.
13+
id, err := t.Payload.GetInt("user_id")
14+
if err != nil {
15+
return err
16+
}
17+
18+
// Dummy message to the worker's output.
19+
fmt.Printf("Send Welcome Email to User ID %d\n", id)
20+
21+
return nil
22+
}
23+
24+
// HandleReminderEmailTask for reminder email task.
25+
func HandleReminderEmailTask(c context.Context, t *asynq.Task) error {
26+
// Get int with the user ID from the given task.
27+
id, err := t.Payload.GetInt("user_id")
28+
if err != nil {
29+
return err
30+
}
31+
32+
// Get string with the sent time from the given task.
33+
time, err := t.Payload.GetString("sent_in")
34+
if err != nil {
35+
return err
36+
}
37+
38+
// Dummy message to the worker's output.
39+
fmt.Printf("Send Reminder Email to User ID %d\n", id)
40+
fmt.Printf("Reason: time is up (%v)\n", time)
41+
42+
return nil
43+
}

tasks/payloads.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package tasks
2+
3+
import (
4+
"time"
5+
6+
"github.com/hibiken/asynq"
7+
)
8+
9+
const (
10+
// TypeWelcomeEmail is a name of the task type
11+
// for sending a welcome email.
12+
TypeWelcomeEmail = "email:welcome"
13+
14+
// TypeReminderEmail is a name of the task type
15+
// for sending a reminder email.
16+
TypeReminderEmail = "email:reminder"
17+
)
18+
19+
// NewWelcomeEmailTask task payload for a new welcome email.
20+
func NewWelcomeEmailTask(id int) *asynq.Task {
21+
// Specify task payload.
22+
payload := map[string]interface{}{
23+
"user_id": id, // set user ID
24+
}
25+
26+
// Return a new task with given type and payload.
27+
return asynq.NewTask(TypeWelcomeEmail, payload)
28+
}
29+
30+
// NewReminderEmailTask task payload for a reminder email.
31+
func NewReminderEmailTask(id int, ts time.Time) *asynq.Task {
32+
// Specify task payload.
33+
payload := map[string]interface{}{
34+
"user_id": id, // set user ID
35+
"sent_in": ts.String(), // set time to sending
36+
}
37+
38+
// Return a new task with given type and payload.
39+
return asynq.NewTask(TypeReminderEmail, payload)
40+
}

worker/server.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package main
2+
3+
import (
4+
"log"
5+
6+
"tutorial-go-asynq/tasks"
7+
8+
"github.com/hibiken/asynq"
9+
)
10+
11+
func main() {
12+
// Create and configuring Redis connection.
13+
redisConnection := asynq.RedisClientOpt{
14+
Addr: "localhost:6379", // Redis server address
15+
}
16+
17+
// Create and configuring Asynq worker server.
18+
worker := asynq.NewServer(redisConnection, asynq.Config{
19+
// Specify how many concurrent workers to use.
20+
Concurrency: 10,
21+
// Specify multiple queues with different priority.
22+
Queues: map[string]int{
23+
"critical": 6, // processed 60% of the time
24+
"default": 3, // processed 30% of the time
25+
"low": 1, // processed 10% of the time
26+
},
27+
})
28+
29+
// Create a new task's mux instance.
30+
mux := asynq.NewServeMux()
31+
32+
// Define a task handler for the welcome email task.
33+
mux.HandleFunc(
34+
tasks.TypeWelcomeEmail, // task type
35+
tasks.HandleWelcomeEmailTask, // handler function
36+
)
37+
38+
// Define a task handler for the reminder email task.
39+
mux.HandleFunc(
40+
tasks.TypeReminderEmail, // task type
41+
tasks.HandleReminderEmailTask, // handler function
42+
)
43+
44+
// Run worker server.
45+
if err := worker.Run(mux); err != nil {
46+
log.Fatal(err)
47+
}
48+
}

0 commit comments

Comments
 (0)