Skip to content

Commit

Permalink
feat: switch to axum
Browse files Browse the repository at this point in the history
  • Loading branch information
skifli authored Dec 14, 2024
1 parent 0a1dad8 commit ff534ff
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 70 deletions.
2 changes: 1 addition & 1 deletion bruty_client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "bruty_client"
authors = ["skifli"]
version = "0.4.1"
version = "0.4.2"
edition = "2021"

[dependencies]
Expand Down
6 changes: 3 additions & 3 deletions bruty_server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
[package]
name = "bruty_server"
authors = ["skifli"]
version = "0.4.1"
version = "0.4.2"
edition = "2021"

[dependencies]
axum = { version = "0.7.3", features = ["ws"] }
flume = "0.11.0"
futures-util = "0.3.30"
log = "0.4"
rmp-serde = "1.1.2"
shuttle-warp = "0.49.0"
shuttle-axum = "0.49.0"
shuttle-runtime = "0.49.0"
tokio = { version = "1.40.0", features = ["full"] }
warp = "0.3.7"

bruty_share = { path = "../bruty_share" }
116 changes: 53 additions & 63 deletions bruty_server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use axum;
use bruty_share;
use flume;
use futures_util::SinkExt;
Expand All @@ -6,16 +7,15 @@ use log;
/* use shuttle_persist; */
use shuttle_runtime::SecretStore;
use tokio;
use warp;
use warp::Filter;

mod payload_handlers;
mod server_threads;

const AUTHOR: &str = env!("CARGO_PKG_AUTHORS");
const VERSION: &str = env!("CARGO_PKG_VERSION");

type WebSocketSender = futures_util::stream::SplitSink<warp::ws::WebSocket, warp::ws::Message>;
type WebSocketSender =
futures_util::stream::SplitSink<axum::extract::ws::WebSocket, axum::extract::ws::Message>;

/// An extension trait for `SplitSink` that adds methods for sending payloads.
pub trait SplitSinkExt {
Expand All @@ -25,23 +25,22 @@ pub trait SplitSinkExt {
/// * `payload` - The payload to send.
///
/// # Returns
/// * `Result<(), warp::Error>` - The result of sending the payload.
/// * `Result<(), axum::Error>` - The result of sending the payload.
fn send_payload(
&mut self,
payload: bruty_share::Payload,
) -> impl std::future::Future<Output = std::result::Result<(), warp::Error>>;
) -> impl std::future::Future<Output = std::result::Result<(), axum::Error>>;
}

