Skip to content

Commit

Permalink
client: remove unnecessary structs
Browse files Browse the repository at this point in the history
  • Loading branch information
EAimTY committed Mar 11, 2022
1 parent 87f8382 commit 4536cad
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 180 deletions.
263 changes: 133 additions & 130 deletions client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,152 +15,136 @@ use std::{
sync::Arc,
vec::IntoIter,
};
use tokio::sync::{
mpsc::{self, Receiver as MpscReceiver, Sender as MpscSender},
oneshot::{self, Receiver as OneshotReceiver, Sender as OneshotSender},
use tokio::{
sync::{
mpsc::{self, Receiver as MpscReceiver, Sender as MpscSender},
oneshot::{self, Receiver as OneshotReceiver, Sender as OneshotSender},
},
task::JoinHandle,
};
use tuic_protocol::{Address as TuicAddress, Command as TuicCommand, Response as TuicResponse};

pub struct TuicClient {
endpoint: Endpoint,
pub fn init(
server_addr: ServerAddr,
certificate: Option<Certificate>,
token_digest: [u8; 32],
reduce_rtt: bool,
req_rx: MpscReceiver<Request>,
}

impl TuicClient {
pub fn init(
server_addr: ServerAddr,
certificate: Option<Certificate>,
token_digest: [u8; 32],
reduce_rtt: bool,
congestion_controller: CongestionController,
) -> Result<(Self, MpscSender<Request>)> {
let config = {
let mut config = if let Some(cert) = certificate {
let mut root_cert_store = RootCertStore::empty();
root_cert_store.add(&cert)?;
ClientConfig::with_root_certificates(root_cert_store)
} else {
ClientConfig::with_native_roots()
};
congestion_controller: CongestionController,
) -> Result<(JoinHandle<()>, MpscSender<Request>)> {
let config = {
let mut config = if let Some(cert) = certificate {
let mut root_cert_store = RootCertStore::empty();
root_cert_store.add(&cert)?;
ClientConfig::with_root_certificates(root_cert_store)
} else {
ClientConfig::with_native_roots()
};

let mut transport = TransportConfig::default();
let mut transport = TransportConfig::default();

match congestion_controller {
CongestionController::Cubic => {
transport.congestion_controller_factory(Arc::new(CubicConfig::default()))
}
CongestionController::NewReno => {
transport.congestion_controller_factory(Arc::new(NewRenoConfig::default()))
}
CongestionController::Bbr => {
transport.congestion_controller_factory(Arc::new(BbrConfig::default()))
}
};
match congestion_controller {
CongestionController::Cubic => {
transport.congestion_controller_factory(Arc::new(CubicConfig::default()))
}
CongestionController::NewReno => {
transport.congestion_controller_factory(Arc::new(NewRenoConfig::default()))
}
CongestionController::Bbr => {
transport.congestion_controller_factory(Arc::new(BbrConfig::default()))
}
};

config.transport = Arc::new(transport);
config.transport = Arc::new(transport);
config
};

config
};
let mut endpoint = Endpoint::client(SocketAddr::from(([0, 0, 0, 0], 0)))?;
endpoint.set_default_client_config(config);

let mut endpoint = Endpoint::client(SocketAddr::from(([0, 0, 0, 0], 0)))?;
endpoint.set_default_client_config(config);
let (req_tx, req_rx) = mpsc::channel(1);

let (req_tx, req_rx) = mpsc::channel(1);
let listen_relay_request = tokio::spawn(listen_relay_request(
endpoint,
server_addr,
token_digest,
reduce_rtt,
req_rx,
));

Ok((
Self {
endpoint,
server_addr,
token_digest,
reduce_rtt,
req_rx,
},
req_tx,
))
}
Ok((listen_relay_request, req_tx))
}

pub async fn run(mut self) {
let mut conn = self.establish_conn().await;
async fn listen_relay_request(
endpoint: Endpoint,
server_addr: ServerAddr,
token_digest: [u8; 32],
reduce_rtt: bool,
mut req_rx: MpscReceiver<Request>,
) {
let mut conn = establish_connection(&endpoint, &server_addr, token_digest, reduce_rtt).await;

while let Some(req) = self.req_rx.recv().await {
match req {
Request::Connect { addr, tx } => {
let (send, recv) = self.get_bi_stream(&mut conn).await;
tokio::spawn(handle_connect(send, recv, addr, tx));
}
Request::Associate {
assoc_id,
pkt_send_rx,
pkt_receive_tx,
} => {}
while let Some(req) = req_rx.recv().await {
match req {
Request::Connect { addr, tx } => {
let (send, recv) =
new_bi_stream(&mut conn, &endpoint, &server_addr, token_digest, reduce_rtt)
.await;
tokio::spawn(handle_command_connect(send, recv, addr, tx));
}
Request::Associate {
assoc_id,
pkt_send_rx,
pkt_receive_tx,
} => {}
}
}
}

async fn establish_connection(
endpoint: &Endpoint,
server_addr: &ServerAddr,
token_digest: [u8; 32],
reduce_rtt: bool,
) -> (Connection, IncomingUniStreams, Datagrams) {
let (mut addrs, server_name) = match &server_addr {
ServerAddr::HostnameAddr { hostname, .. } => (
unsafe { mem::transmute(MaybeUninit::<IntoIter<SocketAddr>>::uninit()) },
hostname,
),
ServerAddr::SocketAddr {
server_addr,
server_name,
} => (vec![server_addr.to_owned()].into_iter(), server_name),
};

async fn get_bi_stream(
&self,
conn_ref: &mut (Connection, IncomingUniStreams, Datagrams),
) -> (SendStream, RecvStream) {
loop {
match conn_ref.0.open_bi().await {
Ok(res) => return res,
loop {
if let ServerAddr::HostnameAddr {
hostname,
server_port,
} = &server_addr
{
match (hostname.as_str(), *server_port).to_socket_addrs() {
Ok(resolved) => addrs = resolved,
Err(err) => {
match err {
ConnectionError::ConnectionClosed(_) | ConnectionError::TimedOut => {}
err => eprintln!("{err}"),
}
*conn_ref = self.establish_conn().await
eprintln!("{err}");
continue;
}
}
}
}

async fn establish_conn(&self) -> (Connection, IncomingUniStreams, Datagrams) {
let (mut addrs, server_name) = match &self.server_addr {
ServerAddr::HostnameAddr { hostname, .. } => (
unsafe { mem::transmute(MaybeUninit::<IntoIter<SocketAddr>>::uninit()) },
hostname,
),
ServerAddr::SocketAddr {
server_addr,
server_name,
} => (vec![server_addr.to_owned()].into_iter(), server_name),
};

loop {
if let ServerAddr::HostnameAddr {
hostname,
server_port,
} = &self.server_addr
{
match (hostname.as_str(), *server_port).to_socket_addrs() {
Ok(resolved) => addrs = resolved,
Err(err) => {
eprintln!("{err}");
continue;
}
}
}

for addr in addrs.as_ref() {
match self.endpoint.connect(*addr, server_name) {
Ok(conn) => {
match process_conn(conn, self.token_digest, self.reduce_rtt).await {
Ok(conn) => return conn,
Err(err) => eprintln!("{err}"),
}
}
for addr in addrs.as_ref() {
match endpoint.connect(*addr, server_name) {
Ok(conn) => match process_connection(conn, token_digest, reduce_rtt).await {
Ok(conn) => return conn,
Err(err) => eprintln!("{err}"),
}
},
Err(err) => eprintln!("{err}"),
}
}
}
}

async fn process_conn(
async fn process_connection(
conn: Connecting,
token_digest: [u8; 32],
reduce_rtt: bool,
Expand Down Expand Up @@ -195,7 +179,29 @@ async fn process_conn(
Ok((connection, uni_streams, datagrams))
}

async fn handle_connect(
async fn new_bi_stream(
conn_ref: &mut (Connection, IncomingUniStreams, Datagrams),
endpoint: &Endpoint,
server_addr: &ServerAddr,
token_digest: [u8; 32],
reduce_rtt: bool,
) -> (SendStream, RecvStream) {
loop {
match conn_ref.0.open_bi().await {
Ok(res) => return res,
Err(err) => {
match err {
ConnectionError::ConnectionClosed(_) | ConnectionError::TimedOut => {}
err => eprintln!("{err}"),
}
*conn_ref =
establish_connection(endpoint, server_addr, token_digest, reduce_rtt).await
}
}
}
}

async fn handle_command_connect(
mut send: SendStream,
mut recv: RecvStream,
addr: Address,
Expand All @@ -220,9 +226,18 @@ async fn handle_connect(
},
Err(err) => eprintln!("{err}"),
}

let _ = tx.send(None);
}

fn get_random_u32() -> u32 {
lazy_static! {
static ref RNG: Mutex<StdRng> = Mutex::new(StdRng::from_entropy());
}

RNG.lock().next_u32()
}

pub enum Request {
Connect {
addr: Address,
Expand All @@ -235,26 +250,14 @@ pub enum Request {
},
}

struct Rng(Mutex<StdRng>);

impl Rng {
fn get_random(&self) -> u32 {
self.0.lock().next_u32()
}
}

lazy_static! {
static ref RNG: Rng = Rng(Mutex::new(StdRng::from_entropy()));
}

impl Request {
pub fn new_connect(addr: Address) -> (Self, OneshotReceiver<Option<(SendStream, RecvStream)>>) {
let (tx, rx) = oneshot::channel();
(Request::Connect { addr, tx }, rx)
}

pub fn new_associate() -> (Self, MpscSender<()>, MpscReceiver<()>) {
let assoc_id = RNG.get_random();
let assoc_id = get_random_u32();
let (pkt_send_tx, pkt_send_rx) = mpsc::channel(1);
let (pkt_receive_tx, pkt_receive_rx) = mpsc::channel(1);

Expand Down
23 changes: 11 additions & 12 deletions client/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{client::TuicClient, config::ConfigBuilder, socks5::Socks5Server};
use crate::config::ConfigBuilder;
use std::{env, process};

mod cert;
Expand All @@ -20,28 +20,27 @@ async fn main() {
}
};

let (tuic_client, req_tx) = match TuicClient::init(
let (tuic_client, req_tx) = match client::init(
config.server_addr,
config.certificate,
config.token_digest,
config.reduce_rtt,
config.congestion_controller,
) {
Ok((client, tx)) => (tokio::spawn(client.run()), tx),
Ok(res) => res,
Err(err) => {
eprintln!("{err}");
process::exit(1);
}
};

let socks5_server =
match Socks5Server::init(config.local_addr, config.socks5_auth, req_tx).await {
Ok(server) => tokio::spawn(server.run()),
Err(err) => {
eprintln!("{err}");
process::exit(1);
}
};
let socks5_server = match socks5::init(config.local_addr, config.socks5_auth, req_tx).await {
Ok(res) => res,
Err(err) => {
eprintln!("{err}");
process::exit(1);
}
};

let _ = tokio::try_join!(tuic_client, socks5_server);
let _ = tokio::join!(tuic_client, socks5_server);
}
Loading

0 comments on commit 4536cad

Please sign in to comment.