Skip to content

Commit

Permalink
binance: add futures exchange stream connection
Browse files Browse the repository at this point in the history
  • Loading branch information
austin362667 committed Dec 12, 2021
1 parent 3ceee97 commit 79ded00
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 25 deletions.
46 changes: 32 additions & 14 deletions pkg/exchange/binance/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package binance
import (
"context"
"fmt"
"github.com/adshao/go-binance/v2/futures"
"net/http"
"os"
"strconv"
Expand All @@ -14,7 +15,6 @@ import (
"github.com/adshao/go-binance/v2"
"github.com/google/uuid"
"github.com/pkg/errors"

"github.com/sirupsen/logrus"

"github.com/c9s/bbgo/pkg/fixedpoint"
Expand All @@ -27,14 +27,14 @@ const BNB = "BNB"
// 50 per 10 seconds = 5 per second
var orderLimiter = rate.NewLimiter(5, 5)


var log = logrus.WithFields(logrus.Fields{
"exchange": "binance",
})

func init() {
_ = types.Exchange(&Exchange{})
_ = types.MarginExchange(&Exchange{})
_ = types.FuturesExchange(&Exchange{})

// FIXME: this is not effected since dotenv is loaded in the rootCmd, not in the init function
if ok, _ := strconv.ParseBool(os.Getenv("DEBUG_BINANCE_STREAM")); ok {
Expand All @@ -46,20 +46,38 @@ type Exchange struct {
types.MarginSettings
types.FuturesSettings

key, secret string
Client *binance.Client
key, secret string
Client *binance.Client // Spot & Margin
futuresClient *futures.Client // USDT-M Futures
// deliveryClient *delivery.Client // Coin-M Futures
}

func New(key, secret string) *Exchange {
var client = binance.NewClient(key, secret)
client.HTTPClient = &http.Client{Timeout: 15 * time.Second}

_, _ = client.NewSetServerTimeService().Do(context.Background())
return &Exchange{
key: key,
secret: secret,

Client: client,
var futuresClient = binance.NewFuturesClient(key, secret)
futuresClient.HTTPClient = &http.Client{Timeout: 15 * time.Second}
_, _ = futuresClient.NewSetServerTimeService().Do(context.Background())

var err error
_, err = client.NewSetServerTimeService().Do(context.Background())
if err != nil {
panic(err)
}

_, err = futuresClient.NewSetServerTimeService().Do(context.Background())
if err != nil {
panic(err)
}

return &Exchange{
key: key,
secret: secret,
Client: client,
futuresClient: futuresClient,
// deliveryClient: deliveryClient,
}
}

Expand Down Expand Up @@ -152,8 +170,9 @@ func (e *Exchange) QueryAveragePrice(ctx context.Context, symbol string) (float6
}

func (e *Exchange) NewStream() types.Stream {
stream := NewStream(e.Client)
stream := NewStream(e.Client, e.futuresClient)
stream.MarginSettings = e.MarginSettings
stream.FuturesSettings = e.FuturesSettings
return stream
}

Expand All @@ -180,7 +199,6 @@ func (e *Exchange) QueryIsolatedMarginAccount(ctx context.Context, symbols ...st
return toGlobalIsolatedMarginAccount(account), nil
}


func (e *Exchange) Withdrawal(ctx context.Context, asset string, amount fixedpoint.Value, address string, options *types.WithdrawalOptions) error {
req := e.Client.NewCreateWithdrawService()
req.Coin(asset)
Expand Down Expand Up @@ -700,7 +718,7 @@ func (e *Exchange) submitSpotOrder(ctx context.Context, order types.SubmitOrder)

func (e *Exchange) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (createdOrders types.OrderSlice, err error) {
for _, order := range orders {
if err := orderLimiter.Wait(ctx) ; err != nil {
if err := orderLimiter.Wait(ctx); err != nil {
log.WithError(err).Errorf("order rate limiter wait error")
}

Expand Down Expand Up @@ -847,7 +865,7 @@ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *type
}

for _, t := range remoteTrades {
localTrade, err := ToGlobalTrade(*t, e.IsMargin)
localTrade, err := toGlobalTrade(*t, e.IsMargin)
if err != nil {
log.WithError(err).Errorf("can not convert binance trade: %+v", t)
continue
Expand Down Expand Up @@ -932,4 +950,4 @@ func getLaunchDate() (time.Time, error) {
}

return time.Date(2017, time.July, 14, 0, 0, 0, 0, loc), nil
}
}
101 changes: 90 additions & 11 deletions pkg/exchange/binance/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"time"

"github.com/adshao/go-binance/v2"
"github.com/adshao/go-binance/v2/futures"

"github.com/gorilla/websocket"

"github.com/c9s/bbgo/pkg/types"
Expand Down Expand Up @@ -61,7 +63,9 @@ type Stream struct {
types.FuturesSettings
types.StandardStream

Client *binance.Client
Client *binance.Client
futuresClient *futures.Client

Conn *websocket.Conn
ConnLock sync.Mutex

Expand All @@ -76,23 +80,28 @@ type Stream struct {
kLineClosedEventCallbacks []func(e *KLineEvent)

markPriceUpdateEventCallbacks []func(e *MarkPriceUpdateEvent)
continuousKLineEventCallbacks []func(e *ContinuousKLineEvent)

continuousKLineEventCallbacks []func(e *ContinuousKLineEvent)
continuousKLineClosedEventCallbacks []func(e *ContinuousKLineEvent)

balanceUpdateEventCallbacks []func(event *BalanceUpdateEvent)
outboundAccountInfoEventCallbacks []func(event *OutboundAccountInfoEvent)
outboundAccountPositionEventCallbacks []func(event *OutboundAccountPositionEvent)
executionReportEventCallbacks []func(event *ExecutionReportEvent)

orderTradeUpdateEventCallbacks []func(e *OrderTradeUpdateEvent)

depthFrames map[string]*DepthFrame
}

func NewStream(client *binance.Client) *Stream {
func NewStream(client *binance.Client, futuresClient *futures.Client) *Stream {
stream := &Stream{
StandardStream: types.StandardStream{
ReconnectC: make(chan struct{}, 1),
},
Client: client,
depthFrames: make(map[string]*DepthFrame),
Client: client,
futuresClient: futuresClient,
depthFrames: make(map[string]*DepthFrame),
}

stream.OnDepthEvent(func(e *DepthEvent) {
Expand Down Expand Up @@ -207,6 +216,54 @@ func NewStream(client *binance.Client) *Stream {
}
})

stream.OnContinuousKLineEvent(func(e *ContinuousKLineEvent) {
kline := e.KLine.KLine()
if e.KLine.Closed {
stream.EmitContinuousKLineClosedEvent(e)
stream.EmitKLineClosed(kline)
} else {
stream.EmitKLine(kline)
}
})

stream.OnOrderTradeUpdateEvent(func(e *OrderTradeUpdateEvent) {
switch e.OrderTrade.CurrentExecutionType {

case "NEW", "CANCELED", "EXPIRED":
order, err := e.OrderFutures()
if err != nil {
log.WithError(err).Error("order convert error")
return
}

stream.EmitOrderUpdate(*order)

case "TRADE":
// TODO

// trade, err := e.Trade()
// if err != nil {
// log.WithError(err).Error("trade convert error")
// return
// }

// stream.EmitTradeUpdate(*trade)

// order, err := e.OrderFutures()
// if err != nil {
// log.WithError(err).Error("order convert error")
// return
// }

// Update Order with FILLED event
// if order.Status == types.OrderStatusFilled {
// stream.EmitOrderUpdate(*order)
// }
case "CALCULATED - Liquidation Execution":
log.Infof("CALCULATED - Liquidation Execution not support yet.")
}
})

stream.OnDisconnect(func() {
log.Infof("resetting depth snapshots...")
for _, f := range stream.depthFrames {
Expand Down Expand Up @@ -246,9 +303,17 @@ func (s *Stream) SetPublicOnly() {
func (s *Stream) dial(listenKey string) (*websocket.Conn, error) {
var url string
if s.publicOnly {
url = "wss://stream.binance.com:9443/ws"
if s.IsFutures {
url = "wss://fstream.binance.com/ws/"
} else {
url = "wss://stream.binance.com:9443/ws"
}
} else {
url = "wss://stream.binance.com:9443/ws/" + listenKey
if s.IsFutures {
url = "wss://fstream.binance.com/ws/" + listenKey
} else {
url = "wss://stream.binance.com:9443/ws/" + listenKey
}
}

conn, _, err := defaultDialer.Dial(url, nil)
Expand Down Expand Up @@ -278,7 +343,12 @@ func (s *Stream) fetchListenKey(ctx context.Context) (string, error) {
log.Infof("margin mode is enabled, requesting margin user stream listen key...")
req := s.Client.NewStartMarginUserStreamService()
return req.Do(ctx)
} else if s.IsFutures {
log.Infof("futures mode is enabled, requesting futures user stream listen key...")
req := s.futuresClient.NewStartUserStreamService()
return req.Do(ctx)
}
log.Infof("spot mode is enabled, requesting margin user stream listen key...")

return s.Client.NewStartUserStreamService().Do(ctx)
}
Expand All @@ -290,9 +360,11 @@ func (s *Stream) keepaliveListenKey(ctx context.Context, listenKey string) error
req.Symbol(s.IsolatedMarginSymbol)
return req.Do(ctx)
}

req := s.Client.NewKeepaliveMarginUserStreamService().ListenKey(listenKey)
return req.Do(ctx)
} else if s.IsFutures {
req := s.futuresClient.NewKeepaliveUserStreamService().ListenKey(listenKey)
return req.Do(ctx)
}

return s.Client.NewKeepaliveUserStreamService().ListenKey(listenKey).Do(ctx)
Expand Down Expand Up @@ -541,11 +613,15 @@ func (s *Stream) read(ctx context.Context) {

case *ExecutionReportEvent:
s.EmitExecutionReportEvent(e)

case *MarkPriceUpdateEvent:
s.EmitMarkPriceUpdateEvent(e)
case *ContinuousKLineEvent:
s.EmitContinuousKLineEvent(e)

case *ContinuousKLineEvent:
s.EmitContinuousKLineEvent(e)

case *OrderTradeUpdateEvent:
s.EmitOrderTradeUpdateEvent(e)
}
}
}
Expand All @@ -565,6 +641,9 @@ func (s *Stream) invalidateListenKey(ctx context.Context, listenKey string) (err
err = req.Do(ctx)
}

} else if s.IsFutures {
req := s.futuresClient.NewCloseUserStreamService().ListenKey(listenKey)
err = req.Do(ctx)
} else {
err = s.Client.NewCloseUserStreamService().ListenKey(listenKey).Do(ctx)
}
Expand Down
24 changes: 24 additions & 0 deletions pkg/exchange/binance/stream_callbacks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 79ded00

Please sign in to comment.