Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEATURE: [bybit] integrate the v5 trade history #1759

Merged
merged 5 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions pkg/exchange/bybit/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,42 @@ func v3ToGlobalTrade(trade v3.Trade) (*types.Trade, error) {
}, nil
}

func toGlobalTrade(trade bybitapi.Trade, feeDetail SymbolFeeDetail) (*types.Trade, error) {
side, err := toGlobalSideType(trade.Side)
if err != nil {
return nil, fmt.Errorf("unexpected side: %s, err: %w", trade.Side, err)
}
orderIdNum, err := strconv.ParseUint(trade.OrderId, 10, 64)
if err != nil {
return nil, fmt.Errorf("unexpected order id: %s, err: %w", trade.OrderId, err)
}
tradeIdNum, err := strconv.ParseUint(trade.ExecId, 10, 64)
if err != nil {
return nil, fmt.Errorf("unexpected trade id: %s, err: %w", trade.ExecId, err)
}

fc, _ := calculateFee(trade, feeDetail)

return &types.Trade{
ID: tradeIdNum,
OrderID: orderIdNum,
Exchange: types.ExchangeBybit,
Price: trade.ExecPrice,
Quantity: trade.ExecQty,
QuoteQuantity: trade.ExecPrice.Mul(trade.ExecQty),
Symbol: trade.Symbol,
Side: side,
IsBuyer: side == types.SideTypeBuy,
IsMaker: trade.IsMaker,
Time: types.Time(trade.ExecTime),
Fee: trade.ExecFee,
FeeCurrency: fc,
IsMargin: false,
IsFutures: false,
IsIsolated: false,
}, nil
}

