Skip to content

Commit

Permalink
more tracing additions
Browse files Browse the repository at this point in the history
  • Loading branch information
agrinman committed May 1, 2021
1 parent 0c8e729 commit 0947924
Show file tree
Hide file tree
Showing 9 changed files with 136 additions and 49 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions tunnelto_server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ trust-dns-resolver = "0.20"
hmac-sha256 = "0.1.7"
hex = "0.4.3"
rand = "0.7.3"
async-trait = "0.1.50"

tracing = "0.1.25"
tracing-subscriber = "0.2.17"
Expand Down
25 changes: 16 additions & 9 deletions tunnelto_server/src/auth/auth_db.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use rusoto_core::{Client, HttpClient, Region};
use rusoto_dynamodb::{AttributeValue, DynamoDb, DynamoDbClient, GetItemError, GetItemInput};

use super::AuthResult;
use crate::auth::AuthService;
use async_trait::async_trait;
use rusoto_credential::EnvironmentProvider;
use sha2::Digest;
use std::collections::HashMap;
Expand Down Expand Up @@ -62,23 +65,24 @@ pub enum Error {
SubdomainNotAuthorized,
}

pub enum AuthResult {
ReservedByYou,
ReservedByOther,
ReservedByYouButDelinquent,
Available,
}
impl AuthDbService {
pub async fn auth_sub_domain(
#[async_trait]
impl AuthService for AuthDbService {
type Error = Error;
type AuthKey = String;

async fn auth_sub_domain(
&self,
auth_key: &str,
auth_key: &String,
subdomain: &str,
) -> Result<AuthResult, Error> {
let authenticated_account_id = self.get_account_id_for_auth_key(auth_key).await?;
tracing::info!(account=%authenticated_account_id.to_string(), requested_subdomain=%subdomain, "authenticated client");

match self.get_account_id_for_subdomain(subdomain).await? {
Some(account_id) => {
// check you reserved it
if authenticated_account_id != account_id {
tracing::info!(account=%authenticated_account_id.to_string(), "reserved by other");
return Ok(AuthResult::ReservedByOther);
}

Expand All @@ -87,6 +91,7 @@ impl AuthDbService {
.is_account_in_good_standing(authenticated_account_id)
.await?
{
tracing::warn!(account=%authenticated_account_id.to_string(), "delinquent");
return Ok(AuthResult::ReservedByYouButDelinquent);
}

Expand All @@ -95,7 +100,9 @@ impl AuthDbService {
None => Ok(AuthResult::Available),
}
}
}

impl AuthDbService {
async fn get_account_id_for_auth_key(&self, auth_key: &str) -> Result<Uuid, Error> {
let auth_key_hash = key_id(auth_key);

Expand Down
18 changes: 11 additions & 7 deletions tunnelto_server/src/auth/client_auth.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::auth::reconnect_token::ReconnectTokenPayload;
use crate::auth_db::AuthResult;
use crate::auth::{AuthResult, AuthService};
use crate::{ReconnectToken, CONFIG};
use futures::{SinkExt, StreamExt};
use tracing::error;
Expand All @@ -12,6 +12,7 @@ pub struct ClientHandshake {
pub is_anonymous: bool,
}

#[tracing::instrument(skip(websocket))]
pub async fn auth_client_handshake(
mut websocket: WebSocket,
) -> Option<(WebSocket, ClientHandshake)> {
Expand All @@ -32,6 +33,7 @@ pub async fn auth_client_handshake(
}
}

#[tracing::instrument(skip(websocket))]
async fn auth_client_v1(
client_hello: ClientHelloV1,
mut websocket: WebSocket,
Expand Down Expand Up @@ -65,15 +67,16 @@ async fn auth_client_v1(
))
}

#[tracing::instrument(skip(client_hello_data, websocket))]
async fn auth_client(
client_hello_data: &[u8],
mut websocket: WebSocket,
) -> Option<(WebSocket, ClientHandshake)> {
// parse the client hello
let client_hello: ClientHello = match serde_json::from_slice(client_hello_data) {
Ok(ch) => ch,
Err(e) => {
error!("invalid client hello: {}", e);
Err(error) => {
error!(?error, "invalid client hello");
let data = serde_json::to_vec(&ServerHello::AuthFailed).unwrap_or_default();
let _ = websocket.send(Message::binary(data)).await;
return None;
Expand Down Expand Up @@ -172,23 +175,24 @@ async fn auth_client(
))
}

#[tracing::instrument(skip(token, websocket))]
async fn handle_reconnect_token(
token: ReconnectToken,
mut websocket: WebSocket,
) -> Option<(WebSocket, ClientHandshake)> {
let payload = match ReconnectTokenPayload::verify(token, &CONFIG.master_sig_key) {
Ok(payload) => payload,
Err(e) => {
error!("invalid reconnect token: {:?}", e);
Err(error) => {
error!(?error, "invalid reconnect token");
let data = serde_json::to_vec(&ServerHello::AuthFailed).unwrap_or_default();
let _ = websocket.send(Message::binary(data)).await;
return None;
}
};

tracing::debug!(
"accepting reconnect token from client: {}",
&payload.client_id
client_id=%&payload.client_id,
"accepting reconnect token from client",
);

Some((
Expand Down
41 changes: 41 additions & 0 deletions tunnelto_server/src/auth/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use async_trait::async_trait;
use rand::Rng;
use serde::{Deserialize, Serialize};
use std::convert::TryInto;
Expand Down Expand Up @@ -47,3 +48,43 @@ impl SigKey {
signature == expected
}
}

/// Define the required behavior of an Authentication Service
#[async_trait]
pub trait AuthService {
type Error;
type AuthKey;

/// Authenticate a subdomain with an AuthKey
async fn auth_sub_domain(
&self,
auth_key: &Self::AuthKey,
subdomain: &str,
) -> Result<AuthResult, Self::Error>;
}

/// A result for authenticating a subdomain
pub enum AuthResult {
ReservedByYou,
ReservedByOther,
ReservedByYouButDelinquent,
Available,
}

#[derive(Debug, Clone, Copy)]
pub struct NoAuth;

#[async_trait]
impl AuthService for NoAuth {
type Error = ();
type AuthKey = ();

/// Authenticate a subdomain with an AuthKey
async fn auth_sub_domain(
&self,
_auth_key: &Self::AuthKey,
_subdomain: &str,
) -> Result<AuthResult, Self::Error> {
Ok(AuthResult::Available)
}
}
47 changes: 24 additions & 23 deletions tunnelto_server/src/control_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,30 @@ pub fn spawn<A: Into<SocketAddr>>(addr: A) {
tracing::debug!("Health Check #2 triggered");
"ok"
});
let client_conn = warp::path("wormhole").and(warp::ws()).map(move |ws: Ws| {
ws.on_upgrade(|w| {
async move { handle_new_connection(w).await }
.instrument(observability::remote_trace("handle_websocket"))
})
});
let client_conn = warp::path("wormhole")
.and(warp::header::optional("X-Forwarded-For"))
.and(warp::ws())
.map(move |fwd: Option<String>, ws: Ws| {
ws.on_upgrade(|w| {
async move { handle_new_connection(fwd.unwrap_or_default(), w).await }
.instrument(observability::remote_trace("handle_websocket"))
})
});

let routes = client_conn.or(health_check);

// spawn our websocket control server
tokio::spawn(warp::serve(routes).run(addr.into()));
}

async fn handle_new_connection(websocket: WebSocket) {
#[tracing::instrument(skip(websocket))]
async fn handle_new_connection(forwarded_for: String, websocket: WebSocket) {
let (websocket, handshake) = match try_client_handshake(websocket).await {
Some(ws) => ws,
None => return,
};

tracing::info!(?handshake.sub_domain, "open tunnel");
tracing::info!(?forwarded_for, ?handshake.sub_domain, "open tunnel");

let (tx, rx) = unbounded::<ControlPacket>();
let mut client = ConnectedClient {
Expand Down Expand Up @@ -97,6 +101,7 @@ async fn handle_new_connection(websocket: WebSocket) {
);
}

#[tracing::instrument(skip(websocket))]
async fn try_client_handshake(websocket: WebSocket) -> Option<(WebSocket, ClientHandshake)> {
// Authenticate client handshake
let (mut websocket, client_handshake) = client_auth::auth_client_handshake(websocket).await?;
Expand All @@ -109,8 +114,8 @@ async fn try_client_handshake(websocket: WebSocket) -> Option<(WebSocket, Client
.unwrap_or_default();

let send_result = websocket.send(Message::binary(data)).await;
if let Err(e) = send_result {
error!("aborting...failed to write server hello: {:?}", e);
if let Err(error) = send_result {
error!(?error, "aborting...failed to write server hello");
return None;
}

Expand Down Expand Up @@ -157,32 +162,28 @@ async fn process_client_messages(client: ConnectedClient, mut client_conn: Split
}
// handle close with reason
Some(Ok(msg)) if msg.is_close() && !msg.as_bytes().is_empty() => {
tracing::debug!("got close, reason = {:?}", msg.to_str());
tracing::debug!(close_reason=?msg, "got close");
Connections::remove(&client);
return;
}
_ => {
tracing::debug!("goodbye client: {:?}", &client.id);
tracing::debug!(?client.id, "goodbye client");
Connections::remove(&client);
return;
}
};

let packet = match ControlPacket::deserialize(&message) {
Ok(packet) => packet,
Err(e) => {
error!("invalid data packet: {:?}", e);
Err(error) => {
error!(?error, "invalid data packet");
continue;
}
};

let (stream_id, message) = match packet {
ControlPacket::Data(stream_id, data) => {
tracing::debug!(
"forwarding to stream[id={}]: {} bytes",
&stream_id.to_string(),
data.len()
);
tracing::debug!(?stream_id, num_bytes=?data.len(),"forwarding to stream");
(stream_id, StreamMessage::Data(data))
}
ControlPacket::Refused(stream_id) => {
Expand All @@ -203,8 +204,8 @@ async fn process_client_messages(client: ConnectedClient, mut client_conn: Split
let stream = ACTIVE_STREAMS.get(&stream_id).map(|s| s.value().clone());

if let Some(mut stream) = stream {
let _ = stream.tx.send(message).await.map_err(|e| {
tracing::error!("Failed to send to stream tx: {:?}", e);
let _ = stream.tx.send(message).await.map_err(|error| {
tracing::trace!(?error, "Failed to send to stream tx");
});
}
}
Expand All @@ -220,8 +221,8 @@ async fn tunnel_client(
match queue.next().await {
Some(packet) => {
let result = sink.send(Message::binary(packet.serialize())).await;
if result.is_err() {
error!("client disconnected: aborting.");
if let Err(error) = result {
tracing::trace!(?error, "client disconnected: aborting.");
Connections::remove(&client);
return;
}
Expand Down
3 changes: 3 additions & 0 deletions tunnelto_server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ lazy_static! {
pub static ref AUTH_DB_SERVICE: AuthDbService =
AuthDbService::new().expect("failed to init auth-service");
pub static ref CONFIG: Config = Config::from_env();

// To disable all authentication:
// pub static ref AUTH_DB_SERVICE: crate::auth::NoAuth = crate::auth::NoAuth;
}

#[tokio::main]
Expand Down
6 changes: 4 additions & 2 deletions tunnelto_server/src/observability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ use tracing_honeycomb::{register_dist_tracing_root, TraceId};
use warp::trace::Info;

pub fn remote_trace(source: &str) -> Span {
let current = tracing::Span::current();

let trace_id = TraceId::new();
let id = crate::CONFIG.instance_id.clone();

// Create a span using tracing macros
let span = tracing::info_span!("begin", id = %id, source = %source, req = %trace_id);
let span = tracing::info_span!(target: "event", parent: &current, "begin span", id = %id, source = %source, req = %trace_id);
span.in_scope(|| {
let _ = register_dist_tracing_root(trace_id, None).map_err(|e| {
eprintln!("register trace root error: {:?}", e);
Expand Down Expand Up @@ -37,7 +39,7 @@ pub fn network_trace(info: Info) -> Span {
if let Err(err) = register_dist_tracing_root(request_id, None) {
eprintln!("register trace root error (warp): {:?}", err);
}
tracing::info!(?id, ?method, ?path, ?remote_addr, "network request");
tracing::info!(id=%id, ?method, ?path, ?remote_addr, "network request");
});

span
Expand Down
Loading

0 comments on commit 0947924

Please sign in to comment.