-
Notifications
You must be signed in to change notification settings - Fork 44
Mithril Client verify multi signature #166
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,27 +8,52 @@ use std::fs; | |
use std::io::{self, Write}; | ||
use std::path; | ||
use tar::Archive; | ||
use thiserror::Error; | ||
|
||
use crate::entities::*; | ||
|
||
#[cfg(test)] | ||
use mockall::automock; | ||
|
||
#[derive(Error, Debug)] | ||
pub enum AggregatorHandlerError { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👌 |
||
#[error("remote server technical error: '{0}'")] | ||
RemoteServerTechnical(String), | ||
#[error("remote server logical error: '{0}'")] | ||
RemoteServerLogical(String), | ||
#[error("remote server unreachable: '{0}'")] | ||
RemoteServerUnreachable(String), | ||
#[error("json parsing failed: '{0}'")] | ||
JsonParseFailed(String), | ||
#[error("io error:")] | ||
IOError(#[from] io::Error), | ||
} | ||
|
||
/// AggregatorHandler represents a read interactor with an aggregator | ||
#[cfg_attr(test, automock)] | ||
#[async_trait] | ||
pub trait AggregatorHandler { | ||
/// List snapshots | ||
async fn list_snapshots(&self) -> Result<Vec<Snapshot>, String>; | ||
async fn list_snapshots(&self) -> Result<Vec<Snapshot>, AggregatorHandlerError>; | ||
|
||
/// Get snapshot details | ||
async fn get_snapshot_details(&self, digest: &str) -> Result<Snapshot, String>; | ||
async fn get_snapshot_details(&self, digest: &str) -> Result<Snapshot, AggregatorHandlerError>; | ||
|
||
/// Download snapshot | ||
async fn download_snapshot(&self, digest: &str, location: &str) -> Result<String, String>; | ||
async fn download_snapshot( | ||
&self, | ||
digest: &str, | ||
location: &str, | ||
) -> Result<String, AggregatorHandlerError>; | ||
|
||
/// Unpack snapshot | ||
async fn unpack_snapshot(&self, digest: &str) -> Result<String, String>; | ||
async fn unpack_snapshot(&self, digest: &str) -> Result<String, AggregatorHandlerError>; | ||
|
||
/// Get certificate details | ||
async fn get_certificate_details( | ||
&self, | ||
certificate_hash: &str, | ||
) -> Result<Certificate, AggregatorHandlerError>; | ||
} | ||
|
||
/// AggregatorHTTPClient is a http client for an aggregator | ||
|
@@ -51,64 +76,76 @@ impl AggregatorHTTPClient { | |
#[async_trait] | ||
impl AggregatorHandler for AggregatorHTTPClient { | ||
/// List snapshots | ||
async fn list_snapshots(&self) -> Result<Vec<Snapshot>, String> { | ||
async fn list_snapshots(&self) -> Result<Vec<Snapshot>, AggregatorHandlerError> { | ||
debug!("List snapshots"); | ||
|
||
let url = format!("{}/snapshots", self.aggregator_endpoint); | ||
let response = reqwest::get(url.clone()).await; | ||
match response { | ||
Ok(response) => match response.status() { | ||
StatusCode::OK => match response.json::<Vec<Snapshot>>().await { | ||
Ok(snapshots) => Ok(snapshots), | ||
Err(err) => Err(err.to_string()), | ||
Err(err) => Err(AggregatorHandlerError::JsonParseFailed(err.to_string())), | ||
}, | ||
status_error => Err(format!("error {} received", status_error)), | ||
status_error => Err(AggregatorHandlerError::RemoteServerTechnical( | ||
status_error.to_string(), | ||
)), | ||
}, | ||
Err(err) => Err(err.to_string()), | ||
Err(err) => Err(AggregatorHandlerError::RemoteServerUnreachable( | ||
err.to_string(), | ||
)), | ||
} | ||
} | ||
|
||
/// Get snapshot details | ||
async fn get_snapshot_details(&self, digest: &str) -> Result<Snapshot, String> { | ||
async fn get_snapshot_details(&self, digest: &str) -> Result<Snapshot, AggregatorHandlerError> { | ||
debug!("Details snapshot {}", digest); | ||
|
||
let url = format!("{}/snapshot/{}", self.aggregator_endpoint, digest); | ||
let response = reqwest::get(url.clone()).await; | ||
match response { | ||
Ok(response) => match response.status() { | ||
StatusCode::OK => match response.json::<Snapshot>().await { | ||
Ok(snapshot) => Ok(snapshot), | ||
Err(err) => Err(err.to_string()), | ||
Err(err) => Err(AggregatorHandlerError::JsonParseFailed(err.to_string())), | ||
}, | ||
StatusCode::NOT_FOUND => Err("Snapshot not found".to_string()), | ||
status_error => Err(format!("error {} received", status_error)), | ||
StatusCode::NOT_FOUND => Err(AggregatorHandlerError::RemoteServerLogical( | ||
"snapshot not found".to_string(), | ||
)), | ||
status_error => Err(AggregatorHandlerError::RemoteServerTechnical( | ||
status_error.to_string(), | ||
)), | ||
}, | ||
Err(err) => Err(err.to_string()), | ||
Err(err) => Err(AggregatorHandlerError::RemoteServerUnreachable( | ||
err.to_string(), | ||
)), | ||
} | ||
} | ||
|
||
/// Download Snapshot | ||
async fn download_snapshot(&self, digest: &str, location: &str) -> Result<String, String> { | ||
async fn download_snapshot( | ||
&self, | ||
digest: &str, | ||
location: &str, | ||
) -> Result<String, AggregatorHandlerError> { | ||
debug!("Download snapshot {} from {}", digest, location); | ||
let response = reqwest::get(location).await; | ||
match response { | ||
Ok(response) => match response.status() { | ||
StatusCode::OK => { | ||
let local_path = archive_file_path(digest, &self.network)?; | ||
fs::create_dir_all(&local_path.parent().unwrap()) | ||
.map_err(|e| format!("can't create snapshot dir: {}", e))?; | ||
let mut local_file = fs::File::create(&local_path) | ||
.map_err(|e| format!("can't access snapshot file: {}", e))?; | ||
let bytes_total = response | ||
.content_length() | ||
.ok_or_else(|| "can't get content length".to_string())?; | ||
fs::create_dir_all(&local_path.parent().unwrap())?; | ||
let mut local_file = fs::File::create(&local_path)?; | ||
let bytes_total = response.content_length().ok_or_else(|| { | ||
AggregatorHandlerError::RemoteServerTechnical( | ||
"can't get content length".to_string(), | ||
) | ||
})?; | ||
let mut bytes_downloaded = 0; | ||
let mut remote_stream = response.bytes_stream(); | ||
while let Some(item) = remote_stream.next().await { | ||
let chunk = item.map_err(|e| format!("download failed: {}", e))?; | ||
local_file | ||
.write_all(&chunk) | ||
.map_err(|e| format!("can't write to snapshot file: {}", e))?; | ||
let chunk = item.map_err(|e| { | ||
AggregatorHandlerError::RemoteServerTechnical(e.to_string()) | ||
})?; | ||
local_file.write_all(&chunk)?; | ||
bytes_downloaded += chunk.len() as u64; | ||
print!( | ||
"Downloaded {}% - {}/{} Bytes\r", | ||
|
@@ -120,38 +157,69 @@ impl AggregatorHandler for AggregatorHTTPClient { | |
} | ||
Ok(local_path.into_os_string().into_string().unwrap()) | ||
} | ||
StatusCode::NOT_FOUND => Err("snapshot archive not found".to_string()), | ||
status_error => Err(format!("error {} received", status_error)), | ||
StatusCode::NOT_FOUND => Err(AggregatorHandlerError::RemoteServerLogical( | ||
"snapshot archive not found".to_string(), | ||
)), | ||
status_error => Err(AggregatorHandlerError::RemoteServerTechnical( | ||
status_error.to_string(), | ||
)), | ||
}, | ||
Err(err) => Err(err.to_string()), | ||
Err(err) => Err(AggregatorHandlerError::RemoteServerUnreachable( | ||
err.to_string(), | ||
)), | ||
} | ||
} | ||
|
||
/// Unpack snapshot | ||
async fn unpack_snapshot(&self, digest: &str) -> Result<String, String> { | ||
async fn unpack_snapshot(&self, digest: &str) -> Result<String, AggregatorHandlerError> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems to beg for a general utility to handle There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Absolutely, we definitely need to work shortly on a |
||
debug!("Unpack snapshot {}", digest); | ||
println!("Unpacking snapshot..."); | ||
let local_path = archive_file_path(digest, &self.network)?; | ||
let snapshot_file_tar_gz = fs::File::open(local_path.clone()) | ||
.map_err(|e| format!("can't open snapshot file: {}", e))?; | ||
let snapshot_file_tar_gz = fs::File::open(local_path.clone())?; | ||
let snapshot_file_tar = GzDecoder::new(snapshot_file_tar_gz); | ||
let unpack_dir_path = local_path.parent().unwrap().join(path::Path::new("db")); | ||
let mut snapshot_archive = Archive::new(snapshot_file_tar); | ||
snapshot_archive | ||
.unpack(&unpack_dir_path) | ||
.map_err(|e| format!("can't unpack snapshot archive: {}", e))?; | ||
snapshot_archive.unpack(&unpack_dir_path)?; | ||
Ok(unpack_dir_path.into_os_string().into_string().unwrap()) | ||
} | ||
|
||
/// Get certificate details | ||
async fn get_certificate_details( | ||
&self, | ||
certificate_hash: &str, | ||
) -> Result<Certificate, AggregatorHandlerError> { | ||
debug!("Details certificate {}", certificate_hash); | ||
let url = format!( | ||
"{}/certificate/{}", | ||
self.aggregator_endpoint, certificate_hash | ||
); | ||
let response = reqwest::get(url.clone()).await; | ||
match response { | ||
Ok(response) => match response.status() { | ||
StatusCode::OK => match response.json::<Certificate>().await { | ||
Ok(certificate) => Ok(certificate), | ||
Err(err) => Err(AggregatorHandlerError::JsonParseFailed(err.to_string())), | ||
}, | ||
StatusCode::NOT_FOUND => Err(AggregatorHandlerError::RemoteServerLogical( | ||
"certificate not found".to_string(), | ||
)), | ||
status_error => Err(AggregatorHandlerError::RemoteServerTechnical( | ||
status_error.to_string(), | ||
)), | ||
}, | ||
Err(err) => Err(AggregatorHandlerError::RemoteServerUnreachable( | ||
err.to_string(), | ||
)), | ||
} | ||
} | ||
} | ||
|
||
/// Computes local archive filepath | ||
fn archive_file_path(digest: &str, network: &str) -> Result<path::PathBuf, String> { | ||
Ok(env::current_dir() | ||
.map_err(|e| format!("current dir not available: {}", e))? | ||
.join(path::Path::new(&format!( | ||
"data/{}/{}/snapshot.archive.tar.gz", | ||
network, digest | ||
)))) | ||
fn archive_file_path(digest: &str, network: &str) -> Result<path::PathBuf, AggregatorHandlerError> { | ||
Ok(env::current_dir()?.join(path::Path::new(&format!( | ||
"data/{}/{}/snapshot.archive.tar.gz", | ||
network, digest | ||
)))) | ||
} | ||
|
||
#[cfg(test)] | ||
|
@@ -368,4 +436,66 @@ mod tests { | |
let local_dir_path = aggregator_client.unpack_snapshot(digest).await; | ||
assert!(local_dir_path.is_err()); | ||
} | ||
|
||
#[tokio::test] | ||
async fn get_certificate_details_ok() { | ||
let certificate_hash = "certificate-hash-123"; | ||
let (server, config) = setup_test(); | ||
let certificate_expected = fake_data::certificate(certificate_hash.to_string()); | ||
let _certificate_mock = server.mock(|when, then| { | ||
when.path(format!("/certificate/{}", certificate_hash)); | ||
then.status(200) | ||
.body(json!(certificate_expected).to_string()); | ||
}); | ||
let aggregator_client = | ||
AggregatorHTTPClient::new(config.network, config.aggregator_endpoint); | ||
let certificate = aggregator_client | ||
.get_certificate_details(certificate_hash) | ||
.await; | ||
certificate.as_ref().expect("unexpected error"); | ||
assert_eq!(certificate.unwrap(), certificate_expected); | ||
} | ||
|
||
#[tokio::test] | ||
async fn get_certificate_details_ko_404() { | ||
let certificate_hash = "certificate-hash-123"; | ||
let (server, config) = setup_test(); | ||
let _certificate_mock = server.mock(|when, then| { | ||
when.path(format!("/certificate/{}", certificate_hash)); | ||
then.status(404); | ||
}); | ||
let aggregator_client = | ||
AggregatorHTTPClient::new(config.network, config.aggregator_endpoint); | ||
let certificate = aggregator_client | ||
.get_certificate_details(certificate_hash) | ||
.await; | ||
assert!(certificate.is_err()); | ||
} | ||
|
||
#[tokio::test] | ||
async fn get_certificate_details_ko_500() { | ||
let certificate_hash = "certificate-hash-123"; | ||
let (server, config) = setup_test(); | ||
let _certificate_mock = server.mock(|when, then| { | ||
when.path(format!("/certificate/{}", certificate_hash)); | ||
then.status(500); | ||
}); | ||
let aggregator_client = | ||
AggregatorHTTPClient::new(config.network, config.aggregator_endpoint); | ||
let certificate = aggregator_client | ||
.get_certificate_details(certificate_hash) | ||
.await; | ||
assert!(certificate.is_err()); | ||
} | ||
|
||
#[tokio::test] | ||
async fn get_certificate_details_ko_unreachable() { | ||
let certificate_hash = "certificate-hash-123"; | ||
let aggregator_client = | ||
AggregatorHTTPClient::new("testnet".to_string(), "http123://unreachable".to_string()); | ||
let certificate = aggregator_client | ||
.get_certificate_details(certificate_hash) | ||
.await; | ||
assert!(certificate.is_err()); | ||
} | ||
} |
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need the client to be aware of these? Couldn't we reexport types from the mithril-core library?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, reexport sounds like a very good option 🤔