Skip to content

chore(l1): drop unwraps from rpc crate #3075

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jun 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions cmd/ethrex/initializers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use rand::rngs::OsRng;
use std::time::{SystemTime, UNIX_EPOCH};
use std::{
fs,
future::IntoFuture,
net::{Ipv4Addr, SocketAddr},
path::{Path, PathBuf},
sync::Arc,
Expand Down Expand Up @@ -157,8 +156,7 @@ pub async fn init_rpc_api(
l2_opts.sponsor_private_key,
#[cfg(feature = "l2")]
rollup_store,
)
.into_future();
);

tracker.spawn(rpc_api);
}
Expand Down
3 changes: 3 additions & 0 deletions crates/networking/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,6 @@ path = "./lib.rs"

[features]
l2 = ["ethrex-storage-rollup"]

[lints.clippy]
unwrap_used = "deny"
8 changes: 4 additions & 4 deletions crates/networking/rpc/authentication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub fn validate_jwt_authentication(token: &str, secret: &Bytes) -> Result<(), Au
validation.set_required_spec_claims(&["iat"]);
match decode::<Claims>(token, &decoding_key, &validation) {
Ok(token_data) => {
if invalid_issued_at_claim(token_data) {
if invalid_issued_at_claim(token_data)? {
Err(AuthenticationError::InvalidIssuedAtClaim)
} else {
Ok(())
Expand All @@ -57,10 +57,10 @@ pub fn validate_jwt_authentication(token: &str, secret: &Bytes) -> Result<(), Au
}

/// Checks that the "iat" timestamp in the claim is less than 60 seconds from now
fn invalid_issued_at_claim(token_data: TokenData<Claims>) -> bool {
fn invalid_issued_at_claim(token_data: TokenData<Claims>) -> Result<bool, AuthenticationError> {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.map_err(|_| AuthenticationError::InvalidIssuedAtClaim)?
.as_secs() as usize;
(now as isize - token_data.claims.iat as isize).abs() > 60
Ok((now as isize - token_data.claims.iat as isize).abs() > 60)
}
2 changes: 2 additions & 0 deletions crates/networking/rpc/clients/beacon/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ pub enum BeaconClientError {
RpcError(u64, String),
#[error("Response deserialization error: {0}")]
DeserializeError(#[from] serde_json::Error),
#[error("Failed to set url endpoint: {0}")]
FailedToSetURLEndpointError(String),
#[error("Error: {0}")]
Custom(String),
}
4 changes: 3 additions & 1 deletion crates/networking/rpc/clients/beacon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ impl BeaconClient {
println!("Sending request: {endpoint}");
let response = self
.client
.get(self.url.clone().join(endpoint).unwrap())
.get(self.url.clone().join(endpoint).map_err(|error| {
BeaconClientError::FailedToSetURLEndpointError(error.to_string())
})?)
.header("content-type", "application/json")
.header("accept", "application/json")
.send()
Expand Down
44 changes: 26 additions & 18 deletions crates/networking/rpc/l2/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,15 +141,18 @@ impl RpcHandler for SponsoredTx {
.await
.map_err(RpcErr::from)?;
let gas_price_request = GasPrice {}.handle(context.clone()).await?;
let max_fee_per_gas = u64::from_str_radix(
gas_price_request
.as_str()
.unwrap_or("0x0")
.strip_prefix("0x")
.unwrap(),
16,
)
.map_err(|err| RpcErr::Internal(err.to_string()))?;

let gas_price_request = gas_price_request
.as_str()
.unwrap_or("0x0")
.strip_prefix("0x")
.ok_or(RpcErr::InvalidEthrexL2Message(
"Gas price request has invalid format".to_string(),
))?;

let max_fee_per_gas = u64::from_str_radix(gas_price_request, 16).map_err(|error| {
RpcErr::InvalidEthrexL2Message(format!("Gas price request has invalid size: {error}"))
})?;

let mut tx = if let Some(auth_list) = &self.authorization_list {
SendRawTransactionRequest::EIP7702(EIP7702Transaction {
Expand Down Expand Up @@ -195,15 +198,20 @@ impl RpcHandler for SponsoredTx {
}
.handle(context.clone())
.await?;
let gas_limit = u64::from_str_radix(
estimate_gas_request
.as_str()
.unwrap_or("0x0")
.strip_prefix("0x")
.unwrap(),
16,
)
.unwrap();

let estimate_gas_request = estimate_gas_request
.as_str()
.unwrap_or("0x0")
.strip_prefix("0x")
.ok_or(RpcErr::InvalidEthrexL2Message(
"Estimate gas request has invalid format".to_string(),
))?;

let gas_limit = u64::from_str_radix(estimate_gas_request, 16).map_err(|error| {
RpcErr::InvalidEthrexL2Message(format!(
"Estimate gas request has invalid size: {error}"
))
})?;
if gas_limit == 0 || gas_limit > GAS_LIMIT_HARD_LIMIT {
return Err(RpcErr::InvalidEthrexL2Message(
"tx too expensive".to_string(),
Expand Down
58 changes: 35 additions & 23 deletions crates/networking/rpc/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::utils::{
use crate::{admin, net};
use crate::{eth, mempool};
use axum::extract::State;
use axum::{routing::post, Json, Router};
use axum::{http::StatusCode, routing::post, Json, Router};
use axum_extra::{
headers::{authorization::Bearer, Authorization},
TypedHeader,
Expand Down Expand Up @@ -140,7 +140,7 @@ pub async fn start_api(
#[cfg(feature = "l2")] valid_delegation_addresses: Vec<Address>,
#[cfg(feature = "l2")] sponsor_pk: SecretKey,
#[cfg(feature = "l2")] rollup_store: StoreRollup,
) {
) -> Result<(), RpcErr> {
// TODO: Refactor how filters are handled,
// filters are used by the filters endpoints (eth_newFilter, eth_getFilterChanges, ...etc)
let active_filters = Arc::new(Mutex::new(HashMap::new()));
Expand Down Expand Up @@ -187,7 +187,9 @@ pub async fn start_api(
.route("/", post(handle_http_request))
.layer(cors)
.with_state(service_context.clone());
let http_listener = TcpListener::bind(http_addr).await.unwrap();
let http_listener = TcpListener::bind(http_addr)
.await
.map_err(|error| RpcErr::Internal(error.to_string()))?;
let http_server = axum::serve(http_listener, http_router)
.with_graceful_shutdown(shutdown_signal())
.into_future();
Expand All @@ -204,7 +206,9 @@ pub async fn start_api(
let authrpc_router = Router::new()
.route("/", post(authrpc_handler))
.with_state(service_context);
let authrpc_listener = TcpListener::bind(authrpc_addr).await.unwrap();
let authrpc_listener = TcpListener::bind(authrpc_addr)
.await
.map_err(|error| RpcErr::Internal(error.to_string()))?;
let authrpc_server = axum::serve(authrpc_listener, authrpc_router)
.with_graceful_shutdown(shutdown_signal())
.into_future();
Expand All @@ -213,6 +217,7 @@ pub async fn start_api(
let _ = tokio::try_join!(authrpc_server, http_server)
.inspect_err(|e| info!("Error shutting down servers: {e:?}"));
}
Ok(())
}

async fn shutdown_signal() {
Expand All @@ -224,48 +229,56 @@ async fn shutdown_signal() {
async fn handle_http_request(
State(service_context): State<RpcApiContext>,
body: String,
) -> Json<Value> {
) -> Result<Json<Value>, StatusCode> {
let res = match serde_json::from_str::<RpcRequestWrapper>(&body) {
Ok(RpcRequestWrapper::Single(request)) => {
let res = map_http_requests(&request, service_context).await;
rpc_response(request.id, res)
rpc_response(request.id, res).map_err(|_| StatusCode::BAD_REQUEST)?
}
Ok(RpcRequestWrapper::Multiple(requests)) => {
let mut responses = Vec::new();
for req in requests {
let res = map_http_requests(&req, service_context.clone()).await;
responses.push(rpc_response(req.id, res));
responses.push(rpc_response(req.id, res).map_err(|_| StatusCode::BAD_REQUEST)?);
}
serde_json::to_value(responses).unwrap()
serde_json::to_value(responses).map_err(|_| StatusCode::BAD_REQUEST)?
}
Err(_) => rpc_response(
RpcRequestId::String("".to_string()),
Err(RpcErr::BadParams("Invalid request body".to_string())),
),
)
.map_err(|_| StatusCode::BAD_REQUEST)?,
};
Json(res)
Ok(Json(res))
}

pub async fn handle_authrpc_request(
State(service_context): State<RpcApiContext>,
auth_header: Option<TypedHeader<Authorization<Bearer>>>,
body: String,
) -> Json<Value> {
) -> Result<Json<Value>, StatusCode> {
let req: RpcRequest = match serde_json::from_str(&body) {
Ok(req) => req,
Err(_) => {
return Json(rpc_response(
RpcRequestId::String("".to_string()),
Err(RpcErr::BadParams("Invalid request body".to_string())),
return Ok(Json(
rpc_response(
RpcRequestId::String("".to_string()),
Err(RpcErr::BadParams("Invalid request body".to_string())),
)
.map_err(|_| StatusCode::BAD_REQUEST)?,
));
}
};
match authenticate(&service_context.node_data.jwt_secret, auth_header) {
Err(error) => Json(rpc_response(req.id, Err(error))),
Err(error) => Ok(Json(
rpc_response(req.id, Err(error)).map_err(|_| StatusCode::BAD_REQUEST)?,
)),
Ok(()) => {
// Proceed with the request
let res = map_authrpc_requests(&req, service_context).await;
Json(rpc_response(req.id, res))
Ok(Json(
rpc_response(req.id, res).map_err(|_| StatusCode::BAD_REQUEST)?,
))
}
}
}
Expand Down Expand Up @@ -440,11 +453,11 @@ pub async fn map_l2_requests(req: &RpcRequest, context: RpcApiContext) -> Result
}
}

fn rpc_response<E>(id: RpcRequestId, res: Result<Value, E>) -> Value
fn rpc_response<E>(id: RpcRequestId, res: Result<Value, E>) -> Result<Value, RpcErr>
where
E: Into<RpcErrorMetadata>,
{
match res {
Ok(match res {
Ok(result) => serde_json::to_value(RpcSuccessResponse {
id,
jsonrpc: "2.0".to_string(),
Expand All @@ -455,8 +468,7 @@ where
jsonrpc: "2.0".to_string(),
error: error.into(),
}),
}
.unwrap()
}?)
}

#[cfg(test)]
Expand Down Expand Up @@ -494,7 +506,7 @@ mod tests {

let enr_url = context.node_data.local_node_record.enr_url().unwrap();
let result = map_http_requests(&request, context).await;
let rpc_response = rpc_response(request.id, result);
let rpc_response = rpc_response(request.id, result).unwrap();
let blob_schedule = serde_json::json!({
"cancun": { "target": 3, "max": 6, "baseFeeUpdateFraction": 3338477 },
"prague": { "target": 6, "max": 9, "baseFeeUpdateFraction": 5007716 }
Expand Down Expand Up @@ -572,7 +584,7 @@ mod tests {
// Process request
let context = default_context_with_storage(storage).await;
let result = map_http_requests(&request, context).await;
let response = rpc_response(request.id, result);
let response = rpc_response(request.id, result).unwrap();
let expected_response = to_rpc_response_success_value(
r#"{"jsonrpc":"2.0","id":1,"result":{"accessList":[],"gasUsed":"0x5208"}}"#,
);
Expand Down Expand Up @@ -623,7 +635,7 @@ mod tests {
let context = default_context_with_storage(storage).await;
// Process request
let result = map_http_requests(&request, context).await;
let response = rpc_response(request.id, result);
let response = rpc_response(request.id, result).unwrap();
let expected_response_string =
format!(r#"{{"id":67,"jsonrpc": "2.0","result": "{}"}}"#, chain_id);
let expected_response = to_rpc_response_success_value(&expected_response_string);
Expand Down
3 changes: 2 additions & 1 deletion crates/networking/rpc/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,8 @@ pub mod test_utils {
#[cfg(feature = "l2")]
rollup_store,
)
.await;
.await
.unwrap();
}

pub async fn default_context_with_storage(storage: Store) -> RpcApiContext {
Expand Down