Skip to content

Commit 77fc241

Browse files
Upgrade to tokio 1.0
1 parent 85094e6 commit 77fc241

File tree

9 files changed

+292
-367
lines changed

9 files changed

+292
-367
lines changed

Cargo.lock

Lines changed: 190 additions & 264 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bobtimus/Cargo.toml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,20 @@ env_logger = "0.8"
1313
futures = { version = "0.3", default-features = false }
1414
hex = "0.4"
1515
hmac = "0.10"
16-
http-api-problem = { version = "0.15", features = [ "with_warp" ] }
16+
http-api-problem = { version = "0.21", features = [ "warp" ] }
1717
log = "0.4"
1818
mime_guess = "2.0.3"
19-
reqwest = "0.10"
19+
reqwest = "0.11"
2020
rust-embed = "5.7.0"
2121
rust_decimal = "1.8"
2222
serde = { version = "1", features = [ "derive" ] }
2323
serde_json = "1"
2424
sha2 = "0.9"
2525
structopt = "0.3"
2626
swap = { path = "../swap" }
27-
tokio = { version = "0.2", features = [ "macros" ] }
28-
tokio-tungstenite = { version = "0.11", features = [ "tls" ] }
29-
warp = "0.2.5"
27+
tokio = { version = "1", features = [ "macros", "rt-multi-thread" ] }
28+
tokio-tungstenite = { version = "0.13", features = [ "tls" ] }
29+
warp = { version = "0.3", default-features = false }
3030

3131
[dev-dependencies]
3232
testcontainers = "0.11"

bobtimus/src/fixed_rate.rs

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
1-
use crate::{LatestRate, LiquidUsdt, Rate};
2-
use anyhow::Result;
3-
use async_trait::async_trait;
4-
use futures::Stream;
1+
use crate::{LatestRate, LiquidUsdt, Rate, RateSubscription};
52
use std::{convert::TryFrom, time::Duration};
63
use tokio::{
74
sync::watch::{self, Receiver},
8-
time::delay_for,
5+
time::sleep,
96
};
107

118
#[derive(Clone)]
@@ -18,17 +15,17 @@ impl Service {
1815

1916
tokio::spawn(async move {
2017
loop {
21-
let _ = tx.broadcast(data);
18+
let _ = tx.send(data);
2219

23-
delay_for(Duration::from_secs(5)).await;
20+
sleep(Duration::from_secs(5)).await;
2421
}
2522
});
2623

2724
Self(rx)
2825
}
2926

30-
pub fn subscribe(&self) -> impl Stream<Item = Rate> + Clone {
31-
self.0.clone()
27+
pub fn subscribe(&self) -> RateSubscription {
28+
RateSubscription::from(self.0.clone())
3229
}
3330
}
3431

@@ -38,10 +35,9 @@ impl Default for Service {
3835
}
3936
}
4037

