Skip to content

Commit fab0f61

Browse files
more work
1 parent befd035 commit fab0f61

File tree

11 files changed

+565
-130
lines changed

11 files changed

+565
-130
lines changed

dash-spv-ffi/include/dash_spv_ffi.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,11 @@ typedef void (*WalletTransactionCallback)(const char *wallet_id,
149149
bool is_ours,
150150
void *user_data);
151151

152+
typedef void (*FilterHeadersProgressCallback)(uint32_t filter_height,
153+
uint32_t header_height,
154+
double percentage,
155+
void *user_data);
156+
152157
typedef struct FFIEventCallbacks {
153158
BlockCallback on_block;
154159
TransactionCallback on_transaction;
@@ -158,6 +163,7 @@ typedef struct FFIEventCallbacks {
158163
MempoolRemovedCallback on_mempool_transaction_removed;
159164
CompactFilterMatchedCallback on_compact_filter_matched;
160165
WalletTransactionCallback on_wallet_transaction;
166+
FilterHeadersProgressCallback on_filter_headers_progress;
161167
void *user_data;
162168
} FFIEventCallbacks;
163169

dash-spv-ffi/src/callbacks.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,10 @@ pub type WalletTransactionCallback = Option<
135135
),
136136
>;
137137

138+
pub type FilterHeadersProgressCallback = Option<
139+
extern "C" fn(filter_height: u32, header_height: u32, percentage: f64, user_data: *mut c_void),
140+
>;
141+
138142
#[repr(C)]
139143
pub struct FFIEventCallbacks {
140144
pub on_block: BlockCallback,
@@ -145,6 +149,7 @@ pub struct FFIEventCallbacks {
145149
pub on_mempool_transaction_removed: MempoolRemovedCallback,
146150
pub on_compact_filter_matched: CompactFilterMatchedCallback,
147151
pub on_wallet_transaction: WalletTransactionCallback,
152+
pub on_filter_headers_progress: FilterHeadersProgressCallback,
148153
pub user_data: *mut c_void,
149154
}
150155

@@ -173,6 +178,7 @@ impl Default for FFIEventCallbacks {
173178
on_mempool_transaction_removed: None,
174179
on_compact_filter_matched: None,
175180
on_wallet_transaction: None,
181+
on_filter_headers_progress: None,
176182
user_data: std::ptr::null_mut(),
177183
}
178184
}
@@ -383,3 +389,23 @@ impl FFIEventCallbacks {
383389
}
384390
}
385391
}
392+
393+
impl FFIEventCallbacks {
394+
pub fn call_filter_headers_progress(
395+
&self,
396+
filter_height: u32,
397+
header_height: u32,
398+
percentage: f64,
399+
) {
400+
if let Some(callback) = self.on_filter_headers_progress {
401+
tracing::info!(
402+
"📊 Calling filter headers progress callback: filter_height={}, header_height={}, pct={:.2}",
403+
filter_height, header_height, percentage
404+
);
405+
callback(filter_height, header_height, percentage, self.user_data);
406+
tracing::info!("✅ Filter headers progress callback completed");
407+
} else {
408+
tracing::debug!("Filter headers progress callback not set");
409+
}
410+
}
411+
}

