Skip to content

Rust query modules with parallelisation #328

Open
@risufaj

Description

@risufaj

Hello,

I've been trying to develop a query module in Rust that parallelizes some loops using the rayon library. The parallelized loops access nodes and edges. However, I get this error: `G` cannot be shared between threads safely, where `G` is a `MemgraphGraph` object. I've seen that some of the C++ query modules use parallelisation, but I haven't been able to find any equivalent for Rust.

Here is some code to reproduce this.
Cargo.toml

[package]
name = "parallel-example"
version = "0.1.0"
edition = "2018"

[dependencies]
c_str_macro = "1.0.2"
rayon = "1.5"


rsmgp-sys = { git = "https://github.com/memgraph/mage.git", tag="v1.7.0"}

[lib]
name = "parallel_example"
crate-type = ["cdylib"]

The src folder contains these files:
lib.rs

mod example;

use crate::example::MemgraphGraph;
use crate::example::example as example_algorithm;
use c_str_macro::c_str;
use rsmgp_sys::memgraph::*;
use rsmgp_sys::mgp::*;
use rsmgp_sys::result::*;
use rsmgp_sys::rsmgp::*;
use rsmgp_sys::value::*;
use rsmgp_sys::{close_module, define_optional_type, define_procedure, define_type, init_module};
use std::collections::{HashMap};
use std::ffi::{CString};
use std::os::raw::c_int;
use std::panic;

// Module initialisation hook: registers the `example` read procedure with Memgraph.
// Any error returned by `add_read_procedure` is propagated out of the hook via `?`.
init_module!(|memgraph: &Memgraph| -> Result<()> {
    memgraph.add_read_procedure(
        // Handler created by the `define_procedure!(example, ...)` invocation in this file.
        example,
        // Procedure name handed to Memgraph.
        c_str!("example"),
        // NOTE(review): presumably the required arguments — a single `node_list`
        // declared as a list of integers; confirm parameter order against rsmgp-sys.
        &[define_type!("node_list", Type::List, Type::Int),],
        // NOTE(review): presumably the optional arguments (none here) — confirm.
        &[],
        // NOTE(review): presumably the result fields — a single integer `node_id`,
        // which matches the field name written by `write_nodes_to_records`.
        &[
            define_type!("node_id", Type::Int),
        ],
    )?;
    Ok(())
});

/// Emits one result record per id in `nodes`, storing the id under the `node_id` field.
///
/// Processing stops at the first failure: an error from creating a record or from
/// inserting the integer is returned to the caller unchanged.
fn write_nodes_to_records(memgraph: &Memgraph, nodes: Vec<i64>) -> Result<()> {
    nodes.into_iter().try_for_each(|id| -> Result<()> {
        let row = memgraph.result_record()?;
        row.insert_int(c_str!("node_id"), id)?;
        Ok(())
    })
}

// Procedure handler for `example`: reads the `node_list` argument, runs the
// algorithm from `example.rs` on a Memgraph-backed graph and writes every
// returned id as a result record.
//
// Panics (rather than returning an error) when the first argument is not a list
// or when the list contains a non-integer value.
define_procedure!(example, |memgraph: &Memgraph| -> Result<()> {
    let arguments = memgraph.args()?;
    let raw_ids = match arguments.value_at(0)? {
        Value::List(list) => list,
        _ => panic!("Failed to read node_list"),
    };

    // Unpack every list element into a plain integer id.
    let mut node_list: Vec<i64> = Vec::new();
    for value in raw_ids.iter()? {
        match value {
            Value::Int(i) => node_list.push(i as i64),
            _ => panic!("Failed converting node_list to vector"),
        }
    }

    let found = example_algorithm(MemgraphGraph::from_graph(memgraph), &node_list);
    write_nodes_to_records(memgraph, found)?;
    Ok(())
});

// Module shutdown hook: there is nothing to release, so it just reports success.
close_module!(|| -> Result<()> { Ok(()) });

example.rs

use rsmgp_sys::memgraph::*;
use rsmgp_sys::result::Error as MgpError;
use rsmgp_sys::value::*;
use std::io;
use c_str_macro::c_str;
use rayon::prelude::*;

/// Lightweight vertex handle that carries only the vertex id.
///
/// It stores a plain `i64` and no reference into Memgraph, which is what lets it
/// derive `Copy`, `Eq` and `Hash`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Vertex {
    /// Vertex id; in this file it is filled from `id()` of a Memgraph vertex
    /// or from a caller-supplied id.
    pub id: i64,
}

