Skip to content

Commit

Permalink
refactoring!
Browse files Browse the repository at this point in the history
  • Loading branch information
Axel-Jacobsen committed Dec 31, 2023
1 parent 0ff30b2 commit 1412d44
Show file tree
Hide file tree
Showing 6 changed files with 252 additions and 238 deletions.
17 changes: 9 additions & 8 deletions src/bots/arb_bot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,23 @@ use tokio::sync::{broadcast, mpsc};

use crate::bots::Bot;
use crate::manifold_types;
use crate::market_handler;

use crate::coms::{InternalPacket,Method};

pub struct ArbitrageBot {
id: String,
market: manifold_types::FullMarket,
answers: HashMap<String, manifold_types::Answer>,
bot_to_mh_tx: mpsc::Sender<market_handler::InternalPacket>,
mh_to_bot_rx: broadcast::Receiver<market_handler::InternalPacket>,
bot_to_mh_tx: mpsc::Sender<InternalPacket>,
mh_to_bot_rx: broadcast::Receiver<InternalPacket>,
}

impl ArbitrageBot {
pub fn new(
id: String,
market: manifold_types::FullMarket,
bot_to_mh_tx: mpsc::Sender<market_handler::InternalPacket>,
mh_to_bot_rx: broadcast::Receiver<market_handler::InternalPacket>,
bot_to_mh_tx: mpsc::Sender<InternalPacket>,
mh_to_bot_rx: broadcast::Receiver<InternalPacket>,
) -> Self {
let mut id_to_answers = HashMap::new();

Expand Down Expand Up @@ -92,10 +93,10 @@ impl ArbitrageBot {
fn botbet_to_internal_coms_packet(
&self,
bet: manifold_types::BotBet,
) -> market_handler::InternalPacket {
market_handler::InternalPacket::new(
) -> InternalPacket {
InternalPacket::new(
self.get_id(),
market_handler::Method::Post,
Method::Post,
"bet".to_string(),
vec![],
Some(serde_json::json!({
Expand Down
21 changes: 11 additions & 10 deletions src/bots/ewma_bot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use tokio::sync::{broadcast, mpsc};

use crate::bots::Bot;
use crate::manifold_types;
use crate::market_handler;

use crate::coms::{Method,InternalPacket};

struct EWMA {
s0: f64,
Expand All @@ -30,8 +31,8 @@ pub struct EWMABot {
id: String,
market: manifold_types::FullMarket,

bot_to_mh_tx: mpsc::Sender<market_handler::InternalPacket>,
mh_to_bot_rx: broadcast::Receiver<market_handler::InternalPacket>,
bot_to_mh_tx: mpsc::Sender<InternalPacket>,
mh_to_bot_rx: broadcast::Receiver<InternalPacket>,

ewma_1: EWMA,
ewma_2: EWMA,
Expand All @@ -45,8 +46,8 @@ impl EWMABot {
pub fn new(
id: String,
market: manifold_types::FullMarket,
bot_to_mh_tx: mpsc::Sender<market_handler::InternalPacket>,
mh_to_bot_rx: broadcast::Receiver<market_handler::InternalPacket>,
bot_to_mh_tx: mpsc::Sender<InternalPacket>,
mh_to_bot_rx: broadcast::Receiver<InternalPacket>,
alpha_1: f64,
alpha_2: f64,
) -> Self {
Expand All @@ -65,7 +66,7 @@ impl EWMABot {
}
}

async fn make_trades(&mut self, trades: Vec<market_handler::InternalPacket>) {
async fn make_trades(&mut self, trades: Vec<InternalPacket>) {
for trade in trades {
self.bot_to_mh_tx.send(trade).await.unwrap();

Expand Down Expand Up @@ -124,9 +125,9 @@ impl Bot for EWMABot {

match self.update_prob(&bet) {
manifold_types::Side::Buy => {
let buy_bet = market_handler::InternalPacket::new(
let buy_bet = InternalPacket::new(
self.get_id(),
market_handler::Method::Post,
Method::Post,
"bet".to_string(),
vec![],
Some(serde_json::json!({
Expand All @@ -139,9 +140,9 @@ impl Bot for EWMABot {
self.make_trades(vec![buy_bet]).await;
}
manifold_types::Side::Sell => {
let sell_bet = market_handler::InternalPacket::new(
let sell_bet = InternalPacket::new(
self.get_id(),
market_handler::Method::Post,
Method::Post,
format!("market/{}/sell", bet.contract_id),
vec![],
Some(serde_json::json!({
Expand Down
202 changes: 202 additions & 0 deletions src/coms.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
use std::env;




use log::{debug, error};
use serde::{Deserialize, Serialize};
use serde_json::Value;

use tokio::time::{Duration};


use crate::rate_limiter;


use crate::errors;

const MANIFOLD_API_URL: &str = "https://api.manifold.markets/v0";

fn get_env_key(key: &str) -> Result<String, String> {
match env::var(key) {
Ok(key) => Ok(format!("Key {key}")),
Err(e) => Err(format!("couldn't find Manifold API key: {e}")),
}
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Method {
Get,
Post,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct InternalPacket {
pub bot_id: String,
method: Method,
endpoint: String,
query_params: Vec<(String, String)>,
data: Option<Value>,
response: Option<String>,
}

impl InternalPacket {
pub fn new(
bot_id: String,
method: Method,
endpoint: String,
query_params: Vec<(String, String)>,
data: Option<Value>,
) -> Self {
Self {
bot_id,
method,
endpoint,
query_params,
data,
response: None,
}
}

pub fn response_from_existing(packet: &InternalPacket, response: String) -> Self {
Self {
bot_id: packet.bot_id.clone(),
method: packet.method.clone(),
endpoint: packet.endpoint.clone(),
query_params: packet.query_params.clone(),
data: packet.data.clone(),
response: Some(response),
}
}
}

pub async fn get_endpoint(
endpoint: String,
query_params: &[(String, String)],
) -> Result<reqwest::Response, reqwest::Error> {
debug!(
"get endpoint; endpoint '{endpoint}'; query params '{:?}'",
query_params,
);

let client = reqwest::Client::new();

let req = client
.get(format!("{MANIFOLD_API_URL}/{endpoint}"))
.query(&query_params)
.header("Authorization", get_env_key("MANIFOLD_KEY").unwrap());

let resp = req.send().await?;

if resp.status().is_success() {
Ok(resp)
} else {
error!("api error (bad status code) {resp:?} {query_params:?}");
Err(resp.error_for_status().unwrap_err())
}
}

pub async fn post_endpoint(
endpoint: String,
query_params: &[(String, String)],
data: Option<Value>,
) -> Result<reqwest::Response, reqwest::Error> {
debug!(
"post endpoint; endpoint '{endpoint}'; query params '{:?}'; data '{:?}'",
query_params, data
);

let client = reqwest::Client::new();
let req = client
.post(format!("{MANIFOLD_API_URL}/{endpoint}"))
.query(&query_params)
.header("Authorization", get_env_key("MANIFOLD_KEY").unwrap());

let data_clone = data.clone();

let resp = if let Some(data) = data {
let reqq = req.json(&data);
reqq.send().await?
} else {
req.send().await?
};

if resp.status().is_success() {
Ok(resp)
} else {
error!("api error (bad status code) {resp:?} {query_params:?} {data_clone:?}");
Err(resp.error_for_status().unwrap_err())
}
}

pub async fn response_into<T: serde::de::DeserializeOwned>(
resp: reqwest::Response,
) -> Result<T, errors::ReqwestResponseParsing> {
let body = resp.text().await?;
let from_json = serde_json::from_str::<T>(&body);
match from_json {
Ok(t) => Ok(t),
Err(e) => {
error!("Couldn't parse response {body}");
Err(errors::ReqwestResponseParsing::SerdeError(e))
}
}
}


pub async fn rate_limited_post_endpoint(
mut write_rate_limiter: rate_limiter::RateLimiter,
endpoint: String,
query_params: &[(String, String)],
data: Option<Value>,
) -> Result<reqwest::Response, reqwest::Error> {
if write_rate_limiter.block_for_average_pace_then_commit(Duration::from_secs(60)) {
post_endpoint(endpoint, query_params, data).await
} else {
panic!(
"rate limiter timed out; this shouldn't be possible, \
most likely rate limit is set wrong"
);
}
}

pub async fn rate_limited_get_endpoint(
mut read_rate_limiter: rate_limiter::RateLimiter,
endpoint: String,
query_params: &[(String, String)],
) -> Result<reqwest::Response, reqwest::Error> {
if read_rate_limiter.block_for_average_pace_then_commit(Duration::from_secs(1)) {
get_endpoint(endpoint, query_params).await
} else {
panic!(
"rate limiter timed out; this shouldn't be possible, \
most likely rate limit is set wrong"
);
}
}

pub async fn send_internal_packet(
read_rate_limiter: &rate_limiter::RateLimiter,
write_rate_limiter: &rate_limiter::RateLimiter,
internal_coms_packet: &InternalPacket,
) -> Result<reqwest::Response, reqwest::Error> {
match internal_coms_packet.method {
Method::Get => {
rate_limited_get_endpoint(
read_rate_limiter.clone(),
internal_coms_packet.endpoint.clone(),
&internal_coms_packet.query_params,
)
.await
}
Method::Post => {
rate_limited_post_endpoint(
write_rate_limiter.clone(),
internal_coms_packet.endpoint.clone(),
&internal_coms_packet.query_params,
internal_coms_packet.data.clone(),
)
.await
}
}
}
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ mod errors;
mod manifold_types;
mod market_handler;
mod rate_limiter;
mod utils;
mod coms;

use crate::bots::arb_bot::ArbitrageBot;
use crate::bots::ewma_bot::EWMABot;
Expand Down
Loading

0 comments on commit 1412d44

Please sign in to comment.