Skip to content

Commit bce6e4f

Browse files
committed
Ensure we call send_data when we need to pause/unpause reads
In the previous commit, we moved the `send_data` `resume_read` flag to also indicate that we should pause if its unset. This should work as we mostly only set the flag when we're sending but may cause us to fail to pause if we are blocked on gossip validation but `awaiting_write_event` wasn't set as we had previously failed to fully flush a buffer (which no longer implies read-pause). Here we make this logic much more robust by ensuring we always make at least one `send_data` call in `do_attempt_write_data` if we need to pause read (or unpause read).
1 parent c37fab6 commit bce6e4f

File tree

1 file changed

+22
-9
lines changed

1 file changed

+22
-9
lines changed

lightning/src/ln/peer_handler.rs

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -781,6 +781,9 @@ struct Peer {
781781
/// Note that these messages are *not* encrypted/MAC'd, and are only serialized.
782782
gossip_broadcast_buffer: VecDeque<MessageBuf>,
783783
awaiting_write_event: bool,
784+
/// Set to true if the last call to [`SocketDescriptor::send_data`] for this peer had the
785+
/// `should_read` flag unset, indicating we've told the driver to stop reading from this peer.
786+
sent_pause_read: bool,
784787

785788
pending_read_buffer: Vec<u8>,
786789
pending_read_buffer_pos: usize,
@@ -1440,6 +1443,7 @@ where
14401443
pending_outbound_buffer_first_msg_offset: 0,
14411444
gossip_broadcast_buffer: VecDeque::new(),
14421445
awaiting_write_event: false,
1446+
sent_pause_read: false,
14431447

14441448
pending_read_buffer,
14451449
pending_read_buffer_pos: 0,
@@ -1500,6 +1504,7 @@ where
15001504
pending_outbound_buffer_first_msg_offset: 0,
15011505
gossip_broadcast_buffer: VecDeque::new(),
15021506
awaiting_write_event: false,
1507+
sent_pause_read: false,
15031508

15041509
pending_read_buffer,
15051510
pending_read_buffer_pos: 0,
@@ -1535,10 +1540,18 @@ where
15351540
}
15361541

15371542
fn do_attempt_write_data(
1538-
&self, descriptor: &mut Descriptor, peer: &mut Peer, force_one_write: bool,
1543+
&self, descriptor: &mut Descriptor, peer: &mut Peer, mut force_one_write: bool,
15391544
) {
1540-
let mut have_written = false;
1541-
while !peer.awaiting_write_event {
1545+
if !self.peer_should_read(peer) {
1546+
if !peer.sent_pause_read {
1547+
force_one_write = true;
1548+
}
1549+
} else {
1550+
if peer.sent_pause_read {
1551+
force_one_write = true;
1552+
}
1553+
}
1554+
while force_one_write || !peer.awaiting_write_event {
15421555
if peer.should_buffer_onion_message() {
15431556
if let Some((peer_node_id, _)) = peer.their_node_id {
15441557
let handler = &self.message_handler.onion_message_handler;
@@ -1606,20 +1619,20 @@ where
16061619
let should_read = self.peer_should_read(peer);
16071620
let next_buff = match peer.pending_outbound_buffer.front() {
16081621
None => {
1609-
if force_one_write && !have_written {
1610-
if should_read {
1611-
let data_sent = descriptor.send_data(&[], should_read);
1612-
debug_assert_eq!(data_sent, 0, "Can't write more than no data");
1613-
}
1622+
if force_one_write {
1623+
let data_sent = descriptor.send_data(&[], should_read);
1624+
debug_assert_eq!(data_sent, 0, "Can't write more than no data");
1625+
peer.sent_pause_read = !should_read;
16141626
}
16151627
return;
16161628
},
16171629
Some(buff) => buff,
16181630
};
1631+
force_one_write = false;
16191632

16201633
let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
16211634
let data_sent = descriptor.send_data(pending, should_read);
1622-
have_written = true;
1635+
peer.sent_pause_read = !should_read;
16231636
peer.pending_outbound_buffer_first_msg_offset += data_sent;
16241637
if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() {
16251638
peer.pending_outbound_buffer_first_msg_offset = 0;

0 commit comments

Comments
 (0)