/// Error type returned by the fallible [`Graph`] methods.
///
/// Both variants are reachable through `?` thanks to the `From` impls in this file.
#[derive(Debug)]
pub enum GraphError {
    /// Wraps a standard-library I/O error.
    IoError(io::Error),
    /// Wraps an error reported by the rsmgp-sys (Memgraph procedure API) layer.
    MgpError(MgpError),
}

impl From<io::Error> for GraphError {
    fn from(error: io::Error) -> Self {
        Self::IoError(error)
    }
}

impl From<MgpError> for GraphError {
    fn from(error: MgpError) -> Self {
        Self::MgpError(error)
    }
}

/// Abstraction over a graph whose vertices are addressed through [`Vertex`] ids.
///
/// Fallible operations report failures as [`GraphError`].
///
/// NOTE(review): `add_edge` takes its weight as `f32` while every read method
/// reports weights as `f64` — confirm whether the mismatch is intentional.
pub trait Graph {
    /// Returns all vertices of the graph, collected into a `Vec`.
    fn vertices_iter(&self) -> Result<Vec<Vertex>, GraphError>;
    /// Returns the vertices adjacent to `vertex`.
    ///
    /// The Memgraph-backed implementation in this file follows both outgoing and
    /// incoming edges.
    fn neighbors(&self, vertex: Vertex) -> Result<Vec<Vertex>, GraphError>;
    /// Like [`Graph::neighbors`], but pairs each adjacent vertex with an `f64` edge weight.
    fn weighted_neighbors(&self, vertex: Vertex) -> Result<Vec<(Vertex, f64)>, GraphError>;
    /// Adds `vertex` to the graph.
    fn add_vertex(&mut self, vertex: Vertex) -> Result<(), GraphError>;
    /// Adds an edge from `source` to `target` carrying `weight`.
    fn add_edge(&mut self, source: Vertex, target: Vertex, weight: f32) -> Result<(), GraphError>;
    /// Returns the number of vertices in the graph.
    fn num_vertices(&self) -> usize;
    /// Looks up a vertex by id, returning `None` when the lookup does not succeed.
    fn get_vertex_by_id(&self, id: i64) -> Option<Vertex>;
    /// Returns `(target, weight)` for every edge leaving `vertex`.
    fn outgoing_edges(&self, vertex: Vertex) -> Result<Vec<(Vertex, f64)>, GraphError>;
    /// Returns `(source, weight)` for every edge entering `vertex`.
    fn incoming_edges(&self, vertex: Vertex) -> Result<Vec<(Vertex, f64)>, GraphError>;
}

/// [`Graph`] adapter over a borrowed `Memgraph` handle.
///
/// The adapter does not own the handle; it borrows it for the lifetime `'a`.
pub struct MemgraphGraph<'a> {
    /// Borrowed Memgraph handle that every query in the `Graph` impl goes through.
    graph: &'a Memgraph,
}

impl<'a> MemgraphGraph<'a> {
    pub fn from_graph(graph: &'a Memgraph) -> Self {
        Self { graph }
    }
}

/// [`Graph`] implementation that answers every query through the borrowed
/// `Memgraph` handle.
///
/// Errors reported by the Memgraph API are propagated as
/// [`GraphError::MgpError`] wherever the method signature allows it; only
/// `num_vertices` (which returns a bare `usize`) still panics on failure.
impl<'a> Graph for MemgraphGraph<'a> {
    /// Returns one [`Vertex`] per vertex yielded by the Memgraph vertex iterator.
    ///
    /// # Errors
    /// Fails if the vertex iterator cannot be created.
    fn vertices_iter(&self) -> Result<Vec<Vertex>, GraphError> {
        let vertices = self
            .graph
            .vertices_iter()?
            .map(|v| Vertex { id: v.id() })
            .collect();
        Ok(vertices)
    }

    /// Returns `(source, weight)` for every edge that ends in `vertex`.
    ///
    /// # Errors
    /// Fails if `vertex` cannot be looked up, if its incoming edges cannot be
    /// iterated, or if the source vertex of an edge cannot be read.
    fn incoming_edges(&self, vertex: Vertex) -> Result<Vec<(Vertex, f64)>, GraphError> {
        let vertex_mgp = self.graph.vertex_by_id(vertex.id)?;
        let incoming_edges = vertex_mgp
            .in_edges()?
            .map(|e| -> Result<(Vertex, f64), GraphError> {
                let source_vertex = e.from_vertex()?;
                // If the edge has no float `weight` property, assume a weight of 1.0.
                let weight = e
                    .property(&c_str!("weight"))
                    .ok()
                    .and_then(|p| {
                        if let Value::Float(f) = p.value {
                            Some(f)
                        } else {
                            None
                        }
                    })
                    .unwrap_or(1.0);
                Ok((
                    Vertex {
                        id: source_vertex.id(),
                    },
                    weight,
                ))
            })
            .collect::<Result<Vec<_>, GraphError>>()?;
        Ok(incoming_edges)
    }

