Skip to content

Commit

Permalink
use sneaker-go v 3.0.2
Browse files Browse the repository at this point in the history
  • Loading branch information
oldfritter committed Aug 20, 2020
1 parent 16529a5 commit 620ad5d
Show file tree
Hide file tree
Showing 25 changed files with 237 additions and 301 deletions.
14 changes: 9 additions & 5 deletions api/v1/order.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ import (
"strconv"

"github.com/labstack/echo"
"github.com/oldfritter/goDCE/initializers"
. "github.com/oldfritter/goDCE/models"
"github.com/oldfritter/goDCE/utils"
"github.com/shopspring/decimal"
"github.com/streadway/amqp"

envConfig "github.com/oldfritter/goDCE/config"
. "github.com/oldfritter/goDCE/models"
"github.com/oldfritter/goDCE/utils"
)

func V1GetOrder(context echo.Context) error {
Expand Down Expand Up @@ -213,12 +214,15 @@ func pushMessageToMatching(order *Order, market *Market, option string) {
fmt.Println("error:", err)
}

err = initializers.PublishMessageWithRouteKey(
initializers.AmqpGlobalConfig.Exchange["matching"]["key"],
err = envConfig.RabbitMqConnect.PublishMessageWithRouteKey(
envConfig.AmqpGlobalConfig.Exchange["matching"]["key"],
market.Code, "text/plain",
false,
false,
&b,
amqp.Table{},
amqp.Persistent,
"",
)
if err != nil {
fmt.Println("{ error:", err, "}")
Expand Down
41 changes: 41 additions & 0 deletions config/amqp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package config

import (
"io/ioutil"
"log"
"path/filepath"

"github.com/oldfritter/sneaker-go/v3"
"gopkg.in/yaml.v2"
)

var (
RabbitMqConnect sneaker.RabbitMqConnect
)

var AmqpGlobalConfig struct {
Connect struct {
Host string `yaml:"host"`
Port string `yaml:"port"`
Username string `yaml:"username"`
Password string `yaml:"password"`
Vhost string `yaml:"vhost"`
} `yaml:"connect"`

Exchange map[string]map[string]string `yaml:"exchange"`
Queue map[string]map[string]string `yaml:"queue"`
}

func InitAmqpConfig() {
pathStr, _ := filepath.Abs("config/amqp.yml")
content, err := ioutil.ReadFile(pathStr)
if err != nil {
log.Fatal(err)
return
}
err = yaml.Unmarshal(content, &AmqpGlobalConfig)
if err != nil {
log.Fatal(err)
return
}
}
10 changes: 10 additions & 0 deletions config/workers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package config

import (
sneaker "github.com/oldfritter/sneaker-go/v3"
)

var (
AllWorkers []sneaker.Worker
AllWorkerIs []sneaker.WorkerI
)
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ require (
github.com/microcosm-cc/bluemonday v1.0.2 // indirect
github.com/newrelic/go-agent v3.4.0+incompatible // indirect
github.com/oldfritter/matching v0.0.0-20190827024937-efdf83ea2ab0
github.com/oldfritter/sneaker-go/v2 v2.0.8
github.com/oldfritter/sneaker-go/v3 v3.0.2
github.com/qiniu/api.v7/v7 v7.5.0
github.com/qor/admin v0.0.0-20200315024928-877b98a68a6f // indirect
github.com/qor/assetfs v0.0.0-20170713023933-ff57fdc13a14 // indirect
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ github.com/denisenkom/go-mssqldb v0.0.0-20191124224453-732737034ffd/go.mod h1:xb
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/disintegration/imaging v1.6.2/go.mod h1:44/5580QXChDfwIclfc/PCwrr44amcmDAg8hxG0Ewe4=
github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg=
github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
github.com/erikstmartin/go-testdb v0.0.0-20160219214506-8d10e4a1bae5/go.mod h1:a2zkGnVExMxdzMo3M0Hi/3sEU+cWnZpSni0O6/Yb/P0=
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
Expand Down Expand Up @@ -59,11 +60,14 @@ github.com/microcosm-cc/bluemonday v1.0.2 h1:5lPfLTTAvAbtS0VqT+94yOtFnGfUWYyx0+i
github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc=
github.com/newrelic/go-agent v3.4.0+incompatible h1:GhUhNLDdR3ETfUVJAN/czXlqRTcgbPs6U02jYhf15rg=
github.com/newrelic/go-agent v3.4.0+incompatible/go.mod h1:a8Fv1b/fYhFSReoTU6HDkTYIMZeSVNffmoS726Y0LzQ=
github.com/oldfritter/matching v0.0.0-20190827024937-efdf83ea2ab0 h1:IojUXXuzJ+K7qisHlNqgB07U+E7j+moRnGU1F6WYors=
github.com/oldfritter/matching v0.0.0-20190827024937-efdf83ea2ab0/go.mod h1:qtjPJJjsQnSacfr0tDIkqP91I69DFJKwY/0EBlwVumA=
github.com/oldfritter/sneaker-go v1.1.0 h1:J5bXkAx73eVylAgqM+4x8oNdoHKmAHK9IRmTyNJtRnU=
github.com/oldfritter/sneaker-go/v2 v2.0.8 h1:GAQ1QyI/XDg3SK6hfhRoY684zsVzVGldgc4gPFah2fk=
github.com/oldfritter/sneaker-go/v2 v2.0.8/go.mod h1:bryvrz4puhkiLZVVZFVC/VcPmZbPHC5mq9o7UKTJoj4=
github.com/oldfritter/sneaker-go/v2 v2.1.1 h1:o130Ma1FUUpqgc9ti/Ks2wea1w5oB1iXpAFgWBNu96A=
github.com/oldfritter/sneaker-go/v3 v3.0.2 h1:gxSMDuUxGFNzoV3SE65I1OlUaS5NRZS3jJDiT3bQZLA=
github.com/oldfritter/sneaker-go/v3 v3.0.2/go.mod h1:I0YXd0M0D/xc1M4LsDriRf0oBwuD7TAvAH2w9RuvHqc=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/qiniu/api.v7/v7 v7.5.0 h1:DY6NrIp6FZ1GP4Roc9hRnO2m+OLzASYNnvz5Mbgw1rk=
Expand Down Expand Up @@ -93,6 +97,7 @@ github.com/qor/validations v0.0.0-20171228122639-f364bca61b46 h1:dRlsVUhwD1pwras
github.com/qor/validations v0.0.0-20171228122639-f364bca61b46/go.mod h1:UJsA0AuvrKNaWtrb1UzKai10mN3ZBbQkPjUHpxwahTc=
github.com/rainycape/unidecode v0.0.0-20150907023854-cb7f23ec59be h1:ta7tUOvsPHVHGom5hKW5VXNc2xZIkfCKP8iaqOyYtUQ=
github.com/rainycape/unidecode v0.0.0-20150907023854-cb7f23ec59be/go.mod h1:MIDFMn7db1kT65GmV94GzpX9Qdi7N/pQlwb+AN8wh+Q=
github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ=
github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
github.com/shopspring/decimal v0.0.0-20200227202807-02e2044944cc h1:jUIKcSPO9MoMJBbEoyE/RJoE8vz7Mb8AjvifMMwSyvY=
github.com/shopspring/decimal v0.0.0-20200227202807-02e2044944cc/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
Expand Down
7 changes: 4 additions & 3 deletions initializers/cacheData.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package initializers
import (
"fmt"

"github.com/oldfritter/goDCE/config"
. "github.com/oldfritter/goDCE/models"
"github.com/oldfritter/goDCE/utils"
)
Expand All @@ -16,16 +17,16 @@ func InitCacheData() {

func LoadCacheData() {
go func() {
channel, err := RabbitMqConnect.Channel()
channel, err := config.RabbitMqConnect.Channel()
if err != nil {
fmt.Errorf("Channel: %s", err)
}
channel.ExchangeDeclare(AmqpGlobalConfig.Exchange["fanout"]["name"], "fanout", true, false, false, false, nil)
channel.ExchangeDeclare(config.AmqpGlobalConfig.Exchange["fanout"]["name"], "fanout", true, false, false, false, nil)
queue, err := channel.QueueDeclare("", true, true, false, false, nil)
if err != nil {
return
}
channel.QueueBind(queue.Name, queue.Name, AmqpGlobalConfig.Exchange["fanout"]["name"], false, nil)
channel.QueueBind(queue.Name, queue.Name, config.AmqpGlobalConfig.Exchange["fanout"]["name"], false, nil)
msgs, _ := channel.Consume(queue.Name, "", true, true, false, false, nil)
for _ = range msgs {
InitCacheData()
Expand Down
4 changes: 3 additions & 1 deletion initializers/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (

"github.com/gomodule/redigo/redis"
"github.com/labstack/echo"

"github.com/oldfritter/goDCE/config"
"github.com/oldfritter/goDCE/initializers/locale"
"github.com/oldfritter/goDCE/utils"
)
Expand Down Expand Up @@ -106,7 +108,7 @@ func checkTimestamp(context echo.Context, params *map[string]string) bool {
}

func IsRabbitMqConnected() bool {
c := RabbitMqConnect
c := config.RabbitMqConnect
ok := true
if c.IsClosed() {
fmt.Println("Connection state: closed")
Expand Down
25 changes: 25 additions & 0 deletions initializers/initializeWorkers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package initializers

import (
"io/ioutil"
"log"
"path/filepath"

"gopkg.in/yaml.v2"

"github.com/oldfritter/goDCE/config"
"github.com/oldfritter/goDCE/workers/sneakerWorkers"
)

func InitWorkers() {
pathStr, _ := filepath.Abs("config/workers.yml")
content, err := ioutil.ReadFile(pathStr)
if err != nil {
log.Fatal(err)
}
yaml.Unmarshal(content, &config.AllWorkers)
sneakerWorkers.InitializeKLineWorker()
sneakerWorkers.InitializeTickerWorker()
sneakerWorkers.InitializeRebuildKLineToRedisWorker()
sneakerWorkers.InitializeAccountVersionCheckPointWorker()
}
96 changes: 8 additions & 88 deletions initializers/rabbitmq.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,16 @@
package initializers

import (
"fmt"
"io/ioutil"
"log"
"path/filepath"
"time"

"github.com/oldfritter/sneaker-go/v2"
"github.com/oldfritter/sneaker-go/v3"
"github.com/streadway/amqp"
"gopkg.in/yaml.v2"

. "github.com/oldfritter/goDCE/models"
)

type Amqp struct {
Connect struct {
Host string `yaml:"host"`
Port string `yaml:"port"`
Username string `yaml:"username"`
Password string `yaml:"password"`
Vhost string `yaml:"vhost"`
} `yaml:"connect"`

Exchange map[string]map[string]string `yaml:"exchange"`
Queue map[string]map[string]string `yaml:"queue"`
}

var (
AmqpGlobalConfig Amqp
RabbitMqConnect sneaker.RabbitMqConnect
"github.com/oldfritter/goDCE/config"
)

func InitializeAmqpConfig() {
Expand All @@ -39,94 +20,33 @@ func InitializeAmqpConfig() {
log.Fatal(err)
return
}
err = yaml.Unmarshal(content, &AmqpGlobalConfig)
err = yaml.Unmarshal(content, &config.AmqpGlobalConfig)
if err != nil {
log.Fatal(err)
return
}
InitializeAmqpConnection()
initMarkets()
}

func InitializeAmqpConnection() {
var err error
conn, err := amqp.Dial("amqp://" + AmqpGlobalConfig.Connect.Username + ":" + AmqpGlobalConfig.Connect.Password + "@" + AmqpGlobalConfig.Connect.Host + ":" + AmqpGlobalConfig.Connect.Port + "/" + AmqpGlobalConfig.Connect.Vhost)
RabbitMqConnect = sneaker.RabbitMqConnect{conn}
conn, err := amqp.Dial("amqp://" + config.AmqpGlobalConfig.Connect.Username + ":" + config.AmqpGlobalConfig.Connect.Password + "@" + config.AmqpGlobalConfig.Connect.Host + ":" + config.AmqpGlobalConfig.Connect.Port + "/" + config.AmqpGlobalConfig.Connect.Vhost)
config.RabbitMqConnect = sneaker.RabbitMqConnect{conn}
if err != nil {
time.Sleep(5000)
InitializeAmqpConnection()
return
}
go func() {
<-RabbitMqConnect.NotifyClose(make(chan *amqp.Error))
<-config.RabbitMqConnect.NotifyClose(make(chan *amqp.Error))
InitializeAmqpConnection()
}()
}

func CloseAmqpConnection() {
RabbitMqConnect.Close()
config.RabbitMqConnect.Close()
}

func GetRabbitMqConnect() sneaker.RabbitMqConnect {
return RabbitMqConnect
}

func initMarkets() {
for i, _ := range Markets {
Markets[i].Matching = AmqpGlobalConfig.Exchange["matching"]["key"]
Markets[i].TradeTreat = AmqpGlobalConfig.Exchange["trade"]["key"]
Markets[i].OrderCancel = AmqpGlobalConfig.Exchange["cancel"]["key"]
}
}

func PublishMessageWithRouteKey(exchange, routeKey, contentType string, message *[]byte, arguments amqp.Table, deliveryMode uint8) error {
channel, err := RabbitMqConnect.Channel()
defer channel.Close()
if err != nil {
return fmt.Errorf("Channel: %s", err)
}
if err = channel.Publish(
exchange, // publish to an exchange
routeKey, // routing to 0 or more queues
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: amqp.Table{},
ContentType: contentType,
ContentEncoding: "",
Body: *message,
DeliveryMode: deliveryMode, // amqp.Persistent, amqp.Transient // 1=non-persistent, 2=persistent
Priority: 0, // 0-9
// a bunch of application/implementation-specific fields
},
); err != nil {
return fmt.Errorf("Queue Publish: %s", err)
}
return nil
}

func PublishMessageToQueue(queue, contentType string, message *[]byte, arguments amqp.Table, deliveryMode uint8) error {
channel, err := RabbitMqConnect.Channel()
defer channel.Close()
if err != nil {
return fmt.Errorf("Channel: %s", err)
}
if err = channel.Publish(
"", // publish to an exchange
queue, // routing to 0 or more queues
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: amqp.Table{},
ContentType: contentType,
ContentEncoding: "",
Body: *message,
DeliveryMode: deliveryMode, // amqp.Persistent, amqp.Transient // 1=non-persistent, 2=persistent
Priority: 0, // 0-9
// a bunch of application/implementation-specific fields
},
); err != nil {
return fmt.Errorf("Queue Publish: %s", err)
}
return nil
return config.RabbitMqConnect
}
Loading

0 comments on commit 620ad5d

Please sign in to comment.