This repository has been archived by the owner on Sep 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 185
Bitfinex handle heartbeat sequenceId #257
Merged
bmancini55
merged 19 commits into
altangent:master
from
evan-coygo:fix/bitfinex-heartbeat-sequenceId
Oct 8, 2021
Merged
Changes from 9 commits
Commits
Show all changes
19 commits
Select commit
Hold shift + click to select a range
6393449
bitfinex emite empty l2update or l3update on heartbeat event for sequ…
ejfrancis b854201
bitfinex added enableEmptyHeartbeatEvents to allow empty trade/ticker…
ejfrancis 561f42b
fix trade heartbeat
ejfrancis 396b9e8
format
ejfrancis 44c001b
format and better jsdoc for enableEmptyHearatbeatEvents
ejfrancis 3e76b01
use timestamp from messages instead of Date.now(), handle trade strea…
ejfrancis 1fb6ed0
fix typo
ejfrancis 1935113
bitfinex fix incorrect sequenceId on trades, added constructor param …
ejfrancis 40c85dc
allow 'all' for bitfinex tradeMessageType
ejfrancis 696a361
move heartbeat handling to their own methods
ejfrancis 2e06704
merge master
ejfrancis 91d1604
ported changes to new BitfinexClient.ts to support enableEmptyHeartbe…
ejfrancis 4c58155
testrunner comment
ejfrancis 91c26b3
format fix
ejfrancis 476190e
lint fix
ejfrancis 17070a5
use same spec in BitfinexClient tests
ejfrancis f51756d
bitfinex: run test suite with custom options
bmancini55 e523355
bitfinex: add BitfinexTradeMessageType enum
bmancini55 0f85fab
bitfinex: process unsubscribe event
bmancini55 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,7 +9,20 @@ const Level3Snapshot = require("../level3-snapshot"); | |
const Level3Update = require("../level3-update"); | ||
|
||
class BitfinexClient extends BasicClient { | ||
constructor({ wssPath = "wss://api.bitfinex.com/ws/2", watcherMs, l2UpdateDepth = 250 } = {}) { | ||
/** | ||
* | ||
* @param {Object} params | ||
* @param {Boolean} [params.enableEmptyHeartbeatEvents] (optional, default false). if true, emits empty events for all channels on heartbeat events which includes the sequenceId. | ||
* @param {String} [params.tradeMessageType] (optional, defaults to "tu"). one of "tu", "te", or "all". determines whether to use trade channel events of type "te" or "tu", or all trade events. see https://blog.bitfinex.com/api/websocket-api-update/. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. new constructor param |
||
* if you're using sequenceIds to validate websocket messages you may want to use "all" to receive every sequenceId | ||
*/ | ||
constructor({ | ||
wssPath = "wss://api.bitfinex.com/ws/2", | ||
watcherMs, | ||
l2UpdateDepth = 250, | ||
enableEmptyHeartbeatEvents = false, | ||
tradeMessageType = "tu", | ||
} = {}) { | ||
super(wssPath, "Bitfinex", undefined, watcherMs); | ||
this._channels = {}; | ||
|
||
|
@@ -18,6 +31,8 @@ class BitfinexClient extends BasicClient { | |
this.hasLevel2Updates = true; | ||
this.hasLevel3Updates = true; | ||
this.l2UpdateDepth = l2UpdateDepth; | ||
this.enableEmptyHeartbeatEvents = enableEmptyHeartbeatEvents; | ||
this.tradeMessageType = tradeMessageType; // "te", "tu", or "all" | ||
} | ||
|
||
_onConnected() { | ||
|
@@ -129,7 +144,6 @@ class BitfinexClient extends BasicClient { | |
|
||
_onMessage(raw) { | ||
let msg = JSON.parse(raw); | ||
|
||
bmancini55 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// capture channel metadata | ||
if (msg.event === "subscribed") { | ||
this._channels[msg.chanId] = msg; | ||
|
@@ -140,9 +154,6 @@ class BitfinexClient extends BasicClient { | |
let channel = this._channels[msg[0]]; | ||
if (!channel) return; | ||
|
||
// ignore heartbeats | ||
if (msg[1] === "hb") return; | ||
|
||
if (channel.channel === "ticker") { | ||
let market = this._tickerSubs.get(channel.pair); | ||
if (!market) return; | ||
|
@@ -152,7 +163,7 @@ class BitfinexClient extends BasicClient { | |
} | ||
|
||
// trades | ||
if (channel.channel === "trades" && msg[1] === "tu") { | ||
if (channel.channel === "trades") { | ||
let market = this._tradeSubs.get(channel.pair); | ||
if (!market) return; | ||
|
||
|
@@ -181,15 +192,31 @@ class BitfinexClient extends BasicClient { | |
} | ||
|
||
_onTicker(msg, market) { | ||
let msgBody = msg[1]; | ||
const sequenceId = Number(msg[2]); | ||
const timestampMs = msg[3]; | ||
|
||
if (msg[1] === "hb") { | ||
bmancini55 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (this.enableEmptyHeartbeatEvents === false) return; | ||
// handle heartbeat by emitting empty update w/sequenceId. | ||
// heartbeat msg: [ 198655, 'hb', 3, 1610920929093 ] | ||
let ticker = new Ticker({ | ||
exchange: "Bitfinex", | ||
base: market.base, | ||
quote: market.quote, | ||
timestamp: timestampMs, | ||
sequenceId, | ||
}); | ||
this.emit("ticker", ticker, market); | ||
return; | ||
} | ||
let msgBody = msg[1]; | ||
let [bid, bidSize, ask, askSize, change, changePercent, last, volume, high, low] = msgBody; | ||
let open = last + change; | ||
let ticker = new Ticker({ | ||
exchange: "Bitfinex", | ||
base: market.base, | ||
quote: market.quote, | ||
timestamp: Date.now(), | ||
timestamp: timestampMs, | ||
sequenceId, | ||
last: last.toFixed(8), | ||
open: open.toFixed(8), | ||
|
@@ -207,9 +234,79 @@ class BitfinexClient extends BasicClient { | |
} | ||
|
||
_onTradeMessage(msg, market) { | ||
// example msg: [ 359491, 'tu', [ 560287312, 1609712228656, 0.005, 33432 ], 6 ] | ||
let [id, unix, amount, price] = msg[2]; | ||
const timestampMs = msg[3]; | ||
if (msg[1] === "hb") { | ||
bmancini55 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
const sequenceId = Number(msg[2]); | ||
if (this.enableEmptyHeartbeatEvents === false) return; | ||
// handle heartbeat by emitting empty update w/sequenceId. | ||
// example trade heartbeat msg: [ 198655, 'hb', 3, 1610920929093 ] | ||
let trade = new Trade({ | ||
exchange: "Bitfinex", | ||
base: market.base, | ||
quote: market.quote, | ||
timestamp: timestampMs, | ||
sequenceId, | ||
}); | ||
this.emit("trade", trade, market); | ||
return; | ||
} | ||
if (Array.isArray(msg[1])) { | ||
bmancini55 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
const sequenceId = Number(msg[2]); | ||
// trade snapshot example msg: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this wasn't handling the initial trades snapshot previously, so I fixed that as well |
||
/* | ||
[ | ||
CHANNEL_ID, | ||
[ | ||
[ | ||
ID, | ||
MTS, | ||
AMOUNT, | ||
PRICE | ||
], | ||
... | ||
], | ||
sequenceId, | ||
timestampMs | ||
] | ||
*/ | ||
msg[1].forEach(thisTrade => { | ||
bmancini55 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
let [id, unix, amount, price] = thisTrade; | ||
|
||
let side = amount > 0 ? "buy" : "sell"; | ||
price = price.toFixed(8); | ||
amount = Math.abs(amount).toFixed(8); | ||
let trade = new Trade({ | ||
exchange: "Bitfinex", | ||
base: market.base, | ||
quote: market.quote, | ||
tradeId: id.toFixed(), | ||
sequenceId, | ||
unix: unix, | ||
side, | ||
price, | ||
amount, | ||
}); | ||
this.emit("trade", trade, market); | ||
}); | ||
return; | ||
} | ||
// example trade update msg: [ 359491, 'tu' or 'te', [ 560287312, 1609712228656, 0.005, 33432 ], 6 ] | ||
// note: "tu" means it's got the tradeId, this is delayed by 1-2 seconds and includes tradeId. | ||
// "te" is the same but available immediately and without the tradeId | ||
let shouldHandleTradeEvent = false; | ||
const tradeEventType = msg[1]; | ||
if (this.tradeMessageType === "all") { | ||
bmancini55 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
shouldHandleTradeEvent = true; | ||
} else if (this.tradeMessageType === "te" && tradeEventType === "te") { | ||
shouldHandleTradeEvent = true; | ||
} else if (this.tradeMessageType === "tu" && tradeEventType === "tu") { | ||
shouldHandleTradeEvent = true; | ||
} | ||
if (!shouldHandleTradeEvent) { | ||
return; | ||
} | ||
const sequenceId = Number(msg[3]); | ||
let [id, unix, amount, price] = msg[2]; | ||
|
||
let side = amount > 0 ? "buy" : "sell"; | ||
price = price.toFixed(8); | ||
|
@@ -270,6 +367,24 @@ class BitfinexClient extends BasicClient { | |
const sequenceId = Number(msg[2]); | ||
const timestampMs = msg[3]; | ||
|
||
if (msg[1] === "hb") { | ||
bmancini55 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// handle heartbeat by emitting empty update w/sequenceId. | ||
// heartbeat msg: [ 169546, 'hb', 17, 1610921150321 ] | ||
// NOTE: for order book updates we don't check if enableEmptyHeartbeatEvents === true, because | ||
// an empty l2 update is 100% backward compatible so no harm done in emitting it | ||
let update = new Level2Update({ | ||
exchange: "Bitfinex", | ||
base: market.base, | ||
quote: market.quote, | ||
sequenceId, | ||
timestampMs, | ||
asks: [], | ||
bids: [], | ||
}); | ||
this.emit("l2update", update, market); | ||
return; | ||
} | ||
|
||
if (!price.toFixed) return; | ||
let point = new Level2Point(price.toFixed(8), Math.abs(size).toFixed(8), count.toFixed(0)); | ||
let asks = []; | ||
|
@@ -340,6 +455,24 @@ class BitfinexClient extends BasicClient { | |
const sequenceId = Number(msg[2]); | ||
const timestampMs = msg[3]; | ||
|
||
if (msg[1] === "hb") { | ||
bmancini55 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// handle heartbeat by emitting empty update w/sequenceId. | ||
// heartbeat msg: [ 169546, 'hb', 17, 1610921150321 ] | ||
// NOTE: for order book updates we don't check if enableEmptyHeartbeatEvents === true, because | ||
// an empty l3 update is 100% backward compatible so no harm done in emitting it | ||
let result = new Level3Update({ | ||
exchange: "Bitfinex", | ||
base: market.base, | ||
quote: market.quote, | ||
sequenceId, | ||
timestampMs, | ||
asks: [], | ||
bids: [], | ||
}); | ||
this.emit("l3update", result, market); | ||
return; | ||
} | ||
|
||
let point = new Level3Point(orderId.toFixed(), price.toFixed(8), Math.abs(size).toFixed(8)); | ||
if (size > 0) bids.push(point); | ||
else asks.push(point); | ||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
new constructor param
enableEmptyHeartbeatEvents
, to enable emptyTicker
andTrade
events w/thesequenceId
when these channels receive a heartbeat