Skip to content

Commit

Permalink
binance: Desync books on disconnect.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoeGruffins committed Oct 10, 2024
1 parent bbc1dc3 commit 1ea349e
Showing 1 changed file with 41 additions and 7 deletions.
48 changes: 41 additions & 7 deletions client/mm/libxc/binance.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ type binanceOrderBook struct {
baseConversionFactor uint64
quoteConversionFactor uint64
log dex.Logger

connectedChan chan bool
}

func newBinanceOrderBook(
Expand All @@ -86,6 +88,7 @@ func newBinanceOrderBook(
quoteConversionFactor: quoteConversionFactor,
log: log,
getSnapshot: getSnapshot,
connectedChan: make(chan bool),
}
}

Expand Down Expand Up @@ -161,7 +164,7 @@ func (b *binanceOrderBook) Connect(ctx context.Context) (*sync.WaitGroup, error

resyncChan := make(chan struct{}, 1)

desync := func() {
desync := func(resync bool) {
// clear the sync cache, set the special ID, trigger a book refresh.
syncMtx.Lock()
defer syncMtx.Unlock()
Expand All @@ -170,7 +173,9 @@ func (b *binanceOrderBook) Connect(ctx context.Context) (*sync.WaitGroup, error
if updateID != updateIDUnsynced {
b.synced.Store(false)
updateID = updateIDUnsynced
resyncChan <- struct{}{}
if resync {
resyncChan <- struct{}{}
}
}
}

Expand Down Expand Up @@ -265,7 +270,7 @@ func (b *binanceOrderBook) Connect(ctx context.Context) (*sync.WaitGroup, error
case update := <-b.updateQueue:
if !processUpdate(update) {
b.log.Tracef("Bad %s update with ID %d", b.mktID, update.LastUpdateID)
desync()
desync(true)
}
case <-ctx.Done():
return
Expand All @@ -288,6 +293,13 @@ func (b *binanceOrderBook) Connect(ctx context.Context) (*sync.WaitGroup, error
if retry != nil { // don't hammer
continue
}
case connected := <-b.connectedChan:
if !connected {
b.log.Debugf("Unsyncing %s orderbook due to disconnect.", b.mktID, retryFrequency)
desync(false)
retry = nil
continue
}
case <-ctx.Done():
return
}
Expand All @@ -296,8 +308,8 @@ func (b *binanceOrderBook) Connect(ctx context.Context) (*sync.WaitGroup, error
b.log.Infof("Synced %s orderbook", b.mktID)
retry = nil
} else {
b.log.Infof("Failed to sync %s orderbook. Trying again in %s", b.mktID, retryFrequency)
desync() // Clears the syncCache
b.log.Infof("Failed to sync %s orderbook. Trying again in %s", b.mktID, retryFrequency)
desync(false) // Clears the syncCache
retry = time.After(retryFrequency)
}
}
Expand Down Expand Up @@ -1742,10 +1754,27 @@ out:
// subscribeToAdditionalMarketDataStream.
func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quoteID uint32) error {
reconnectC := make(chan struct{})
checkSubsC := make(chan struct{})

newConnection := func() (comms.WsConn, *dex.ConnectionMaster, error) {
addr := fmt.Sprintf("%s/stream?streams=%s", bnc.wsURL, strings.Join(bnc.streams(), "/"))
// Need to send key but not signature
connectEventFunc := func(cs comms.ConnectionStatus) {
if cs != comms.Disconnected && cs != comms.Connected {
return
}
// If disconnected, set all books to unsynced so bots
// will not place new orders.
connected := cs == comms.Connected
bnc.booksMtx.RLock()
defer bnc.booksMtx.RUnlock()
for _, b := range bnc.books {
select {
case b.connectedChan <- connected:
default:
}
}
}
conn, err := comms.NewWsConn(&comms.WsCfg{
URL: addr,
// Binance Docs: The websocket server will send a ping frame every 3
Expand All @@ -1757,11 +1786,11 @@ func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quote
ReconnectSync: func() {
bnc.log.Debugf("Binance reconnected")
select {
case reconnectC <- struct{}{}:
case checkSubsC <- struct{}{}:
default:
}
},
ConnectEventFunc: func(cs comms.ConnectionStatus) {},
ConnectEventFunc: connectEventFunc,
Logger: bnc.log.SubLogger("BNCBOOK"),
RawHandler: bnc.handleMarketDataNote,
})
Expand Down Expand Up @@ -1849,6 +1878,11 @@ func (bnc *binance) connectToMarketDataStream(ctx context.Context, baseID, quote
bnc.log.Errorf("Error checking subscriptions: %v", err)
}
checkSubs = time.After(checkSubsInterval)
case <-checkSubsC:
if err := bnc.checkSubs(ctx); err != nil {
bnc.log.Errorf("Error checking subscriptions: %v", err)
}
checkSubs = time.After(checkSubsInterval)
case <-ctx.Done():
bnc.marketStreamMtx.Lock()
bnc.marketStream = nil
Expand Down

0 comments on commit 1ea349e

Please sign in to comment.