Skip to content

Commit

Permalink
Merge branch 'feat/user-upload-file-task-msp' into feat/msp-respond-s…
Browse files Browse the repository at this point in the history
…torage-requests-tasks
  • Loading branch information
snowmead committed Oct 7, 2024
2 parents 1a9050b + ce4c443 commit aa08313
Showing 1 changed file with 86 additions and 88 deletions.
174 changes: 86 additions & 88 deletions node/src/tasks/user_sends_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,81 +51,6 @@ where
storage_hub_handler,
}
}

async fn send_chunks_to_provider(
&mut self,
peer_ids: Vec<PeerId>,
file_metadata: &FileMetadata,
) -> Option<Result<(), anyhow::Error>> {
let file_key = file_metadata.file_key::<HashT<StorageProofsMerkleTrieLayout>>();
let chunk_count = file_metadata.chunks_count();

// Iterates and tries to send file to peer.
// Breaks loop after first successful attempt since all peer ids belong to the same provider.
for peer_id in peer_ids {
debug!(target: LOG_TARGET, "Attempting to send chunks of file key {:?} to peer {:?}", file_key, peer_id);

for chunk_id in 0..chunk_count {
debug!(target: LOG_TARGET, "Trying to send chunk id {:?} of file {:?} to peer {:?}", chunk_id, file_key, peer_id);
let proof = match self
.storage_hub_handler
.file_storage
.read()
.await
.generate_proof(&file_key, &vec![ChunkId::new(chunk_id)])
{
Ok(proof) => proof,
Err(e) => {
return Some(Err(anyhow::anyhow!(
"Failed to generate proof for chunk id {:?} of file {:?}\n Error: {:?}",
chunk_id,
file_key,
e
)));
}
};

let upload_response = self
.storage_hub_handler
.file_transfer
.upload_request(peer_id, file_key.as_ref().into(), proof)
.await;

match upload_response {
Ok(_) => {
debug!(target: LOG_TARGET, "Successfully uploaded chunk id {:?} of file {:?} to peer {:?}", chunk_id, file_metadata.fingerprint, peer_id);
}
Err(e) => {
error!(target: LOG_TARGET, "Failed to upload chunk_id {:?} to peer {:?}\n Error: {:?}", chunk_id, peer_id, e);
// In case of an error, we break the inner loop
// and try to connect to the next peer id.
break;
}
}
}
info!(target: LOG_TARGET, "Successfully sent file {:?} to peer {:?}", file_metadata.fingerprint, peer_id);
return Some(Ok(()));
}
None
}

async fn extract_peer_ids(&mut self, multiaddresses: Vec<Multiaddr>) -> Vec<PeerId> {
let mut peer_ids = Vec::new();
for multiaddress in &multiaddresses {
if let Some(peer_id) = PeerId::try_from_multiaddr(&multiaddress) {
if let Err(error) = self
.storage_hub_handler
.file_transfer
.add_known_address(peer_id, multiaddress.clone())
.await
{
error!(target: LOG_TARGET, "Failed to add known address {:?} for peer {:?} due to {:?}", multiaddress, peer_id, error);
}
peer_ids.push(peer_id);
}
}
peer_ids
}
}

impl<FL, FSH> EventHandler<NewStorageRequest> for UserSendsFileTask<FL, FSH>
Expand Down Expand Up @@ -204,13 +129,7 @@ where
info!(target: LOG_TARGET, "No peers were found to receive file key {:?}", file_key);
}

match self.send_chunks_to_provider(peer_ids, &file_metadata).await {
Some(result) => result,
None => Err(anyhow::anyhow!(
"Failed to send file {:?} to any of the peers",
file_metadata.fingerprint
)),
}
self.send_chunks_to_provider(peer_ids, &file_metadata).await
}
}

Expand Down Expand Up @@ -253,12 +172,91 @@ where
info!(target: LOG_TARGET, "No peers were found to receive file key {:?}", file_key);
}

match self.send_chunks_to_provider(peer_ids, &file_metadata).await {
Some(result) => result,
None => Err(anyhow::anyhow!(
"Failed to send file key {:?} to any of the peers",
file_key
)),
self.send_chunks_to_provider(peer_ids, &file_metadata).await
}
}

impl<FL, FSH> UserSendsFileTask<FL, FSH>
where
FL: FileStorageT,
FSH: ForestStorageHandler + Clone + Send + Sync + 'static,
{
async fn send_chunks_to_provider(
&mut self,
peer_ids: Vec<PeerId>,
file_metadata: &FileMetadata,
) -> Result<(), anyhow::Error> {
let file_key = file_metadata.file_key::<HashT<StorageProofsMerkleTrieLayout>>();
let chunk_count = file_metadata.chunks_count();

// Iterates and tries to send file to peer.
// Breaks loop after first successful attempt since all peer ids belong to the same provider.
for peer_id in peer_ids {

Check warning on line 194 in node/src/tasks/user_sends_file.rs

View workflow job for this annotation

GitHub Actions / Check lint with clippy

this loop never actually loops
debug!(target: LOG_TARGET, "Attempting to send chunks of file key {:?} to peer {:?}", file_key, peer_id);

for chunk_id in 0..chunk_count {
debug!(target: LOG_TARGET, "Trying to send chunk id {:?} of file {:?} to peer {:?}", chunk_id, file_key, peer_id);
let proof = match self
.storage_hub_handler
.file_storage
.read()
.await
.generate_proof(&file_key, &vec![ChunkId::new(chunk_id)])
{
Ok(proof) => proof,
Err(e) => {
return Err(anyhow::anyhow!(
"Failed to generate proof for chunk id {:?} of file {:?}\n Error: {:?}",
chunk_id,
file_key,
e
));
}
};

let upload_response = self
.storage_hub_handler
.file_transfer
.upload_request(peer_id, file_key.as_ref().into(), proof)
.await;

match upload_response {
Ok(_) => {
debug!(target: LOG_TARGET, "Successfully uploaded chunk id {:?} of file {:?} to peer {:?}", chunk_id, file_metadata.fingerprint, peer_id);
}
Err(e) => {
error!(target: LOG_TARGET, "Failed to upload chunk_id {:?} to peer {:?}\n Error: {:?}", chunk_id, peer_id, e);
// In case of an error, we break the inner loop
// and try to connect to the next peer id.
break;
}
}
}
info!(target: LOG_TARGET, "Successfully sent file {:?} to peer {:?}", file_metadata.fingerprint, peer_id);
return Ok(());
}

Err(anyhow::anyhow!(
"Failed to send file {:?} to any of the peers",
file_metadata.fingerprint
))
}

async fn extract_peer_ids(&mut self, multiaddresses: Vec<Multiaddr>) -> Vec<PeerId> {
let mut peer_ids = Vec::new();
for multiaddress in &multiaddresses {
if let Some(peer_id) = PeerId::try_from_multiaddr(&multiaddress) {
if let Err(error) = self
.storage_hub_handler
.file_transfer
.add_known_address(peer_id, multiaddress.clone())
.await
{
error!(target: LOG_TARGET, "Failed to add known address {:?} for peer {:?} due to {:?}", multiaddress, peer_id, error);
}
peer_ids.push(peer_id);
}
}
peer_ids
}
}

0 comments on commit aa08313

Please sign in to comment.