Skip to content

Commit

Permalink
add asynchronous worker (techschool#69)
Browse files Browse the repository at this point in the history
Co-authored-by: phamlequang <phamlequang@gmail.com>
  • Loading branch information
techschool and phamlequang authored Nov 12, 2022
1 parent 132d9cb commit cb1b9fa
Show file tree
Hide file tree
Showing 11 changed files with 232 additions and 23 deletions.
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,7 @@ proto:
evans:
evans --host localhost --port 9090 -r repl

.PHONY: network postgres createdb dropdb migrateup migratedown migrateup1 migratedown1 db_docs db_schema sqlc test server mock proto evans
redis:
docker run --name redis -p 6379:6379 -d redis:7-alpine

.PHONY: network postgres createdb dropdb migrateup migratedown migrateup1 migratedown1 db_docs db_schema sqlc test server mock proto evans redis
1 change: 1 addition & 0 deletions app.env
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ GRPC_SERVER_ADDRESS=0.0.0.0:9090
TOKEN_SYMMETRIC_KEY=12345678901234567890123456789012
ACCESS_TOKEN_DURATION=15m
REFRESH_TOKEN_DURATION=24h
REDIS_ADDRESS=0.0.0.0:6379
17 changes: 17 additions & 0 deletions gapi/rpc_create_user.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package gapi

import (
"context"
"time"

"github.com/hibiken/asynq"
"github.com/lib/pq"
db "github.com/techschool/simplebank/db/sqlc"
"github.com/techschool/simplebank/pb"
"github.com/techschool/simplebank/util"
"github.com/techschool/simplebank/val"
"github.com/techschool/simplebank/worker"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -42,6 +45,20 @@ func (server *Server) CreateUser(ctx context.Context, req *pb.CreateUserRequest)
return nil, status.Errorf(codes.Internal, "failed to create user: %s", err)
}

// TODO: use db transaction
taskPayload := &worker.PayloadSendVerifyEmail{
Username: user.Username,
}
opts := []asynq.Option{
asynq.MaxRetry(10),
asynq.ProcessIn(10 * time.Second),
asynq.Queue(worker.QueueCritical),
}
err = server.taskDistributor.DistributeTaskSendVerifyEmail(ctx, taskPayload, opts...)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed to distribute task to send verify email: %s", err)
}

rsp := &pb.CreateUserResponse{
User: convertUser(user),
}
Expand Down
17 changes: 10 additions & 7 deletions gapi/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,30 @@ import (
"github.com/techschool/simplebank/pb"
"github.com/techschool/simplebank/token"
"github.com/techschool/simplebank/util"
"github.com/techschool/simplebank/worker"
)

// Server serves gRPC requests for our banking service.
type Server struct {
pb.UnimplementedSimpleBankServer
config util.Config
store db.Store
tokenMaker token.Maker
config util.Config
store db.Store
tokenMaker token.Maker
taskDistributor worker.TaskDistributor
}

// NewServer creates a new gRPC server.
func NewServer(config util.Config, store db.Store) (*Server, error) {
func NewServer(config util.Config, store db.Store, taskDistributor worker.TaskDistributor) (*Server, error) {
tokenMaker, err := token.NewPasetoMaker(config.TokenSymmetricKey)
if err != nil {
return nil, fmt.Errorf("cannot create token maker: %w", err)
}

server := &Server{
config: config,
store: store,
tokenMaker: tokenMaker,
config: config,
store: store,
tokenMaker: tokenMaker,
taskDistributor: taskDistributor,
}

return server, nil
Expand Down
12 changes: 9 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/golang/mock v1.6.0
github.com/google/uuid v1.3.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.10.0
github.com/hibiken/asynq v0.23.0
github.com/lib/pq v1.10.5
github.com/o1egl/paseto v1.0.0
github.com/rakyll/statik v0.1.7
Expand All @@ -21,17 +22,20 @@ require (
google.golang.org/genproto v0.0.0-20220317150908-0efb43f6373e
google.golang.org/grpc v1.45.0
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.2.0
google.golang.org/protobuf v1.28.0
google.golang.org/protobuf v1.28.1
)

require (
github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da // indirect
github.com/aead/poly1305 v0.0.0-20180717145839-3fee0db0b635 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-playground/locales v0.14.0 // indirect
github.com/go-playground/universal-translator v0.18.0 // indirect
github.com/go-redis/redis/v8 v8.11.5 // indirect
github.com/golang/glog v1.0.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
Expand All @@ -48,16 +52,18 @@ require (
github.com/pelletier/go-toml v1.9.4 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/spf13/afero v1.8.2 // indirect
github.com/spf13/cast v1.4.1 // indirect
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
github.com/ugorji/go/codec v1.2.7 // indirect
go.uber.org/atomic v1.6.0 // indirect
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect
golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8 // indirect
golang.org/x/sys v0.2.0 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.2.0 // indirect
gopkg.in/ini.v1 v1.66.4 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
Expand Down
Loading

0 comments on commit cb1b9fa

Please sign in to comment.