From 583b945b07617edcfc1f278d9213eb4de78b88e2 Mon Sep 17 00:00:00 2001 From: Leon Oldfritter Date: Wed, 1 Apr 2020 10:09:13 +0800 Subject: [PATCH] =?UTF-8?q?=E8=87=AA=E5=8A=A8=E7=BB=B4=E6=8A=A4rabbitmq=20?= =?UTF-8?q?queue?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/api.go | 2 +- initializers/cacheData.go | 18 +++--------------- order/cancel.go | 1 - schedules/schedule.go | 1 - trade/matching.go | 1 - trade/treat.go | 1 - workers/workers.go | 1 - 7 files changed, 4 insertions(+), 21 deletions(-) diff --git a/api/api.go b/api/api.go index dcce005..29a7a9e 100644 --- a/api/api.go +++ b/api/api.go @@ -13,6 +13,7 @@ import ( newrelic "github.com/dafiti/echo-middleware" "github.com/labstack/echo" "github.com/labstack/echo/middleware" + envConfig "github.com/oldfritter/goDCE/config" "github.com/oldfritter/goDCE/initializers" "github.com/oldfritter/goDCE/models" @@ -51,7 +52,6 @@ func main() { <-quit fmt.Println("accepted signal") ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - initializers.DeleteListeQueue() defer cancel() if err := e.Shutdown(ctx); err != nil { fmt.Println("shutting down failed, err:" + err.Error()) diff --git a/initializers/cacheData.go b/initializers/cacheData.go index 81143d9..ac3e9c9 100644 --- a/initializers/cacheData.go +++ b/initializers/cacheData.go @@ -7,8 +7,6 @@ import ( "github.com/oldfritter/goDCE/utils" ) -var QueueName string - func InitCacheData() { db := utils.MainDbBegin() defer db.DbRollback() @@ -23,13 +21,12 @@ func LoadCacheData() { fmt.Errorf("Channel: %s", err) } channel.ExchangeDeclare(utils.AmqpGlobalConfig.Exchange.Fanout["name"], "fanout", true, false, false, false, nil) - queue, err := channel.QueueDeclare("", true, false, false, false, nil) + queue, err := channel.QueueDeclare("", true, true, false, false, nil) if err != nil { return } - QueueName = queue.Name - channel.QueueBind(queue.Name, QueueName, utils.AmqpGlobalConfig.Exchange.Fanout["name"], false, nil) - msgs, _ := channel.Consume(queue.Name, "", true, false, false, false, nil) + channel.QueueBind(queue.Name, queue.Name, utils.AmqpGlobalConfig.Exchange.Fanout["name"], false, nil) + msgs, _ := channel.Consume(queue.Name, "", true, true, false, false, nil) for _ = range msgs { InitCacheData() } @@ -37,12 +34,3 @@ func LoadCacheData() { }() } - -func DeleteListeQueue() { - channel, err := utils.RabbitMqConnect.Channel() - if err != nil { - fmt.Errorf("Channel: %s", err) - } - channel.QueueDelete(QueueName, false, false, false) - -} diff --git a/order/cancel.go b/order/cancel.go index c02ef2b..53ff79c 100644 --- a/order/cancel.go +++ b/order/cancel.go @@ -21,7 +21,6 @@ func main() { quit := make(chan os.Signal) signal.Notify(quit, os.Interrupt) <-quit - initializers.DeleteListeQueue() closeResource() } diff --git a/schedules/schedule.go b/schedules/schedule.go index 23b2e26..138b5ba 100644 --- a/schedules/schedule.go +++ b/schedules/schedule.go @@ -41,7 +41,6 @@ func main() { } func closeResource() { - initializers.DeleteListeQueue() utils.CloseAmqpConnection() utils.CloseRedisPools() utils.CloseMainDB() diff --git a/trade/matching.go b/trade/matching.go index afd57e1..ba97271 100644 --- a/trade/matching.go +++ b/trade/matching.go @@ -21,7 +21,6 @@ func main() { quit := make(chan os.Signal) signal.Notify(quit, os.Interrupt) <-quit - initializers.DeleteListeQueue() closeResource() } diff --git a/trade/treat.go b/trade/treat.go index 141d5db..b4a598b 100644 --- a/trade/treat.go +++ b/trade/treat.go @@ -22,7 +22,6 @@ func main() { quit := make(chan os.Signal) signal.Notify(quit, os.Interrupt) <-quit - initializers.DeleteListeQueue() closeResource() } diff --git a/workers/workers.go b/workers/workers.go index d12012f..f0e31f9 100644 --- a/workers/workers.go +++ b/workers/workers.go @@ -47,7 +47,6 @@ func initialize() { } func closeResource() { - initializers.DeleteListeQueue() utils.CloseAmqpConnection() utils.CloseRedisPools() utils.CloseMainDB()