Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
  • Loading branch information
juan518munoz committed Oct 18, 2024
1 parent dca89d2 commit 317cfef
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 42 deletions.
1 change: 1 addition & 0 deletions core/node/eigenda_proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ tracing.workspace = true
rlp.workspace = true
rand.workspace = true
sha3.workspace = true
hex.workspace = true
# we can't use the workspace version of prost because
# the tonic dependency requires a hugher version.
prost = "0.13.1"
Expand Down
22 changes: 17 additions & 5 deletions core/node/eigenda_proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ use axum::{
routing::{get, post},
Router,
};
use disperser::disperser_client::DisperserClient;
use request_processor::RequestProcessor;
use tokio::sync::watch;
use tonic::transport::{Channel, ClientTlsConfig};

mod blob_info;
mod common;
Expand All @@ -20,7 +22,17 @@ pub async fn run_server(mut stop_receiver: watch::Receiver<bool>) -> anyhow::Res
// TODO: Replace port for config
let bind_address = SocketAddr::from(([0, 0, 0, 0], 4242));
tracing::info!("Starting eigenda proxy on {bind_address}");
let app = create_eigenda_proxy_router();

let disperser_endpoint = Channel::builder(
"https://disperser-holesky.eigenda.xyz"
.to_string()
.parse()
.unwrap(),
)
.tls_config(ClientTlsConfig::new().with_native_roots())
.unwrap();
let disperser = DisperserClient::connect(disperser_endpoint).await.unwrap();
let app = create_eigenda_proxy_router(disperser);

let listener = tokio::net::TcpListener::bind(bind_address)
.await
Expand All @@ -40,10 +52,10 @@ pub async fn run_server(mut stop_receiver: watch::Receiver<bool>) -> anyhow::Res
Ok(())
}

