Skip to content

Commit

Permalink
Add support for -L tunnels to the example client
Browse files Browse the repository at this point in the history
  • Loading branch information
honzasp committed Aug 6, 2022
1 parent 9d86079 commit 6008e51
Show file tree
Hide file tree
Showing 2 changed files with 176 additions and 41 deletions.
208 changes: 169 additions & 39 deletions examples/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use anyhow::{Result, Context as _, bail};
use bytes::BytesMut;
use enclose::enclose;
use futures::ready;
use futures::future::{FutureExt as _, FusedFuture as _};
use futures::stream::{StreamExt as _, TryStreamExt as _, FuturesUnordered};
use regex::Regex;
use rustix::termios;
use std::collections::HashSet;
Expand Down Expand Up @@ -49,6 +51,14 @@ fn run_main() -> Result<ExitCode> {
.value_name("command"))
.arg(clap::Arg::new("want-tty").short('t')
.action(clap::ArgAction::SetTrue))
.arg(clap::Arg::new("local-tunnel").short('L')
.takes_value(true)
.action(clap::ArgAction::Append)
.value_name("[local-host:]local-port:remote-host:remote-port"))
.arg(clap::Arg::new("remote-tunnel").short('R')
.takes_value(true)
.action(clap::ArgAction::Append)
.value_name("[remote-host:]remote-port:local-host:local-port"))
.get_matches();

let mut destination = Destination::default();
Expand All @@ -60,19 +70,41 @@ fn run_main() -> Result<ExitCode> {

let keys = matches.get_many::<PathBuf>("private-key")
.into_iter().flatten()
.map(|path| read_key(path))
.map(|key| read_key(&key))
.collect::<Result<Vec<_>>>()?;

let command = matches.get_one::<String>("command").cloned();
let want_tty = *matches.get_one::<bool>("want-tty").unwrap() || command.is_none();

let local_tunnels = matches.get_many::<String>("local-tunnel")
.into_iter().flatten()
.map(|spec| parse_tunnel_spec(&spec))
.collect::<Result<Vec<_>>>()?;
let remote_tunnels = matches.get_many::<String>("remote-tunnel")
.into_iter().flatten()
.map(|spec| parse_tunnel_spec(&spec))
.collect::<Result<Vec<_>>>()?;

let opts = Opts { destination, keys, command, want_tty, local_tunnels, remote_tunnels };

let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all().build()?;
let exit_code = runtime.block_on(run_client(destination, keys, command, want_tty))?;
let exit_code = runtime.block_on(run_client(opts))?;
runtime.shutdown_background();
Ok(exit_code)
}

#[derive(Debug)]
struct Opts {
destination: Destination,
keys: Vec<Key>,
command: Option<String>,
want_tty: bool,
local_tunnels: Vec<TunnelSpec>,
#[allow(dead_code)]
remote_tunnels: Vec<TunnelSpec>,
}

