Skip to content

Commit 6b1ada6

Browse files
committed
Drop potentially dangerous sync_lock Condvar and use pub/sub model
Using a `Condvar` could be potentially dangerous in async contexts as `wait`ing on it might block the current thread potentially hosting more than one task. Here, we drop the `Condvar` and adopt a pub/sub scheme instead, similar to the one we already implemented in `ConnectionManager`.
1 parent 8e59243 commit 6b1ada6

File tree

1 file changed

+108
-54
lines changed

1 file changed

+108
-54
lines changed

src/wallet.rs

Lines changed: 108 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,16 @@ use bitcoin::secp256k1::ecdsa::{RecoverableSignature, Signature};
3333
use bitcoin::secp256k1::{PublicKey, Scalar, Secp256k1, SecretKey, Signing};
3434
use bitcoin::{ScriptBuf, Transaction, TxOut, Txid};
3535

36-
use std::ops::Deref;
37-
use std::sync::{Arc, Condvar, Mutex, RwLock};
36+
use std::mem;
37+
use std::ops::{Deref, DerefMut};
38+
use std::sync::{Arc, Mutex, RwLock};
3839
use std::time::Duration;
3940

41+
enum WalletSyncStatus {
42+
Completed,
43+
InProgress { subscribers: Vec<tokio::sync::oneshot::Sender<Result<(), Error>>> },
44+
}
45+
4046
pub struct Wallet<D, B: Deref, E: Deref, L: Deref>
4147
where
4248
D: BatchDatabase,
@@ -51,7 +57,8 @@ where
5157
// A cache storing the most recently retrieved fee rate estimations.
5258
broadcaster: B,
5359
fee_estimator: E,
54-
sync_lock: (Mutex<()>, Condvar),
60+
// A Mutex holding the current sync status.
61+
sync_status: Mutex<WalletSyncStatus>,
5562
// TODO: Drop this workaround after BDK 1.0 upgrade.
5663
balance_cache: RwLock<Balance>,
5764
logger: L,
@@ -76,69 +83,67 @@ where
7683
});
7784

7885
let inner = Mutex::new(wallet);
79-
let sync_lock = (Mutex::new(()), Condvar::new());
86+
let sync_status = Mutex::new(WalletSyncStatus::Completed);
8087
let balance_cache = RwLock::new(start_balance);
81-
Self { blockchain, inner, broadcaster, fee_estimator, sync_lock, balance_cache, logger }
88+
Self { blockchain, inner, broadcaster, fee_estimator, sync_status, balance_cache, logger }
8289
}
8390

