Skip to content

Commit

Permalink
use sneaker-go
Browse files Browse the repository at this point in the history
  • Loading branch information
oldfritter committed Mar 25, 2020
1 parent 5e5a49e commit 4c5515f
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 135 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Go版 数字货币交易所
# Go(Golang)版 数字货币交易所

```
_______ ______ ________
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/labstack/gommon v0.3.0 // indirect
github.com/microcosm-cc/bluemonday v1.0.2 // indirect
github.com/newrelic/go-agent v3.4.0+incompatible // indirect
github.com/oldfritter/sneaker-go v0.0.0-20200313032517-0409eddc53a8 // indirect
github.com/qiniu/api.v7 v7.2.5+incompatible
github.com/qiniu/x v7.0.8+incompatible // indirect
github.com/qor/admin v0.0.0-20200315024928-877b98a68a6f // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ github.com/microcosm-cc/bluemonday v1.0.2 h1:5lPfLTTAvAbtS0VqT+94yOtFnGfUWYyx0+i
github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc=
github.com/newrelic/go-agent v3.4.0+incompatible h1:GhUhNLDdR3ETfUVJAN/czXlqRTcgbPs6U02jYhf15rg=
github.com/newrelic/go-agent v3.4.0+incompatible/go.mod h1:a8Fv1b/fYhFSReoTU6HDkTYIMZeSVNffmoS726Y0LzQ=
github.com/oldfritter/sneaker-go v0.0.0-20200313032517-0409eddc53a8 h1:Fqh28MncN3MG+Vcan05/xsB0gKOYOYRAjBQDhNg7lzQ=
github.com/oldfritter/sneaker-go v0.0.0-20200313032517-0409eddc53a8/go.mod h1:q9+KC43OfLklfiKffi11et1HpMufAyD5q3mI/cdpOHw=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/qiniu/api.v7 v7.2.5+incompatible h1:6KKaGt7MbFzVGSniwzv7qsM/Qv0or4SkRJfmak8LqZE=
Expand Down
2 changes: 1 addition & 1 deletion workers/sneakerWorkers/accountVersionWorker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/shopspring/decimal"
)

func (worker *Worker) AccountVersionCheckPointWorker(payloadJson *[]byte) (queueName string, message []byte) {
func (worker Worker) AccountVersionCheckPointWorker(payloadJson *[]byte) (queueName string, message []byte) {
var payload struct {
AccountId string `json:"account_id"`
}
Expand Down
127 changes: 0 additions & 127 deletions workers/sneakerWorkers/base.go

This file was deleted.

2 changes: 1 addition & 1 deletion workers/sneakerWorkers/kLineWorker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/shopspring/decimal"
)

func (worker *Worker) KLineWorker(payloadJson *[]byte) (queueName string, message []byte) {
func (worker Worker) KLineWorker(payloadJson *[]byte) (queueName string, message []byte) {
start := time.Now().UnixNano()
var payload struct {
MarketId int `json:"market_id"`
Expand Down
2 changes: 1 addition & 1 deletion workers/sneakerWorkers/rebuildkLineToRedisWorker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/oldfritter/goDCE/utils"
)

func (worker *Worker) RebuildKLineToRedisWorker(payloadJson *[]byte) (queueName string, message []byte) {
func (worker Worker) RebuildKLineToRedisWorker(payloadJson *[]byte) (queueName string, message []byte) {
var payload struct {
MarketId int `json:"market_id"`
Period int `json:"period"`
Expand Down
2 changes: 1 addition & 1 deletion workers/sneakerWorkers/tickerWorker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/oldfritter/goDCE/utils"
)

func (worker *Worker) TickerWorker(payloadJson *[]byte) (queueName string, message []byte) {
func (worker Worker) TickerWorker(payloadJson *[]byte) (queueName string, message []byte) {
var payload struct {
MarketId int `json:"market_id"`
}
Expand Down
68 changes: 68 additions & 0 deletions workers/sneakerWorkers/worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package sneakerWorkers

import (
"io/ioutil"
"log"
"path/filepath"

"gopkg.in/yaml.v2"
)

type Worker struct {
Name string `yaml:"name"`
Exchange string `yaml:"exchange"`
RoutingKey string `yaml:"routing_key"`
Queue string `yaml:"queue"`
Durable bool `yaml:"durable"`
Ack bool `yaml:"ack"`
Options map[string]string `yaml:"options"`
Arguments map[string]string `yaml:"arguments"`
Delays []int32 `yaml:"delays"`
Steps []int32 `yaml:"steps"`
Threads int `yaml:"threads"`
}

var AllWorkers []Worker

func InitWorkers() {
path_str, _ := filepath.Abs("config/workers.yml")
content, err := ioutil.ReadFile(path_str)
if err != nil {
log.Fatal(err)
}
yaml.Unmarshal(content, &AllWorkers)
}

func (worker Worker) GetName() string {
return worker.Name
}
func (worker Worker) GetExchange() string {
return worker.Exchange
}
func (worker Worker) GetRoutingKey() string {
return worker.RoutingKey
}
func (worker Worker) GetQueue() string {
return worker.Queue
}
func (worker Worker) GetDurable() bool {
return worker.Durable
}
func (worker Worker) GetAck() bool {
return worker.Ack
}
func (worker Worker) GetOptions() map[string]string {
return worker.Options
}
func (worker Worker) GetArguments() map[string]string {
return worker.Arguments
}
func (worker Worker) GetDelays() []int32 {
return worker.Delays
}
func (worker Worker) GetSteps() []int32 {
return worker.Steps
}
func (worker Worker) GetThreads() int {
return worker.Threads
}
10 changes: 7 additions & 3 deletions workers/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ import (
"os/signal"
"strconv"

sneaker "github.com/oldfritter/sneaker-go"
sneakerUtils "github.com/oldfritter/sneaker-go/utils"
"github.com/streadway/amqp"

envConfig "github.com/oldfritter/goDCE/config"
"github.com/oldfritter/goDCE/initializers"
"github.com/oldfritter/goDCE/models"
"github.com/oldfritter/goDCE/utils"
"github.com/oldfritter/goDCE/workers/sneakerWorkers"
"github.com/streadway/amqp"
)

func main() {
Expand All @@ -35,6 +38,7 @@ func initialize() {
utils.InitRedisPools()
utils.InitializeAmqpConfig()
initializers.LoadCacheData()
sneakerUtils.InitializeAmqpConfig()

err := ioutil.WriteFile("pids/workers.pid", []byte(strconv.Itoa(os.Getpid())), 0644)
if err != nil {
Expand All @@ -56,9 +60,9 @@ func initWorkers() {

func StartAllWorkers() {
for _, w := range sneakerWorkers.AllWorkers {
for i := 0; i < w.Threads; i++ {
for i := 0; i < w.GetThreads(); i++ {
go func(w sneakerWorkers.Worker) {
w.SubscribeMessageByQueue(amqp.Table{})
sneaker.SubscribeMessageByQueue(w, amqp.Table{})
}(w)
}
}
Expand Down

0 comments on commit 4c5515f

Please sign in to comment.