Skip to content

Commit

Permalink
feat(binance): add listenOrderBook
Browse files Browse the repository at this point in the history
  • Loading branch information
iam4x committed Apr 12, 2023
1 parent e2fa19c commit 1779ae9
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 2 deletions.
8 changes: 8 additions & 0 deletions src/exchanges/binance/binance.exchange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import type {
Market,
OHLCVOptions,
Order,
OrderBook,
PlaceOrderOpts,
Position,
Ticker,
Expand Down Expand Up @@ -397,6 +398,13 @@ export class Binance extends BaseExchange {
return this.publicWebsocket.listenOHLCV(opts, callback);
};

listenOrderBook = (
symbol: string,
callback: (orderBook: OrderBook) => void
) => {
return this.publicWebsocket.listenOrderBook(symbol, callback);
};

fetchPositionMode = async () => {
const { data } = await this.xhr.get(ENDPOINTS.HEDGE_MODE);
return data.dualSidePosition === true;
Expand Down
1 change: 1 addition & 0 deletions src/exchanges/binance/binance.types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ export const ENDPOINTS = {
BATCH_ORDERS: '/fapi/v1/batchOrders',
KLINE: '/fapi/v1/klines',
LISTEN_KEY: '/fapi/v1/listenKey',
ORDERBOOK: '/fapi/v1/depth',
};

export const ORDER_TYPE: Record<string, OrderType> = {
Expand Down
142 changes: 140 additions & 2 deletions src/exchanges/binance/binance.ws-public.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import type { OHLCVOptions, Candle } from '../../types';
import type { OHLCVOptions, Candle, OrderBook } from '../../types';
import { v } from '../../utils/get-key';
import { calcOrderBookTotal, sortOrderBook } from '../../utils/orderbook';
import { BaseWebSocket } from '../base.ws';

import type { Binance } from './binance.exchange';
import { BASE_WS_URL } from './binance.types';
import { BASE_WS_URL, ENDPOINTS } from './binance.types';

type Data = Array<Record<string, any>>;
type MessageHandlers = {
Expand Down Expand Up @@ -131,4 +132,141 @@ export class BinancePublicWebsocket extends BaseWebSocket<Binance> {
delete this.messageHandlers.kline;
};
};

listenOrderBook = (
symbol: string,
callback: (orderBook: OrderBook) => void
) => {
const topic = `${symbol.toLowerCase()}@depth`;

const orderBook: OrderBook = {
bids: [],
asks: [],
};

const innerState = {
updates: [] as any[],
isSnapshotLoaded: false,
};

const fetchSnapshot = async () => {
const { data } = await this.parent.xhr.get(ENDPOINTS.ORDERBOOK, {
params: { symbol, limit: 1000 },
});

// save snapshot into orderBook object
orderBook.bids = data.bids.map(([price, amount]: string[]) => ({
price: parseFloat(price),
amount: parseFloat(amount),
total: 0,
}));

orderBook.asks = data.asks.map(([price, amount]: string[]) => ({
price: parseFloat(price),
amount: parseFloat(amount),
total: 0,
}));

// drop events where u < lastUpdateId
innerState.updates = innerState.updates.filter(
(update: any) => update.u > data.lastUpdateId
);

// apply all updates
innerState.updates.forEach((update: Record<string, any>) => {
this.processOrderBookUpdate(orderBook, update);
});

sortOrderBook(orderBook);
calcOrderBookTotal(orderBook);

innerState.isSnapshotLoaded = true;
innerState.updates = [];

callback(orderBook);
};

const waitForConnectedAndSubscribe = () => {
if (this.isConnected) {
// 1. subscribe to the topic
// 2. wait for the first message and send request to snapshot
// 3. store all incoming updates in an array
// 4. when the snapshot is received, apply all updates and send the order book to the callback
// 5. then on each update, apply it to the order book and send it to the callback
this.messageHandlers.depthUpdate = ([data]: Data) => {
// incorrect symbol, we don't take account
if (data.s !== symbol) return;

// first update, request snapshot
if (!innerState.isSnapshotLoaded && innerState.updates.length === 0) {
fetchSnapshot();
innerState.updates = [data];
return;
}

// more updates, but snapshot is not loaded yet
if (!innerState.isSnapshotLoaded) {
innerState.updates = [...innerState.updates, data];
return;
}

// snapshot is loaded, apply updates and callback
this.processOrderBookUpdate(orderBook, data);
sortOrderBook(orderBook);
calcOrderBookTotal(orderBook);

callback(orderBook);
};

const payload = { method: 'SUBSCRIBE', params: [topic], id: 1 };
this.ws?.send?.(JSON.stringify(payload));
} else {
setTimeout(() => waitForConnectedAndSubscribe(), 100);
}
};

waitForConnectedAndSubscribe();

return () => {
delete this.messageHandlers.depthUpdate;
orderBook.asks = [];
orderBook.bids = [];

if (this.isConnected) {
const payload = { method: 'UNSUBSCRIBE', params: [topic], id: 1 };
this.ws?.send?.(JSON.stringify(payload));
}
};
};

private processOrderBookUpdate = (
orderBook: OrderBook,
update: Record<string, any>
) => {
const sides = { bids: update.b, asks: update.a };

Object.entries(sides).forEach(([side, data]) => {
// we need this for ts compile
if (side !== 'bids' && side !== 'asks') return;

data.forEach(([p, a]: string[]) => {
const price = parseFloat(p);
const amount = parseFloat(a);
const index = orderBook[side].findIndex((b) => b.price === price);

if (index === -1 && amount > 0) {
orderBook[side].push({ price, amount, total: 0 });
return;
}

if (amount === 0) {
orderBook[side].splice(index, 1);
return;
}

// eslint-disable-next-line no-param-reassign
orderBook[side][index].amount = amount;
});
});
};
}

0 comments on commit 1779ae9

Please sign in to comment.