41-
#[async_trait]
4238
impl LatestRate for Service {
43-
async fn latest_rate(&mut self) -> Result<Rate> {
44-
Ok(fixed_rate())
39+
fn latest_rate(&mut self) -> Rate {
40+
fixed_rate()
4541
}
4642
}
4743

bobtimus/src/http.rs

Lines changed: 45 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1-
use crate::{problem, Bobtimus, CreateSwapPayload, LatestRate, Rate};
1+
use crate::{problem, Bobtimus, CreateSwapPayload, LatestRate, RateSubscription};
2+
use anyhow::Context;
23
use elements_fun::{
34
encode::serialize_hex,
45
secp256k1::rand::{CryptoRng, RngCore},
56
};
6-
use futures::{Stream, StreamExt};
7+
use futures::{StreamExt, TryStreamExt};
78
use rust_embed::RustEmbed;
8-
use std::{convert::Infallible, sync::Arc};
9+
use std::{error::Error, fmt, sync::Arc};
910
use tokio::sync::Mutex;
1011
use warp::{
1112
filters::BoxedFilter, http::header::HeaderValue, path::Tail, reply::Response, Filter,
@@ -18,7 +19,7 @@ struct Waves;
1819

1920
pub fn routes<R, RS>(
2021
bobtimus: Arc<Mutex<Bobtimus<R, RS>>>,
21-
latest_rate_subscription: impl Stream<Item = Rate> + Clone + Send + Sync + 'static,
22+
latest_rate_subscription: RateSubscription,
2223
) -> BoxedFilter<(impl Reply,)>
2324
where
2425
R: RngCore + CryptoRng + Clone + Send + Sync + 'static,
@@ -73,13 +74,46 @@ where
7374
.map_err(warp::reject::custom)
7475
}
7576

76-
fn latest_rate<S>(stream: S) -> impl Reply
77-
where
78-
S: Stream<Item = Rate> + Clone + Send + 'static,
79-
{
80-
warp::sse::reply(warp::sse::keep_alive().stream(stream.map(|data| {
81-
Result::<_, Infallible>::Ok((warp::sse::event("rate"), warp::sse::json(data)))
82-
})))
77+
fn latest_rate(subscription: RateSubscription) -> impl Reply {
78+
let stream = subscription
79+
.into_stream()
80+
.map_ok(|data| {
81+
let event = warp::sse::Event::default()
82+
.id("rate")
83+
.json_data(data)
84+
.context("failed to attach json data to sse event")?;
85+
86+
Ok(event)
87+
})
88+
.map(|result| match result {
89+
Ok(Ok(ok)) => Ok(ok),
90+
Ok(Err(e)) => Err(e),
91+
Err(e) => Err(e),
92+
})
93+
.err_into::<RateStreamError>();
94+
95+
warp::sse::reply(warp::sse::keep_alive().stream(stream))
96+
}
97+
98+
#[derive(Debug)]
99+
struct RateStreamError(anyhow::Error);
100+
101+
impl fmt::Display for RateStreamError {
102+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
103+
write!(f, "{:#}", self.0)
104+
}
105+
}
106+
107+
impl std::error::Error for RateStreamError {
108+
fn source(&self) -> Option<&(dyn Error + 'static)> {
109+
self.0.source()
110+
}
111+
}
112+
113+
impl From<anyhow::Error> for RateStreamError {
114+
fn from(e: anyhow::Error) -> Self {
115+
RateStreamError(e)
116+
}
83117
}
84118

85119
async fn serve_index() -> Result<impl Reply, Rejection> {

bobtimus/src/kraken.rs

Lines changed: 9 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
use crate::{LatestRate, LiquidUsdt, Rate};
1+
use crate::{LatestRate, LiquidUsdt, Rate, RateSubscription};
22
use anyhow::{anyhow, bail, Result};
3-
use async_trait::async_trait;
4-
use futures::{Future, SinkExt, Stream, StreamExt};
3+
use futures::{SinkExt, StreamExt};
54
use reqwest::Url;
65
use serde::{Deserialize, Serialize};
76
use serde_json::Value;
@@ -22,38 +21,17 @@ const SUBSCRIBE_XBT_USD_TICKER_PAYLOAD: &str = r#"
2221
#[derive(Clone)]
2322
pub struct RateService {
2423
receiver: Receiver<Rate>,
25-
latest_rate: Rate,
2624
}
2725

28-
impl Future for RateService {
29-
type Output = Rate;
30-
31-
fn poll(
32-
mut self: std::pin::Pin<&mut Self>,
33-
cx: &mut std::task::Context<'_>,
34-
) -> std::task::Poll<Self::Output> {
35-
match self.receiver.poll_next_unpin(cx) {
36-
std::task::Poll::Ready(Some(rate)) => {
37-
self.latest_rate = rate;
38-
self.poll(cx)
39-
}
40-
std::task::Poll::Ready(None) | std::task::Poll::Pending => {
41-
std::task::Poll::from(self.latest_rate)
42-
}
43-
}
44-
}
45-
}
46-
47-
#[async_trait]
4826
impl LatestRate for RateService {
49-
async fn latest_rate(&mut self) -> anyhow::Result<Rate> {
50-
Ok(self.await)
27+
fn latest_rate(&mut self) -> Rate {
28+
*self.receiver.borrow()
5129
}
5230
}
5331

5432
impl RateService {
5533
pub async fn new() -> Result<Self> {
56-
let (tx, mut rx) = watch::channel(Rate::ZERO);
34+
let (tx, rx) = watch::channel(Rate::ZERO);
5735

5836
let (ws, _response) =
5937
tokio_tungstenite::connect_async(Url::parse(KRAKEN_WS_URL).expect("valid url")).await?;
@@ -84,25 +62,17 @@ impl RateService {
8462
}
8563
};
8664

87-
let _ = tx.broadcast(rate);
65+
let _ = tx.send(rate);
8866
}
8967
});
9068

9169
write.send(SUBSCRIBE_XBT_USD_TICKER_PAYLOAD.into()).await?;
9270

93-
let latest_rate = rx
94-
.next()
95-
.await
96-
.ok_or_else(|| anyhow!("latest rate stream has ended"))?;
97-
98-
Ok(Self {
99-
receiver: rx,
100-
latest_rate,
101-
})
71+
Ok(Self { receiver: rx })
10272
}
10373

104-
pub fn subscribe(&self) -> impl Stream<Item = Rate> + Clone {
105-
self.receiver.clone()
74+
pub fn subscribe(&self) -> RateSubscription {
75+
RateSubscription::from(self.receiver.clone())
10676
}
10777
}
10878

