Skip to content

Commit

Permalink
Added web handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
katyo committed Sep 18, 2018
1 parent 7a0bbe3 commit 89084de
Show file tree
Hide file tree
Showing 4 changed files with 554 additions and 300 deletions.
20 changes: 20 additions & 0 deletions ledb-actix/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,34 @@
name = "ledb-actix"
version = "0.1.0"
authors = ["Kayo Phoenix <kayo@illumium.org>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/katyo/ledb"
documentation = "https://docs.rs/ledb-actix"
homepage = "https://github.com/katyo/ledb"
keywords = ["actor", "storage", "persistence", "lmdb", "json"]
categories = ["database", "web"]
description = "LEDB Actor for Actix actor framework and actix-web storage server"

[badges]
travis-ci = { repository = "katyo/ledb" }
appveyor = { repository = "katyo/ledb" }

[dependencies]
serde = "1"
serde_derive = { version = "1", optional = true }
ledb = { path = "../ledb" }
futures = { version = "0.1", optional = true }
actix = "0.7"
actix-web = { version = "0.7", optional = true }

[dev-dependencies]
serde_derive = "1"
serde_json = "1"
futures = "0.1"
tokio = "0.1"
actix-web = "0.7"

[features]
default = ["web"]
web = ["futures", "serde_derive", "actix-web"]
346 changes: 346 additions & 0 deletions ledb-actix/src/actor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,346 @@
use std::marker::PhantomData;
use std::sync::Arc;
use std::path::Path;
use serde::{Serialize, de::DeserializeOwned};
use ledb::{Storage as LeStorage, Result as LeResult, Identifier};
use actix::{Actor, Addr, Message, SyncContext, SyncArbiter, Handler};

pub use ledb::{Filter, Comp, Cond, Order, OrderKind, IndexKind, KeyType, KeyData, Modify, Action, Primary, Document, DocumentsIterator, Value};

/// Actix actor which owns a LEDB storage.
///
/// The underlying storage is held in an `Arc`, so `Clone` only bumps a
/// refcount and the same storage can be shared across the sync-arbiter
/// threads started by [`Storage::start`].
#[derive(Clone)]
pub struct Storage {
    // Shared handle to the underlying LEDB storage.
    db: Arc<LeStorage>,
}

impl Storage {
pub fn new<P: AsRef<Path>>(path: P) -> LeResult<Self> {
Ok(Self { db: Arc::new(LeStorage::open(path)?) })
}

pub fn start(self, threads: usize) -> Addr<Self> {
SyncArbiter::start(threads, move || self.clone())
}
}

// The actor runs in a `SyncContext`: handlers execute on the dedicated
// threads spawned by the `SyncArbiter` in `Storage::start`, so blocking
// database calls inside `handle` do not stall an event loop.
impl Actor for Storage {
    type Context = SyncContext<Self>;
}

/// Get collections request.
///
/// Responds with the list of collection names found in the storage.
pub struct GetCollections;

/// Names of all collections in the storage.
pub type ListCollections = Vec<String>;

impl Message for GetCollections {
    type Result = LeResult<ListCollections>;
}

impl Handler<GetCollections> for Storage {
    type Result = <GetCollections as Message>::Result;

    // Delegates directly to the underlying storage.
    fn handle(&mut self, _: GetCollections, _: &mut Self::Context) -> Self::Result {
        self.db.get_collections()
    }
}

/// Build a message which drops the collection `coll`.
///
/// Intentionally `UpperCamelCase` so call sites read like a message
/// constructor: `addr.send(DropCollection("blog"))`.
#[allow(non_snake_case)]
pub fn DropCollection<C: Into<Identifier>>(coll: C) -> DropCollectionMsg {
    DropCollectionMsg(coll.into())
}

/// Drop-collection message; holds the collection identifier.
pub struct DropCollectionMsg(Identifier);

impl Message for DropCollectionMsg {
    type Result = LeResult<bool>;
}

impl Handler<DropCollectionMsg> for Storage {
    type Result = <DropCollectionMsg as Message>::Result;

    // Delegates to `LeStorage::drop_collection`.
    fn handle(&mut self, msg: DropCollectionMsg, _ctx: &mut Self::Context) -> Self::Result {
        self.db.drop_collection(msg.0)
    }
}

/// Build a message which lists the indexes of collection `coll`.
#[allow(non_snake_case)]
pub fn GetIndexes<C: Into<Identifier>>(coll: C) -> GetIndexesMsg {
    GetIndexesMsg(coll.into())
}

/// Index descriptors: `(field path, index kind, key type)` triples.
pub type ListIndexes = Vec<(String, IndexKind, KeyType)>;

/// Get-indexes message; holds the collection identifier.
pub struct GetIndexesMsg(Identifier);

impl Message for GetIndexesMsg {
    type Result = LeResult<ListIndexes>;
}

impl Handler<GetIndexesMsg> for Storage {
    type Result = <GetIndexesMsg as Message>::Result;

    // Resolve the collection first, then query its indexes.
    fn handle(&mut self, msg: GetIndexesMsg, _ctx: &mut Self::Context) -> Self::Result {
        let coll = self.db.collection(msg.0)?;
        coll.get_indexes()
    }
}

/// Build a message which ensures that an index of `kind` with `key` type
/// exists for `field` of collection `coll`.
#[allow(non_snake_case)]
pub fn EnsureIndex<C: Into<Identifier>, F: Into<Identifier>>(coll: C, field: F, kind: IndexKind, key: KeyType) -> EnsureIndexMsg {
    EnsureIndexMsg(coll.into(), field.into(), kind, key)
}

/// Ensure-index message: `(collection, field, index kind, key type)`.
pub struct EnsureIndexMsg(Identifier, Identifier, IndexKind, KeyType);

impl Message for EnsureIndexMsg {
    type Result = LeResult<bool>;
}

impl Handler<EnsureIndexMsg> for Storage {
    type Result = <EnsureIndexMsg as Message>::Result;

    // Destructure in the body rather than the signature for readability.
    fn handle(&mut self, msg: EnsureIndexMsg, _ctx: &mut Self::Context) -> Self::Result {
        let EnsureIndexMsg(coll, field, kind, key) = msg;
        self.db.collection(coll)?.ensure_index(field, kind, key)
    }
}

/// Build a message which drops the index for `field` of collection `coll`.
#[allow(non_snake_case)]
pub fn DropIndex<C: Into<Identifier>, F: Into<Identifier>>(coll: C, field: F) -> DropIndexMsg {
    DropIndexMsg(coll.into(), field.into())
}

/// Drop-index message: `(collection, field)`.
pub struct DropIndexMsg(Identifier, Identifier);

impl Message for DropIndexMsg {
    type Result = LeResult<bool>;
}

impl Handler<DropIndexMsg> for Storage {
    type Result = <DropIndexMsg as Message>::Result;

    // Resolve the collection, then drop the field's index.
    fn handle(&mut self, msg: DropIndexMsg, _ctx: &mut Self::Context) -> Self::Result {
        let DropIndexMsg(coll, field) = msg;
        self.db.collection(coll)?.drop_index(field)
    }
}

/// Build a message which inserts `data` as a new document into
/// collection `coll`. Responds with the new primary key.
#[allow(non_snake_case)]
pub fn Insert<C: Into<Identifier>, T: Serialize>(coll: C, data: T) -> InsertMsg<T> {
    InsertMsg(coll.into(), data)
}

/// Insert message: `(collection, document payload)`.
pub struct InsertMsg<T>(Identifier, T);

impl<T: Serialize> Message for InsertMsg<T> {
    type Result = LeResult<Primary>;
}

impl<T: Serialize> Handler<InsertMsg<T>> for Storage {
    type Result = <InsertMsg<T> as Message>::Result;

    // The document is serialized by reference; the message keeps ownership.
    fn handle(&mut self, msg: InsertMsg<T>, _ctx: &mut Self::Context) -> Self::Result {
        let InsertMsg(coll, doc) = msg;
        self.db.collection(coll)?.insert(&doc)
    }
}

/// Build a message which fetches the document with primary key `id` from
/// collection `coll`, deserialized as `T`. Responds with `None` when no
/// such document exists.
#[allow(non_snake_case)]
pub fn Get<C: Into<Identifier>, T: DeserializeOwned>(coll: C, id: Primary) -> GetMsg<T> {
    GetMsg(coll.into(), id, PhantomData)
}

/// Get message: `(collection, primary key)`; `PhantomData` pins the
/// target document type without storing a value of it.
pub struct GetMsg<T>(Identifier, Primary, PhantomData<T>);

impl<T: DeserializeOwned + 'static> Message for GetMsg<T> {
    type Result = LeResult<Option<Document<T>>>;
}

