Skip to content
2 changes: 1 addition & 1 deletion dash-spv-ffi/tests/unit/test_type_conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ mod tests {
filters_received: u64::MAX,
filter_sync_start_time: None,
last_filter_received_time: None,
received_filter_heights: std::sync::Arc::new(std::sync::Mutex::new(
received_filter_heights: std::sync::Arc::new(tokio::sync::Mutex::new(
std::collections::HashSet::new(),
)),
active_filter_requests: 0,
Expand Down
15 changes: 9 additions & 6 deletions dash-spv/src/client/status_display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,16 @@ impl<'a, S: StorageManager + Send + Sync + 'static> StatusDisplay<'a, S> {
/// Get current sync progress.
pub async fn sync_progress(&self) -> Result<SyncProgress> {
let state = self.state.read().await;
let stats = self.stats.read().await;
// Clone the inner heights handle and copy needed counters without awaiting while holding the RwLock
let (filters_received, received_heights) = {
let stats = self.stats.read().await;
(stats.filters_received, std::sync::Arc::clone(&stats.received_filter_heights))
};

// Calculate last synced filter height from received filter heights
let last_synced_filter_height = if let Ok(heights) = stats.received_filter_heights.lock() {
// Calculate last synced filter height from received filter heights without holding the RwLock guard
let last_synced_filter_height = {
let heights = received_heights.lock().await;
heights.iter().max().copied()
} else {
None
};

// Calculate the actual header height considering checkpoint sync
Expand All @@ -100,7 +103,7 @@ impl<'a, S: StorageManager + Send + Sync + 'static> StatusDisplay<'a, S> {
filter_headers_synced: false, // TODO: Implement
masternodes_synced: false, // TODO: Implement
filter_sync_available: false, // TODO: Get from network manager
filters_downloaded: stats.filters_received,
filters_downloaded: filters_received,
last_synced_filter_height,
sync_start: std::time::SystemTime::now(), // TODO: Track properly
last_update: std::time::SystemTime::now(),
Expand Down
338 changes: 140 additions & 198 deletions dash-spv/src/sync/filters.rs

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions dash-spv/src/sync/sequential/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::storage::StorageManager;
use crate::sync::{
FilterSyncManager, HeaderSyncManagerWithReorg, MasternodeSyncManager, ReorgConfig,
};
use crate::types::{ChainState, SyncProgress};
use crate::types::{ChainState, SharedFilterHeights, SyncProgress};
use key_wallet_manager::wallet_interface::WalletInterface;

use phases::{PhaseTransition, SyncPhase};
Expand Down Expand Up @@ -85,7 +85,7 @@ impl<
/// Create a new sequential sync manager
pub fn new(
config: &ClientConfig,
received_filter_heights: std::sync::Arc<std::sync::Mutex<std::collections::HashSet<u32>>>,
received_filter_heights: SharedFilterHeights,
wallet: std::sync::Arc<tokio::sync::RwLock<W>>,
) -> SyncResult<Self> {
// Create reorg config with sensible defaults
Expand Down
7 changes: 5 additions & 2 deletions dash-spv/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ use dashcore::{
};
use serde::{Deserialize, Serialize};

/// Shared, mutex-protected set of filter heights used across components.
pub type SharedFilterHeights = std::sync::Arc<tokio::sync::Mutex<std::collections::HashSet<u32>>>;

/// Unique identifier for a peer connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PeerId(pub u64);
Expand Down Expand Up @@ -550,7 +553,7 @@ pub struct SpvStats {

/// Received filter heights for gap tracking (shared with FilterSyncManager).
#[serde(skip)]
pub received_filter_heights: std::sync::Arc<std::sync::Mutex<std::collections::HashSet<u32>>>,
pub received_filter_heights: SharedFilterHeights,

/// Number of filter requests currently active.
pub active_filter_requests: u32,
Expand Down Expand Up @@ -587,7 +590,7 @@ impl Default for SpvStats {
filters_received: 0,
filter_sync_start_time: None,
last_filter_received_time: None,
received_filter_heights: std::sync::Arc::new(std::sync::Mutex::new(
received_filter_heights: std::sync::Arc::new(tokio::sync::Mutex::new(
std::collections::HashSet::new(),
)),
active_filter_requests: 0,
Expand Down
3 changes: 2 additions & 1 deletion dash-spv/tests/block_download_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
//! Tests for block downloading on filter match functionality.

use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::RwLock;

use dashcore::{
Expand Down
3 changes: 2 additions & 1 deletion dash-spv/tests/cfheader_gap_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
//! Tests for CFHeader gap detection and auto-restart functionality.

use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use tokio::sync::Mutex;

use dash_spv::{
client::ClientConfig,
Expand Down
13 changes: 7 additions & 6 deletions dash-spv/tests/edge_case_filter_sync_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
//! Tests for edge case handling in filter header sync, particularly at the tip.

use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use tokio::sync::Mutex;

use dash_spv::{
client::ClientConfig,
Expand Down Expand Up @@ -52,8 +53,8 @@ impl MockNetworkManager {
}
}

fn get_sent_messages(&self) -> Vec<NetworkMessage> {
self.sent_messages.lock().unwrap().clone()
async fn get_sent_messages(&self) -> Vec<NetworkMessage> {
self.sent_messages.lock().await.clone()
}
}

Expand All @@ -72,7 +73,7 @@ impl NetworkManager for MockNetworkManager {
}

async fn send_message(&mut self, message: NetworkMessage) -> NetworkResult<()> {
self.sent_messages.lock().unwrap().push(message);
self.sent_messages.lock().await.push(message);
Ok(())
}

Expand Down Expand Up @@ -181,7 +182,7 @@ async fn test_filter_sync_at_tip_edge_case() {
assert!(!result.unwrap(), "Should not start sync when already at tip");

// Verify no messages were sent
let sent_messages = network.get_sent_messages();
let sent_messages = network.get_sent_messages().await;
assert_eq!(sent_messages.len(), 0, "Should not send any messages when at tip");
}

Expand Down Expand Up @@ -285,7 +286,7 @@ async fn test_no_invalid_getcfheaders_at_tip() {
assert!(result.unwrap(), "Should start sync when behind by 1 block");

// Check the sent message
let sent_messages = network.get_sent_messages();
let sent_messages = network.get_sent_messages().await;
assert_eq!(sent_messages.len(), 1, "Should send exactly one message");

match &sent_messages[0] {
Expand Down
3 changes: 2 additions & 1 deletion dash-spv/tests/filter_header_verification_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ use dashcore::{
};
use dashcore_hashes::{sha256d, Hash};
use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use tokio::sync::Mutex;

/// Mock network manager for testing filter sync
#[derive(Debug)]
Expand Down
3 changes: 2 additions & 1 deletion dash-spv/tests/simple_gap_test.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
//! Basic test for CFHeader gap detection functionality.

use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use tokio::sync::Mutex;

use dash_spv::{
client::ClientConfig,
Expand Down
Loading