Skip to content

Commit

Permalink
Merge pull request #1268 from bailantaotao/edwin/stream-query-book
Browse files Browse the repository at this point in the history
FEATURE: [bybit] implement order book streaming
  • Loading branch information
bailantaotao authored Aug 7, 2023
2 parents 616e939 + a6047f6 commit a8697ab
Show file tree
Hide file tree
Showing 7 changed files with 572 additions and 46 deletions.
18 changes: 1 addition & 17 deletions pkg/exchange/bybit/convert_test.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,19 @@
package bybit

import (
"context"
"fmt"
"math"
"strconv"
"testing"
"time"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"go.uber.org/multierr"

"github.com/c9s/bbgo/pkg/exchange/bybit/bybitapi"
v3 "github.com/c9s/bbgo/pkg/exchange/bybit/bybitapi/v3"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
"github.com/stretchr/testify/assert"
)

func TestU(t *testing.T) {
e := returnErr()

t.Log(errors.Is(e, context.DeadlineExceeded))

}

func returnErr() error {
var err error
return multierr.Append(multierr.Append(err, fmt.Errorf("got err: %w", context.DeadlineExceeded)), fmt.Errorf("GG"))
}

func TestToGlobalMarket(t *testing.T) {
// sample:
//{
Expand Down
95 changes: 92 additions & 3 deletions pkg/exchange/bybit/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package bybit
import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/gorilla/websocket"
Expand All @@ -15,10 +16,16 @@ const (
// Bybit: To avoid network or program issues, we recommend that you send the ping heartbeat packet every 20 seconds
// to maintain the WebSocket connection.
pingInterval = 20 * time.Second

// spotArgsLimit can input up to 10 args for each subscription request sent to one connection.
spotArgsLimit = 10
)

//go:generate callbackgen -type Stream
type Stream struct {
types.StandardStream

bookEventCallbacks []func(e BookEvent)
}

func NewStream() *Stream {
Expand All @@ -31,6 +38,8 @@ func NewStream() *Stream {
stream.SetDispatcher(stream.dispatchEvent)
stream.SetHeartBeat(stream.ping)

stream.OnConnect(stream.handlerConnect)
stream.OnBookEvent(stream.handleBookEvent)
return stream
}

Expand All @@ -46,16 +55,43 @@ func (s *Stream) createEndpoint(_ context.Context) (string, error) {

func (s *Stream) dispatchEvent(event interface{}) {
switch e := event.(type) {
case *WebSocketEvent:
case *WebSocketOpEvent:
if err := e.IsValid(); err != nil {
log.Errorf("invalid event: %v", err)
}

case *BookEvent:
s.EmitBookEvent(*e)
}
}

func (s *Stream) parseWebSocketEvent(in []byte) (interface{}, error) {
var resp WebSocketEvent
return &resp, json.Unmarshal(in, &resp)
var e WsEvent

err := json.Unmarshal(in, &e)
if err != nil {
return nil, err
}

switch {
case e.IsOp():
return e.WebSocketOpEvent, nil

case e.IsTopic():
switch getTopicType(e.Topic) {
case TopicTypeOrderBook:
var book BookEvent
err = json.Unmarshal(e.WebSocketTopicEvent.Data, &book)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal data into BookEvent: %+v, : %w", string(e.WebSocketTopicEvent.Data), err)
}

book.Type = e.WebSocketTopicEvent.Type
return &book, nil
}
}

return nil, fmt.Errorf("unhandled websocket event: %+v", string(in))
}

// ping implements the Bybit text message of WebSocket PingPong.
Expand Down Expand Up @@ -94,3 +130,56 @@ func (s *Stream) ping(ctx context.Context, conn *websocket.Conn, cancelFunc cont
}
}
}

func (s *Stream) handlerConnect() {
if s.PublicOnly {
var topics []string

for _, subscription := range s.Subscriptions {
topic, err := convertSubscription(subscription)
if err != nil {
log.WithError(err).Errorf("subscription convert error")
continue
}

topics = append(topics, topic)
}
if len(topics) > spotArgsLimit {
log.Debugf("topics exceeds limit: %d, drop of: %v", spotArgsLimit, topics[spotArgsLimit:])
topics = topics[:spotArgsLimit]
}
log.Infof("subscribing channels: %+v", topics)
if err := s.Conn.WriteJSON(WebsocketOp{
Op: "subscribe",
Args: topics,
}); err != nil {
log.WithError(err).Error("failed to send subscription request")
}
}
}

func convertSubscription(s types.Subscription) (string, error) {
switch s.Channel {
case types.BookChannel:
depth := types.DepthLevel1
if len(s.Options.Depth) > 0 && s.Options.Depth == types.DepthLevel50 {
depth = types.DepthLevel50
}
return genTopic(TopicTypeOrderBook, depth, s.Symbol), nil
}

return "", fmt.Errorf("unsupported stream channel: %s", s.Channel)
}

