
Commit c37fab6

Only pause read in PeerManager send_data not read_event
We recently ran into a race condition on macOS where `read_event` would return `Ok(true)` (implying reads should be paused) due to many queued outbound messages, but before the caller was able to set the read-pause flag, the `send_data` calls to flush the buffered messages completed. Thus, when the `read_event` caller got scheduled again, the buffer was empty and we should have resumed reading, but it finally processed the stale read-pause flag and we ended up hanging, unwilling to read messages and unable to learn that we should start reading again as there were no remaining messages to `send_data` for.

This should be fairly rare, but not unheard of - the `pause_read` flag in `read_event` is calculated before handling the last message, so there's some time between when it's calculated and when it's returned. However, that has to race with multiple calls to `send_data` to send all the pending messages, all of which have to complete before the `read_event` return happens. We've (as far as I recall) never hit this in prod, but a benchmark HTLC-flood test managed to hit it somewhat reliably within a few minutes on macOS with a synthetic few-ms sleep added to each message-handling call.

Ultimately this is an issue with the API - we pause reads via a returned flag but unpause them via a called method, creating two independent "streams" of pauses/unpauses which can get out of sync. Thus, here, we stick to a single "stream" of read-pause events from `PeerManager` to user code via `send_data` calls, dropping the read-pause flag return from `read_event` entirely.

Technically this adds a risk that someone can flood us with enough messages fast enough to bloat our outbound buffer for a peer before `PeerManager::process_events` gets called and can flush the pause flag via `send_data` calls to all descriptors. This isn't ideal, but it should still be relatively hard to do, as `process_events` calls are pretty quick and should be triggered immediately after each `read_event` call completes.
1 parent 05f2848 commit c37fab6
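
For illustration, here is a minimal sketch (not LDK's actual lightning-net-tokio code; the `ExampleDescriptor`, `StreamState`, `wake_reader`, and `write_nonblocking` names are hypothetical) of how a net driver's `SocketDescriptor::send_data` can derive its read-paused state entirely from the `continue_read` flag, i.e. from the single stream of pause/unpause events described above:

// Minimal sketch: all read-pausing is driven by `continue_read`, never by a
// `read_event` return value.
use lightning::ln::peer_handler::SocketDescriptor;
use std::hash::{Hash, Hasher};
use std::sync::{Arc, Mutex};

struct StreamState {
	read_paused: bool,
	// ... socket handle, read-task waker, etc.
}

#[derive(Clone)]
struct ExampleDescriptor {
	id: u64,
	state: Arc<Mutex<StreamState>>,
}

impl PartialEq for ExampleDescriptor {
	fn eq(&self, o: &Self) -> bool {
		self.id == o.id
	}
}
impl Eq for ExampleDescriptor {}
impl Hash for ExampleDescriptor {
	fn hash<H: Hasher>(&self, h: &mut H) {
		self.id.hash(h);
	}
}

impl SocketDescriptor for ExampleDescriptor {
	fn send_data(&mut self, data: &[u8], continue_read: bool) -> usize {
		let mut state = self.state.lock().unwrap();
		let was_paused = state.read_paused;
		// The flag is authoritative: record it on every call, including calls
		// made with empty `data` purely to update the read-pause state.
		state.read_paused = !continue_read;
		if continue_read && was_paused {
			// Hypothetical: wake whatever task polls the socket so it resumes
			// calling PeerManager::read_event.
			// state.wake_reader();
		}
		if data.is_empty() {
			return 0;
		}
		// Hypothetical non-blocking write; return how many bytes were accepted.
		// A short write must later trigger PeerManager::write_buffer_space_avail.
		// state.write_nonblocking(data)
		data.len()
	}

	fn disconnect_socket(&mut self) {
		// Hypothetical: close the socket and stop the read task.
	}
}

With this shape, a stale `Ok(true)` from `read_event` can no longer leave the driver paused, because the only place the paused flag is written is inside `send_data` itself.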

File tree

4 files changed (+52 -69 lines)

fuzz/src/full_stack.rs

Lines changed: 1 addition & 1 deletion
@@ -195,7 +195,7 @@ struct Peer<'a> {
 	peers_connected: &'a RefCell<[bool; 256]>,
 }
 impl<'a> SocketDescriptor for Peer<'a> {
-	fn send_data(&mut self, data: &[u8], _resume_read: bool) -> usize {
+	fn send_data(&mut self, data: &[u8], _continue_read: bool) -> usize {
 		data.len()
 	}
 	fn disconnect_socket(&mut self) {

lightning-background-processor/src/lib.rs

Lines changed: 2 additions & 2 deletions
@@ -774,7 +774,7 @@ use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutp
 /// # #[derive(Eq, PartialEq, Clone, Hash)]
 /// # struct SocketDescriptor {}
 /// # impl lightning::ln::peer_handler::SocketDescriptor for SocketDescriptor {
-/// #   fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 }
+/// #   fn send_data(&mut self, _data: &[u8], _continue_read: bool) -> usize { 0 }
 /// #   fn disconnect_socket(&mut self) {}
 /// # }
 /// # type ChainMonitor<B, F, FE> = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<StoreSync>, Arc<lightning::sign::KeysManager>>;
@@ -1878,7 +1878,7 @@ mod tests {
 	#[derive(Clone, Hash, PartialEq, Eq)]
 	struct TestDescriptor {}
 	impl SocketDescriptor for TestDescriptor {
-		fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
+		fn send_data(&mut self, _data: &[u8], _continue_read: bool) -> usize {
 			0
 		}

lightning-net-tokio/src/lib.rs

Lines changed: 8 additions & 19 deletions
@@ -243,13 +243,8 @@ impl Connection {
 				Ok(len) => {
 					let read_res =
 						peer_manager.as_ref().read_event(&mut our_descriptor, &buf[0..len]);
-					let mut us_lock = us.lock().unwrap();
 					match read_res {
-						Ok(pause_read) => {
-							if pause_read {
-								us_lock.read_paused = true;
-							}
-						},
+						Ok(()) => {},
 						Err(_) => break Disconnect::CloseConnection,
 					}
 				},
@@ -533,7 +528,7 @@ impl SocketDescriptor {
 	}
 }
 impl peer_handler::SocketDescriptor for SocketDescriptor {
-	fn send_data(&mut self, data: &[u8], resume_read: bool) -> usize {
+	fn send_data(&mut self, data: &[u8], continue_read: bool) -> usize {
 		// To send data, we take a lock on our Connection to access the TcpStream, writing to it if
 		// there's room in the kernel buffer, or otherwise create a new Waker with a
 		// SocketDescriptor in it which can wake up the write_avail Sender, waking up the
@@ -544,13 +539,16 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
 			return 0;
 		}

-		if resume_read && us.read_paused {
+		let read_was_paused = us.read_paused;
+		us.read_paused = !continue_read;
+
+		if continue_read && read_was_paused {
 			// The schedule_read future may go to lock up but end up getting woken up by there
 			// being more room in the write buffer, dropping the other end of this Sender
 			// before we get here, so we ignore any failures to wake it up.
-			us.read_paused = false;
 			let _ = us.read_waker.try_send(());
 		}
+
 		if data.is_empty() {
 			return 0;
 		}
@@ -576,16 +574,7 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
 				}
 			},
 			task::Poll::Ready(Err(_)) => return written_len,
-			task::Poll::Pending => {
-				// We're queued up for a write event now, but we need to make sure we also
-				// pause read given we're now waiting on the remote end to ACK (and in
-				// accordance with the send_data() docs).
-				us.read_paused = true;
-				// Further, to avoid any current pending read causing a `read_event` call, wake
-				// up the read_waker and restart its loop.
-				let _ = us.read_waker.try_send(());
-				return written_len;
-			},
+			task::Poll::Pending => return written_len,
 		}
 	}
 }

lightning/src/ln/peer_handler.rs

Lines changed: 41 additions & 47 deletions
@@ -632,16 +632,15 @@ pub trait SocketDescriptor: cmp::Eq + hash::Hash + Clone {
 	///
 	/// If the returned size is smaller than `data.len()`, a
 	/// [`PeerManager::write_buffer_space_avail`] call must be made the next time more data can be
-	/// written. Additionally, until a `send_data` event completes fully, no further
-	/// [`PeerManager::read_event`] calls should be made for the same peer! Because this is to
-	/// prevent denial-of-service issues, you should not read or buffer any data from the socket
-	/// until then.
+	/// written.
 	///
-	/// If a [`PeerManager::read_event`] call on this descriptor had previously returned true
-	/// (indicating that read events should be paused to prevent DoS in the send buffer),
-	/// `resume_read` may be set indicating that read events on this descriptor should resume. A
-	/// `resume_read` of false carries no meaning, and should not cause any action.
-	fn send_data(&mut self, data: &[u8], resume_read: bool) -> usize;
+	/// If `continue_read` is *not* set, further [`PeerManager::read_event`] calls should be
+	/// avoided until another call is made with it set. This allows us to pause read if there are
+	/// too many outgoing messages queued for a peer to avoid DoS issues where a peer fills our
+	/// buffer by sending us messages that need response without reading the responses.
+	///
+	/// Note that calls may be made with an empty `data` to update the `continue_read` flag.
+	fn send_data(&mut self, data: &[u8], continue_read: bool) -> usize;
 	/// Disconnect the socket pointed to by this SocketDescriptor.
 	///
 	/// You do *not* need to call [`PeerManager::socket_disconnected`] with this socket after this
@@ -1664,7 +1663,10 @@ where
 			Some(peer_mutex) => {
 				let mut peer = peer_mutex.lock().unwrap();
 				peer.awaiting_write_event = false;
-				self.do_attempt_write_data(descriptor, &mut peer, false);
+				// We go ahead and force at least one write here, because if we don't have any
+				// messages to send and the net driver thought we did that's weird, so they might
+				// also have a confused read-paused state that we should go ahead and clear.
+				self.do_attempt_write_data(descriptor, &mut peer, true);
 			},
 		};
 		Ok(())
@@ -1676,11 +1678,9 @@ where
 	///
 	/// Will *not* call back into [`send_data`] on any descriptors to avoid reentrancy complexity.
 	/// Thus, however, you should call [`process_events`] after any `read_event` to generate
-	/// [`send_data`] calls to handle responses.
-	///
-	/// If `Ok(true)` is returned, further read_events should not be triggered until a
-	/// [`send_data`] call on this descriptor has `resume_read` set (preventing DoS issues in the
-	/// send buffer).
+	/// [`send_data`] calls to handle responses. This is also important to give [`send_data`] calls
+	/// a chance to pause reads if too many messages have been queued in response allowing a peer
+	/// to bloat our memory.
 	///
 	/// In order to avoid processing too many messages at once per peer, `data` should be on the
 	/// order of 4KiB.
@@ -1689,7 +1689,7 @@ where
 	/// [`process_events`]: PeerManager::process_events
 	pub fn read_event(
 		&self, peer_descriptor: &mut Descriptor, data: &[u8],
-	) -> Result<bool, PeerHandleError> {
+	) -> Result<(), PeerHandleError> {
 		match self.do_read_event(peer_descriptor, data) {
 			Ok(res) => Ok(res),
 			Err(e) => {
@@ -1718,8 +1718,7 @@ where

 	fn do_read_event(
 		&self, peer_descriptor: &mut Descriptor, data: &[u8],
-	) -> Result<bool, PeerHandleError> {
-		let mut pause_read = false;
+	) -> Result<(), PeerHandleError> {
 		let peers = self.peers.read().unwrap();
 		let mut msgs_to_forward = Vec::new();
 		let mut peer_node_id = None;
@@ -1994,7 +1993,6 @@ where
 					},
 				}
 			}
-			pause_read = !self.peer_should_read(peer);

 			if let Some(message) = msg_to_handle {
 				match self.handle_message(&peer_mutex, peer_lock, message) {
@@ -2027,7 +2025,7 @@ where
 			);
 		}

-		Ok(pause_read)
+		Ok(())
 	}

 	/// Process an incoming message and return a decision (ok, lightning error, peer handling error) regarding the next action with the peer
@@ -3725,7 +3723,7 @@ mod tests {
 	}

 	impl SocketDescriptor for FileDescriptor {
-		fn send_data(&mut self, data: &[u8], _resume_read: bool) -> usize {
+		fn send_data(&mut self, data: &[u8], _continue_read: bool) -> usize {
 			if self.hang_writes.load(Ordering::Acquire) {
 				0
 			} else {
@@ -3939,12 +3937,8 @@ mod tests {

 	fn try_establish_connection<'a>(
 		peer_a: &TestPeer<'a>, peer_b: &TestPeer<'a>,
-	) -> (
-		FileDescriptor,
-		FileDescriptor,
-		Result<bool, PeerHandleError>,
-		Result<bool, PeerHandleError>,
-	) {
+	) -> (FileDescriptor, FileDescriptor, Result<(), PeerHandleError>, Result<(), PeerHandleError>)
+	{
 		let addr_a = SocketAddress::TcpIpV4 { addr: [127, 0, 0, 1], port: 1000 };
 		let addr_b = SocketAddress::TcpIpV4 { addr: [127, 0, 0, 1], port: 1001 };

@@ -3958,11 +3952,11 @@ mod tests {
 		let initial_data =
 			peer_b.new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap();
 		peer_a.new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap();
-		assert_eq!(peer_a.read_event(&mut fd_a, &initial_data).unwrap(), false);
+		peer_a.read_event(&mut fd_a, &initial_data).unwrap();
 		peer_a.process_events();

 		let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
-		assert_eq!(peer_b.read_event(&mut fd_b, &a_data).unwrap(), false);
+		peer_b.read_event(&mut fd_b, &a_data).unwrap();

 		peer_b.process_events();
 		let b_data = fd_b.outbound_data.lock().unwrap().split_off(0);
@@ -3989,8 +3983,8 @@ mod tests {

 		let (fd_a, fd_b, a_refused, b_refused) = try_establish_connection(peer_a, peer_b);

-		assert_eq!(a_refused.unwrap(), false);
-		assert_eq!(b_refused.unwrap(), false);
+		a_refused.unwrap();
+		b_refused.unwrap();

 		assert_eq!(peer_a.peer_by_node_id(&id_b).unwrap().counterparty_node_id, id_b);
 		assert_eq!(peer_a.peer_by_node_id(&id_b).unwrap().socket_address, Some(addr_b));
@@ -4113,11 +4107,11 @@ mod tests {
 		let initial_data =
 			peer_b.new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap();
 		peer_a.new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap();
-		assert_eq!(peer_a.read_event(&mut fd_a, &initial_data).unwrap(), false);
+		peer_a.read_event(&mut fd_a, &initial_data).unwrap();
 		peer_a.process_events();

 		let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
-		assert_eq!(peer_b.read_event(&mut fd_b, &a_data).unwrap(), false);
+		peer_b.read_event(&mut fd_b, &a_data).unwrap();

 		peer_b.process_events();
 		let b_data = fd_b.outbound_data.lock().unwrap().split_off(0);
@@ -4144,11 +4138,11 @@ mod tests {
 		let initial_data =
 			peer_b.new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap();
 		peer_a.new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap();
-		assert_eq!(peer_a.read_event(&mut fd_a, &initial_data).unwrap(), false);
+		peer_a.read_event(&mut fd_a, &initial_data).unwrap();
 		peer_a.process_events();

 		let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
-		assert_eq!(peer_b.read_event(&mut fd_b, &a_data).unwrap(), false);
+		peer_b.read_event(&mut fd_b, &a_data).unwrap();

 		peer_b.process_events();
 		let b_data = fd_b.outbound_data.lock().unwrap().split_off(0);
@@ -4220,7 +4214,7 @@ mod tests {
 		peers[0].process_events();

 		let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
-		assert_eq!(peers[1].read_event(&mut fd_b, &a_data).unwrap(), false);
+		peers[1].read_event(&mut fd_b, &a_data).unwrap();
 	}

 	#[test]
@@ -4240,13 -4234,13 @@ mod tests {
 		let mut dup_encryptor =
 			PeerChannelEncryptor::new_outbound(id_a, SecretKey::from_slice(&[42; 32]).unwrap());
 		let initial_data = dup_encryptor.get_act_one(&peers[1].secp_ctx);
-		assert_eq!(peers[0].read_event(&mut fd_dup, &initial_data).unwrap(), false);
+		peers[0].read_event(&mut fd_dup, &initial_data).unwrap();
 		peers[0].process_events();

 		let a_data = fd_dup.outbound_data.lock().unwrap().split_off(0);
 		let (act_three, _) =
 			dup_encryptor.process_act_two(&a_data[..], &&cfgs[1].node_signer).unwrap();
-		assert_eq!(peers[0].read_event(&mut fd_dup, &act_three).unwrap(), false);
+		peers[0].read_event(&mut fd_dup, &act_three).unwrap();

 		let not_init_msg = msgs::Ping { ponglen: 4, byteslen: 0 };
 		let msg_bytes = dup_encryptor.encrypt_message(&not_init_msg);
@@ -4504,10 +4498,10 @@ mod tests {
 			assert_eq!(peers_len, 1);
 		}

-		assert_eq!(peers[0].read_event(&mut fd_a, &initial_data).unwrap(), false);
+		peers[0].read_event(&mut fd_a, &initial_data).unwrap();
 		peers[0].process_events();
 		let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
-		assert_eq!(peers[1].read_event(&mut fd_b, &a_data).unwrap(), false);
+		peers[1].read_event(&mut fd_b, &a_data).unwrap();
 		peers[1].process_events();

 		// ...but if we get a second timer tick, we should disconnect the peer
@@ -4557,11 +4551,11 @@ mod tests {
 		let act_one = peer_b.new_outbound_connection(a_id, fd_b.clone(), None).unwrap();
 		peer_a.new_inbound_connection(fd_a.clone(), None).unwrap();

-		assert_eq!(peer_a.read_event(&mut fd_a, &act_one).unwrap(), false);
+		peer_a.read_event(&mut fd_a, &act_one).unwrap();
 		peer_a.process_events();

 		let act_two = fd_a.outbound_data.lock().unwrap().split_off(0);
-		assert_eq!(peer_b.read_event(&mut fd_b, &act_two).unwrap(), false);
+		peer_b.read_event(&mut fd_b, &act_two).unwrap();
 		peer_b.process_events();

 		// Calling this here triggers the race on inbound connections.
@@ -4575,7 +4569,7 @@ mod tests {
 			assert!(!handshake_complete);
 		}

-		assert_eq!(peer_a.read_event(&mut fd_a, &act_three_with_init_b).unwrap(), false);
+		peer_a.read_event(&mut fd_a, &act_three_with_init_b).unwrap();
 		peer_a.process_events();

 		{
@@ -4595,7 +4589,7 @@ mod tests {
 			assert!(!handshake_complete);
 		}

-		assert_eq!(peer_b.read_event(&mut fd_b, &init_a).unwrap(), false);
+		peer_b.read_event(&mut fd_b, &init_a).unwrap();
 		peer_b.process_events();

 		{
@@ -4632,7 +4626,7 @@ mod tests {
 		peer_a.process_events();
 		let msg = fd_a.outbound_data.lock().unwrap().split_off(0);
 		assert!(!msg.is_empty());
-		assert_eq!(peer_b.read_event(&mut fd_b, &msg).unwrap(), false);
+		peer_b.read_event(&mut fd_b, &msg).unwrap();
 		peer_b.process_events();
 	};

@@ -4675,12 +4669,12 @@ mod tests {

 		let msg = fd_a.outbound_data.lock().unwrap().split_off(0);
 		if !msg.is_empty() {
-			assert_eq!(peers[1].read_event(&mut fd_b, &msg).unwrap(), false);
+			peers[1].read_event(&mut fd_b, &msg).unwrap();
 			continue;
 		}
 		let msg = fd_b.outbound_data.lock().unwrap().split_off(0);
 		if !msg.is_empty() {
-			assert_eq!(peers[0].read_event(&mut fd_a, &msg).unwrap(), false);
+			peers[0].read_event(&mut fd_a, &msg).unwrap();
 			continue;
 		}
 		break;