dash-spv-ffi/src/client.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,16 @@ impl FFIDashSpvClient {
230230
confirmed, unconfirmed, total);
231231
callbacks.call_balance_update(confirmed, unconfirmed);
232232
}
233+
dash_spv::types::SpvEvent::FilterHeadersProgress { filter_header_height, header_height, percentage } => {
234+
tracing::info!("📊 Filter headers progress event: filter={}, header={}, pct={:.2}",
235+
filter_header_height, header_height, percentage);
236+
callbacks
237+
.call_filter_headers_progress(
238+
filter_header_height,
239+
header_height,
240+
percentage,
241+
);
242+
}
233243
dash_spv::types::SpvEvent::TransactionDetected { ref txid, confirmed, ref addresses, amount, block_height, .. } => {
234244
tracing::info!("💸 Transaction detected: txid={}, confirmed={}, amount={}, addresses={:?}, height={:?}",
235245
txid, confirmed, amount, addresses, block_height);
@@ -717,6 +727,8 @@ pub unsafe extern "C" fn dash_spv_ffi_client_sync_to_tip_with_progress(
717727
let inner = client.inner.clone();
718728
let runtime = client.runtime.clone();
719729
let sync_callbacks = client.sync_callbacks.clone();
730+
// Shared flag to coordinate internal threads during sync
731+
let sync_running = Arc::new(AtomicBool::new(true));
720732

721733
// Take progress receiver from client
722734
let progress_receiver = {
@@ -772,6 +784,7 @@ pub unsafe extern "C" fn dash_spv_ffi_client_sync_to_tip_with_progress(
772784
// Spawn sync task in a separate thread with safe callback access
773785
let runtime_handle = runtime.handle().clone();
774786
let sync_callbacks_clone = sync_callbacks.clone();
787+
let sync_running_for_join = sync_running.clone();
775788
let sync_handle = std::thread::spawn(move || {
776789
// Run monitoring loop
777790
let monitor_result = runtime_handle.block_on(async move {
@@ -792,6 +805,9 @@ pub unsafe extern "C" fn dash_spv_ffi_client_sync_to_tip_with_progress(
792805
res
793806
});
794807

808+
// Signal background handlers to stop
809+
sync_running_for_join.store(false, Ordering::Relaxed);
810+
795811
// Send completion callback and cleanup
796812
{
797813
let mut cb_guard = sync_callbacks_clone.lock().unwrap();
@@ -843,6 +859,8 @@ pub unsafe extern "C" fn dash_spv_ffi_client_sync_to_tip_with_progress(
843859
FFIErrorCode::Success as i32
844860
}
845861

862+
// Note: filter headers progress is forwarded via FFIEventCallbacks.on_filter_headers_progress
863+
846864
/// Cancels the sync operation.
847865
///
848866
/// **Note**: This function currently only stops the SPV client and clears sync callbacks,
@@ -1021,6 +1039,10 @@ pub unsafe extern "C" fn dash_spv_ffi_client_set_event_callbacks(
10211039
tracing::info!(" Block callback: {}", callbacks.on_block.is_some());
10221040
tracing::info!(" Transaction callback: {}", callbacks.on_transaction.is_some());
10231041
tracing::info!(" Balance update callback: {}", callbacks.on_balance_update.is_some());
1042+
tracing::info!(
1043+
" Filter headers progress callback: {}",
1044+
callbacks.on_filter_headers_progress.is_some()
1045+
);
10241046

10251047
let mut event_callbacks = client.event_callbacks.lock().unwrap();
10261048
*event_callbacks = callbacks;

dash-spv-ffi/tests/integration/test_full_workflow.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,12 @@ mod tests {
191191
on_block: Some(on_block),
192192
on_transaction: Some(on_transaction),
193193
on_balance_update: Some(on_balance),
194+
on_mempool_transaction_added: None,
195+
on_mempool_transaction_confirmed: None,
196+
on_mempool_transaction_removed: None,
197+
on_compact_filter_matched: None,
198+
on_wallet_transaction: None,
199+
on_filter_headers_progress: None,
194200
user_data: &ctx as *const _ as *mut c_void,
195201
};
196202

@@ -536,4 +542,4 @@ mod tests {
536542
ctx.cleanup();
537543
}
538544
}
539-
}
545+
}

dash-spv-ffi/tests/test_event_callbacks.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ fn test_event_callbacks_setup() {
173173
on_mempool_transaction_removed: None,
174174
on_compact_filter_matched: None,
175175
on_wallet_transaction: None,
176+
on_filter_headers_progress: None,
176177
user_data,
177178
};
178179

@@ -266,6 +267,7 @@ fn test_enhanced_event_callbacks() {
266267
on_mempool_transaction_removed: None,
267268
on_compact_filter_matched: Some(test_compact_filter_matched_callback),
268269
on_wallet_transaction: Some(test_wallet_transaction_callback),
270+
on_filter_headers_progress: None,
269271
user_data: Arc::as_ptr(&event_data) as *mut c_void,
270272
};
271273

dash-spv-ffi/tests/unit/test_async_operations.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -599,6 +599,7 @@ mod tests {
599599
on_mempool_transaction_removed: None,
600600
on_compact_filter_matched: None,
601601
on_wallet_transaction: None,
602+
on_filter_headers_progress: None,
602603
user_data: &event_data as *const _ as *mut c_void,
603604
};
604605

dash-spv/src/client/message_handler.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,25 @@ impl<
164164
{
165165
tracing::error!("Sequential sync manager error handling message: {}", e);
166166
}
167+
168+
// Additionally forward compact filters to the block processor so it can
169+
// perform wallet matching and emit CompactFilterMatched events.
170+
if let NetworkMessage::CFilter(ref cfilter_msg) = message {
171+
let (response_tx, _response_rx) = tokio::sync::oneshot::channel();
172+
let task = crate::client::BlockProcessingTask::ProcessCompactFilter {
173+
filter: dashcore::bip158::BlockFilter {
174+
content: cfilter_msg.filter.clone(),
175+
},
176+
block_hash: cfilter_msg.block_hash,
177+
response_tx,
178+
};
179+
if let Err(e) = self.block_processor_tx.send(task) {
180+
tracing::warn!(
181+
"Failed to forward CFilter to block processor for event emission: {}",
182+
e
183+
);
184+
}
185+
}
167186
}
168187
NetworkMessage::Block(_) => {
169188
// Blocks can be large - avoid cloning unless necessary

dash-spv/src/client/mod.rs

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -728,6 +728,10 @@ impl<
728728
// Track masternode sync completion for ChainLock validation
729729
let mut masternode_engine_updated = false;
730730

731+
// Last emitted heights for filter headers progress to avoid duplicate events
732+
let mut last_emitted_header_height: u32 = 0;
733+
let mut last_emitted_filter_header_height: u32 = 0;
734+
731735
loop {
732736
// Check if we should stop
733737
let running = self.running.read().await;
@@ -862,10 +866,14 @@ impl<
862866

863867
// Emit detailed progress update
864868
if last_rate_calc.elapsed() >= Duration::from_secs(1) {
865-
let current_height = {
869+
// Storage tip is the headers vector index (0-based).
870+
let current_storage_tip = {
866871
let storage = self.storage.lock().await;
867872
storage.get_tip_height().await.ok().flatten().unwrap_or(0)
868873
};
874+
// Convert to absolute blockchain height: base + storage_tip
875+
let sync_base_height = { self.state.read().await.sync_base_height };
876+
let current_height = sync_base_height + current_storage_tip;
869877
let peer_best = self
870878
.network
871879
.get_peer_best_height()
@@ -875,9 +883,9 @@ impl<
875883
.unwrap_or(current_height);
876884

877885
// Calculate headers downloaded this second
878-
if current_height > last_height {
879-
headers_this_second = current_height - last_height;
880-
last_height = current_height;
886+
if current_storage_tip > last_height {
887+
headers_this_second = current_storage_tip - last_height;
888+
last_height = current_storage_tip;
881889
}
882890

883891
let headers_per_second = headers_this_second as f64;
@@ -928,6 +936,34 @@ impl<
928936
last_rate_calc = Instant::now();
929937
}
930938

939+
// Emit filter headers progress only when heights change
940+
let (abs_header_height, filter_header_height) = {
941+
let storage = self.storage.lock().await;
942+
let storage_tip = storage.get_tip_height().await.ok().flatten().unwrap_or(0);
943+
let filter_tip =
944+
storage.get_filter_tip_height().await.ok().flatten().unwrap_or(0);
945+
(self.state.read().await.sync_base_height + storage_tip, filter_tip)
946+
};
947+
if abs_header_height != last_emitted_header_height
948+
|| filter_header_height != last_emitted_filter_header_height
949+
{
950+
if abs_header_height > 0 {
951+
let pct = if filter_header_height <= abs_header_height {
952+
(filter_header_height as f64 / abs_header_height as f64 * 100.0)
953+
.min(100.0)
954+
} else {
955+
0.0
956+
};
957+
self.emit_event(SpvEvent::FilterHeadersProgress {
958+
filter_header_height,
959+
header_height: abs_header_height,
960+
percentage: pct,
961+
});
962+
}
963+
last_emitted_header_height = abs_header_height;
964+
last_emitted_filter_header_height = filter_header_height;
965+
}
966+
931967
last_status_update = Instant::now();
932968
}
933969

@@ -2374,6 +2410,13 @@ impl<
23742410
}
23752411
}
23762412

2413+
tracing::debug!(
2414+
"get_stats: header_height={}, filter_height={}, peers={}",
2415+
stats.header_height,
2416+
stats.filter_height,
2417+
stats.connected_peers
2418+
);
2419+
23772420
Ok(stats)
23782421
}
23792422

0 commit comments

Comments
 (0)