Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 1 addition & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ base64 = "0.22.1"
bincode = "2.0.1"
bitvec = "1.0.1"
blockifier = { version = "0.16.0-rc.2", features = ["node_api", "reexecution"] }
bloomfilter = "1.0.16"
bytes = "1.4.0"
cached = "0.44.0"
# This one needs to match the version used by blockifier
Expand Down Expand Up @@ -121,6 +120,7 @@ serde_json = "1.0.142"
serde_with = "3.7.0"
sha2 = "0.10.7"
sha3 = "0.10"
siphasher = "1.0.1"
smallvec = "1.15.1"
# This one needs to match the version used by blockifier
starknet-types-core = "=0.2.4"
Expand Down
2 changes: 1 addition & 1 deletion crates/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ anyhow = { workspace = true }
base64 = { workspace = true }
bincode = { workspace = true, features = ["serde"] }
bitvec = { workspace = true }
bloomfilter = { workspace = true }
cached = { workspace = true }
const_format = { workspace = true }
fake = { workspace = true }
Expand Down Expand Up @@ -44,6 +43,7 @@ serde_json = { workspace = true, features = [
] }
serde_with = { workspace = true }
sha3 = { workspace = true }
siphasher = { workspace = true }
tempfile = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
Expand Down
173 changes: 95 additions & 78 deletions crates/storage/src/bloom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,14 @@
//! specific set of keys without having to load and check each individual bloom
//! filter.

use std::sync::{Arc, Mutex};
use std::hash::{Hash, Hasher};
use std::sync::{Arc, LazyLock, Mutex};

use bloomfilter::Bloom;
use bitvec::prelude::*;
use cached::{Cached, SizedCache};
use pathfinder_common::BlockNumber;
use pathfinder_crypto::Felt;
use siphasher::sip::SipHasher13;