/// An extension trait for `SplitStream` that adds methods for receiving payloads.
impl SplitSinkExt for WebSocketSender {
async fn send_payload(
&mut self,
payload: bruty_share::Payload,
) -> std::result::Result<(), warp::Error> {
self.send(warp::ws::Message::binary(
) -> std::result::Result<(), axum::Error> {
self.send(axum::extract::ws::Message::Binary(
rmp_serde::to_vec(&payload).unwrap_or_else(|err| {
log::error!("Failed to serialize payload: {}.", err);

vec![]
}),
))
Expand All @@ -63,12 +62,12 @@ impl SplitSinkExt for WebSocketSender {
/// * `bool` - Whether the connection shouldn't be closed.
async fn handle_msg(
websocket_sender: &mut WebSocketSender,
msg: warp::ws::Message,
binary_msg: Vec<u8>,
session: &mut bruty_share::types::Session,
/* persist: &shuttle_persist::PersistInstance, */
server_data: &bruty_share::types::ServerData,
) -> bool {
let payload: bruty_share::Payload = match rmp_serde::from_slice(&msg.as_bytes()) {
let payload: bruty_share::Payload = match rmp_serde::from_slice(&binary_msg) {
Ok(payload) => payload,
Err(err) => {
log::error!(
Expand Down Expand Up @@ -157,7 +156,7 @@ async fn handle_msg(
/// * `persist` - The database connection.
/// * `server_data` - The server's data, with channels used for communication between threads.
async fn handle_websocket(
websocket: warp::ws::WebSocket,
websocket: axum::extract::ws::WebSocket,
session: &mut bruty_share::types::Session,
/* persist: shuttle_persist::PersistInstance, */
server_data: bruty_share::types::ServerData,
Expand Down Expand Up @@ -193,29 +192,33 @@ async fn handle_websocket(
}
};

if msg.is_binary() {
// Binary WebSocket message received
if !handle_msg(
&mut websocket_sender,
msg,
session,
/* &persist, */
&server_data,
)
.await
{
manually_closed = true;
match msg {
axum::extract::ws::Message::Binary(binary_msg) => {
// Binary WebSocket message received
if !handle_msg(
&mut websocket_sender,
binary_msg,
session,
/* &persist, */
&server_data,
)
.await
{
manually_closed = true;
break;
}
},
axum::extract::ws::Message::Close(_) => {
// Client requested to close the connection
break;
}
} else if msg.is_close() {
// Client requested to close the connection
break;
} else {
// Invalid WebSocket message type received
// We don't care about text, ping, pong, etc.
},
_ => {
// Invalid WebSocket message type received
// We don't care about text, ping, pong, etc.

log::warn!("Invalid WebSocket message type received from {} (ID {}).", session.user.name, session.user.id);
continue;
log::warn!("Invalid WebSocket message type received from {} (ID {}).", session.user.name, session.user.id);
continue;
}
}
}
_ = &mut heartbeat_timer => {
Expand Down Expand Up @@ -284,7 +287,7 @@ async fn handle_websocket(
.unwrap();

log::info!(
"Forwaded awaiting result from {} (ID {}) to the next session.",
"Forwarded awaiting result from {} (ID {}) to the next session.",
session.user.name,
session.user.id
);
Expand All @@ -300,13 +303,13 @@ async fn handle_websocket(
/// * `headers` - The headers of the request.
///
/// # Returns
/// * `impl warp::Reply` - The result of handling the connection.
fn handle_connection(
ws: warp::ws::Ws,
/* persist: shuttle_persist::PersistInstance, */
server_data: bruty_share::types::ServerData,
headers: warp::http::HeaderMap,
) -> impl warp::Reply {
/// * `impl axum::response::IntoResponse` - The response to send.
async fn handle_connection(
ws: axum::extract::WebSocketUpgrade,
headers: axum::http::HeaderMap,
/* persist: axum::extract::Extension<shuttle_persist::PersistInstance>, */
server_data: axum::extract::Extension<bruty_share::types::ServerData>,
) -> impl axum::response::IntoResponse {
ws.on_upgrade(move |websocket| async move {
let user_agent = headers
.get("user-agent")
Expand All @@ -329,8 +332,8 @@ fn handle_connection(
secret: "".to_string(),
},
},
/* persist, */
server_data,
/* persist.0, */
server_data.0,
)
.await;
}
Expand All @@ -341,7 +344,7 @@ fn handle_connection(
async fn main(
/* #[shuttle_persist::Persist] persist: shuttle_persist::PersistInstance, */
#[shuttle_runtime::Secrets] secrets: SecretStore,
) -> shuttle_warp::ShuttleWarp<(impl warp::Reply,)> {
) -> shuttle_axum::ShuttleAxum {
log::info!("Bruty Server v{} by {}.", VERSION, AUTHOR);

let users_vec: Vec<char> = secrets.get("USERS").unwrap().chars().collect();
Expand Down Expand Up @@ -430,24 +433,11 @@ async fn main(
server_threads::results_handler(results_receiver).await;
});

// Creates the WebSocket route
let websocket = warp::path("ivocord")
.and(warp::ws()) // Make the route a WebSocket route
/* .and(warp::any().map(move || persist.clone())) // Clone the persist instance */
.and(warp::any().map(move || server_data.clone())) // Clone the server's sender channels
.and(warp::header::headers_cloned()) // Get the headers of the request
.map(
|ws: warp::ws::Ws,
/* persist: shuttle_persist::PersistInstance, */
server_data: bruty_share::types::ServerData,
headers: warp::http::HeaderMap| {
handle_connection(ws, /* persist , */ server_data, headers)
},
); // Handle the connection

let status = warp::path("status").map(|| warp::reply());

let routes = websocket.or(status);

Ok(routes.boxed().into())
let router = axum::Router::new()
.route("/ivocord", axum::routing::get(handle_connection))
/* .layer(axum::Extension(persist)) */
.layer(axum::Extension(server_data))
.route("/status", axum::routing::get(|| async { "OK" }));

Ok(router.into())
}
2 changes: 1 addition & 1 deletion bruty_server/src/payload_handlers.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{SplitSinkExt, WebSocketSender};
use futures_util::SinkExt;

const ALLOWED_CLIENT_VERSIONS: &[&str] = &["0.4.1"];
const ALLOWED_CLIENT_VERSIONS: &[&str] = &["0.4.2"];

/// Checks if the connection is authenticated.
/// If not, it sends an InvalidSession OP code and closes the connection.
Expand Down
2 changes: 1 addition & 1 deletion bruty_share/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "bruty_share"
authors = ["skifli"]
version = "0.4.1"
version = "0.4.2"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand Down
2 changes: 1 addition & 1 deletion bruty_share/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub fn setup(console: bool, log_file: Option<String>) -> Result<(), fern::InitEr
logger
.level_for("reqwest", log::LevelFilter::Warn)
.level_for("tokio-tungstenite", log::LevelFilter::Warn)
.level_for("warp", log::LevelFilter::Warn)
.level_for("axum", log::LevelFilter::Warn)
.apply()?;

Ok(())
Expand Down

0 comments on commit ff534ff

Please sign in to comment.