Open
Description
Hello,
I've been trying to develop a query module in Rust that tries to parallelize some loops using the rayon library. The parallelized loops access nodes and edges. However, I get this error: `G` cannot be shared between threads safely, where `G` is a `MemgraphGraph` object. I've seen that some of the C++ query modules use parallelization, but I haven't been able to find anything similar for Rust.
Here is a minimal example that reproduces the error.
Cargo.toml
# Package metadata must live under the `[package]` table; top-level
# `name`/`version`/`edition` keys are not a valid Cargo manifest.
[package]
name = "parallel-example"
version = "0.1.0"
edition = "2018"
# `let ... else` (used in src/lib.rs) requires Rust 1.65 or newer.
rust-version = "1.65"

[dependencies]
c_str_macro = "1.0.2"
rayon = "1.5"
rsmgp-sys = { git = "https://github.com/memgraph/mage.git", tag = "v1.7.0" }

[lib]
name = "parallel_example"
# `cdylib` builds a C-compatible dynamic library.
crate-type = ["cdylib"]
The `src` folder contains these files:
lib.rs
mod example;
use crate::example::MemgraphGraph;
use crate::example::example as example_algorithm;
use c_str_macro::c_str;
use rsmgp_sys::memgraph::*;
use rsmgp_sys::mgp::*;
use rsmgp_sys::result::*;
use rsmgp_sys::rsmgp::*;
use rsmgp_sys::value::*;
use rsmgp_sys::{close_module, define_optional_type, define_procedure, define_type, init_module};
use std::collections::{HashMap};
use std::ffi::{CString};
use std::os::raw::c_int;
use std::panic;
init_module!(|memgraph: &Memgraph| -> Result<()> {
    // Register `example` via `add_read_procedure`:
    //   argument:     node_list -> List of Int
    //   second slice: empty (presumably the optional-argument list; confirm
    //                 against rsmgp-sys)
    //   result field: node_id   -> Int
    memgraph.add_read_procedure(
        example,
        c_str!("example"),
        &[define_type!("node_list", Type::List, Type::Int)],
        &[],
        &[define_type!("node_id", Type::Int)],
    )?;
    Ok(())
});
/// Emits one result record per id in `nodes`, storing the id in the
/// `node_id` output field. Stops at, and returns, the first error.
fn write_nodes_to_records(memgraph: &Memgraph, nodes: Vec<i64>) -> Result<()> {
    for &id in &nodes {
        memgraph.result_record()?.insert_int(c_str!("node_id"), id)?;
    }
    Ok(())
}
define_procedure!(example, |memgraph: &Memgraph| -> Result<()> {
    let args = memgraph.args()?;

    // Argument 0 is `node_list` (declared in `init_module!`). A non-list value
    // or a non-integer element panics rather than returning an error.
    let Value::List(raw_ids) = args.value_at(0)? else {
        panic!("Failed to read node_list")
    };
    let node_ids: Vec<i64> = raw_ids
        .iter()?
        .map(|item| {
            if let Value::Int(id) = item {
                id as i64
            } else {
                panic!("Failed converting node_list to vector")
            }
        })
        .collect();

    let neighbor_ids = example_algorithm(MemgraphGraph::from_graph(memgraph), &node_ids);
    write_nodes_to_records(memgraph, neighbor_ids)?;
    Ok(())
});
close_module!(|| -> Result<()> {
    // This module holds no state of its own, so there is nothing to tear down.
    Ok(())
});
example.rs
use rsmgp_sys::memgraph::*;
use rsmgp_sys::result::Error as MgpError;
use rsmgp_sys::value::*;
use std::io;
use c_str_macro::c_str;
use rayon::prelude::*;
/// Copyable handle to a graph vertex, identified solely by its numeric id.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct Vertex {
    /// Vertex id (for Memgraph vertices, the value of `id()`).
    pub id: i64,
}
/// Errors produced by [`Graph`] implementations.
#[derive(Debug)]
pub enum GraphError {
    /// An I/O failure.
    IoError(io::Error),
    /// An error reported by `rsmgp_sys` (`rsmgp_sys::result::Error`).
    MgpError(MgpError),
}

