Skip to content

Commit 7452dba

Browse files
Upgrade to tokio 1.0
1 parent 85094e6 commit 7452dba

File tree

8 files changed

+266
-207
lines changed

8 files changed

+266
-207
lines changed

Cargo.lock

Lines changed: 242 additions & 121 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bobtimus/Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,17 @@ hmac = "0.10"
1616
http-api-problem = { version = "0.15", features = [ "with_warp" ] }
1717
log = "0.4"
1818
mime_guess = "2.0.3"
19-
reqwest = "0.10"
19+
reqwest = "0.11"
2020
rust-embed = "5.7.0"
2121
rust_decimal = "1.8"
2222
serde = { version = "1", features = [ "derive" ] }
2323
serde_json = "1"
2424
sha2 = "0.9"
2525
structopt = "0.3"
2626
swap = { path = "../swap" }
27-
tokio = { version = "0.2", features = [ "macros" ] }
28-
tokio-tungstenite = { version = "0.11", features = [ "tls" ] }
29-
warp = "0.2.5"
27+
tokio = { version = "1", features = [ "macros", "rt-multi-thread" ] }
28+
tokio-tungstenite = { version = "0.13", features = [ "tls" ] }
29+
warp = { version = "0.2.5", default-features = false }
3030

3131
[dev-dependencies]
3232
testcontainers = "0.11"

bobtimus/src/fixed_rate.rs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
11
use crate::{LatestRate, LiquidUsdt, Rate};
2-
use anyhow::Result;
3-
use async_trait::async_trait;
4-
use futures::Stream;
52
use std::{convert::TryFrom, time::Duration};
63
use tokio::{
74
sync::watch::{self, Receiver},
8-
time::delay_for,
5+
time::sleep,
96
};
107

118
#[derive(Clone)]
@@ -18,16 +15,16 @@ impl Service {
1815

1916
tokio::spawn(async move {
2017
loop {
21-
let _ = tx.broadcast(data);
18+
let _ = tx.send(data);
2219

23-
delay_for(Duration::from_secs(5)).await;
20+
sleep(Duration::from_secs(5)).await;
2421
}
2522
});
2623

2724
Self(rx)
2825
}
2926

30-
pub fn subscribe(&self) -> impl Stream<Item = Rate> + Clone {
27+
pub fn subscribe(&self) -> Receiver<Rate> {
3128
self.0.clone()
3229
}
3330
}
@@ -38,10 +35,9 @@ impl Default for Service {
3835
}
3936
}
4037

41-
#[async_trait]
4238
impl LatestRate for Service {
43-
async fn latest_rate(&mut self) -> Result<Rate> {
44-
Ok(fixed_rate())
39+
fn latest_rate(&mut self) -> Rate {
40+
fixed_rate()
4541
}
4642
}
4743

bobtimus/src/kraken.rs

Lines changed: 7 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use crate::{LatestRate, LiquidUsdt, Rate};
22
use anyhow::{anyhow, bail, Result};
3-
use async_trait::async_trait;
4-
use futures::{Future, SinkExt, Stream, StreamExt};
3+
use futures::{SinkExt, StreamExt};
54
use reqwest::Url;
65
use serde::{Deserialize, Serialize};
76
use serde_json::Value;
@@ -22,38 +21,17 @@ const SUBSCRIBE_XBT_USD_TICKER_PAYLOAD: &str = r#"
2221
#[derive(Clone)]
2322
pub struct RateService {
2423
receiver: Receiver<Rate>,
25-
latest_rate: Rate,
2624
}
2725

28-
impl Future for RateService {
29-
type Output = Rate;
30-
31-
fn poll(
32-
mut self: std::pin::Pin<&mut Self>,
33-
cx: &mut std::task::Context<'_>,
34-
) -> std::task::Poll<Self::Output> {
35-
match self.receiver.poll_next_unpin(cx) {
36-
std::task::Poll::Ready(Some(rate)) => {
37-
self.latest_rate = rate;
38-
self.poll(cx)
39-
}
40-
std::task::Poll::Ready(None) | std::task::Poll::Pending => {
41-
std::task::Poll::from(self.latest_rate)
42-
}
43-
}
44-
}
45-
}
46-
47-
#[async_trait]
4826
impl LatestRate for RateService {
49-
async fn latest_rate(&mut self) -> anyhow::Result<Rate> {
50-
Ok(self.await)
27+
fn latest_rate(&mut self) -> Rate {
28+
*self.receiver.borrow()
5129
}
5230
}
5331

5432
impl RateService {
5533
pub async fn new() -> Result<Self> {
56-
let (tx, mut rx) = watch::channel(Rate::ZERO);
34+
let (tx, rx) = watch::channel(Rate::ZERO);
5735

5836
let (ws, _response) =
5937
tokio_tungstenite::connect_async(Url::parse(KRAKEN_WS_URL).expect("valid url")).await?;
@@ -84,24 +62,16 @@ impl RateService {
8462
}
8563
};
8664

87-
let _ = tx.broadcast(rate);
65+
let _ = tx.send(rate);
8866
}
8967
});
9068

