Skip to content

Commit

Permalink
Rework drop_collection() and drop_index()
Browse files Browse the repository at this point in the history
  • Loading branch information
katyo committed Sep 18, 2018
1 parent 92a8ef6 commit fafeb47
Show file tree
Hide file tree
Showing 4 changed files with 197 additions and 50 deletions.
40 changes: 40 additions & 0 deletions ledb-actix/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,26 @@ impl Handler<GetCollections> for Storage {
}
}

/// Drop collection
#[allow(non_snake_case)]
pub fn DropCollection<C: Into<Identifier>>(coll: C) -> DropCollectionMsg {
DropCollectionMsg(coll.into())
}

pub struct DropCollectionMsg(Identifier);

impl Message for DropCollectionMsg {
type Result = LeResult<bool>;
}

impl Handler<DropCollectionMsg> for Storage {
type Result = <DropCollectionMsg as Message>::Result;

fn handle(&mut self, DropCollectionMsg(collection): DropCollectionMsg, _: &mut Self::Context) -> Self::Result {
self.db.drop_collection(collection)
}
}

/// Get indexes
#[allow(non_snake_case)]
pub fn GetIndexes<C: Into<Identifier>>(coll: C) -> GetIndexesMsg {
Expand Down Expand Up @@ -98,6 +118,26 @@ impl Handler<EnsureIndexMsg> for Storage {
}
}

/// Drop index of collection
#[allow(non_snake_case)]
pub fn DropIndex<C: Into<Identifier>, F: Into<Identifier>>(coll: C, field: F) -> DropIndexMsg {
DropIndexMsg(coll.into(), field.into())
}

pub struct DropIndexMsg(Identifier, Identifier);

impl Message for DropIndexMsg {
type Result = LeResult<bool>;
}

impl Handler<DropIndexMsg> for Storage {
type Result = <DropIndexMsg as Message>::Result;

fn handle(&mut self, DropIndexMsg(collection, field): DropIndexMsg, _: &mut Self::Context) -> Self::Result {
self.db.collection(collection)?.drop_index(field)
}
}

/// Insert new document into collection
#[allow(non_snake_case)]
pub fn Insert<C: Into<Identifier>, T: Serialize>(coll: C, data: T) -> InsertMsg<T> {
Expand Down
119 changes: 81 additions & 38 deletions ledb/src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,17 @@ impl CollectionDef {
}
}

struct Indexes {
alive: Vec<Arc<Index>>,
dead: Vec<Arc<Index>>,
}

/// Collection of documents
pub struct Collection {
pub(crate) name: String,
pub(crate) indexes: RwLock<Vec<Arc<Index>>>,
pub(crate) env: Arc<Environment>,
pub(crate) db: Arc<Database<'static>>,
indexes: RwLock<Indexes>,
env: Arc<Environment>,
db: Arc<Database<'static>>,
}

impl Collection {
Expand All @@ -40,12 +45,13 @@ impl Collection {
env.clone(), Some(&db_name), &db_opts)
.wrap_err()?);

let indexes = RwLock::new(
index_defs
let indexes = RwLock::new(Indexes {
alive: index_defs
.into_iter()
.map(|def| Index::new(env.clone(), def).map(Arc::new))
.collect::<Result<Vec<_>>>()?
);
.collect::<Result<Vec<_>>>()?,
dead: Vec::new(),
});

