Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
oldfritter committed May 7, 2020
1 parent 00aaf24 commit d81920d
Show file tree
Hide file tree
Showing 20 changed files with 132 additions and 156 deletions.
4 changes: 2 additions & 2 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func initialize() {
utils.InitBackupDB()
models.AutoMigrations()
utils.InitRedisPools()
utils.InitializeAmqpConfig()
initializers.InitializeAmqpConfig()

initializers.LoadInterfaces()
initializers.InitI18n()
Expand All @@ -88,7 +88,7 @@ func initialize() {
}

func closeResource() {
utils.CloseAmqpConnection()
initializers.CloseAmqpConnection()
utils.CloseRedisPools()
utils.CloseMainDB()
}
5 changes: 3 additions & 2 deletions api/v1/order.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strconv"

"github.com/labstack/echo"
"github.com/oldfritter/goDCE/initializers"
. "github.com/oldfritter/goDCE/models"
"github.com/oldfritter/goDCE/utils"
"github.com/shopspring/decimal"
Expand Down Expand Up @@ -212,8 +213,8 @@ func pushMessageToMatching(order *Order, market *Market, option string) {
fmt.Println("error:", err)
}

err = utils.PublishMessageWithRouteKey(
utils.AmqpGlobalConfig.Exchange.Matching["key"],
err = initializers.PublishMessageWithRouteKey(
initializers.AmqpGlobalConfig.Exchange["matching"]["key"],
market.Code, "text/plain",
&b,
amqp.Table{},
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/aws/aws-sdk-go v1.29.30
github.com/dafiti/echo-middleware v0.0.0-20180423194757-e57a87d075ea // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/emirpasic/gods v1.12.0 // indirect
github.com/go-sql-driver/mysql v1.5.0
github.com/gomodule/redigo v2.0.0+incompatible
github.com/gorilla/context v1.1.1 // indirect
Expand All @@ -18,6 +19,7 @@ require (
github.com/labstack/gommon v0.3.0 // indirect
github.com/microcosm-cc/bluemonday v1.0.2 // indirect
github.com/newrelic/go-agent v3.4.0+incompatible // indirect
github.com/oldfritter/matching v0.0.0-20190827024937-efdf83ea2ab0
github.com/oldfritter/sneaker-go v1.0.6
github.com/qiniu/api.v7 v7.2.5+incompatible
github.com/qiniu/x v7.0.8+incompatible // indirect
Expand Down
10 changes: 6 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/denisenkom/go-mssqldb v0.0.0-20191124224453-732737034ffd/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg=
github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
github.com/erikstmartin/go-testdb v0.0.0-20160219214506-8d10e4a1bae5/go.mod h1:a2zkGnVExMxdzMo3M0Hi/3sEU+cWnZpSni0O6/Yb/P0=
github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs=
Expand Down Expand Up @@ -51,10 +53,10 @@ github.com/microcosm-cc/bluemonday v1.0.2 h1:5lPfLTTAvAbtS0VqT+94yOtFnGfUWYyx0+i
github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc=
github.com/newrelic/go-agent v3.4.0+incompatible h1:GhUhNLDdR3ETfUVJAN/czXlqRTcgbPs6U02jYhf15rg=
github.com/newrelic/go-agent v3.4.0+incompatible/go.mod h1:a8Fv1b/fYhFSReoTU6HDkTYIMZeSVNffmoS726Y0LzQ=
github.com/oldfritter/sneaker-go v0.0.0-20200313032517-0409eddc53a8 h1:Fqh28MncN3MG+Vcan05/xsB0gKOYOYRAjBQDhNg7lzQ=
github.com/oldfritter/sneaker-go v0.0.0-20200313032517-0409eddc53a8/go.mod h1:q9+KC43OfLklfiKffi11et1HpMufAyD5q3mI/cdpOHw=
github.com/oldfritter/sneaker-go v1.0.5 h1:op5shCbJMbXSHy4rTbSkrDrKb53vuUE36UIDuTk2V10=
github.com/oldfritter/sneaker-go v1.0.5/go.mod h1:q9+KC43OfLklfiKffi11et1HpMufAyD5q3mI/cdpOHw=
github.com/oldfritter/matching v0.0.0-20190827024937-efdf83ea2ab0 h1:IojUXXuzJ+K7qisHlNqgB07U+E7j+moRnGU1F6WYors=
github.com/oldfritter/matching v0.0.0-20190827024937-efdf83ea2ab0/go.mod h1:qtjPJJjsQnSacfr0tDIkqP91I69DFJKwY/0EBlwVumA=
github.com/oldfritter/sneaker-go v1.0.6 h1:cz8PgNZ9cJ6OouFTIvRDdXFvC0v92NL7FXwY3ZepD/c=
github.com/oldfritter/sneaker-go v1.0.6/go.mod h1:q9+KC43OfLklfiKffi11et1HpMufAyD5q3mI/cdpOHw=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/qiniu/api.v7 v7.2.5+incompatible h1:6KKaGt7MbFzVGSniwzv7qsM/Qv0or4SkRJfmak8LqZE=
Expand Down
6 changes: 3 additions & 3 deletions initializers/cacheData.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@ func InitCacheData() {

func LoadCacheData() {
go func() {
channel, err := utils.RabbitMqConnect.Channel()
channel, err := RabbitMqConnect.Channel()
if err != nil {
fmt.Errorf("Channel: %s", err)
}
channel.ExchangeDeclare(utils.AmqpGlobalConfig.Exchange.Fanout["name"], "fanout", true, false, false, false, nil)
channel.ExchangeDeclare(AmqpGlobalConfig.Exchange["fanout"]["name"], "fanout", true, false, false, false, nil)
queue, err := channel.QueueDeclare("", true, true, false, false, nil)
if err != nil {
return
}
channel.QueueBind(queue.Name, queue.Name, utils.AmqpGlobalConfig.Exchange.Fanout["name"], false, nil)
channel.QueueBind(queue.Name, queue.Name, AmqpGlobalConfig.Exchange["fanout"]["name"], false, nil)
msgs, _ := channel.Consume(queue.Name, "", true, true, false, false, nil)
for _ = range msgs {
InitCacheData()
Expand Down
2 changes: 1 addition & 1 deletion initializers/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func checkTimestamp(context echo.Context, params *map[string]string) bool {
}

func IsRabbitMqConnected() bool {
c := utils.RabbitMqConnect
c := RabbitMqConnect
ok := true
if c.IsClosed() {
fmt.Println("Connection state: closed")
Expand Down
65 changes: 65 additions & 0 deletions initializers/rabbitmq.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package initializers

import (
"fmt"
"io/ioutil"
"log"
"path/filepath"
Expand All @@ -9,6 +10,8 @@ import (
"github.com/oldfritter/sneaker-go/utils"
"github.com/streadway/amqp"
"gopkg.in/yaml.v2"

. "github.com/oldfritter/goDCE/models"
)

type Amqp struct {
Expand All @@ -21,6 +24,7 @@ type Amqp struct {
} `yaml:"connect"`

Exchange map[string]map[string]string `yaml:"exchange"`
Queue map[string]map[string]string `yaml:"queue"`
}

var (
Expand All @@ -41,6 +45,7 @@ func InitializeAmqpConfig() {
return
}
InitializeAmqpConnection()
initMarkets()
}

func InitializeAmqpConnection() {
Expand All @@ -65,3 +70,63 @@ func CloseAmqpConnection() {
func GetRabbitMqConnect() utils.RabbitMqConnect {
return RabbitMqConnect
}

func initMarkets() {
for i, _ := range Markets {
Markets[i].Matching = AmqpGlobalConfig.Exchange["matching"]["key"]
Markets[i].TradeTreat = AmqpGlobalConfig.Exchange["trade"]["key"]
Markets[i].OrderCancel = AmqpGlobalConfig.Exchange["cancel"]["key"]
}
}

func PublishMessageWithRouteKey(exchange, routeKey, contentType string, message *[]byte, arguments amqp.Table, deliveryMode uint8) error {
channel, err := RabbitMqConnect.Channel()
defer channel.Close()
if err != nil {
return fmt.Errorf("Channel: %s", err)
}
if err = channel.Publish(
exchange, // publish to an exchange
routeKey, // routing to 0 or more queues
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: amqp.Table{},
ContentType: contentType,
ContentEncoding: "",
Body: *message,
DeliveryMode: deliveryMode, // amqp.Persistent, amqp.Transient // 1=non-persistent, 2=persistent
Priority: 0, // 0-9
// a bunch of application/implementation-specific fields
},
); err != nil {
return fmt.Errorf("Queue Publish: %s", err)
}
return nil
}

func PublishMessageToQueue(queue, contentType string, message *[]byte, arguments amqp.Table, deliveryMode uint8) error {
channel, err := RabbitMqConnect.Channel()
defer channel.Close()
if err != nil {
return fmt.Errorf("Channel: %s", err)
}
if err = channel.Publish(
"", // publish to an exchange
queue, // routing to 0 or more queues
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: amqp.Table{},
ContentType: contentType,
ContentEncoding: "",
Body: *message,
DeliveryMode: deliveryMode, // amqp.Persistent, amqp.Transient // 1=non-persistent, 2=persistent
Priority: 0, // 0-9
// a bunch of application/implementation-specific fields
},
); err != nil {
return fmt.Errorf("Queue Publish: %s", err)
}
return nil
}
10 changes: 7 additions & 3 deletions models/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ type Market struct {
TradeTreatNode string `json:"-" gorm:"default:'a'; type:varchar(11)"`
OrderCancelNode string `json:"-" gorm:"default:'a'; type:varchar(11)"`
Running bool `json:"-" sql:"-"`

Matching string `json:"-"`
TradeTreat string `json:"-"`
OrderCancel string `json:"-"`
}

var Markets []Market
Expand Down Expand Up @@ -84,13 +88,13 @@ func (market *Market) AfterCreate(db *gorm.DB) {

// Exchange
func (assignment *Market) MatchingExchange() string {
return utils.AmqpGlobalConfig.Exchange.Matching["key"]
return assignment.Matching
}
func (assignment *Market) TradeTreatExchange() string {
return utils.AmqpGlobalConfig.Exchange.Trade["key"]
return assignment.TradeTreat
}
func (assignment *Market) OrderCancelExchange() string {
return utils.AmqpGlobalConfig.Exchange.Cancel["key"]
return assignment.OrderCancel
}

// Queue
Expand Down
8 changes: 4 additions & 4 deletions order/cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func initialize() {
utils.InitBackupDB()
models.AutoMigrations()
utils.InitRedisPools()
utils.InitializeAmqpConfig()
initializers.InitializeAmqpConfig()
initializers.LoadCacheData()

err := ioutil.WriteFile("pids/cancel.pid", []byte(strconv.Itoa(os.Getpid())), 0644)
Expand All @@ -40,7 +40,7 @@ func initialize() {
}

func closeResource() {
utils.CloseAmqpConnection()
initializers.CloseAmqpConnection()
utils.CloseRedisPools()
utils.CloseMainDB()
}
Expand All @@ -50,11 +50,11 @@ func initAssignments() {
cancel.SubscribeReload()

go func() {
channel, err := utils.RabbitMqConnect.Channel()
channel, err := initializers.RabbitMqConnect.Channel()
if err != nil {
fmt.Errorf("Channel: %s", err)
}
queueName := utils.AmqpGlobalConfig.Queue.Cancel["reload"]
queueName := initializers.AmqpGlobalConfig.Queue["cancel"]["reload"]
queue, err := channel.QueueDeclare(queueName, true, false, false, false, nil)
if err != nil {
return
Expand Down
14 changes: 8 additions & 6 deletions order/cancel/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package cancel
import (
"fmt"

"github.com/streadway/amqp"

envConfig "github.com/oldfritter/goDCE/config"
"github.com/oldfritter/goDCE/initializers"
. "github.com/oldfritter/goDCE/models"
"github.com/oldfritter/goDCE/utils"
"github.com/streadway/amqp"
)

var (
Expand Down Expand Up @@ -41,7 +43,7 @@ func InitAssignments() {
}

func subscribeMessageByQueue(assignment *Market, arguments amqp.Table) error {
channel, err := utils.RabbitMqConnect.Channel()
channel, err := initializers.RabbitMqConnect.Channel()
if err != nil {
fmt.Errorf("Channel: %s", err)
}
Expand All @@ -50,7 +52,7 @@ func subscribeMessageByQueue(assignment *Market, arguments amqp.Table) error {
channel.QueueBind((*assignment).OrderCancelQueue(), (*assignment).Code, (*assignment).OrderCancelExchange(), false, nil)

go func(id int) {
channel, err := utils.RabbitMqConnect.Channel()
channel, err := initializers.RabbitMqConnect.Channel()
if err != nil {
fmt.Errorf("Channel: %s", err)
}
Expand All @@ -75,12 +77,12 @@ func subscribeMessageByQueue(assignment *Market, arguments amqp.Table) error {
}

func SubscribeReload() (err error) {
channel, err := utils.RabbitMqConnect.Channel()
channel, err := initializers.RabbitMqConnect.Channel()
if err != nil {
fmt.Errorf("Channel: %s", err)
return
}
channel.ExchangeDeclare(utils.AmqpGlobalConfig.Exchange.Default["key"], "topic", true, false, false, false, nil)
channel.QueueBind(utils.AmqpGlobalConfig.Queue.Cancel["reload"], utils.AmqpGlobalConfig.Queue.Cancel["reload"], utils.AmqpGlobalConfig.Exchange.Default["key"], false, nil)
channel.ExchangeDeclare(initializers.AmqpGlobalConfig.Exchange["default"]["key"], "topic", true, false, false, false, nil)
channel.QueueBind(initializers.AmqpGlobalConfig.Queue["cancel"]["reload"], initializers.AmqpGlobalConfig.Queue["cancel"]["reload"], initializers.AmqpGlobalConfig.Exchange["default"]["key"], false, nil)
return
}
2 changes: 1 addition & 1 deletion schedules/kLine/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func CreateLatestKLine() {
if err != nil {
fmt.Println("error:", err)
}
err = utils.PublishMessageWithRouteKey("goDCE.default", "goDCE.k", "text/plain", &b, amqp.Table{}, amqp.Persistent)
err = initializers.PublishMessageWithRouteKey("goDCE.default", "goDCE.k", "text/plain", &b, amqp.Table{}, amqp.Persistent)
if err != nil {
fmt.Println("{ error:", err, "}")
panic(err)
Expand Down
4 changes: 2 additions & 2 deletions schedules/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func main() {
utils.InitMainDB()
utils.InitBackupDB()
utils.InitRedisPools()
utils.InitializeAmqpConfig()
initializers.InitializeAmqpConfig()

initializers.LoadCacheData()

Expand All @@ -41,7 +41,7 @@ func main() {
}

func closeResource() {
utils.CloseAmqpConnection()
initializers.CloseAmqpConnection()
utils.CloseRedisPools()
utils.CloseMainDB()
utils.CloseBackupDB()
Expand Down
8 changes: 4 additions & 4 deletions trade/matching.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func initialize() {
utils.InitBackupDB()
models.AutoMigrations()
utils.InitRedisPools()
utils.InitializeAmqpConfig()
initializers.InitializeAmqpConfig()
initializers.LoadCacheData()

err := ioutil.WriteFile("pids/matching.pid", []byte(strconv.Itoa(os.Getpid())), 0644)
Expand All @@ -40,7 +40,7 @@ func initialize() {
}

func closeResource() {
utils.CloseAmqpConnection()
initializers.CloseAmqpConnection()
utils.CloseRedisPools()
utils.CloseMainDB()
}
Expand All @@ -50,11 +50,11 @@ func initAssignments() {
matching.SubscribeReload()

go func() {
channel, err := utils.RabbitMqConnect.Channel()
channel, err := initializers.RabbitMqConnect.Channel()
if err != nil {
fmt.Errorf("Channel: %s", err)
}
queueName := utils.AmqpGlobalConfig.Queue.Matching["reload"]
queueName := initializers.AmqpGlobalConfig.Queue["matching"]["reload"]
queue, err := channel.QueueDeclare(queueName, true, false, false, false, nil)
if err != nil {
return
Expand Down
Loading

0 comments on commit d81920d

Please sign in to comment.