#[derive(Debug, Default)]
struct Destination {
host: Option<String>,
Expand All @@ -84,9 +116,9 @@ fn parse_destination(dest: &str) -> Result<Destination> {
let re = Regex::new(r"(?x)
^
(ssh://)?
((?P<username>\w+)@)?
((?P<username>\w+) @)?
(?P<host>[[:alnum:].]+)
(:(?P<port>[[:digit:]]+))?
(: (?P<port>[[:digit:]]+))?
$
").unwrap();
let captures = re.captures(dest)
Expand All @@ -99,6 +131,7 @@ fn parse_destination(dest: &str) -> Result<Destination> {
Ok(Destination { host, port, username })
}

#[derive(Debug)]
struct Key {
path: PathBuf,
data: Vec<u8>,
Expand All @@ -113,17 +146,41 @@ fn read_key(path: &Path) -> Result<Key> {
Ok(Key { path: path.into(), data, decoded })
}

async fn run_client(
destination: Destination,
keys: Vec<Key>,
command: Option<String>,
want_tty: bool,
) -> Result<ExitCode> {
let host = destination.host
#[derive(Debug)]
struct TunnelSpec {
bind_host: Option<String>,
bind_port: u16,
connect_host: String,
connect_port: u16,
}

fn parse_tunnel_spec(spec: &str) -> Result<TunnelSpec> {
let re = Regex::new(r"(?x)
^
((?P<bind_host>[[:alnum:].]+) :)?
(?P<bind_port>[[:digit:]]+) :
(?P<connect_host>[[:alnum:].]+) :
(?P<connect_port>[[:digit:]]+)
$
").unwrap();
let captures = re.captures(spec)
.context("invalid format of tunnel spec")?;

let bind_host = captures.name("bind_host").map(|x| x.as_str().into());
let bind_port = captures.name("bind_port").unwrap().as_str().parse()
.context("invalid bind-port in tunnel spec")?;
let connect_host = captures.name("connect_host").unwrap().as_str().into();
let connect_port = captures.name("connect_port").unwrap().as_str().parse()
.context("invalid connect-port in tunnel spec")?;
Ok(TunnelSpec { bind_host, bind_port, connect_host, connect_port })
}

async fn run_client(opts: Opts) -> Result<ExitCode> {
let host = opts.destination.host
.context("please specify the host to connect to")?;
let username = destination.username
let username = opts.destination.username
.context("please specify the username to login with")?;
let port = destination.port.unwrap_or(22);
let port = opts.destination.port.unwrap_or(22);
let config = makiko::ClientConfig::default_compatible_less_secure();

log::info!("connecting to host {:?}, port {}", host, port);
Expand All @@ -132,32 +189,47 @@ async fn run_client(
log::info!("successfully connected");

let (client, mut client_rx, client_fut) = makiko::Client::open(socket, config)?;
let client_task = tokio::task::spawn(client_fut);
let client_task = TaskHandle(tokio::task::spawn(client_fut));

let event_task = tokio::task::spawn(enclose!{(client) async move {
let event_task = TaskHandle(tokio::task::spawn(enclose!{(client) async move {
while let Some(event) = client_rx.recv().await? {
if let makiko::ClientEvent::ServerPubkey(pubkey, accept_tx) = event {
verify_pubkey(&client, pubkey, accept_tx).await?;
}
}
Result::<()>::Ok(())
}});
}}));

let interact_task = tokio::task::spawn(enclose!{(client) async move {
authenticate(&client, username, keys).await
let interact_task = TaskHandle(tokio::task::spawn(enclose!{(client) async move {
authenticate(&client, username, opts.keys).await
.context("could not authenticate")?;
log::info!("successfully authenticated");

let config = makiko::ChannelConfig::default();
let (session, session_rx) = client.open_session(config).await?;
let exit_code = interact(session, session_rx, command, want_tty).await?;
let session_task = TaskHandle(tokio::task::spawn(enclose!{(client) async move {
run_session(client, opts.command, opts.want_tty).await
}}));

let tunnel_tasks = opts.local_tunnels.into_iter().map(enclose!{(client) |spec| {
TaskHandle(tokio::task::spawn(run_local_tunnel(client.clone(), spec)))
}}).collect::<FuturesUnordered<_>>();

let mut session_fut = session_task.fuse();
let mut tunnels_fut = tunnel_tasks.try_collect().fuse();
let mut exit_code = ExitCode::SUCCESS;
while !session_fut.is_terminated() || !tunnels_fut.is_terminated() {
tokio::select! {
res = &mut session_fut => exit_code = res?,
res = &mut tunnels_fut => res?,
}
}

client.disconnect(makiko::DisconnectError::by_app())?;
Result::<ExitCode>::Ok(exit_code)
}});
Result::<_>::Ok(exit_code)
}}));

let mut client_fut = AbortOnDrop(client_task).map(|res| res.expect("client task panicked")).fuse();
let mut event_fut = AbortOnDrop(event_task).map(|res| res.expect("event task panicked")).fuse();
let mut interact_fut = AbortOnDrop(interact_task).map(|res| res.expect("interact task panicked")).fuse();
let mut client_fut = client_task.fuse();
let mut event_fut = event_task.fuse();
let mut interact_fut = interact_task.fuse();

let mut exit_code = None;
loop {
Expand Down Expand Up @@ -318,12 +390,10 @@ async fn authenticate(client: &makiko::Client, username: String, keys: Vec<Key>)
bail!("no authentication method succeeded")
}

async fn interact(
session: makiko::Session,
mut session_rx: makiko::SessionReceiver,
command: Option<String>,
want_tty: bool,
) -> Result<ExitCode> {
async fn run_session(client: makiko::Client, command: Option<String>, want_tty: bool) -> Result<ExitCode> {
let config = makiko::ChannelConfig::default();
let (session, mut session_rx) = client.open_session(config).await?;

let mut pty_req = None;
let mut orig_tio = None;
if want_tty && termios::isatty(std::io::stdin()) {
Expand Down Expand Up @@ -395,8 +465,8 @@ async fn interact(
Result::<()>::Ok(())
}});

let mut recv_fut = AbortOnDrop(recv_task).map(|res| res.expect("receiving task panicked")).fuse();
let mut send_fut = AbortOnDrop(send_task).map(|res| res.expect("sending task panicked")).fuse();
let mut recv_fut = TaskHandle(recv_task);
let mut send_fut = TaskHandle(send_task).fuse();
loop {
tokio::select!{
recv_res = &mut recv_fut => {
Expand All @@ -410,17 +480,77 @@ async fn interact(
}
}

async fn run_local_tunnel(client: makiko::Client, spec: TunnelSpec) -> Result<()> {
let bind_addr = (spec.bind_host.unwrap_or("localhost".into()), spec.bind_port);
let listener = tokio::net::TcpListener::bind(bind_addr).await?;
let mut socket_tasks = FuturesUnordered::new();
loop {
tokio::select!{
res = listener.accept() => {
let (socket, peer_addr) = res?;
let connect_addr = (spec.connect_host.clone(), spec.connect_port);
let originator_addr = (peer_addr.ip().to_string(), peer_addr.port());
let task = TaskHandle(tokio::task::spawn(
run_local_tunnel_socket(client.clone(), connect_addr, socket, originator_addr)
));
socket_tasks.push(task);
},
Some(res) = socket_tasks.next() => res?,
}
}
}

async fn run_local_tunnel_socket(
client: makiko::Client,
connect_addr: (String, u16),
socket: tokio::net::TcpStream,
originator_addr: (String, u16),
) -> Result<()> {
let config = makiko::ChannelConfig::default();
let (tunnel, mut tunnel_rx) = client.connect_tunnel(config, connect_addr, originator_addr).await?;
let (mut socket_read, mut socket_write) = socket.into_split();

let socket_to_client = TaskHandle(tokio::task::spawn(async move {
let mut buffer = BytesMut::new();
while socket_read.read_buf(&mut buffer).await? != 0 {
tunnel.send_data(buffer.split().freeze()).await?;
}
tunnel.send_eof().await?;
Result::<_>::Ok(())
}));

let client_to_socket = TaskHandle(tokio::task::spawn(async move {
while let Some(event) = tunnel_rx.recv().await? {
match event {
makiko::TunnelEvent::Data(mut data) =>
socket_write.write_all_buf(&mut data).await?,
makiko::TunnelEvent::Eof =>
break,
_ => {},
}
}
Result::<_>::Ok(())
}));

tokio::try_join!(socket_to_client, client_to_socket)?;
Ok(())
}

#[derive(Debug)]
pub struct AbortOnDrop<T>(pub tokio::task::JoinHandle<T>);
pub struct TaskHandle<T>(pub tokio::task::JoinHandle<T>);

impl<T> Future for AbortOnDrop<T> {
type Output = Result<T, tokio::task::JoinError>;
impl<T> Future for TaskHandle<T> {
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.get_mut().0).poll(cx)
match ready!(Pin::new(&mut self.get_mut().0).poll(cx)) {
Ok(res) => Poll::Ready(res),
Err(err) if err.is_panic() => std::panic::resume_unwind(err.into_panic()),
Err(err) => panic!("Task failed: {}", err),
}
}
}

impl<T> Drop for AbortOnDrop<T> {
impl<T> Drop for TaskHandle<T> {
fn drop(&mut self) {
self.0.abort();
}
Expand Down
9 changes: 7 additions & 2 deletions src/keys/openssh.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Encoding and decoding keys.
use bytes::Bytes;
use derivative::Derivative;
use crate::cipher::{self, CipherAlgoVariant};
use crate::codec::PacketDecode;
use crate::error::{Result, Error};
Expand All @@ -8,11 +9,13 @@ use crate::pubkey::{Pubkey, Privkey};
/// Keypair (public and private key) in OpenSSH format.
///
/// Note that we do not check that the public key and private key form a valid keypair.
#[derive(Clone, PartialEq, Eq)]
#[derive(Clone, PartialEq, Eq, Derivative)]
#[derivative(Debug)]
pub struct OpensshKeypair {
/// Public key, always unencrypted.
pub pubkey: Pubkey,
/// Private key, may be encrypted in the key file.
#[cfg_attr(not(feature = "debug_less_secure"), derivative(Debug = "ignore"))]
pub privkey: Privkey,
/// Comment, encrypted if and only if the private key is encrypted.
pub comment: String,
Expand All @@ -22,11 +25,13 @@ pub struct OpensshKeypair {
///
/// We can always decode the public key, which is stored without encryption. The private key will
/// be decoded only if the file was not encrypted.
#[derive(Clone, PartialEq, Eq)]
#[derive(Clone, PartialEq, Eq, Derivative)]
#[derivative(Debug)]
pub struct OpensshKeypairNopass {
/// Public key, available even without password.
pub pubkey: Pubkey,
/// Private key, available only if the key file was not encrypted.
#[cfg_attr(not(feature = "debug_less_secure"), derivative(Debug = "ignore"))]
pub privkey: Option<Privkey>,
/// Comment, available only if the key file was not encrypted.
pub comment: Option<String>,
Expand Down

0 comments on commit 6008e51

Please sign in to comment.