impl<T: DeserializeOwned + 'static> Handler<GetMsg<T>> for Storage {
    type Result = <GetMsg<T> as Message>::Result;

    fn handle(&mut self, msg: GetMsg<T>, _ctx: &mut Self::Context) -> Self::Result {
        let GetMsg(coll, id, _marker) = msg;
        self.db.collection(coll)?.get(id)
    }
}

/// Build a message which stores `data` as the new version of an already
/// inserted document (the `Document` carries its primary key).
#[allow(non_snake_case)]
pub fn Put<C: Into<Identifier>, T: Serialize>(coll: C, data: Document<T>) -> PutMsg<T> {
    PutMsg(coll.into(), data)
}

/// Put message: `(collection, versioned document)`.
pub struct PutMsg<T>(Identifier, Document<T>);

impl<T: Serialize> Message for PutMsg<T> {
    type Result = LeResult<()>;
}

impl<T: Serialize> Handler<PutMsg<T>> for Storage {
    type Result = <PutMsg<T> as Message>::Result;

    fn handle(&mut self, msg: PutMsg<T>, _ctx: &mut Self::Context) -> Self::Result {
        let PutMsg(coll, doc) = msg;
        self.db.collection(coll)?.put(&doc)
    }
}

/// Build a message which deletes the document with primary key `id`
/// from collection `coll`.
#[allow(non_snake_case)]
pub fn Delete<C: Into<Identifier>>(coll: C, id: Primary) -> DeleteMsg {
    DeleteMsg(coll.into(), id)
}

