Skip to content

Commit 593bd79

Browse files
refactor: migrate Mutex to tokio::sync::Mutex for async compatibility
- Updated `received_filter_heights` and `processing_thread_requests` to use `tokio::sync::Mutex` instead of `std::sync::Mutex` for better async support. - Adjusted related locking mechanisms in `FilterSyncManager`, `SpvStats`, and various test files to accommodate the new async locking. - Enhanced test files to reflect the changes in synchronization primitives, ensuring compatibility with async operations.
1 parent ce79761 commit 593bd79

File tree

10 files changed

+60
-64
lines changed

10 files changed

+60
-64
lines changed

dash-spv-ffi/tests/unit/test_type_conversions.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ mod tests {
209209
filters_received: u64::MAX,
210210
filter_sync_start_time: None,
211211
last_filter_received_time: None,
212-
received_filter_heights: std::sync::Arc::new(std::sync::Mutex::new(
212+
received_filter_heights: std::sync::Arc::new(tokio::sync::Mutex::new(
213213
std::collections::HashSet::new(),
214214
)),
215215
active_filter_requests: 0,

dash-spv/src/client/status_display.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,9 @@ impl<'a, S: StorageManager + Send + Sync + 'static> StatusDisplay<'a, S> {
7676
let stats = self.stats.read().await;
7777

7878
// Calculate last synced filter height from received filter heights
79-
let last_synced_filter_height = if let Ok(heights) = stats.received_filter_heights.lock() {
79+
let last_synced_filter_height = {
80+
let heights = stats.received_filter_heights.lock().await;
8081
heights.iter().max().copied()
81-
} else {
82-
None
8382
};
8483

8584
// Calculate the actual header height considering checkpoint sync

dash-spv/src/sync/filters.rs

Lines changed: 39 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -74,11 +74,11 @@ pub struct FilterSyncManager<S: StorageManager, N: NetworkManager> {
7474
/// Blocks currently being downloaded (map for quick lookup)
7575
downloading_blocks: HashMap<BlockHash, u32>,
7676
/// Blocks requested by the filter processing thread
77-
pub processing_thread_requests: std::sync::Arc<std::sync::Mutex<HashSet<BlockHash>>>,
77+
pub processing_thread_requests: std::sync::Arc<tokio::sync::Mutex<HashSet<BlockHash>>>,
7878
/// Track requested filter ranges: (start_height, end_height) -> request_time
7979
requested_filter_ranges: HashMap<(u32, u32), std::time::Instant>,
8080
/// Track individual filter heights that have been received (shared with stats)
81-
received_filter_heights: std::sync::Arc<std::sync::Mutex<HashSet<u32>>>,
81+
received_filter_heights: std::sync::Arc<tokio::sync::Mutex<HashSet<u32>>>,
8282
/// Maximum retries for a filter range
8383
max_filter_retries: u32,
8484
/// Retry attempts per range
@@ -236,7 +236,7 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
236236
/// Create a new filter sync manager.
237237
pub fn new(
238238
config: &ClientConfig,
239-
received_filter_heights: std::sync::Arc<std::sync::Mutex<std::collections::HashSet<u32>>>,
239+
received_filter_heights: std::sync::Arc<tokio::sync::Mutex<std::collections::HashSet<u32>>>,
240240
) -> Self {
241241
Self {
242242
_config: config.clone(),
@@ -249,7 +249,7 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
249249
syncing_filters: false,
250250
pending_block_downloads: VecDeque::new(),
251251
downloading_blocks: HashMap::new(),
252-
processing_thread_requests: std::sync::Arc::new(std::sync::Mutex::new(
252+
processing_thread_requests: std::sync::Arc::new(tokio::sync::Mutex::new(
253253
std::collections::HashSet::new(),
254254
)),
255255
requested_filter_ranges: HashMap::new(),
@@ -1698,7 +1698,8 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
16981698
}
16991699

17001700
// Log current state periodically
1701-
if let Ok(guard) = self.received_filter_heights.lock() {
1701+
{
1702+
let guard = self.received_filter_heights.lock().await;
17021703
if guard.len() % 1000 == 0 {
17031704
tracing::info!(
17041705
"Filter sync state: {} filters received, {} active requests, {} pending requests",
@@ -1725,16 +1726,13 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
17251726

17261727
/// Check if a filter request range is complete (all filters received).
17271728
async fn is_request_complete(&self, start: u32, end: u32) -> SyncResult<bool> {
1728-
if let Ok(received_heights) = self.received_filter_heights.lock() {
1729-
for height in start..=end {
1730-
if !received_heights.contains(&height) {
1731-
return Ok(false);
1732-
}
1729+
let received_heights = self.received_filter_heights.lock().await;
1730+
for height in start..=end {
1731+
if !received_heights.contains(&height) {
1732+
return Ok(false);
17331733
}
1734-
Ok(true)
1735-
} else {
1736-
Err(SyncError::Storage("Failed to lock received filter heights".to_string()))
17371734
}
1735+
Ok(true)
17381736
}
17391737

17401738
/// Record that a filter was received at a specific height.
@@ -1748,14 +1746,13 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
17481746
SyncError::Storage(format!("Failed to get header height by hash: {}", e))
17491747
})? {
17501748
// Record in received filter heights
1751-
if let Ok(mut heights) = self.received_filter_heights.lock() {
1752-
heights.insert(height);
1753-
tracing::trace!(
1754-
"📊 Recorded filter received at height {} for block {}",
1755-
height,
1756-
block_hash
1757-
);
1758-
}
1749+
let mut heights = self.received_filter_heights.lock().await;
1750+
heights.insert(height);
1751+
tracing::trace!(
1752+
"📊 Recorded filter received at height {} for block {}",
1753+
height,
1754+
block_hash
1755+
);
17591756
} else {
17601757
tracing::warn!("Could not find height for filter block hash {}", block_hash);
17611758
}
@@ -2429,9 +2426,7 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
24292426

24302427
// Check if this block was requested by the filter processing thread
24312428
{
2432-
let mut processing_requests = self.processing_thread_requests.lock().map_err(|e| {
2433-
SyncError::InvalidState(format!("processing thread requests lock poisoned: {}", e))
2434-
})?;
2429+
let mut processing_requests = self.processing_thread_requests.lock().await;
24352430
if processing_requests.remove(&block_hash) {
24362431
tracing::info!(
24372432
"📦 Received block {} requested by filter processing thread",
@@ -2607,10 +2602,9 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
26072602

26082603
/// Get the number of filters that have been received.
26092604
pub fn get_received_filter_count(&self) -> u32 {
2610-
if let Ok(heights) = self.received_filter_heights.lock() {
2611-
heights.len() as u32
2612-
} else {
2613-
0
2605+
match self.received_filter_heights.try_lock() {
2606+
Ok(heights) => heights.len() as u32,
2607+
Err(_) => 0,
26142608
}
26152609
}
26162610

@@ -2853,9 +2847,8 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
28532847
stats_lock.filter_sync_start_time = Some(std::time::Instant::now());
28542848
stats_lock.last_filter_received_time = None;
28552849
// Clear the received heights tracking for a fresh start
2856-
if let Ok(mut heights) = stats_lock.received_filter_heights.lock() {
2857-
heights.clear();
2858-
}
2850+
let mut heights = stats_lock.received_filter_heights.lock().await;
2851+
heights.clear();
28592852
tracing::info!(
28602853
"📊 Started new filter sync tracking: {} filters requested",
28612854
total_filters_requested
@@ -2895,14 +2888,13 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
28952888
drop(stats_lock); // Release the stats lock before acquiring the mutex
28962889

28972890
// Now lock the heights and insert
2898-
if let Ok(mut heights) = received_filter_heights.lock() {
2899-
heights.insert(height);
2900-
tracing::trace!(
2901-
"📊 Recorded filter received at height {} for block {}",
2902-
height,
2903-
block_hash
2904-
);
2905-
};
2891+
let mut heights = received_filter_heights.lock().await;
2892+
heights.insert(height);
2893+
tracing::trace!(
2894+
"📊 Recorded filter received at height {} for block {}",
2895+
height,
2896+
block_hash
2897+
);
29062898
} else {
29072899
tracing::warn!("Could not find height for filter block hash {}", block_hash);
29082900
}
@@ -3030,7 +3022,7 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
30303022

30313023
/// Record receipt of a filter at a specific height.
30323024
pub fn record_filter_received(&mut self, height: u32) {
3033-
if let Ok(mut heights) = self.received_filter_heights.lock() {
3025+
if let Ok(mut heights) = self.received_filter_heights.try_lock() {
30343026
heights.insert(height);
30353027
tracing::trace!("📊 Recorded filter received at height {}", height);
30363028
}
@@ -3040,9 +3032,9 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
30403032
pub fn find_missing_ranges(&self) -> Vec<(u32, u32)> {
30413033
let mut missing_ranges = Vec::new();
30423034

3043-
let heights = match self.received_filter_heights.lock() {
3035+
let heights = match self.received_filter_heights.try_lock() {
30443036
Ok(heights) => heights.clone(),
3045-
Err(_) => return missing_ranges, // Return empty if lock fails
3037+
Err(_) => return missing_ranges,
30463038
};
30473039

30483040
// For each requested range
@@ -3077,9 +3069,9 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
30773069
let now = std::time::Instant::now();
30783070
let mut timed_out = Vec::new();
30793071

3080-
let heights = match self.received_filter_heights.lock() {
3072+
let heights = match self.received_filter_heights.try_lock() {
30813073
Ok(heights) => heights.clone(),
3082-
Err(_) => return timed_out, // Return empty if lock fails
3074+
Err(_) => return timed_out,
30833075
};
30843076

30853077
for ((start, end), request_time) in &self.requested_filter_ranges {
@@ -3104,9 +3096,9 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
31043096

31053097
/// Check if a filter range is complete (all heights received).
31063098
pub fn is_range_complete(&self, start_height: u32, end_height: u32) -> bool {
3107-
let heights = match self.received_filter_heights.lock() {
3099+
let heights = match self.received_filter_heights.try_lock() {
31083100
Ok(heights) => heights,
3109-
Err(_) => return false, // Return false if lock fails
3101+
Err(_) => return false,
31103102
};
31113103

31123104
for height in start_height..=end_height {
@@ -3453,7 +3445,7 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
34533445
/// Reset filter range tracking (useful for testing or restart scenarios).
34543446
pub fn reset_filter_tracking(&mut self) {
34553447
self.requested_filter_ranges.clear();
3456-
if let Ok(mut heights) = self.received_filter_heights.lock() {
3448+
if let Ok(mut heights) = self.received_filter_heights.try_lock() {
34573449
heights.clear();
34583450
}
34593451
self.filter_retry_counts.clear();

dash-spv/src/sync/sequential/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ impl<
8585
/// Create a new sequential sync manager
8686
pub fn new(
8787
config: &ClientConfig,
88-
received_filter_heights: std::sync::Arc<std::sync::Mutex<std::collections::HashSet<u32>>>,
88+
received_filter_heights: std::sync::Arc<tokio::sync::Mutex<std::collections::HashSet<u32>>>,
8989
wallet: std::sync::Arc<tokio::sync::RwLock<W>>,
9090
) -> SyncResult<Self> {
9191
// Create reorg config with sensible defaults

dash-spv/src/types.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -550,7 +550,7 @@ pub struct SpvStats {
550550

551551
/// Received filter heights for gap tracking (shared with FilterSyncManager).
552552
#[serde(skip)]
553-
pub received_filter_heights: std::sync::Arc<std::sync::Mutex<std::collections::HashSet<u32>>>,
553+
pub received_filter_heights: std::sync::Arc<tokio::sync::Mutex<std::collections::HashSet<u32>>>,
554554

555555
/// Number of filter requests currently active.
556556
pub active_filter_requests: u32,
@@ -587,7 +587,7 @@ impl Default for SpvStats {
587587
filters_received: 0,
588588
filter_sync_start_time: None,
589589
last_filter_received_time: None,
590-
received_filter_heights: std::sync::Arc::new(std::sync::Mutex::new(
590+
received_filter_heights: std::sync::Arc::new(tokio::sync::Mutex::new(
591591
std::collections::HashSet::new(),
592592
)),
593593
active_filter_requests: 0,

dash-spv/tests/block_download_test.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88
//! Tests for block downloading on filter match functionality.
99
1010
use std::collections::HashSet;
11-
use std::sync::{Arc, Mutex};
11+
use std::sync::Arc;
12+
use tokio::sync::Mutex;
1213
use tokio::sync::RwLock;
1314

1415
use dashcore::{

dash-spv/tests/cfheader_gap_test.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66
//! Tests for CFHeader gap detection and auto-restart functionality.
77
88
use std::collections::HashSet;
9-
use std::sync::{Arc, Mutex};
9+
use std::sync::Arc;
10+
use tokio::sync::Mutex;
1011

1112
use dash_spv::{
1213
client::ClientConfig,

dash-spv/tests/edge_case_filter_sync_test.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88
//! Tests for edge case handling in filter header sync, particularly at the tip.
99
1010
use std::collections::HashSet;
11-
use std::sync::{Arc, Mutex};
11+
use std::sync::Arc;
12+
use tokio::sync::Mutex;
1213

1314
use dash_spv::{
1415
client::ClientConfig,
@@ -52,8 +53,8 @@ impl MockNetworkManager {
5253
}
5354
}
5455

55-
fn get_sent_messages(&self) -> Vec<NetworkMessage> {
56-
self.sent_messages.lock().unwrap().clone()
56+
async fn get_sent_messages(&self) -> Vec<NetworkMessage> {
57+
self.sent_messages.lock().await.clone()
5758
}
5859
}
5960

@@ -72,7 +73,7 @@ impl NetworkManager for MockNetworkManager {
7273
}
7374

7475
async fn send_message(&mut self, message: NetworkMessage) -> NetworkResult<()> {
75-
self.sent_messages.lock().unwrap().push(message);
76+
self.sent_messages.lock().await.push(message);
7677
Ok(())
7778
}
7879

@@ -181,7 +182,7 @@ async fn test_filter_sync_at_tip_edge_case() {
181182
assert!(!result.unwrap(), "Should not start sync when already at tip");
182183

183184
// Verify no messages were sent
184-
let sent_messages = network.get_sent_messages();
185+
let sent_messages = network.get_sent_messages().await;
185186
assert_eq!(sent_messages.len(), 0, "Should not send any messages when at tip");
186187
}
187188

@@ -285,7 +286,7 @@ async fn test_no_invalid_getcfheaders_at_tip() {
285286
assert!(result.unwrap(), "Should start sync when behind by 1 block");
286287

287288
// Check the sent message
288-
let sent_messages = network.get_sent_messages();
289+
let sent_messages = network.get_sent_messages().await;
289290
assert_eq!(sent_messages.len(), 1, "Should send exactly one message");
290291

291292
match &sent_messages[0] {

dash-spv/tests/filter_header_verification_test.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ use dashcore::{
3232
};
3333
use dashcore_hashes::{sha256d, Hash};
3434
use std::collections::HashSet;
35-
use std::sync::{Arc, Mutex};
35+
use std::sync::Arc;
36+
use tokio::sync::Mutex;
3637

3738
/// Mock network manager for testing filter sync
3839
#[derive(Debug)]

dash-spv/tests/simple_gap_test.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
//! Basic test for CFHeader gap detection functionality.
22
33
use std::collections::HashSet;
4-
use std::sync::{Arc, Mutex};
4+
use std::sync::Arc;
5+
use tokio::sync::Mutex;
56

67
use dash_spv::{
78
client::ClientConfig,

0 commit comments

Comments
 (0)