Skip to content

Commit

Permalink
自动维护rabbitmq queue
Browse files Browse the repository at this point in the history
  • Loading branch information
oldfritter committed Apr 1, 2020
1 parent 4c5515f commit 583b945
Show file tree
Hide file tree
Showing 7 changed files with 4 additions and 21 deletions.
2 changes: 1 addition & 1 deletion api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
newrelic "github.com/dafiti/echo-middleware"
"github.com/labstack/echo"
"github.com/labstack/echo/middleware"

envConfig "github.com/oldfritter/goDCE/config"
"github.com/oldfritter/goDCE/initializers"
"github.com/oldfritter/goDCE/models"
Expand Down Expand Up @@ -51,7 +52,6 @@ func main() {
<-quit
fmt.Println("accepted signal")
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
initializers.DeleteListeQueue()
defer cancel()
if err := e.Shutdown(ctx); err != nil {
fmt.Println("shutting down failed, err:" + err.Error())
Expand Down
18 changes: 3 additions & 15 deletions initializers/cacheData.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"github.com/oldfritter/goDCE/utils"
)

var QueueName string

func InitCacheData() {
db := utils.MainDbBegin()
defer db.DbRollback()
Expand All @@ -23,26 +21,16 @@ func LoadCacheData() {
fmt.Errorf("Channel: %s", err)
}
channel.ExchangeDeclare(utils.AmqpGlobalConfig.Exchange.Fanout["name"], "fanout", true, false, false, false, nil)
queue, err := channel.QueueDeclare("", true, false, false, false, nil)
queue, err := channel.QueueDeclare("", true, true, false, false, nil)
if err != nil {
return
}
QueueName = queue.Name
channel.QueueBind(queue.Name, QueueName, utils.AmqpGlobalConfig.Exchange.Fanout["name"], false, nil)
msgs, _ := channel.Consume(queue.Name, "", true, false, false, false, nil)
channel.QueueBind(queue.Name, queue.Name, utils.AmqpGlobalConfig.Exchange.Fanout["name"], false, nil)
msgs, _ := channel.Consume(queue.Name, "", true, true, false, false, nil)
for _ = range msgs {
InitCacheData()
}
return
}()

}

func DeleteListeQueue() {
channel, err := utils.RabbitMqConnect.Channel()
if err != nil {
fmt.Errorf("Channel: %s", err)
}
channel.QueueDelete(QueueName, false, false, false)

}
1 change: 0 additions & 1 deletion order/cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ func main() {
quit := make(chan os.Signal)
signal.Notify(quit, os.Interrupt)
<-quit
initializers.DeleteListeQueue()
closeResource()
}

Expand Down
1 change: 0 additions & 1 deletion schedules/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ func main() {
}

func closeResource() {
initializers.DeleteListeQueue()
utils.CloseAmqpConnection()
utils.CloseRedisPools()
utils.CloseMainDB()
Expand Down
1 change: 0 additions & 1 deletion trade/matching.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ func main() {
quit := make(chan os.Signal)
signal.Notify(quit, os.Interrupt)
<-quit
initializers.DeleteListeQueue()
closeResource()
}

Expand Down
1 change: 0 additions & 1 deletion trade/treat.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ func main() {
quit := make(chan os.Signal)
signal.Notify(quit, os.Interrupt)
<-quit
initializers.DeleteListeQueue()
closeResource()
}

Expand Down
1 change: 0 additions & 1 deletion workers/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ func initialize() {
}

func closeResource() {
initializers.DeleteListeQueue()
utils.CloseAmqpConnection()
utils.CloseRedisPools()
utils.CloseMainDB()
Expand Down

0 comments on commit 583b945

Please sign in to comment.