fix: blob size validation #562
@@ -25,6 +25,9 @@ use log::{debug, warn};
use num_traits::{ToPrimitive, Zero};
use recall_ipld::hamt::{BytesKey, MapKey};

type BlobSourcesResult =
    anyhow::Result<Vec<(Hash, u64, HashSet<(Address, SubscriptionId, PublicKey)>)>, ActorError>;
Contributor: At some point we should make these giant types into structs.
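A rough sketch (not part of this PR; the struct and field names are assumptions) of what the contributor's suggestion could look like if the tuple behind BlobSourcesResult were promoted to a named struct, reusing the types already shown in the diff above:

pub struct BlobSource {
    pub hash: Hash,
    pub size: u64,
    pub sources: HashSet<(Address, SubscriptionId, PublicKey)>,
}

// Hypothetical replacement for the tuple-based alias introduced in this PR.
type BlobSourcesResult = anyhow::Result<Vec<BlobSource>, ActorError>;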
mod accounts;
mod blobs;
mod expiries;
@@ -1038,30 +1041,40 @@ impl State {
    }

    #[allow(clippy::type_complexity)]
    pub fn get_added_blobs<BS: Blockstore>(
        &self,
        store: &BS,
        size: u32,
    ) -> anyhow::Result<Vec<(Hash, HashSet<(Address, SubscriptionId, PublicKey)>)>, ActorError>
    {
        self.added.take_page(store, size)
    pub fn get_added_blobs<BS: Blockstore>(&self, store: &BS, size: u32) -> BlobSourcesResult {
        let blobs = self.blobs.hamt(store)?;
        self.added
            .take_page(store, size)?
            .into_iter()
            .map(|(hash, sources)| {
                let blob = blobs
                    .get(&hash)?
                    .ok_or_else(|| ActorError::not_found(format!("blob {} not found", hash)))?;
                Ok((hash, blob.size, sources))
Collaborator (Author): Gets the size of the newly added blobs and returns it to Fendermint.
            })
            .collect()
    }
    #[allow(clippy::type_complexity)]
    pub fn get_pending_blobs<BS: Blockstore>(
        &self,
        store: &BS,
        size: u32,
    ) -> anyhow::Result<Vec<(Hash, HashSet<(Address, SubscriptionId, PublicKey)>)>, ActorError>
    {
        self.pending.take_page(store, size)
    pub fn get_pending_blobs<BS: Blockstore>(&self, store: &BS, size: u32) -> BlobSourcesResult {
        let blobs = self.blobs.hamt(store)?;
        self.pending
            .take_page(store, size)?
            .into_iter()
            .map(|(hash, sources)| {
                let blob = blobs
                    .get(&hash)?
                    .ok_or_else(|| ActorError::not_found(format!("blob {} not found", hash)))?;
                Ok((hash, blob.size, sources))
Collaborator (Author): Returns the size of pending blobs to Fendermint.
            })
            .collect()
    }
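A hedged caller-side sketch (the loop body helper is a placeholder, not real Fendermint code) of how the (hash, size, sources) tuples returned by these getters are meant to be consumed; the chain interpreter changes further down do essentially this when building BlobPending messages:

for (hash, size, sources) in state.get_added_blobs(&store, batch_size)? {
    for (subscriber, id, source) in sources {
        // Placeholder for constructing ChainMessage::Ipc(IpcMessage::BlobPending(...))
        // with the blob's size, as shown in the interpreter diff below.
        enqueue_pending_blob(subscriber, hash, size, id, source);
    }
}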
    pub fn set_blob_pending<BS: Blockstore>(
        &mut self,
        store: &BS,
        subscriber: Address,
        hash: Hash,
        size: u64,
        id: SubscriptionId,
        source: PublicKey,
    ) -> anyhow::Result<(), ActorError> {
@@ -1072,6 +1085,13 @@ impl State {
            // The blob may have been deleted before it was set to pending
            return Ok(());
        };
        // check if the blob's size matches the size provided when it was added
        if blob.size != size {
            return Err(ActorError::assertion_failed(format!(
                "blob {} size mismatch (expected: {}; actual: {})",
                hash, size, blob.size
            )));
        }
        blob.status = BlobStatus::Pending;
        // Add the source to the pending queue
        self.pending
@@ -2267,7 +2287,7 @@ mod tests {
        assert_eq!(stats.bytes_added, size);

        // Set to status pending
        let res = state.set_blob_pending(&store, subscriber, hash, id1.clone(), source);
        let res = state.set_blob_pending(&store, subscriber, hash, size, id1.clone(), source);
        assert!(res.is_ok());
        let stats = state.get_stats(config, TokenAmount::zero());
        assert_eq!(stats.num_blobs, 1);
@@ -2831,7 +2851,7 @@ mod tests {
        assert_eq!(account.capacity_used, size);

        // Set to status pending
        let res = state.set_blob_pending(&store, subscriber, hash, id1.clone(), source);
        let res = state.set_blob_pending(&store, subscriber, hash, size, id1.clone(), source);
        assert!(res.is_ok());

        // Check stats
@@ -3206,8 +3226,14 @@ mod tests {
        assert!(res.is_ok());

        // Set to status pending
        let res =
            state.set_blob_pending(&store, subscriber, hash, SubscriptionId::default(), source);
        let res = state.set_blob_pending(
            &store,
            subscriber,
            hash,
            size,
            SubscriptionId::default(),
            source,
        );
        assert!(res.is_ok());

        // Finalize as resolved
@@ -3271,8 +3297,14 @@ mod tests {
        assert!(res.is_ok());

        // Set to status pending
        let res =
            state.set_blob_pending(&store, subscriber, hash, SubscriptionId::default(), source);
        let res = state.set_blob_pending(
            &store,
            subscriber,
            hash,
            size,
            SubscriptionId::default(),
            source,
        );
        assert!(res.is_ok());

        // Finalize as failed
@@ -3404,8 +3436,14 @@ mod tests {
        assert_eq!(state.capacity_used, account.capacity_used);

        // Set to status pending
        let res =
            state.set_blob_pending(&store, subscriber, hash, SubscriptionId::default(), source);
        let res = state.set_blob_pending(
            &store,
            subscriber,
            hash,
            size,
            SubscriptionId::default(),
            source,
        );
        assert!(res.is_ok());

        // Finalize as failed
@@ -3554,6 +3592,7 @@ mod tests {
            &store,
            subscriber,
            hash1,
            size1,
            SubscriptionId::default(),
            source1,
        );
@@ -4028,7 +4067,7 @@ mod tests {
            )
            .unwrap();
        state
            .set_blob_pending(&store, addr, hash, id.clone(), source)
            .set_blob_pending(&store, addr, hash, size, id.clone(), source)
            .unwrap();
        state
            .finalize_blob(
@@ -4210,7 +4249,7 @@ mod tests {
            )
            .unwrap();
        state
            .set_blob_pending(&store, addr, hash, id.clone(), source)
            .set_blob_pending(&store, addr, hash, size, id.clone(), source)
            .unwrap();
        state
            .finalize_blob(
@@ -4364,7 +4403,7 @@ mod tests {
            )
            .unwrap();
        state
            .set_blob_pending(&store, account1, hash, id.clone(), source)
            .set_blob_pending(&store, account1, hash, size, id.clone(), source)
            .unwrap();
        state
            .finalize_blob(
@@ -4400,7 +4439,7 @@ mod tests {
            )
            .unwrap();
        state
            .set_blob_pending(&store, account2, hash, id.clone(), source)
            .set_blob_pending(&store, account2, hash, size, id.clone(), source)
            .unwrap();
        state
            .finalize_blob(
@@ -4636,7 +4675,14 @@ mod tests {
        // Simulate the chain putting this blob into pending state, which is
        // required before finalization.
        state
            .set_blob_pending(&store, *user, blob.hash, sub_id.clone(), *source)
            .set_blob_pending(
                &store,
                *user,
                blob.hash,
                blob.size,
                sub_id.clone(),
                *source,
            )
            .unwrap();
        state
            .finalize_blob(
@@ -72,7 +72,7 @@ pub type TopDownFinalityProvider = Arc<Toggle<CachedFinalityProvider<IPCProvider
pub type BlobPool = IrohResolvePool<BlobPoolItem>;
pub type ReadRequestPool = IrohResolvePool<ReadRequestPoolItem>;

type AddedBlobItem = (Hash, HashSet<(Address, SubscriptionId, PublicKey)>);
type AddedBlobItem = (Hash, u64, HashSet<(Address, SubscriptionId, PublicKey)>);
type OpenReadRequestItem = (Hash, Hash, u32, u32, Address, MethodNum);

/// These are the extra state items that the chain interpreter needs,
@@ -121,6 +121,7 @@ impl From<&CheckpointPoolItem> for ResolveKey {
pub struct BlobPoolItem {
    subscriber: Address,
    hash: Hash,
    size: u64,
    id: SubscriptionId,
    source: NodeId,
}
@@ -135,6 +136,7 @@ impl From<&BlobPoolItem> for IrohTaskType {
    fn from(value: &BlobPoolItem) -> Self {
        Self::ResolveBlob {
            source: IrohResolveSource { id: value.source },
            size: value.size,
        }
    }
}
@@ -302,11 +304,12 @@ where
        })?;

        // Create IPC messages to add blobs to the pool
        for (hash, sources) in added_blobs {
        for (hash, size, sources) in added_blobs {
            for (subscriber, id, source) in sources {
                msgs.push(ChainMessage::Ipc(IpcMessage::BlobPending(PendingBlob {
                    subscriber,
                    hash,
                    size,
                    id: id.clone(),
                    source,
                })));
@@ -348,11 +351,12 @@ where
            })
            .await;
        if is_globally_finalized {
            tracing::debug!(hash = ?item.hash, "blob has quorum; adding tx to chain");
            tracing::debug!(hash = ?item.hash, size = item.size, "blob has quorum; adding tx to chain");
            blobs.push(ChainMessage::Ipc(IpcMessage::BlobFinalized(
                FinalizedBlob {
                    subscriber: item.subscriber,
                    hash: item.hash,
                    size: item.size,
                    id: item.id.clone(),
                    source: item.source,
                    succeeded,
@@ -569,6 +573,7 @@ where
        let item = BlobPoolItem {
            subscriber: blob.subscriber,
            hash: blob.hash,
            size: blob.size,
            id: blob.id,
            source: blob.source,
        };
@@ -871,6 +876,7 @@ where
            source,
            subscriber: blob.subscriber,
            hash,
            size: blob.size,
            id: blob.id.clone(),
        };
        let params = RawBytes::serialize(params)?;
@@ -887,6 +893,7 @@ where
        env.blob_pool.add(BlobPoolItem {
            subscriber: blob.subscriber,
            hash: blob.hash,
            size: blob.size,
Collaborator (Author): Added.
            id: blob.id.clone(),
            source: blob.source,
        })
@@ -128,6 +128,8 @@ pub struct FinalizedBlob {
    pub subscriber: Address,
    /// The blake3 hash of the blob.
    pub hash: Hash,
    /// The size of the blob.
    pub size: u64,
    /// Identifier used to differentiate blob additions for the same subscriber.
    pub id: SubscriptionId,
    /// The node ID of the source node serving validators the blob.
@@ -143,6 +145,8 @@ pub struct PendingBlob {
    pub subscriber: Address,
    /// The blake3 hash of the blob.
    pub hash: Hash,
    /// The size of the blob.
    pub size: u64,
    /// Identifier used to differentiate blob additions for the same subscriber.
    pub id: SubscriptionId,
    /// The node ID of the source node serving validators the blob.
Returns the size of the blob as u64 so that it can be added to the resolution pool.
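A minimal sketch, assuming a hypothetical resolver-side check (this PR does not show the resolver code), of how carrying the expected size lets resolution reject blobs whose downloaded size does not match:

fn verify_resolved_blob(hash: &str, expected_size: u64, fetched_size: u64) -> Result<(), String> {
    // Mirrors the actor-side assertion in set_blob_pending: fail on any size mismatch.
    if fetched_size != expected_size {
        return Err(format!(
            "blob {} size mismatch (expected: {}; actual: {})",
            hash, expected_size, fetched_size
        ));
    }
    Ok(())
}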