Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Disk Buckets: cleanup api (is_free) #22068

Merged
merged 1 commit into from
Dec 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions bucket_map/src/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl<T: Clone + Copy> Bucket<T> {
pub fn keys(&self) -> Vec<Pubkey> {
let mut rv = vec![];
for i in 0..self.index.capacity() {
if self.index.uid(i).is_none() {
if self.index.is_free(i) {
continue;
}
let ix: &IndexEntry = self.index.get(i);
Expand All @@ -121,7 +121,7 @@ impl<T: Clone + Copy> Bucket<T> {
let mut result = Vec::with_capacity(self.index.count.load(Ordering::Relaxed) as usize);
for i in 0..self.index.capacity() {
let ii = i % self.index.capacity();
if self.index.uid(ii).is_none() {
if self.index.is_free(ii) {
continue;
}
let ix: &IndexEntry = self.index.get(ii);
Expand Down Expand Up @@ -154,7 +154,7 @@ impl<T: Clone + Copy> Bucket<T> {
let ix = Self::bucket_index_ix(index, key, random);
for i in ix..ix + index.max_search() {
let ii = i % index.capacity();
if index.uid(ii).is_none() {
if index.is_free(ii) {
continue;
}
let elem: &mut IndexEntry = index.get_mut(ii);
Expand All @@ -173,7 +173,7 @@ impl<T: Clone + Copy> Bucket<T> {
let ix = Self::bucket_index_ix(index, key, random);
for i in ix..ix + index.max_search() {
let ii = i % index.capacity();
if index.uid(ii).is_none() {
if index.is_free(ii) {
continue;
}
let elem: &IndexEntry = index.get(ii);
Expand All @@ -194,7 +194,7 @@ impl<T: Clone + Copy> Bucket<T> {
let ix = Self::bucket_index_ix(index, key, random);
for i in ix..ix + index.max_search() {
let ii = i as u64 % index.capacity();
if index.uid(ii).is_some() {
if !index.is_free(ii) {
continue;
}
index.allocate(ii, elem_uid, is_resizing).unwrap();
Expand Down Expand Up @@ -277,7 +277,7 @@ impl<T: Clone + Copy> Bucket<T> {
let pos = thread_rng().gen_range(0, cap);
for i in pos..pos + self.index.max_search() {
let ix = i % cap;
if best_bucket.uid(ix).is_none() {
if best_bucket.is_free(ix) {
let elem_loc = elem.data_loc(current_bucket);
let old_slots = elem.num_slots;
elem.set_storage_offset(ix);
Expand Down
56 changes: 56 additions & 0 deletions bucket_map/src/bucket_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ struct Header {
}

impl Header {
/// try to lock this entry with 'uid'
/// return true if it could be locked
fn try_lock(&mut self, uid: Uid) -> bool {
if self.lock == UID_UNLOCKED {
self.lock = uid;
Expand All @@ -53,17 +55,23 @@ impl Header {
false
}
}
/// mark this entry as unlocked
fn unlock(&mut self, expected: Uid) {
assert_eq!(expected, self.lock);
self.lock = UID_UNLOCKED;
}
/// uid that has locked this entry or None if unlocked
fn uid(&self) -> Option<Uid> {
if self.lock == UID_UNLOCKED {
None
} else {
Some(self.lock)
}
}
/// true if this entry is unlocked
fn is_unlocked(&self) -> bool {
self.lock == UID_UNLOCKED
}
}

pub struct BucketStorage {
Expand Down Expand Up @@ -153,11 +161,19 @@ impl BucketStorage {
}
}

/// return uid allocated at index 'ix' or None if vacant
pub fn uid(&self, ix: u64) -> Option<Uid> {
assert!(ix < self.capacity(), "bad index size");
self.header_ptr(ix).uid()
}

/// true if the entry at index 'ix' is free (as opposed to being allocated)
pub fn is_free(&self, ix: u64) -> bool {
// note that the terminology in the implementation is locked or unlocked.
// but our api is allocate/free
self.header_ptr(ix).is_unlocked()
}

/// caller knows id is not empty
pub fn uid_unchecked(&self, ix: u64) -> Uid {
self.uid(ix).unwrap()
Expand Down Expand Up @@ -366,3 +382,43 @@ impl BucketStorage {
1 << self.capacity_pow2
}
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn test_bucket_storage() {
let tmpdir1 = std::env::temp_dir().join("bucket_map_test_mt");
let paths: Vec<PathBuf> = [tmpdir1]
.iter()
.filter(|x| std::fs::create_dir_all(x).is_ok())
.cloned()
.collect();
assert!(!paths.is_empty());

let mut storage =
BucketStorage::new(Arc::new(paths), 1, 1, 1, Arc::default(), Arc::default());
let ix = 0;
let uid = Uid::MAX;
assert!(storage.is_free(ix));
assert!(storage.allocate(ix, uid, false).is_ok());
assert!(storage.allocate(ix, uid, false).is_err());
assert!(!storage.is_free(ix));
assert_eq!(storage.uid(ix), Some(uid));
assert_eq!(storage.uid_unchecked(ix), uid);
storage.free(ix, uid);
assert!(storage.is_free(ix));
assert_eq!(storage.uid(ix), None);
let uid = 1;
assert!(storage.is_free(ix));
assert!(storage.allocate(ix, uid, false).is_ok());
assert!(storage.allocate(ix, uid, false).is_err());
assert!(!storage.is_free(ix));
assert_eq!(storage.uid(ix), Some(uid));
assert_eq!(storage.uid_unchecked(ix), uid);
storage.free(ix, uid);
assert!(storage.is_free(ix));
assert_eq!(storage.uid(ix), None);
}
}