Skip to content

Commit ddc3431

Browse files
authored
Merge pull request #6 from TheBlueMatt/main
Make connections non-blocking
2 parents 9fe42a5 + 43092b8 commit ddc3431

File tree

4 files changed

+41
-19
lines changed

4 files changed

+41
-19
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ bitcoin-bech32 = "0.7"
2121
bech32 = "0.7"
2222
hex = "0.3"
2323

24+
futures = "0.3"
2425
time = "0.2"
2526
rand = "0.4"
2627
serde_json = { version = "1.0" }

src/cli.rs

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use lightning_invoice::{utils, Currency, Invoice};
1818
use std::env;
1919
use std::io;
2020
use std::io::{BufRead, Write};
21-
use std::net::{SocketAddr, TcpStream};
21+
use std::net::{SocketAddr, ToSocketAddrs};
2222
use std::ops::Deref;
2323
use std::path::Path;
2424
use std::str::FromStr;
@@ -151,6 +151,7 @@ pub(crate) async fn poll_for_user_input(
151151
peer_manager.clone(),
152152
event_notifier.clone(),
153153
)
154+
.await
154155
.is_err()
155156
{
156157
print!("> ");
@@ -300,6 +301,7 @@ pub(crate) async fn poll_for_user_input(
300301
peer_manager.clone(),
301302
event_notifier.clone(),
302303
)
304+
.await
303305
.is_ok()
304306
{
305307
println!("SUCCESS: connected to peer {}", pubkey);
@@ -452,7 +454,7 @@ fn list_payments(inbound_payments: PaymentInfoStorage, outbound_payments: Paymen
452454
println!("]");
453455
}
454456

455-
pub(crate) fn connect_peer_if_necessary(
457+
pub(crate) async fn connect_peer_if_necessary(
456458
pubkey: PublicKey, peer_addr: SocketAddr, peer_manager: Arc<PeerManager>,
457459
event_notifier: mpsc::Sender<()>,
458460
) -> Result<(), ()> {
@@ -461,24 +463,36 @@ pub(crate) fn connect_peer_if_necessary(
461463
return Ok(());
462464
}
463465
}
464-
match TcpStream::connect_timeout(&peer_addr, Duration::from_secs(10)) {
465-
Ok(stream) => {
466-
let peer_mgr = peer_manager.clone();
467-
let event_ntfns = event_notifier.clone();
468-
tokio::spawn(async move {
469-
lightning_net_tokio::setup_outbound(peer_mgr, event_ntfns, pubkey, stream).await;
470-
});
466+
match lightning_net_tokio::connect_outbound(
467+
Arc::clone(&peer_manager),
468+
event_notifier,
469+
pubkey,
470+
peer_addr,
471+
)
472+
.await
473+
{
474+
Some(conn_closed_fut) => {
475+
let mut closed_fut_box = Box::pin(conn_closed_fut);
471476
let mut peer_connected = false;
472477
while !peer_connected {
478+
match futures::poll!(&mut closed_fut_box) {
479+
std::task::Poll::Ready(_) => {
480+
println!("ERROR: Peer disconnected before we finished the handshake");
481+
return Err(());
482+
}
483+
std::task::Poll::Pending => {}
484+
}
473485
for node_pubkey in peer_manager.get_peer_node_ids() {
474486
if node_pubkey == pubkey {
475487
peer_connected = true;
476488
}
477489
}
490+
// Avoid blocking the tokio context by sleeping a bit
491+
tokio::time::sleep(Duration::from_millis(10)).await;
478492
}
479493
}
480-
Err(e) => {
481-
println!("ERROR: failed to connect to peer: {:?}", e);
494+
None => {
495+
println!("ERROR: failed to connect to peer");
482496
return Err(());
483497
}
484498
}
@@ -625,12 +639,12 @@ pub(crate) fn parse_peer_info(
625639
return Err(std::io::Error::new(
626640
std::io::ErrorKind::Other,
627641
"ERROR: incorrectly formatted peer
628-
info. Should be formatted as: `pubkey@host:port`",
642+
info. Should be formatted as: `pubkey@host:port`",
629643
));
630644
}
631645

632-
let peer_addr: Result<SocketAddr, _> = peer_addr_str.unwrap().parse();
633-
if peer_addr.is_err() {
646+
let peer_addr = peer_addr_str.unwrap().to_socket_addrs().map(|mut r| r.next());
647+
if peer_addr.is_err() || peer_addr.as_ref().unwrap().is_none() {
634648
return Err(std::io::Error::new(
635649
std::io::ErrorKind::Other,
636650
"ERROR: couldn't parse pubkey@host:port into a socket address",
@@ -645,5 +659,5 @@ pub(crate) fn parse_peer_info(
645659
));
646660
}
647661

648-
Ok((pubkey.unwrap(), peer_addr.unwrap()))
662+
Ok((pubkey.unwrap(), peer_addr.unwrap().unwrap()))
649663
}

src/main.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -482,14 +482,20 @@ async fn start_ldk() {
482482
let event_notifier = event_ntfn_sender.clone();
483483
let listening_port = args.ldk_peer_listening_port;
484484
tokio::spawn(async move {
485-
let listener = std::net::TcpListener::bind(format!("0.0.0.0:{}", listening_port)).unwrap();
485+
let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", listening_port))
486+
.await
487+
.expect("Failed to bind to listen port - is something else already listening on it?");
486488
loop {
487489
let peer_mgr = peer_manager_connection_handler.clone();
488490
let notifier = event_notifier.clone();
489-
let tcp_stream = listener.accept().unwrap().0;
491+
let tcp_stream = listener.accept().await.unwrap().0;
490492
tokio::spawn(async move {
491-
lightning_net_tokio::setup_inbound(peer_mgr.clone(), notifier.clone(), tcp_stream)
492-
.await;
493+
lightning_net_tokio::setup_inbound(
494+
peer_mgr.clone(),
495+
notifier.clone(),
496+
tcp_stream.into_std().unwrap(),
497+
)
498+
.await;
493499
});
494500
}
495501
});

0 commit comments

Comments
 (0)