fn create_eigenda_proxy_router() -> Router {
let get_blob_id_processor = RequestProcessor::new();
fn create_eigenda_proxy_router(disperser: DisperserClient<Channel>) -> Router {
let get_blob_id_processor = RequestProcessor::new(disperser);
let _put_blob_id_processor = get_blob_id_processor.clone();
let mut router = Router::new()
let router = Router::new()
.route(
"/get/:l1_batch_number",
get(move |blob_id: Path<String>| async move {
Expand All @@ -52,7 +64,7 @@ fn create_eigenda_proxy_router() -> Router {
)
.route(
"/put/",
post(move |blob_id: Path<u32>| async move {
post(move |_blob_id: Path<u32>| async move {
// put_blob_id_processor
// .put_blob_id(blob_id)
// .await
Expand Down
3 changes: 1 addition & 2 deletions core/node/eigenda_proxy/src/memstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,8 @@ impl MemStore {
}
}

#[cfg(test)]
mod test {
use std::time::Duration;

use super::*;

#[tokio::test]
Expand Down
136 changes: 101 additions & 35 deletions core/node/eigenda_proxy/src/request_processor.rs
Original file line number Diff line number Diff line change
@@ -1,50 +1,74 @@
use axum::{extract::Path, Json};
use std::sync::Arc;

use crate::errors::RequestProcessorError;
use axum::{
extract::Path,
http::{header, StatusCode},
response::Response,
Json,
};
use tokio::sync::Mutex;
use tonic::transport::Channel;

#[derive(Clone)]
pub(crate) struct RequestProcessor {}

fn read_version_and_commitment(blob_id: String) -> (String, u64) {
if blob_id.len() < 4 {
panic!("Blob ID too short");
}

let mut blob_id = blob_id;
let prefix: String = blob_id.drain(..2).collect();
if prefix != "0x" {
panic!("Invalid prefix");
}
use crate::{
disperser::{disperser_client::DisperserClient, BlobStatus, BlobStatusRequest},
errors::RequestProcessorError,
};

let version: u64 = blob_id.drain(..1).collect::<String>().parse().unwrap();
match version {
0 => (),
1 => (),
_ => panic!("Invalid version {:?}", version),
}
println!("version: {:?}", version);

let commitment = blob_id;
println!("commitment: {:?}", commitment);

(commitment, version)
#[derive(Clone)]
pub(crate) struct RequestProcessor {
disperser: Arc<Mutex<DisperserClient<Channel>>>,
}

impl RequestProcessor {
pub(crate) fn new() -> Self {
Self {}
pub(crate) fn new(disperser: DisperserClient<Channel>) -> Self {
Self {
disperser: Arc::new(Mutex::new(disperser)),
}
}

pub(crate) async fn get_blob_id(
&self,
Path(blob_id): Path<String>,
) -> Result<Json<String>, RequestProcessorError> {
// Read commitment and mode version
let (commitment, version) = read_version_and_commitment(blob_id); // TODO: Implement

// Request commitment to dispatcher?
) -> axum::response::Response {
let request_id = match hex::decode(blob_id) {
Ok(request_id) => request_id,
Err(_) => {
return Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/octet-stream")
.body("invalid commitment mode: invalid hex string".into())
.unwrap()
}
};

Ok(Json(commitment))
loop {
let request_id = request_id.clone();
let blob_status_reply = self
.disperser
.lock()
.await
.get_blob_status(BlobStatusRequest { request_id })
.await
.unwrap()
.into_inner();
let blob_status = blob_status_reply.status();
// TODO: May be suitable to sleep between calls to avoid spamming the server
match blob_status {
BlobStatus::Unknown => panic!("Blob status is unknown"),
BlobStatus::Processing => tracing::info!("Blob is processing"),
BlobStatus::Confirmed => tracing::info!("Blob is confirmed, but not finalized"),
BlobStatus::Failed => panic!("Blob has failed"),
BlobStatus::InsufficientSignatures => panic!("Insufficient signatures for blob"),
BlobStatus::Dispersing => tracing::info!("Blob is being dispersed"),
BlobStatus::Finalized => {
return Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/octet-stream")
.body("foo".into())
.unwrap()
}
}
}
}

#[tracing::instrument(skip_all)]
Expand All @@ -55,3 +79,45 @@ impl RequestProcessor {
Ok(Json(blob_id))
}
}

#[cfg(test)]
mod tests {
use axum::extract::Path;
use tonic::transport::ClientTlsConfig;

use super::*;

#[tokio::test]
#[should_panic]
async fn test_get_blob_id() {
let endpoint = Channel::builder(
"https://disperser-holesky.eigenda.xyz"
.to_string()
.parse()
.unwrap(),
)
.tls_config(ClientTlsConfig::new().with_native_roots())
.unwrap();
let disperser = DisperserClient::connect(endpoint).await.unwrap();
let request_processor = RequestProcessor::new(disperser);

// We know for certain that this blob id exists in the holesky disperser
let mut blob_id = hex::encode([
102, 99, 98, 102, 97, 51, 51, 98, 99, 55, 98, 52, 50, 100, 102, 98, 52, 102, 102, 54,
51, 98, 52, 97, 50, 54, 51, 56, 48, 55, 50, 53, 57, 53, 97, 48, 100, 51, 102, 54, 97,
102, 97, 48, 57, 51, 100, 54, 98, 57, 101, 50, 102, 50, 49, 54, 57, 50, 97, 55, 48, 98,
48, 54, 45, 51, 49, 51, 55, 51, 50, 51, 52, 51, 55, 51, 48, 51, 51, 51, 55, 51, 48, 51,
54, 51, 48, 51, 50, 51, 48, 51, 50, 51, 50, 51, 53, 51, 51, 51, 49, 51, 55, 50, 102,
51, 49, 50, 102, 51, 51, 51, 51, 50, 102, 51, 48, 50, 102, 51, 51, 51, 51, 50, 102,
101, 51, 98, 48, 99, 52, 52, 50, 57, 56, 102, 99, 49, 99, 49, 52, 57, 97, 102, 98, 102,
52, 99, 56, 57, 57, 54, 102, 98, 57, 50, 52, 50, 55, 97, 101, 52, 49, 101, 52, 54, 52,
57, 98, 57, 51, 52, 99, 97, 52, 57, 53, 57, 57, 49, 98, 55, 56, 53, 50, 98, 56, 53, 53,
]);
let response = request_processor.get_blob_id(Path(blob_id.clone())).await;
assert_eq!(response.status(), StatusCode::OK);

// We change the blob id to a non-existent one
blob_id.push_str("fa");
request_processor.get_blob_id(Path(blob_id)).await;
}
}

0 comments on commit 317cfef

Please sign in to comment.