go get github.com/jmuyuyang/queue_proxy
queue:
-
name: "hlg-kafka"
type: kafka
attr:
bind: 172.16.2.216:9092
timeout: 3
pool_size: 5
-
name: "hlg-redis"
type: redis
attr:
bind: 127.0.0.1:6379
timeout: 3
pool_size: 5
disk:
path: "./data"
flush_timeout: 2
compress_type: "gzip" //文件压缩方式
import queue "github.com/jmuyuyang/queue_proxy"
val config queue.QueueConfig
config = queue.ParseConfigFile(cfgFile)
queue.NewQueueProducer(config)
queue.InitQueue("producer-name","topic-name","hlg-redis")
queue.Start()
queue.SendMessage(dateByte)
queue.SetRateLimit(ratePerSecond) //限制限流(每秒流速)
queue.Stop()
queue.NewQueueConsumer(config)
queue.InitQueue("hlg-kafka","logcenter")
queue.Start()
msg := <-queue.GetMessageChan()