Skip to content

Commit

Permalink
First pass to implement socks5 support
Browse files Browse the repository at this point in the history
  • Loading branch information
ikatson committed Aug 7, 2024
1 parent 8c16239 commit 70dcb2e
Show file tree
Hide file tree
Showing 11 changed files with 195 additions and 23 deletions.
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion crates/librqbit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ anyhow = "1"
itertools = "0.12"
http = "1"
regex = "1"
reqwest = { version = "0.12", default-features = false, features = ["json"] }
reqwest = { version = "0.12", default-features = false, features = [
"json",
"socks",
] }
urlencoding = "2"
byteorder = "1"
bincode = "1"
Expand Down Expand Up @@ -75,6 +78,7 @@ async-stream = "0.3.5"
memmap2 = { version = "0.9.4" }
lru = { version = "0.12.3", optional = true }
mime_guess = { version = "2.0.5", default-features = false }
tokio-socks = "0.5.2"

[dev-dependencies]
futures = { version = "0.3" }
Expand Down
21 changes: 18 additions & 3 deletions crates/librqbit/src/dht_utils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashSet, net::SocketAddr};
use std::{collections::HashSet, net::SocketAddr, sync::Arc};

use anyhow::Context;
use buffers::ByteBufOwned;
Expand All @@ -8,6 +8,7 @@ use tracing::{debug, error_span, Instrument};

use crate::{
peer_connection::PeerConnectionOptions, peer_info_reader, spawn_utils::BlockingSpawner,
stream_connect::StreamConnector,
};
use librqbit_core::hash_id::Id20;