9169
write.send(SUBSCRIBE_XBT_USD_TICKER_PAYLOAD.into()).await?;
9270

93-
let latest_rate = rx
94-
.next()
95-
.await
96-
.ok_or_else(|| anyhow!("latest rate stream has ended"))?;
97-
98-
Ok(Self {
99-
receiver: rx,
100-
latest_rate,
101-
})
71+
Ok(Self { receiver: rx })
10272
}
10373

104-
pub fn subscribe(&self) -> impl Stream<Item = Rate> + Clone {
74+
pub fn subscribe(&self) -> Receiver<Rate> {
10575
self.receiver.clone()
10676
}
10777
}
@@ -172,26 +142,4 @@ mod tests {
172142

173143
let _ = serde_json::from_str::<TickerUpdate>(sample_response).unwrap();
174144
}
175-
176-
#[tokio::test]
177-
async fn latest_rate_does_not_wait_for_next_value() {
178-
let (write, read) = watch::channel(Rate::ZERO);
179-
180-
let latest_rate = Rate {
181-
ask: LiquidUsdt::from_str_in_dollar("20000.0").unwrap(),
182-
bid: LiquidUsdt::from_str_in_dollar("19000.0").unwrap(),
183-
};
184-
let _ = write.broadcast(latest_rate).unwrap();
185-
186-
let mut service = RateService {
187-
receiver: read,
188-
latest_rate: Rate::ZERO,
189-
};
190-
191-
let rate = service.latest_rate().await.unwrap();
192-
assert_eq!(rate, latest_rate);
193-
194-
let rate = service.latest_rate().await.unwrap();
195-
assert_eq!(rate, latest_rate);
196-
}
197145
}

bobtimus/src/lib.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use anyhow::{Context, Result};
2-
use async_trait::async_trait;
32
use elements_fun::{
43
bitcoin::{
54
secp256k1::{All, Secp256k1},
@@ -55,11 +54,7 @@ impl<R, RS> Bobtimus<R, RS> {
5554
R: RngCore + CryptoRng,
5655
RS: LatestRate,
5756
{
58-
let latest_rate = self
59-
.rate_service
60-
.latest_rate()
61-
.await
62-
.context("failed to get latest rate")?;
57+
let latest_rate = self.rate_service.latest_rate();
6358
let usdt_amount = latest_rate.buy_quote(payload.btc_amount)?;
6459

6560
let bob_inputs = self
@@ -197,9 +192,8 @@ impl<R, RS> Bobtimus<R, RS> {
197192
}
198193
}
199194

200-
#[async_trait]
201195
pub trait LatestRate {
202-
async fn latest_rate(&mut self) -> Result<Rate>;
196+
fn latest_rate(&mut self) -> Rate;
203197
}
204198

205199
#[cfg(test)]

elements-harness/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,16 @@ elements-fun = { path = "../elements-fun", features = [ "serde" ] }
1111
futures = "0.3.5"
1212
hex = "0.4.2"
1313
hmac = "0.10"
14-
jsonrpc_client = { git = "https://github.com/thomaseizinger/rust-jsonrpc-client", branch = "master", features = [ "reqwest" ] }
14+
jsonrpc_client = { version = "0.5", features = [ "reqwest" ] }
1515
log = "0.4"
1616
rand = "0.7"
17-
reqwest = { version = "0.10" }
17+
reqwest = "0.11"
1818
serde = "1.0"
1919
serde_json = "1.0"
2020
sha2 = "0.9"
2121
testcontainers = "0.11"
2222
thiserror = "1.0"
23-
tokio = { version = "0.2", default-features = false, features = [ "blocking", "macros", "rt-core", "time" ] }
23+
tokio = { version = "1", default-features = false, features = [ "macros", "rt", "time" ] }
2424
tracing = "0.1"
2525
url = "2"
2626

swap/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,4 @@ thiserror = "1.0.23"
1515
[dev-dependencies]
1616
elements-harness = { path = "../elements-harness" }
1717
testcontainers = "0.11"
18-
tokio = { version = "0.2", default-features = false, features = [ "blocking", "macros", "rt-core", "time" ] }
18+
tokio = { version = "1", default-features = false, features = [ "macros", "rt", "time" ] }

waves/wallet/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ js-sys = "0.3"
2626
log = "0.4"
2727
rand = { version = "0.6", features = [ "wasm-bindgen" ] }
2828
rand_core = { version = "0.5", features = [ "std" ] }
29-
reqwest = { version = "0.10", default-features = false, features = [ "rustls", "json" ] }
29+
reqwest = { version = "0.11", default-features = false, features = [ "rustls", "json" ] }
3030
rust_decimal = "1"
3131
scrypt = { version = "0.5" }
3232
serde = { version = "1", features = [ "derive" ] }

0 commit comments

Comments
 (0)