8491
pub(crate) async fn sync(&self) -> Result<(), Error> {
85-
let (lock, cvar) = &self.sync_lock;
86-
87-
let guard = match lock.try_lock() {
88-
Ok(guard) => guard,
89-
Err(_) => {
90-
log_info!(self.logger, "Sync in progress, skipping.");
91-
let guard = cvar.wait(lock.lock().unwrap());
92-
drop(guard);
93-
cvar.notify_all();
94-
return Ok(());
95-
},
96-
};
92+
if let Some(sync_receiver) = self.register_or_subscribe_pending_sync() {
93+
log_info!(self.logger, "Sync in progress, skipping.");
94+
return sync_receiver.await.map_err(|e| {
95+
debug_assert!(false, "Failed to receive wallet sync result: {:?}", e);
96+
log_error!(self.logger, "Failed to receive wallet sync result: {:?}", e);
97+
Error::WalletOperationFailed
98+
})?;
99+
}
97100

98-
let sync_options = SyncOptions { progress: None };
99-
let wallet_lock = self.inner.lock().unwrap();
100-
let res = match wallet_lock.sync(&self.blockchain, sync_options).await {
101-
Ok(()) => {
102-
// TODO: Drop this workaround after BDK 1.0 upgrade.
103-
// Update balance cache after syncing.
104-
if let Ok(balance) = wallet_lock.get_balance() {
105-
*self.balance_cache.write().unwrap() = balance;
106-
}
107-
Ok(())
108-
},
109-
Err(e) => match e {
110-
bdk::Error::Esplora(ref be) => match **be {
111-
bdk::blockchain::esplora::EsploraError::Reqwest(_) => {
112-
// Drop lock, sleep for a second, retry.
113-
drop(wallet_lock);
114-
tokio::time::sleep(Duration::from_secs(1)).await;
115-
log_error!(
116-
self.logger,
117-
"Sync failed due to HTTP connection error, retrying: {}",
118-
e
119-
);
120-
let sync_options = SyncOptions { progress: None };
121-
self.inner
122-
.lock()
123-
.unwrap()
124-
.sync(&self.blockchain, sync_options)
125-
.await
126-
.map_err(|e| From::from(e))
101+
let res = {
102+
let sync_options = SyncOptions { progress: None };
103+
let wallet_lock = self.inner.lock().unwrap();
104+
match wallet_lock.sync(&self.blockchain, sync_options).await {
105+
Ok(()) => {
106+
// TODO: Drop this workaround after BDK 1.0 upgrade.
107+
// Update balance cache after syncing.
108+
if let Ok(balance) = wallet_lock.get_balance() {
109+
*self.balance_cache.write().unwrap() = balance;
110+
}
111+
Ok(())
112+
},
113+
Err(e) => match e {
114+
bdk::Error::Esplora(ref be) => match **be {
115+
bdk::blockchain::esplora::EsploraError::Reqwest(_) => {
116+
// Drop lock, sleep for a second, retry.
117+
drop(wallet_lock);
118+
tokio::time::sleep(Duration::from_secs(1)).await;
119+
log_error!(
120+
self.logger,
121+
"Sync failed due to HTTP connection error, retrying: {}",
122+
e
123+
);
124+
let sync_options = SyncOptions { progress: None };
125+
self.inner
126+
.lock()
127+
.unwrap()
128+
.sync(&self.blockchain, sync_options)
129+
.await
130+
.map_err(|e| From::from(e))
131+
},
132+
_ => {
133+
log_error!(self.logger, "Sync failed due to Esplora error: {}", e);
134+
Err(From::from(e))
135+
},
127136
},
128137
_ => {
129-
log_error!(self.logger, "Sync failed due to Esplora error: {}", e);
138+
log_error!(self.logger, "Wallet sync error: {}", e);
130139
Err(From::from(e))
131140
},
132141
},
133-
_ => {
134-
log_error!(self.logger, "Wallet sync error: {}", e);
135-
Err(From::from(e))
136-
},
137-
},
142+
}
138143
};
139144

140-
drop(guard);
141-
cvar.notify_all();
145+
self.propagate_result_to_subscribers(res);
146+
142147
res
143148
}
144149

@@ -291,6 +296,55 @@ where
291296

292297
Ok(txid)
293298
}
299+
300+
fn register_or_subscribe_pending_sync(
301+
&self,
302+
) -> Option<tokio::sync::oneshot::Receiver<Result<(), Error>>> {
303+
let mut sync_status_lock = self.sync_status.lock().unwrap();
304+
match sync_status_lock.deref_mut() {
305+
WalletSyncStatus::Completed => {
306+
// We're first to register for a sync.
307+
*sync_status_lock = WalletSyncStatus::InProgress { subscribers: Vec::new() };
308+
None
309+
},
310+
WalletSyncStatus::InProgress { subscribers } => {
311+
// A sync is in-progress, we subscribe.
312+
let (tx, rx) = tokio::sync::oneshot::channel();
313+
subscribers.push(tx);
314+
Some(rx)
315+
},
316+
}
317+
}
318+
319+
fn propagate_result_to_subscribers(&self, res: Result<(), Error>) {
320+
// Send the notification to any other tasks that might be waiting on it by now.
321+
let mut waiting_subscribers = Vec::new();
322+
{
323+
let mut sync_status_lock = self.sync_status.lock().unwrap();
324+
match sync_status_lock.deref_mut() {
325+
WalletSyncStatus::Completed => {
326+
// No sync in-progress, do nothing.
327+
return;
328+
},
329+
WalletSyncStatus::InProgress { subscribers } => {
330+
// A sync is in-progress, we notify subscribers.
331+
mem::swap(&mut waiting_subscribers, subscribers);
332+
*sync_status_lock = WalletSyncStatus::Completed;
333+
},
334+
}
335+
}
336+
337+
for sender in waiting_subscribers {
338+
sender.send(res).unwrap_or_else(|e| {
339+
debug_assert!(false, "Failed to send wallet sync result to subscribers: {:?}", e);
340+
log_error!(
341+
self.logger,
342+
"Failed to send wallet sync result to subscribers: {:?}",
343+
e
344+
);
345+
});
346+
}
347+
}
294348
}
295349

296350
impl<D, B: Deref, E: Deref, L: Deref> WalletSource for Wallet<D, B, E, L>

0 commit comments

Comments
 (0)