Expand All @@ -30,6 +31,7 @@ pub async fn read_metainfo_from_peer_receiver<A: Stream<Item = SocketAddr> + Unp
initial_addrs: Vec<SocketAddr>,
addrs_stream: A,
peer_connection_options: Option<PeerConnectionOptions>,
connector: Arc<StreamConnector>,
) -> ReadMetainfoResult<A> {
let mut seen = HashSet::<SocketAddr>::new();
let mut addrs = addrs_stream;
Expand All @@ -38,6 +40,7 @@ pub async fn read_metainfo_from_peer_receiver<A: Stream<Item = SocketAddr> + Unp

let read_info_guarded = |addr| {
let semaphore = &semaphore;
let connector = connector.clone();
async move {
let token = semaphore.acquire().await?;
let ret = peer_info_reader::read_metainfo_from_peer(
Expand All @@ -46,6 +49,7 @@ pub async fn read_metainfo_from_peer_receiver<A: Stream<Item = SocketAddr> + Unp
info_hash,
peer_connection_options,
BlockingSpawner::new(true),
connector,
)
.instrument(error_span!("read_metainfo_from_peer", ?addr))
.await
Expand Down Expand Up @@ -93,7 +97,10 @@ mod tests {
use librqbit_core::peer_id::generate_peer_id;

use super::*;
use std::{str::FromStr, sync::Once};
use std::{
str::FromStr,
sync::{Arc, Once},
};

static LOG_INIT: Once = Once::new();

Expand All @@ -114,7 +121,15 @@ mod tests {

let peer_rx = dht.get_peers(info_hash, None).unwrap();
let peer_id = generate_peer_id();
match read_metainfo_from_peer_receiver(peer_id, info_hash, Vec::new(), peer_rx, None).await
match read_metainfo_from_peer_receiver(
peer_id,
info_hash,
Vec::new(),
peer_rx,
None,
Arc::new(Default::default()),
)
.await
{
ReadMetainfoResult::Found { info, .. } => dbg!(info),
ReadMetainfoResult::ChannelClosed { .. } => todo!("should not have happened"),
Expand Down
1 change: 1 addition & 0 deletions crates/librqbit/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ mod read_buf;
mod session;
mod spawn_utils;
pub mod storage;
mod stream_connect;
mod torrent_state;
pub mod tracing_subscriber_config_utils;
mod type_aliases;
Expand Down
11 changes: 8 additions & 3 deletions crates/librqbit/src/peer_connection.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{
net::SocketAddr,
sync::Arc,
time::{Duration, Instant},
};

Expand All @@ -21,7 +22,7 @@ use serde_with::serde_as;
use tokio::time::timeout;
use tracing::{debug, trace};

use crate::{read_buf::ReadBuf, spawn_utils::BlockingSpawner};
use crate::{read_buf::ReadBuf, spawn_utils::BlockingSpawner, stream_connect::StreamConnector};

pub trait PeerConnectionHandler {
fn on_connected(&self, _connection_time: Duration) {}
Expand Down Expand Up @@ -65,6 +66,7 @@ pub(crate) struct PeerConnection<H> {
peer_id: Id20,
options: PeerConnectionOptions,
spawner: BlockingSpawner,
connector: Arc<StreamConnector>,
}

pub(crate) async fn with_timeout<T, E>(
Expand All @@ -88,6 +90,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
handler: H,
options: Option<PeerConnectionOptions>,
spawner: BlockingSpawner,
connector: Arc<StreamConnector>,
) -> Self {
PeerConnection {
handler,
Expand All @@ -96,6 +99,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
peer_id,
spawner,
options: options.unwrap_or_default(),
connector,
}
}

Expand Down Expand Up @@ -169,7 +173,8 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
.unwrap_or_else(|| Duration::from_secs(10));

let now = Instant::now();
let mut conn = with_timeout(connect_timeout, tokio::net::TcpStream::connect(self.addr))
let conn = self.connector.connect(self.addr);
let mut conn = with_timeout(connect_timeout, conn)
.await
.context("error connecting")?;
self.handler.on_connected(now.elapsed());
Expand Down Expand Up @@ -218,7 +223,7 @@ impl<H: PeerConnectionHandler> PeerConnection<H> {
handshake_supports_extended: bool,
mut read_buf: ReadBuf,
mut write_buf: Vec<u8>,
mut conn: tokio::net::TcpStream,
mut conn: impl tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
mut outgoing_chan: tokio::sync::mpsc::UnboundedReceiver<WriterRequest>,
mut have_broadcast: tokio::sync::broadcast::Receiver<ValidPieceIndex>,
) -> anyhow::Result<()> {
Expand Down
21 changes: 15 additions & 6 deletions crates/librqbit/src/peer_info_reader/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::net::SocketAddr;
use std::{net::SocketAddr, sync::Arc};

use bencode::from_bytes;
use buffers::{ByteBuf, ByteBufOwned};
Expand All @@ -22,6 +22,7 @@ use crate::{
PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest,
},
spawn_utils::BlockingSpawner,
stream_connect::StreamConnector,
};

pub(crate) async fn read_metainfo_from_peer(
Expand All @@ -30,6 +31,7 @@ pub(crate) async fn read_metainfo_from_peer(
info_hash: Id20,
peer_connection_options: Option<PeerConnectionOptions>,
spawner: BlockingSpawner,
connector: Arc<StreamConnector>,
) -> anyhow::Result<TorrentMetaV1Info<ByteBufOwned>> {
let (result_tx, result_rx) =
tokio::sync::oneshot::channel::<anyhow::Result<TorrentMetaV1Info<ByteBufOwned>>>();
Expand All @@ -48,6 +50,7 @@ pub(crate) async fn read_metainfo_from_peer(
handler,
peer_connection_options,
spawner,
connector,
);

let result_reader = async move { result_rx.await? };
Expand Down Expand Up @@ -234,6 +237,7 @@ impl PeerConnectionHandler for Handler {

#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::{net::SocketAddr, str::FromStr, sync::Once};

use librqbit_core::hash_id::Id20;
Expand All @@ -260,10 +264,15 @@ mod tests {
let addr = SocketAddr::from_str("127.0.0.1:27311").unwrap();
let peer_id = generate_peer_id();
let info_hash = Id20::from_str("9905f844e5d8787ecd5e08fb46b2eb0a42c131d7").unwrap();
dbg!(
read_metainfo_from_peer(addr, peer_id, info_hash, None, BlockingSpawner::new(true))
.await
.unwrap()
);
dbg!(read_metainfo_from_peer(
addr,
peer_id,
info_hash,
None,
BlockingSpawner::new(true),
Arc::new(Default::default())
)
.await
.unwrap());
}
}
37 changes: 28 additions & 9 deletions crates/librqbit/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::{
storage::{
filesystem::FilesystemStorageFactory, BoxStorageFactory, StorageFactoryExt, TorrentStorage,
},
stream_connect::{SocksProxyConfig, StreamConnector},
torrent_state::{
ManagedTorrentBuilder, ManagedTorrentHandle, ManagedTorrentState, TorrentStateLive,
},
Expand Down Expand Up @@ -197,6 +198,7 @@ pub struct Session {
default_storage_factory: Option<BoxStorageFactory>,

reqwest_client: reqwest::Client,
connector: Arc<StreamConnector>,

// This is stored for all tasks to stop when session is dropped.
_cancellation_token_drop_guard: DropGuard,
Expand Down Expand Up @@ -413,11 +415,6 @@ impl<'a> AddTorrent<'a> {
}
}

pub struct SocksProxyConfig {
// must start with socks5
pub url: String,
}

#[derive(Default)]
pub struct SessionOptions {
/// Turn on to disable DHT.
Expand Down Expand Up @@ -449,7 +446,8 @@ pub struct SessionOptions {

pub default_storage_factory: Option<BoxStorageFactory>,

pub socks_proxy: Option<SocksProxyConfig>,
// socks5://[username:password@]host:port
pub socks_proxy_url: Option<String>,
}

async fn create_tcp_listener(
Expand Down Expand Up @@ -548,9 +546,27 @@ impl Session {
})
.unwrap_or_default();

let reqwest_client = reqwest::Client::builder()
.build()
.context("error building HTTP(S) client")?;
let proxy_config = match opts.socks_proxy_url.as_ref() {
Some(pu) => Some(
SocksProxyConfig::parse(pu)
.with_context(|| format!("error parsing proxy url {}", pu))?,
),
None => None,
};

let reqwest_client = {
let builder = if let Some(proxy_url) = opts.socks_proxy_url.as_ref() {
let proxy = reqwest::Proxy::all(proxy_url)
.context("error creating socks5 proxy for HTTP")?;
reqwest::Client::builder().proxy(proxy)
} else {
reqwest::Client::builder()
};

builder.build().context("error building HTTP(S) client")?
};

let stream_connector = Arc::new(StreamConnector::from(proxy_config));

let session = Arc::new(Self {
persistence_filename,
Expand All @@ -566,6 +582,7 @@ impl Session {
disk_write_tx,
default_storage_factory: opts.default_storage_factory,
reqwest_client,
connector: stream_connector,
});

if let Some(mut disk_write_rx) = disk_write_rx {
Expand Down Expand Up @@ -919,6 +936,7 @@ impl Session {
opts.initial_peers.clone().unwrap_or_default(),
peer_rx,
Some(self.merge_peer_opts(opts.peer_opts)),
self.connector.clone(),
)
.await
{
Expand Down Expand Up @@ -1088,6 +1106,7 @@ impl Session {
.allow_overwrite(opts.overwrite)
.spawner(self.spawner)
.trackers(trackers)
.connector(self.connector.clone())
.peer_id(self.peer_id);

if let Some(d) = self.disk_write_tx.clone() {
Expand Down
Loading

0 comments on commit 70dcb2e

Please sign in to comment.