impl std::fmt::Display for GraphError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            GraphError::IoError(error) => write!(f, "I/O error: {}", error),
            // Only `Debug` is known to be implemented for the rsmgp error type.
            GraphError::MgpError(error) => write!(f, "Memgraph error: {:?}", error),
        }
    }
}

impl std::error::Error for GraphError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            GraphError::IoError(error) => Some(error),
            // `MgpError` is not known to implement `std::error::Error`, so it
            // cannot be exposed as a source.
            GraphError::MgpError(_) => None,
        }
    }
}
impl From<io::Error> for GraphError {
fn from(error: io::Error) -> Self {
Self::IoError(error)
}
}
impl From<MgpError> for GraphError {
fn from(error: MgpError) -> Self {
Self::MgpError(error)
}
}
/// Graph abstraction whose vertices are identified by `i64` ids
/// (see [`Vertex`]). Fallible operations report [`GraphError`].
pub trait Graph {
    // ---- lookups -------------------------------------------------------

    /// Number of vertices in the graph.
    fn num_vertices(&self) -> usize;

    /// All vertices, collected into a `Vec` (despite the name, not a lazy
    /// iterator).
    fn vertices_iter(&self) -> Result<Vec<Vertex>, GraphError>;

    /// Resolves an id to a vertex; `None` when the id cannot be resolved.
    fn get_vertex_by_id(&self, id: i64) -> Option<Vertex>;

    // ---- adjacency -----------------------------------------------------

    /// Adjacent vertices of `vertex`. (`MemgraphGraph` returns outgoing
    /// neighbours followed by incoming ones.)
    fn neighbors(&self, vertex: Vertex) -> Result<Vec<Vertex>, GraphError>;

    /// `(neighbour, weight)` pairs for the edges incident to `vertex`.
    fn weighted_neighbors(&self, vertex: Vertex) -> Result<Vec<(Vertex, f64)>, GraphError>;

    /// `(target, weight)` for each edge leaving `vertex`.
    fn outgoing_edges(&self, vertex: Vertex) -> Result<Vec<(Vertex, f64)>, GraphError>;

    /// `(source, weight)` for each edge entering `vertex`.
    fn incoming_edges(&self, vertex: Vertex) -> Result<Vec<(Vertex, f64)>, GraphError>;

    // ---- mutation ------------------------------------------------------

    /// Inserts `vertex` into the graph.
    fn add_vertex(&mut self, vertex: Vertex) -> Result<(), GraphError>;