func (s *Stream) handleBookEvent(e BookEvent) {
orderBook := e.OrderBook()
switch {
// Occasionally, you'll receive "UpdateId"=1, which is a snapshot data due to the restart of
// the service. So please overwrite your local orderbook
case e.Type == DataTypeSnapshot || e.UpdateId.Int() == 1:
s.EmitBookSnapshot(orderBook)

case e.Type == DataTypeDelta:
s.EmitBookUpdate(orderBook)
}
}
15 changes: 15 additions & 0 deletions pkg/exchange/bybit/stream_callbacks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

155 changes: 155 additions & 0 deletions pkg/exchange/bybit/stream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package bybit

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"

"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)

func TestStream_parseWebSocketEvent(t *testing.T) {
s := Stream{}

t.Run("op", func(t *testing.T) {
input := `{
"success":true,
"ret_msg":"subscribe",
"conn_id":"a403c8e5-e2b6-4edd-a8f0-1a64fa7227a5",
"op":"subscribe"
}`
res, err := s.parseWebSocketEvent([]byte(input))
assert.NoError(t, err)
opEvent, ok := res.(*WebSocketOpEvent)
assert.True(t, ok)
expSucceeds := true
expRetMsg := "subscribe"
assert.Equal(t, WebSocketOpEvent{
Success: &expSucceeds,
RetMsg: &expRetMsg,
ReqId: nil,
ConnId: "a403c8e5-e2b6-4edd-a8f0-1a64fa7227a5",
Op: WsOpTypeSubscribe,
Args: nil,
}, *opEvent)
})
t.Run("TopicTypeOrderBook with delta", func(t *testing.T) {
input := `{
"topic":"orderbook.50.BTCUSDT",
"ts":1691130685111,
"type":"delta",
"data":{
"s":"BTCUSDT",
"b":[
],
"a":[
[
"29239.37",
"0.082356"
],
[
"29236.1",
"0"
]
],
"u":1854104,
"seq":10559247733
}
}`

res, err := s.parseWebSocketEvent([]byte(input))
assert.NoError(t, err)
book, ok := res.(*BookEvent)
assert.True(t, ok)
assert.Equal(t, BookEvent{
Symbol: "BTCUSDT",
Bids: nil,
Asks: types.PriceVolumeSlice{
{
fixedpoint.NewFromFloat(29239.37),
fixedpoint.NewFromFloat(0.082356),
},
{
fixedpoint.NewFromFloat(29236.1),
fixedpoint.NewFromFloat(0),
},
},
UpdateId: fixedpoint.NewFromFloat(1854104),
SequenceId: fixedpoint.NewFromFloat(10559247733),
Type: DataTypeDelta,
}, *book)
})

t.Run("Parse fails", func(t *testing.T) {
input := `{
"topic":"orderbook.50.BTCUSDT",
"ts":1691130685111,
"type":"delta",
"data":{
"GG": "test",
}
}`

res, err := s.parseWebSocketEvent([]byte(input))
assert.Error(t, fmt.Errorf("failed to unmarshal data into BookEvent: %+v, : %w", `{
"GG": "test",
}`, err), err)
assert.Equal(t, nil, res)
})
}

func Test_convertSubscription(t *testing.T) {
t.Run("BookChannel.DepthLevel1", func(t *testing.T) {
res, err := convertSubscription(types.Subscription{
Symbol: "BTCUSDT",
Channel: types.BookChannel,
Options: types.SubscribeOptions{
Depth: types.DepthLevel1,
},
})
assert.NoError(t, err)
assert.Equal(t, genTopic(TopicTypeOrderBook, types.DepthLevel1, "BTCUSDT"), res)
})
t.Run("BookChannel. with default depth", func(t *testing.T) {
res, err := convertSubscription(types.Subscription{
Symbol: "BTCUSDT",
Channel: types.BookChannel,
})
assert.NoError(t, err)
assert.Equal(t, genTopic(TopicTypeOrderBook, types.DepthLevel1, "BTCUSDT"), res)
})
t.Run("BookChannel.DepthLevel50", func(t *testing.T) {
res, err := convertSubscription(types.Subscription{
Symbol: "BTCUSDT",
Channel: types.BookChannel,
Options: types.SubscribeOptions{
Depth: types.DepthLevel50,
},
})
assert.NoError(t, err)
assert.Equal(t, genTopic(TopicTypeOrderBook, types.DepthLevel50, "BTCUSDT"), res)
})
t.Run("BookChannel. not support depth, use default level 1", func(t *testing.T) {
res, err := convertSubscription(types.Subscription{
Symbol: "BTCUSDT",
Channel: types.BookChannel,
Options: types.SubscribeOptions{
Depth: "20",
},
})
assert.NoError(t, err)
assert.Equal(t, genTopic(TopicTypeOrderBook, types.DepthLevel1, "BTCUSDT"), res)
})

t.Run("unsupported channel", func(t *testing.T) {
res, err := convertSubscription(types.Subscription{
Symbol: "BTCUSDT",
Channel: "unsupported",
})
assert.Error(t, fmt.Errorf("unsupported stream channel: %s", "unsupported"), err)
assert.Equal(t, "", res)
})
}
Loading

0 comments on commit a8697ab

Please sign in to comment.