diff --git a/api/api.go b/api/api.go index 29a7a9e..8eb3743 100644 --- a/api/api.go +++ b/api/api.go @@ -75,7 +75,7 @@ func initialize() { utils.InitBackupDB() models.AutoMigrations() utils.InitRedisPools() - utils.InitializeAmqpConfig() + initializers.InitializeAmqpConfig() initializers.LoadInterfaces() initializers.InitI18n() @@ -88,7 +88,7 @@ func initialize() { } func closeResource() { - utils.CloseAmqpConnection() + initializers.CloseAmqpConnection() utils.CloseRedisPools() utils.CloseMainDB() } diff --git a/api/v1/order.go b/api/v1/order.go index 5a0a75b..780337b 100644 --- a/api/v1/order.go +++ b/api/v1/order.go @@ -7,6 +7,7 @@ import ( "strconv" "github.com/labstack/echo" + "github.com/oldfritter/goDCE/initializers" . "github.com/oldfritter/goDCE/models" "github.com/oldfritter/goDCE/utils" "github.com/shopspring/decimal" @@ -212,8 +213,8 @@ func pushMessageToMatching(order *Order, market *Market, option string) { fmt.Println("error:", err) } - err = utils.PublishMessageWithRouteKey( - utils.AmqpGlobalConfig.Exchange.Matching["key"], + err = initializers.PublishMessageWithRouteKey( + initializers.AmqpGlobalConfig.Exchange["matching"]["key"], market.Code, "text/plain", &b, amqp.Table{}, diff --git a/go.mod b/go.mod index 1a5bedd..4b67b5c 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/aws/aws-sdk-go v1.29.30 github.com/dafiti/echo-middleware v0.0.0-20180423194757-e57a87d075ea // indirect github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect + github.com/emirpasic/gods v1.12.0 // indirect github.com/go-sql-driver/mysql v1.5.0 github.com/gomodule/redigo v2.0.0+incompatible github.com/gorilla/context v1.1.1 // indirect @@ -18,6 +19,7 @@ require ( github.com/labstack/gommon v0.3.0 // indirect github.com/microcosm-cc/bluemonday v1.0.2 // indirect github.com/newrelic/go-agent v3.4.0+incompatible // indirect + github.com/oldfritter/matching v0.0.0-20190827024937-efdf83ea2ab0 github.com/oldfritter/sneaker-go v1.0.6 github.com/qiniu/api.v7 v7.2.5+incompatible github.com/qiniu/x v7.0.8+incompatible // indirect diff --git a/go.sum b/go.sum index 21b23c9..0c5b29f 100644 --- a/go.sum +++ b/go.sum @@ -9,6 +9,8 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/denisenkom/go-mssqldb v0.0.0-20191124224453-732737034ffd/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU= github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= +github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg= +github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o= github.com/erikstmartin/go-testdb v0.0.0-20160219214506-8d10e4a1bae5/go.mod h1:a2zkGnVExMxdzMo3M0Hi/3sEU+cWnZpSni0O6/Yb/P0= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs= @@ -51,10 +53,10 @@ github.com/microcosm-cc/bluemonday v1.0.2 h1:5lPfLTTAvAbtS0VqT+94yOtFnGfUWYyx0+i github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc= github.com/newrelic/go-agent v3.4.0+incompatible h1:GhUhNLDdR3ETfUVJAN/czXlqRTcgbPs6U02jYhf15rg= github.com/newrelic/go-agent v3.4.0+incompatible/go.mod h1:a8Fv1b/fYhFSReoTU6HDkTYIMZeSVNffmoS726Y0LzQ= -github.com/oldfritter/sneaker-go v0.0.0-20200313032517-0409eddc53a8 h1:Fqh28MncN3MG+Vcan05/xsB0gKOYOYRAjBQDhNg7lzQ= -github.com/oldfritter/sneaker-go v0.0.0-20200313032517-0409eddc53a8/go.mod h1:q9+KC43OfLklfiKffi11et1HpMufAyD5q3mI/cdpOHw= -github.com/oldfritter/sneaker-go v1.0.5 h1:op5shCbJMbXSHy4rTbSkrDrKb53vuUE36UIDuTk2V10= -github.com/oldfritter/sneaker-go v1.0.5/go.mod h1:q9+KC43OfLklfiKffi11et1HpMufAyD5q3mI/cdpOHw= +github.com/oldfritter/matching v0.0.0-20190827024937-efdf83ea2ab0 h1:IojUXXuzJ+K7qisHlNqgB07U+E7j+moRnGU1F6WYors= +github.com/oldfritter/matching v0.0.0-20190827024937-efdf83ea2ab0/go.mod h1:qtjPJJjsQnSacfr0tDIkqP91I69DFJKwY/0EBlwVumA= +github.com/oldfritter/sneaker-go v1.0.6 h1:cz8PgNZ9cJ6OouFTIvRDdXFvC0v92NL7FXwY3ZepD/c= +github.com/oldfritter/sneaker-go v1.0.6/go.mod h1:q9+KC43OfLklfiKffi11et1HpMufAyD5q3mI/cdpOHw= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/qiniu/api.v7 v7.2.5+incompatible h1:6KKaGt7MbFzVGSniwzv7qsM/Qv0or4SkRJfmak8LqZE= diff --git a/initializers/cacheData.go b/initializers/cacheData.go index ac3e9c9..8ca2c7b 100644 --- a/initializers/cacheData.go +++ b/initializers/cacheData.go @@ -16,16 +16,16 @@ func InitCacheData() { func LoadCacheData() { go func() { - channel, err := utils.RabbitMqConnect.Channel() + channel, err := RabbitMqConnect.Channel() if err != nil { fmt.Errorf("Channel: %s", err) } - channel.ExchangeDeclare(utils.AmqpGlobalConfig.Exchange.Fanout["name"], "fanout", true, false, false, false, nil) + channel.ExchangeDeclare(AmqpGlobalConfig.Exchange["fanout"]["name"], "fanout", true, false, false, false, nil) queue, err := channel.QueueDeclare("", true, true, false, false, nil) if err != nil { return } - channel.QueueBind(queue.Name, queue.Name, utils.AmqpGlobalConfig.Exchange.Fanout["name"], false, nil) + channel.QueueBind(queue.Name, queue.Name, AmqpGlobalConfig.Exchange["fanout"]["name"], false, nil) msgs, _ := channel.Consume(queue.Name, "", true, true, false, false, nil) for _ = range msgs { InitCacheData() diff --git a/initializers/filters.go b/initializers/filters.go index d3933bd..2752dd8 100644 --- a/initializers/filters.go +++ b/initializers/filters.go @@ -106,7 +106,7 @@ func checkTimestamp(context echo.Context, params *map[string]string) bool { } func IsRabbitMqConnected() bool { - c := utils.RabbitMqConnect + c := RabbitMqConnect ok := true if c.IsClosed() { fmt.Println("Connection state: closed") diff --git a/initializers/rabbitmq.go b/initializers/rabbitmq.go index 944a1a3..ee52fd0 100644 --- a/initializers/rabbitmq.go +++ b/initializers/rabbitmq.go @@ -1,6 +1,7 @@ package initializers import ( + "fmt" "io/ioutil" "log" "path/filepath" @@ -9,6 +10,8 @@ import ( "github.com/oldfritter/sneaker-go/utils" "github.com/streadway/amqp" "gopkg.in/yaml.v2" + + . "github.com/oldfritter/goDCE/models" ) type Amqp struct { @@ -21,6 +24,7 @@ type Amqp struct { } `yaml:"connect"` Exchange map[string]map[string]string `yaml:"exchange"` + Queue map[string]map[string]string `yaml:"queue"` } var ( @@ -41,6 +45,7 @@ func InitializeAmqpConfig() { return } InitializeAmqpConnection() + initMarkets() } func InitializeAmqpConnection() { @@ -65,3 +70,63 @@ func CloseAmqpConnection() { func GetRabbitMqConnect() utils.RabbitMqConnect { return RabbitMqConnect } + +func initMarkets() { + for i, _ := range Markets { + Markets[i].Matching = AmqpGlobalConfig.Exchange["matching"]["key"] + Markets[i].TradeTreat = AmqpGlobalConfig.Exchange["trade"]["key"] + Markets[i].OrderCancel = AmqpGlobalConfig.Exchange["cancel"]["key"] + } +} + +func PublishMessageWithRouteKey(exchange, routeKey, contentType string, message *[]byte, arguments amqp.Table, deliveryMode uint8) error { + channel, err := RabbitMqConnect.Channel() + defer channel.Close() + if err != nil { + return fmt.Errorf("Channel: %s", err) + } + if err = channel.Publish( + exchange, // publish to an exchange + routeKey, // routing to 0 or more queues + false, // mandatory + false, // immediate + amqp.Publishing{ + Headers: amqp.Table{}, + ContentType: contentType, + ContentEncoding: "", + Body: *message, + DeliveryMode: deliveryMode, // amqp.Persistent, amqp.Transient // 1=non-persistent, 2=persistent + Priority: 0, // 0-9 + // a bunch of application/implementation-specific fields + }, + ); err != nil { + return fmt.Errorf("Queue Publish: %s", err) + } + return nil +} + +func PublishMessageToQueue(queue, contentType string, message *[]byte, arguments amqp.Table, deliveryMode uint8) error { + channel, err := RabbitMqConnect.Channel() + defer channel.Close() + if err != nil { + return fmt.Errorf("Channel: %s", err) + } + if err = channel.Publish( + "", // publish to an exchange + queue, // routing to 0 or more queues + false, // mandatory + false, // immediate + amqp.Publishing{ + Headers: amqp.Table{}, + ContentType: contentType, + ContentEncoding: "", + Body: *message, + DeliveryMode: deliveryMode, // amqp.Persistent, amqp.Transient // 1=non-persistent, 2=persistent + Priority: 0, // 0-9 + // a bunch of application/implementation-specific fields + }, + ); err != nil { + return fmt.Errorf("Queue Publish: %s", err) + } + return nil +} diff --git a/models/market.go b/models/market.go index 33329f8..d735966 100644 --- a/models/market.go +++ b/models/market.go @@ -39,6 +39,10 @@ type Market struct { TradeTreatNode string `json:"-" gorm:"default:'a'; type:varchar(11)"` OrderCancelNode string `json:"-" gorm:"default:'a'; type:varchar(11)"` Running bool `json:"-" sql:"-"` + + Matching string `json:"-"` + TradeTreat string `json:"-"` + OrderCancel string `json:"-"` } var Markets []Market @@ -84,13 +88,13 @@ func (market *Market) AfterCreate(db *gorm.DB) { // Exchange func (assignment *Market) MatchingExchange() string { - return utils.AmqpGlobalConfig.Exchange.Matching["key"] + return assignment.Matching } func (assignment *Market) TradeTreatExchange() string { - return utils.AmqpGlobalConfig.Exchange.Trade["key"] + return assignment.TradeTreat } func (assignment *Market) OrderCancelExchange() string { - return utils.AmqpGlobalConfig.Exchange.Cancel["key"] + return assignment.OrderCancel } // Queue diff --git a/order/cancel.go b/order/cancel.go index 53ff79c..500b14b 100644 --- a/order/cancel.go +++ b/order/cancel.go @@ -30,7 +30,7 @@ func initialize() { utils.InitBackupDB() models.AutoMigrations() utils.InitRedisPools() - utils.InitializeAmqpConfig() + initializers.InitializeAmqpConfig() initializers.LoadCacheData() err := ioutil.WriteFile("pids/cancel.pid", []byte(strconv.Itoa(os.Getpid())), 0644) @@ -40,7 +40,7 @@ func initialize() { } func closeResource() { - utils.CloseAmqpConnection() + initializers.CloseAmqpConnection() utils.CloseRedisPools() utils.CloseMainDB() } @@ -50,11 +50,11 @@ func initAssignments() { cancel.SubscribeReload() go func() { - channel, err := utils.RabbitMqConnect.Channel() + channel, err := initializers.RabbitMqConnect.Channel() if err != nil { fmt.Errorf("Channel: %s", err) } - queueName := utils.AmqpGlobalConfig.Queue.Cancel["reload"] + queueName := initializers.AmqpGlobalConfig.Queue["cancel"]["reload"] queue, err := channel.QueueDeclare(queueName, true, false, false, false, nil) if err != nil { return diff --git a/order/cancel/base.go b/order/cancel/base.go index f709c12..3ebfb92 100644 --- a/order/cancel/base.go +++ b/order/cancel/base.go @@ -3,10 +3,12 @@ package cancel import ( "fmt" + "github.com/streadway/amqp" + envConfig "github.com/oldfritter/goDCE/config" + "github.com/oldfritter/goDCE/initializers" . "github.com/oldfritter/goDCE/models" "github.com/oldfritter/goDCE/utils" - "github.com/streadway/amqp" ) var ( @@ -41,7 +43,7 @@ func InitAssignments() { } func subscribeMessageByQueue(assignment *Market, arguments amqp.Table) error { - channel, err := utils.RabbitMqConnect.Channel() + channel, err := initializers.RabbitMqConnect.Channel() if err != nil { fmt.Errorf("Channel: %s", err) } @@ -50,7 +52,7 @@ func subscribeMessageByQueue(assignment *Market, arguments amqp.Table) error { channel.QueueBind((*assignment).OrderCancelQueue(), (*assignment).Code, (*assignment).OrderCancelExchange(), false, nil) go func(id int) { - channel, err := utils.RabbitMqConnect.Channel() + channel, err := initializers.RabbitMqConnect.Channel() if err != nil { fmt.Errorf("Channel: %s", err) } @@ -75,12 +77,12 @@ func subscribeMessageByQueue(assignment *Market, arguments amqp.Table) error { } func SubscribeReload() (err error) { - channel, err := utils.RabbitMqConnect.Channel() + channel, err := initializers.RabbitMqConnect.Channel() if err != nil { fmt.Errorf("Channel: %s", err) return } - channel.ExchangeDeclare(utils.AmqpGlobalConfig.Exchange.Default["key"], "topic", true, false, false, false, nil) - channel.QueueBind(utils.AmqpGlobalConfig.Queue.Cancel["reload"], utils.AmqpGlobalConfig.Queue.Cancel["reload"], utils.AmqpGlobalConfig.Exchange.Default["key"], false, nil) + channel.ExchangeDeclare(initializers.AmqpGlobalConfig.Exchange["default"]["key"], "topic", true, false, false, false, nil) + channel.QueueBind(initializers.AmqpGlobalConfig.Queue["cancel"]["reload"], initializers.AmqpGlobalConfig.Queue["cancel"]["reload"], initializers.AmqpGlobalConfig.Exchange["default"]["key"], false, nil) return } diff --git a/schedules/kLine/create.go b/schedules/kLine/create.go index 8a916b9..0ae8e51 100644 --- a/schedules/kLine/create.go +++ b/schedules/kLine/create.go @@ -31,7 +31,7 @@ func CreateLatestKLine() { if err != nil { fmt.Println("error:", err) } - err = utils.PublishMessageWithRouteKey("goDCE.default", "goDCE.k", "text/plain", &b, amqp.Table{}, amqp.Persistent) + err = initializers.PublishMessageWithRouteKey("goDCE.default", "goDCE.k", "text/plain", &b, amqp.Table{}, amqp.Persistent) if err != nil { fmt.Println("{ error:", err, "}") panic(err) diff --git a/schedules/schedule.go b/schedules/schedule.go index 138b5ba..1e99a4f 100644 --- a/schedules/schedule.go +++ b/schedules/schedule.go @@ -23,7 +23,7 @@ func main() { utils.InitMainDB() utils.InitBackupDB() utils.InitRedisPools() - utils.InitializeAmqpConfig() + initializers.InitializeAmqpConfig() initializers.LoadCacheData() @@ -41,7 +41,7 @@ func main() { } func closeResource() { - utils.CloseAmqpConnection() + initializers.CloseAmqpConnection() utils.CloseRedisPools() utils.CloseMainDB() utils.CloseBackupDB() diff --git a/trade/matching.go b/trade/matching.go index ba97271..50c9cdc 100644 --- a/trade/matching.go +++ b/trade/matching.go @@ -30,7 +30,7 @@ func initialize() { utils.InitBackupDB() models.AutoMigrations() utils.InitRedisPools() - utils.InitializeAmqpConfig() + initializers.InitializeAmqpConfig() initializers.LoadCacheData() err := ioutil.WriteFile("pids/matching.pid", []byte(strconv.Itoa(os.Getpid())), 0644) @@ -40,7 +40,7 @@ func initialize() { } func closeResource() { - utils.CloseAmqpConnection() + initializers.CloseAmqpConnection() utils.CloseRedisPools() utils.CloseMainDB() } @@ -50,11 +50,11 @@ func initAssignments() { matching.SubscribeReload() go func() { - channel, err := utils.RabbitMqConnect.Channel() + channel, err := initializers.RabbitMqConnect.Channel() if err != nil { fmt.Errorf("Channel: %s", err) } - queueName := utils.AmqpGlobalConfig.Queue.Matching["reload"] + queueName := initializers.AmqpGlobalConfig.Queue["matching"]["reload"] queue, err := channel.QueueDeclare(queueName, true, false, false, false, nil) if err != nil { return diff --git a/trade/matching/base.go b/trade/matching/base.go index 49b490f..70f21eb 100644 --- a/trade/matching/base.go +++ b/trade/matching/base.go @@ -6,6 +6,7 @@ import ( "runtime" envConfig "github.com/oldfritter/goDCE/config" + "github.com/oldfritter/goDCE/initializers" . "github.com/oldfritter/goDCE/models" "github.com/oldfritter/goDCE/utils" "github.com/oldfritter/matching" @@ -56,7 +57,7 @@ func InitAssignments() { } func subscribeMessageByQueue(assignment *Market, arguments amqp.Table) error { - channel, err := utils.RabbitMqConnect.Channel() + channel, err := initializers.RabbitMqConnect.Channel() if err != nil { fmt.Errorf("Channel: %s", err) } @@ -65,7 +66,7 @@ func subscribeMessageByQueue(assignment *Market, arguments amqp.Table) error { go func(id int) { a := Assignments[id] - channel, err := utils.RabbitMqConnect.Channel() + channel, err := initializers.RabbitMqConnect.Channel() if err != nil { fmt.Errorf("Channel: %s", err) } @@ -101,7 +102,7 @@ func subscribeMessageByQueue(assignment *Market, arguments amqp.Table) error { if err != nil { fmt.Println("error:", err) } - err = utils.PublishMessageWithRouteKey((*assignment).TradeTreatExchange(), (*assignment).Code, "text/plain", &b, amqp.Table{}, amqp.Persistent) + err = initializers.PublishMessageWithRouteKey((*assignment).TradeTreatExchange(), (*assignment).Code, "text/plain", &b, amqp.Table{}, amqp.Persistent) if err != nil { fmt.Println("{ error:", err, "}") } else { @@ -118,7 +119,7 @@ func subscribeMessageByQueue(assignment *Market, arguments amqp.Table) error { if err != nil { fmt.Println("error:", err) } - err = utils.PublishMessageWithRouteKey((*assignment).OrderCancelExchange(), (*assignment).Code, "text/plain", &b, amqp.Table{}, amqp.Persistent) + err = initializers.PublishMessageWithRouteKey((*assignment).OrderCancelExchange(), (*assignment).Code, "text/plain", &b, amqp.Table{}, amqp.Persistent) if err != nil { fmt.Println("{ error:", err, "}") } else { @@ -131,12 +132,12 @@ func subscribeMessageByQueue(assignment *Market, arguments amqp.Table) error { } func SubscribeReload() (err error) { - channel, err := utils.RabbitMqConnect.Channel() + channel, err := initializers.RabbitMqConnect.Channel() if err != nil { fmt.Errorf("Channel: %s", err) return } - channel.ExchangeDeclare(utils.AmqpGlobalConfig.Exchange.Default["key"], "topic", true, false, false, false, nil) - channel.QueueBind(utils.AmqpGlobalConfig.Queue.Matching["reload"], utils.AmqpGlobalConfig.Queue.Matching["reload"], utils.AmqpGlobalConfig.Exchange.Default["key"], false, nil) + channel.ExchangeDeclare(initializers.AmqpGlobalConfig.Exchange["default"]["key"], "topic", true, false, false, false, nil) + channel.QueueBind(initializers.AmqpGlobalConfig.Queue["matching"]["reload"], initializers.AmqpGlobalConfig.Queue["matching"]["reload"], initializers.AmqpGlobalConfig.Exchange["default"]["key"], false, nil) return } diff --git a/trade/treat.go b/trade/treat.go index b4a598b..6ce5994 100644 --- a/trade/treat.go +++ b/trade/treat.go @@ -31,7 +31,7 @@ func initialize() { utils.InitBackupDB() models.AutoMigrations() utils.InitRedisPools() - utils.InitializeAmqpConfig() + initializers.InitializeAmqpConfig() sneakerWorkers.InitWorkers() initializers.LoadCacheData() @@ -42,7 +42,7 @@ func initialize() { } func closeResource() { - utils.CloseAmqpConnection() + initializers.CloseAmqpConnection() utils.CloseRedisPools() utils.CloseMainDB() } @@ -52,11 +52,11 @@ func initAssignments() { treat.SubscribeReload() go func() { - channel, err := utils.RabbitMqConnect.Channel() + channel, err := initializers.RabbitMqConnect.Channel() if err != nil { fmt.Errorf("Channel: %s", err) } - queueName := utils.AmqpGlobalConfig.Queue.Trade["reload"] + queueName := initializers.AmqpGlobalConfig.Queue["trade"]["reload"] queue, err := channel.QueueDeclare(queueName, true, false, false, false, nil) if err != nil { return diff --git a/trade/treat/base.go b/trade/treat/base.go index 811300c..7fb7d14 100644 --- a/trade/treat/base.go +++ b/trade/treat/base.go @@ -4,6 +4,7 @@ import ( "fmt" envConfig "github.com/oldfritter/goDCE/config" + "github.com/oldfritter/goDCE/initializers" . "github.com/oldfritter/goDCE/models" "github.com/oldfritter/goDCE/utils" "github.com/streadway/amqp" @@ -41,7 +42,7 @@ func InitAssignments() { } func subscribeMessageByQueue(assignment *Market, arguments amqp.Table) error { - channel, err := utils.RabbitMqConnect.Channel() + channel, err := initializers.RabbitMqConnect.Channel() if err != nil { fmt.Errorf("Channel: %s", err) } @@ -51,7 +52,7 @@ func subscribeMessageByQueue(assignment *Market, arguments amqp.Table) error { go func(id int) { a := Assignments[id] - channel, err := utils.RabbitMqConnect.Channel() + channel, err := initializers.RabbitMqConnect.Channel() if err != nil { fmt.Errorf("Channel: %s", err) } @@ -75,12 +76,12 @@ func subscribeMessageByQueue(assignment *Market, arguments amqp.Table) error { } func SubscribeReload() (err error) { - channel, err := utils.RabbitMqConnect.Channel() + channel, err := initializers.RabbitMqConnect.Channel() if err != nil { fmt.Errorf("Channel: %s", err) return } - channel.ExchangeDeclare(utils.AmqpGlobalConfig.Exchange.Default["key"], "topic", true, false, false, false, nil) - channel.QueueBind(utils.AmqpGlobalConfig.Queue.Trade["reload"], utils.AmqpGlobalConfig.Queue.Trade["reload"], utils.AmqpGlobalConfig.Exchange.Default["key"], false, nil) + channel.ExchangeDeclare(initializers.AmqpGlobalConfig.Exchange["default"]["key"], "topic", true, false, false, false, nil) + channel.QueueBind(initializers.AmqpGlobalConfig.Queue["trade"]["reload"], initializers.AmqpGlobalConfig.Queue["trade"]["reload"], initializers.AmqpGlobalConfig.Exchange["default"]["key"], false, nil) return } diff --git a/trade/treat/worker.go b/trade/treat/worker.go index fa9de6b..c262094 100644 --- a/trade/treat/worker.go +++ b/trade/treat/worker.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/oldfritter/goDCE/initializers" . "github.com/oldfritter/goDCE/models" "github.com/oldfritter/goDCE/utils" "github.com/oldfritter/goDCE/workers/sneakerWorkers" @@ -83,7 +84,7 @@ func pushMessageToRefreshTicker(marketId int) { routingKey = w.RoutingKey } } - err = utils.PublishMessageWithRouteKey(exchange, routingKey, "text/plain", &b, amqp.Table{}, amqp.Persistent) + err = initializers.PublishMessageWithRouteKey(exchange, routingKey, "text/plain", &b, amqp.Table{}, amqp.Persistent) if err != nil { fmt.Println("{ error:", err, "}") panic(err) @@ -112,7 +113,7 @@ func pushMessageToRefreshKLine(marketId int) { if err != nil { fmt.Println("error:", err) } - err = utils.PublishMessageWithRouteKey(exchange, routingKey, "text/plain", &b, amqp.Table{}, amqp.Persistent) + err = initializers.PublishMessageWithRouteKey(exchange, routingKey, "text/plain", &b, amqp.Table{}, amqp.Persistent) if err != nil { fmt.Println("{ error:", err, "}") return diff --git a/utils/rabbitmq.go b/utils/rabbitmq.go deleted file mode 100644 index 1266139..0000000 --- a/utils/rabbitmq.go +++ /dev/null @@ -1,103 +0,0 @@ -package utils - -import ( - "fmt" - "io/ioutil" - "log" - "path/filepath" - "time" - - "github.com/streadway/amqp" - "gopkg.in/yaml.v2" -) - -type Amqp struct { - Connect struct { - Host string `yaml:"host"` - Port string `yaml:"port"` - Username string `yaml:"username"` - Password string `yaml:"password"` - } `yaml:"connect"` - - Exchange struct { - Default map[string]string `yaml:"default"` - Matching map[string]string `yaml:"matching"` - Trade map[string]string `yaml:"trade"` - Cancel map[string]string `yaml:"cancel"` - Fanout map[string]string `yaml:"fanout"` - } `yaml:"exchange"` - - Queue struct { - Matching map[string]string `yaml:"matching"` - Trade map[string]string `yaml:"trade"` - Cancel map[string]string `yaml:"cancel"` - } `yaml:"queue"` -} - -var ( - AmqpGlobalConfig Amqp - RabbitMqConnect *amqp.Connection -) - -func InitializeAmqpConfig() { - path_str, _ := filepath.Abs("config/amqp.yml") - content, err := ioutil.ReadFile(path_str) - if err != nil { - log.Fatal(err) - return - } - err = yaml.Unmarshal(content, &AmqpGlobalConfig) - if err != nil { - log.Fatal(err) - return - } - InitializeAmqpConnection() -} - -func InitializeAmqpConnection() { - var err error - RabbitMqConnect, err = amqp.Dial("amqp://" + AmqpGlobalConfig.Connect.Username + ":" + AmqpGlobalConfig.Connect.Password + "@" + AmqpGlobalConfig.Connect.Host + ":" + AmqpGlobalConfig.Connect.Port + "/") - if err != nil { - time.Sleep(5000) - InitializeAmqpConnection() - return - } - go func() { - <-RabbitMqConnect.NotifyClose(make(chan *amqp.Error)) - InitializeAmqpConnection() - }() -} - -func CloseAmqpConnection() { - RabbitMqConnect.Close() -} - -func GetRabbitMqConnect() *amqp.Connection { - return RabbitMqConnect -} - -func PublishMessageWithRouteKey(exchange, routeKey, contentType string, message *[]byte, arguments amqp.Table, deliveryMode uint8) error { - channel, err := RabbitMqConnect.Channel() - defer channel.Close() - if err != nil { - return fmt.Errorf("Channel: %s", err) - } - if err = channel.Publish( - exchange, // publish to an exchange - routeKey, // routing to 0 or more queues - false, // mandatory - false, // immediate - amqp.Publishing{ - Headers: amqp.Table{}, - ContentType: contentType, - ContentEncoding: "", - Body: *message, - DeliveryMode: deliveryMode, // amqp.Persistent, amqp.Transient // 1=non-persistent, 2=persistent - Priority: 0, // 0-9 - // a bunch of application/implementation-specific fields - }, - ); err != nil { - return fmt.Errorf("Queue Publish: %s", err) - } - return nil -} diff --git a/workers/sneakerWorkers/worker.go b/workers/sneakerWorkers/worker.go index ab0b52a..62e97aa 100644 --- a/workers/sneakerWorkers/worker.go +++ b/workers/sneakerWorkers/worker.go @@ -63,8 +63,8 @@ func (worker Worker) GetLog() string { return DefaultLog } func (worker Worker) GetLogFolder() string { - re := regexp.MustCompile("/.*.log$") - return strings.TrimSuffix(worker.Log, re.FindString(worker.Log)) + re := regexp.MustCompile(`\/.*\.log$`) + return strings.TrimSuffix(worker.GetLog(), re.FindString(worker.GetLog())) } func (worker Worker) GetDurable() bool { return worker.Durable diff --git a/workers/workers.go b/workers/workers.go index 29bcd03..970bd76 100644 --- a/workers/workers.go +++ b/workers/workers.go @@ -35,7 +35,7 @@ func initialize() { utils.InitBackupDB() models.AutoMigrations() utils.InitRedisPools() - utils.InitializeAmqpConfig() + initializers.InitializeAmqpConfig() initializers.LoadCacheData() initializers.InitializeAmqpConfig()