Ok(Self { name, indexes, env, db })
}
Expand Down Expand Up @@ -251,11 +257,16 @@ impl Collection {

/// Remove all documents from the collection
///
/// Shortcut for `Collection::remove(None)`.
///
#[inline]
pub fn purge(&self) -> Result<usize> {
self.remove(None)
pub fn purge(&self) -> Result<()> {
let txn = WriteTransaction::new(self.env.clone()).wrap_err()?;
let mut access = txn.access();

let indexes = self.indexes.read().wrap_err()?;
for index in indexes.alive.iter() {
index.purge(&mut access)?;
}

access.clear_db(&self.db).wrap_err()
}

/// Checks the collection contains document with specified primary key
Expand Down Expand Up @@ -340,7 +351,7 @@ impl Collection {
let indexes = self.indexes.read().wrap_err()?;
let mut access = txn.access();

for index in indexes.iter() {
for index in indexes.alive.iter() {
index.update_index(&mut access, old_doc, new_doc)?;
}
}
Expand All @@ -366,7 +377,7 @@ impl Collection {
/// Get indexes info from the collection
pub fn get_indexes(&self) -> Result<Vec<(String, IndexKind, KeyType)>> {
let indexes = self.indexes.read().wrap_err()?;
Ok(indexes.iter().map(|index| (index.path.clone(), index.kind, index.key)).collect())
Ok(indexes.alive.iter().map(|index| (index.path.clone(), index.kind, index.key)).collect())
}

/// Ensure index for the collection
Expand All @@ -375,7 +386,7 @@ impl Collection {
if index.kind == kind && index.key == key {
return Ok(false);
} else {
self.remove_index(&path)?;
self.drop_index(&path)?;
}
}

Expand All @@ -386,19 +397,32 @@ impl Collection {
pub fn create_index<P: AsRef<str>>(&self, path: P, kind: IndexKind, key: KeyType) -> Result<bool> {
let path = path.as_ref();

{
let dead_pos = {
let indexes = self.indexes.read().wrap_err()?;
if let Some(_) = indexes.iter().find(|index| index.path == path) {
// search alive index
if let Some(_) = indexes.alive.iter().find(|index| index.path == path) {
return Ok(false);
}
// search dead index
indexes.dead.iter().position(|index| index.path == path && index.kind == kind && index.key == key)
};

// try to restore dead index
if let Some(pos) = dead_pos {
// restore dead collection
let mut indexes = self.indexes.write().wrap_err()?;
let index = indexes.dead.remove(pos);
indexes.alive.push(index);
return Ok(true);
}


// create new index
let index = Index::new(self.env.clone(), IndexDef(self.name.clone(), path.into(), kind, key))
.map(Arc::new)?;

{
let mut indexes = self.indexes.write().wrap_err()?;
indexes.push(index.clone());
indexes.alive.push(index.clone());
}

{
Expand Down Expand Up @@ -427,41 +451,40 @@ impl Collection {
}

/// Remove index from the collection
pub fn remove_index<P: AsRef<str>>(&self, path: P) -> Result<bool> {
pub fn drop_index<P: AsRef<str>>(&self, path: P) -> Result<bool> {
let path = path.as_ref();

let mut indexes = self.indexes.write().wrap_err()?;

let index_pos = if let Some(index) = indexes.iter().position(|index| index.path == path) {
index
} else {
return Ok(false);
};

let index = indexes.remove(index_pos);
let found_pos = {
let indexes = self.indexes.read().wrap_err()?;
indexes.alive.iter().position(|index| index.path == path)
};

if let Ok(Index { db, .. }) = Arc::try_unwrap(index) {
if let Ok(db) = Arc::try_unwrap(db) {
db.delete().wrap_err()?;
}
}

Ok(true)
Ok(if let Some(pos) = found_pos {
let mut indexes = self.indexes.write().wrap_err()?;
let index = indexes.alive.remove(pos);
let txn = WriteTransaction::new(self.env.clone())?;
let mut access = txn.access();
index.to_delete(&mut access)?;
indexes.dead.push(index);
true
} else {
false
})
}

/// Checks the index for specified field exists for the collection
pub fn has_index<P: AsRef<str>>(&self, path: P) -> Result<bool> {
let path = path.as_ref();
let indexes = self.indexes.read().wrap_err()?;

Ok(indexes.iter().any(|index| index.path == path))
Ok(indexes.alive.iter().any(|index| index.path == path))
}

pub(crate) fn get_index<P: AsRef<str>>(&self, path: P) -> Result<Option<Arc<Index>>> {
let path = path.as_ref();
let indexes = self.indexes.read().wrap_err()?;

Ok(indexes.iter().find(|index| index.path == path).map(Clone::clone))
Ok(indexes.alive.iter().find(|index| index.path == path).map(Clone::clone))
}

pub(crate) fn req_index<P: AsRef<str>>(&self, path: P) -> Result<Arc<Index>> {
Expand All @@ -471,6 +494,26 @@ impl Collection {
Err(format!("Missing index for field '{}'", path.as_ref())).wrap_err()
}
}

pub(crate) fn to_delete(&self) -> Result<()> {
self.purge()
}

pub(crate) fn do_delete(self) -> Result<()> {
let mut indexes = self.indexes.write().wrap_err()?;

for index in indexes.dead.drain(..) {
if let Ok(index) = Arc::try_unwrap(index) {
index.do_delete()?;
}
}

if let Ok(db) = Arc::try_unwrap(self.db) {
db.delete().wrap_err()?;
}

Ok(())
}
}

pub(crate) struct PrimaryIterator {
Expand Down
16 changes: 16 additions & 0 deletions ledb/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,22 @@ impl Index {
pub(crate) fn query_iter(&self, txn: Arc<ReadTransaction<'static>>, order: OrderKind) -> Result<IndexIterator> {
IndexIterator::new(txn, self.db.clone(), order)
}

pub(crate) fn purge(&self, access: &mut WriteAccessor) -> Result<()> {
access.clear_db(&self.db).wrap_err()
}

pub(crate) fn to_delete(&self, access: &mut WriteAccessor) -> Result<()> {
self.purge(access)
}

pub(crate) fn do_delete(self) -> Result<()> {
if let Ok(db) = Arc::try_unwrap(self.db) {
db.delete().wrap_err()?;
}

Ok(())
}
}

fn extract_field_values<'a, 'i: 'a, I: Iterator<Item = &'i str> + Clone>(doc: &'a Value, typ: KeyType, path: &'a I, keys: &mut HashSet<KeyData>) {
Expand Down
Loading

0 comments on commit fafeb47

Please sign in to comment.