/// Maximum number of blocks to aggregate in a single `AggregateBloom`.
pub const AGGREGATE_BLOOM_BLOCK_RANGE_LEN: u64 =
Expand All @@ -81,7 +83,7 @@ pub const AGGREGATE_BLOOM_BLOCK_RANGE_LEN: u64 =
/// rotated by 90 degrees (transposed).
#[derive(Clone)]
pub struct AggregateBloom {
/// A [AGGREGATE_BLOOM_BLOCK_RANGE_LEN] by [BloomFilter::BITVEC_LEN] matrix
/// A [AGGREGATE_BLOOM_BLOCK_RANGE_LEN] by [BloomFilter::BITVEC_BITS] matrix
/// stored in a single array.
bitmap: Vec<u8>,

Expand All @@ -103,7 +105,7 @@ impl AggregateBloom {
/// \[`from_block`, `from_block + (AGGREGATE_BLOOM_BLOCK_RANGE_LEN) - 1`\]
pub fn new(from_block: BlockNumber) -> Self {
let to_block = from_block + AGGREGATE_BLOOM_BLOCK_RANGE_LEN - 1;
let bitmap = vec![0; Self::BLOCK_RANGE_BYTES * BloomFilter::BITVEC_LEN];
let bitmap = vec![0; Self::BLOCK_RANGE_BYTES * BloomFilter::BITVEC_BITS];
Self::from_parts(from_block, to_block, bitmap)
}

Expand All @@ -115,7 +117,7 @@ impl AggregateBloom {
) -> Self {
let bitmap = zstd::bulk::decompress(
&compressed_bitmap,
Self::BLOCK_RANGE_BYTES * BloomFilter::BITVEC_LEN,
Self::BLOCK_RANGE_BYTES * BloomFilter::BITVEC_BITS,
)
.expect("Decompressing aggregate Bloom filter");

Expand All @@ -126,7 +128,7 @@ impl AggregateBloom {
assert_eq!(from_block + AGGREGATE_BLOOM_BLOCK_RANGE_LEN - 1, to_block);
assert_eq!(
bitmap.len(),
Self::BLOCK_RANGE_BYTES * BloomFilter::BITVEC_LEN
Self::BLOCK_RANGE_BYTES * BloomFilter::BITVEC_BITS
);

Self {
Expand All @@ -149,18 +151,16 @@ impl AggregateBloom {
///
/// Panics if the block number is not in the range of blocks that this
/// aggregate covers.
pub fn insert(&mut self, bloom: &BloomFilter, block_number: BlockNumber) {
pub fn insert(&mut self, bloom: BloomFilter, block_number: BlockNumber) {
assert!(
(self.from_block..=self.to_block).contains(&block_number),
"Block number {} is not in the range {}..={}",
block_number,
self.from_block,
self.to_block
);
assert_eq!(bloom.0.number_of_hash_functions(), BloomFilter::K_NUM);

let bloom_bytes = bloom.0.bit_vec().to_bytes();
assert_eq!(bloom_bytes.len(), BloomFilter::BITVEC_BYTES);
let bloom_bytes = bloom.into_bytes();

let relative_block_number = usize::try_from(block_number.get() - self.from_block.get())
.expect("usize can fit a u64");
Expand Down Expand Up @@ -420,84 +420,101 @@ impl AggregateBloomCache {
}
}

/// The seed used by the hash functions of the filter.
///
/// A fixed, randomly generated vector of 32 bytes. `SIPS` reads it as four
/// little-endian `u64` values and uses them, pairwise, as the keys of the two
/// base SipHash instances.
// NOTE(review): presumably this value must never change, since bit positions of
// already stored filters are derived from it — confirm against persisted data.
const SEED: [u8; 32] = [
0xef, 0x51, 0x88, 0x74, 0xef, 0x08, 0x3d, 0xf6, 0x7d, 0x7a, 0x93, 0xb7, 0xb3, 0x13, 0x1f, 0x87,
0xd3, 0x26, 0xbd, 0x49, 0xc7, 0x18, 0xcc, 0xe5, 0xd7, 0xe8, 0xa0, 0xdb, 0xea, 0x80, 0x67, 0x52,
];

// The two base SipHash-1-3 instances every Bloom filter hashes with.
//
// They are keyed from `SEED` exactly once, on first access; callers clone an
// instance instead of re-deriving the keys for each filter.
static SIPS: LazyLock<[siphasher::sip::SipHasher13; 2]> = LazyLock::new(|| {
    // Reads the little-endian `u64` stored in the 8 seed bytes starting at `offset`.
    let key_at = |offset: usize| -> u64 {
        u64::from_le_bytes(SEED[offset..offset + 8].try_into().unwrap())
    };

    // First hasher is keyed by seed bytes 0..16, the second by bytes 16..32.
    let first = SipHasher13::new_with_keys(key_at(0), key_at(8));
    let second = SipHasher13::new_with_keys(key_at(16), key_at(24));
    [first, second]
});

/// The number of hash functions used by the Bloom filter.
///
/// Every key is mapped to this many bit positions in the filter's bitmap
/// (positions are reduced modulo the bitmap size, so some may coincide).
const K_NUM: usize = 12;

/// A Bloom filter implementation for StarkNet events.
///
/// Based on the Bloom filter implementation from the [`bloomfilter`](https://crates.io/crates/bloomfilter/1.0.15) crate.
#[derive(Clone)]
pub(crate) struct BloomFilter(Bloom<Felt>);
pub(crate) struct BloomFilter {
bit_vec: bitvec::vec::BitVec<u8, bitvec::order::Msb0>,
}

impl BloomFilter {
// The size of the bitmap used by the Bloom filter.
const BITVEC_LEN: usize = 16_384;
// The size of the bitmap used by the Bloom filter (in bytes).
const BITVEC_BYTES: usize = Self::BITVEC_LEN / 8;
// The number of hash functions used by the Bloom filter.
// We need this value to be able to re-create the filter with the deserialized
// bitmap.
const K_NUM: u32 = 12;
// The maximal number of items anticipated to be inserted into the Bloom filter.
const ITEMS_COUNT: usize = 1024;
// The seed used by the hash functions of the filter.
// This is a randomly generated vector of 32 bytes.
const SEED: [u8; 32] = [
0xef, 0x51, 0x88, 0x74, 0xef, 0x08, 0x3d, 0xf6, 0x7d, 0x7a, 0x93, 0xb7, 0xb3, 0x13, 0x1f,
0x87, 0xd3, 0x26, 0xbd, 0x49, 0xc7, 0x18, 0xcc, 0xe5, 0xd7, 0xe8, 0xa0, 0xdb, 0xea, 0x80,
0x67, 0x52,
];
/// The size of the bitmap used by the Bloom filter.
const BITVEC_BITS: usize = 16_384;
/// The size of the bitmap used by the Bloom filter (in bytes).
const BITVEC_BYTES: usize = Self::BITVEC_BITS / 8;

/// Create a new empty bloom filter.
pub fn new() -> Self {
let bloom = Bloom::new_with_seed(Self::BITVEC_BYTES, Self::ITEMS_COUNT, &Self::SEED);
assert_eq!(bloom.number_of_hash_functions(), Self::K_NUM);

Self(bloom)
let bit_vec = bitvec::bitvec![u8, bitvec::order::Msb0; 0; Self::BITVEC_BITS];
Self { bit_vec }
}

/// Create a bloom filter from a compressed byte array of the bitmap.
pub fn from_compressed_bytes(bytes: &[u8]) -> Self {
let bytes = zstd::bulk::decompress(bytes, Self::BITVEC_BYTES * 2)
.expect("Decompressing Bloom filter");
Self::from_bytes(&bytes)
}
let bit_vec = BitVec::from_vec(bytes);

fn from_bytes(bytes: &[u8]) -> Self {
let k1 = u64::from_le_bytes(Self::SEED[0..8].try_into().unwrap());
let k2 = u64::from_le_bytes(Self::SEED[8..16].try_into().unwrap());
let k3 = u64::from_le_bytes(Self::SEED[16..24].try_into().unwrap());
let k4 = u64::from_le_bytes(Self::SEED[24..32].try_into().unwrap());
let bloom = Bloom::from_existing(
bytes,
Self::BITVEC_BYTES as u64 * 8,
Self::K_NUM,
[(k1, k2), (k3, k4)],
);
Self(bloom)
Self { bit_vec }
}

pub fn to_compressed_bytes(&self) -> Vec<u8> {
let bytes = self.to_bytes();
/// Consume the bloom filter and return its bitmap as zstd-compressed bytes.
///
/// # Panics
///
/// Panics with "Compressing Bloom filter" if compression fails.
pub fn into_compressed_bytes(self) -> Vec<u8> {
    // Compress the raw bitmap bytes with compression level `0`.
    let compressed = zstd::bulk::compress(&self.into_bytes(), 0);
    compressed.expect("Compressing Bloom filter")
}

fn to_bytes(&self) -> Vec<u8> {
self.0.bitmap()
/// Convert the bloom filter to a byte array.
///
/// Consumes the filter and hands back the byte buffer backing its bit
/// vector; no compression is applied here.
// NOTE(review): presumably `BITVEC_BYTES` long for filters built via `new` —
// confirm for filters restored from stored bytes.
fn into_bytes(self) -> Vec<u8> {
self.bit_vec.into_vec()
}

pub fn set(&mut self, key: &Felt) {
self.0.set(key);
/// Record the presence of an item.
///
/// Sets every bit position that `indices_for_key` derives for `item`.
/// Sharing that helper guarantees that insertion and lookup always agree on
/// which bits belong to a key.
pub fn set(&mut self, item: &Felt) {
    for bit_offset in Self::indices_for_key(item) {
        // Indices are already reduced modulo `BITVEC_BITS`, so they are in range.
        self.bit_vec.set(bit_offset, true);
    }
}

// Workaround to get the indices of the keys in the filter.
// Needed because the `bloomfilter` crate doesn't provide a
// way to get this information.
fn indices_for_key(key: &Felt) -> Vec<usize> {
// Use key on an empty Bloom filter
let mut bloom = Self::new();
bloom.set(key);

bloom
.0
.bit_vec()
.iter()
.enumerate()
.filter(|(_, bit)| *bit)
.map(|(i, _)| i)
.collect()
/// Compute the bit indices for the given key.
///
/// Returns one index per hash function (`K_NUM` in total), each reduced modulo
/// `BITVEC_BITS`. Indices are not deduplicated, so the same position may occur
/// more than once.
fn indices_for_key(key: &Felt) -> [usize; K_NUM] {
    // Filled in by `bloom_hash` for k_i = 0 and 1, then reused for k_i >= 2.
    let mut hashes = [0u64, 0u64];

    // `from_fn` invokes the closure for index 0, 1, 2, ... in that order, which
    // is what `bloom_hash` needs: the two base hashes must be computed first.
    std::array::from_fn(|k_i| {
        Self::bloom_hash(&mut hashes, key, k_i) as usize % Self::BITVEC_BITS
    })
}

/// Compute the `k_i`-th hash of `item`.
///
/// * For `k_i` of 0 or 1 the item is hashed with a clone of the corresponding
///   base hasher from `SIPS`; the result is stored in `hashes[k_i]` and
///   returned.
/// * For `k_i >= 2` no hashing takes place: the value is derived from the two
///   stored base hashes as `(hashes[0] + k_i * hashes[1]) mod p` using wrapping
///   arithmetic, so the caller must have invoked this function for `k_i` 0 and
///   1 with the same `hashes` array beforehand.
fn bloom_hash(hashes: &mut [u64; 2], item: &Felt, k_i: usize) -> u64 {
    // Largest u64 prime.
    const LARGEST_U64_PRIME: u64 = 0xFFFF_FFFF_FFFF_FFC5u64;

    if k_i >= 2 {
        // Combine the two base hashes instead of hashing the item again.
        let stride = (k_i as u64).wrapping_mul(hashes[1]);
        return hashes[0].wrapping_add(stride) % LARGEST_U64_PRIME;
    }

    // Work on a private copy so the shared, pre-keyed hasher stays pristine.
    let mut hasher = SIPS[k_i].clone();
    item.hash(&mut hasher);
    let base_hash = hasher.finish();
    hashes[k_i] = base_hash;
    base_hash
}
}

Expand Down Expand Up @@ -535,7 +552,7 @@ mod tests {
bloom.set(&KEY);
bloom.set(&KEY1);

aggregate_bloom_filter.insert(&bloom, from_block);
aggregate_bloom_filter.insert(bloom, from_block);

let block_matches = aggregate_bloom_filter.blocks_for_keys(&[KEY]);
let expected = blockrange![from_block];
Expand All @@ -550,8 +567,8 @@ mod tests {
let mut bloom = BloomFilter::new();
bloom.set(&KEY);

aggregate_bloom_filter.insert(&bloom, from_block);
aggregate_bloom_filter.insert(&bloom, from_block + 1);
aggregate_bloom_filter.insert(bloom.clone(), from_block);
aggregate_bloom_filter.insert(bloom, from_block + 1);

let block_matches = aggregate_bloom_filter.blocks_for_keys(&[KEY]);
let expected = blockrange![from_block, from_block + 1];
Expand All @@ -567,8 +584,8 @@ mod tests {
bloom.set(&KEY);
bloom.set(&KEY1);

aggregate_bloom_filter.insert(&bloom, from_block);
aggregate_bloom_filter.insert(&bloom, from_block + 1);
aggregate_bloom_filter.insert(bloom.clone(), from_block);
aggregate_bloom_filter.insert(bloom, from_block + 1);

let block_matches_empty = aggregate_bloom_filter.blocks_for_keys(&[KEY_NOT_IN_FILTER]);
assert_eq!(block_matches_empty, BlockRange::EMPTY);
Expand All @@ -582,16 +599,16 @@ mod tests {
let mut bloom = BloomFilter::new();
bloom.set(&KEY);

aggregate_bloom_filter.insert(&bloom, from_block);
aggregate_bloom_filter.insert(&bloom, from_block + 1);
aggregate_bloom_filter.insert(bloom.clone(), from_block);
aggregate_bloom_filter.insert(bloom.clone(), from_block + 1);

let compressed_bitmap = aggregate_bloom_filter.compress_bitmap();
let mut decompressed = AggregateBloom::from_existing_compressed(
aggregate_bloom_filter.from_block,
aggregate_bloom_filter.to_block,
compressed_bitmap,
);
decompressed.insert(&bloom, from_block + 2);
decompressed.insert(bloom, from_block + 2);

let block_matches = decompressed.blocks_for_keys(&[KEY]);
let expected = blockrange![from_block, from_block + 1, from_block + 2];
Expand All @@ -610,10 +627,10 @@ mod tests {
let mut bloom = BloomFilter::new();
bloom.set(&KEY);

aggregate_bloom_filter.insert(&bloom, from_block);
aggregate_bloom_filter.insert(bloom.clone(), from_block);

let invalid_insert_pos = from_block + AGGREGATE_BLOOM_BLOCK_RANGE_LEN;
aggregate_bloom_filter.insert(&bloom, invalid_insert_pos);
aggregate_bloom_filter.insert(bloom, invalid_insert_pos);
}
}

Expand Down
Loading
Loading