Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions fendermint/actors/blobs/shared/src/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ pub struct SetBlobPendingParams {
pub subscriber: Address,
/// Blob blake3 hash.
pub hash: Hash,
/// Blob size.
pub size: u64,
/// Identifier used to differentiate blob additions for the same subscriber.
pub id: SubscriptionId,
}
Expand Down
3 changes: 2 additions & 1 deletion fendermint/actors/blobs/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub struct BlobsActor;

/// The return type used when fetching "added" or "pending" blobs.
/// See `get_added_blobs` and `get_pending_blobs` for more information.
type BlobRequest = (Hash, HashSet<(Address, SubscriptionId, PublicKey)>);
type BlobRequest = (Hash, u64, HashSet<(Address, SubscriptionId, PublicKey)>);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returns the size of the blob as u64 so that it could be added to the resolution pool


impl BlobsActor {
/// Creates a new `[BlobsActor]` state.
Expand Down Expand Up @@ -532,6 +532,7 @@ impl BlobsActor {
rt.store(),
subscriber_id_addr,
params.hash,
params.size,
params.id,
params.source,
)
Expand Down
102 changes: 74 additions & 28 deletions fendermint/actors/blobs/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ use log::{debug, warn};
use num_traits::{ToPrimitive, Zero};
use recall_ipld::hamt::{BytesKey, MapKey};

type BlobSourcesResult =
anyhow::Result<Vec<(Hash, u64, HashSet<(Address, SubscriptionId, PublicKey)>)>, ActorError>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

at some point we should make these giant types into structs

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


mod accounts;
mod blobs;
mod expiries;
Expand Down Expand Up @@ -1038,30 +1041,40 @@ impl State {
}

#[allow(clippy::type_complexity)]
pub fn get_added_blobs<BS: Blockstore>(
&self,
store: &BS,
size: u32,
) -> anyhow::Result<Vec<(Hash, HashSet<(Address, SubscriptionId, PublicKey)>)>, ActorError>
{
self.added.take_page(store, size)
pub fn get_added_blobs<BS: Blockstore>(&self, store: &BS, size: u32) -> BlobSourcesResult {
let blobs = self.blobs.hamt(store)?;
self.added
.take_page(store, size)?
.into_iter()
.map(|(hash, sources)| {
let blob = blobs
.get(&hash)?
.ok_or_else(|| ActorError::not_found(format!("blob {} not found", hash)))?;
Ok((hash, blob.size, sources))
Copy link
Collaborator Author

@avichalp avichalp Mar 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gets the size of the newly added blobs and return it to Fendermint

})
.collect()
}

#[allow(clippy::type_complexity)]
pub fn get_pending_blobs<BS: Blockstore>(
&self,
store: &BS,
size: u32,
) -> anyhow::Result<Vec<(Hash, HashSet<(Address, SubscriptionId, PublicKey)>)>, ActorError>
{
self.pending.take_page(store, size)
pub fn get_pending_blobs<BS: Blockstore>(&self, store: &BS, size: u32) -> BlobSourcesResult {
let blobs = self.blobs.hamt(store)?;
self.pending
.take_page(store, size)?
.into_iter()
.map(|(hash, sources)| {
let blob = blobs
.get(&hash)?
.ok_or_else(|| ActorError::not_found(format!("blob {} not found", hash)))?;
Ok((hash, blob.size, sources))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returns the size of pending blobs to Fendermint.

})
.collect()
}

pub fn set_blob_pending<BS: Blockstore>(
&mut self,
store: &BS,
subscriber: Address,
hash: Hash,
size: u64,
id: SubscriptionId,
source: PublicKey,
) -> anyhow::Result<(), ActorError> {
Expand All @@ -1072,6 +1085,13 @@ impl State {
// The blob may have been deleted before it was set to pending
return Ok(());
};
// check if the blob's size matches the size provided when it was added
if blob.size != size {
return Err(ActorError::assertion_failed(format!(
"blob {} size mismatch (expected: {}; actual: {})",
hash, size, blob.size
)));
}
blob.status = BlobStatus::Pending;
// Add the source to the pending queue
self.pending
Expand Down Expand Up @@ -2267,7 +2287,7 @@ mod tests {
assert_eq!(stats.bytes_added, size);

// Set to status pending
let res = state.set_blob_pending(&store, subscriber, hash, id1.clone(), source);
let res = state.set_blob_pending(&store, subscriber, hash, size, id1.clone(), source);
assert!(res.is_ok());
let stats = state.get_stats(config, TokenAmount::zero());
assert_eq!(stats.num_blobs, 1);
Expand Down Expand Up @@ -2831,7 +2851,7 @@ mod tests {
assert_eq!(account.capacity_used, size);

// Set to status pending
let res = state.set_blob_pending(&store, subscriber, hash, id1.clone(), source);
let res = state.set_blob_pending(&store, subscriber, hash, size, id1.clone(), source);
assert!(res.is_ok());

// Check stats
Expand Down Expand Up @@ -3206,8 +3226,14 @@ mod tests {
assert!(res.is_ok());

// Set to status pending
let res =
state.set_blob_pending(&store, subscriber, hash, SubscriptionId::default(), source);
let res = state.set_blob_pending(
&store,
subscriber,
hash,
size,
SubscriptionId::default(),
source,
);
assert!(res.is_ok());

// Finalize as resolved
Expand Down Expand Up @@ -3271,8 +3297,14 @@ mod tests {
assert!(res.is_ok());

// Set to status pending
let res =
state.set_blob_pending(&store, subscriber, hash, SubscriptionId::default(), source);
let res = state.set_blob_pending(
&store,
subscriber,
hash,
size,
SubscriptionId::default(),
source,
);
assert!(res.is_ok());

// Finalize as failed
Expand Down Expand Up @@ -3404,8 +3436,14 @@ mod tests {
assert_eq!(state.capacity_used, account.capacity_used);

// Set to status pending
let res =
state.set_blob_pending(&store, subscriber, hash, SubscriptionId::default(), source);
let res = state.set_blob_pending(
&store,
subscriber,
hash,
size,
SubscriptionId::default(),
source,
);
assert!(res.is_ok());

// Finalize as failed
Expand Down Expand Up @@ -3554,6 +3592,7 @@ mod tests {
&store,
subscriber,
hash1,
size1,
SubscriptionId::default(),
source1,
);
Expand Down Expand Up @@ -4028,7 +4067,7 @@ mod tests {
)
.unwrap();
state
.set_blob_pending(&store, addr, hash, id.clone(), source)
.set_blob_pending(&store, addr, hash, size, id.clone(), source)
.unwrap();
state
.finalize_blob(
Expand Down Expand Up @@ -4210,7 +4249,7 @@ mod tests {
)
.unwrap();
state
.set_blob_pending(&store, addr, hash, id.clone(), source)
.set_blob_pending(&store, addr, hash, size, id.clone(), source)
.unwrap();
state
.finalize_blob(
Expand Down Expand Up @@ -4364,7 +4403,7 @@ mod tests {
)
.unwrap();
state
.set_blob_pending(&store, account1, hash, id.clone(), source)
.set_blob_pending(&store, account1, hash, size, id.clone(), source)
.unwrap();
state
.finalize_blob(
Expand Down Expand Up @@ -4400,7 +4439,7 @@ mod tests {
)
.unwrap();
state
.set_blob_pending(&store, account2, hash, id.clone(), source)
.set_blob_pending(&store, account2, hash, size, id.clone(), source)
.unwrap();
state
.finalize_blob(
Expand Down Expand Up @@ -4636,7 +4675,14 @@ mod tests {
// Simulate the chain putting this blob into pending state, which is
// required before finalization.
state
.set_blob_pending(&store, *user, blob.hash, sub_id.clone(), *source)
.set_blob_pending(
&store,
*user,
blob.hash,
blob.size,
sub_id.clone(),
*source,
)
.unwrap();
state
.finalize_blob(
Expand Down
13 changes: 10 additions & 3 deletions fendermint/vm/interpreter/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub type TopDownFinalityProvider = Arc<Toggle<CachedFinalityProvider<IPCProvider
pub type BlobPool = IrohResolvePool<BlobPoolItem>;
pub type ReadRequestPool = IrohResolvePool<ReadRequestPoolItem>;

type AddedBlobItem = (Hash, HashSet<(Address, SubscriptionId, PublicKey)>);
type AddedBlobItem = (Hash, u64, HashSet<(Address, SubscriptionId, PublicKey)>);
type OpenReadRequestItem = (Hash, Hash, u32, u32, Address, MethodNum);

/// These are the extra state items that the chain interpreter needs,
Expand Down Expand Up @@ -121,6 +121,7 @@ impl From<&CheckpointPoolItem> for ResolveKey {
pub struct BlobPoolItem {
subscriber: Address,
hash: Hash,
size: u64,
id: SubscriptionId,
source: NodeId,
}
Expand All @@ -135,6 +136,7 @@ impl From<&BlobPoolItem> for IrohTaskType {
fn from(value: &BlobPoolItem) -> Self {
Self::ResolveBlob {
source: IrohResolveSource { id: value.source },
size: value.size,
}
}
}
Expand Down Expand Up @@ -302,11 +304,12 @@ where
})?;

// Create IPC messages to add blobs to the pool
for (hash, sources) in added_blobs {
for (hash, size, sources) in added_blobs {
for (subscriber, id, source) in sources {
msgs.push(ChainMessage::Ipc(IpcMessage::BlobPending(PendingBlob {
subscriber,
hash,
size,
id: id.clone(),
source,
})));
Expand Down Expand Up @@ -348,11 +351,12 @@ where
})
.await;
if is_globally_finalized {
tracing::debug!(hash = ?item.hash, "blob has quorum; adding tx to chain");
tracing::debug!(hash = ?item.hash, size = item.size, "blob has quorum; adding tx to chain");
blobs.push(ChainMessage::Ipc(IpcMessage::BlobFinalized(
FinalizedBlob {
subscriber: item.subscriber,
hash: item.hash,
size: item.size,
id: item.id.clone(),
source: item.source,
succeeded,
Expand Down Expand Up @@ -569,6 +573,7 @@ where
let item = BlobPoolItem {
subscriber: blob.subscriber,
hash: blob.hash,
size: blob.size,
id: blob.id,
source: blob.source,
};
Expand Down Expand Up @@ -871,6 +876,7 @@ where
source,
subscriber: blob.subscriber,
hash,
size: blob.size,
id: blob.id.clone(),
};
let params = RawBytes::serialize(params)?;
Expand All @@ -887,6 +893,7 @@ where
env.blob_pool.add(BlobPoolItem {
subscriber: blob.subscriber,
hash: blob.hash,
size: blob.size,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added size in the pool item so it can be checked against the download blob.

id: blob.id.clone(),
source: blob.source,
})
Expand Down
7 changes: 5 additions & 2 deletions fendermint/vm/iroh_resolver/src/iroh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,11 @@ fn start_resolve<V>(
tokio::spawn(async move {
tracing::debug!(hash = ?task.hash(), "starting iroh blob resolve");
match task.task_type() {
TaskType::ResolveBlob { source } => {
match client.resolve_iroh(task.hash(), source.id.into()).await {
TaskType::ResolveBlob { source, size } => {
match client
.resolve_iroh(task.hash(), size, source.id.into())
.await
{
Err(e) => {
tracing::error!(
error = e.to_string(),
Expand Down
9 changes: 8 additions & 1 deletion fendermint/vm/iroh_resolver/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ pub struct ResolveTask {
pub enum TaskType {
ResolveBlob {
source: ResolveSource,
size: u64,
},
CloseReadRequest {
blob_hash: Hash,
Expand Down Expand Up @@ -264,6 +265,7 @@ mod tests {
struct TestItem {
hash: Hash,
source: NodeId,
size: u64,
}

impl TestItem {
Expand All @@ -273,7 +275,11 @@ mod tests {
rng.fill(&mut data);
let hash = Hash::new(data);
let source = SecretKey::generate().public();
Self { hash, source }
Self {
hash,
source,
size: 256,
}
}
}

Expand All @@ -287,6 +293,7 @@ mod tests {
fn from(value: &TestItem) -> Self {
Self::ResolveBlob {
source: ResolveSource { id: value.source },
size: value.size,
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions fendermint/vm/message/src/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ pub struct FinalizedBlob {
pub subscriber: Address,
/// The blake3 hash of the blob.
pub hash: Hash,
/// The size of the blob.
pub size: u64,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FinalizedBlob needs to have size to construct BlobPoolItem again so it can be removed from the pool.

/// Identifier used to differentiate blob additions for the same subscriber.
pub id: SubscriptionId,
/// The node ID of the source node serving validators the blob.
Expand All @@ -143,6 +145,8 @@ pub struct PendingBlob {
pub subscriber: Address,
/// The blake3 hash of the blob.
pub hash: Hash,
/// The size of the blob.
pub size: u64,
/// Identifier used to differentiate blob additions for the same subscriber.
pub id: SubscriptionId,
/// The node ID of the source node serving validators the blob.
Expand Down
Loading
Loading