Skip to content

Commit 5375349

Browse files
refactor: migrate received_filter_heights to tokio::Mutex and add find_available_header_at_or_before helper (#141)
* refactor: streamline header retrieval in FilterSyncManager - Introduced `find_available_header_at_or_before` method to encapsulate the logic for scanning backward to find the nearest available block header in storage. - Replaced repetitive header scanning logic in multiple locations with calls to the new method, improving code clarity and maintainability. - Enhanced error handling and logging during header retrieval to provide better insights into the syncing process. * refactor: migrate Mutex to tokio::sync::Mutex for async compatibility - Updated `received_filter_heights` and `processing_thread_requests` to use `tokio::sync::Mutex` instead of `std::sync::Mutex` for better async support. - Adjusted related locking mechanisms in `FilterSyncManager`, `SpvStats`, and various test files to accommodate the new async locking. - Enhanced test files to reflect the changes in synchronization primitives, ensuring compatibility with async operations. * refactor: optimize sync progress calculation in StatusDisplay - Improved the sync progress method by cloning the inner heights handle and copying necessary counters without holding the RwLock, enhancing performance. - Updated the calculation of the last synced filter height to avoid holding the RwLock guard, ensuring better concurrency and responsiveness in the status display. * refactor: introduce SharedFilterHeights type for improved readability - Created a new type alias `SharedFilterHeights` for the mutex-protected set of filter heights, enhancing code clarity and reducing redundancy. - Updated references in `SpvStats`, `FilterSyncManager`, and `SequentialSyncManager` to use the new type alias, streamlining the codebase. * fix: improve error handling in filter header sync - Updated the error handling in the filter header synchronization process to return a more informative `SyncError` when no available headers are found between specified heights. - This change enhances the clarity of error reporting, aiding in debugging and improving the overall robustness of the synchronization logic. * refactor: update processing_thread_requests to use tokio::sync::Mutex - Changed the locking mechanism for `processing_thread_requests` from `std::sync::Mutex` to `tokio::sync::Mutex` to enhance async compatibility. - Updated the locking logic in the `spawn_filter_processor` and related methods to use async await syntax, improving performance and responsiveness in the filter processing workflow. * refactor: improve filter sync locking mechanism - Updated the locking logic in the filter synchronization process to clone the `received_filter_heights` before awaiting the mutex, ensuring the `RwLock` is released promptly. - This change enhances concurrency and responsiveness during filter sync operations, aligning with recent async improvements in the codebase.
1 parent ef31ea7 commit 5375349

File tree

10 files changed

+172
-219
lines changed

10 files changed

+172
-219
lines changed

dash-spv-ffi/tests/unit/test_type_conversions.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ mod tests {
209209
filters_received: u64::MAX,
210210
filter_sync_start_time: None,
211211
last_filter_received_time: None,
212-
received_filter_heights: std::sync::Arc::new(std::sync::Mutex::new(
212+
received_filter_heights: std::sync::Arc::new(tokio::sync::Mutex::new(
213213
std::collections::HashSet::new(),
214214
)),
215215
active_filter_requests: 0,

dash-spv/src/client/status_display.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,13 +73,16 @@ impl<'a, S: StorageManager + Send + Sync + 'static> StatusDisplay<'a, S> {
7373
/// Get current sync progress.
7474
pub async fn sync_progress(&self) -> Result<SyncProgress> {
7575
let state = self.state.read().await;
76-
let stats = self.stats.read().await;
76+
// Clone the inner heights handle and copy needed counters without awaiting while holding the RwLock
77+
let (filters_received, received_heights) = {
78+
let stats = self.stats.read().await;
79+
(stats.filters_received, std::sync::Arc::clone(&stats.received_filter_heights))
80+
};
7781

78-
// Calculate last synced filter height from received filter heights
79-
let last_synced_filter_height = if let Ok(heights) = stats.received_filter_heights.lock() {
82+
// Calculate last synced filter height from received filter heights without holding the RwLock guard
83+
let last_synced_filter_height = {
84+
let heights = received_heights.lock().await;
8085
heights.iter().max().copied()
81-
} else {
82-
None
8386
};
8487

8588
// Calculate the actual header height considering checkpoint sync
@@ -100,7 +103,7 @@ impl<'a, S: StorageManager + Send + Sync + 'static> StatusDisplay<'a, S> {
100103
filter_headers_synced: false, // TODO: Implement
101104
masternodes_synced: false, // TODO: Implement
102105
filter_sync_available: false, // TODO: Get from network manager
103-
filters_downloaded: stats.filters_received,
106+
filters_downloaded: filters_received,
104107
last_synced_filter_height,
105108
sync_start: std::time::SystemTime::now(), // TODO: Track properly
106109
last_update: std::time::SystemTime::now(),

dash-spv/src/sync/filters.rs

Lines changed: 140 additions & 198 deletions
Large diffs are not rendered by default.

dash-spv/src/sync/sequential/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use crate::storage::StorageManager;
2424
use crate::sync::{
2525
FilterSyncManager, HeaderSyncManagerWithReorg, MasternodeSyncManager, ReorgConfig,
2626
};
27-
use crate::types::{ChainState, SyncProgress};
27+
use crate::types::{ChainState, SharedFilterHeights, SyncProgress};
2828
use key_wallet_manager::wallet_interface::WalletInterface;
2929

3030
use phases::{PhaseTransition, SyncPhase};
@@ -85,7 +85,7 @@ impl<
8585
/// Create a new sequential sync manager
8686
pub fn new(
8787
config: &ClientConfig,
88-
received_filter_heights: std::sync::Arc<std::sync::Mutex<std::collections::HashSet<u32>>>,
88+
received_filter_heights: SharedFilterHeights,
8989
wallet: std::sync::Arc<tokio::sync::RwLock<W>>,
9090
) -> SyncResult<Self> {
9191
// Create reorg config with sensible defaults

dash-spv/src/types.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ use dashcore::{
99
};
1010
use serde::{Deserialize, Serialize};
1111

12+
/// Shared, mutex-protected set of filter heights used across components.
13+
pub type SharedFilterHeights = std::sync::Arc<tokio::sync::Mutex<std::collections::HashSet<u32>>>;
14+
1215
/// Unique identifier for a peer connection.
1316
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
1417
pub struct PeerId(pub u64);
@@ -550,7 +553,7 @@ pub struct SpvStats {
550553

551554
/// Received filter heights for gap tracking (shared with FilterSyncManager).
552555
#[serde(skip)]
553-
pub received_filter_heights: std::sync::Arc<std::sync::Mutex<std::collections::HashSet<u32>>>,
556+
pub received_filter_heights: SharedFilterHeights,
554557

555558
/// Number of filter requests currently active.
556559
pub active_filter_requests: u32,
@@ -587,7 +590,7 @@ impl Default for SpvStats {
587590
filters_received: 0,
588591
filter_sync_start_time: None,
589592
last_filter_received_time: None,
590-
received_filter_heights: std::sync::Arc::new(std::sync::Mutex::new(
593+
received_filter_heights: std::sync::Arc::new(tokio::sync::Mutex::new(
591594
std::collections::HashSet::new(),
592595
)),
593596
active_filter_requests: 0,

dash-spv/tests/block_download_test.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88
//! Tests for block downloading on filter match functionality.
99
1010
use std::collections::HashSet;
11-
use std::sync::{Arc, Mutex};
11+
use std::sync::Arc;
12+
use tokio::sync::Mutex;
1213
use tokio::sync::RwLock;
1314

1415
use dashcore::{

dash-spv/tests/cfheader_gap_test.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66
//! Tests for CFHeader gap detection and auto-restart functionality.
77
88
use std::collections::HashSet;
9-
use std::sync::{Arc, Mutex};
9+
use std::sync::Arc;
10+
use tokio::sync::Mutex;
1011

1112
use dash_spv::{
1213
client::ClientConfig,

dash-spv/tests/edge_case_filter_sync_test.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88
//! Tests for edge case handling in filter header sync, particularly at the tip.
99
1010
use std::collections::HashSet;
11-
use std::sync::{Arc, Mutex};
11+
use std::sync::Arc;
12+
use tokio::sync::Mutex;
1213

1314
use dash_spv::{
1415
client::ClientConfig,
@@ -52,8 +53,8 @@ impl MockNetworkManager {
5253
}
5354
}
5455

55-
fn get_sent_messages(&self) -> Vec<NetworkMessage> {
56-
self.sent_messages.lock().unwrap().clone()
56+
async fn get_sent_messages(&self) -> Vec<NetworkMessage> {
57+
self.sent_messages.lock().await.clone()
5758
}
5859
}
5960

@@ -72,7 +73,7 @@ impl NetworkManager for MockNetworkManager {
7273
}
7374

7475
async fn send_message(&mut self, message: NetworkMessage) -> NetworkResult<()> {
75-
self.sent_messages.lock().unwrap().push(message);
76+
self.sent_messages.lock().await.push(message);
7677
Ok(())
7778
}
7879

@@ -181,7 +182,7 @@ async fn test_filter_sync_at_tip_edge_case() {
181182
assert!(!result.unwrap(), "Should not start sync when already at tip");
182183

183184
// Verify no messages were sent
184-
let sent_messages = network.get_sent_messages();
185+
let sent_messages = network.get_sent_messages().await;
185186
assert_eq!(sent_messages.len(), 0, "Should not send any messages when at tip");
186187
}
187188

@@ -285,7 +286,7 @@ async fn test_no_invalid_getcfheaders_at_tip() {
285286
assert!(result.unwrap(), "Should start sync when behind by 1 block");
286287

287288
// Check the sent message
288-
let sent_messages = network.get_sent_messages();
289+
let sent_messages = network.get_sent_messages().await;
289290
assert_eq!(sent_messages.len(), 1, "Should send exactly one message");
290291

291292
match &sent_messages[0] {

dash-spv/tests/filter_header_verification_test.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ use dashcore::{
3232
};
3333
use dashcore_hashes::{sha256d, Hash};
3434
use std::collections::HashSet;
35-
use std::sync::{Arc, Mutex};
35+
use std::sync::Arc;
36+
use tokio::sync::Mutex;
3637

3738
/// Mock network manager for testing filter sync
3839
#[derive(Debug)]

dash-spv/tests/simple_gap_test.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
//! Basic test for CFHeader gap detection functionality.
22
33
use std::collections::HashSet;
4-
use std::sync::{Arc, Mutex};
4+
use std::sync::Arc;
5+
use tokio::sync::Mutex;
56

67
use dash_spv::{
78
client::ClientConfig,

0 commit comments

Comments
 (0)