Skip to content

Commit ce79761

Browse files
refactor: streamline header retrieval in FilterSyncManager
- Introduced `find_available_header_at_or_before` method to encapsulate the logic for scanning backward to find the nearest available block header in storage. - Replaced repetitive header scanning logic in multiple locations with calls to the new method, improving code clarity and maintainability. - Enhanced error handling and logging during header retrieval to provide better insights into the syncing process.
1 parent ef31ea7 commit ce79761

File tree

1 file changed

+84
-131
lines changed

1 file changed

+84
-131
lines changed

dash-spv/src/sync/filters.rs

Lines changed: 84 additions & 131 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,64 @@ pub struct FilterSyncManager<S: StorageManager, N: NetworkManager> {
102102
impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync + 'static>
103103
FilterSyncManager<S, N>
104104
{
105+
/// Scan backward from `abs_height` down to `min_abs_height` (inclusive)
106+
/// to find the nearest available block header stored in `storage`.
107+
/// Returns the found `(BlockHash, height)` or `None` if none available.
108+
async fn find_available_header_at_or_before(
109+
&self,
110+
abs_height: u32,
111+
min_abs_height: u32,
112+
storage: &S,
113+
) -> Option<(BlockHash, u32)> {
114+
if abs_height < min_abs_height {
115+
return None;
116+
}
117+
118+
let mut scan_height = abs_height;
119+
loop {
120+
let Some(scan_storage_height) = self.header_abs_to_storage_index(scan_height) else {
121+
tracing::debug!(
122+
"Storage index not available for blockchain height {} while scanning (min={})",
123+
scan_height,
124+
min_abs_height
125+
);
126+
break;
127+
};
128+
129+
match storage.get_header(scan_storage_height).await {
130+
Ok(Some(header)) => {
131+
tracing::info!(
132+
"Found available header at blockchain height {} / storage height {}",
133+
scan_height,
134+
scan_storage_height
135+
);
136+
return Some((header.block_hash(), scan_height));
137+
}
138+
Ok(None) => {
139+
tracing::debug!(
140+
"Header missing at blockchain height {} / storage height {}, scanning back",
141+
scan_height,
142+
scan_storage_height
143+
);
144+
}
145+
Err(e) => {
146+
tracing::warn!(
147+
"Error reading header at blockchain height {} / storage height {}: {}",
148+
scan_height,
149+
scan_storage_height,
150+
e
151+
);
152+
}
153+
}
154+
155+
if scan_height == min_abs_height {
156+
break;
157+
}
158+
scan_height = scan_height.saturating_sub(1);
159+
}
160+
161+
None
162+
}
105163
/// Calculate the start height of a CFHeaders batch.
106164
fn calculate_batch_start_height(cf_headers: &CFHeaders, stop_height: u32) -> u32 {
107165
stop_height.saturating_sub(cf_headers.filter_hashes.len() as u32 - 1)
@@ -515,67 +573,23 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
515573
next_batch_end_height
516574
);
517575

518-
// Scan backwards to find the highest available header
519-
let mut scan_height = next_batch_end_height.saturating_sub(1);
520576
let min_height = self.current_sync_height; // Don't go below where we are
521-
let mut found_header_info = None;
522-
523-
while scan_height >= min_height && found_header_info.is_none() {
524-
let Some(scan_storage_height) =
525-
self.header_abs_to_storage_index(scan_height)
526-
else {
527-
break;
528-
};
529-
match storage.get_header(scan_storage_height).await {
530-
Ok(Some(header)) => {
531-
tracing::info!(
532-
"Found available header at blockchain height {} / storage height {} (originally tried {})",
533-
scan_height,
534-
scan_storage_height,
535-
next_batch_end_height
536-
);
537-
found_header_info =
538-
Some((header.block_hash(), scan_height));
539-
break;
540-
}
541-
Ok(None) => {
542-
tracing::debug!(
543-
"Header not found at blockchain height {} / storage height {}, trying {}",
544-
scan_height,
545-
scan_storage_height,
546-
scan_height.saturating_sub(1)
547-
);
548-
if scan_height == 0 {
549-
break;
550-
}
551-
scan_height = scan_height.saturating_sub(1);
552-
}
553-
Err(e) => {
554-
tracing::error!(
555-
"Error checking header at height {}: {}",
556-
scan_height,
557-
e
558-
);
559-
if scan_height == 0 {
560-
break;
561-
}
562-
scan_height = scan_height.saturating_sub(1);
563-
}
564-
}
565-
}
566-
567-
match found_header_info {
577+
match self
578+
.find_available_header_at_or_before(
579+
next_batch_end_height.saturating_sub(1),
580+
min_height,
581+
storage,
582+
)
583+
.await
584+
{
568585
Some((hash, height)) => {
569-
// Check if we found a header at a height less than our current sync height
570586
if height < self.current_sync_height {
571587
tracing::warn!(
572588
"Found header at height {} which is less than current sync height {}. This means we already have filter headers up to {}. Marking sync as complete.",
573589
height,
574590
self.current_sync_height,
575591
self.current_sync_height - 1
576592
);
577-
// We already have filter headers up to current_sync_height - 1
578-
// No need to request more, mark sync as complete
579593
self.syncing_filter_headers = false;
580594
return Ok(false);
581595
}
@@ -590,10 +604,8 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
590604
tracing::error!(
591605
"This indicates a serious storage inconsistency. Stopping filter header sync."
592606
);
593-
594-
// Mark sync as complete since we can't find any valid headers to request
595607
self.syncing_filter_headers = false;
596-
return Ok(false); // Signal sync completion
608+
return Ok(false);
597609
}
598610
}
599611
}
@@ -734,47 +746,23 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
734746
recovery_batch_end_height
735747
);
736748

737-
// Scan backwards to find available header
738-
let mut scan_height = recovery_batch_end_height.saturating_sub(1);
739749
let min_height = self.current_sync_height;
740-
741-
let mut found_recovery_info = None;
742-
while scan_height >= min_height && found_recovery_info.is_none() {
743-
let scan_storage_height =
744-
match self.header_abs_to_storage_index(scan_height) {
745-
Some(v) => v,
746-
None => break,
747-
};
748-
if let Ok(Some(header)) = storage.get_header(scan_storage_height).await
749-
{
750-
tracing::info!(
751-
"Found recovery header at blockchain height {} / storage height {} (originally tried {})",
752-
scan_height,
753-
scan_storage_height,
754-
recovery_batch_end_height
755-
);
756-
found_recovery_info = Some((header.block_hash(), scan_height));
757-
break;
758-
} else {
759-
if scan_height == 0 {
760-
break;
761-
}
762-
scan_height = scan_height.saturating_sub(1);
763-
}
764-
}
765-
766-
match found_recovery_info {
750+
match self
751+
.find_available_header_at_or_before(
752+
recovery_batch_end_height.saturating_sub(1),
753+
min_height,
754+
storage,
755+
)
756+
.await
757+
{
767758
Some((hash, height)) => {
768-
// Check if we found a header at a height less than our current sync height
769759
if height < self.current_sync_height {
770760
tracing::warn!(
771761
"Recovery: Found header at height {} which is less than current sync height {}. This indicates we already have filter headers up to {}. Marking sync as complete.",
772762
height,
773763
self.current_sync_height,
774764
self.current_sync_height - 1
775765
);
776-
// We already have filter headers up to current_sync_height - 1
777-
// No point in trying to recover, mark sync as complete
778766
self.syncing_filter_headers = false;
779767
return Ok(false);
780768
}
@@ -1014,50 +1002,15 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
10141002
batch_end_height
10151003
);
10161004

1017-
// Scan backwards to find the highest available header within the batch
1018-
let mut scan_height = batch_end_height;
1019-
let min_height = self.current_sync_height;
1020-
let mut found_header = None;
1021-
1022-
while scan_height >= min_height && found_header.is_none() {
1023-
let Some(scan_storage_height) =
1024-
self.header_abs_to_storage_index(scan_height)
1025-
else {
1026-
break;
1027-
};
1028-
match storage.get_header(scan_storage_height).await {
1029-
Ok(Some(header)) => {
1030-
tracing::info!(
1031-
"Found available header at blockchain height {} / storage height {} (originally tried {})",
1032-
scan_height,
1033-
scan_storage_height,
1034-
batch_end_height
1035-
);
1036-
found_header = Some(header.block_hash());
1037-
break;
1038-
}
1039-
Ok(None) => {
1040-
if scan_height == min_height {
1041-
break;
1042-
}
1043-
scan_height = scan_height.saturating_sub(1);
1044-
}
1045-
Err(e) => {
1046-
tracing::warn!(
1047-
"Error getting header at height {}: {}",
1048-
scan_height,
1049-
e
1050-
);
1051-
if scan_height == min_height {
1052-
break;
1053-
}
1054-
scan_height = scan_height.saturating_sub(1);
1055-
}
1056-
}
1057-
}
1058-
1059-
match found_header {
1060-
Some(hash) => hash,
1005+
match self
1006+
.find_available_header_at_or_before(
1007+
batch_end_height,
1008+
self.current_sync_height,
1009+
storage,
1010+
)
1011+
.await
1012+
{
1013+
Some((hash, _height)) => hash,
10611014
None => {
10621015
// If we can't find any headers in the batch range, something is wrong
10631016
// Don't fall back to tip as that would create an oversized request

0 commit comments

Comments
 (0)