Skip to content

Commit

Permalink
Refactor balance_history for trade (#334)
Browse files Browse the repository at this point in the history
* Refactors `balance_history` for trade.

* Fixes lint.

* Update `balance_history` index key order.

* Fixes the failed unit tests.

Co-authored-by: HAOYUatHZ <37070449+HAOYUatHZ@users.noreply.github.com>
Co-authored-by: Zhang Zhuo <mycinbrin@gmail.com>
  • Loading branch information
3 people authored Sep 19, 2021
1 parent 3dd4fb9 commit 6598fab
Show file tree
Hide file tree
Showing 8 changed files with 195 additions and 85 deletions.
5 changes: 4 additions & 1 deletion migrations/20200123090258_trade_history.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ CREATE TABLE balance_history (
id SERIAL PRIMARY KEY,
time TIMESTAMP(0) NOT NULL,
user_id INT CHECK (user_id >= 0) NOT NULL,
business_id BIGINT CHECK (business_id >= 0) NOT NULL,
asset VARCHAR(30) NOT NULL,
business VARCHAR(30) NOT NULL,
change DECIMAL(30, 8) NOT NULL,
Expand All @@ -14,7 +15,9 @@ CREATE TABLE balance_history (

CREATE INDEX balance_history_idx_user_asset ON balance_history (user_id, asset);

CREATE INDEX balance_history_idx_user_asset_business ON balance_history (user_id, asset, business);
CREATE INDEX balance_history_idx_user_business ON balance_history (business_id, business);

CREATE INDEX balance_history_idx_user_asset_business ON balance_history (user_id, asset, business, business_id);

CREATE TYPE order_status AS ENUM('active','filled','cancelled', 'expired');

Expand Down
47 changes: 28 additions & 19 deletions src/matchengine/asset/update_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,32 @@ use ttl_cache::TtlCache;
use std::time::Duration;

const BALANCE_MAP_INIT_SIZE_ASSET: usize = 64;
const PERSIST_ZERO_BALANCE_UPDATE: bool = false;

pub struct BalanceUpdateParams {
pub typ: BalanceUpdateType,
pub balance_type: BalanceType,
pub business_type: BusinessType,
pub user_id: u32,
pub business_id: u64,
pub asset: String,
pub business: String,
pub business_id: u64,
pub change: Decimal,
pub detail: serde_json::Value,
pub signature: Vec<u8>,
}

pub enum BalanceUpdateType {
#[derive(Clone, Copy, Eq, Hash, PartialEq)]
pub enum BusinessType {
Deposit,
Withdraw,
Trade,
Transfer,
Withdraw,
}

#[derive(PartialEq, Eq, Hash)]
struct BalanceUpdateKey {
pub balance_type: BalanceType,
pub business_type: BusinessType,
pub user_id: u32,
pub asset: String,
pub business: String,
Expand Down Expand Up @@ -68,14 +74,18 @@ impl BalanceUpdateController {
pub fn update_user_balance(
&mut self,
balance_manager: &mut BalanceManager,
mut persistor: impl PersistExector,
persistor: &mut impl PersistExector,
mut params: BalanceUpdateParams,
) -> Result<()> {
let asset = params.asset;
let balance_type = params.balance_type;
let business = params.business;
let business_type = params.business_type;
let business_id = params.business_id;
let user_id = params.user_id;
let cache_key = BalanceUpdateKey {
balance_type,
business_type,
user_id,
asset: asset.clone(),
business: business.clone(),
Expand All @@ -84,41 +94,40 @@ impl BalanceUpdateController {
if self.cache.contains_key(&cache_key) {
bail!("duplicate request");
}
let old_balance = balance_manager.get(user_id, BalanceType::AVAILABLE, &asset);
let old_balance = balance_manager.get(user_id, balance_type, &asset);
let change = params.change;
let abs_change = change.abs();
let new_balance = if change.is_sign_positive() {
balance_manager.add(user_id, BalanceType::AVAILABLE, &asset, &abs_change)
if change.is_sign_positive() {
balance_manager.add(user_id, balance_type, &asset, &abs_change);
} else if change.is_sign_negative() {
if old_balance < abs_change {
bail!("balance not enough");
}
balance_manager.sub(user_id, BalanceType::AVAILABLE, &asset, &abs_change)
} else {
old_balance
};
balance_manager.sub(user_id, balance_type, &asset, &abs_change);
}
log::debug!("change user balance: {} {} {}", user_id, asset, change);
self.cache.insert(cache_key, true, Duration::from_secs(3600));

if persistor.real_persist() {
if persistor.real_persist() && (PERSIST_ZERO_BALANCE_UPDATE || !change.is_zero()) {
params.detail["id"] = serde_json::Value::from(business_id);
let balance_available = balance_manager.get(user_id, BalanceType::AVAILABLE, &asset);
let balance_frozen = balance_manager.get(user_id, BalanceType::FREEZE, &asset);
let balance_history = BalanceHistory {
time: FTimestamp(utils::current_timestamp()).into(),
user_id: user_id as i32,
business_id: business_id as i64,
asset,
business,
change,
balance: new_balance + balance_frozen,
balance_available: new_balance,
balance: balance_available + balance_frozen,
balance_available,
balance_frozen,
detail: params.detail.to_string(),
signature: params.signature,
};
persistor.put_balance(&balance_history);
match params.typ {
BalanceUpdateType::Deposit => persistor.put_deposit(&balance_history),
BalanceUpdateType::Withdraw => persistor.put_withdraw(&balance_history),
match params.business_type {
BusinessType::Deposit => persistor.put_deposit(&balance_history),
BusinessType::Withdraw => persistor.put_withdraw(&balance_history),
_ => {}
}
}
Expand Down
26 changes: 18 additions & 8 deletions src/matchengine/controller.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::asset::update_controller::{BalanceUpdateParams, BalanceUpdateType};
use crate::asset::update_controller::{BalanceUpdateParams, BusinessType};
use crate::asset::{BalanceManager, BalanceType, BalanceUpdateController};
use crate::config::{self};
use crate::database::{DatabaseWriterConfig, OperationLogSender};
Expand Down Expand Up @@ -425,17 +425,18 @@ impl Controller {
};
//let persistor = self.get_persistor(real);
let persistor = if real { &mut self.persistor } else { &mut self.dummy_persistor };
let update_type = if change.is_sign_positive() {
BalanceUpdateType::Deposit
let business_type = if change.is_sign_positive() {
BusinessType::Deposit
} else {
BalanceUpdateType::Withdraw
BusinessType::Withdraw
};
self.update_controller
.update_user_balance(
&mut self.balance_manager,
persistor,
BalanceUpdateParams {
typ: update_type,
balance_type: BalanceType::AVAILABLE,
business_type,
user_id: req.user_id,
asset: req.asset.to_string(),
business: req.business.clone(),
Expand Down Expand Up @@ -665,7 +666,8 @@ impl Controller {
&mut self.balance_manager,
persistor,
BalanceUpdateParams {
typ: BalanceUpdateType::Transfer,
balance_type: BalanceType::AVAILABLE,
business_type: BusinessType::Transfer,
user_id: from_user_id,
asset: asset_id.to_owned(),
business: business.to_owned(),
Expand All @@ -683,7 +685,8 @@ impl Controller {
&mut self.balance_manager,
persistor,
BalanceUpdateParams {
typ: BalanceUpdateType::Transfer,
balance_type: BalanceType::AVAILABLE,
business_type: BusinessType::Transfer,
user_id: to_user_id,
asset: asset_id.to_owned(),
business: business.to_owned(),
Expand Down Expand Up @@ -837,10 +840,17 @@ impl Controller {
}
let market = self.markets.get_mut(&req.market).unwrap();
let balance_manager = &mut self.balance_manager;
let update_controller = &mut self.update_controller;
let persistor = if real { &mut self.persistor } else { &mut self.dummy_persistor };
let order_input = OrderInput::try_from(req.clone()).map_err(|e| Status::invalid_argument(format!("invalid decimal {}", e)))?;
market
.put_order(&mut self.sequencer, balance_manager.into(), persistor, order_input)
.put_order(
&mut self.sequencer,
balance_manager.into(),
update_controller,
persistor,
order_input,
)
.map_err(|e| Status::unknown(format!("{}", e)))
}
fn append_operation_log<Operation>(&mut self, method: &str, req: &Operation)
Expand Down
Loading

0 comments on commit 6598fab

Please sign in to comment.