From 79ded00e8f7ac25f8e075b9b2003e07fd7040f92 Mon Sep 17 00:00:00 2001 From: austin362667 Date: Sun, 12 Dec 2021 15:40:03 +0800 Subject: [PATCH] binance: add futures exchange stream connection --- pkg/exchange/binance/exchange.go | 46 +++++++---- pkg/exchange/binance/stream.go | 101 ++++++++++++++++++++--- pkg/exchange/binance/stream_callbacks.go | 24 ++++++ 3 files changed, 146 insertions(+), 25 deletions(-) diff --git a/pkg/exchange/binance/exchange.go b/pkg/exchange/binance/exchange.go index e136dd12b9..1864a1020c 100644 --- a/pkg/exchange/binance/exchange.go +++ b/pkg/exchange/binance/exchange.go @@ -3,6 +3,7 @@ package binance import ( "context" "fmt" + "github.com/adshao/go-binance/v2/futures" "net/http" "os" "strconv" @@ -14,7 +15,6 @@ import ( "github.com/adshao/go-binance/v2" "github.com/google/uuid" "github.com/pkg/errors" - "github.com/sirupsen/logrus" "github.com/c9s/bbgo/pkg/fixedpoint" @@ -27,7 +27,6 @@ const BNB = "BNB" // 50 per 10 seconds = 5 per second var orderLimiter = rate.NewLimiter(5, 5) - var log = logrus.WithFields(logrus.Fields{ "exchange": "binance", }) @@ -35,6 +34,7 @@ var log = logrus.WithFields(logrus.Fields{ func init() { _ = types.Exchange(&Exchange{}) _ = types.MarginExchange(&Exchange{}) + _ = types.FuturesExchange(&Exchange{}) // FIXME: this is not effected since dotenv is loaded in the rootCmd, not in the init function if ok, _ := strconv.ParseBool(os.Getenv("DEBUG_BINANCE_STREAM")); ok { @@ -46,20 +46,38 @@ type Exchange struct { types.MarginSettings types.FuturesSettings - key, secret string - Client *binance.Client + key, secret string + Client *binance.Client // Spot & Margin + futuresClient *futures.Client // USDT-M Futures + // deliveryClient *delivery.Client // Coin-M Futures } func New(key, secret string) *Exchange { var client = binance.NewClient(key, secret) client.HTTPClient = &http.Client{Timeout: 15 * time.Second} - _, _ = client.NewSetServerTimeService().Do(context.Background()) - return &Exchange{ - key: key, - secret: secret, - Client: client, + var futuresClient = binance.NewFuturesClient(key, secret) + futuresClient.HTTPClient = &http.Client{Timeout: 15 * time.Second} + _, _ = futuresClient.NewSetServerTimeService().Do(context.Background()) + + var err error + _, err = client.NewSetServerTimeService().Do(context.Background()) + if err != nil { + panic(err) + } + + _, err = futuresClient.NewSetServerTimeService().Do(context.Background()) + if err != nil { + panic(err) + } + + return &Exchange{ + key: key, + secret: secret, + Client: client, + futuresClient: futuresClient, + // deliveryClient: deliveryClient, } } @@ -152,8 +170,9 @@ func (e *Exchange) QueryAveragePrice(ctx context.Context, symbol string) (float6 } func (e *Exchange) NewStream() types.Stream { - stream := NewStream(e.Client) + stream := NewStream(e.Client, e.futuresClient) stream.MarginSettings = e.MarginSettings + stream.FuturesSettings = e.FuturesSettings return stream } @@ -180,7 +199,6 @@ func (e *Exchange) QueryIsolatedMarginAccount(ctx context.Context, symbols ...st return toGlobalIsolatedMarginAccount(account), nil } - func (e *Exchange) Withdrawal(ctx context.Context, asset string, amount fixedpoint.Value, address string, options *types.WithdrawalOptions) error { req := e.Client.NewCreateWithdrawService() req.Coin(asset) @@ -700,7 +718,7 @@ func (e *Exchange) submitSpotOrder(ctx context.Context, order types.SubmitOrder) func (e *Exchange) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (createdOrders types.OrderSlice, err error) { for _, order := range orders { - if err := orderLimiter.Wait(ctx) ; err != nil { + if err := orderLimiter.Wait(ctx); err != nil { log.WithError(err).Errorf("order rate limiter wait error") } @@ -847,7 +865,7 @@ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *type } for _, t := range remoteTrades { - localTrade, err := ToGlobalTrade(*t, e.IsMargin) + localTrade, err := toGlobalTrade(*t, e.IsMargin) if err != nil { log.WithError(err).Errorf("can not convert binance trade: %+v", t) continue @@ -932,4 +950,4 @@ func getLaunchDate() (time.Time, error) { } return time.Date(2017, time.July, 14, 0, 0, 0, 0, loc), nil -} \ No newline at end of file +} diff --git a/pkg/exchange/binance/stream.go b/pkg/exchange/binance/stream.go index d773713521..ced2d9e24b 100644 --- a/pkg/exchange/binance/stream.go +++ b/pkg/exchange/binance/stream.go @@ -12,6 +12,8 @@ import ( "time" "github.com/adshao/go-binance/v2" + "github.com/adshao/go-binance/v2/futures" + "github.com/gorilla/websocket" "github.com/c9s/bbgo/pkg/types" @@ -61,7 +63,9 @@ type Stream struct { types.FuturesSettings types.StandardStream - Client *binance.Client + Client *binance.Client + futuresClient *futures.Client + Conn *websocket.Conn ConnLock sync.Mutex @@ -76,23 +80,28 @@ type Stream struct { kLineClosedEventCallbacks []func(e *KLineEvent) markPriceUpdateEventCallbacks []func(e *MarkPriceUpdateEvent) - continuousKLineEventCallbacks []func(e *ContinuousKLineEvent) + + continuousKLineEventCallbacks []func(e *ContinuousKLineEvent) + continuousKLineClosedEventCallbacks []func(e *ContinuousKLineEvent) balanceUpdateEventCallbacks []func(event *BalanceUpdateEvent) outboundAccountInfoEventCallbacks []func(event *OutboundAccountInfoEvent) outboundAccountPositionEventCallbacks []func(event *OutboundAccountPositionEvent) executionReportEventCallbacks []func(event *ExecutionReportEvent) + orderTradeUpdateEventCallbacks []func(e *OrderTradeUpdateEvent) + depthFrames map[string]*DepthFrame } -func NewStream(client *binance.Client) *Stream { +func NewStream(client *binance.Client, futuresClient *futures.Client) *Stream { stream := &Stream{ StandardStream: types.StandardStream{ ReconnectC: make(chan struct{}, 1), }, - Client: client, - depthFrames: make(map[string]*DepthFrame), + Client: client, + futuresClient: futuresClient, + depthFrames: make(map[string]*DepthFrame), } stream.OnDepthEvent(func(e *DepthEvent) { @@ -207,6 +216,54 @@ func NewStream(client *binance.Client) *Stream { } }) + stream.OnContinuousKLineEvent(func(e *ContinuousKLineEvent) { + kline := e.KLine.KLine() + if e.KLine.Closed { + stream.EmitContinuousKLineClosedEvent(e) + stream.EmitKLineClosed(kline) + } else { + stream.EmitKLine(kline) + } + }) + + stream.OnOrderTradeUpdateEvent(func(e *OrderTradeUpdateEvent) { + switch e.OrderTrade.CurrentExecutionType { + + case "NEW", "CANCELED", "EXPIRED": + order, err := e.OrderFutures() + if err != nil { + log.WithError(err).Error("order convert error") + return + } + + stream.EmitOrderUpdate(*order) + + case "TRADE": + // TODO + + // trade, err := e.Trade() + // if err != nil { + // log.WithError(err).Error("trade convert error") + // return + // } + + // stream.EmitTradeUpdate(*trade) + + // order, err := e.OrderFutures() + // if err != nil { + // log.WithError(err).Error("order convert error") + // return + // } + + // Update Order with FILLED event + // if order.Status == types.OrderStatusFilled { + // stream.EmitOrderUpdate(*order) + // } + case "CALCULATED - Liquidation Execution": + log.Infof("CALCULATED - Liquidation Execution not support yet.") + } + }) + stream.OnDisconnect(func() { log.Infof("resetting depth snapshots...") for _, f := range stream.depthFrames { @@ -246,9 +303,17 @@ func (s *Stream) SetPublicOnly() { func (s *Stream) dial(listenKey string) (*websocket.Conn, error) { var url string if s.publicOnly { - url = "wss://stream.binance.com:9443/ws" + if s.IsFutures { + url = "wss://fstream.binance.com/ws/" + } else { + url = "wss://stream.binance.com:9443/ws" + } } else { - url = "wss://stream.binance.com:9443/ws/" + listenKey + if s.IsFutures { + url = "wss://fstream.binance.com/ws/" + listenKey + } else { + url = "wss://stream.binance.com:9443/ws/" + listenKey + } } conn, _, err := defaultDialer.Dial(url, nil) @@ -278,7 +343,12 @@ func (s *Stream) fetchListenKey(ctx context.Context) (string, error) { log.Infof("margin mode is enabled, requesting margin user stream listen key...") req := s.Client.NewStartMarginUserStreamService() return req.Do(ctx) + } else if s.IsFutures { + log.Infof("futures mode is enabled, requesting futures user stream listen key...") + req := s.futuresClient.NewStartUserStreamService() + return req.Do(ctx) } + log.Infof("spot mode is enabled, requesting margin user stream listen key...") return s.Client.NewStartUserStreamService().Do(ctx) } @@ -290,9 +360,11 @@ func (s *Stream) keepaliveListenKey(ctx context.Context, listenKey string) error req.Symbol(s.IsolatedMarginSymbol) return req.Do(ctx) } - req := s.Client.NewKeepaliveMarginUserStreamService().ListenKey(listenKey) return req.Do(ctx) + } else if s.IsFutures { + req := s.futuresClient.NewKeepaliveUserStreamService().ListenKey(listenKey) + return req.Do(ctx) } return s.Client.NewKeepaliveUserStreamService().ListenKey(listenKey).Do(ctx) @@ -541,11 +613,15 @@ func (s *Stream) read(ctx context.Context) { case *ExecutionReportEvent: s.EmitExecutionReportEvent(e) - + case *MarkPriceUpdateEvent: s.EmitMarkPriceUpdateEvent(e) - case *ContinuousKLineEvent: - s.EmitContinuousKLineEvent(e) + + case *ContinuousKLineEvent: + s.EmitContinuousKLineEvent(e) + + case *OrderTradeUpdateEvent: + s.EmitOrderTradeUpdateEvent(e) } } } @@ -565,6 +641,9 @@ func (s *Stream) invalidateListenKey(ctx context.Context, listenKey string) (err err = req.Do(ctx) } + } else if s.IsFutures { + req := s.futuresClient.NewCloseUserStreamService().ListenKey(listenKey) + err = req.Do(ctx) } else { err = s.Client.NewCloseUserStreamService().ListenKey(listenKey).Do(ctx) } diff --git a/pkg/exchange/binance/stream_callbacks.go b/pkg/exchange/binance/stream_callbacks.go index df6cbc15f5..5c7c3c0518 100644 --- a/pkg/exchange/binance/stream_callbacks.go +++ b/pkg/exchange/binance/stream_callbacks.go @@ -54,6 +54,16 @@ func (s *Stream) EmitContinuousKLineEvent(e *ContinuousKLineEvent) { } } +func (s *Stream) OnContinuousKLineClosedEvent(cb func(e *ContinuousKLineEvent)) { + s.continuousKLineClosedEventCallbacks = append(s.continuousKLineClosedEventCallbacks, cb) +} + +func (s *Stream) EmitContinuousKLineClosedEvent(e *ContinuousKLineEvent) { + for _, cb := range s.continuousKLineClosedEventCallbacks { + cb(e) + } +} + func (s *Stream) OnBalanceUpdateEvent(cb func(event *BalanceUpdateEvent)) { s.balanceUpdateEventCallbacks = append(s.balanceUpdateEventCallbacks, cb) } @@ -94,6 +104,16 @@ func (s *Stream) EmitExecutionReportEvent(event *ExecutionReportEvent) { } } +func (s *Stream) OnOrderTradeUpdateEvent(cb func(e *OrderTradeUpdateEvent)) { + s.orderTradeUpdateEventCallbacks = append(s.orderTradeUpdateEventCallbacks, cb) +} + +func (s *Stream) EmitOrderTradeUpdateEvent(e *OrderTradeUpdateEvent) { + for _, cb := range s.orderTradeUpdateEventCallbacks { + cb(e) + } +} + type StreamEventHub interface { OnDepthEvent(cb func(e *DepthEvent)) @@ -105,6 +125,8 @@ type StreamEventHub interface { OnContinuousKLineEvent(cb func(e *ContinuousKLineEvent)) + OnContinuousKLineClosedEvent(cb func(e *ContinuousKLineEvent)) + OnBalanceUpdateEvent(cb func(event *BalanceUpdateEvent)) OnOutboundAccountInfoEvent(cb func(event *OutboundAccountInfoEvent)) @@ -112,4 +134,6 @@ type StreamEventHub interface { OnOutboundAccountPositionEvent(cb func(event *OutboundAccountPositionEvent)) OnExecutionReportEvent(cb func(event *ExecutionReportEvent)) + + OnOrderTradeUpdateEvent(cb func(e *OrderTradeUpdateEvent)) }