diff --git a/examples/examples/binance_candle.rs b/examples/examples/binance_candle.rs index 390e878..4f27500 100644 --- a/examples/examples/binance_candle.rs +++ b/examples/examples/binance_candle.rs @@ -17,16 +17,19 @@ async fn main() -> anyhow::Result<()> { let endpoint = match endpoint.as_str() { "binance-u" => Binance::usd_margin_futures(), "binance-s" => Binance::spot(), + "binance-e" => Binance::european_options(), _ => anyhow::bail!("unsupported"), }; + let inst = std::env::var("INST").unwrap_or_else(|_| String::from("btcusdt")); + let mut binance = endpoint .connect_exc() .into_rate_limited(200, Duration::from_secs(60)) .into_fetch_candles_forward(1000); let mut stream = binance .fetch_candles_range( - "btcusdt", + &inst, Period::minutes(UtcOffset::UTC, 1), datetime!(2020-06-27 00:00:00 +08:00).., ) diff --git a/examples/examples/binance_fetch_instruments.rs b/examples/examples/binance_fetch_instruments.rs index 708f886..8c03ba8 100644 --- a/examples/examples/binance_fetch_instruments.rs +++ b/examples/examples/binance_fetch_instruments.rs @@ -16,6 +16,7 @@ async fn main() -> anyhow::Result<()> { let endpoint = match endpoint.as_str() { "binance-u" => Binance::usd_margin_futures(), "binance-s" => Binance::spot(), + "binance-e" => Binance::european_options(), _ => anyhow::bail!("unsupported"), }; diff --git a/examples/examples/binance_orders.rs b/examples/examples/binance_orders.rs index e3d4a58..a6df906 100644 --- a/examples/examples/binance_orders.rs +++ b/examples/examples/binance_orders.rs @@ -30,6 +30,7 @@ async fn main() -> anyhow::Result<()> { let mut endpoint = match endpoint.as_str() { "binance-u" => Binance::usd_margin_futures(), "binance-s" => Binance::spot(), + "binance-e" => Binance::european_options(), _ => anyhow::bail!("unsupported"), }; diff --git a/examples/examples/binance_ticker.rs b/examples/examples/binance_ticker.rs index cc4760a..fceacfb 100644 --- a/examples/examples/binance_ticker.rs +++ b/examples/examples/binance_ticker.rs @@ -23,6 +23,7 @@ async fn main() -> anyhow::Result<()> { let mut endpoint = match endpoint.as_str() { "binance-u" => Binance::usd_margin_futures(), "binance-s" => Binance::spot(), + "binance-e" => Binance::european_options(), _ => anyhow::bail!("unsupported"), }; diff --git a/examples/examples/exc_trading.rs b/examples/examples/exc_trading.rs index e533bd4..c85695a 100644 --- a/examples/examples/exc_trading.rs +++ b/examples/examples/exc_trading.rs @@ -31,6 +31,7 @@ impl From for exc_binance::MarginOp { enum Exchange { BinanceU, BinanceS, + BinanceE, Okx, } @@ -226,6 +227,11 @@ async fn main() -> anyhow::Result<()> { let exc = Binance::usd_margin_futures().private(key).connect_exc(); env.execute(exc, inst, execs).await?; } + Exchange::BinanceE => { + let key = serde_json::from_str(&args.key)?; + let exc = Binance::european_options().private(key).connect_exc(); + env.execute(exc, inst, execs).await?; + } Exchange::BinanceS => { let key = serde_json::from_str(&args.key)?; let options = match (args.buy_margin, args.sell_margin) { diff --git a/exc-binance/examples/binance.rs b/exc-binance/examples/binance.rs index 9a3b710..ab481ac 100644 --- a/exc-binance/examples/binance.rs +++ b/exc-binance/examples/binance.rs @@ -24,7 +24,7 @@ async fn main() -> anyhow::Result<()> { .connect(); api.ready().await?; let mut stream = api - .call(Request::Ws(WsRequest::subscribe(Name::agg_trade( + .call(Request::Ws(WsRequest::subscribe_stream(Name::agg_trade( "btcusdt", )))) .await? @@ -59,7 +59,7 @@ async fn main() -> anyhow::Result<()> { count += 1; api.ready().await?; match api - .call(Request::Ws(WsRequest::subscribe(Name::agg_trade( + .call(Request::Ws(WsRequest::subscribe_stream(Name::agg_trade( "btcusdt", )))) .await diff --git a/exc-binance/examples/binance_options.rs b/exc-binance/examples/binance_options.rs new file mode 100644 index 0000000..0eaed61 --- /dev/null +++ b/exc-binance/examples/binance_options.rs @@ -0,0 +1,44 @@ +use std::time::Duration; + +use exc_binance::{ + websocket::{protocol::frame::Name, request::WsRequest}, + Binance, Request, +}; +use futures::StreamExt; +use tower::{Service, ServiceExt}; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::fmt() + .with_writer(std::io::stderr) + .with_env_filter(tracing_subscriber::EnvFilter::new( + std::env::var("RUST_LOG").unwrap_or_else(|_| "error,binance_options=debug".into()), + )) + .init(); + let channel = std::env::var("CHANNEL").unwrap_or("index".to_string()); + let inst = std::env::var("INST")?; + let mut api = Binance::european_options() + .ws_keep_alive_timeout(Duration::from_secs(30)) + .connect(); + api.ready().await?; + let mut stream = api + .call(Request::Ws(WsRequest::subscribe_stream( + Name::new(&channel).with_inst(&inst), + ))) + .await? + .into_stream::() + .unwrap() + .boxed(); + while let Some(data) = stream.next().await { + match data { + Ok(data) => { + tracing::info!("data={data:#?}"); + } + Err(err) => { + tracing::error!("error={err}"); + break; + } + } + } + Ok(()) +} diff --git a/exc-binance/examples/binance_trading_directly.rs b/exc-binance/examples/binance_trading_directly.rs index 845abe2..c0d3f91 100644 --- a/exc-binance/examples/binance_trading_directly.rs +++ b/exc-binance/examples/binance_trading_directly.rs @@ -62,6 +62,7 @@ async fn main() -> anyhow::Result<()> { symbol: symbol.to_string(), order_id: Some(id), orig_client_order_id: None, + client_order_id: None, }, })) .await? @@ -74,6 +75,7 @@ async fn main() -> anyhow::Result<()> { symbol: symbol.to_string(), order_id: Some(id), orig_client_order_id: None, + client_order_id: None, }, })) .await? diff --git a/exc-binance/examples/binance_ws_api.rs b/exc-binance/examples/binance_ws_api.rs index 7eb3415..d7894c1 100644 --- a/exc-binance/examples/binance_ws_api.rs +++ b/exc-binance/examples/binance_ws_api.rs @@ -22,7 +22,7 @@ async fn main() -> anyhow::Result<()> { .connect(); ServiceExt::::ready(&mut api).await?; let mut stream = api - .call(WsRequest::subscribe(Name::agg_trade("btcusdt"))) + .call(WsRequest::subscribe_stream(Name::agg_trade("btcusdt"))) .await? .into_stream::() .unwrap() @@ -49,7 +49,7 @@ async fn main() -> anyhow::Result<()> { count += 1; ServiceExt::::ready(&mut api).await?; match api - .call(WsRequest::subscribe(Name::agg_trade("btcusdt"))) + .call(WsRequest::subscribe_stream(Name::agg_trade("btcusdt"))) .await { Ok(resp) => { diff --git a/exc-binance/src/endpoint.rs b/exc-binance/src/endpoint.rs index 1d0ca7b..aac46f3 100644 --- a/exc-binance/src/endpoint.rs +++ b/exc-binance/src/endpoint.rs @@ -45,6 +45,16 @@ impl Endpoint { } } + /// Endpoint for European options. + pub fn european_options() -> Self { + Self { + key: None, + http: (RestEndpoint::EuropeanOptions, HttpEndpoint::default()), + ws: BinanceWebsocketApi::european_options(), + buffer: CAP, + } + } + /// Set websocket keep-alive timeout. pub fn ws_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self { self.ws.keep_alive_timeout(timeout); diff --git a/exc-binance/src/http/error.rs b/exc-binance/src/http/error.rs index 00212c1..7a52b22 100644 --- a/exc-binance/src/http/error.rs +++ b/exc-binance/src/http/error.rs @@ -55,6 +55,12 @@ pub enum RestError { /// Missing date for futures. #[error("missing date for futures")] MissingDateForFutures, + /// Invalid date for options. + #[error("invalid date for options")] + InvalidDateForOptions, + /// Missing base asset for options. + #[error("missing base asset for options")] + MissingBaseAssetForOptions, /// Unknown contract type. #[error("unknown contract type: {0:?}")] UnknownContractType(String), diff --git a/exc-binance/src/http/request/account.rs b/exc-binance/src/http/request/account.rs index 8c5b48a..52f5ddf 100644 --- a/exc-binance/src/http/request/account.rs +++ b/exc-binance/src/http/request/account.rs @@ -29,9 +29,11 @@ impl Rest for ListSubAccounts { fn to_path(&self, endpoint: &RestEndpoint) -> Result { match endpoint { - RestEndpoint::UsdMarginFutures => Err(RestError::UnsupportedEndpoint(anyhow::anyhow!( - "`ListSubAccounts` only available on `binance-s`" - ))), + RestEndpoint::UsdMarginFutures | RestEndpoint::EuropeanOptions => { + Err(RestError::UnsupportedEndpoint(anyhow::anyhow!( + "`ListSubAccounts` only available on `binance-s`" + ))) + } RestEndpoint::Spot(_options) => Ok("/sapi/v1/sub-account/list".to_string()), } } @@ -68,9 +70,11 @@ impl Rest for GetSubAccountAssets { fn to_path(&self, endpoint: &RestEndpoint) -> Result { match endpoint { - RestEndpoint::UsdMarginFutures => Err(RestError::UnsupportedEndpoint(anyhow::anyhow!( - "`GetSubAccountAssets` only available on `binance-s`" - ))), + RestEndpoint::UsdMarginFutures | RestEndpoint::EuropeanOptions => { + Err(RestError::UnsupportedEndpoint(anyhow::anyhow!( + "`GetSubAccountAssets` only available on `binance-s`" + ))) + } RestEndpoint::Spot(_options) => Ok("/sapi/v3/sub-account/assets".to_string()), } } @@ -107,9 +111,11 @@ impl Rest for GetSubAccountMargin { fn to_path(&self, endpoint: &RestEndpoint) -> Result { match endpoint { - RestEndpoint::UsdMarginFutures => Err(RestError::UnsupportedEndpoint(anyhow::anyhow!( - "`GetSubAccountMargin` only available on `binance-s`" - ))), + RestEndpoint::UsdMarginFutures | RestEndpoint::EuropeanOptions => { + Err(RestError::UnsupportedEndpoint(anyhow::anyhow!( + "`GetSubAccountMargin` only available on `binance-s`" + ))) + } RestEndpoint::Spot(_options) => Ok("/sapi/v1/sub-account/margin/account".to_string()), } } @@ -165,9 +171,11 @@ impl Rest for GetSubAccountFutures { fn to_path(&self, endpoint: &RestEndpoint) -> Result { match endpoint { - RestEndpoint::UsdMarginFutures => Err(RestError::UnsupportedEndpoint(anyhow::anyhow!( - "`GetSubAccountFutures` only available on `binance-s`" - ))), + RestEndpoint::UsdMarginFutures | RestEndpoint::EuropeanOptions => { + Err(RestError::UnsupportedEndpoint(anyhow::anyhow!( + "`GetSubAccountFutures` only available on `binance-s`" + ))) + } RestEndpoint::Spot(_options) => Ok("/sapi/v2/sub-account/futures/account".to_string()), } } @@ -223,9 +231,11 @@ impl Rest for GetSubAccountFuturesPositions { fn to_path(&self, endpoint: &RestEndpoint) -> Result { match endpoint { - RestEndpoint::UsdMarginFutures => Err(RestError::UnsupportedEndpoint(anyhow::anyhow!( - "`GetSubAccountFuturesPositions` only available on `binance-s`" - ))), + RestEndpoint::UsdMarginFutures | RestEndpoint::EuropeanOptions => { + Err(RestError::UnsupportedEndpoint(anyhow::anyhow!( + "`GetSubAccountFuturesPositions` only available on `binance-s`" + ))) + } RestEndpoint::Spot(_options) => { Ok("/sapi/v2/sub-account/futures/positionRisk".to_string()) } diff --git a/exc-binance/src/http/request/candle.rs b/exc-binance/src/http/request/candle.rs index 4ddae83..43b787a 100644 --- a/exc-binance/src/http/request/candle.rs +++ b/exc-binance/src/http/request/candle.rs @@ -81,6 +81,7 @@ impl Rest for QueryCandles { match endpoint { RestEndpoint::UsdMarginFutures => Ok(format!("/fapi/v1/klines?{qs}")), RestEndpoint::Spot(_) => Ok(format!("/api/v3/klines?{qs}")), + RestEndpoint::EuropeanOptions => Ok(format!("/eapi/v1/klines?{qs}")), } } diff --git a/exc-binance/src/http/request/instrument.rs b/exc-binance/src/http/request/instrument.rs index 22b8592..2c6844c 100644 --- a/exc-binance/src/http/request/instrument.rs +++ b/exc-binance/src/http/request/instrument.rs @@ -15,6 +15,7 @@ impl Rest for ExchangeInfo { match endpoint { RestEndpoint::UsdMarginFutures => Ok("/fapi/v1/exchangeInfo".to_string()), RestEndpoint::Spot(_) => Ok("/api/v3/exchangeInfo".to_string()), + RestEndpoint::EuropeanOptions => Ok("/eapi/v1/exchangeInfo".to_string()), } } diff --git a/exc-binance/src/http/request/listen_key.rs b/exc-binance/src/http/request/listen_key.rs index 5ea5736..f4448db 100644 --- a/exc-binance/src/http/request/listen_key.rs +++ b/exc-binance/src/http/request/listen_key.rs @@ -14,6 +14,7 @@ impl Rest for CurrentListenKey { fn to_path(&self, endpoint: &super::RestEndpoint) -> Result { match endpoint { RestEndpoint::UsdMarginFutures => Ok("/fapi/v1/listenKey".to_string()), + RestEndpoint::EuropeanOptions => Ok("/eapi/v1/listenKey".to_string()), RestEndpoint::Spot(options) => { if options.margin.is_some() { Ok("/sapi/v1/userDataStream".to_string()) @@ -50,6 +51,7 @@ impl Rest for DeleteListenKey { fn to_path(&self, endpoint: &super::RestEndpoint) -> Result { match endpoint { RestEndpoint::UsdMarginFutures => Ok("/fapi/v1/listenKey".to_string()), + RestEndpoint::EuropeanOptions => Ok("/eapi/v1/listenKey".to_string()), RestEndpoint::Spot(options) => { if options.margin.is_some() { Ok("/sapi/v1/userDataStream".to_string()) diff --git a/exc-binance/src/http/request/mod.rs b/exc-binance/src/http/request/mod.rs index 0379636..2d917d0 100644 --- a/exc-binance/src/http/request/mod.rs +++ b/exc-binance/src/http/request/mod.rs @@ -169,6 +169,8 @@ pub enum RestEndpoint { /// Spot. /// Set it to `true` to enable margin trading. Spot(SpotOptions), + /// European options. + EuropeanOptions, } impl fmt::Display for RestEndpoint { @@ -176,6 +178,7 @@ impl fmt::Display for RestEndpoint { match self { Self::UsdMarginFutures => write!(f, "binance-u"), Self::Spot(_) => write!(f, "binance-s"), + Self::EuropeanOptions => write!(f, "binance-e"), } } } @@ -186,6 +189,7 @@ impl RestEndpoint { match self { Self::UsdMarginFutures => "https://fapi.binance.com", Self::Spot(_) => "https://api.binance.com", + Self::EuropeanOptions => "https://eapi.binance.com", } } } @@ -235,11 +239,19 @@ impl RestRequest { } else { hyper::Body::from(serde_urlencoded::to_string(&value)?) }; - let mut request = Request::builder() - .method(self.payload.method(endpoint)?) - .uri(uri) - .header("content-type", "application/x-www-form-urlencoded") - .body(body)?; + let method = self.payload.method(endpoint)?; + + let mut builder = Request::builder().method(method.clone()).uri(uri); + + // FIXME: This is required by binance european options for now. + if !matches!( + (endpoint, method), + (RestEndpoint::EuropeanOptions, Method::GET) + ) { + builder = builder.header("content-type", "application/x-www-form-urlencoded"); + } + + let mut request = builder.body(body)?; let headers = request.headers_mut(); if let Some(key) = key { if self.payload.need_apikey() { diff --git a/exc-binance/src/http/request/trading/european_options.rs b/exc-binance/src/http/request/trading/european_options.rs new file mode 100644 index 0000000..d4e8215 --- /dev/null +++ b/exc-binance/src/http/request/trading/european_options.rs @@ -0,0 +1,135 @@ +use exc_core::types; +use rust_decimal::Decimal; +use serde::Serialize; + +use crate::{ + http::{ + error::RestError, + request::{Payload, Rest, RestEndpoint}, + }, + types::trading::{OrderSide, TimeInForce}, +}; + +use super::RespType; + +/// Supported order types for european options. +#[derive(Debug, Clone, Copy, Serialize)] +#[serde(rename_all = "UPPERCASE")] +pub enum OrderType { + /// Limit. + Limit, + /// Market. + Market, +} + +/// Place order. +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct PlaceOrder { + /// Symbol. + pub symbol: String, + /// Side. + pub side: OrderSide, + /// Order type. + #[serde(rename = "type")] + pub order_type: OrderType, + /// Reduce only. + #[serde(skip_serializing_if = "Option::is_none")] + pub reduce_only: Option, + /// Post only. + #[serde(skip_serializing_if = "Option::is_none")] + pub post_only: Option, + /// Quantity. + pub quantity: Decimal, + /// Price. + #[serde(skip_serializing_if = "Option::is_none")] + pub price: Option, + /// Client id. + #[serde(skip_serializing_if = "Option::is_none")] + pub client_order_id: Option, + /// Time-In-Force. + #[serde(skip_serializing_if = "Option::is_none")] + pub time_in_force: Option, + /// New order response type. + #[serde(skip_serializing_if = "Option::is_none")] + pub new_order_resp_type: Option, + /// Is the order a MMP order. + #[serde(skip_serializing_if = "Option::is_none")] + pub is_mmp: Option, +} + +impl<'a> TryFrom<&'a types::PlaceOrder> for PlaceOrder { + type Error = RestError; + + fn try_from(req: &'a exc_core::types::PlaceOrder) -> Result { + let place = req.place; + let side = if place.size.is_zero() { + return Err(RestError::PlaceZeroSize); + } else if place.size.is_sign_positive() { + OrderSide::Buy + } else { + OrderSide::Sell + }; + let (order_type, price, tif, post_only) = match place.kind { + types::OrderKind::Market => (OrderType::Market, None, None, None), + types::OrderKind::Limit(price, tif) => { + let tif = match tif { + types::TimeInForce::GoodTilCancelled => Some(TimeInForce::Gtc), + types::TimeInForce::FillOrKill => Some(TimeInForce::Fok), + types::TimeInForce::ImmediateOrCancel => Some(TimeInForce::Ioc), + }; + (OrderType::Limit, Some(price), tif, None) + } + types::OrderKind::PostOnly(price) => ( + OrderType::Limit, + Some(price), + Some(TimeInForce::Gtc), + Some(true), + ), + }; + Ok(Self { + symbol: req.opts.instrument().to_uppercase(), + side, + order_type, + reduce_only: None, + quantity: place.size.abs(), + price, + client_order_id: req.opts.client_id().map(|s| s.to_string()), + time_in_force: tif, + new_order_resp_type: None, + post_only, + is_mmp: None, + }) + } +} + +impl Rest for PlaceOrder { + fn method(&self, _endpoint: &RestEndpoint) -> Result { + Ok(http::Method::POST) + } + + fn to_path(&self, endpoint: &RestEndpoint) -> Result { + match endpoint { + RestEndpoint::EuropeanOptions => Ok("/eapi/v1/order".to_string()), + _ => Err(RestError::UnsupportedEndpoint(anyhow::anyhow!( + "only support european options" + ))), + } + } + + fn need_apikey(&self) -> bool { + true + } + + fn need_sign(&self) -> bool { + true + } + + fn serialize(&self, _endpoint: &RestEndpoint) -> Result { + Ok(serde_json::to_value(self)?) + } + + fn to_payload(&self) -> Payload { + Payload::new(self.clone()) + } +} diff --git a/exc-binance/src/http/request/trading/mod.rs b/exc-binance/src/http/request/trading/mod.rs index f05d857..56d3a5d 100644 --- a/exc-binance/src/http/request/trading/mod.rs +++ b/exc-binance/src/http/request/trading/mod.rs @@ -9,6 +9,9 @@ pub mod usd_margin_futures; /// Spot. pub mod spot; +/// European options. +pub mod european_options; + /// Place order. #[derive(Debug, Clone)] pub struct PlaceOrder { @@ -21,6 +24,9 @@ impl PlaceOrder { RestEndpoint::UsdMarginFutures => Ok(PlaceOrderKind::UsdMarginFutures( usd_margin_futures::PlaceOrder::try_from(&self.inner)?, )), + RestEndpoint::EuropeanOptions => Ok(PlaceOrderKind::EuropeanOptions( + european_options::PlaceOrder::try_from(&self.inner)?, + )), RestEndpoint::Spot(options) => { let mut req = spot::PlaceOrder::try_from(&self.inner)?; if let Some(margin) = options.margin.as_ref() { @@ -47,6 +53,16 @@ impl PlaceOrder { } } +/// Response type. +#[derive(Debug, Clone, Copy, Serialize)] +#[serde(rename_all = "UPPERCASE")] +pub enum RespType { + /// Ack. + Ack, + /// Result. + Result, +} + /// Place order kind. #[derive(Debug, Clone, Serialize)] #[serde(untagged)] @@ -55,6 +71,8 @@ pub enum PlaceOrderKind { UsdMarginFutures(usd_margin_futures::PlaceOrder), /// Spot. Spot(spot::PlaceOrder), + /// European options. + EuropeanOptions(european_options::PlaceOrder), } impl Rest for PlaceOrder { @@ -65,6 +83,7 @@ impl Rest for PlaceOrder { fn to_path(&self, endpoint: &RestEndpoint) -> Result { match endpoint { RestEndpoint::UsdMarginFutures => Ok("/fapi/v1/order".to_string()), + RestEndpoint::EuropeanOptions => Ok("/eapi/v1/order".to_string()), RestEndpoint::Spot(options) => { if options.margin.is_some() { Ok("/sapi/v1/margin/order".to_string()) @@ -104,6 +123,9 @@ pub struct GetOrderInner { /// Client Id. #[serde(skip_serializing_if = "Option::is_none")] pub orig_client_order_id: Option, + /// Client Id. + #[serde(skip_serializing_if = "Option::is_none")] + pub client_order_id: Option, } /// Cancel order. @@ -123,6 +145,7 @@ impl Rest for CancelOrder { fn to_path(&self, endpoint: &RestEndpoint) -> Result { match endpoint { RestEndpoint::UsdMarginFutures => Ok("/fapi/v1/order".to_string()), + RestEndpoint::EuropeanOptions => Ok("/eapi/v1/order".to_string()), RestEndpoint::Spot(options) => { if options.margin.is_some() { Ok("/sapi/v1/margin/order".to_string()) @@ -141,8 +164,8 @@ impl Rest for CancelOrder { true } - fn serialize(&self, _endpoint: &RestEndpoint) -> Result { - Ok(serde_json::to_value(self)?) + fn serialize(&self, endpoint: &RestEndpoint) -> Result { + Ok(serde_json::to_value(self.dispatch(endpoint)?)?) } fn to_payload(&self) -> super::Payload { @@ -150,6 +173,20 @@ impl Rest for CancelOrder { } } +impl CancelOrder { + fn dispatch(&self, endpoint: &RestEndpoint) -> Result { + match endpoint { + RestEndpoint::UsdMarginFutures => Ok(self.clone()), + RestEndpoint::EuropeanOptions => { + let mut req = self.clone(); + req.inner.client_order_id = req.inner.orig_client_order_id.take(); + Ok(req) + } + RestEndpoint::Spot(_) => Ok(self.clone()), + } + } +} + /// Get order. #[derive(Debug, Clone, Serialize)] #[serde(rename_all = "camelCase")] @@ -159,6 +196,20 @@ pub struct GetOrder { pub inner: GetOrderInner, } +impl GetOrder { + fn dispatch(&self, endpoint: &RestEndpoint) -> Result { + match endpoint { + RestEndpoint::UsdMarginFutures => Ok(self.clone()), + RestEndpoint::EuropeanOptions => { + let mut req = self.clone(); + req.inner.client_order_id = req.inner.orig_client_order_id.take(); + Ok(req) + } + RestEndpoint::Spot(_) => Ok(self.clone()), + } + } +} + impl Rest for GetOrder { fn method(&self, _endpoint: &RestEndpoint) -> Result { Ok(http::Method::GET) @@ -167,6 +218,7 @@ impl Rest for GetOrder { fn to_path(&self, endpoint: &RestEndpoint) -> Result { match endpoint { RestEndpoint::UsdMarginFutures => Ok("/fapi/v1/order".to_string()), + RestEndpoint::EuropeanOptions => Ok("/eapi/v1/order".to_string()), RestEndpoint::Spot(options) => { if options.margin.is_some() { Ok("/sapi/v1/margin/order".to_string()) @@ -185,8 +237,8 @@ impl Rest for GetOrder { true } - fn serialize(&self, _endpoint: &RestEndpoint) -> Result { - Ok(serde_json::to_value(self)?) + fn serialize(&self, endpoint: &RestEndpoint) -> Result { + Ok(serde_json::to_value(self.dispatch(endpoint)?)?) } fn to_payload(&self) -> super::Payload { diff --git a/exc-binance/src/http/request/trading/usd_margin_futures.rs b/exc-binance/src/http/request/trading/usd_margin_futures.rs index 98ff55f..1b82204 100644 --- a/exc-binance/src/http/request/trading/usd_margin_futures.rs +++ b/exc-binance/src/http/request/trading/usd_margin_futures.rs @@ -10,15 +10,7 @@ use crate::{ types::trading::{OrderSide, OrderType, PositionSide, TimeInForce}, }; -/// Responsee type. -#[derive(Debug, Clone, Copy, Serialize)] -#[serde(rename_all = "UPPERCASE")] -pub enum RespType { - /// Ack. - Ack, - /// Result. - Result, -} +pub use super::RespType; /// Place order. #[derive(Debug, Clone, Serialize)] diff --git a/exc-binance/src/http/request/utils.rs b/exc-binance/src/http/request/utils.rs index 1017733..0b885b5 100644 --- a/exc-binance/src/http/request/utils.rs +++ b/exc-binance/src/http/request/utils.rs @@ -13,6 +13,7 @@ impl Rest for Ping { match endpoint { RestEndpoint::Spot(_) => Ok("/api/v3/ping".to_string()), RestEndpoint::UsdMarginFutures => Ok("/fapi/v1/ping".to_string()), + RestEndpoint::EuropeanOptions => Ok("/eapi/v1/ping".to_string()), } } diff --git a/exc-binance/src/http/response/candle.rs b/exc-binance/src/http/response/candle.rs index 4a79aa7..d9f3223 100644 --- a/exc-binance/src/http/response/candle.rs +++ b/exc-binance/src/http/response/candle.rs @@ -5,7 +5,7 @@ use crate::http::error::RestError; use super::Data; -/// Candle. +/// Candle in list form. #[derive(Debug, Deserialize)] pub struct Candle( /// Open time. @@ -34,6 +34,37 @@ pub struct Candle( serde_json::Value, ); +/// Options Candle. +#[allow(unused)] +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct OptionsCandle { + /// Open. + pub(crate) open: Decimal, + /// High. + pub(crate) high: Decimal, + /// Low. + pub(crate) low: Decimal, + /// Close. + pub(crate) close: Decimal, + /// Volume. + pub(crate) volume: Decimal, + /// Amount. + pub(crate) amount: Decimal, + /// Interval. + pub(crate) interval: String, + /// Trade Count. + pub(crate) trade_count: usize, + /// Taker volume. + pub(crate) taker_volume: Decimal, + /// Taker amount. + pub(crate) taker_amount: Decimal, + /// Open time. + pub(crate) open_time: i64, + /// Close time. + pub(crate) close_time: i64, +} + impl TryFrom for Vec { type Error = RestError; @@ -46,3 +77,25 @@ impl TryFrom for Vec { } } } + +/// Candle Kind. +pub enum CandlesKind { + /// Candles. + Candles(Vec), + /// Options Candles. + OptionsCandles(Vec), +} + +impl TryFrom for CandlesKind { + type Error = RestError; + + fn try_from(value: Data) -> Result { + match value { + Data::Candles(c) => Ok(Self::Candles(c)), + Data::OptionsCandles(c) => Ok(Self::OptionsCandles(c)), + _ => Err(RestError::UnexpectedResponseType(anyhow::anyhow!( + "{value:?}" + ))), + } + } +} diff --git a/exc-binance/src/http/response/instrument.rs b/exc-binance/src/http/response/instrument.rs index f0b5c4e..f5e88b1 100644 --- a/exc-binance/src/http/response/instrument.rs +++ b/exc-binance/src/http/response/instrument.rs @@ -18,6 +18,8 @@ pub enum ExchangeInfo { UsdMarginFutures(UFExchangeInfo), /// Spot. Spot(SpotExchangeInfo), + /// European options. + EuropeanOptions(EuropeanExchangeInfo), } /// Usd-margin futures exchange info. @@ -43,6 +45,17 @@ pub struct SpotExchangeInfo { pub(crate) timezone: String, } +/// European options exchange info. +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct EuropeanExchangeInfo { + pub(crate) option_contracts: Vec, + pub(crate) option_assets: Vec, + pub(crate) option_symbols: Vec, + pub(crate) rate_limits: Vec, + pub(crate) timezone: String, +} + #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] pub(crate) struct RateLimit { @@ -253,6 +266,75 @@ pub(crate) enum SymbolFilter { }, } +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct OptionSymbol { + pub(crate) contract_id: i64, + pub(crate) expiry_date: i64, + pub(crate) filters: Vec, + pub(crate) id: i64, + pub(crate) symbol: String, + pub(crate) side: OptionSide, + pub(crate) strike_price: Decimal, + pub(crate) underlying: String, + pub(crate) unit: u32, + pub(crate) maker_fee_rate: Decimal, + pub(crate) taker_fee_rate: Decimal, + pub(crate) min_qty: Decimal, + pub(crate) max_qty: Decimal, + pub(crate) maintenance_margin: Decimal, + pub(crate) min_initial_margin: Decimal, + pub(crate) min_maintenance_margin: Decimal, + pub(crate) price_scale: u32, + pub(crate) quantity_scale: i64, + pub(crate) quote_asset: Asset, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "SCREAMING_SNAKE_CASE")] +pub(crate) enum OptionSide { + Call, + Put, +} + +impl OptionSymbol { + fn base(&self) -> Option { + let (base, _) = self.symbol.split_once('-')?; + base.parse().ok() + } + + pub(crate) fn expire_ts(&self) -> Result { + // Expiry date is in milliseconds. + time::OffsetDateTime::from_unix_timestamp_nanos(self.expiry_date as i128 * 1_000_000) + .map_err(|_| RestError::InvalidDateForOptions) + } + + fn expiry_date(&self) -> Result { + let ts = self.expire_ts()?; + Ok(ts.date()) + } + + pub(crate) fn to_exc_symbol(&self) -> Result { + let base = self + .base() + .ok_or_else(|| RestError::MissingBaseAssetForOptions)?; + let quote = &self.quote_asset; + let date = self.expiry_date()?; + let price = self.strike_price.normalize(); + let symbol = match self.side { + OptionSide::Call => ExcSymbol::call(&base, quote, date, price), + OptionSide::Put => ExcSymbol::put(&base, quote, date, price), + } + .ok_or_else(|| RestError::InvalidDateForOptions)?; + Ok(symbol) + } + + pub(crate) fn is_live(&self) -> bool { + // FIXME: Check if the option is live by comparing the expiry date with the current date. + true + } +} + impl TryFrom for ExchangeInfo { type Error = RestError; diff --git a/exc-binance/src/http/response/mod.rs b/exc-binance/src/http/response/mod.rs index 9ac2335..e09daca 100644 --- a/exc-binance/src/http/response/mod.rs +++ b/exc-binance/src/http/response/mod.rs @@ -48,6 +48,8 @@ pub type Unknown = serde_json::Value; pub enum Data { /// Candles. Candles(Vec), + /// Options candles. + OptionsCandles(Vec), /// Exchange info. ExchangeInfo(ExchangeInfo), /// Listen key. diff --git a/exc-binance/src/http/response/trading.rs b/exc-binance/src/http/response/trading.rs index 874bf8c..67b9166 100644 --- a/exc-binance/src/http/response/trading.rs +++ b/exc-binance/src/http/response/trading.rs @@ -1,4 +1,4 @@ -use exc_core::{Asset, ExchangeError}; +use exc_core::{Asset, ExchangeError, Str}; use rust_decimal::Decimal; use serde::Deserialize; use serde_with::serde_as; @@ -16,6 +16,8 @@ use super::Data; pub enum Order { /// Usd-Margin Futures. UsdMarginFutures(UsdMarginFuturesOrder), + /// Options. + EuropeanOptions(OptionsOrder), /// Spot. Spot(SpotOrder), } @@ -26,6 +28,7 @@ impl Order { match self { Self::UsdMarginFutures(order) => order.order_id, Self::Spot(order) => order.ack.order_id, + Self::EuropeanOptions(order) => order.order_id, } } @@ -34,6 +37,7 @@ impl Order { match self { Self::UsdMarginFutures(order) => order.symbol.as_str(), Self::Spot(order) => order.ack.symbol.as_str(), + Self::EuropeanOptions(order) => order.symbol.as_str(), } } @@ -43,6 +47,11 @@ impl Order { match self { Self::UsdMarginFutures(order) => order.client_order_id.as_str(), Self::Spot(order) => order.ack.client_order_id(), + Self::EuropeanOptions(order) => order + .client_order_id + .as_ref() + .map(|s| s.as_str()) + .unwrap_or_default(), } } @@ -51,6 +60,7 @@ impl Order { match self { Self::UsdMarginFutures(order) => Some(order.update_time), Self::Spot(order) => order.ack.transact_time, + Self::EuropeanOptions(order) => order.state.as_ref().map(|s| s.update_time), } } } @@ -209,3 +219,59 @@ pub struct SpotFill { /// Trade id. pub trade_id: i64, } + +/// Options Order. +#[serde_as] +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct OptionsOrder { + /// Order id. + pub(crate) order_id: i64, + /// Client id. + pub(crate) client_order_id: Option, + /// Symbol. + pub(crate) symbol: Str, + /// Price. + pub(crate) price: Decimal, + /// Size. + pub(crate) quantity: Decimal, + /// Side. + pub(crate) side: OrderSide, + /// Order type. + #[serde(rename = "type")] + pub(crate) order_type: OrderType, + /// Reduce only. + #[allow(unused)] + pub(crate) reduce_only: bool, + /// Post only. + pub(crate) post_only: bool, + /// Mmp. + #[allow(unused)] + pub(crate) mmp: bool, + /// State. + #[serde(flatten, default)] + pub(crate) state: Option, +} + +/// Options Order State. +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct OptionsOrderState { + /// Create time. + #[allow(unused)] + pub(crate) create_time: i64, + /// Update time. + pub(crate) update_time: i64, + /// Filled size. + pub(crate) executed_qty: Decimal, + /// Average price. + pub(crate) avg_price: Decimal, + /// Quote asset. + pub(crate) quote_asset: Asset, + /// Fee. + pub(crate) fee: Decimal, + /// Time-In-Force. + pub(crate) time_in_force: TimeInForce, + /// Status. + pub(crate) status: Status, +} diff --git a/exc-binance/src/service.rs b/exc-binance/src/service.rs index e512397..0f06198 100644 --- a/exc-binance/src/service.rs +++ b/exc-binance/src/service.rs @@ -126,6 +126,11 @@ impl Binance { pub fn spot_with_options(options: SpotOptions) -> Endpoint { Endpoint::spot_with_options(options) } + + /// European options endpoint. + pub fn european_options() -> Endpoint { + Endpoint::european_options() + } } impl Service for Binance { diff --git a/exc-binance/src/types/adaptations/book.rs b/exc-binance/src/types/adaptations/book.rs index 2288a06..aeef4ab 100644 --- a/exc-binance/src/types/adaptations/book.rs +++ b/exc-binance/src/types/adaptations/book.rs @@ -1,28 +1,21 @@ use exc_core::{types, Adaptor, ExchangeError}; use futures::{StreamExt, TryStreamExt}; -use time::OffsetDateTime; -use crate::{types::Name, websocket::protocol::frame::book_ticker::BookTicker, Request}; +use crate::{ + websocket::{protocol::frame::DepthFrame, request::WsRequest}, + Request, +}; impl Adaptor for Request { fn from_request(req: types::SubscribeBidAsk) -> Result { - Ok(Self::subscribe(Name::book_ticker(&req.instrument))) + Ok(WsRequest::dispatch_bid_ask(req).into()) } fn into_response(resp: Self::Response) -> Result { - let stream = resp.into_stream::()?; + let stream = resp.into_stream::()?; Ok(stream .map_err(ExchangeError::from) - .and_then(|t| async move { - Ok(types::BidAsk { - ts: t - .trade_timestamp - .map(super::from_timestamp) - .unwrap_or_else(|| Ok(OffsetDateTime::now_utc()))?, - bid: Some((t.bid.normalize(), t.bid_size.normalize())), - ask: Some((t.ask.normalize(), t.ask_size.normalize())), - }) - }) + .and_then(|t| async move { t.try_into() }) .boxed()) } } diff --git a/exc-binance/src/types/adaptations/candle.rs b/exc-binance/src/types/adaptations/candle.rs index 19c832e..170cc93 100644 --- a/exc-binance/src/types/adaptations/candle.rs +++ b/exc-binance/src/types/adaptations/candle.rs @@ -2,6 +2,7 @@ use crate::{ http::{request, response}, Request, }; +use either::Either; use exc_core::{ types::{self, CandleStream}, Adaptor, ExchangeError, @@ -80,19 +81,33 @@ impl Adaptor for Request { } fn into_response(resp: Self::Response) -> Result { - let candles = resp - .into_response::()? - .into_iter() - .map(|c| { - Ok(types::Candle { - ts: super::from_timestamp(c.0)?, - open: c.1.normalize(), - high: c.2.normalize(), - low: c.3.normalize(), - close: c.4.normalize(), - volume: c.5.normalize(), - }) - }); + let candles = resp.into_response::()?; + let candles = match candles { + response::candle::CandlesKind::Candles(candles) => { + Either::Left(candles.into_iter().map(|c| { + Ok(types::Candle { + ts: super::from_timestamp(c.0)?, + open: c.1.normalize(), + high: c.2.normalize(), + low: c.3.normalize(), + close: c.4.normalize(), + volume: c.5.normalize(), + }) + })) + } + response::candle::CandlesKind::OptionsCandles(candles) => { + Either::Right(candles.into_iter().rev().map(|c| { + Ok(types::Candle { + ts: super::from_timestamp(c.open_time)?, + open: c.open.normalize(), + high: c.high.normalize(), + low: c.low.normalize(), + close: c.close.normalize(), + volume: c.volume.normalize(), + }) + })) + } + }; Ok(CandleStream::new_forward(futures::stream::iter(candles))) } } diff --git a/exc-binance/src/types/adaptations/instrument.rs b/exc-binance/src/types/adaptations/instrument.rs index 82e7d39..fa5f08b 100644 --- a/exc-binance/src/types/adaptations/instrument.rs +++ b/exc-binance/src/types/adaptations/instrument.rs @@ -126,6 +126,28 @@ impl Adaptor for Request { })) .boxed()) } + response::ExchangeInfo::EuropeanOptions(info) => { + Ok(stream::iter(info.option_symbols.into_iter().filter_map(|symbol| { + let mut size_tick = None; + for filter in symbol.filters.iter() { + if let Filter::Symbol(SymbolFilter::LotSize { step_size, .. }) = filter { + size_tick = Some(step_size.normalize()); + } + } + let Some(size_tick) = size_tick else { + tracing::warn!("missing size tick for {}", symbol.symbol); + return None; + }; + let attrs = Attributes { reversed: false, unit: Decimal::from(symbol.unit), price_tick: Decimal::new(1, symbol.price_scale), size_tick, min_size: symbol.min_qty, min_value: Decimal::ZERO }; + let expire = symbol.expire_ts().map_err(|err| { + tracing::warn!(%err, "cannot parse expire for {}", symbol.symbol); + }).ok()?; + let meta = InstrumentMeta::new(&symbol.symbol, symbol.to_exc_symbol().map_err(|err| { + tracing::warn!(%err, "cannot build exc symbol from {}", symbol.symbol); + }).ok()?, attrs).with_live(symbol.is_live()).with_expire(expire); + Some(Ok(meta)) + })).boxed()) + } } } } diff --git a/exc-binance/src/types/adaptations/trade.rs b/exc-binance/src/types/adaptations/trade.rs index af815be..b13a63b 100644 --- a/exc-binance/src/types/adaptations/trade.rs +++ b/exc-binance/src/types/adaptations/trade.rs @@ -1,25 +1,21 @@ use exc_core::{types, Adaptor, ExchangeError}; use futures::{StreamExt, TryStreamExt}; -use crate::{types::Name, websocket::protocol::frame::agg_trade::AggTrade, Request}; +use crate::{ + websocket::{protocol::frame::TradeFrame, request::WsRequest}, + Request, +}; impl Adaptor for Request { fn from_request(req: types::SubscribeTrades) -> Result { - Ok(Self::subscribe(Name::agg_trade(&req.instrument))) + Ok(WsRequest::dispatch_trades(req).into()) } fn into_response(resp: Self::Response) -> Result { - let stream = resp.into_stream::()?; + let stream = resp.into_stream::()?; Ok(stream .map_err(ExchangeError::from) - .and_then(|trade| async move { - Ok(types::Trade { - ts: super::from_timestamp(trade.trade_timestamp)?, - price: trade.price.normalize(), - size: trade.size.normalize(), - buy: !trade.buy_maker, - }) - }) + .and_then(|trade| async move { trade.try_into() }) .boxed()) } } diff --git a/exc-binance/src/types/adaptations/trading.rs b/exc-binance/src/types/adaptations/trading.rs index f7dbe1a..6ca43e2 100644 --- a/exc-binance/src/types/adaptations/trading.rs +++ b/exc-binance/src/types/adaptations/trading.rs @@ -1,7 +1,6 @@ -use std::collections::HashMap; +use std::{collections::HashMap, ops::Neg}; -use either::Either; -use exc_core::{types, Adaptor, ExchangeError}; +use exc_core::{types, Adaptor, ExchangeError, Str}; use futures::{FutureExt, StreamExt, TryStreamExt}; use rust_decimal::Decimal; use time::OffsetDateTime; @@ -15,197 +14,20 @@ use crate::{ trading::{self, OrderSide, Status, TimeInForce}, Name, }, - websocket::protocol::frame::account::{ExecutionReport, OrderType, OrderUpdate}, + websocket::protocol::frame::account::OrderUpdateFrame, Request, }; -type OrderUpdateKind = Either; - impl Adaptor for Request { fn from_request(req: types::SubscribeOrders) -> Result { Ok(Self::subscribe(Name::order_trade_update(&req.instrument))) } fn into_response(resp: Self::Response) -> Result { - let stream = resp.into_stream::()?; + let stream = resp.into_stream::()?; Ok(stream .map_err(ExchangeError::from) - .and_then(|update| async move { - match update { - Either::Left(update) => { - let kind = match update.order_type { - OrderType::Limit => match update.time_in_force { - TimeInForce::Gtc => types::OrderKind::Limit( - update.price.normalize(), - types::TimeInForce::GoodTilCancelled, - ), - TimeInForce::Fok => types::OrderKind::Limit( - update.price.normalize(), - types::TimeInForce::FillOrKill, - ), - TimeInForce::Ioc => types::OrderKind::Limit( - update.price.normalize(), - types::TimeInForce::ImmediateOrCancel, - ), - TimeInForce::Gtx => { - types::OrderKind::PostOnly(update.price.normalize()) - } - }, - OrderType::Market => types::OrderKind::Market, - other => { - return Err(ExchangeError::Other(anyhow!( - "unsupported order type: {other:?}" - ))); - } - }; - let mut filled = update.filled_size.abs().normalize(); - let mut size = update.size.abs().normalize(); - match update.side { - OrderSide::Buy => { - filled.set_sign_positive(true); - size.set_sign_positive(true) - } - OrderSide::Sell => { - filled.set_sign_positive(false); - size.set_sign_positive(false) - } - } - let status = match update.status { - Status::New | Status::PartiallyFilled => types::OrderStatus::Pending, - Status::Canceled | Status::Expired | Status::Filled => { - types::OrderStatus::Finished - } - Status::NewAdl | Status::NewInsurance => types::OrderStatus::Pending, - }; - let trade_size = update.last_trade_size.abs().normalize(); - let trade = if !trade_size.is_zero() { - let mut trade = types::OrderTrade { - price: update.last_trade_price.normalize(), - size: if matches!(update.side, OrderSide::Buy) { - trade_size - } else { - -trade_size - }, - fee: Decimal::ZERO, - fee_asset: None, - }; - if let Some(asset) = update.fee_asset { - trade.fee = -update.fee.normalize(); - trade.fee_asset = Some(asset); - } - Some(trade) - } else { - None - }; - Ok(types::OrderUpdate { - ts: super::from_timestamp(update.trade_ts)?, - order: types::Order { - id: types::OrderId::from(update.client_id), - target: types::Place { size, kind }, - state: types::OrderState { - filled, - cost: if filled.is_zero() { - Decimal::ONE - } else { - update.cost - }, - status, - fees: HashMap::default(), - }, - trade, - }, - }) - } - Either::Right(update) => { - let client_id = update.client_id().to_string(); - let kind = match update.order_type { - OrderType::Limit => match update.time_in_force { - TimeInForce::Gtc => types::OrderKind::Limit( - update.price.normalize(), - types::TimeInForce::GoodTilCancelled, - ), - TimeInForce::Fok => types::OrderKind::Limit( - update.price.normalize(), - types::TimeInForce::FillOrKill, - ), - TimeInForce::Ioc => types::OrderKind::Limit( - update.price.normalize(), - types::TimeInForce::ImmediateOrCancel, - ), - TimeInForce::Gtx => { - types::OrderKind::PostOnly(update.price.normalize()) - } - }, - OrderType::Market => types::OrderKind::Market, - OrderType::LimitMaker => { - types::OrderKind::PostOnly(update.price.normalize()) - } - other => { - return Err(ExchangeError::Other(anyhow!( - "unsupported order type: {other:?}" - ))); - } - }; - let mut filled = update.filled_size.abs().normalize(); - let mut size = update.size.abs().normalize(); - match update.side { - OrderSide::Buy => { - filled.set_sign_positive(true); - size.set_sign_positive(true) - } - OrderSide::Sell => { - filled.set_sign_positive(false); - size.set_sign_positive(false) - } - } - let status = match update.status { - Status::New | Status::PartiallyFilled => types::OrderStatus::Pending, - Status::Canceled | Status::Expired | Status::Filled => { - types::OrderStatus::Finished - } - Status::NewAdl | Status::NewInsurance => types::OrderStatus::Pending, - }; - let trade_size = update.last_trade_size.abs().normalize(); - let trade = if !trade_size.is_zero() { - let mut trade = types::OrderTrade { - price: update.last_trade_price, - size: if matches!(update.side, OrderSide::Buy) { - trade_size - } else { - -trade_size - }, - fee: Decimal::ZERO, - fee_asset: None, - }; - if let Some(asset) = update.fee_asset { - trade.fee = -update.fee.normalize(); - trade.fee_asset = Some(asset); - } - Some(trade) - } else { - None - }; - Ok(types::OrderUpdate { - ts: super::from_timestamp(update.trade_ts)?, - order: types::Order { - id: types::OrderId::from(client_id), - target: types::Place { size, kind }, - state: types::OrderState { - filled, - cost: if update.filled_size.is_zero() { - Decimal::ONE - } else { - (update.filled_quote_size / update.filled_size).normalize() - }, - status, - fees: HashMap::default(), - }, - trade, - }, - }) - } - } - }) + .and_then(|update| async move { update.try_into() }) .boxed()) } } @@ -353,6 +175,64 @@ impl TryFrom for types::Order { ))) } } + Order::EuropeanOptions(order) => { + let id = order + .client_order_id + .unwrap_or_else(|| Str::new(order.order_id.to_string())); + let mut size = order.quantity.normalize().abs(); + match order.side { + OrderSide::Buy => size.set_sign_positive(true), + OrderSide::Sell => size.set_sign_positive(false), + } + let state = order + .state + .ok_or_else(|| ExchangeError::Other(anyhow::anyhow!("order is not ready")))?; + let kind = match order.order_type { + trading::OrderType::Limit => match (order.post_only, state.time_in_force) { + (false, TimeInForce::Gtc) => types::OrderKind::Limit( + order.price.normalize(), + types::TimeInForce::GoodTilCancelled, + ), + (false, TimeInForce::Fok) => types::OrderKind::Limit( + order.price.normalize(), + types::TimeInForce::FillOrKill, + ), + (false, TimeInForce::Ioc) => types::OrderKind::Limit( + order.price.normalize(), + types::TimeInForce::ImmediateOrCancel, + ), + (true, _) => types::OrderKind::PostOnly(order.price.normalize()), + kind => { + return Err(ExchangeError::Other(anyhow::anyhow!( + "unsupported order kind: {kind:?}" + ))); + } + }, + trading::OrderType::Market => types::OrderKind::Market, + other => { + return Err(ExchangeError::Other(anyhow!( + "unsupported order type: {other:?}" + ))); + } + }; + let mut filled = state.executed_qty.normalize().abs(); + match order.side { + OrderSide::Buy => filled.set_sign_positive(true), + OrderSide::Sell => filled.set_sign_positive(false), + } + let state = types::OrderState { + filled, + cost: state.avg_price.normalize(), + status: state.status.try_into()?, + fees: HashMap::from([(state.quote_asset, state.fee.normalize().neg())]), + }; + Ok(types::Order { + id: types::OrderId::from(id), + target: types::Place { size, kind }, + state, + trade: None, + }) + } } } } @@ -374,7 +254,12 @@ impl Adaptor for Request { .map(super::from_timestamp) .unwrap_or_else(|| Ok(OffsetDateTime::now_utc()))?, id, - order: order.try_into().ok(), + order: order + .try_into() + .map_err(|err| { + tracing::warn!(%err, "failed to convert order"); + }) + .ok(), }) } .boxed()) @@ -388,6 +273,7 @@ impl Adaptor for Request { symbol: req.instrument.to_uppercase(), order_id: None, orig_client_order_id: Some(req.id.as_str().to_string()), + client_order_id: None, }, })) } @@ -416,6 +302,7 @@ impl Adaptor for Request { symbol: req.instrument.to_uppercase(), order_id: None, orig_client_order_id: Some(req.id.as_str().to_string()), + client_order_id: None, }, })) } diff --git a/exc-binance/src/types/request.rs b/exc-binance/src/types/request.rs index 3d5f58b..69db9b2 100644 --- a/exc-binance/src/types/request.rs +++ b/exc-binance/src/types/request.rs @@ -24,7 +24,7 @@ impl Request { /// Create a request to subscribe to a ws stream. pub fn subscribe(stream: Name) -> Self { - Self::Ws(WsRequest::subscribe(stream)) + Self::Ws(WsRequest::subscribe_stream(stream)) } /// Main stream subcribe. @@ -32,3 +32,9 @@ impl Request { Self::Ws(WsRequest::main_stream(stream)) } } + +impl From for Request { + fn from(req: WsRequest) -> Self { + Self::Ws(req) + } +} diff --git a/exc-binance/src/types/trading.rs b/exc-binance/src/types/trading.rs index 5b696f5..10ba913 100644 --- a/exc-binance/src/types/trading.rs +++ b/exc-binance/src/types/trading.rs @@ -1,3 +1,4 @@ +use exc_core::{types, ExchangeError}; use serde::{Deserialize, Serialize}; /// Order side. @@ -41,12 +42,14 @@ pub enum TimeInForce { #[serde(rename_all = "SCREAMING_SNAKE_CASE")] pub enum Status { /// New. + #[serde(alias = "ACCEPTED")] New, /// Parttially filled. PartiallyFilled, /// Filled. Filled, /// Cancelled. + #[serde(alias = "REJECTED", alias = "CANCELLED")] Canceled, /// Expired. Expired, @@ -56,6 +59,19 @@ pub enum Status { NewAdl, } +impl TryFrom for types::OrderStatus { + type Error = ExchangeError; + + fn try_from(status: Status) -> Result { + let status = match status { + Status::New | Status::PartiallyFilled => types::OrderStatus::Pending, + Status::Canceled | Status::Expired | Status::Filled => types::OrderStatus::Finished, + Status::NewAdl | Status::NewInsurance => types::OrderStatus::Pending, + }; + Ok(status) + } +} + /// Order type. #[derive(Debug, Clone, Copy, Serialize, Deserialize)] #[serde(rename_all = "SCREAMING_SNAKE_CASE")] diff --git a/exc-binance/src/websocket/connect.rs b/exc-binance/src/websocket/connect.rs index ce88404..0c6889d 100644 --- a/exc-binance/src/websocket/connect.rs +++ b/exc-binance/src/websocket/connect.rs @@ -29,6 +29,7 @@ pub enum BinanceWsHost { UsdMarginFuturesPrivate, Spot, SpotPrivate, + EuropeanOptions, } impl BinanceWsHost { @@ -37,6 +38,7 @@ impl BinanceWsHost { Self::UsdMarginFutures => "wss://fstream.binance.com", Self::UsdMarginFuturesPrivate => "wss://fstream.binance.com", Self::Spot | Self::SpotPrivate => "wss://stream.binance.com:9443", + Self::EuropeanOptions => "wss://nbstream.binance.com/eoptions", } } @@ -177,6 +179,7 @@ impl Service for BinanceWsConnect { fn call(&mut self, req: BinanceWsTarget) -> Self::Future { let connect = WsConnector::default(); + let endpoint = req.host; let res = req .into_uri(self.retry, self.interval, self.stop_refresing_after) .and_then(|(uri, worker)| { @@ -191,6 +194,7 @@ impl Service for BinanceWsConnect { async move { let (ws, worker) = res.await?; WsClient::with_websocket( + endpoint, ws, main_stream, keep_alive_timeout, diff --git a/exc-binance/src/websocket/mod.rs b/exc-binance/src/websocket/mod.rs index 55bcc14..f7f62d7 100644 --- a/exc-binance/src/websocket/mod.rs +++ b/exc-binance/src/websocket/mod.rs @@ -35,13 +35,24 @@ impl BinanceWebsocketApi { pub fn usd_margin_futures() -> WsEndpoint { WsEndpoint::new( BinanceWsHost::UsdMarginFutures, - Name::new("markPrice").inst("bnbusdt"), + Name::new("markPrice").with_inst("bnbusdt"), ) } /// Endpoint of Spot API. pub fn spot() -> WsEndpoint { - WsEndpoint::new(BinanceWsHost::Spot, Name::new("miniTicker").inst("btcusdt")) + WsEndpoint::new( + BinanceWsHost::Spot, + Name::new("miniTicker").with_inst("btcusdt"), + ) + } + + /// Endpoint of European Options API. + pub fn european_options() -> WsEndpoint { + WsEndpoint::new( + BinanceWsHost::EuropeanOptions, + Name::new("index").with_inst("BTCUSDT"), + ) } } diff --git a/exc-binance/src/websocket/protocol/frame/account.rs b/exc-binance/src/websocket/protocol/frame/account.rs index 6e4c77e..2f61245 100644 --- a/exc-binance/src/websocket/protocol/frame/account.rs +++ b/exc-binance/src/websocket/protocol/frame/account.rs @@ -1,5 +1,7 @@ +use std::{collections::HashMap, ops::Neg}; + use either::Either; -use exc_core::{Asset, Str}; +use exc_core::{types, Asset, ExchangeError, Str}; use rust_decimal::Decimal; use serde::Deserialize; @@ -38,6 +40,18 @@ pub enum AccountEvent { ExecutionReport(ExecutionReport), } +/// Order Update Frame. +#[derive(Debug, Clone)] +#[non_exhaustive] +pub enum OrderUpdateFrame { + /// Options. + Options(OptionsOrder), + /// USD-M Futures. + UsdMarginFutures(OrderUpdate), + /// Execution report. + Spot(ExecutionReport), +} + /// Order type. #[derive(Debug, Clone, Copy, Deserialize)] #[serde(rename_all = "SCREAMING_SNAKE_CASE")] @@ -255,6 +269,104 @@ impl ExecutionReport { } } +/// Order update for options. +#[derive(Debug, Clone, Deserialize)] +#[allow(unused)] +pub struct OptionsOrderUpdate { + /// Event timestamp. + #[serde(rename = "E")] + pub(crate) event_ts: i64, + #[serde(rename = "o")] + pub(crate) order: Vec, +} + +/// Options order. +#[serde_with::serde_as] +#[derive(Debug, Clone, Deserialize)] +#[allow(unused)] +pub struct OptionsOrder { + /// Created timestamp. + #[serde(rename = "T")] + pub(crate) create_ts: i64, + /// Updated timestamp. + #[serde(rename = "t")] + pub(crate) update_ts: i64, + /// Symbol. + #[serde(rename = "s")] + pub(crate) symbol: Str, + /// Client id. + #[serde(rename = "c")] + #[serde_as(as = "serde_with::NoneAsEmptyString")] + pub(crate) client_id: Option, + /// Order id. + #[serde(rename = "oid")] + pub(crate) order_id: Str, + /// Price. + #[serde(rename = "p")] + pub(crate) price: Decimal, + /// Size. + #[serde(rename = "q")] + pub(crate) size: Decimal, + /// Reduce-only. + #[serde(rename = "r")] + pub(crate) reduce_only: bool, + /// Post-only. + #[serde(rename = "po")] + pub(crate) post_only: bool, + /// Status. + #[serde(rename = "S")] + pub(crate) status: Status, + /// Filled size. + #[serde(rename = "e")] + pub(crate) filled_size: Decimal, + /// Filled amount. + #[serde(rename = "ec")] + pub(crate) filled_amount: Decimal, + /// Fee. + #[serde(rename = "f")] + pub(crate) fee: Decimal, + /// Time-in-force. + #[serde(rename = "tif")] + pub(crate) time_in_force: TimeInForce, + /// Order type. + #[serde(rename = "oty")] + pub(crate) order_type: OrderType, + /// Trades. + #[serde(rename = "fi")] + #[serde(default)] + pub(crate) trades: Vec, +} + +/// Options trade. +#[derive(Debug, Clone, Deserialize)] +#[allow(unused)] +pub struct OptionsTrade { + /// Trade ts. + #[serde(rename = "T")] + pub(crate) trade_ts: i64, + /// Trade id. + #[serde(rename = "t")] + pub(crate) trade_id: Str, + /// Size. + #[serde(rename = "q")] + pub(crate) size: Decimal, + /// Price. + #[serde(rename = "p")] + pub(crate) price: Decimal, + /// Fee. + #[serde(rename = "f")] + pub(crate) fee: Decimal, + /// Maker or taker. + #[serde(rename = "m")] + pub(crate) maker: Str, +} + +impl Nameable for OptionsOrder { + fn to_name(&self) -> Name { + Name::order_trade_update(&self.symbol) + } +} + impl Nameable for AccountEvent { fn to_name(&self) -> Name { match self { @@ -294,3 +406,256 @@ impl TryFrom for Either { } } } + +impl TryFrom for OrderUpdateFrame { + type Error = WsError; + + fn try_from(frame: StreamFrame) -> Result { + match frame.data { + StreamFrameKind::AccountEvent(e) => match e { + AccountEvent::OrderTradeUpdate { order, .. } => Ok(Self::UsdMarginFutures(order)), + AccountEvent::ExecutionReport(r) => Ok(Self::Spot(r)), + e => Err(WsError::UnexpectedFrame(anyhow::anyhow!("{e:?}"))), + }, + StreamFrameKind::OptionsOrder(order) => Ok(Self::Options(order)), + e => Err(WsError::UnexpectedFrame(anyhow::anyhow!("{e:?}"))), + } + } +} + +impl TryFrom for types::OrderUpdate { + type Error = ExchangeError; + + fn try_from(value: OrderUpdateFrame) -> Result { + match value { + OrderUpdateFrame::UsdMarginFutures(update) => { + let kind = match update.order_type { + OrderType::Limit => match update.time_in_force { + TimeInForce::Gtc => types::OrderKind::Limit( + update.price.normalize(), + types::TimeInForce::GoodTilCancelled, + ), + TimeInForce::Fok => types::OrderKind::Limit( + update.price.normalize(), + types::TimeInForce::FillOrKill, + ), + TimeInForce::Ioc => types::OrderKind::Limit( + update.price.normalize(), + types::TimeInForce::ImmediateOrCancel, + ), + TimeInForce::Gtx => types::OrderKind::PostOnly(update.price.normalize()), + }, + OrderType::Market => types::OrderKind::Market, + other => { + return Err(ExchangeError::Other(anyhow!( + "unsupported order type: {other:?}" + ))); + } + }; + let mut filled = update.filled_size.abs().normalize(); + let mut size = update.size.abs().normalize(); + match update.side { + OrderSide::Buy => { + filled.set_sign_positive(true); + size.set_sign_positive(true) + } + OrderSide::Sell => { + filled.set_sign_positive(false); + size.set_sign_positive(false) + } + } + let status = update.status.try_into()?; + let trade_size = update.last_trade_size.abs().normalize(); + let trade = if !trade_size.is_zero() { + let mut trade = types::OrderTrade { + price: update.last_trade_price.normalize(), + size: if matches!(update.side, OrderSide::Buy) { + trade_size + } else { + -trade_size + }, + fee: Decimal::ZERO, + fee_asset: None, + }; + if let Some(asset) = update.fee_asset { + trade.fee = -update.fee.normalize(); + trade.fee_asset = Some(asset); + } + Some(trade) + } else { + None + }; + Ok(types::OrderUpdate { + ts: crate::types::adaptations::from_timestamp(update.trade_ts)?, + order: types::Order { + id: types::OrderId::from(update.client_id), + target: types::Place { size, kind }, + state: types::OrderState { + filled, + cost: if filled.is_zero() { + Decimal::ONE + } else { + update.cost + }, + status, + fees: HashMap::default(), + }, + trade, + }, + }) + } + OrderUpdateFrame::Spot(update) => { + let client_id = update.client_id().to_string(); + let kind = match update.order_type { + OrderType::Limit => match update.time_in_force { + TimeInForce::Gtc => types::OrderKind::Limit( + update.price.normalize(), + types::TimeInForce::GoodTilCancelled, + ), + TimeInForce::Fok => types::OrderKind::Limit( + update.price.normalize(), + types::TimeInForce::FillOrKill, + ), + TimeInForce::Ioc => types::OrderKind::Limit( + update.price.normalize(), + types::TimeInForce::ImmediateOrCancel, + ), + TimeInForce::Gtx => types::OrderKind::PostOnly(update.price.normalize()), + }, + OrderType::Market => types::OrderKind::Market, + OrderType::LimitMaker => types::OrderKind::PostOnly(update.price.normalize()), + other => { + return Err(ExchangeError::Other(anyhow!( + "unsupported order type: {other:?}" + ))); + } + }; + let mut filled = update.filled_size.abs().normalize(); + let mut size = update.size.abs().normalize(); + match update.side { + OrderSide::Buy => { + filled.set_sign_positive(true); + size.set_sign_positive(true) + } + OrderSide::Sell => { + filled.set_sign_positive(false); + size.set_sign_positive(false) + } + } + let status = match update.status { + Status::New | Status::PartiallyFilled => types::OrderStatus::Pending, + Status::Canceled | Status::Expired | Status::Filled => { + types::OrderStatus::Finished + } + Status::NewAdl | Status::NewInsurance => types::OrderStatus::Pending, + }; + let trade_size = update.last_trade_size.abs().normalize(); + let trade = if !trade_size.is_zero() { + let mut trade = types::OrderTrade { + price: update.last_trade_price, + size: if matches!(update.side, OrderSide::Buy) { + trade_size + } else { + -trade_size + }, + fee: Decimal::ZERO, + fee_asset: None, + }; + if let Some(asset) = update.fee_asset { + trade.fee = -update.fee.normalize(); + trade.fee_asset = Some(asset); + } + Some(trade) + } else { + None + }; + Ok(types::OrderUpdate { + ts: crate::types::adaptations::from_timestamp(update.trade_ts)?, + order: types::Order { + id: types::OrderId::from(client_id), + target: types::Place { size, kind }, + state: types::OrderState { + filled, + cost: if update.filled_size.is_zero() { + Decimal::ONE + } else { + (update.filled_quote_size / update.filled_size).normalize() + }, + status, + fees: HashMap::default(), + }, + trade, + }, + }) + } + OrderUpdateFrame::Options(update) => { + let kind = match update.order_type { + OrderType::Limit => match (update.post_only, update.time_in_force) { + (false, TimeInForce::Gtc) => types::OrderKind::Limit( + update.price.normalize(), + types::TimeInForce::GoodTilCancelled, + ), + (false, TimeInForce::Fok) => types::OrderKind::Limit( + update.price.normalize(), + types::TimeInForce::FillOrKill, + ), + (false, TimeInForce::Ioc) => types::OrderKind::Limit( + update.price.normalize(), + types::TimeInForce::ImmediateOrCancel, + ), + (true, _) => types::OrderKind::PostOnly(update.price.normalize()), + other => { + return Err(ExchangeError::Other(anyhow!( + "unsupported time in force: {other:?}" + ))); + } + }, + OrderType::Market => types::OrderKind::Market, + OrderType::LimitMaker => types::OrderKind::PostOnly(update.price.normalize()), + other => { + return Err(ExchangeError::Other(anyhow!( + "unsupported order type: {other:?}" + ))); + } + }; + let filled = update.filled_size.normalize(); + let cost = if filled.is_zero() { + Decimal::ONE + } else { + update + .filled_amount + .normalize() + .checked_div(filled) + .ok_or_else(|| { + ExchangeError::Other(anyhow::anyhow!( + "parse options order: failed to calculate cost" + )) + })? + }; + let quote_fee = update.fee.normalize().neg(); + // FIXME: we are assuming that the quote is in USDT. + const QUOTE: Asset = Asset::USDT; + let state = types::OrderState { + filled, + cost, + status: update.status.try_into()?, + fees: HashMap::from([(QUOTE, quote_fee)]), + }; + let order = types::Order { + id: types::OrderId::from(update.client_id.unwrap_or(update.order_id)), + target: types::Place { + size: update.size.normalize(), + kind, + }, + state, + // FIXME: we are not parsing the trades. + trade: None, + }; + Ok(types::OrderUpdate { + ts: crate::types::adaptations::from_timestamp(update.update_ts)?, + order, + }) + } + } + } +} diff --git a/exc-binance/src/websocket/protocol/frame/depth.rs b/exc-binance/src/websocket/protocol/frame/depth.rs new file mode 100644 index 0000000..16a60b3 --- /dev/null +++ b/exc-binance/src/websocket/protocol/frame/depth.rs @@ -0,0 +1,44 @@ +use rust_decimal::Decimal; +use serde::Deserialize; + +use crate::websocket::error::WsError; + +use super::{StreamFrame, StreamFrameKind}; + +/// Depth. +#[derive(Debug, Clone, Deserialize)] +pub struct Depth { + /// Event type. + #[serde(rename = "e")] + pub event: String, + /// Event time. + #[serde(rename = "E")] + pub event_timestamp: i64, + /// Symbol. + #[serde(rename = "s")] + pub symbol: String, + /// Trade time. + #[serde(rename = "T")] + pub trade_timestamp: i64, + /// Update ID. + #[serde(rename = "u")] + pub id: usize, + /// Bids. + #[serde(rename = "b")] + pub bids: Vec<(Decimal, Decimal)>, + /// Asks. + #[serde(rename = "a")] + pub asks: Vec<(Decimal, Decimal)>, +} + +impl TryFrom for Depth { + type Error = WsError; + + fn try_from(frame: StreamFrame) -> Result { + if let StreamFrameKind::Depth(t) = frame.data { + Ok(t) + } else { + Err(WsError::UnexpectedFrame(anyhow::anyhow!("{frame:?}"))) + } + } +} diff --git a/exc-binance/src/websocket/protocol/frame/mod.rs b/exc-binance/src/websocket/protocol/frame/mod.rs index 63823d0..9c47c27 100644 --- a/exc-binance/src/websocket/protocol/frame/mod.rs +++ b/exc-binance/src/websocket/protocol/frame/mod.rs @@ -2,6 +2,7 @@ use std::fmt; +use exc_core::ExchangeError; use futures::{future, stream, Sink, SinkExt, Stream, TryStreamExt}; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; @@ -13,9 +14,15 @@ use self::{account::AccountEvent, agg_trade::AggTrade, book_ticker::BookTicker}; /// Aggregate trade. pub mod agg_trade; +/// Trade. +pub mod trade; + /// Book ticker. pub mod book_ticker; +/// Depth. +pub mod depth; + /// Account. pub mod account; @@ -37,14 +44,16 @@ pub struct Name { } impl Name { - pub(crate) fn new(channel: &str) -> Self { + /// Create a new stream name. + pub fn new(channel: &str) -> Self { Self { inst: None, channel: channel.to_string(), } } - pub(crate) fn inst(mut self, inst: &str) -> Self { + /// Set instrument. + pub fn with_inst(mut self, inst: &str) -> Self { self.inst = Some(inst.to_string()); self } @@ -57,6 +66,14 @@ impl Name { } } + /// Trade + pub fn trade(inst: &str) -> Self { + Self { + inst: Some(inst.to_string()), + channel: "trade".to_string(), + } + } + /// Book ticker pub fn book_ticker(inst: &str) -> Self { Self { @@ -65,6 +82,14 @@ impl Name { } } + /// Depth + pub fn depth(inst: &str, levels: &str, rate: &str) -> Self { + Self { + inst: Some(inst.to_string()), + channel: format!("depth{levels}@{rate}"), + } + } + /// Listen key expired. pub fn listen_key_expired() -> Self { Self::new("listenKeyExpired") @@ -72,7 +97,7 @@ impl Name { /// Order trade update. pub fn order_trade_update(inst: &str) -> Self { - Self::new("orderTradeUpdate").inst(inst) + Self::new("orderTradeUpdate").with_inst(inst) } } @@ -159,6 +184,35 @@ impl ServerFrame { _ => Ok(self), } } + + fn break_down(self) -> Vec { + match &self { + Self::Empty | Self::Response(_) => vec![self], + Self::Stream(f) => match &f.data { + StreamFrameKind::OptionsOrderUpdate(_) => { + let Self::Stream(f) = self else { + unreachable!() + }; + let StreamFrameKind::OptionsOrderUpdate(update) = f.data else { + unreachable!() + }; + let stream = f.stream; + update + .order + .into_iter() + .map(|o| { + let frame = StreamFrame { + stream: stream.clone(), + data: StreamFrameKind::OptionsOrder(o), + }; + Self::Stream(frame) + }) + .collect() + } + _ => vec![self], + }, + } + } } /// Payload that with stream name. @@ -170,13 +224,22 @@ pub trait Nameable { /// Stream frame kind. #[derive(Debug, Clone, Deserialize)] #[serde(untagged)] +#[non_exhaustive] pub enum StreamFrameKind { /// Aggregate trade. AggTrade(AggTrade), + /// Trade. + Trade(trade::Trade), /// Book ticker. BookTicker(BookTicker), + /// Depth. + Depth(depth::Depth), /// Account event. AccountEvent(AccountEvent), + /// Options Order Update. + OptionsOrder(account::OptionsOrder), + /// Options Order Trade Update. + OptionsOrderUpdate(account::OptionsOrderUpdate), /// Unknwon. Unknwon(serde_json::Value), } @@ -195,9 +258,124 @@ impl StreamFrame { pub fn to_name(&self) -> Option { match &self.data { StreamFrameKind::AggTrade(f) => Some(f.to_name()), + StreamFrameKind::Trade(f) => Some(f.to_name()), StreamFrameKind::BookTicker(f) => Some(f.to_name()), + StreamFrameKind::Depth(_) => { + let (inst, channel) = self.stream.split_once('@')?; + Some(Name { + inst: Some(inst.to_string()), + channel: channel.to_string(), + }) + } StreamFrameKind::AccountEvent(e) => Some(e.to_name()), - StreamFrameKind::Unknwon(_) => None, + StreamFrameKind::OptionsOrder(e) => Some(e.to_name()), + StreamFrameKind::OptionsOrderUpdate(_) => None, + StreamFrameKind::Unknwon(_) => { + let (inst, channel) = self.stream.split_once('@')?; + Some(Name { + inst: Some(inst.to_string()), + channel: channel.to_string(), + }) + } + } + } +} + +impl TryFrom for serde_json::Value { + type Error = WsError; + + fn try_from(frame: StreamFrame) -> Result { + match frame.data { + StreamFrameKind::Unknwon(v) => Ok(v), + _ => Err(WsError::UnexpectedFrame(anyhow::anyhow!("{frame:?}"))), + } + } +} + +/// Trade frame. +#[derive(Debug, Clone, Deserialize)] +#[non_exhaustive] +pub enum TradeFrame { + /// Aggregate trade. + AggTrade(AggTrade), + /// Trade. + Trade(trade::Trade), +} + +impl TryFrom for TradeFrame { + type Error = WsError; + + fn try_from(frame: StreamFrame) -> Result { + match frame.data { + StreamFrameKind::AggTrade(trade) => Ok(Self::AggTrade(trade)), + StreamFrameKind::Trade(trade) => Ok(Self::Trade(trade)), + _ => Err(WsError::UnexpectedFrame(anyhow::anyhow!("{frame:?}"))), + } + } +} + +impl TryFrom for exc_core::types::Trade { + type Error = ExchangeError; + + fn try_from(value: TradeFrame) -> Result { + match value { + TradeFrame::AggTrade(trade) => Ok(exc_core::types::Trade { + ts: crate::types::adaptations::from_timestamp(trade.trade_timestamp)?, + price: trade.price.normalize(), + size: trade.size.normalize(), + buy: !trade.buy_maker, + }), + TradeFrame::Trade(trade) => Ok(exc_core::types::Trade { + ts: crate::types::adaptations::from_timestamp(trade.trade_timestamp)?, + price: trade.price.normalize(), + size: trade.size.normalize(), + buy: trade.is_taker_buy(), + }), + } + } +} + +/// Depth frame. +#[derive(Debug, Clone, Deserialize)] +#[non_exhaustive] +pub enum DepthFrame { + /// Book ticker. + BookTicker(BookTicker), + /// Depth. + Depth(depth::Depth), +} + +impl TryFrom for DepthFrame { + type Error = WsError; + + fn try_from(frame: StreamFrame) -> Result { + match frame.data { + StreamFrameKind::BookTicker(t) => Ok(Self::BookTicker(t)), + StreamFrameKind::Depth(t) => Ok(Self::Depth(t)), + _ => Err(WsError::UnexpectedFrame(anyhow::anyhow!("{frame:?}"))), + } + } +} + +impl TryFrom for exc_core::types::BidAsk { + type Error = ExchangeError; + + fn try_from(value: DepthFrame) -> Result { + match value { + DepthFrame::BookTicker(t) => Ok(exc_core::types::BidAsk { + ts: t + .trade_timestamp + .map(crate::types::adaptations::from_timestamp) + .transpose()? + .unwrap_or_else(time::OffsetDateTime::now_utc), + bid: Some((t.bid.normalize(), t.bid_size.normalize())), + ask: Some((t.ask.normalize(), t.ask_size.normalize())), + }), + DepthFrame::Depth(t) => Ok(exc_core::types::BidAsk { + ts: crate::types::adaptations::from_timestamp(t.trade_timestamp)?, + bid: t.bids.first().map(|b| (b.0.normalize(), b.1.normalize())), + ask: t.asks.first().map(|a| (a.0.normalize(), a.1.normalize())), + }), } } } @@ -218,9 +396,11 @@ where .and_then(|msg| { let f = serde_json::from_str::(&msg) .map_err(WsError::from) - .and_then(ServerFrame::health); + .and_then(ServerFrame::health) + .map(|f| stream::iter(f.break_down().into_iter().map(Ok))); future::ready(f) }) + .try_flatten() } #[cfg(test)] diff --git a/exc-binance/src/websocket/protocol/frame/trade.rs b/exc-binance/src/websocket/protocol/frame/trade.rs new file mode 100644 index 0000000..629a6ff --- /dev/null +++ b/exc-binance/src/websocket/protocol/frame/trade.rs @@ -0,0 +1,90 @@ +use rust_decimal::Decimal; +use serde::Deserialize; + +use crate::websocket::error::WsError; + +use super::{Name, Nameable, StreamFrame, StreamFrameKind}; + +/// # Example +/// A [`Trade`] in JSON format: +/// ```json +/// { +/// "e": "trade", // Event type +/// "E": 123456789, // Event time +/// "s": "BTCUSDT", // Symbol +/// "t": 12345, // Trade ID +/// "p": "0.001", // Price +/// "q": "100", // Quantity +/// "b": 88, // Buyer order ID +/// "a": 50, // Seller order ID +/// "T": 1591677567872, // Trade time (ms) +/// "S": "-1", // "-1": sell; "1": buy +/// } +/// ``` +#[derive(Debug, Clone, Deserialize)] +pub struct Trade { + /// Event type. + #[serde(rename = "e")] + pub event: String, + /// Event time. + #[serde(rename = "E")] + pub event_timestamp: i64, + /// Symbol. + #[serde(rename = "s")] + pub symbol: String, + /// Trade ID. + #[serde(rename = "t")] + pub trade_id: String, + /// Price. + #[serde(rename = "p")] + pub price: Decimal, + /// Quantity. + #[serde(rename = "q")] + pub size: Decimal, + /// Buyer order ID. + #[serde(rename = "b")] + pub buyer_order_id: Decimal, + /// Seller order ID. + #[serde(rename = "a")] + pub seller_order_id: Decimal, + /// Trade time (ms). + #[serde(rename = "T")] + pub trade_timestamp: i64, + /// "-1": sell; "1": buy + #[serde(rename = "S")] + pub side: String, +} + +impl Trade { + /// Is buyer the market maker. + pub fn is_taker_buy(&self) -> bool { + self.side == "1" + } + + /// Is taker sell. + pub fn is_taker_sell(&self) -> bool { + self.side == "-1" + } +} + +impl Nameable for Trade { + fn to_name(&self) -> Name { + Name { + // FIXME: better way to determine the case (lower or upper). + inst: Some(self.symbol.clone()), + channel: self.event.clone(), + } + } +} + +impl TryFrom for Trade { + type Error = WsError; + + fn try_from(frame: StreamFrame) -> Result { + if let StreamFrameKind::Trade(trade) = frame.data { + Ok(trade) + } else { + Err(WsError::UnexpectedFrame(anyhow::anyhow!("{frame:?}"))) + } + } +} diff --git a/exc-binance/src/websocket/protocol/mod.rs b/exc-binance/src/websocket/protocol/mod.rs index 63ed311..7dbda2a 100644 --- a/exc-binance/src/websocket/protocol/mod.rs +++ b/exc-binance/src/websocket/protocol/mod.rs @@ -11,8 +11,8 @@ use self::{ stream::{MultiplexRequest, MultiplexResponse}, }; -use super::request::WsRequest; use super::response::WsResponse; +use super::{connect::BinanceWsHost, request::WsRequest}; use super::{error::WsError, request::RequestKind}; use exc_core::transport::websocket::WsStream; use futures::{future::BoxFuture, FutureExt, Sink, SinkExt, Stream, TryFutureExt, TryStreamExt}; @@ -140,6 +140,8 @@ impl From> for WsError { /// Binance websocket service. pub struct WsClient { + main_stream: HashSet, + endpoint: BinanceWsHost, state: Arc, svc: Multiplex, reconnect: bool, @@ -148,6 +150,7 @@ pub struct WsClient { impl WsClient { /// Create a [`WsClient`] using the given websocket stream. pub fn with_websocket( + endpoint: BinanceWsHost, websocket: WsStream, main_stream: HashSet, keep_alive_timeout: Duration, @@ -156,7 +159,7 @@ impl WsClient { ) -> Result { let (protocol, state) = Protocol::new( websocket, - main_stream, + main_stream.clone(), keep_alive_timeout, default_stream_timeout, refresh, @@ -167,11 +170,45 @@ impl WsClient { tracing::error!("protocol error: {err}"); }); Ok(Self { + endpoint, + main_stream, svc, state, reconnect: false, }) } + + fn dispatch(&self, req: WsRequest) -> WsRequest { + tracing::trace!( + "ws client; dispatching request with endpoint: {:?}", + self.endpoint, + ); + match &req.inner { + RequestKind::DispatchTrades(trades) => match self.endpoint { + BinanceWsHost::EuropeanOptions => { + WsRequest::sub_stream(Name::trade(&trades.instrument)) + } + _ => WsRequest::sub_stream(Name::agg_trade(&trades.instrument)), + }, + RequestKind::DispatchBidAsk(bid_ask) => match self.endpoint { + BinanceWsHost::EuropeanOptions => { + WsRequest::sub_stream(Name::depth(&bid_ask.instrument, "10", "100ms")) + } + _ => WsRequest::sub_stream(Name::book_ticker(&bid_ask.instrument)), + }, + RequestKind::DispatchSubscribe(name) => { + if self.main_stream.contains(name) { + WsRequest::main_stream(name.clone()) + } else { + WsRequest::sub_stream(name.clone()) + } + } + _ => { + tracing::error!("ws client; not a dispatch request"); + req + } + } + } } impl Service for WsClient { @@ -188,25 +225,39 @@ impl Service for WsClient { } } - fn call(&mut self, req: WsRequest) -> Self::Future { + fn call(&mut self, mut req: WsRequest) -> Self::Future { let is_stream = req.stream; - match req.inner { - RequestKind::Multiplex(req) => self - .svc - .call(req) - .and_then(move |resp| { - let resp: WsResponse = resp.into(); - if is_stream { - resp.stream().left_future() - } else { - futures::future::ready(Ok(resp)).right_future() + let mut dispatched = false; + loop { + match req.inner { + RequestKind::Multiplex(req) => { + return self + .svc + .call(req) + .and_then(move |resp| { + let resp: WsResponse = resp.into(); + if is_stream { + resp.stream().left_future() + } else { + futures::future::ready(Ok(resp)).right_future() + } + }) + .boxed() + } + RequestKind::Reconnect => { + self.reconnect = true; + return futures::future::ready(Ok(WsResponse::Reconnected)).boxed(); + } + _ => { + if dispatched { + break; } - }) - .boxed(), - RequestKind::Reconnect => { - self.reconnect = true; - futures::future::ready(Ok(WsResponse::Reconnected)).boxed() + req = self.dispatch(req); + dispatched = true; + } } } + tracing::error!("ws client; failed to dispatch request"); + futures::future::ready(Err(WsError::TransportIsBoken)).boxed() } } diff --git a/exc-binance/src/websocket/protocol/stream/mod.rs b/exc-binance/src/websocket/protocol/stream/mod.rs index 96aee82..49c0482 100644 --- a/exc-binance/src/websocket/protocol/stream/mod.rs +++ b/exc-binance/src/websocket/protocol/stream/mod.rs @@ -119,6 +119,7 @@ impl StreamState { topics: &mut HashMap, ) -> Result { tracing::trace!( + ?frame, "stream {}: handling client frame, state={:?}", self.id, self.state @@ -345,6 +346,7 @@ impl ContextShared { } async fn handle_server_frame(self: &Arc, frame: ServerFrame) -> bool { + tracing::trace!("streaming; received a server frame: frame={frame:?}"); let ctx = self.clone(); let main = &ctx.main; let (streams, topics) = &mut (*ctx.streams.lock().unwrap()); @@ -372,6 +374,10 @@ impl ContextShared { let id = stream.id; (id, good) }) + .or_else(|| { + tracing::trace!("streaming; no stream found for {name:?}"); + None + }) }), }; if let Some((id, res)) = res { diff --git a/exc-binance/src/websocket/request/mod.rs b/exc-binance/src/websocket/request/mod.rs index da03631..15ced66 100644 --- a/exc-binance/src/websocket/request/mod.rs +++ b/exc-binance/src/websocket/request/mod.rs @@ -7,6 +7,9 @@ use super::protocol::{ use async_stream::stream; pub(crate) enum RequestKind { + DispatchSubscribe(Name), + DispatchTrades(exc_core::types::SubscribeTrades), + DispatchBidAsk(exc_core::types::SubscribeBidAsk), Multiplex(MultiplexRequest), Reconnect, } @@ -16,6 +19,9 @@ impl RequestKind { match self { Self::Multiplex(req) => Self::Multiplex(req.timeout(duration)), Self::Reconnect => Self::Reconnect, + Self::DispatchTrades(req) => Self::DispatchTrades(req), + Self::DispatchBidAsk(req) => Self::DispatchBidAsk(req), + Self::DispatchSubscribe(req) => Self::DispatchSubscribe(req), } } } @@ -27,8 +33,22 @@ pub struct WsRequest { } impl WsRequest { - /// Subscribe to a stream. + /// Subscribe to a stream. No matter whether the stream is main or sub. + pub fn subscribe_stream(name: Name) -> Self { + Self { + stream: true, + inner: RequestKind::DispatchSubscribe(name), + } + } + + /// Subscribe to a sub stream. + #[deprecated(note = "Use `subscribe_stream` instead")] pub fn subscribe(stream: Name) -> Self { + Self::sub_stream(stream) + } + + /// Subscribe to a sub stream. + pub fn sub_stream(stream: Name) -> Self { Self { stream: true, inner: RequestKind::Multiplex(MultiplexRequest::new(|token| { @@ -62,6 +82,22 @@ impl WsRequest { inner: RequestKind::Reconnect, } } + + /// Dispatch trades. + pub fn dispatch_trades(trades: exc_core::types::SubscribeTrades) -> Self { + Self { + stream: true, + inner: RequestKind::DispatchTrades(trades), + } + } + + /// Dispatch bid ask. + pub fn dispatch_bid_ask(bid_ask: exc_core::types::SubscribeBidAsk) -> Self { + Self { + stream: true, + inner: RequestKind::DispatchBidAsk(bid_ask), + } + } } // impl From for MultiplexRequest { diff --git a/exc-core/src/util/trade_bid_ask.rs b/exc-core/src/util/trade_bid_ask.rs index 6cb05af..5dd8ae8 100644 --- a/exc-core/src/util/trade_bid_ask.rs +++ b/exc-core/src/util/trade_bid_ask.rs @@ -113,6 +113,7 @@ where let event = event?; match event { Either::Left(trade) => { + tracing::trace!("trade: {trade}"); ticker.ts = trade.ts; ticker.last = trade.price; ticker.size = trade.size; @@ -120,6 +121,7 @@ where trade_init = true; }, Either::Right(bid_ask) => { + tracing::trace!("bid_ask: {bid_ask}"); if !ignore_bid_ask_ts { ticker.ts = bid_ask.ts; }