    /// Inserts an edge `source -> target`. Note that `weight` is `f32` here,
    /// while the adjacency queries report weights as `f64`.
    fn add_edge(&mut self, source: Vertex, target: Vertex, weight: f32) -> Result<(), GraphError>;
}
/// Adapter exposing a borrowed [`Memgraph`] handle through the [`Graph`] trait.
///
/// It stores only the reference, so it cannot outlive the `&Memgraph` borrow it
/// was built from. The compiler rejects sharing it between threads (it is not
/// `Sync`), so do not capture it in rayon closures. Read what you need on the
/// calling thread first, then parallelize over the owned results.
pub struct MemgraphGraph<'mg> {
    graph: &'mg Memgraph,
}
impl<'a> MemgraphGraph<'a> {
pub fn from_graph(graph: &'a Memgraph) -> Self {
Self { graph }
}
}
impl<'a> Graph for MemgraphGraph<'a> {
fn vertices_iter(&self) -> Result<Vec<Vertex>, GraphError> {
let vertices_iter = self.graph.vertices_iter()?;
let vertices: Vec<_> = vertices_iter.map(|v| Vertex { id: v.id() }).collect();
Ok(vertices)
}
fn incoming_edges(&self, vertex: Vertex) -> Result<Vec<(Vertex, f64)>, GraphError> {
let vertex_mgp = self.graph.vertex_by_id(vertex.id)?;
let iter = vertex_mgp.in_edges()?.map(|e| {
let target_vertex = e.from_vertex().unwrap();
// if the vertex doesn't have a weight, we assume it's 1.0
let weight = e
.property(&c_str!("weight"))
.ok()
.and_then(|p| {
if let Value::Float(f) = p.value {
Some(f)
} else {
None
}
})
.unwrap_or(1.0);
Ok::<(Vertex, f64), GraphError>((
Vertex {
id: target_vertex.id(),
},
weight,
))
.unwrap()
});
let incoming_edges: Vec<_> = iter.collect();
Ok(incoming_edges)
}
fn outgoing_edges(&self, vertex: Vertex) -> Result<Vec<(Vertex, f64)>, GraphError> {
let vertex_mgp = self.graph.vertex_by_id(vertex.id)?;
let outgoing_edges_iter = vertex_mgp.out_edges()?.map(|e| {
let target_vertex = e.to_vertex().unwrap();
// if the vertex doesn't have a weight, we assume it's 1.0
let weight = e
.property(&c_str!("weight"))
.ok()
.and_then(|p| {
if let Value::Float(f) = p.value {
Some(f)
} else {
None
}
})
.unwrap_or(1.0);
Ok::<(Vertex, f64), GraphError>((
Vertex {
id: target_vertex.id(),
},
weight,
))
.unwrap()
});
let outgoing_edges: Vec<_> = outgoing_edges_iter.collect();
Ok(outgoing_edges)
}
fn weighted_neighbors(&self, vertex: Vertex) -> Result<Vec<(Vertex,f64)>, GraphError> {
let mut outgoing_edges = self.outgoing_edges(vertex).unwrap();
let incoming_edges = self.incoming_edges(vertex).unwrap();
outgoing_edges.extend(incoming_edges);
Ok(outgoing_edges)
}
fn neighbors(&self, vertex: Vertex) -> Result<Vec<Vertex>, GraphError> {
let mut neighbors = vec![];
let vertex_mgp = self.graph.vertex_by_id(vertex.id)?;
let neighbors_iter = vertex_mgp.out_edges()?.map(|e| e.to_vertex());
for neighbor_mgp in neighbors_iter {
neighbors.push(Vertex {
id: neighbor_mgp?.id(),
});
}
let neighbors_in = vertex_mgp.in_edges()?.map(|e| e.from_vertex());
for neighbor_mgp in neighbors_in {
neighbors.push(Vertex {
id: neighbor_mgp?.id(),
});
}
Ok(neighbors)
}
fn add_vertex(&mut self, _vertex: Vertex) -> Result<(), GraphError> {
!unimplemented!()
}
fn add_edge(&mut self, _source: Vertex, _target: Vertex, _weight: f32) -> Result<(), GraphError> {
// let source_mgp = self.graph.vertex_by_id(source.id)?;
// let target_mgp = self.graph.vertex_by_id(target.id)?;
// self.graph.create_edge(source_mgp, target_mgp, weight)?;
// Ok(())
!unimplemented!()
}
fn num_vertices(&self) -> usize {
self.graph.vertices_iter().unwrap().count()
}
fn get_vertex_by_id(&self, id: i64) -> Option<Vertex> {
match self.graph.vertex_by_id(id) {
Ok(_) => Some(Vertex { id }),
Err(_) => None,
}
}
}
pub fn example<G: Graph>(
graph: G,
node_list: &[i64]
) -> Vec<i64> {
node_list.par_iter()
.filter_map(|&node_id| {
graph.get_vertex_by_id(node_id)
})
.flat_map(|node| {
graph.neighbors(node).unwrap_or_else(|_| Vec::new())
})
.map(|vertex| vertex.id)
.collect()
}
Any help on how to move this forward would be great.
Metadata
Metadata
Assignees
Labels
Type
Projects
Status
In Progress
Status
Todo