Skip to content

Commit 2340750

Browse files
committed
feat: Introduce DashSpvClientInterface
1 parent 4e9ef04 commit 2340750

File tree

11 files changed

+269
-176
lines changed

11 files changed

+269
-176
lines changed

dash-spv-ffi/src/client.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -886,23 +886,27 @@ pub unsafe extern "C" fn dash_spv_ffi_client_sync_to_tip_with_progress(
886886
}
887887
}
888888
};
889+
let (_command_sender, command_receiver) = tokio::sync::mpsc::unbounded_channel();
890+
let run_token = shutdown_token_sync.clone();
889891
let (abort_handle, abort_registration) = AbortHandle::new_pair();
890-
let mut monitor_future =
891-
Box::pin(Abortable::new(spv_client.monitor_network(), abort_registration));
892+
let mut run_future = Box::pin(Abortable::new(
893+
spv_client.run(command_receiver, run_token),
894+
abort_registration,
895+
));
892896
let result = tokio::select! {
893-
res = &mut monitor_future => match res {
897+
res = &mut run_future => match res {
894898
Ok(inner) => inner,
895899
Err(_) => Ok(()),
896900
},
897901
_ = shutdown_token_sync.cancelled() => {
898902
abort_handle.abort();
899-
match monitor_future.as_mut().await {
903+
match run_future.as_mut().await {
900904
Ok(inner) => inner,
901905
Err(_) => Ok(()),
902906
}
903907
}
904908
};
905-
drop(monitor_future);
909+
drop(run_future);
906910
let mut guard = inner.lock().unwrap();
907911
*guard = Some(spv_client);
908912
result

dash-spv-ffi/src/error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ pub extern "C" fn dash_spv_ffi_clear_error() {
5252
impl From<SpvError> for FFIErrorCode {
5353
fn from(err: SpvError) -> Self {
5454
match err {
55+
SpvError::ChannelFailure(_, _) => FFIErrorCode::RuntimeError,
5556
SpvError::Network(_) => FFIErrorCode::NetworkError,
5657
SpvError::Storage(_) => FFIErrorCode::StorageError,
5758
SpvError::Validation(_) => FFIErrorCode::ValidationError,
@@ -60,6 +61,7 @@ impl From<SpvError> for FFIErrorCode {
6061
SpvError::Config(_) => FFIErrorCode::ConfigError,
6162
SpvError::Parse(_) => FFIErrorCode::ValidationError,
6263
SpvError::Wallet(_) => FFIErrorCode::WalletError,
64+
SpvError::QuorumLookupError(_) => FFIErrorCode::ValidationError,
6365
SpvError::General(_) => FFIErrorCode::Unknown,
6466
}
6567
}

dash-spv/examples/filter_sync.rs

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ use key_wallet::wallet::managed_wallet_info::ManagedWalletInfo;
88
use key_wallet_manager::wallet_manager::WalletManager;
99
use std::str::FromStr;
1010
use std::sync::Arc;
11-
use tokio::signal;
1211
use tokio::sync::RwLock;
12+
use tokio_util::sync::CancellationToken;
1313

1414
#[tokio::main]
1515
async fn main() -> Result<(), Box<dyn std::error::Error>> {
@@ -43,29 +43,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
4343
println!("Watching address: {:?}", watch_address);
4444

4545
// Full sync including filters
46-
let progress = client.sync_to_tip().await?;
46+
client.sync_to_tip().await?;
4747

48-
tokio::select! {
49-
result = client.monitor_network() => {
50-
println!("monitor_network result {:?}", result);
51-
},
52-
_ = signal::ctrl_c() => {
53-
println!("monitor_network canceled");
54-
}
55-
}
48+
let (_command_sender, command_receiver) = tokio::sync::mpsc::unbounded_channel();
49+
let shutdown_token = CancellationToken::new();
5650

57-
println!("Headers synced: {}", progress.header_height);
58-
println!("Filter headers synced: {}", progress.filter_header_height);
59-
60-
// Get statistics
61-
let stats = client.stats().await?;
62-
println!("Filter headers downloaded: {}", stats.filter_headers_downloaded);
63-
println!("Filters downloaded: {}", stats.filters_downloaded);
64-
println!("Filter matches found: {}", stats.filters_matched);
65-
println!("Blocks requested: {}", stats.blocks_requested);
66-
67-
// Stop the client
68-
client.stop().await?;
51+
client.run_until_shutdown(command_receiver, shutdown_token).await?;
6952

7053
println!("Done!");
7154
Ok(())

dash-spv/examples/simple_sync.rs

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ use key_wallet::wallet::managed_wallet_info::ManagedWalletInfo;
77

88
use key_wallet_manager::wallet_manager::WalletManager;
99
use std::sync::Arc;
10-
use tokio::signal;
1110
use tokio::sync::RwLock;
11+
use tokio_util::sync::CancellationToken;
1212

1313
#[tokio::main]
1414
async fn main() -> Result<(), Box<dyn std::error::Error>> {
@@ -48,17 +48,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
4848
println!("Headers downloaded: {}", stats.headers_downloaded);
4949
println!("Bytes received: {}", stats.bytes_received);
5050

51-
tokio::select! {
52-
result = client.monitor_network() => {
53-
println!("monitor_network result {:?}", result);
54-
},
55-
_ = signal::ctrl_c() => {
56-
println!("monitor_network canceled");
57-
}
58-
}
59-
60-
// Stop the client
61-
client.stop().await?;
51+
let (_command_sender, command_receiver) = tokio::sync::mpsc::unbounded_channel();
52+
let shutdown_token = CancellationToken::new();
53+
54+
client.run_until_shutdown(command_receiver, shutdown_token).await?;
6255

6356
println!("Done!");
6457
Ok(())

dash-spv/examples/spv_with_wallet.rs

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ use dash_spv::{ClientConfig, DashSpvClient};
88
use key_wallet::wallet::managed_wallet_info::ManagedWalletInfo;
99
use key_wallet_manager::wallet_manager::WalletManager;
1010
use std::sync::Arc;
11-
use tokio::signal;
1211
use tokio::sync::RwLock;
12+
use tokio_util::sync::CancellationToken;
1313

1414
#[tokio::main]
1515
async fn main() -> Result<(), Box<dyn std::error::Error>> {
@@ -48,18 +48,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
4848
// - Mempool transactions via process_mempool_transaction()
4949
// - Reorgs via handle_reorg()
5050
// - Compact filter checks via check_compact_filter()
51-
tokio::select! {
52-
result = client.monitor_network() => {
53-
println!("monitor_network result {:?}", result);
54-
},
55-
_ = signal::ctrl_c() => {
56-
println!("monitor_network canceled");
57-
}
58-
}
5951

60-
// Stop the client
61-
println!("Stopping SPV client...");
62-
client.stop().await?;
52+
let (_command_sender, command_receiver) = tokio::sync::mpsc::unbounded_channel();
53+
let shutdown_token = CancellationToken::new();
54+
55+
client.run_until_shutdown(command_receiver, shutdown_token).await?;
6356

6457
println!("Done!");
6558
Ok(())

dash-spv/src/client/interface.rs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
use crate::error::SpvError;
2+
use dashcore::sml::llmq_type::LLMQType;
3+
use dashcore::sml::quorum_entry::qualified_quorum_entry::QualifiedQuorumEntry;
4+
use dashcore::QuorumHash;
5+
use std::fmt::Display;
6+
use tokio::sync::{mpsc, oneshot};
7+
8+
pub type Result<T> = std::result::Result<T, SpvError>;
9+
10+
pub type GetQuorumByHeightResult = Result<QualifiedQuorumEntry>;
11+
12+
async fn receive<Type>(context: String, receiver: oneshot::Receiver<Type>) -> Result<Type> {
13+
receiver.await.map_err(|error| SpvError::ChannelFailure(context, error.to_string()))
14+
}
15+
16+
pub enum DashSpvClientCommand {
17+
GetQuorumByHeight {
18+
height: u32,
19+
quorum_type: LLMQType,
20+
quorum_hash: QuorumHash,
21+
sender: oneshot::Sender<GetQuorumByHeightResult>,
22+
},
23+
}
24+
25+
impl DashSpvClientCommand {
26+
pub async fn send(
27+
self,
28+
context: String,
29+
sender: mpsc::UnboundedSender<DashSpvClientCommand>,
30+
) -> Result<()> {
31+
sender.send(self).map_err(|error| SpvError::ChannelFailure(context, error.to_string()))?;
32+
Ok(())
33+
}
34+
}
35+
36+
impl Display for DashSpvClientCommand {
37+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38+
let str = match self {
39+
DashSpvClientCommand::GetQuorumByHeight {
40+
height,
41+
quorum_type,
42+
quorum_hash,
43+
sender: _,
44+
} => format!("GetQuorumByHeight({height}, {quorum_type}, {quorum_hash})"),
45+
};
46+
write!(f, "{}", str)
47+
}
48+
}
49+
50+
#[derive(Clone)]
51+
pub struct DashSpvClientInterface {
52+
pub command_sender: mpsc::UnboundedSender<DashSpvClientCommand>,
53+
}
54+
55+
impl DashSpvClientInterface {
56+
pub fn new(command_sender: mpsc::UnboundedSender<DashSpvClientCommand>) -> Self {
57+
Self {
58+
command_sender,
59+
}
60+
}
61+
62+
pub async fn get_quorum_by_height(
63+
&self,
64+
height: u32,
65+
quorum_type: LLMQType,
66+
quorum_hash: QuorumHash,
67+
) -> GetQuorumByHeightResult {
68+
let (sender, receiver) = oneshot::channel();
69+
let command = DashSpvClientCommand::GetQuorumByHeight {
70+
height,
71+
quorum_type,
72+
quorum_hash,
73+
sender,
74+
};
75+
let context = command.to_string();
76+
command.send(context.clone(), self.command_sender.clone()).await?;
77+
receive(context, receiver).await?
78+
}
79+
}

dash-spv/src/client/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
pub mod block_processor;
3838
pub mod config;
3939
pub mod filter_sync;
40+
pub mod interface;
4041
pub mod message_handler;
4142
pub mod status_display;
4243

dash-spv/src/client/queries.rs

Lines changed: 23 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@ use crate::error::{Result, SpvError};
1010
use crate::network::NetworkManager;
1111
use crate::storage::StorageManager;
1212
use crate::types::AddressBalance;
13+
use dashcore::sml::llmq_type::LLMQType;
1314
use dashcore::sml::masternode_list::MasternodeList;
1415
use dashcore::sml::masternode_list_engine::MasternodeListEngine;
1516
use dashcore::sml::quorum_entry::qualified_quorum_entry::QualifiedQuorumEntry;
17+
use dashcore::QuorumHash;
1618
use key_wallet_manager::wallet_interface::WalletInterface;
1719

1820
use super::DashSpvClient;
@@ -73,45 +75,32 @@ impl<
7375
pub fn get_quorum_at_height(
7476
&self,
7577
height: u32,
76-
quorum_type: u8,
77-
quorum_hash: &[u8; 32],
78-
) -> Option<&QualifiedQuorumEntry> {
79-
use dashcore::sml::llmq_type::LLMQType;
80-
use dashcore::QuorumHash;
81-
use dashcore_hashes::Hash;
82-
83-
let llmq_type: LLMQType = LLMQType::from(quorum_type);
84-
if llmq_type == LLMQType::LlmqtypeUnknown {
85-
tracing::warn!("Invalid quorum type {} requested at height {}", quorum_type, height);
86-
return None;
87-
};
88-
89-
let qhash = QuorumHash::from_byte_array(*quorum_hash);
90-
78+
quorum_type: LLMQType,
79+
quorum_hash: QuorumHash,
80+
) -> Result<QualifiedQuorumEntry> {
9181
// First check if we have the masternode list at this height
9282
match self.get_masternode_list_at_height(height) {
9383
Some(ml) => {
9484
// We have the masternode list, now look for the quorum
95-
match ml.quorums.get(&llmq_type) {
96-
Some(quorums) => match quorums.get(&qhash) {
85+
match ml.quorums.get(&quorum_type) {
86+
Some(quorums) => match quorums.get(&quorum_hash) {
9787
Some(quorum) => {
9888
tracing::debug!(
9989
"Found quorum type {} at height {} with hash {}",
10090
quorum_type,
10191
height,
10292
hex::encode(quorum_hash)
10393
);
104-
Some(quorum)
94+
Ok(quorum.clone())
10595
}
10696
None => {
107-
tracing::warn!(
108-
"Quorum not found: type {} at height {} with hash {} (masternode list exists with {} quorums of this type)",
109-
quorum_type,
110-
height,
111-
hex::encode(quorum_hash),
112-
quorums.len()
113-
);
114-
None
97+
let message = format!("Quorum not found: type {} at height {} with hash {} (masternode list exists with {} quorums of this type)",
98+
quorum_type,
99+
height,
100+
hex::encode(quorum_hash),
101+
quorums.len());
102+
tracing::warn!(message);
103+
Err(SpvError::QuorumLookupError(message))
115104
}
116105
},
117106
None => {
@@ -120,7 +109,10 @@ impl<
120109
quorum_type,
121110
height
122111
);
123-
None
112+
Err(SpvError::QuorumLookupError(format!(
113+
"No quorums of type {} found at height {}",
114+
quorum_type, height
115+
)))
124116
}
125117
}
126118
}
@@ -129,7 +121,10 @@ impl<
129121
"No masternode list found at height {} - cannot retrieve quorum",
130122
height
131123
);
132-
None
124+
Err(SpvError::QuorumLookupError(format!(
125+
"No masternode list found at height {}",
126+
height
127+
)))
133128
}
134129
}
135130
}

0 commit comments

Comments
 (0)