func toGlobalBalanceMap(events []bybitapi.WalletBalances) types.BalanceMap {
bm := types.BalanceMap{}
for _, event := range events {
Expand Down
171 changes: 87 additions & 84 deletions pkg/exchange/bybit/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@ import (
"golang.org/x/time/rate"

"github.com/c9s/bbgo/pkg/exchange/bybit/bybitapi"
v3 "github.com/c9s/bbgo/pkg/exchange/bybit/bybitapi/v3"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)

const (
maxOrderIdLen = 36
defaultQueryLimit = 50
defaultKLineLimit = 1000
maxOrderIdLen = 36
defaultQueryLimit = 50
defaultQueryTradeLimit = 100
defaultKLineLimit = 1000

halfYearDuration = 6 * 30 * 24 * time.Hour
queryTradeDurationLimit = 7 * 24 * time.Hour
)

// https://bybit-exchange.github.io/docs/zh-TW/v5/rate-limit
Expand Down Expand Up @@ -52,26 +52,39 @@ var (
type Exchange struct {
key, secret string
client *bybitapi.RestClient
v3client *v3.Client
marketsInfo types.MarketMap

// feeRateProvider provides the fee rate and fee currency for each symbol.
// Because the bybit exchange does not provide a fee currency on traditional SPOT accounts, we need to query the marker
// fee rate to get the fee currency.
// https://bybit-exchange.github.io/docs/v5/enum#spot-fee-currency-instruction
FeeRatePoller
}

func New(key, secret string) (*Exchange, error) {
client, err := bybitapi.NewClient()
if err != nil {
return nil, err
}

ex := &Exchange{
key: key,
// pragma: allowlist nextline secret
secret: secret,
client: client,
}
if len(key) > 0 && len(secret) > 0 {
client.Auth(key, secret)
ex.FeeRatePoller = newFeeRatePoller(ex)

ctx, cancel := context.WithTimeoutCause(context.Background(), 5*time.Second, errors.New("query markets timeout"))
defer cancel()
ex.marketsInfo, err = ex.QueryMarkets(ctx)
if err != nil {
return nil, fmt.Errorf("failed to query markets, err: %w", err)
}
}

return &Exchange{
key: key,
// pragma: allowlist nextline secret
secret: secret,
client: client,
v3client: v3.NewClient(client),
}, nil
return ex, nil
}

func (e *Exchange) Name() types.ExchangeName {
Expand Down Expand Up @@ -226,35 +239,14 @@ func (e *Exchange) QueryOrderTrades(ctx context.Context, q types.OrderQuery) (tr
if len(q.OrderID) == 0 {
return nil, errors.New("orderID is required parameter")
}
req := e.v3client.NewGetTradesRequest().OrderId(q.OrderID)
req := e.client.NewGetExecutionListRequest().OrderId(q.OrderID)

if len(q.Symbol) != 0 {
req.Symbol(q.Symbol)
}
req.Limit(defaultQueryTradeLimit)

if err := queryOrderTradeRateLimiter.Wait(ctx); err != nil {
return nil, fmt.Errorf("trade rate limiter wait error: %w", err)
}
response, err := req.Do(ctx)
if err != nil {
return nil, fmt.Errorf("failed to query order trades, err: %w", err)
}

var errs error
for _, trade := range response.List {
res, err := v3ToGlobalTrade(trade)
if err != nil {
errs = multierr.Append(errs, err)
continue
}
trades = append(trades, *res)
}

if errs != nil {
return nil, errs
}

return trades, nil
return e.queryTrades(ctx, req)
}

func (e *Exchange) SubmitOrder(ctx context.Context, order types.SubmitOrder) (*types.Order, error) {
Expand Down Expand Up @@ -432,32 +424,65 @@ func (e *Exchange) QueryClosedOrders(
return types.SortOrdersAscending(orders), nil
}

func (e *Exchange) queryTrades(ctx context.Context, req *bybitapi.GetExecutionListRequest) (trades []types.Trade, err error) {
cursor := ""
for {
if len(cursor) != 0 {
req = req.Cursor(cursor)
}

res, err := req.Do(ctx)
if err != nil {
return nil, fmt.Errorf("failed to query trades, err: %w", err)
}

for _, trade := range res.List {
feeRate, err := pollAndGetFeeRate(ctx, trade.Symbol, e.FeeRatePoller, e.marketsInfo)
if err != nil {
return nil, fmt.Errorf("failed to get fee rate, err: %v", err)
}
trade, err := toGlobalTrade(trade, feeRate)
if err != nil {
return nil, fmt.Errorf("failed to convert trade, err: %v", err)
}

trades = append(trades, *trade)
}

if len(res.NextPageCursor) == 0 {
break
}
cursor = res.NextPageCursor
}

return trades, nil

}

/*
QueryTrades queries trades by time range or trade id range.
If options.StartTime is not specified, you can only query for records in the last 7 days.
If you want to query for records older than 7 days, options.StartTime is required.
It supports to query records up to 180 days.

** Here includes MakerRebate. If needed, let's discuss how to modify it to return in trade. **
** StartTime and EndTime are inclusive. **
** StartTime and EndTime cannot exceed 180 days. **
** StartTime, EndTime, FromTradeId can be used together. **
** If the `FromTradeId` is passed, and `ToTradeId` is null, then the result is sorted by tradeId in `ascend`.
Otherwise, the result is sorted by tradeId in `descend`. **
QueryTrades queries trades by time range.
** startTime and endTime are not passed, return 7 days by default **
** Only startTime is passed, return range between startTime and startTime+7 days **
** Only endTime is passed, return range between endTime-7 days and endTime **
** If both are passed, the rule is endTime - startTime <= 7 days **
*/
func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *types.TradeQueryOptions) (trades []types.Trade, err error) {
// using v3 client, since the v5 API does not support feeCurrency.
req := e.v3client.NewGetTradesRequest()
req := e.client.NewGetExecutionListRequest()
req.Symbol(symbol)

// If `lastTradeId` is given and greater than 0, the query will use it as a condition and the retrieved result will be
// in `ascending` order. We can use `lastTradeId` to retrieve all the data. So we hack it to '1' if `lastTradeID` is '0'.
// If 0 is given, it will not be used as a condition and the result will be in `descending` order. The FromTradeId
// option cannot be used to retrieve more data.
req.FromTradeId(strconv.FormatUint(options.LastTradeID, 10))
if options.LastTradeID == 0 {
req.FromTradeId("1")
if options.StartTime != nil && options.EndTime != nil {
if options.EndTime.Before(*options.StartTime) {
return nil, fmt.Errorf("end time is before start time, start time: %s, end time: %s", options.StartTime.String(), options.EndTime.String())
}

if options.EndTime.Sub(*options.StartTime) > queryTradeDurationLimit {
newStartTime := options.EndTime.Add(-queryTradeDurationLimit)

log.Warnf("!!!BYBIT EXCHANGE API NOTICE!!! The time range exceeds the server boundary: %s, start time: %s, end time: %s, updated start time %s -> %s", queryTradeDurationLimit, options.StartTime.String(), options.EndTime.String(), options.StartTime.String(), newStartTime.String())
options.StartTime = &newStartTime
}
}

if options.StartTime != nil {
req.StartTime(options.StartTime.UTC())
}
Expand All @@ -466,35 +491,13 @@ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *type
}

limit := uint64(options.Limit)
if limit > defaultQueryLimit || limit <= 0 {
log.Debugf("the parameter limit exceeds the server boundary or is set to zero. changed to %d, original value: %d", defaultQueryLimit, options.Limit)
limit = defaultQueryLimit
if limit > defaultQueryTradeLimit || limit <= 0 {
log.Debugf("the parameter limit exceeds the server boundary or is set to zero. changed to %d, original value: %d", defaultQueryTradeLimit, options.Limit)
limit = defaultQueryTradeLimit
}
req.Limit(limit)

if err := queryOrderTradeRateLimiter.Wait(ctx); err != nil {
return nil, fmt.Errorf("trade rate limiter wait error: %w", err)
}
response, err := req.Do(ctx)
if err != nil {
return nil, fmt.Errorf("failed to query trades, err: %w", err)
}

var errs error
for _, trade := range response.List {
res, err := v3ToGlobalTrade(trade)
if err != nil {
errs = multierr.Append(errs, err)
continue
}
trades = append(trades, *res)
}

if errs != nil {
return nil, errs
}

return trades, nil
return e.queryTrades(ctx, req)
}

func (e *Exchange) QueryAccount(ctx context.Context) (*types.Account, error) {
Expand Down
42 changes: 28 additions & 14 deletions pkg/exchange/bybit/market_info_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@ var (
pollFeeRateRateLimiter = rate.NewLimiter(rate.Every(10*time.Minute), 1)
)

type symbolFeeDetail struct {
type FeeRatePoller interface {
StartFeeRatePoller(ctx context.Context)
GetFeeRate(symbol string) (SymbolFeeDetail, bool)
PollFeeRate(ctx context.Context) error
}

type SymbolFeeDetail struct {
bybitapi.FeeRate

BaseCoin string
Expand All @@ -34,24 +40,27 @@ type feeRatePoller struct {
once sync.Once
client MarketInfoProvider

symbolFeeDetail map[string]symbolFeeDetail
// lastSyncTime is the last time the fee rate was updated.
lastSyncTime time.Time

symbolFeeDetail map[string]SymbolFeeDetail
}

func newFeeRatePoller(marketInfoProvider MarketInfoProvider) *feeRatePoller {
return &feeRatePoller{
client: marketInfoProvider,
symbolFeeDetail: map[string]symbolFeeDetail{},
symbolFeeDetail: map[string]SymbolFeeDetail{},
}
}

func (p *feeRatePoller) Start(ctx context.Context) {
func (p *feeRatePoller) StartFeeRatePoller(ctx context.Context) {
p.once.Do(func() {
p.startLoop(ctx)
})
}

func (p *feeRatePoller) startLoop(ctx context.Context) {
err := p.poll(ctx)
err := p.PollFeeRate(ctx)
if err != nil {
log.WithError(err).Warn("failed to initialize the fee rate, the ticker is scheduled to update it subsequently")
}
Expand All @@ -67,22 +76,27 @@ func (p *feeRatePoller) startLoop(ctx context.Context) {

return
case <-ticker.C:
if err := p.poll(ctx); err != nil {
if err := p.PollFeeRate(ctx); err != nil {
log.WithError(err).Warn("failed to update fee rate")
}
}
}
}

func (p *feeRatePoller) poll(ctx context.Context) error {
func (p *feeRatePoller) PollFeeRate(ctx context.Context) error {
p.mu.Lock()
defer p.mu.Unlock()
// the poll will be called frequently, so we need to check the last sync time.
if time.Since(p.lastSyncTime) < feeRatePollingPeriod {
return nil
}
symbolFeeRate, err := p.getAllFeeRates(ctx)
if err != nil {
return err
}

p.mu.Lock()
p.symbolFeeDetail = symbolFeeRate
p.mu.Unlock()
p.lastSyncTime = time.Now()

if pollFeeRateRateLimiter.Allow() {
log.Infof("updated fee rate: %+v", p.symbolFeeDetail)
Expand All @@ -91,24 +105,24 @@ func (p *feeRatePoller) poll(ctx context.Context) error {
return nil
}

func (p *feeRatePoller) Get(symbol string) (symbolFeeDetail, bool) {
func (p *feeRatePoller) GetFeeRate(symbol string) (SymbolFeeDetail, bool) {
p.mu.Lock()
defer p.mu.Unlock()

fee, found := p.symbolFeeDetail[symbol]
return fee, found
}

func (e *feeRatePoller) getAllFeeRates(ctx context.Context) (map[string]symbolFeeDetail, error) {
func (e *feeRatePoller) getAllFeeRates(ctx context.Context) (map[string]SymbolFeeDetail, error) {
feeRates, err := e.client.GetAllFeeRates(ctx)
if err != nil {
return nil, fmt.Errorf("failed to call get fee rates: %w", err)
}

symbolMap := map[string]symbolFeeDetail{}
symbolMap := map[string]SymbolFeeDetail{}
for _, f := range feeRates.List {
if _, found := symbolMap[f.Symbol]; !found {
symbolMap[f.Symbol] = symbolFeeDetail{FeeRate: f}
symbolMap[f.Symbol] = SymbolFeeDetail{FeeRate: f}
}
}

Expand All @@ -117,7 +131,7 @@ func (e *feeRatePoller) getAllFeeRates(ctx context.Context) (map[string]symbolFe
return nil, fmt.Errorf("failed to get markets: %w", err)
}

// update base coin, quote coin into symbolFeeDetail
// update base coin, quote coin into SymbolFeeDetail
for _, mkt := range mkts {
feeRate, found := symbolMap[mkt.Symbol]
if !found {
Expand Down
Loading
Loading