Skip to content

Commit

Permalink
grpc: implement book stream
Browse files Browse the repository at this point in the history
  • Loading branch information
c9s committed Apr 12, 2022
1 parent 46cf220 commit fb5703b
Show file tree
Hide file tree
Showing 4 changed files with 510 additions and 384 deletions.
90 changes: 81 additions & 9 deletions pkg/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net"

"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"

Expand All @@ -30,21 +31,23 @@ func (s *Server) Subscribe(request *pb.SubscribeRequest, server pb.MarketDataSer
return fmt.Errorf("exchange %s not found", sub.Exchange)
}

switch types.Channel(sub.Channel) {
case types.MarketTradeChannel:
// TODO
case types.BookTickerChannel:
// TODO
switch sub.Channel {
case pb.Channel_TRADE:
exchangeSubscriptions[session.Name] = append(exchangeSubscriptions[session.Name], types.Subscription{
Symbol: sub.Symbol,
Channel: types.MarketTradeChannel,
})

case types.BookChannel:
case pb.Channel_BOOK:
exchangeSubscriptions[session.Name] = append(exchangeSubscriptions[session.Name], types.Subscription{
Symbol: sub.Symbol,
Channel: types.BookChannel,
Options: types.SubscribeOptions{
Depth: types.Depth(sub.Depth),
},
})
case types.KLineChannel:

case pb.Channel_KLINE:
exchangeSubscriptions[session.Name] = append(exchangeSubscriptions[session.Name], types.Subscription{
Symbol: sub.Symbol,
Channel: types.KLineChannel,
Expand All @@ -56,12 +59,31 @@ func (s *Server) Subscribe(request *pb.SubscribeRequest, server pb.MarketDataSer
}

for sessionName, subs := range exchangeSubscriptions {
if session, ok := s.Environ.Session(sessionName) ; ok {
if session, ok := s.Environ.Session(sessionName); ok {
stream := session.Exchange.NewStream()
stream.SetPublicOnly()
for _, sub := range subs {
stream.Subscribe(sub.Channel, sub.Symbol, sub.Options)
}

stream.OnBookSnapshot(func(book types.SliceOrderBook) {
if err := server.Send(transBook(session, book, pb.Event_SNAPSHOT)); err != nil {
log.WithError(err).Error("grpc stream send error")
}
})

stream.OnBookUpdate(func(book types.SliceOrderBook) {
if err := server.Send(transBook(session, book, pb.Event_UPDATE)); err != nil {
log.WithError(err).Error("grpc stream send error")
}
})
stream.OnKLineClosed(func(kline types.KLine) {
err := server.Send(transKLine(session, kline))
if err != nil {
log.WithError(err).Error("grpc stream send error")
}
})
go stream.Connect(server.Context())
}
}

Expand Down Expand Up @@ -94,7 +116,7 @@ func (s *Server) QueryKLines(ctx context.Context, request *pb.QueryKLinesRequest
response.Klines = append(response.Klines, &pb.KLine{
Exchange: kline.Exchange.String(),
Symbol: kline.Symbol,
Timestamp: kline.StartTime.Unix(),
StartTime: kline.StartTime.UnixMilli(),
Open: kline.Open.Float64(),
High: kline.High.Float64(),
Low: kline.Low.Float64(),
Expand Down Expand Up @@ -128,3 +150,53 @@ func (s *Server) ListenAndServe(bind string) error {

return nil
}

func transPriceVolume(srcPvs types.PriceVolumeSlice) (pvs []*pb.PriceVolume) {
for _, srcPv := range srcPvs {
pvs = append(pvs, &pb.PriceVolume{
Price: srcPv.Price.String(),
Volume: srcPv.Volume.String(),
})
}
return pvs
}

func transBook(session *bbgo.ExchangeSession, book types.SliceOrderBook, event pb.Event) *pb.SubscribeResponse {
return &pb.SubscribeResponse{
Session: session.Name,
Exchange: session.ExchangeName.String(),
Symbol: book.Symbol,
Channel: pb.Channel_BOOK,
Event: event, // pb.Event_UPDATE
Depth: &pb.Depth{
Exchange: session.ExchangeName.String(),
Symbol: book.Symbol,
Asks: transPriceVolume(book.Asks),
Bids: transPriceVolume(book.Bids),
},
}
}

func transKLine(session *bbgo.ExchangeSession, kline types.KLine) *pb.SubscribeResponse {
return &pb.SubscribeResponse{
Session: session.Name,
Exchange: kline.Exchange.String(),
Symbol: kline.Symbol,
Channel: pb.Channel_KLINE,
Event: pb.Event_UPDATE,
Kline: &pb.KLine{
Session: session.Name,
Exchange: kline.Exchange.String(),
Symbol: kline.Symbol,
Open: kline.Open.Float64(),
High: kline.High.Float64(),
Low: kline.Low.Float64(),
Close: kline.Close.Float64(),
Volume: kline.Volume.Float64(),
QuoteVolume: kline.QuoteVolume.Float64(),
StartTime: kline.StartTime.UnixMilli(),
EndTime: kline.StartTime.UnixMilli(),
},
SubscribedAt: 0,
}
}
Loading

0 comments on commit fb5703b

Please sign in to comment.