/// Delete message: `(collection, primary key)`.
pub struct DeleteMsg(Identifier, Primary);

impl Message for DeleteMsg {
    type Result = LeResult<bool>;
}

impl Handler<DeleteMsg> for Storage {
    type Result = <DeleteMsg as Message>::Result;

    fn handle(&mut self, msg: DeleteMsg, _ctx: &mut Self::Context) -> Self::Result {
        let DeleteMsg(coll, id) = msg;
        self.db.collection(coll)?.delete(id)
    }
}

/// Build a message which applies `modify` to the documents of `coll`
/// matched by `filter` (`None` matches all). Responds with the number
/// of affected documents.
#[allow(non_snake_case)]
pub fn Update<C: Into<Identifier>>(coll: C, filter: Option<Filter>, modify: Modify) -> UpdateMsg {
    UpdateMsg(coll.into(), filter, modify)
}

/// Update message: `(collection, optional filter, modifier)`.
pub struct UpdateMsg(Identifier, Option<Filter>, Modify);

impl Message for UpdateMsg {
    type Result = LeResult<usize>;
}

impl Handler<UpdateMsg> for Storage {
    type Result = <UpdateMsg as Message>::Result;

    fn handle(&mut self, msg: UpdateMsg, _ctx: &mut Self::Context) -> Self::Result {
        let UpdateMsg(coll, filter, modify) = msg;
        self.db.collection(coll)?.update(filter, modify)
    }
}

/// Build a message which removes the documents of `coll` matched by
/// `filter` (`None` matches all). Responds with the number removed.
#[allow(non_snake_case)]
pub fn Remove<C: Into<Identifier>>(coll: C, filter: Option<Filter>) -> RemoveMsg {
    RemoveMsg(coll.into(), filter)
}

/// Remove message: `(collection, optional filter)`.
pub struct RemoveMsg(Identifier, Option<Filter>);

impl Message for RemoveMsg {
    type Result = LeResult<usize>;
}

impl Handler<RemoveMsg> for Storage {
    type Result = <RemoveMsg as Message>::Result;

    fn handle(&mut self, msg: RemoveMsg, _ctx: &mut Self::Context) -> Self::Result {
        let RemoveMsg(coll, filter) = msg;
        self.db.collection(coll)?.remove(filter)
    }
}

/// Build a message which finds the documents of `coll` matched by
/// `filter` (`None` matches all), ordered by `order`, deserialized as `T`.
#[allow(non_snake_case)]
pub fn Find<C: Into<Identifier>, T>(coll: C, filter: Option<Filter>, order: Order) -> FindMsg<T> {
    FindMsg(coll.into(), filter, order, PhantomData)
}

