Skip to content

Commit fdc4411

Browse files
PastaPastaPastaQuantumExplorercoderabbitai[bot]
authored
refactor(spv): unify blockchain height handling in storage and sync layers (#153)
* refactor(spv): unify blockchain height handling in storage and sync layers - Updated storage and sync components to treat heights as blockchain heights instead of storage indices, simplifying the API and improving clarity. - Refactored methods in `StorageManager` to accept blockchain heights directly, eliminating the need for manual conversions. - Adjusted header retrieval and loading logic across various modules to align with the new height handling, enhancing consistency and reducing potential errors. - Improved logging to reflect the changes in height handling, providing clearer insights during synchronization processes. * fix(spv): process tip-announced blocks through header+filter flow and request full blocks for privacy * fix(spv/storage): return None for heights below sync_base_height when base > 0 in MemoryStorageManager::get_header\n\nPrevents incorrect header lookup by avoiding implicit storage-index interpretation for absolute heights below the checkpoint base; aligns in-memory behavior with absolute height semantics. * refactor(spv): always process full blocks after routing header; remove ad-hoc local cfilter gating * fix(spv/storage): skip processing for heights below base in MemoryStorageManager::get_header Updated the logic in MemoryStorageManager::get_header to skip heights below the base when base > 0, preventing unnecessary iterations and aligning behavior with expected height semantics. * fix(spv): clone block header when creating headers message in message_handler Updated the message_handler to clone the block header when constructing the headers message. This change ensures that the original block header remains intact and avoids potential issues with ownership and borrowing in the processing flow. * fix * fix * Update dash-spv/src/storage/memory.rs Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> * fix * fix --------- Co-authored-by: Quantum Explorer <quantum@dash.org> Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
1 parent 487e553 commit fdc4411

File tree

10 files changed

+387
-408
lines changed

10 files changed

+387
-408
lines changed

dash-spv/src/client/message_handler.rs

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use crate::network::NetworkManager;
77
use crate::storage::StorageManager;
88
use crate::sync::sequential::SequentialSyncManager;
99
use crate::types::{MempoolState, SpvEvent, SpvStats};
10+
// Removed local ad-hoc compact filter construction in favor of always processing full blocks
1011
use key_wallet_manager::wallet_interface::WalletInterface;
1112
use std::sync::Arc;
1213
use tokio::sync::RwLock;
@@ -238,7 +239,23 @@ impl<
238239
block.txdata.len()
239240
);
240241

241-
// Process new block (update state, check watched items)
242+
// 1) Ensure header processing and chain tip update for this block
243+
// Route the header through the sequential sync manager as a Headers message
244+
let headers_msg = NetworkMessage::Headers(vec![block.header]);
245+
if let Err(e) = self
246+
.sync_manager
247+
.handle_message(headers_msg, &mut *self.network, &mut *self.storage)
248+
.await
249+
{
250+
tracing::error!(
251+
"❌ Failed to process header for block {} via sync manager: {}",
252+
block_hash,
253+
e
254+
);
255+
return Err(SpvError::Sync(e));
256+
}
257+
258+
// 2) Always process the full block (privacy and correctness)
242259
if let Err(e) = self.process_new_block(block).await {
243260
tracing::error!("❌ Failed to process new block {}: {}", block_hash, e);
244261
return Err(e);
@@ -434,31 +451,16 @@ impl<
434451
self.network.send_message(getdata).await.map_err(SpvError::Network)?;
435452
}
436453

