Skip to content

Commit

Permalink
Json move (#49)
Browse files Browse the repository at this point in the history
* move to json messages

* wip

* .

* .

* wip

* .

* .

* .

* .

* .

* todo fix planner

* .

* versions update

* misc
  • Loading branch information
alexandre-ricciardi authored Oct 19, 2024
1 parent d2114fe commit 1fa1ab5
Show file tree
Hide file tree
Showing 37 changed files with 389 additions and 430 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ docker run -p8182:8182 --rm -it ghcr.io/alexandre-ricciardi/alexandre-ricciardi/
This will expose an endpoint on 8182 port.

## Interface
Zawgl exposes a WebSocket on the configured port that transports [Bson](https://crates.io/crates/bson) documents.
Zawgl exposes a WebSocket on the configured port that transports Json messages.

Zawgl replies contain a list of graphs representing all the matching instance of the query.

Expand Down
7 changes: 3 additions & 4 deletions client/zawgl-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
[package]
name = "zawgl-client"
version = "0.1.5"
authors = ["Alexandre RICCIARDI <alexandre.ricciardi@pm.me>"]
authors = ["Alexandre RICCIARDI <aricciardi501@gmail.com>"]
edition = "2021"
description = "Zawgl Graph Database Client"
license = "MIT"
[lib]
name = "zawgl_client"

[dependencies]
tokio-tungstenite = "0.21.0"
tokio-tungstenite = "0.24.0"
tokio = { version = "1.28.1", features = ["full"] }
futures-util = "^0.3"
log = "0.4"
simple_logger = "5.0.0"
serde_json = "1.0"
parking_lot = "0.12.1"
bson = "2.0.1"
serde_json = "1.0.128"
url = "2.2.2"
futures-channel = "0.3.21"
uuid = { version = "1.1.2", features = ["v4"] }
2 changes: 1 addition & 1 deletion client/zawgl-client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ let mut params = Parameters::new();
params.insert("pid".to_string(), Value::Integer(12));
let r = client.execute_cypher_request_with_parameters("match (n:Person) where id(n) = $pid return n", params).await;
```
The response is a Bson document, see example below:
The response is a Json message, see example below:
```json
{
"request_id": "969f462c-ec71-41ab-bed8-0b46314f5965",
Expand Down
68 changes: 22 additions & 46 deletions client/zawgl-client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
pub mod parameters;

use std::sync::{Arc, Mutex};
use std::{io::Cursor, collections::HashMap};
use std::collections::HashMap;

use futures_channel::mpsc::UnboundedSender;
use futures_channel::oneshot::{Sender, Canceled};
use futures_util::StreamExt;
use parameters::{Parameters, Value};
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
use bson::{Document, doc, Bson};
use serde_json::{from_str, json, value::Value};
use uuid::Uuid;
use log::*;


type SharedChannelsMap = Arc<Mutex<HashMap<String, Sender<Document>>>>;
type SharedChannelsMap = Arc<Mutex<HashMap<String, Sender<Value>>>>;

/// Zawgl graph database client
pub struct Client {
Expand All @@ -25,8 +22,7 @@ pub struct Client {
impl Client {

pub async fn new(address: &str) -> Self {
let url = url::Url::parse(address).unwrap();
let (ws_stream, _) = connect_async(&url).await.expect("Failed to connect");
let (ws_stream, _) = connect_async(address).await.expect("Failed to connect");
let (write, read) = ws_stream.split();
let (request_tx, request_rx) = futures_channel::mpsc::unbounded();
let (error_tx, error_rx) = tokio::sync::mpsc::unbounded_channel();
Expand All @@ -38,14 +34,12 @@ impl Client {
read.for_each(|message| async {
match message {
Ok(msg) => {
let doc = Document::from_reader(Cursor::new(msg.into_data())).expect("response");
let id = doc.get_str("request_id");
if let Ok(request_id) = id {
if let Some(tx) = clone.lock().unwrap().remove(request_id) {
let res = tx.send(doc);
if let Err(d) = res {
error!("parsing document {}", d)
}
let doc: Value = from_str(&msg.into_text().expect("json message")).expect("response");
let request_id = doc["request_id"].as_str().unwrap();
if let Some(tx) = clone.lock().unwrap().remove(request_id) {
let res = tx.send(doc);
if let Err(d) = res {
error!("parsing document {}", d.to_string())
}
}
},
Expand All @@ -58,13 +52,13 @@ impl Client {
}
}).await
});
Client{request_tx, map_rx_channels: map.clone(), error_rx}
Client{request_tx, map_rx_channels: Arc::clone(&map), error_rx}
}

/// Executes a cypher request with parameters
pub async fn execute_cypher_request_with_parameters(&mut self, query: &str, params: Parameters) -> Result<Document, Canceled> {
pub async fn execute_cypher_request_with_parameters(&mut self, query: &str, params: Value) -> Result<Value, Canceled> {
let uuid = Uuid::new_v4();
let (tx, rx) = futures_channel::oneshot::channel::<Document>();
let (tx, rx) = futures_channel::oneshot::channel::<Value>();
self.map_rx_channels.lock().unwrap().insert(uuid.to_string(), tx);
tokio::spawn(send_request(self.request_tx.clone(), uuid.to_string(), query.to_string(), params));
tokio::select! {
Expand All @@ -80,37 +74,19 @@ impl Client {
}

/// Executes a cypher request
pub async fn execute_cypher_request(&mut self, query: &str) -> Result<Document, Canceled> {
self.execute_cypher_request_with_parameters(query, Parameters::new()).await
}
}

fn extract_value(value: Value) -> Bson {
match value {
Value::String(sv) => Bson::String(sv),
Value::Integer(iv) => Bson::Int64(iv),
Value::Float(fv) => Bson::Double(fv),
Value::Bool(bv) => Bson::Boolean(bv),
Value::Parameters(params) => Bson::Document(build_parameters(params)),
}
}

fn build_parameters(params: Parameters) -> Document {
let mut doc = Document::new();
for (name, value) in params {
doc.insert(name, extract_value(value));
pub async fn execute_cypher_request(&mut self, query: &str) -> Result<Value, Canceled> {
self.execute_cypher_request_with_parameters(query, json!({})).await
}
doc
}

async fn send_request(tx: futures_channel::mpsc::UnboundedSender<Message>, id: String, query: String, params: Parameters) -> Option<()> {
let mut msg = "!application/openCypher".as_bytes().to_vec();
let doc = doc!{
async fn send_request(tx: futures_channel::mpsc::UnboundedSender<Message>, id: String, query: String, params: Value) -> Option<()> {
let mut msg = "!application/openCypher".to_string();
let doc = json!({
"request_id": id,
"query" : query,
"parameters": build_parameters(params),
};
doc.to_writer(&mut msg).ok()?;
tx.unbounded_send(Message::binary(msg)).unwrap();
"parameters": params,
});
msg.push_str(&doc.to_string());
tx.unbounded_send(Message::text(msg.to_string())).unwrap();
Some(())
}
13 changes: 0 additions & 13 deletions client/zawgl-client/src/parameters.rs

This file was deleted.

4 changes: 2 additions & 2 deletions integration/cypher-tests/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "cypher-tests"
version = "0.1.2"
authors = ["Alexandre RICCIARDI <alexandre.ricciardi@pm.me>"]
authors = ["Alexandre RICCIARDI <aricciardi501@gmail.com>"]
edition = "2021"

[lib]
Expand All @@ -15,4 +15,4 @@ log = "0.4"
simple_logger = "5.0.0"
tokio = { version = "1.28.2", features = ["full"] }
serde = { version = "1.0.105", features = ["derive"] }
bson = "2.0.1"
serde_json = "1.0.128"
16 changes: 8 additions & 8 deletions integration/cypher-tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

use bson::Document;
use log::LevelFilter;
use serde_json::Value;
use simple_logger::SimpleLogger;
use zawgl_core::{model::init::InitContext, test_utils::build_dir_path_and_rm_old};
use zawgl_client::Client;
use std::future::Future;
Expand All @@ -27,7 +29,6 @@ pub async fn run_test<F, T>(db_name: &str, port: i32, lambda: F) where F : FnOnc

println!("BEGIN RUN {}", db_name);
let db_dir = build_dir_path_and_rm_old(db_name).expect("error");

let ctx = InitContext::new(&db_dir).expect("can't create database context");
let (tx_start, rx_start) = tokio::sync::oneshot::channel::<()>();
let address = format!("localhost:{}", port);
Expand Down Expand Up @@ -64,16 +65,15 @@ pub async fn run_test<F, T>(db_name: &str, port: i32, lambda: F) where F : FnOnc

}

pub fn extract_node_id(d: Document) -> Option<i64> {
let res = d.get_document("result").ok()?;
let graphs = res.get_array("graphs").ok()?;
pub fn extract_node_id(d: Value) -> Option<i64> {
let res = &d["result"]["graphs"];
let graphs = res.as_array()?;
assert_eq!(graphs.len(), 1);
let mut res = None;
for g in graphs {
let graph = g.as_document()?;
let nodes = graph.get_array("nodes").ok()?;
let nodes = g["nodes"].as_array()?;
for n in nodes {
let nid = n.as_document()?.get_i64("id").ok();
let nid = n["id"].as_i64();
res = nid;
}
assert_eq!(nodes.len(), 1);
Expand Down
55 changes: 31 additions & 24 deletions integration/cypher-tests/tests/aggregation_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

use zawgl_client::{Client, parameters::{Parameters, Value}};
use serde_json::json;
use zawgl_client::Client;
use cypher_tests::{run_test, extract_node_id};

#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
Expand All @@ -34,11 +35,11 @@ async fn _test_aggregation(mut client: Client) {
let id1 = extract_node_id(d1).expect("node id");
let id2 = extract_node_id(d2).expect("node id");
for i in 0..100 {

let mut p = Parameters::new();
p.insert("pid1".to_string(), Value::Integer(id1));
p.insert("pid2".to_string(), Value::Integer(id2));
p.insert("weight".to_string(), Value::Integer(i));
let p = json!({
"pid1": id1,
"pid2": id2,
"weight": i
});
let result = client.execute_cypher_request_with_parameters("match (s:Person) where id(s) = $pid1 match (t:Person) where id(t) = $pid2 create (s)-[:IsFriendOf]->(new:Person {weight: $weight})-[:IsFriendOf]->(t) return new, s, t", p).await;
let res = result.expect("new person");
println!("{}", res.to_string());
Expand All @@ -64,9 +65,10 @@ async fn _test_aggregation_issue(mut client: Client) {
let id2 = extract_node_id(d2).expect("node id");
for _ in 0..100 {

let mut p = Parameters::new();
p.insert("pid1".to_string(), Value::Integer(id1));
p.insert("pid2".to_string(), Value::Integer(id2));
let p = json!({
"pid1": id1,
"pid2": id2
});
let result = client.execute_cypher_request_with_parameters("match (s:Person) where id(s) = $pid1 match (t:Person) where id(t) = $pid2 return s, t", p).await;
let res = result.expect("new person");
println!("{}", res.to_string());
Expand All @@ -91,9 +93,10 @@ async fn _test_aggregation_match_issue(mut client: Client) {
let _res = r.expect("new person");

}
let mut p = Parameters::new();
p.insert("pid1".to_string(), Value::Integer(id1));
p.insert("pid2".to_string(), Value::Integer(id2));
let p = json!({
"pid1": id1,
"pid2": id2
});
let result = client.execute_cypher_request_with_parameters("match (s:Person) where id(s) = $pid1 return s", p).await;
let res = result.expect("new person");
println!("{}", res.to_string());
Expand All @@ -113,25 +116,28 @@ async fn _test_aggregation_1(mut client: Client) {
for _ in 0..10 {
let result = client.execute_cypher_request("create (test:Person) return test").await;
if let Ok(doc) = result {
println!("{}", doc.to_string());
let id = extract_node_id(doc).expect("node id");
for __ in 0..10 {
let mut p = Parameters::new();
p.insert("pid".to_string(), Value::Integer(id));
p.insert("weight".to_string(), Value::Integer(1));
let result = client.execute_cypher_request_with_parameters("match (s:Person) where id(s) = $pid create (s)-[:IsFriendOf]->(new:Person {weight: $weight}) return new, s", p).await;
let res = result.expect("new person");
let p = json!({
"pid": id,
"weight": 1
});
let r1 = client.execute_cypher_request_with_parameters("match (s:Person) where id(s) = $pid create (s)-[:IsFriendOf]->(new:Person {weight: $weight}) return new, s", p).await;
let res = r1.expect("new person");
println!("{}", res.to_string());
assert_eq!(1, res["result"]["graphs"].as_array().unwrap().len());
}
}
}

let result = client.execute_cypher_request("match (test:Person)-[:IsFriendOf]->(new:Person) return test, sum(new.weight) as sum").await;
if let Ok(d) = result {
println!("{}", d.to_string());
let values = d.get_document("result").expect("result").get_array("values").expect("values");
let values = d["result"]["values"].as_array().expect("values");
assert_eq!(10, values.len());
for value in values {
let sum = value.as_array().expect("row")[1].as_document().expect("res value").get_f64("sum").expect("the sum");
let sum = value.as_array().expect("row")[1]["sum"].as_f64().expect("sum");
assert_eq!(10., sum);
}
} else {
Expand All @@ -151,9 +157,10 @@ async fn _test_aggregation_2(mut client: Client) {
if let Ok(doc) = result {
let id = extract_node_id(doc).expect("node id");
for __ in 0..10 {
let mut p = Parameters::new();
p.insert("pid".to_string(), Value::Integer(id));
p.insert("weight".to_string(), Value::Integer(1));
let p = json!({
"pid": id,
"weight": 1
});
let result = client.execute_cypher_request_with_parameters("match (s:Person) where id(s) = $pid
create (s)-[:IsFriendOf]->(new:Person {weight: $weight})
create (new)-[:IsFriendOf]->(new1:Person {weight: $weight})
Expand All @@ -168,10 +175,10 @@ async fn _test_aggregation_2(mut client: Client) {

let result = client.execute_cypher_request("match (test:Person)-[:IsFriendOf]->(new:Person)-[:IsFriendOf]->(new1:Person) return test, new, sum(new1.weight) as sum").await;
if let Ok(d) = result {
let values = d.get_document("result").expect("result").get_array("values").expect("values");
let values = d["result"]["values"].as_array().expect("values");
assert_eq!(100, values.len());
for value in values {
let sum = value.as_array().expect("row")[2].as_document().expect("res value").get_f64("sum").expect("the sum");
let sum = value.as_array().expect("row")[2]["sum"].as_f64().expect("the sum");
assert_eq!(2., sum);
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion integration/cypher-tests/tests/open_cypher_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async fn test_cypher_error_0() {
async fn test_cypher_syntax_error(mut client: Client) {
let r = client.execute_cypher_request("create (n:Person)) return n").await;
if let Ok(d) = r {
d.get_str("error").expect("error");
d["error"].as_str().expect("error");
} else {
assert!(false, "no response from server")
}
Expand Down
Loading

0 comments on commit 1fa1ab5

Please sign in to comment.