Skip to content

Commit

Permalink
socks5: rework waiting in inbound.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
octol committed Dec 9, 2022
1 parent 5283329 commit be84184
Showing 1 changed file with 27 additions and 18 deletions.
45 changes: 27 additions & 18 deletions common/socks5/proxy-helpers/src/proxy_runner/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,11 @@ where
ordered_msg.len()
);

// If we are closing the socket, wait until the data has passed `OutQueueControl` and the lane
// is empty, otherwise just wait until we are reasonably close to finish sending as a way to
// throttle the incoming data.
if let Some(lane_queue_lengths) = lane_queue_lengths {
if is_finished {
wait_until_lane_empty(lane_queue_lengths, connection_id).await;
} else {
// We allow a bit of slack when this is not the last msg
wait_until_lane_almost_empty(lane_queue_lengths, connection_id).await;
}
// Before sending the data downstream, wait for the lane at the `OutQueueControl` is reasonably
// close to finishing. This is a way of pacing the sending application (backpressure).
if let Some(ref lane_queue_lengths) = lane_queue_lengths {
// We allow a bit of slack to try to keep the pipeline >0
wait_until_lane_almost_empty(lane_queue_lengths, connection_id).await;
}

mix_sender
Expand All @@ -95,16 +90,30 @@ where
.expect("InputMessageReceiver has stopped receiving!");

if is_finished {
// technically we already informed it when we sent the message to mixnet above
debug!(target: &*format!("({}) socks5 inbound", connection_id), "The local socket is closed - won't receive any more data. Informing remote about that...");
// After sending, if this is the last message, wait until we've actually transmitted the data
// in the `OutQueueControl` and the lane is empty.
if let Some(ref lane_queue_lengths) = lane_queue_lengths {
// This is basically an ugly workaround to make sure that we don't start waiting until
// the data that we pushed arrived at the OutQueueControl.
// This usually not a problem in the socks5-client, but for the network-requester this
// info is synced at up to every 500ms.
sleep(Duration::from_secs(2)).await;
wait_until_lane_empty(lane_queue_lengths, connection_id).await;
}

// Technically we already informed it when we sent the message to mixnet above
debug!(
target: &*format!("({}) socks5 inbound", connection_id),
"The local socket is closed - won't receive any more data. Informing remote about that..."
);
}

is_finished
}

async fn wait_until_lane_empty(lane_queue_lengths: LaneQueueLengths, connection_id: u64) {
async fn wait_until_lane_empty(lane_queue_lengths: &LaneQueueLengths, connection_id: u64) {
if tokio::time::timeout(
Duration::from_secs(2 * 60),
Duration::from_secs(4 * 60),
wait_for_lane(
lane_queue_lengths,
connection_id,
Expand All @@ -119,13 +128,13 @@ async fn wait_until_lane_empty(lane_queue_lengths: LaneQueueLengths, connection_
}
}

async fn wait_until_lane_almost_empty(lane_queue_lengths: LaneQueueLengths, connection_id: u64) {
async fn wait_until_lane_almost_empty(lane_queue_lengths: &LaneQueueLengths, connection_id: u64) {
if tokio::time::timeout(
Duration::from_secs(2 * 60),
Duration::from_secs(4 * 60),
wait_for_lane(
lane_queue_lengths,
connection_id,
10,
30,
Duration::from_millis(100),
),
)
Expand All @@ -137,7 +146,7 @@ async fn wait_until_lane_almost_empty(lane_queue_lengths: LaneQueueLengths, conn
}

async fn wait_for_lane(
lane_queue_lengths: LaneQueueLengths,
lane_queue_lengths: &LaneQueueLengths,
connection_id: u64,
queue_length_threshold: usize,
sleep_duration: Duration,
Expand Down

0 comments on commit be84184

Please sign in to comment.