diff --git a/crates/water/src/config/mod.rs b/crates/water/src/config/mod.rs index 4188190..37116db 100644 --- a/crates/water/src/config/mod.rs +++ b/crates/water/src/config/mod.rs @@ -6,6 +6,8 @@ pub mod wasm_shared_config; +/// WATER configuration +#[derive(Clone)] pub struct WATERConfig { /// Path to the .wasm binary pub filepath: String, @@ -40,15 +42,15 @@ impl WATERConfig { } } -/// A enum of types of the client +/// WATER client type: A enum of types of the client #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] pub enum WaterBinType { - Unknown, - Wrap, Dial, Listen, Relay, Runner, + Wrap, + Unknown, } impl From for WaterBinType { @@ -56,9 +58,9 @@ impl From for WaterBinType { match num { 0 => WaterBinType::Dial, 1 => WaterBinType::Listen, - 2 => WaterBinType::Runner, - 3 => WaterBinType::Wrap, - 4 => WaterBinType::Relay, + 2 => WaterBinType::Relay, + 3 => WaterBinType::Runner, + 4 => WaterBinType::Wrap, _ => WaterBinType::Unknown, } } diff --git a/crates/water/src/globals.rs b/crates/water/src/globals.rs index 3c9220f..9ed224d 100644 --- a/crates/water/src/globals.rs +++ b/crates/water/src/globals.rs @@ -1,7 +1,7 @@ #![allow(dead_code)] pub const WASM_PATH: &str = "./proxy.wasm"; -pub const CONFIG_WASM_PATH: &str = "./conf.json"; +pub const CONFIG_WASM_PATH: &str = "./config.json"; pub const MAIN: &str = "main"; pub const VERSION_FN: &str = "_water_version"; diff --git a/crates/water/src/runtime/client.rs b/crates/water/src/runtime/client.rs index 70baf90..de2d28e 100644 --- a/crates/water/src/runtime/client.rs +++ b/crates/water/src/runtime/client.rs @@ -96,6 +96,32 @@ impl WATERClient { }) } + pub fn keep_listen(&mut self) -> Result { + info!("[HOST] WATERClient keep listening...",); + + let water = match &mut self.stream { + WATERClientType::Listener(ref mut listener) => WATERClientType::Listener(Box::new( + v0::listener::WATERListener::migrate_listener(&self.config, listener.get_core())?, + ) + as Box), + WATERClientType::Relay(ref mut relay) => WATERClientType::Relay(Box::new( + v0::relay::WATERRelay::migrate_listener(&self.config, relay.get_core())?, + ) + as Box), + _ => { + return Err(anyhow::anyhow!( + "[HOST] This client is neither a Listener nor a Relay" + )); + } + }; + + Ok(WATERClient { + config: self.config.clone(), + debug: self.debug, + stream: water, + }) + } + pub fn set_debug(&mut self, debug: bool) { self.debug = debug; } @@ -185,10 +211,7 @@ impl WATERClient { match &mut self.stream { WATERClientType::Dialer(dialer) => dialer.run_entry_fn(&self.config), - WATERClientType::Listener(listener) => { - // TODO: clone listener here, since we are doing one WATM instance / accept - listener.run_entry_fn(&self.config) - } + WATERClientType::Listener(listener) => listener.run_entry_fn(&self.config), WATERClientType::Relay(relay) => relay.run_entry_fn(&self.config), _ => Err(anyhow::anyhow!("This client is not a Runner")), } diff --git a/crates/water/src/runtime/core.rs b/crates/water/src/runtime/core.rs index cbc3624..46e21c5 100644 --- a/crates/water/src/runtime/core.rs +++ b/crates/water/src/runtime/core.rs @@ -29,16 +29,20 @@ impl H2O { pub fn init(conf: &WATERConfig) -> Result { info!("[HOST] WATERCore H2O initing..."); - let mut wasm_config = wasmtime::Config::new(); - wasm_config.wasm_threads(true); + let wasm_config = wasmtime::Config::new(); + + #[cfg(feature = "multithread")] + { + wasm_config.wasm_threads(true); + } let engine = Engine::new(&wasm_config)?; - let mut linker: Linker = Linker::new(&engine); + let linker: Linker = Linker::new(&engine); let module = Module::from_file(&engine, &conf.filepath)?; let host = Host::default(); - let mut store = Store::new(&engine, host); + let store = Store::new(&engine, host); let mut error_occured = None; @@ -71,6 +75,17 @@ impl H2O { return Err(anyhow::Error::msg("WATM module version not found")); } + Self::create_core(conf, linker, store, module, engine, version) + } + + pub fn create_core( + conf: &WATERConfig, + mut linker: Linker, + mut store: Store, + module: Module, + engine: Engine, + version: Option, + ) -> Result { store.data_mut().preview1_ctx = Some(WasiCtxBuilder::new().inherit_stdio().build()); if store.data().preview1_ctx.is_none() { @@ -146,6 +161,57 @@ impl H2O { }) } + // This function is for migrating the v0 core for listener and relay + // to handle every new connection is creating a new separate core (as v0 spec) + pub fn v0_migrate_core(conf: &WATERConfig, core: &H2O) -> Result { + info!("[HOST] WATERCore H2O v0_migrating..."); + + // reseting the listener accepted_fd or the relay's accepted_fd & dial_fd + // when migrating from existed listener / relay + let version = match &core.version { + Version::V0(v0conf) => { + match v0conf { + Some(og_v0_conf) => match og_v0_conf.lock() { + Ok(og_v0_conf) => { + let mut new_v0_conf_inner = og_v0_conf.clone(); + // reset the new cloned v0conf + new_v0_conf_inner.reset_listener_or_relay(); + + Version::V0(Some(Arc::new(Mutex::new(new_v0_conf_inner)))) + } + Err(e) => { + return Err(anyhow::anyhow!("Failed to lock v0_conf: {}", e))?; + } + }, + None => { + return Err(anyhow::anyhow!("v0_conf is None"))?; + } + } + } + _ => { + return Err(anyhow::anyhow!("This is not a V0 core"))?; + } + }; + + // NOTE: Some of the followings can reuse the existing core, leave to later explore + let wasm_config = wasmtime::Config::new(); + + #[cfg(feature = "multithread")] + { + wasm_config.wasm_threads(true); + } + + let engine = Engine::new(&wasm_config)?; + let linker: Linker = Linker::new(&engine); + + let module = Module::from_file(&engine, &conf.filepath)?; + + let host = Host::default(); + let store = Store::new(&engine, host); + + Self::create_core(conf, linker, store, module, engine, Some(version)) + } + pub fn _prepare(&mut self, conf: &WATERConfig) -> Result<(), anyhow::Error> { self._init(conf.debug)?; self._process_config(conf)?; // This is for now needed only by v1_preview diff --git a/crates/water/src/runtime/transport.rs b/crates/water/src/runtime/transport.rs index 86e079d..e0a6a0b 100644 --- a/crates/water/src/runtime/transport.rs +++ b/crates/water/src/runtime/transport.rs @@ -190,30 +190,4 @@ pub trait WATERTransportTrait: Send { Ok(handle) } - - // fn read(&mut self, _buf: &mut Vec) -> Result { - // Err(anyhow::anyhow!("Method not supported")) - // } - - // fn write(&mut self, _buf: &[u8]) -> Result<(), anyhow::Error> { - // Err(anyhow::anyhow!("Method not supported")) - // } - - // // v0 only - // fn cancel_with(&mut self, _conf: &WATERConfig) -> Result<(), anyhow::Error> { - // Err(anyhow::anyhow!("Method not supported")) - // } - - // // v0 only - // fn cancel(&mut self, _conf: &WATERConfig) -> Result<(), anyhow::Error> { - // Err(anyhow::anyhow!("Method not supported")) - // } - - // // v0 only - // fn run_entry_fn( - // &mut self, - // _conf: &WATERConfig, - // ) -> Result>, anyhow::Error> { - // Err(anyhow::anyhow!("Method not supported")) - // } } diff --git a/crates/water/src/runtime/v0/config.rs b/crates/water/src/runtime/v0/config.rs index 45b9617..22058cb 100644 --- a/crates/water/src/runtime/v0/config.rs +++ b/crates/water/src/runtime/v0/config.rs @@ -51,8 +51,8 @@ impl Config { pub enum V0CRole { Unknown, Dialer(i32), - Listener(i32), - Relay(i32, i32), // listener_fd, dialer_fd + Listener(i32, i32), // listener_fd, accepted_fd + Relay(i32, i32, i32), // listener_fd, accepted_fd, dialer_fd } // V0 specific configurations @@ -93,8 +93,12 @@ impl V0Config { info!("[HOST] WATERCore V0 connecting to {}", addr); match &mut self.conn { - V0CRole::Relay(_lis, ref mut conn_fd) => { + V0CRole::Relay(_, _, ref mut conn_fd) => { // now relay has been built, need to dial + if *conn_fd != -1 { + return Err(anyhow::Error::msg("Relay already connected")); + } + let conn = std::net::TcpStream::connect(addr)?; *conn_fd = conn.as_raw_fd(); Ok(conn) @@ -116,9 +120,9 @@ impl V0Config { let listener = std::net::TcpListener::bind(addr)?; if is_relay { - self.conn = V0CRole::Relay(listener.into_raw_fd(), 0); + self.conn = V0CRole::Relay(listener.into_raw_fd(), -1, -1); } else { - self.conn = V0CRole::Listener(listener.into_raw_fd()); + self.conn = V0CRole::Listener(listener.into_raw_fd(), -1); } Ok(()) } @@ -126,17 +130,30 @@ impl V0Config { pub fn accept(&mut self) -> Result { info!("[HOST] WATERCore V0 accept with conn {:?} ...", self.conn); - match &self.conn { - V0CRole::Listener(listener) => { - let listener = unsafe { std::net::TcpListener::from_raw_fd(*listener) }; + match self.conn { + V0CRole::Listener(ref mut listener_fd, ref mut accepted_fd) => { + if *accepted_fd != -1 { + return Err(anyhow::Error::msg("Listener already accepted")); + } + + let listener = unsafe { std::net::TcpListener::from_raw_fd(*listener_fd) }; + let (stream, _) = listener.accept()?; - self.conn = V0CRole::Listener(listener.into_raw_fd()); // makde sure it is not closed after scope + + *listener_fd = listener.into_raw_fd(); // makde sure the listener is not closed after scope + *accepted_fd = stream.as_raw_fd(); + Ok(stream) } - V0CRole::Relay(listener, _) => { - let listener = unsafe { std::net::TcpListener::from_raw_fd(*listener) }; + V0CRole::Relay(ref mut listener_fd, ref mut accepted_fd, _) => { + if *accepted_fd != -1 { + return Err(anyhow::Error::msg("Relay already accepted")); + } + + let listener = unsafe { std::net::TcpListener::from_raw_fd(*listener_fd) }; let (stream, _) = listener.accept()?; - self.conn = V0CRole::Relay(listener.into_raw_fd(), 0); // makde sure it is not closed after scope + *listener_fd = listener.into_raw_fd(); // makde sure the listener is not closed after scope + *accepted_fd = stream.as_raw_fd(); Ok(stream) } _ => Err(anyhow::Error::msg("not a listener")), @@ -146,22 +163,56 @@ impl V0Config { pub fn defer(&mut self) { info!("[HOST] WATERCore V0 defer with conn {:?} ...", self.conn); - match &self.conn { - V0CRole::Listener(_listener) => { - // TODO: Listener shouldn't be deferred, but the stream it connected to should be - // let listener = unsafe { std::net::TcpListener::from_raw_fd(*listener) }; - // drop(listener); + match self.conn { + V0CRole::Listener(_, ref mut accepted_fd) => { + // The accepted stream should be defered, not the listener + let accepted_conn = unsafe { std::net::TcpStream::from_raw_fd(*accepted_fd) }; + drop(accepted_conn); + *accepted_fd = -1; // set it back to default } - V0CRole::Dialer(conn) => { - let conn = unsafe { std::net::TcpStream::from_raw_fd(*conn) }; + V0CRole::Dialer(conn_fd) => { + let conn = unsafe { std::net::TcpStream::from_raw_fd(conn_fd) }; drop(conn); } - V0CRole::Relay(_listener, conn) => { - // Listener shouldn't be deferred, like the above reason - // let listener = unsafe { std::net::TcpListener::from_raw_fd(*listener) }; - // drop(listener); - let conn = unsafe { std::net::TcpStream::from_raw_fd(*conn) }; + V0CRole::Relay(_, ref mut accepted_fd, ref mut conn_fd) => { + let accepted_conn = unsafe { std::net::TcpStream::from_raw_fd(*accepted_fd) }; + drop(accepted_conn); + *accepted_fd = -1; // set it back to default + + let conn = unsafe { std::net::TcpStream::from_raw_fd(*conn_fd) }; drop(conn); + *conn_fd = -1; // set it back to default + } + _ => {} + } + } + + pub fn reset_listener_or_relay(&mut self) { + info!( + "[HOST] WATERCore v0 reset lisener / relay with conn {:?} ...", + self.conn + ); + + match self.conn { + V0CRole::Listener(_, ref mut accepted_fd) => { + if *accepted_fd != -1 { + let accepted_conn = unsafe { std::net::TcpStream::from_raw_fd(*accepted_fd) }; + drop(accepted_conn); + *accepted_fd = -1; // set it back to default + } + } + V0CRole::Relay(_, ref mut accepted_fd, ref mut conn_fd) => { + if *accepted_fd != -1 { + let accepted_conn = unsafe { std::net::TcpStream::from_raw_fd(*accepted_fd) }; + drop(accepted_conn); + *accepted_fd = -1; // set it back to default + } + + if *conn_fd != -1 { + let conn = unsafe { std::net::TcpStream::from_raw_fd(*conn_fd) }; + drop(conn); + *conn_fd = -1; // set it back to default + } } _ => {} } diff --git a/crates/water/src/runtime/v0/listener.rs b/crates/water/src/runtime/v0/listener.rs index 685f507..c87da97 100644 --- a/crates/water/src/runtime/v0/listener.rs +++ b/crates/water/src/runtime/v0/listener.rs @@ -129,4 +129,14 @@ impl WATERListener { Ok(runtime) } + + pub fn migrate_listener(_conf: &WATERConfig, core: &H2O) -> Result { + info!("[HOST] WATERListener v0 migrating listener..."); + + let mut new_core = + core::H2O::v0_migrate_core(_conf, core).context("Failed to migrate core")?; + new_core._prepare(_conf)?; + + WATERListener::init(_conf, new_core) + } } diff --git a/crates/water/src/runtime/v0/relay.rs b/crates/water/src/runtime/v0/relay.rs index b97fea1..cb437a6 100644 --- a/crates/water/src/runtime/v0/relay.rs +++ b/crates/water/src/runtime/v0/relay.rs @@ -108,4 +108,14 @@ impl WATERRelay { Ok(runtime) } + + pub fn migrate_listener(_conf: &WATERConfig, core: &H2O) -> Result { + info!("[HOST] WATERelay v0 migrating listener..."); + + let mut new_core = + core::H2O::v0_migrate_core(_conf, core).context("Failed to migrate core")?; + new_core._prepare(_conf)?; + + WATERRelay::init(_conf, new_core) + } } diff --git a/examples/clients/cli/README.md b/examples/clients/cli/README.md index e023753..8c6d89c 100644 --- a/examples/clients/cli/README.md +++ b/examples/clients/cli/README.md @@ -5,11 +5,38 @@ ## How to run? To run the Host program + WASM: ```shell -cargo run --bin wasmable_transport -- --wasm-path <./proxy.wasm> --entry-fn
--config-wasm +cargo run --bin water_cli -- --wasm-path <./proxy.wasm> --entry-fn
--config-wasm --type-client <3> ``` Then you can netcat into the connection, for now, I included a `proxy.wasm` as a multiple conneciton echo server, test with several terminals: ```shell nc 127.0.0.1 9005 ``` -you should see `> CONNECTED` in the terminal of running WASM, then you can connect a bunch like this and input anything to see how it echos. \ No newline at end of file +you should see `> CONNECTED` in the terminal of running WASM, then you can connect a bunch like this and input anything to see how it echos. + +## Examples +To run the shadowsocks wasm: + +1. run the server side from the [official implementation](https://github.com/shadowsocks/shadowsocks-rust) with the following config: + ```json + { + "server": "127.0.0.1", + "server_port": 8388, + "password": "Test!23", + "method": "chacha20-ietf-poly1305" + } + ``` + and run the server side with: + ```shell + cargo run --bin ssserver -- -c .json + ``` + +2. then run the cli tool with the `ss_client_wasm` + ```shell + cargo run --bin water_cli -- --wasm-path demo_wasm/ss_client_wasm.wasm --entry-fn v1_listen --config-wasm demo_configs/ss_config.json --type-client 3 + ``` + +3. to test the traffic is going through + ```shell + curl -4 -v --socks5 localhost:8080 https://erikchi.com + ``` \ No newline at end of file diff --git a/examples/clients/cli/demo_configs/ss_config.json b/examples/clients/cli/demo_configs/ss_config.json new file mode 100644 index 0000000..f9b7db7 --- /dev/null +++ b/examples/clients/cli/demo_configs/ss_config.json @@ -0,0 +1,7 @@ +{ + "remote_address": "127.0.0.1", + "remote_port": 8388, + "local_address": "127.0.0.1", + "local_port": 8080, + "bypass": false +} \ No newline at end of file diff --git a/examples/clients/cli/demo_wasm/echo_client.wasm b/examples/clients/cli/demo_wasm/echo_client.wasm new file mode 100644 index 0000000..ca67e57 Binary files /dev/null and b/examples/clients/cli/demo_wasm/echo_client.wasm differ diff --git a/examples/clients/cli/demo_wasm/plain.wasm b/examples/clients/cli/demo_wasm/plain.wasm new file mode 100644 index 0000000..77e1350 Binary files /dev/null and b/examples/clients/cli/demo_wasm/plain.wasm differ diff --git a/examples/clients/cli/demo_wasm/proxy.wasm b/examples/clients/cli/demo_wasm/proxy.wasm new file mode 100644 index 0000000..c3c13d4 Binary files /dev/null and b/examples/clients/cli/demo_wasm/proxy.wasm differ diff --git a/examples/clients/cli/demo_wasm/ss_client_wasm.wasm b/examples/clients/cli/demo_wasm/ss_client_wasm.wasm new file mode 100644 index 0000000..11e5042 Binary files /dev/null and b/examples/clients/cli/demo_wasm/ss_client_wasm.wasm differ diff --git a/examples/clients/cli/src/cli.rs b/examples/clients/cli/src/cli.rs index 2954a06..4f49088 100644 --- a/examples/clients/cli/src/cli.rs +++ b/examples/clients/cli/src/cli.rs @@ -1,15 +1,12 @@ -use water::config::WATERConfig; +use water::config::{WATERConfig, WaterBinType}; use water::globals::{CONFIG_WASM_PATH, MAIN, WASM_PATH}; +use water::runtime; use clap::Parser; #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] struct Args { - /// optional address on which to listen - #[arg(short, long, default_value_t = String::from("127.0.0.1:9001"))] - listen: String, - /// Optional argument specifying the .wasm file to load #[arg(short, long, default_value_t = String::from(WASM_PATH))] wasm_path: String, @@ -23,11 +20,11 @@ struct Args { config_wasm: String, /// Optional argument specifying the client_type, default to be Runner - #[arg(short, long, default_value_t = 2)] + #[arg(short, long, default_value_t = 3)] type_client: u32, /// Optional argument enabling debug logging - #[arg(short, long, default_value_t = false)] + #[arg(short, long, default_value_t = true)] debug: bool, } @@ -37,7 +34,7 @@ impl From for WATERConfig { filepath: args.wasm_path, entry_fn: args.entry_fn, config_wasm: args.config_wasm, - client_type: args.type_client.into(), + client_type: WaterBinType::from(args.type_client), debug: args.debug, } } @@ -55,23 +52,20 @@ pub fn parse_and_execute() -> Result<(), anyhow::Error> { } pub fn execute(_conf: WATERConfig) -> Result<(), anyhow::Error> { - // let mut water_client = runtime::WATERClient::new(conf)?; - - // // FIXME: hardcoded the addr & port for now - // water_client.connect("", 0)?; - - // loop { - // // keep reading from stdin and call read and write function from water_client.stream - // let mut buf = String::new(); - // std::io::stdin().read_line(&mut buf)?; - - // water_client.write(buf.as_bytes())?; + let mut water_client = runtime::client::WATERClient::new(_conf).unwrap(); - // let mut buf = vec![0; 1024]; - // water_client.read(&mut buf)?; - - // println!("read: {:?}", String::from_utf8_lossy(&buf)); - // } + match water_client.config.client_type { + WaterBinType::Dial => { + water_client.connect().unwrap(); + } + WaterBinType::Runner => { + water_client.execute().unwrap(); + } + WaterBinType::Listen => {} + WaterBinType::Relay => {} + WaterBinType::Wrap => {} + WaterBinType::Unknown => {} + } Ok(()) } diff --git a/tests/tests/cross_lang_tests.rs b/tests/tests/cross_lang_tests.rs index fe341a1..b489cea 100644 --- a/tests/tests/cross_lang_tests.rs +++ b/tests/tests/cross_lang_tests.rs @@ -177,3 +177,58 @@ fn test_cross_lang_wasm_listener() -> Result<(), Box> { Ok(()) } + +// #[test] +// fn test_cross_lang_wasm_multi_listener() -> Result<(), Box> { +// tracing_subscriber::fmt().with_max_level(Level::INFO).init(); + +// let cfg_str = r#" +// { +// "remote_address": "127.0.0.1", +// "remote_port": 8088, +// "local_address": "127.0.0.1", +// "local_port": 8082 +// } +// "#; +// // Create a directory inside of `std::env::temp_dir()`. +// let dir = tempdir()?; +// let file_path = dir.path().join("temp-config.txt"); +// let mut file = File::create(&file_path)?; +// writeln!(file, "{}", cfg_str)?; + +// let conf = config::WATERConfig::init( +// // plain.wasm is in v0 and fully compatible with the Go engine +// // More details for the Go-side of running plain.wasm check here: +// // https://github.com/gaukas/water/tree/master/examples/v0/plain +// // +// // More details for the implementation of plain.wasm check this PR: +// // https://github.com/erikziyunchi/water-rs/pull/10 +// // +// String::from("./test_wasm/plain.wasm"), +// String::from("_water_worker"), +// String::from(file_path.to_string_lossy()), +// config::WaterBinType::Listen, +// true, +// ) +// .unwrap(); + +// let mut water_client = runtime::client::WATERClient::new(conf).unwrap(); +// water_client.listen().unwrap(); + +// water_client.accept().unwrap(); +// water_client.cancel_with().unwrap(); +// let mut handler = water_client.run_worker().unwrap(); + +// for i in 0..5 { +// handler.join().unwrap(); +// let mut new_water = water_client.keep_listen().unwrap(); +// new_water.accept().unwrap(); +// new_water.cancel_with().unwrap(); +// handler = new_water.run_worker().unwrap(); +// } + +// drop(file); +// dir.close()?; + +// Ok(()) +// } diff --git a/tests/tests/spinning_relay.rs b/tests/tests/spinning_relay.rs index cedd557..366218c 100644 --- a/tests/tests/spinning_relay.rs +++ b/tests/tests/spinning_relay.rs @@ -158,7 +158,26 @@ fn spin_cross_lang_wasm_relay() -> Result<(), Box> { water_client.associate().unwrap(); water_client.cancel_with().unwrap(); - let handle_water = water_client.run_worker().unwrap(); + let mut handle_water = water_client.run_worker().unwrap(); + + for _i in 0..5 { + match handle_water.join().unwrap() { + Ok(_) => {} + Err(e) => { + eprintln!("Running _water_worker ERROR: {}", e); + return Err(Box::new(Error::new( + ErrorKind::Other, + "Failed to join _water_worker thread", + ))); + } + }; + + let mut new_water = water_client.keep_listen().unwrap(); + // no need to call relay again, since relay() is also creating the listener + new_water.associate().unwrap(); + new_water.cancel_with().unwrap(); + handle_water = new_water.run_worker().unwrap(); + } std::thread::sleep(std::time::Duration::from_secs(20)); @@ -166,16 +185,6 @@ fn spin_cross_lang_wasm_relay() -> Result<(), Box> { drop(file); dir.close()?; - match handle_water.join().unwrap() { - Ok(_) => {} - Err(e) => { - eprintln!("Running _water_worker ERROR: {}", e); - return Err(Box::new(Error::new( - ErrorKind::Other, - "Failed to join _water_worker thread", - ))); - } - }; Ok(()) }