Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions dash-spv-ffi/include/dash_spv_ffi.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ typedef void (*WalletTransactionCallback)(const char *wallet_id,
bool is_ours,
void *user_data);

typedef void (*FilterHeadersProgressCallback)(uint32_t filter_height,
uint32_t header_height,
double percentage,
void *user_data);

Comment on lines +152 to +156
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

New FilterHeadersProgress callback shape and placement look correct.

Signature matches Rust (u32, u32, f64, void*), and appending the field at the end of FFIEventCallbacks preserves ABI for rebuilt consumers.

Please also sync the generated header into the Swift SDK to keep it in lockstep:

Also applies to: 166-167


🏁 Script executed:

#!/bin/bash
set -euo pipefail
# Verify Swift header is in sync with generated one
GEN="dash-spv-ffi/include/dash_spv_ffi.h"
SWIFT="swift-dash-core-sdk/Sources/DashSPVFFI/include/dash_spv_ffi.h"
if [[ -f "$GEN" && -f "$SWIFT" ]]; then
  if cmp -s "$GEN" "$SWIFT"; then
    echo "Headers are in sync ✅"
  else
    echo "Headers differ ❌"; diff -u "$SWIFT" "$GEN" | sed -n '1,200p'
    echo "Run ./sync-headers.sh to update."
    exit 1
  fi
else
  echo "One or both header paths missing. Please check repository layout."
  exit 1
fi

Length of output: 1048


Sync Swift header with generated header (run ./sync-headers.sh)

dash-spv-ffi/include/dash_spv_ffi.h adds the FilterHeadersProgress typedef and the on_filter_headers_progress field; swift-dash-core-sdk/Sources/DashSPVFFI/include/dash_spv_ffi.h is out of sync — run ./sync-headers.sh to update. Signature matches Rust (u32, u32, f64, void*) and appending the field at the end of FFIEventCallbacks preserves ABI.

🤖 Prompt for AI Agents
In dash-spv-ffi/include/dash_spv_ffi.h around lines 152-156 the new typedef
FilterHeadersProgressCallback and the on_filter_headers_progress callback field
were added but the Swift copy
swift-dash-core-sdk/Sources/DashSPVFFI/include/dash_spv_ffi.h is out of sync;
run ./sync-headers.sh to regenerate the Swift header so it includes the typedef
(uint32_t, uint32_t, double, void*) and append the on_filter_headers_progress
field at the end of the FFIEventCallbacks struct to preserve ABI compatibility.

typedef struct FFIEventCallbacks {
BlockCallback on_block;
TransactionCallback on_transaction;
Expand All @@ -158,6 +163,7 @@ typedef struct FFIEventCallbacks {
MempoolRemovedCallback on_mempool_transaction_removed;
CompactFilterMatchedCallback on_compact_filter_matched;
WalletTransactionCallback on_wallet_transaction;
FilterHeadersProgressCallback on_filter_headers_progress;
void *user_data;
} FFIEventCallbacks;

Expand Down
26 changes: 26 additions & 0 deletions dash-spv-ffi/src/callbacks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ pub type WalletTransactionCallback = Option<
),
>;

pub type FilterHeadersProgressCallback = Option<
extern "C" fn(filter_height: u32, header_height: u32, percentage: f64, user_data: *mut c_void),
>;

#[repr(C)]
pub struct FFIEventCallbacks {
pub on_block: BlockCallback,
Expand All @@ -145,6 +149,7 @@ pub struct FFIEventCallbacks {
pub on_mempool_transaction_removed: MempoolRemovedCallback,
pub on_compact_filter_matched: CompactFilterMatchedCallback,
pub on_wallet_transaction: WalletTransactionCallback,
pub on_filter_headers_progress: FilterHeadersProgressCallback,
pub user_data: *mut c_void,
}

Expand Down Expand Up @@ -173,6 +178,7 @@ impl Default for FFIEventCallbacks {
on_mempool_transaction_removed: None,
on_compact_filter_matched: None,
on_wallet_transaction: None,
on_filter_headers_progress: None,
user_data: std::ptr::null_mut(),
}
}
Expand Down Expand Up @@ -383,3 +389,23 @@ impl FFIEventCallbacks {
}
}
}