    /// Returns `(target, weight)` for every edge that starts at `vertex`.
    ///
    /// # Errors
    /// Fails if `vertex` cannot be looked up, if its outgoing edges cannot be
    /// iterated, or if the target vertex of an edge cannot be read.
    fn outgoing_edges(&self, vertex: Vertex) -> Result<Vec<(Vertex, f64)>, GraphError> {
        let vertex_mgp = self.graph.vertex_by_id(vertex.id)?;
        let outgoing_edges = vertex_mgp
            .out_edges()?
            .map(|e| -> Result<(Vertex, f64), GraphError> {
                let target_vertex = e.to_vertex()?;
                // If the edge has no float `weight` property, assume a weight of 1.0.
                let weight = e
                    .property(&c_str!("weight"))
                    .ok()
                    .and_then(|p| {
                        if let Value::Float(f) = p.value {
                            Some(f)
                        } else {
                            None
                        }
                    })
                    .unwrap_or(1.0);
                Ok((
                    Vertex {
                        id: target_vertex.id(),
                    },
                    weight,
                ))
            })
            .collect::<Result<Vec<_>, GraphError>>()?;
        Ok(outgoing_edges)
    }

    /// Returns the outgoing `(target, weight)` pairs followed by the incoming
    /// `(source, weight)` pairs of `vertex`.
    ///
    /// # Errors
    /// Propagates any error from `outgoing_edges` or `incoming_edges`.
    fn weighted_neighbors(&self, vertex: Vertex) -> Result<Vec<(Vertex, f64)>, GraphError> {
        let mut edges = self.outgoing_edges(vertex)?;
        edges.extend(self.incoming_edges(vertex)?);
        Ok(edges)
    }

    /// Returns the targets of all outgoing edges followed by the sources of all
    /// incoming edges of `vertex`.
    ///
    /// # Errors
    /// Fails if `vertex` cannot be looked up, if either edge iterator cannot be
    /// created, or if an edge endpoint cannot be read.
    fn neighbors(&self, vertex: Vertex) -> Result<Vec<Vertex>, GraphError> {
        let vertex_mgp = self.graph.vertex_by_id(vertex.id)?;
        let mut neighbors = Vec::new();
        for edge in vertex_mgp.out_edges()? {
            neighbors.push(Vertex {
                id: edge.to_vertex()?.id(),
            });
        }
        for edge in vertex_mgp.in_edges()? {
            neighbors.push(Vertex {
                id: edge.from_vertex()?.id(),
            });
        }
        Ok(neighbors)
    }

    /// Not implemented for the Memgraph-backed graph.
    ///
    /// # Panics
    /// Always panics with `unimplemented!`.
    fn add_vertex(&mut self, _vertex: Vertex) -> Result<(), GraphError> {
        unimplemented!()
    }

    /// Not implemented for the Memgraph-backed graph.
    ///
    /// NOTE(review): an earlier sketch looked up both endpoints with
    /// `vertex_by_id` and then called `create_edge` on the handle — confirm that
    /// API exists before implementing it.
    ///
    /// # Panics
    /// Always panics with `unimplemented!`.
    fn add_edge(&mut self, _source: Vertex, _target: Vertex, _weight: f32) -> Result<(), GraphError> {
        unimplemented!()
    }

    /// Counts the vertices by exhausting the Memgraph vertex iterator.
    ///
    /// # Panics
    /// Panics if the vertex iterator cannot be created; the trait signature
    /// leaves no way to report the error.
    fn num_vertices(&self) -> usize {
        self.graph
            .vertices_iter()
            .expect("failed to create the Memgraph vertices iterator")
            .count()
    }

    /// Returns `Some(Vertex { id })` when Memgraph can look the id up, `None` otherwise.
    fn get_vertex_by_id(&self, id: i64) -> Option<Vertex> {
        self.graph.vertex_by_id(id).ok().map(|_| Vertex { id })
    }
}

pub fn example<G: Graph>(
    graph: G,
    node_list: &[i64]
) -> Vec<i64> {

    node_list.par_iter()
        .filter_map(|&node_id| {
            graph.get_vertex_by_id(node_id)
        })
        .flat_map(|node| {
            graph.neighbors(node).unwrap_or_else(|_| Vec::new())
        })
        .map(|vertex| vertex.id)
        .collect()
}

Any help on how to move this forward would be great.

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    Status

    In Progress

    Status

    Todo

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions