Skip to content
This repository has been archived by the owner on Sep 9, 2023. It is now read-only.

Bitfinex handle heartbeat sequenceId #257

Merged
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions __tests__/exchanges/bitfinex-client.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ testClient({
hasTimestampMs: true,
hasSequenceId: true,
hasCount: true,
done: function(spec, result, update) {
const hasAsks = update.asks && update.asks.length > 0;
const hasBids = update.bids && update.bids.length > 0;
return hasAsks || hasBids;
},
},

l3snapshot: {
Expand All @@ -81,5 +86,10 @@ testClient({
hasTimestampMs: true,
hasSequenceId: true,
hasCount: true,
done: function(spec, result, update) {
const hasAsks = update.asks && update.asks.length > 0;
const hasBids = update.bids && update.bids.length > 0;
return hasAsks || hasBids;
},
},
});
153 changes: 143 additions & 10 deletions src/exchanges/bitfinex-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,20 @@ const Level3Snapshot = require("../level3-snapshot");
const Level3Update = require("../level3-update");

class BitfinexClient extends BasicClient {
constructor({ wssPath = "wss://api.bitfinex.com/ws/2", watcherMs, l2UpdateDepth = 250 } = {}) {
/**
*
* @param {Object} params
* @param {Boolean} [params.enableEmptyHeartbeatEvents] (optional, default false). if true, emits empty events for all channels on heartbeat events which includes the sequenceId.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new constructor param enableEmptyHeartbeatEvents, to enable empty Ticker and Trade events w/the sequenceId when these channels receive a heartbeat

* @param {String} [params.tradeMessageType] (optional, defaults to "tu"). one of "tu", "te", or "all". determines whether to use trade channel events of type "te" or "tu", or all trade events. see https://blog.bitfinex.com/api/websocket-api-update/.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new constructor param tradeMessageType to specify which trade events to handle. it was previously "tu" and that's probably fine for most ppl, but now you can specify "te", "tu", or "all"

* if you're using sequenceIds to validate websocket messages you may want to use "all" to receive every sequenceId
*/
constructor({
wssPath = "wss://api.bitfinex.com/ws/2",
watcherMs,
l2UpdateDepth = 250,
enableEmptyHeartbeatEvents = false,
tradeMessageType = "tu",
} = {}) {
super(wssPath, "Bitfinex", undefined, watcherMs);
this._channels = {};

Expand All @@ -18,6 +31,8 @@ class BitfinexClient extends BasicClient {
this.hasLevel2Updates = true;
this.hasLevel3Updates = true;
this.l2UpdateDepth = l2UpdateDepth;
this.enableEmptyHeartbeatEvents = enableEmptyHeartbeatEvents;
this.tradeMessageType = tradeMessageType; // "te", "tu", or "all"
}

_onConnected() {
Expand Down Expand Up @@ -129,7 +144,6 @@ class BitfinexClient extends BasicClient {

_onMessage(raw) {
let msg = JSON.parse(raw);

bmancini55 marked this conversation as resolved.
Show resolved Hide resolved
// capture channel metadata
if (msg.event === "subscribed") {
this._channels[msg.chanId] = msg;
Expand All @@ -140,9 +154,6 @@ class BitfinexClient extends BasicClient {
let channel = this._channels[msg[0]];
if (!channel) return;

// ignore heartbeats
if (msg[1] === "hb") return;

if (channel.channel === "ticker") {
let market = this._tickerSubs.get(channel.pair);
if (!market) return;
Expand All @@ -152,7 +163,7 @@ class BitfinexClient extends BasicClient {
}

// trades
if (channel.channel === "trades" && msg[1] === "tu") {
if (channel.channel === "trades") {
let market = this._tradeSubs.get(channel.pair);
if (!market) return;

Expand Down Expand Up @@ -181,15 +192,31 @@ class BitfinexClient extends BasicClient {
}

_onTicker(msg, market) {
let msgBody = msg[1];
const sequenceId = Number(msg[2]);
const timestampMs = msg[3];

if (msg[1] === "hb") {
bmancini55 marked this conversation as resolved.
Show resolved Hide resolved
if (this.enableEmptyHeartbeatEvents === false) return;
// handle heartbeat by emitting empty update w/sequenceId.
// heartbeat msg: [ 198655, 'hb', 3, 1610920929093 ]
let ticker = new Ticker({
exchange: "Bitfinex",
base: market.base,
quote: market.quote,
timestamp: timestampMs,
sequenceId,
});
this.emit("ticker", ticker, market);
return;
}
let msgBody = msg[1];
let [bid, bidSize, ask, askSize, change, changePercent, last, volume, high, low] = msgBody;
let open = last + change;
let ticker = new Ticker({
exchange: "Bitfinex",
base: market.base,
quote: market.quote,
timestamp: Date.now(),
timestamp: timestampMs,
sequenceId,
last: last.toFixed(8),
open: open.toFixed(8),
Expand All @@ -207,9 +234,79 @@ class BitfinexClient extends BasicClient {
}

_onTradeMessage(msg, market) {
// example msg: [ 359491, 'tu', [ 560287312, 1609712228656, 0.005, 33432 ], 6 ]
let [id, unix, amount, price] = msg[2];
const timestampMs = msg[3];
if (msg[1] === "hb") {
bmancini55 marked this conversation as resolved.
Show resolved Hide resolved
const sequenceId = Number(msg[2]);
if (this.enableEmptyHeartbeatEvents === false) return;
// handle heartbeat by emitting empty update w/sequenceId.
// example trade heartbeat msg: [ 198655, 'hb', 3, 1610920929093 ]
let trade = new Trade({
exchange: "Bitfinex",
base: market.base,
quote: market.quote,
timestamp: timestampMs,
sequenceId,
});
this.emit("trade", trade, market);
return;
}
if (Array.isArray(msg[1])) {
bmancini55 marked this conversation as resolved.
Show resolved Hide resolved
const sequenceId = Number(msg[2]);
// trade snapshot example msg:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this wasn't handling the initial trades snapshot previously, so I fixed that as well

/*
[
CHANNEL_ID,
[
[
ID,
MTS,
AMOUNT,
PRICE
],
...
],
sequenceId,
timestampMs
]
*/
msg[1].forEach(thisTrade => {
bmancini55 marked this conversation as resolved.
Show resolved Hide resolved
let [id, unix, amount, price] = thisTrade;

let side = amount > 0 ? "buy" : "sell";
price = price.toFixed(8);
amount = Math.abs(amount).toFixed(8);
let trade = new Trade({
exchange: "Bitfinex",
base: market.base,
quote: market.quote,
tradeId: id.toFixed(),
sequenceId,
unix: unix,
side,
price,
amount,
});
this.emit("trade", trade, market);
});
return;
}
// example trade update msg: [ 359491, 'tu' or 'te', [ 560287312, 1609712228656, 0.005, 33432 ], 6 ]
// note: "tu" means it's got the tradeId, this is delayed by 1-2 seconds and includes tradeId.
// "te" is the same but available immediately and without the tradeId
let shouldHandleTradeEvent = false;
const tradeEventType = msg[1];
if (this.tradeMessageType === "all") {
bmancini55 marked this conversation as resolved.
Show resolved Hide resolved
shouldHandleTradeEvent = true;
} else if (this.tradeMessageType === "te" && tradeEventType === "te") {
shouldHandleTradeEvent = true;
} else if (this.tradeMessageType === "tu" && tradeEventType === "tu") {
shouldHandleTradeEvent = true;
}
if (!shouldHandleTradeEvent) {
return;
}
const sequenceId = Number(msg[3]);
let [id, unix, amount, price] = msg[2];

let side = amount > 0 ? "buy" : "sell";
price = price.toFixed(8);
Expand Down Expand Up @@ -270,6 +367,24 @@ class BitfinexClient extends BasicClient {
const sequenceId = Number(msg[2]);
const timestampMs = msg[3];

if (msg[1] === "hb") {
bmancini55 marked this conversation as resolved.
Show resolved Hide resolved
// handle heartbeat by emitting empty update w/sequenceId.
// heartbeat msg: [ 169546, 'hb', 17, 1610921150321 ]
// NOTE: for order book updates we don't check if enableEmptyHeartbeatEvents === true, because
// an empty l2 update is 100% backward compatible so no harm done in emitting it
let update = new Level2Update({
exchange: "Bitfinex",
base: market.base,
quote: market.quote,
sequenceId,
timestampMs,
asks: [],
bids: [],
});
this.emit("l2update", update, market);
return;
}

if (!price.toFixed) return;
let point = new Level2Point(price.toFixed(8), Math.abs(size).toFixed(8), count.toFixed(0));
let asks = [];
Expand Down Expand Up @@ -340,6 +455,24 @@ class BitfinexClient extends BasicClient {
const sequenceId = Number(msg[2]);
const timestampMs = msg[3];

if (msg[1] === "hb") {
bmancini55 marked this conversation as resolved.
Show resolved Hide resolved
// handle heartbeat by emitting empty update w/sequenceId.
// heartbeat msg: [ 169546, 'hb', 17, 1610921150321 ]
// NOTE: for order book updates we don't check if enableEmptyHeartbeatEvents === true, because
// an empty l3 update is 100% backward compatible so no harm done in emitting it
let result = new Level3Update({
exchange: "Bitfinex",
base: market.base,
quote: market.quote,
sequenceId,
timestampMs,
asks: [],
bids: [],
});
this.emit("l3update", result, market);
return;
}

let point = new Level3Point(orderId.toFixed(), price.toFixed(8), Math.abs(size).toFixed(8));
if (size > 0) bids.push(point);
else asks.push(point);
Expand Down