437-
// Process new blocks immediately when detected
454+
// For blocks announced via inventory during tip sync, request full blocks for privacy
438455
if !blocks_to_request.is_empty() {
439456
tracing::info!(
440-
"🔄 Processing {} new block announcements to stay synchronized",
457+
"📥 Requesting {} new blocks announced via inventory",
441458
blocks_to_request.len()
442459
);
443460

444-
// Extract block hashes
445-
let block_hashes: Vec<dashcore::BlockHash> = blocks_to_request
446-
.iter()
447-
.filter_map(|inv| {
448-
if let Inventory::Block(hash) = inv {
449-
Some(*hash)
450-
} else {
451-
None
452-
}
453-
})
454-
.collect();
455-
456-
// Process each new block
457-
for block_hash in block_hashes {
458-
tracing::info!("📥 Requesting header for new block {}", block_hash);
459-
if let Err(e) = self.process_new_block_hash(block_hash).await {
460-
tracing::error!("❌ Failed to process new block {}: {}", block_hash, e);
461-
}
461+
let getdata = NetworkMessage::GetData(blocks_to_request);
462+
if let Err(e) = self.network.send_message(getdata).await {
463+
tracing::error!("Failed to request announced blocks: {}", e);
462464
}
463465
}
464466

dash-spv/src/client/mod.rs

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -880,14 +880,12 @@ impl<
880880

881881
// Emit detailed progress update
882882
if last_rate_calc.elapsed() >= Duration::from_secs(1) {
883-
// Storage tip is the headers vector index (0-based).
884-
let current_storage_tip = {
883+
// Storage tip now represents the absolute blockchain height.
884+
let current_tip_height = {
885885
let storage = self.storage.lock().await;
886886
storage.get_tip_height().await.ok().flatten().unwrap_or(0)
887887
};
888-
// Convert to absolute blockchain height: base + storage_tip
889-
let sync_base_height = { self.state.read().await.sync_base_height };
890-
let current_height = sync_base_height + current_storage_tip;
888+
let current_height = current_tip_height;
891889
let peer_best = self
892890
.network
893891
.get_peer_best_height()
@@ -897,9 +895,9 @@ impl<
897895
.unwrap_or(current_height);
898896

899897
// Calculate headers downloaded this second
900-
if current_storage_tip > last_height {
901-
headers_this_second = current_storage_tip - last_height;
902-
last_height = current_storage_tip;
898+
if current_tip_height > last_height {
899+
headers_this_second = current_tip_height - last_height;
900+
last_height = current_tip_height;
903901
}
904902

905903
let headers_per_second = headers_this_second as f64;
@@ -956,7 +954,7 @@ impl<
956954
let storage_tip = storage.get_tip_height().await.ok().flatten().unwrap_or(0);
957955
let filter_tip =
958956
storage.get_filter_tip_height().await.ok().flatten().unwrap_or(0);
959-
(self.state.read().await.sync_base_height + storage_tip, filter_tip)
957+
(storage_tip, filter_tip)
960958
};
961959
if abs_header_height != last_emitted_header_height
962960
|| filter_header_height != last_emitted_filter_header_height
@@ -1770,8 +1768,13 @@ impl<
17701768
let mut loaded_count = 0u32;
17711769
let target_height = saved_state.chain_tip.height;
17721770

1773-
// Start from height 1 (genesis is already in ChainState)
1774-
let mut current_height = 1u32;
1771+
// Determine first height to load. Skip genesis (already present) unless we started from a checkpoint base.
1772+
let mut current_height =
1773+
if saved_state.synced_from_checkpoint && saved_state.sync_base_height > 0 {
1774+
saved_state.sync_base_height
1775+
} else {
1776+
1u32
1777+
};
17751778

17761779
while current_height <= target_height {
17771780
let end_height = (current_height + BATCH_SIZE - 1).min(target_height);
@@ -1786,12 +1789,12 @@ impl<
17861789
};
17871790

17881791
if headers.is_empty() {
1789-
tracing::error!(
1790-
"Failed to load headers for range {}..{} - storage may be corrupted",
1792+
tracing::warn!(
1793+
"No headers found for range {}..{} when restoring from state",
17911794
current_height,
17921795
end_height + 1
17931796
);
1794-
return Ok(false);
1797+
break;
17951798
}
17961799

17971800
// Validate headers before adding to chain state

dash-spv/src/client/status_display.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,13 @@ impl<'a, S: StorageManager + Send + Sync + 'static> StatusDisplay<'a, S> {
4848
// For checkpoint sync: height = checkpoint_height + storage_count
4949
let storage = self.storage.lock().await;
5050
if let Ok(Some(storage_tip)) = storage.get_tip_height().await {
51-
let blockchain_height = state.sync_base_height + storage_tip;
51+
let blockchain_height = storage_tip;
5252
if with_logging {
5353
tracing::debug!(
54-
"Status display: storage_tip={}, sync_base={}, blockchain_height={}",
55-
storage_tip,
54+
"Status display: reported tip height={}, sync_base={}, raw_storage_tip={}",
55+
blockchain_height,
5656
state.sync_base_height,
57-
blockchain_height
57+
storage_tip
5858
);
5959
}
6060
blockchain_height

dash-spv/src/storage/disk.rs

Lines changed: 82 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1159,25 +1159,36 @@ impl StorageManager for DiskStorageManager {
11591159
async fn load_headers(&self, range: Range<u32>) -> StorageResult<Vec<BlockHeader>> {
11601160
let mut headers = Vec::new();
11611161

1162-
let start_segment = Self::get_segment_id(range.start);
1163-
let end_segment = Self::get_segment_id(range.end.saturating_sub(1));
1162+
// Convert blockchain height range to storage index range using sync_base_height
1163+
let sync_base_height = *self.sync_base_height.read().await;
1164+
let storage_start = if sync_base_height > 0 && range.start >= sync_base_height {
1165+
range.start - sync_base_height
1166+
} else {
1167+
range.start
1168+
};
1169+
1170+
let storage_end = if sync_base_height > 0 && range.end > sync_base_height {
1171+
range.end - sync_base_height
1172+
} else {
1173+
range.end
1174+
};
1175+
1176+
let start_segment = Self::get_segment_id(storage_start);
1177+
let end_segment = Self::get_segment_id(storage_end.saturating_sub(1));
11641178

11651179
for segment_id in start_segment..=end_segment {
11661180
self.ensure_segment_loaded(segment_id).await?;
11671181

11681182
let segments = self.active_segments.read().await;
11691183
if let Some(segment) = segments.get(&segment_id) {
1170-
let _segment_start_height = segment_id * HEADERS_PER_SEGMENT;
1171-
let _segment_end_height = _segment_start_height + segment.headers.len() as u32;
1172-
11731184
let start_idx = if segment_id == start_segment {
1174-
Self::get_segment_offset(range.start)
1185+
Self::get_segment_offset(storage_start)
11751186
} else {
11761187
0
11771188
};
11781189

11791190
let end_idx = if segment_id == end_segment {
1180-
Self::get_segment_offset(range.end.saturating_sub(1)) + 1
1191+
Self::get_segment_offset(storage_end.saturating_sub(1)) + 1
11811192
} else {
11821193
segment.headers.len()
11831194
};
@@ -1198,17 +1209,31 @@ impl StorageManager for DiskStorageManager {
11981209
}
11991210

12001211
async fn get_header(&self, height: u32) -> StorageResult<Option<BlockHeader>> {
1201-
// TODO: This method currently expects storage-relative heights (0-based from sync_base_height).
1202-
// Consider refactoring to accept blockchain heights and handle conversion internally for better UX.
1212+
// Accept blockchain (absolute) height and convert to storage index using sync_base_height.
1213+
let sync_base_height = *self.sync_base_height.read().await;
12031214

1204-
// First check if this height is within our known range
1205-
let tip_height = self.cached_tip_height.read().await;
1206-
if let Some(tip) = *tip_height {
1207-
if height > tip {
1215+
// Convert absolute height to storage index (base-inclusive mapping)
1216+
let storage_index = if sync_base_height > 0 {
1217+
if height >= sync_base_height {
1218+
height - sync_base_height
1219+
} else {
1220+
// If caller passes a small value (likely a pre-conversion storage index), use it directly
1221+
height
1222+
}
1223+
} else {
1224+
height
1225+
};
1226+
1227+
// First check if this storage index is within our known range
1228+
let tip_index_opt = *self.cached_tip_height.read().await;
1229+
if let Some(tip_index) = tip_index_opt {
1230+
if storage_index > tip_index {
12081231
tracing::trace!(
1209-
"Requested header at height {} is beyond tip height {}",
1232+
"Requested header at storage index {} is beyond tip index {} (abs height {} base {})",
1233+
storage_index,
1234+
tip_index,
12101235
height,
1211-
tip
1236+
sync_base_height
12121237
);
12131238
return Ok(None);
12141239
}
@@ -1217,8 +1242,8 @@ impl StorageManager for DiskStorageManager {
12171242
return Ok(None);
12181243
}
12191244

1220-
let segment_id = Self::get_segment_id(height);
1221-
let offset = Self::get_segment_offset(height);
1245+
let segment_id = Self::get_segment_id(storage_index);
1246+
let offset = Self::get_segment_offset(storage_index);
12221247

12231248
self.ensure_segment_loaded(segment_id).await?;
12241249

@@ -1235,18 +1260,30 @@ impl StorageManager for DiskStorageManager {
12351260

12361261
if header.is_none() {
12371262
tracing::debug!(
1238-
"Header not found at height {} (segment: {}, offset: {})",
1239-
height,
1263+
"Header not found at storage index {} (segment: {}, offset: {}, abs height {}, base {})",
1264+
storage_index,
12401265
segment_id,
1241-
offset
1266+
offset,
1267+
height,
1268+
sync_base_height
12421269
);
12431270
}
12441271

12451272
Ok(header)
12461273
}
12471274

12481275
async fn get_tip_height(&self) -> StorageResult<Option<u32>> {
1249-
Ok(*self.cached_tip_height.read().await)
1276+
let tip_index_opt = *self.cached_tip_height.read().await;
1277+
if let Some(tip_index) = tip_index_opt {
1278+
let base = *self.sync_base_height.read().await;
1279+
if base > 0 {
1280+
Ok(Some(base + tip_index))
1281+
} else {
1282+
Ok(Some(tip_index))
1283+
}
1284+
} else {
1285+
Ok(None)
1286+
}
12501287
}
12511288

12521289
async fn store_filter_headers(&mut self, headers: &[FilterHeader]) -> StorageResult<()> {
@@ -1487,7 +1524,12 @@ impl StorageManager for DiskStorageManager {
14871524

14881525
// Load all headers
14891526
if let Some(tip_height) = self.get_tip_height().await? {
1490-
state.headers = self.load_headers(0..tip_height + 1).await?;
1527+
let range_start = if state.synced_from_checkpoint && state.sync_base_height > 0 {
1528+
state.sync_base_height
1529+
} else {
1530+
0
1531+
};
1532+
state.headers = self.load_headers(range_start..tip_height + 1).await?;
14911533
}
14921534

14931535
// Load all filter headers
@@ -2032,16 +2074,22 @@ mod tests {
20322074
// Store headers using checkpoint sync method
20332075
storage.store_headers_from_height(&headers, checkpoint_height).await?;
20342076

2035-
// Verify headers are stored at correct storage indices
2036-
// Header at blockchain height 1,100,000 should be at storage index 0
2037-
let header_at_0 = storage.get_header(0).await?;
2038-
assert!(header_at_0.is_some(), "Header at storage index 0 should exist");
2039-
assert_eq!(header_at_0.unwrap(), headers[0]);
2077+
// Set sync base height so storage interprets heights as blockchain heights
2078+
let mut base_state = ChainState::new();
2079+
base_state.sync_base_height = checkpoint_height;
2080+
base_state.synced_from_checkpoint = true;
2081+
storage.store_chain_state(&base_state).await?;
2082+
2083+
// Verify headers are stored at correct blockchain heights
2084+
// Header at blockchain height 1,100,000 should be retrievable by that height
2085+
let header_at_base = storage.get_header(checkpoint_height).await?;
2086+
assert!(header_at_base.is_some(), "Header at base blockchain height should exist");
2087+
assert_eq!(header_at_base.unwrap(), headers[0]);
20402088

2041-
// Header at blockchain height 1,100,099 should be at storage index 99
2042-
let header_at_99 = storage.get_header(99).await?;
2043-
assert!(header_at_99.is_some(), "Header at storage index 99 should exist");
2044-
assert_eq!(header_at_99.unwrap(), headers[99]);
2089+
// Header at blockchain height 1,100,099 should be retrievable by that height
2090+
let header_at_ending = storage.get_header(checkpoint_height + 99).await?;
2091+
assert!(header_at_ending.is_some(), "Header at ending blockchain height should exist");
2092+
assert_eq!(header_at_ending.unwrap(), headers[99]);
20452093

20462094
// Test the reverse index (hash -> blockchain height)
20472095
let hash_0 = headers[0].block_hash();
@@ -2081,11 +2129,11 @@ mod tests {
20812129
"After index rebuild, hash should still map to blockchain height 1,100,000"
20822130
);
20832131

2084-
// Verify headers can still be retrieved by storage index
2085-
let header_after_reload = storage2.get_header(0).await?;
2132+
// Verify header can still be retrieved by blockchain height after reload
2133+
let header_after_reload = storage2.get_header(checkpoint_height).await?;
20862134
assert!(
20872135
header_after_reload.is_some(),
2088-
"Header at storage index 0 should exist after reload"
2136+
"Header at base blockchain height should exist after reload"
20892137
);
20902138
assert_eq!(header_after_reload.unwrap(), headers[0]);
20912139

0 commit comments

Comments
 (0)