Skip to content

Commit

Permalink
update: 优化ticker性能
Browse files Browse the repository at this point in the history
  • Loading branch information
leon authored and oldfritter committed Sep 22, 2020
1 parent 620ad5d commit c24408d
Show file tree
Hide file tree
Showing 12 changed files with 197 additions and 90 deletions.
3 changes: 3 additions & 0 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func main() {
<-quit
fmt.Println("accepted signal")
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)

defer cancel()
if err := e.Shutdown(ctx); err != nil {
fmt.Println("shutting down failed, err:" + err.Error())
Expand Down Expand Up @@ -80,6 +81,8 @@ func initialize() {
initializers.LoadInterfaces()
initializers.InitI18n()
initializers.LoadCacheData()
initializers.LoadLatestKLines()
initializers.LoadLatestTickers()

err := ioutil.WriteFile("pids/api.pid", []byte(strconv.Itoa(os.Getpid())), 0644)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions api/v1/orderBook.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ func V1Getdepth(context echo.Context) error {
var market Market
mainDB := utils.MainDbBegin()
defer mainDB.DbRollback()
tickerRedis := utils.GetRedisConn("ticker")
defer tickerRedis.Close()
dataRedis := utils.GetRedisConn("data")
defer dataRedis.Close()
if mainDB.Where("name = ?", context.QueryParam("market")).First(&market).RecordNotFound() {
return utils.BuildError("1021")
}
Expand All @@ -35,8 +35,8 @@ func V1Getdepth(context echo.Context) error {
limit = 300
}
}
vAsk, _ := redis.String(tickerRedis.Do("GET", market.AskRedisKey()))
vBid, _ := redis.String(tickerRedis.Do("GET", market.BidRedisKey()))
vAsk, _ := redis.String(dataRedis.Do("GET", market.AskRedisKey()))
vBid, _ := redis.String(dataRedis.Do("GET", market.BidRedisKey()))
if vAsk == "" || vBid == "" {
return utils.BuildError("1026")
}
Expand Down
59 changes: 17 additions & 42 deletions api/v1/ticker.go
Original file line number Diff line number Diff line change
@@ -1,62 +1,37 @@
package v1

import (
"encoding/json"
"net/http"
"time"

"github.com/gomodule/redigo/redis"
"github.com/labstack/echo"
. "github.com/oldfritter/goDCE/models"
"github.com/oldfritter/goDCE/utils"
)

func V1GetTickers(context echo.Context) error {
mainDB := utils.MainDbBegin()
defer mainDB.DbRollback()
tickerRedis := utils.GetRedisConn("ticker")
defer tickerRedis.Close()
values, _ := redis.Values(tickerRedis.Do("HGETALL", TickersRedisKey))
tickers := make([]interface{}, 0)
var market Market
for i, value := range values {
if i%2 == 1 {
ticker := Ticker{
MarketId: market.Id,
At: time.Now().Unix(),
Name: market.Name,
}
json.Unmarshal(value.([]byte), &ticker.TickerAspect)
tickers = append(tickers, ticker)
} else {
marketId, _ := redis.String(value, nil)
if mainDB.Where("id = ?", marketId).First(&market).RecordNotFound() {
return utils.BuildError("1021")
}
}
func V1GetTickersMarket(context echo.Context) error {
market, err := FindMarketByCode(context.Param("market"))
if err != nil {
return utils.BuildError("1021")
}
ticker := Ticker{MarketId: market.Id, TickerAspect: market.Ticker}
response := utils.SuccessResponse
response.Body = tickers
response.Body = ticker
return context.JSON(http.StatusOK, response)
}

func V1GetTickersMarket(context echo.Context) error {
var market Market
mainDB := utils.MainDbBegin()
defer mainDB.DbRollback()
if mainDB.Where("code = ?", context.Param("market")).First(&market).RecordNotFound() {
return utils.BuildError("1021")
}
tickerRedis := utils.GetRedisConn("ticker")
defer tickerRedis.Close()
value, _ := tickerRedis.Do("HGET", TickersRedisKey, market.Id)
ticker := Ticker{
MarketId: market.Id,
At: time.Now().Unix(),
Name: market.Name,
func V1GetTickers(context echo.Context) error {
var tickers []Ticker
for _, market := range AllMarkets {
tickers = append(tickers, Ticker{
MarketId: market.Id,
At: time.Now().Unix(),
Name: market.Name,
TickerAspect: market.Ticker,
})
}
json.Unmarshal(value.([]byte), &ticker.TickerAspect)

response := utils.SuccessResponse
response.Body = ticker
response.Body = tickers
return context.JSON(http.StatusOK, response)
}
4 changes: 3 additions & 1 deletion config/amqp.yml.example
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ exchange:
key: goDCE.order.cancel
type: direct
fanout:
name: goDCE.fanout
default: goDCE.fanout
k: goDCE.fanout.notify.k
ticker: goDCE.fanout.notify.ticker

queue:
matching:
Expand Down
4 changes: 2 additions & 2 deletions config/env.yml.example
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
model: production
node: a
newrelic:
app_name: goDCE
license_key: fd4037a09661ecc12378f9da59b161e4a88c9c7e
app_name:
license_key:
38 changes: 32 additions & 6 deletions initializers/cacheData.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
package initializers

import (
"encoding/json"
"fmt"
"log"
"reflect"

"github.com/oldfritter/goDCE/config"
. "github.com/oldfritter/goDCE/models"
"github.com/oldfritter/goDCE/utils"
)

type Payload struct {
Update string `json:"update"`
Symbol int `json:"symbol"`
}

func InitCacheData() {
db := utils.MainDbBegin()
defer db.DbRollback()
Expand All @@ -16,22 +24,40 @@ func InitCacheData() {
}

func LoadCacheData() {
InitCacheData()
go func() {
channel, err := config.RabbitMqConnect.Channel()
if err != nil {
fmt.Errorf("Channel: %s", err)
log.Println(fmt.Errorf("Channel: %s", err))
}
channel.ExchangeDeclare(config.AmqpGlobalConfig.Exchange["fanout"]["name"], "fanout", true, false, false, false, nil)
channel.ExchangeDeclare(config.AmqpGlobalConfig.Exchange["fanout"]["default"], "fanout", true, false, false, false, nil)
queue, err := channel.QueueDeclare("", true, true, false, false, nil)
if err != nil {
return
}
channel.QueueBind(queue.Name, queue.Name, config.AmqpGlobalConfig.Exchange["fanout"]["name"], false, nil)
msgs, _ := channel.Consume(queue.Name, "", true, true, false, false, nil)
for _ = range msgs {
InitCacheData()
channel.QueueBind(queue.Name, queue.Name, config.AmqpGlobalConfig.Exchange["fanout"]["default"], false, nil)
msgs, _ := channel.Consume(queue.Name, "", true, false, false, false, nil)
for d := range msgs {
var payload Payload
err := json.Unmarshal(d.Body, &payload)
if err == nil {
reflect.ValueOf(&payload).MethodByName(payload.Update).Call([]reflect.Value{reflect.ValueOf(payload.Symbol)})
} else {
log.Println(fmt.Sprintf("{error: %v}", err))
}
}
return
}()
}

func (payload *Payload) ReloadCurrencies() {
db := utils.MainDbBegin()
defer db.DbRollback()
InitAllCurrencies(db)
}

func (payload *Payload) ReloadMarkets() {
db := utils.MainDbBegin()
defer db.DbRollback()
InitAllMarkets(db)
}
35 changes: 35 additions & 0 deletions initializers/latestKLine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package initializers

import (
"encoding/json"
"fmt"

"github.com/oldfritter/goDCE/config"
. "github.com/oldfritter/goDCE/models"
)

func LoadLatestKLines() {
go func() {
channel, err := config.RabbitMqConnect.Channel()
if err != nil {
fmt.Errorf("Channel: %s", err)
}
channel.ExchangeDeclare(config.AmqpGlobalConfig.Exchange["fanout"]["k"], "fanout", true, false, false, false, nil)
queue, err := channel.QueueDeclare("", true, true, false, false, nil)
if err != nil {
return
}
channel.QueueBind(queue.Name, queue.Name, config.AmqpGlobalConfig.Exchange["fanout"]["k"], false, nil)
msgs, _ := channel.Consume(queue.Name, "", true, false, false, false, nil)
for d := range msgs {
var notifyKLine KLine
json.Unmarshal(d.Body, &notifyKLine)
for i, _ := range AllMarkets {
if AllMarkets[i].Id == notifyKLine.MarketId {
AllMarkets[i].LatestKLines[notifyKLine.Period] = notifyKLine
}
}
}
return
}()
}
35 changes: 35 additions & 0 deletions initializers/ticker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package initializers

import (
"encoding/json"
"fmt"

"github.com/oldfritter/goDCE/config"
. "github.com/oldfritter/goDCE/models"
)

func LoadLatestTickers() {
go func() {
channel, err := config.RabbitMqConnect.Channel()
if err != nil {
fmt.Errorf("Channel: %s", err)
}
channel.ExchangeDeclare(config.AmqpGlobalConfig.Exchange["fanout"]["ticker"], "fanout", true, false, false, false, nil)
queue, err := channel.QueueDeclare("", true, true, false, false, nil)
if err != nil {
return
}
channel.QueueBind(queue.Name, queue.Name, config.AmqpGlobalConfig.Exchange["fanout"]["ticker"], false, nil)
msgs, _ := channel.Consume(queue.Name, "", true, false, false, false, nil)
for d := range msgs {
var ticker Ticker
json.Unmarshal(d.Body, &ticker)
for i, _ := range AllMarkets {
if AllMarkets[i].Id == ticker.MarketId {
AllMarkets[i].Ticker = ticker.TickerAspect
}
}
}
return
}()
}
32 changes: 23 additions & 9 deletions models/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ type Market struct {
Visible bool `json:"visible"`
Tradable bool `json:"tradable"`

// 暂存数据
Ticker TickerAspect `sql:"-" json:"ticker"`
LatestKLines map[int]KLine `sql:"-" json:"-"`

// 撮合相关属性
Ack bool `json:"-"`
Durable bool `json:"-"`
Expand All @@ -45,34 +49,32 @@ type Market struct {
OrderCancel string `json:"-"`
}

var Markets []Market
var AllMarkets []Market

func InitAllMarkets(db *utils.GormDB) {
db.Where("visible = ?", true).Find(&Markets)
db.Where("visible = ?", true).Find(&AllMarkets)
}

func FindAllMarket() []Market {
return Markets
return AllMarkets
}

func FindMarketById(id int) (Market, error) {
for _, market := range Markets {
for _, market := range AllMarkets {
if market.Id == id {
return market, nil
}
}
var market Market
return market, fmt.Errorf("No market can be found.")
return Market{}, fmt.Errorf("No market can be found.")
}

func FindMarketByCode(code string) (Market, error) {
for _, market := range Markets {
for _, market := range AllMarkets {
if market.Code == code {
return market, nil
}
}
var market Market
return market, fmt.Errorf("No market can be found.")
return Market{}, fmt.Errorf("No market can be found.")
}

func (market *Market) AfterCreate(db *gorm.DB) {
Expand All @@ -86,6 +88,10 @@ func (market *Market) AfterCreate(db *gorm.DB) {
}
}

func (market *Market) AfterFind(db *gorm.DB) {
market.LatestKLines = make(map[int]KLine)
}

// Exchange
func (assignment *Market) MatchingExchange() string {
return assignment.Matching
Expand Down Expand Up @@ -124,3 +130,11 @@ func (market *Market) AskRedisKey() string {
func (market *Market) BidRedisKey() string {
return fmt.Sprintf("goDCE:depth:%v:bid", market.Id)
}

// Notify
func (market *Market) KLineNotify(period int64) string {
return "market:kLine:notify"
}
func (market *Market) TickerNotify() string {
return fmt.Sprintf("market:ticker:notify:%v", market.Id)
}
2 changes: 1 addition & 1 deletion schedules/order/waitingCheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func WaitingOrderCheck() {
db := utils.MainDbBegin()
defer db.DbRollback()

for _, market := range Markets {
for _, market := range AllMarkets {
ordersPerMarket(db, &market)
}
db.DbCommit()
Expand Down
4 changes: 2 additions & 2 deletions trade/matching/depth.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,6 @@ func buildDepth(market *Market) {
dataRedis := utils.GetRedisConn("data")
defer dataRedis.Close()

dataRedis.Do((*market).AskRedisKey(), depth.AskOrders)
dataRedis.Do((*market).BidRedisKey(), depth.BidOrders)
dataRedis.Do("SET", (*market).AskRedisKey(), depth.AskOrders)
dataRedis.Do("SET", (*market).BidRedisKey(), depth.BidOrders)
}
Loading

0 comments on commit c24408d

Please sign in to comment.