/// Find message: `(collection, optional filter, ordering)`; `PhantomData`
/// pins the target document type without storing a value of it.
pub struct FindMsg<T>(Identifier, Option<Filter>, Order, PhantomData<T>);

impl<T: DeserializeOwned + 'static> Message for FindMsg<T> {
    type Result = LeResult<DocumentsIterator<T>>;
}

impl<T: DeserializeOwned + 'static> Handler<FindMsg<T>> for Storage {
    type Result = <FindMsg<T> as Message>::Result;

    fn handle(&mut self, msg: FindMsg<T>, _ctx: &mut Self::Context) -> Self::Result {
        let FindMsg(coll, filter, order, _marker) = msg;
        self.db.collection(coll)?.find(filter, order)
    }
}

#[cfg(test)]
mod tests {
    use std::fs::remove_dir_all;
    use serde_json::{from_value};
    use futures::{Future};
    use tokio::{spawn};
    use actix::{System};
    use super::{Storage, EnsureIndex, Insert, Find, IndexKind, KeyType};

    // Build a typed value from inline JSON and unwrap the conversion.
    // NOTE(review): relies on `json!` and the Serialize/Deserialize derives
    // being imported via `#[macro_use] extern crate` elsewhere in the crate
    // (pre-2018-edition style) — confirm against the crate root.
    macro_rules! json_val {
        ($($json:tt)+) => {
            from_value(json!($($json)+)).unwrap()
        };
    }

    // Sample document type used throughout the test.
    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    struct BlogPost {
        pub title: String,
        pub tags: Vec<String>,
        pub content: String,
    }

    // On-disk location of the throwaway test database.
    static DB_PATH: &'static str = ".test-db";

    // End-to-end test: insert two documents, build an index, then query by
    // tag. Runs inside an actix `System` and chains the actor requests as
    // futures; the final step stops the system so `System::run` returns.
    #[test]
    fn test() {
        System::run(|| {
            // Start from a clean slate; ignore "not found" on first run.
            let _ = remove_dir_all(DB_PATH);

            let storage = Storage::new(DB_PATH).unwrap();

            // Three sync-arbiter threads; clone the address for each
            // `move` closure in the chain below.
            let addr = storage.start(3);
            let addr1 = addr.clone();
            let addr2 = addr.clone();
            let addr3 = addr.clone();

            spawn(
                addr.send(
                    Insert::<_, BlogPost>("blog", json_val!({
                        "title": "Absurd",
                        "tags": ["absurd", "psychology"],
                        "content": "Still nothing..."
                    }))
                ).and_then(move |res| {
                    // First insert gets primary key 1.
                    assert_eq!(res.unwrap(), 1);

                    addr1.send(Insert::<_, BlogPost>("blog", json_val!({
                        "title": "Lorem ipsum",
                        "tags": ["lorem", "ipsum"],
                        "content": "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum."
                    })))
                }).and_then(move |res| {
                    // Second insert gets primary key 2.
                    assert_eq!(res.unwrap(), 2);

                    // Index the "tags" field so the query below can use it.
                    addr3.send(EnsureIndex("blog", "tags", IndexKind::Duplicate, KeyType::String))
                }).and_then(move |res| {
                    assert!(res.is_ok());

                    // Query by tag, ascending primary-key order.
                    addr2.send(Find::<_, BlogPost>("blog",
                        json_val!({ "tags": { "$eq": "psychology" } }),
                        json_val!("$asc")))
                }).map(|res| {
                    let mut docs = res.unwrap();
                    // Exactly one document matches the "psychology" tag.
                    assert_eq!(docs.size_hint(), (1, Some(1)));
                    let doc = docs.next().unwrap().unwrap();
                    let doc_data: BlogPost = json_val!({
                        "title": "Absurd",
                        "tags": ["absurd", "psychology"],
                        "content": "Still nothing..."
                    });
                    assert_eq!(doc.get_data(), &doc_data);
                    assert!(docs.next().is_none());

                    // Stop the actix system so `System::run` unblocks.
                    System::current().stop();
                }).map_err(|_| ())
            );
        });
    }
}
Loading

0 comments on commit 89084de

Please sign in to comment.