Skip to content

Commit db3b91e

Browse files
feat: enhance filter synchronization with flow control and request tracking
1 parent da3897a commit db3b91e

File tree

7 files changed

+1048
-60
lines changed

7 files changed

+1048
-60
lines changed

dash-spv/src/client/config.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,15 @@ pub struct ClientConfig {
5656

5757
/// Log level for tracing.
5858
pub log_level: String,
59+
60+
/// Maximum concurrent filter requests (default: 8).
61+
pub max_concurrent_filter_requests: usize,
62+
63+
/// Enable flow control for filter requests (default: true).
64+
pub enable_filter_flow_control: bool,
65+
66+
/// Delay between filter requests in milliseconds (default: 50).
67+
pub filter_request_delay_ms: u64,
5968
}
6069

6170
impl Default for ClientConfig {
@@ -76,6 +85,9 @@ impl Default for ClientConfig {
7685
max_peers: 8,
7786
enable_persistence: true,
7887
log_level: "info".to_string(),
88+
max_concurrent_filter_requests: 16,
89+
enable_filter_flow_control: true,
90+
filter_request_delay_ms: 0,
7991
}
8092
}
8193
}
@@ -159,6 +171,24 @@ impl ClientConfig {
159171
self
160172
}
161173

174+
/// Set maximum concurrent filter requests.
175+
pub fn with_max_concurrent_filter_requests(mut self, max_requests: usize) -> Self {
176+
self.max_concurrent_filter_requests = max_requests;
177+
self
178+
}
179+
180+
/// Enable or disable filter flow control.
181+
pub fn with_filter_flow_control(mut self, enabled: bool) -> Self {
182+
self.enable_filter_flow_control = enabled;
183+
self
184+
}
185+
186+
/// Set delay between filter requests.
187+
pub fn with_filter_request_delay(mut self, delay_ms: u64) -> Self {
188+
self.filter_request_delay_ms = delay_ms;
189+
self
190+
}
191+
162192
/// Validate the configuration.
163193
pub fn validate(&self) -> Result<(), String> {
164194
if self.peers.is_empty() {
@@ -177,6 +207,10 @@ impl ClientConfig {
177207
return Err("max_peers must be > 0".to_string());
178208
}
179209

210+
if self.max_concurrent_filter_requests == 0 {
211+
return Err("max_concurrent_filter_requests must be > 0".to_string());
212+
}
213+
180214
Ok(())
181215
}
182216

dash-spv/src/client/filter_sync.rs

Lines changed: 19 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,9 @@ impl<'a> FilterSyncCoordinator<'a> {
5555
}
5656
drop(running);
5757

58-
// Get current tip height to determine range
59-
let tip_height = self.storage.get_tip_height().await
58+
// Get current filter tip height to determine range (use filter headers, not block headers)
59+
// This ensures consistency between range calculation and progress tracking
60+
let tip_height = self.storage.get_filter_tip_height().await
6061
.map_err(|e| SpvError::Storage(e))?
6162
.unwrap_or(0);
6263

@@ -79,7 +80,7 @@ impl<'a> FilterSyncCoordinator<'a> {
7980
let start_height = earliest_height.min(default_start); // Go back to the earliest required height
8081
let actual_count = tip_height - start_height + 1; // Actual number of blocks available
8182

82-
tracing::info!("Requesting filters from height {} to {} ({} blocks)",
83+
tracing::info!("Requesting filters from height {} to {} ({} blocks based on filter tip height)",
8384
start_height, tip_height, actual_count);
8485
tracing::info!("Filter processing and matching will happen automatically in background thread as CFilter messages arrive");
8586

@@ -91,45 +92,30 @@ impl<'a> FilterSyncCoordinator<'a> {
9192
Ok(Vec::new())
9293
}
9394

94-
/// Sync filters in coordination with the monitoring loop using simplified processing
95+
/// Sync filters in coordination with the monitoring loop using flow control processing
9596
async fn sync_filters_coordinated(&mut self, start_height: u32, count: u32) -> Result<()> {
96-
let end_height = start_height + count - 1;
97-
98-
tracing::info!("Starting coordinated filter sync from height {} to {} ({} filters expected)",
99-
start_height, end_height, count);
97+
tracing::info!("Starting coordinated filter sync with flow control from height {} to {} ({} filters expected)",
98+
start_height, start_height + count - 1, count);
10099

101100
// Start tracking filter sync progress
102101
crate::sync::filters::FilterSyncManager::start_filter_sync_tracking(
103102
self.stats,
104103
count as u64
105104
).await;
106105

107-
// Use batch processing to send filter requests
108-
let batch_size = 100;
109-
let mut current_height = start_height;
110-
let mut batches_sent = 0;
111-
112-
// Send all filter requests in batches
113-
while current_height <= end_height {
114-
let batch_end = (current_height + batch_size - 1).min(end_height);
115-
116-
tracing::debug!("Sending batch {}: heights {} to {}", batches_sent + 1, current_height, batch_end);
117-
118-
// Get stop hash for this batch
119-
let stop_hash = self.storage.get_header(batch_end).await
120-
.map_err(|e| SpvError::Storage(e))?
121-
.ok_or_else(|| SpvError::Config("Stop header not found".to_string()))?
122-
.block_hash();
123-
124-
// Send the request - monitoring loop will handle the responses via filter processor
125-
self.sync_manager.filter_sync_mut().request_filters(&mut *self.network, current_height, stop_hash).await
126-
.map_err(|e| SpvError::Sync(e))?;
127-
128-
current_height = batch_end + 1;
129-
batches_sent += 1;
130-
}
106+
// Use the new flow control method
107+
self.sync_manager.filter_sync_mut()
108+
.sync_filters_with_flow_control(
109+
&mut *self.network,
110+
&mut *self.storage,
111+
Some(start_height),
112+
Some(count)
113+
).await
114+
.map_err(|e| SpvError::Sync(e))?;
131115

132-
tracing::info!("✅ All filter requests sent ({} batches), processing via filter processor thread", batches_sent);
116+
let (pending_count, active_count, flow_enabled) = self.sync_manager.filter_sync().get_flow_control_status();
117+
tracing::info!("✅ Filter sync with flow control initiated (flow control enabled: {}, {} requests queued, {} active)",
118+
flow_enabled, pending_count, active_count);
133119

134120
Ok(())
135121
}

dash-spv/src/client/message_handler.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,8 +177,19 @@ impl<'a> MessageHandler<'a> {
177177
NetworkMessage::CFilter(cfilter) => {
178178
tracing::debug!("Received CFilter for block {}", cfilter.block_hash);
179179

180-
// Let the sync manager handle sync coordination (just tracking, not the full filter)
181-
if let Err(e) = self.sync_manager.handle_cfilter_message(cfilter.block_hash, &mut *self.storage).await {
180+
// Record the height of this received filter for gap tracking
181+
crate::sync::filters::FilterSyncManager::record_filter_received_at_height(
182+
self.stats,
183+
&*self.storage,
184+
&cfilter.block_hash
185+
).await;
186+
187+
// Enhanced sync coordination with flow control
188+
if let Err(e) = self.sync_manager.handle_cfilter_message(
189+
cfilter.block_hash,
190+
&mut *self.storage,
191+
&mut *self.network
192+
).await {
182193
tracing::error!("Failed to handle CFilter in sync manager: {}", e);
183194
}
184195

dash-spv/src/client/mod.rs

Lines changed: 49 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -221,8 +221,8 @@ impl DashSpvClient {
221221
// Create shared data structures
222222
let watch_items = Arc::new(RwLock::new(HashSet::new()));
223223

224-
// Create sync manager
225-
let sync_manager = SyncManager::new(&config);
224+
// Create sync manager with shared filter heights
225+
let sync_manager = SyncManager::new(&config, stats.read().await.received_filter_heights.clone());
226226

227227
// Create validation manager
228228
let validation = ValidationManager::new(config.validation_mode);
@@ -449,6 +449,10 @@ impl DashSpvClient {
449449
let mut last_consistency_check = Instant::now();
450450
let consistency_check_interval = std::time::Duration::from_secs(300); // Every 5 minutes
451451

452+
// Timer for filter gap checking
453+
let mut last_filter_gap_check = Instant::now();
454+
let filter_gap_check_interval = std::time::Duration::from_secs(10);
455+
452456
loop {
453457
// Check if we should stop
454458
let running = self.running.read().await;
@@ -506,13 +510,40 @@ impl DashSpvClient {
506510
if last_status_update.elapsed() >= status_update_interval {
507511
self.update_status_display().await;
508512

509-
// Report filter sync progress if active
510-
let (filters_requested, filters_received, progress, timeout) =
511-
crate::sync::filters::FilterSyncManager::get_filter_sync_status(&self.stats).await;
513+
// Report enhanced filter sync progress if active
514+
let (filters_requested, filters_received, basic_progress, timeout, total_missing, actual_coverage, missing_ranges) =
515+
crate::sync::filters::FilterSyncManager::get_filter_sync_status_with_gaps(&self.stats, self.sync_manager.filter_sync()).await;
512516

513517
if filters_requested > 0 {
514-
tracing::info!("📊 Filter sync progress: {:.1}% ({}/{} filters received)",
515-
progress, filters_received, filters_requested);
518+
// Check if sync is truly complete: both basic progress AND gap analysis must indicate completion
519+
// This fixes a bug where "Complete!" was shown when only gap analysis returned 0 missing filters
520+
// but basic progress (filters_received < filters_requested) indicated incomplete sync.
521+
let is_complete = filters_received >= filters_requested && total_missing == 0;
522+
523+
// Debug logging for completion detection
524+
if filters_received >= filters_requested && total_missing > 0 {
525+
tracing::debug!("🔍 Completion discrepancy detected: basic progress complete ({}/{}) but {} missing filters detected",
526+
filters_received, filters_requested, total_missing);
527+
}
528+
529+
if !is_complete {
530+
tracing::info!("📊 Filter sync: Basic {:.1}% ({}/{}), Actual coverage {:.1}%, Missing: {} filters in {} ranges",
531+
basic_progress, filters_received, filters_requested, actual_coverage, total_missing, missing_ranges.len());
532+
533+
// Show first few missing ranges for debugging
534+
if missing_ranges.len() > 0 {
535+
let show_count = missing_ranges.len().min(3);
536+
for (i, (start, end)) in missing_ranges.iter().enumerate().take(show_count) {
537+
tracing::warn!(" Gap {}: range {}-{} ({} filters)", i + 1, start, end, end - start + 1);
538+
}
539+
if missing_ranges.len() > show_count {
540+
tracing::warn!(" ... and {} more gaps", missing_ranges.len() - show_count);
541+
}
542+
}
543+
} else {
544+
tracing::info!("📊 Filter sync progress: {:.1}% ({}/{} filters received) - Complete!",
545+
basic_progress, filters_received, filters_requested);
546+
}
516547

517548
if timeout {
518549
tracing::warn!("⚠️ Filter sync timeout: no filters received in 30+ seconds");
@@ -549,6 +580,17 @@ impl DashSpvClient {
549580
last_consistency_check = Instant::now();
550581
}
551582

583+
// Check for missing filters and retry periodically
584+
if last_filter_gap_check.elapsed() >= filter_gap_check_interval {
585+
if self.config.enable_filters {
586+
if let Err(e) = self.sync_manager.filter_sync_mut()
587+
.check_and_retry_missing_filters(&mut *self.network, &*self.storage).await {
588+
tracing::warn!("Failed to check and retry missing filters: {}", e);
589+
}
590+
}
591+
last_filter_gap_check = Instant::now();
592+
}
593+
552594
// Handle network messages
553595
match self.network.receive_message().await {
554596
Ok(Some(message)) => {

0 commit comments

Comments
 (0)