Skip to content

feat(storage): implement iteration support #1714

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion node/src/storage_mngr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ fn create_appropriate_backend(

match conf.backend {
config::StorageBackend::HashMap => Ok(encrypted_backend!(
backends::hashmap::Backend::new(),
backends::hashmap::Backend::default(),
passwd
)),
config::StorageBackend::RocksDB => {
Expand Down
21 changes: 14 additions & 7 deletions storage/src/backends/crypto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
//!
//! High-order storage backend that hashes the key and
//! encrypts/decrypts the value when putting/getting it.
use crate::storage::{Result, Storage};
use crate::storage::{Result, Storage, StorageIterator};
use failure::bail;
use witnet_crypto::{cipher, hash::calculate_sha256, pbkdf2::pbkdf2_sha256};
use witnet_protected::Protected;

Expand Down Expand Up @@ -52,7 +53,7 @@ impl<T: Storage> Storage for Backend<T> {
})
}

fn put(&mut self, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
fn put(&self, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
let hash_key = calculate_sha256(key.as_ref());
let iv = cipher::generate_random(IV_LENGTH)?;
let salt = cipher::generate_random(SALT_LENGTH)?;
Expand All @@ -65,10 +66,14 @@ impl<T: Storage> Storage for Backend<T> {
self.backend.put(hash_key.as_ref().to_vec(), final_value)
}

fn delete(&mut self, key: &[u8]) -> Result<()> {
fn delete(&self, key: &[u8]) -> Result<()> {
let hash_key = calculate_sha256(key);
self.backend.delete(hash_key.as_ref())
}

fn prefix_iterator<'a, 'b: 'a>(&'a self, _prefix: &'b [u8]) -> Result<StorageIterator<'a>> {
bail!("Iteration is not supported when using encrypted storage")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a problem: it looks like there is a hidden feature that allows to use an encrypted storage for the node. I would try to deprecate that in some way to avoid future problems. Otherwise, we must always assume that iteration can fail.

}
}

fn get_secret(password: &[u8], salt: &[u8]) -> Protected {
Expand All @@ -79,11 +84,12 @@ fn get_secret(password: &[u8], salt: &[u8]) -> Protected {
mod tests {
use super::*;
use crate::backends::hashmap;
use std::sync::RwLock;

#[test]
fn test_encrypt_decrypt() {
let password = "".into();
let mut backend = Backend::new(password, hashmap::Backend::new());
let backend = Backend::new(password, hashmap::Backend::default());

assert_eq!(None, backend.get(b"name").unwrap());
backend.put("name".into(), "johnny".into()).unwrap();
Expand All @@ -95,11 +101,12 @@ mod tests {
let password1 = "pass1".into();
let password2 = "pass2".into();

let mut backend1 = Backend::new(password1, hashmap::Backend::new());
let backend1 = Backend::new(password1, hashmap::Backend::default());

backend1.put("name".into(), "johnny".into()).unwrap();

let backend2 = Backend::new(password2, backend1.inner().clone());
let backend1_clone = RwLock::new(backend1.inner().read().unwrap().clone());
let backend2 = Backend::new(password2, backend1_clone);

assert_ne!(
backend2.get(b"name").unwrap_or(None),
Expand All @@ -110,7 +117,7 @@ mod tests {
#[test]
fn test_delete() {
let password = "".into();
let mut backend = Backend::new(password, hashmap::Backend::new());
let backend = Backend::new(password, hashmap::Backend::default());

assert_eq!(None, backend.get(b"name").unwrap());
backend.put("name".into(), "johnny".into()).unwrap();
Expand Down
60 changes: 51 additions & 9 deletions storage/src/backends/hashmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,80 @@
//! Storage backend that keeps data in a heap-allocated HashMap.
use std::collections::HashMap;

use crate::storage::{Result, Storage};
use crate::storage::{Result, Storage, StorageIterator};
use std::sync::{RwLock, RwLockReadGuard};

/// HashMap backend
pub type Backend = HashMap<Vec<u8>, Vec<u8>>;
pub type Backend = RwLock<HashMap<Vec<u8>, Vec<u8>>>;

impl Storage for Backend {
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
Ok(Backend::get(self, key).map(|slice| slice.to_vec()))
Ok(self.read().unwrap().get(key).map(|slice| slice.to_vec()))
}

fn put(&mut self, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
Backend::insert(self, key, value);
fn put(&self, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
self.write().unwrap().insert(key, value);
Ok(())
}

fn delete(&mut self, key: &[u8]) -> Result<()> {
Backend::remove(self, key);
fn delete(&self, key: &[u8]) -> Result<()> {
self.write().unwrap().remove(key);
Ok(())
}

fn prefix_iterator<'a, 'b: 'a>(&'a self, prefix: &'b [u8]) -> Result<StorageIterator<'a>> {
Ok(Box::new(DBIterator {
data: self.read().unwrap(),
prefix,
skip: 0,
}))
}
}

struct DBIterator<'a, 'b> {
data: RwLockReadGuard<'a, HashMap<Vec<u8>, Vec<u8>>>,
prefix: &'b [u8],
skip: usize,
}

impl<'a, 'b> Iterator for DBIterator<'a, 'b> {
type Item = (Vec<u8>, Vec<u8>);

fn next(&mut self) -> Option<Self::Item> {
// TODO: is this correct? Add tests
let mut skip = self.skip;
let res = self
.data
.iter()
.skip(skip)
.map(|x| {
skip += 1;
x
})
.filter_map(|(k, v)| {
if k.starts_with(self.prefix.as_ref()) {
Some((k.clone(), v.clone()))
} else {
None
}
})
.next();
self.skip = skip;
res
}
}

#[cfg(test)]
mod tests {
use super::*;

fn backend() -> Box<dyn Storage> {
Box::new(Backend::new())
Box::new(Backend::default())
}

#[test]
fn test_hashmap() {
let mut storage = backend();
let storage = backend();

assert_eq!(None, storage.get(b"name").unwrap());
storage.put(b"name".to_vec(), b"john".to_vec()).unwrap();
Expand Down
10 changes: 7 additions & 3 deletions storage/src/backends/nobackend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! This backend performs no storage at all and always fails to do any operation.
use failure::bail;

use crate::storage::{Result, Storage};
use crate::storage::{Result, Storage, StorageIterator};

/// A Backend that is not persisted
///
Expand All @@ -16,11 +16,15 @@ impl Storage for Backend {
bail!("This is a no backend storage")
}

fn put(&mut self, _key: Vec<u8>, _value: Vec<u8>) -> Result<()> {
fn put(&self, _key: Vec<u8>, _value: Vec<u8>) -> Result<()> {
bail!("This is a no backend storage")
}

fn delete(&mut self, _key: &[u8]) -> Result<()> {
fn delete(&self, _key: &[u8]) -> Result<()> {
bail!("This is a no backend storage")
}

fn prefix_iterator<'a, 'b: 'a>(&'a self, _prefix: &'b [u8]) -> Result<StorageIterator<'a>> {
bail!("This is a no backend storage")
}
}
105 changes: 94 additions & 11 deletions storage/src/backends/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use failure::Fail;
#[cfg(test)]
use rocksdb_mock as rocksdb;

use crate::storage::{Result, Storage};
use crate::storage::{Result, Storage, StorageIterator};

/// Rocksdb backend
pub type Backend = rocksdb::DB;
Expand All @@ -22,15 +22,26 @@ impl Storage for Backend {
Ok(result)
}

fn put(&mut self, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
fn put(&self, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
Backend::put(self, key, value).map_err(Error)?;
Ok(())
}

fn delete(&mut self, key: &[u8]) -> Result<()> {
fn delete(&self, key: &[u8]) -> Result<()> {
Backend::delete(self, &key).map_err(Error)?;
Ok(())
}

fn prefix_iterator<'a, 'b: 'a>(&'a self, prefix: &'b [u8]) -> Result<StorageIterator<'a>> {
Ok(Box::new(
Backend::iterator(
self,
rocksdb::IteratorMode::From(prefix, rocksdb::Direction::Forward),
)
.take_while(move |(k, _v)| k.starts_with(prefix))
.map(|(k, v)| (k.into(), v.into())),
))
}
}

#[cfg(test)]
Expand All @@ -43,7 +54,7 @@ mod tests {

#[test]
fn test_rocksdb() {
let mut storage = backend();
let storage = backend();

assert_eq!(None, storage.get(b"name").unwrap());
storage.put(b"name".to_vec(), b"john".to_vec()).unwrap();
Expand All @@ -56,12 +67,24 @@ mod tests {
#[cfg(test)]
mod rocksdb_mock {
use super::*;
use std::sync::{RwLock, RwLockReadGuard};

pub type Error = failure::Error;

pub enum IteratorMode<'a> {
Start,
End,
From(&'a [u8], Direction),
}

pub enum Direction {
Forward,
Reverse,
}

#[derive(Default)]
pub struct DB {
data: Vec<(Vec<u8>, Vec<u8>)>,
data: RwLock<Vec<(Vec<u8>, Vec<u8>)>>,
}

impl DB {
Expand All @@ -70,7 +93,7 @@ mod rocksdb_mock {
}

fn search<K: AsRef<[u8]>>(&self, key: &K) -> Option<usize> {
for (i, (k, _)) in self.data.iter().enumerate() {
for (i, (k, _)) in self.data.read().unwrap().iter().enumerate() {
if key.as_ref() == k.as_slice() {
return Some(i);
}
Expand All @@ -79,22 +102,82 @@ mod rocksdb_mock {
}

pub fn get<K: AsRef<[u8]>>(&self, key: &K) -> Result<Option<Vec<u8>>> {
Ok(self.search(key).map(|idx| self.data[idx].1.clone()))
Ok(self
.search(key)
.map(|idx| self.data.read().unwrap()[idx].1.clone()))
}

pub fn put<K: AsRef<[u8]>, V: AsRef<[u8]>>(&mut self, key: K, value: V) -> Result<()> {
pub fn put<K: AsRef<[u8]>, V: AsRef<[u8]>>(&self, key: K, value: V) -> Result<()> {
match self.search(&key) {
Some(idx) => self.data[idx].1 = value.as_ref().to_vec(),
Some(idx) => self.data.write().unwrap()[idx].1 = value.as_ref().to_vec(),
None => self
.data
.write()
.unwrap()
.push((key.as_ref().to_vec(), value.as_ref().to_vec())),
}
Ok(())
}

pub fn delete<K: AsRef<[u8]>>(&mut self, key: &K) -> Result<()> {
self.search(key).map(|idx| self.data.remove(idx));
pub fn delete<K: AsRef<[u8]>>(&self, key: &K) -> Result<()> {
self.search(key)
.map(|idx| self.data.write().unwrap().remove(idx));
Ok(())
}

pub fn iterator<'a, 'b: 'a>(
&'a self,
iterator_mode: IteratorMode<'b>,
) -> DBIterator<'a, 'b> {
match iterator_mode {
IteratorMode::Start => unimplemented!(),
IteratorMode::End => unimplemented!(),
IteratorMode::From(prefix, _direction) => self.prefix_iterator(prefix),
}
}

pub fn prefix_iterator<'a, 'b: 'a, P: AsRef<[u8]> + ?Sized>(
&'a self,
prefix: &'b P,
) -> DBIterator<'a, 'b> {
DBIterator {
data: self.data.read().unwrap(),
prefix: prefix.as_ref(),
skip: 0,
}
}
}

pub struct DBIterator<'a, 'b> {
data: RwLockReadGuard<'a, Vec<(Vec<u8>, Vec<u8>)>>,
prefix: &'b [u8],
skip: usize,
}

impl<'a, 'b> Iterator for DBIterator<'a, 'b> {
type Item = (Box<[u8]>, Box<[u8]>);

fn next(&mut self) -> Option<Self::Item> {
// TODO: is this even used somewhere?
let mut skip = self.skip;
let res = self
.data
.iter()
.skip(skip)
.map(|x| {
skip += 1;
x
})
.filter_map(|(k, v)| {
if k.starts_with(self.prefix.as_ref()) {
Some((k.clone().into_boxed_slice(), v.clone().into_boxed_slice()))
} else {
None
}
})
.next();
self.skip = skip;
res
}
}
}
10 changes: 8 additions & 2 deletions storage/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,14 @@ pub trait Storage {
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>>;

/// Put a value in the storage
fn put(&mut self, key: Vec<u8>, value: Vec<u8>) -> Result<()>;
fn put(&self, key: Vec<u8>, value: Vec<u8>) -> Result<()>;

/// Delete a value from the storage
fn delete(&mut self, key: &[u8]) -> Result<()>;
fn delete(&self, key: &[u8]) -> Result<()>;

/// Create an iterator over all the keys that start with the given prefix
fn prefix_iterator<'a, 'b: 'a>(&'a self, prefix: &'b [u8]) -> Result<StorageIterator<'a>>;
}

/// Iterator over key-value pairs
pub type StorageIterator<'a> = Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>;