@@ -172,26 +142,4 @@ mod tests {
172142

173143
let _ = serde_json::from_str::<TickerUpdate>(sample_response).unwrap();
174144
}
175-
176-
#[tokio::test]
177-
async fn latest_rate_does_not_wait_for_next_value() {
178-
let (write, read) = watch::channel(Rate::ZERO);
179-
180-
let latest_rate = Rate {
181-
ask: LiquidUsdt::from_str_in_dollar("20000.0").unwrap(),
182-
bid: LiquidUsdt::from_str_in_dollar("19000.0").unwrap(),
183-
};
184-
let _ = write.broadcast(latest_rate).unwrap();
185-
186-
let mut service = RateService {
187-
receiver: read,
188-
latest_rate: Rate::ZERO,
189-
};
190-
191-
let rate = service.latest_rate().await.unwrap();
192-
assert_eq!(rate, latest_rate);
193-
194-
let rate = service.latest_rate().await.unwrap();
195-
assert_eq!(rate, latest_rate);
196-
}
197145
}

bobtimus/src/lib.rs

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use anyhow::{Context, Result};
2-
use async_trait::async_trait;
32
use elements_fun::{
43
bitcoin::{
54
secp256k1::{All, Secp256k1},
@@ -12,8 +11,9 @@ use elements_fun::{
1211
Address, AssetId, OutPoint, Transaction, TxIn,
1312
};
1413
use elements_harness::{elementd_rpc::ElementsRpc, Client as ElementsdClient};
15-
use futures::{stream::FuturesUnordered, TryStreamExt};
14+
use futures::{stream, stream::FuturesUnordered, Stream, TryStreamExt};
1615
use serde::{Deserialize, Serialize};
16+
use tokio::sync::watch::Receiver;
1717

1818
mod amounts;
1919

@@ -55,11 +55,7 @@ impl<R, RS> Bobtimus<R, RS> {
5555
R: RngCore + CryptoRng,
5656
RS: LatestRate,
5757
{
58-
let latest_rate = self
59-
.rate_service
60-
.latest_rate()
61-
.await
62-
.context("failed to get latest rate")?;
58+
let latest_rate = self.rate_service.latest_rate();
6359
let usdt_amount = latest_rate.buy_quote(payload.btc_amount)?;
6460

6561
let bob_inputs = self
@@ -197,9 +193,34 @@ impl<R, RS> Bobtimus<R, RS> {
197193
}
198194
}
199195

200-
#[async_trait]
201196
pub trait LatestRate {
202-
async fn latest_rate(&mut self) -> Result<Rate>;
197+
fn latest_rate(&mut self) -> Rate;
198+
}
199+
200+
#[derive(Clone)]
201+
pub struct RateSubscription {
202+
receiver: Receiver<Rate>,
203+
}
204+
205+
impl From<Receiver<Rate>> for RateSubscription {
206+
fn from(receiver: Receiver<Rate>) -> Self {
207+
Self { receiver }
208+
}
209+
}
210+
211+
impl RateSubscription {
212+
pub fn into_stream(self) -> impl Stream<Item = Result<Rate>> {
213+
stream::try_unfold(self.receiver, |mut receiver| async move {
214+
receiver
215+
.changed()
216+
.await
217+
.context("failed to receive latest rate update")?;
218+
219+
let latest_rate = *receiver.borrow();
220+
221+
Ok(Some((latest_rate, receiver)))
222+
})
223+
}
203224
}
204225

205226
#[cfg(test)]

elements-harness/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,16 @@ elements-fun = { path = "../elements-fun", features = [ "serde" ] }
1111
futures = "0.3.5"
1212
hex = "0.4.2"
1313
hmac = "0.10"
14-
jsonrpc_client = { git = "https://github.com/thomaseizinger/rust-jsonrpc-client", branch = "master", features = [ "reqwest" ] }
14+
jsonrpc_client = { version = "0.5", features = [ "reqwest" ] }
1515
log = "0.4"
1616
rand = "0.7"
17-
reqwest = { version = "0.10" }
17+
reqwest = "0.11"
1818
serde = "1.0"
1919
serde_json = "1.0"
2020
sha2 = "0.9"
2121
testcontainers = "0.11"
2222
thiserror = "1.0"
23-
tokio = { version = "0.2", default-features = false, features = [ "blocking", "macros", "rt-core", "time" ] }
23+
tokio = { version = "1", default-features = false, features = [ "macros", "rt", "time" ] }
2424
tracing = "0.1"
2525
url = "2"
2626

swap/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,4 @@ thiserror = "1.0.23"
1515
[dev-dependencies]
1616
elements-harness = { path = "../elements-harness" }
1717
testcontainers = "0.11"
18-
tokio = { version = "0.2", default-features = false, features = [ "blocking", "macros", "rt-core", "time" ] }
18+
tokio = { version = "1", default-features = false, features = [ "macros", "rt", "time" ] }

waves/wallet/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ js-sys = "0.3"
2626
log = "0.4"
2727
rand = { version = "0.6", features = [ "wasm-bindgen" ] }
2828
rand_core = { version = "0.5", features = [ "std" ] }
29-
reqwest = { version = "0.10", default-features = false, features = [ "rustls", "json" ] }
29+
reqwest = { version = "0.11", default-features = false, features = [ "rustls", "json" ] }
3030
rust_decimal = "1"
3131
scrypt = { version = "0.5" }
3232
serde = { version = "1", features = [ "derive" ] }

0 commit comments

Comments
 (0)