From be84184ef290cb5faac3c4ec7c8b334faab96041 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20H=C3=A4ggblad?= Date: Fri, 9 Dec 2022 22:39:36 +0100 Subject: [PATCH] socks5: rework waiting in inbound.rs --- .../proxy-helpers/src/proxy_runner/inbound.rs | 45 +++++++++++-------- 1 file changed, 27 insertions(+), 18 deletions(-) diff --git a/common/socks5/proxy-helpers/src/proxy_runner/inbound.rs b/common/socks5/proxy-helpers/src/proxy_runner/inbound.rs index f2ebc2cd9b4..37403c33ad6 100644 --- a/common/socks5/proxy-helpers/src/proxy_runner/inbound.rs +++ b/common/socks5/proxy-helpers/src/proxy_runner/inbound.rs @@ -77,16 +77,11 @@ where ordered_msg.len() ); - // If we are closing the socket, wait until the data has passed `OutQueueControl` and the lane - // is empty, otherwise just wait until we are reasonably close to finish sending as a way to - // throttle the incoming data. - if let Some(lane_queue_lengths) = lane_queue_lengths { - if is_finished { - wait_until_lane_empty(lane_queue_lengths, connection_id).await; - } else { - // We allow a bit of slack when this is not the last msg - wait_until_lane_almost_empty(lane_queue_lengths, connection_id).await; - } + // Before sending the data downstream, wait for the lane at the `OutQueueControl` is reasonably + // close to finishing. This is a way of pacing the sending application (backpressure). + if let Some(ref lane_queue_lengths) = lane_queue_lengths { + // We allow a bit of slack to try to keep the pipeline >0 + wait_until_lane_almost_empty(lane_queue_lengths, connection_id).await; } mix_sender @@ -95,16 +90,30 @@ where .expect("InputMessageReceiver has stopped receiving!"); if is_finished { - // technically we already informed it when we sent the message to mixnet above - debug!(target: &*format!("({}) socks5 inbound", connection_id), "The local socket is closed - won't receive any more data. Informing remote about that..."); + // After sending, if this is the last message, wait until we've actually transmitted the data + // in the `OutQueueControl` and the lane is empty. + if let Some(ref lane_queue_lengths) = lane_queue_lengths { + // This is basically an ugly workaround to make sure that we don't start waiting until + // the data that we pushed arrived at the OutQueueControl. + // This usually not a problem in the socks5-client, but for the network-requester this + // info is synced at up to every 500ms. + sleep(Duration::from_secs(2)).await; + wait_until_lane_empty(lane_queue_lengths, connection_id).await; + } + + // Technically we already informed it when we sent the message to mixnet above + debug!( + target: &*format!("({}) socks5 inbound", connection_id), + "The local socket is closed - won't receive any more data. Informing remote about that..." + ); } is_finished } -async fn wait_until_lane_empty(lane_queue_lengths: LaneQueueLengths, connection_id: u64) { +async fn wait_until_lane_empty(lane_queue_lengths: &LaneQueueLengths, connection_id: u64) { if tokio::time::timeout( - Duration::from_secs(2 * 60), + Duration::from_secs(4 * 60), wait_for_lane( lane_queue_lengths, connection_id, @@ -119,13 +128,13 @@ async fn wait_until_lane_empty(lane_queue_lengths: LaneQueueLengths, connection_ } } -async fn wait_until_lane_almost_empty(lane_queue_lengths: LaneQueueLengths, connection_id: u64) { +async fn wait_until_lane_almost_empty(lane_queue_lengths: &LaneQueueLengths, connection_id: u64) { if tokio::time::timeout( - Duration::from_secs(2 * 60), + Duration::from_secs(4 * 60), wait_for_lane( lane_queue_lengths, connection_id, - 10, + 30, Duration::from_millis(100), ), ) @@ -137,7 +146,7 @@ async fn wait_until_lane_almost_empty(lane_queue_lengths: LaneQueueLengths, conn } async fn wait_for_lane( - lane_queue_lengths: LaneQueueLengths, + lane_queue_lengths: &LaneQueueLengths, connection_id: u64, queue_length_threshold: usize, sleep_duration: Duration,