impl FFIEventCallbacks {
pub fn call_filter_headers_progress(
&self,
filter_height: u32,
header_height: u32,
percentage: f64,
) {
if let Some(callback) = self.on_filter_headers_progress {
tracing::info!(
"📊 Calling filter headers progress callback: filter_height={}, header_height={}, pct={:.2}",
filter_height, header_height, percentage
);
callback(filter_height, header_height, percentage, self.user_data);
tracing::info!("✅ Filter headers progress callback completed");
} else {
tracing::debug!("Filter headers progress callback not set");
}
}
}
22 changes: 22 additions & 0 deletions dash-spv-ffi/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,16 @@ impl FFIDashSpvClient {
confirmed, unconfirmed, total);
callbacks.call_balance_update(confirmed, unconfirmed);
}
dash_spv::types::SpvEvent::FilterHeadersProgress { filter_header_height, header_height, percentage } => {
tracing::info!("📊 Filter headers progress event: filter={}, header={}, pct={:.2}",
filter_header_height, header_height, percentage);
callbacks
.call_filter_headers_progress(
filter_header_height,
header_height,
percentage,
);
}
dash_spv::types::SpvEvent::TransactionDetected { ref txid, confirmed, ref addresses, amount, block_height, .. } => {
tracing::info!("💸 Transaction detected: txid={}, confirmed={}, amount={}, addresses={:?}, height={:?}",
txid, confirmed, amount, addresses, block_height);
Expand Down Expand Up @@ -717,6 +727,8 @@ pub unsafe extern "C" fn dash_spv_ffi_client_sync_to_tip_with_progress(
let inner = client.inner.clone();
let runtime = client.runtime.clone();
let sync_callbacks = client.sync_callbacks.clone();
// Shared flag to coordinate internal threads during sync
let sync_running = Arc::new(AtomicBool::new(true));

// Take progress receiver from client
let progress_receiver = {
Expand Down Expand Up @@ -772,6 +784,7 @@ pub unsafe extern "C" fn dash_spv_ffi_client_sync_to_tip_with_progress(
// Spawn sync task in a separate thread with safe callback access
let runtime_handle = runtime.handle().clone();
let sync_callbacks_clone = sync_callbacks.clone();
let sync_running_for_join = sync_running.clone();
let sync_handle = std::thread::spawn(move || {
// Run monitoring loop
let monitor_result = runtime_handle.block_on(async move {
Expand All @@ -792,6 +805,9 @@ pub unsafe extern "C" fn dash_spv_ffi_client_sync_to_tip_with_progress(
res
});

// Signal background handlers to stop
sync_running_for_join.store(false, Ordering::Relaxed);

// Send completion callback and cleanup
{
let mut cb_guard = sync_callbacks_clone.lock().unwrap();
Expand Down Expand Up @@ -843,6 +859,8 @@ pub unsafe extern "C" fn dash_spv_ffi_client_sync_to_tip_with_progress(
FFIErrorCode::Success as i32
}

// Note: filter headers progress is forwarded via FFIEventCallbacks.on_filter_headers_progress

/// Cancels the sync operation.
///
/// **Note**: This function currently only stops the SPV client and clears sync callbacks,
Expand Down Expand Up @@ -1021,6 +1039,10 @@ pub unsafe extern "C" fn dash_spv_ffi_client_set_event_callbacks(
tracing::info!(" Block callback: {}", callbacks.on_block.is_some());
tracing::info!(" Transaction callback: {}", callbacks.on_transaction.is_some());
tracing::info!(" Balance update callback: {}", callbacks.on_balance_update.is_some());
tracing::info!(
" Filter headers progress callback: {}",
callbacks.on_filter_headers_progress.is_some()
);

let mut event_callbacks = client.event_callbacks.lock().unwrap();
*event_callbacks = callbacks;
Expand Down
8 changes: 7 additions & 1 deletion dash-spv-ffi/tests/integration/test_full_workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,12 @@ mod tests {
on_block: Some(on_block),
on_transaction: Some(on_transaction),
on_balance_update: Some(on_balance),
on_mempool_transaction_added: None,
on_mempool_transaction_confirmed: None,
on_mempool_transaction_removed: None,
on_compact_filter_matched: None,
on_wallet_transaction: None,
on_filter_headers_progress: None,
user_data: &ctx as *const _ as *mut c_void,
};

Expand Down Expand Up @@ -536,4 +542,4 @@ mod tests {
ctx.cleanup();
}
}
}
}
2 changes: 2 additions & 0 deletions dash-spv-ffi/tests/test_event_callbacks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ fn test_event_callbacks_setup() {
on_mempool_transaction_removed: None,
on_compact_filter_matched: None,
on_wallet_transaction: None,
on_filter_headers_progress: None,
user_data,
};

Expand Down Expand Up @@ -266,6 +267,7 @@ fn test_enhanced_event_callbacks() {
on_mempool_transaction_removed: None,
on_compact_filter_matched: Some(test_compact_filter_matched_callback),
on_wallet_transaction: Some(test_wallet_transaction_callback),
on_filter_headers_progress: None,
user_data: Arc::as_ptr(&event_data) as *mut c_void,
};

Expand Down
1 change: 1 addition & 0 deletions dash-spv-ffi/tests/unit/test_async_operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,7 @@ mod tests {
on_mempool_transaction_removed: None,
on_compact_filter_matched: None,
on_wallet_transaction: None,
on_filter_headers_progress: None,
user_data: &event_data as *const _ as *mut c_void,
};

Expand Down
19 changes: 19 additions & 0 deletions dash-spv/src/client/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,25 @@ impl<
{
tracing::error!("Sequential sync manager error handling message: {}", e);
}

// Additionally forward compact filters to the block processor so it can
// perform wallet matching and emit CompactFilterMatched events.
if let NetworkMessage::CFilter(ref cfilter_msg) = message {
let (response_tx, _response_rx) = tokio::sync::oneshot::channel();
let task = crate::client::BlockProcessingTask::ProcessCompactFilter {
filter: dashcore::bip158::BlockFilter {
content: cfilter_msg.filter.clone(),
},
block_hash: cfilter_msg.block_hash,
response_tx,
};
if let Err(e) = self.block_processor_tx.send(task) {
tracing::warn!(
"Failed to forward CFilter to block processor for event emission: {}",
e
);
}
}
}
NetworkMessage::Block(_) => {
// Blocks can be large - avoid cloning unless necessary
Expand Down
51 changes: 47 additions & 4 deletions dash-spv/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,10 @@ impl<
// Track masternode sync completion for ChainLock validation
let mut masternode_engine_updated = false;

// Last emitted heights for filter headers progress to avoid duplicate events
let mut last_emitted_header_height: u32 = 0;
let mut last_emitted_filter_header_height: u32 = 0;

loop {
// Check if we should stop
let running = self.running.read().await;
Expand Down Expand Up @@ -862,10 +866,14 @@ impl<

// Emit detailed progress update
if last_rate_calc.elapsed() >= Duration::from_secs(1) {
let current_height = {
// Storage tip is the headers vector index (0-based).
let current_storage_tip = {
let storage = self.storage.lock().await;
storage.get_tip_height().await.ok().flatten().unwrap_or(0)
};
// Convert to absolute blockchain height: base + storage_tip
let sync_base_height = { self.state.read().await.sync_base_height };
let current_height = sync_base_height + current_storage_tip;
let peer_best = self
.network
.get_peer_best_height()
Expand All @@ -875,9 +883,9 @@ impl<
.unwrap_or(current_height);

// Calculate headers downloaded this second
if current_height > last_height {
headers_this_second = current_height - last_height;
last_height = current_height;
if current_storage_tip > last_height {
headers_this_second = current_storage_tip - last_height;
last_height = current_storage_tip;
}

let headers_per_second = headers_this_second as f64;
Expand Down Expand Up @@ -928,6 +936,34 @@ impl<
last_rate_calc = Instant::now();
}

// Emit filter headers progress only when heights change
let (abs_header_height, filter_header_height) = {
let storage = self.storage.lock().await;
let storage_tip = storage.get_tip_height().await.ok().flatten().unwrap_or(0);
let filter_tip =
storage.get_filter_tip_height().await.ok().flatten().unwrap_or(0);
(self.state.read().await.sync_base_height + storage_tip, filter_tip)
};
if abs_header_height != last_emitted_header_height
|| filter_header_height != last_emitted_filter_header_height
{
if abs_header_height > 0 {
let pct = if filter_header_height <= abs_header_height {
(filter_header_height as f64 / abs_header_height as f64 * 100.0)
.min(100.0)
} else {
0.0
};
self.emit_event(SpvEvent::FilterHeadersProgress {
filter_header_height,
header_height: abs_header_height,
percentage: pct,
});
}
last_emitted_header_height = abs_header_height;
last_emitted_filter_header_height = filter_header_height;
}

last_status_update = Instant::now();
}

Expand Down Expand Up @@ -2374,6 +2410,13 @@ impl<
}
}

tracing::debug!(
"get_stats: header_height={}, filter_height={}, peers={}",
stats.header_height,
stats.filter_height,
stats.connected_peers
);

Ok(stats)
}

Expand Down
Loading
Loading