Skip to content

feat(torii): task network (dep graph) & processors #3172

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions crates/torii/adigraphmap/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
edition.workspace = true
license.workspace = true
name = "torii-adigraphmap"
repository.workspace = true
version.workspace = true

[dependencies]
petgraph = "0.6"
thiserror.workspace = true
tracing.workspace = true
futures-util.workspace = true
tokio.workspace = true
12 changes: 12 additions & 0 deletions crates/torii/adigraphmap/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use thiserror::Error;

/// Errors that can occur when working with a DigraphMap
#[derive(Error, Debug)]
pub enum AcyclicDigraphMapError {
#[error("Node with key {0:?} not found")]
NodeNotFound(String),
#[error("Adding edge would create a cycle")]
CycleDetected,
#[error("Duplicate node key: {0:?}")]
DuplicateKey(String),
}
297 changes: 297 additions & 0 deletions crates/torii/adigraphmap/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,297 @@
use error::AcyclicDigraphMapError;
use petgraph::graph::{DiGraph, NodeIndex};
use std::collections::{HashMap, HashSet};
use std::hash::Hash;

pub mod error;

/// Result type for DigraphMap operations
pub type Result<T> = std::result::Result<T, AcyclicDigraphMapError>;

/// A directed graph that maps dependencies between nodes with values of type `V`.
/// Each node has a unique key of type `K` that can be used to identify it.
#[derive(Debug)]
pub struct AcyclicDigraphMap<K, V>
where
K: Eq + Hash + Clone + std::fmt::Debug,
V: Clone,
{
/// The underlying directed graph
graph: DiGraph<V, ()>,

/// Map from node keys to their indices in the graph
node_indices: HashMap<K, NodeIndex>,
}

impl<K, V> AcyclicDigraphMap<K, V>
where
K: Eq + Hash + Clone + std::fmt::Debug,
V: Clone,
{
/// Create a new empty DigraphMap
pub fn new() -> Self {
Self { graph: DiGraph::new(), node_indices: HashMap::new() }
}

/// Add a node with dependencies to the graph
pub fn add_node_with_dependencies(
&mut self,
key: K,
value: V,
dependencies: Vec<K>,
) -> Result<NodeIndex> {
let node_idx = self.add_node(key.clone(), value)?;
for dependency in dependencies {
self.add_dependency(&dependency, &key)?;
}
Ok(node_idx)
}
Comment on lines +36 to +48
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ohayo sensei! add_node_with_dependencies should be atomic.

The method adds a node first, then adds dependencies one by one. If adding a dependency fails (e.g., due to a cycle), the node remains in the graph without all its dependencies, leaving the graph in a potentially inconsistent state.

Consider making the operation atomic by validating all dependencies first:

pub fn add_node_with_dependencies(
    &mut self,
    key: K,
    value: V,
    dependencies: Vec<K>,
) -> Result<NodeIndex> {
+    // Validate all dependencies exist first
+    for dependency in &dependencies {
+        if !self.node_indices.contains_key(dependency) {
+            return Err(AcyclicDigraphMapError::NodeNotFound(format!("{:?}", dependency)));
+        }
+    }
+    
+    // Check for cycles before adding anything
+    if self.node_indices.contains_key(&key) {
+        return Err(AcyclicDigraphMapError::DuplicateKey(format!("{:?}", key)));
+    }
+    
+    // Temporarily add the node to check for cycles
+    let node_idx = self.graph.add_node(value.clone());
+    
+    // Check if adding any dependency would create a cycle
+    for dependency in &dependencies {
+        let from_idx = self.node_indices.get(dependency).unwrap().clone();
+        if petgraph::algo::has_path_connecting(&self.graph, node_idx, from_idx, None) {
+            // Remove the temporary node if we'd create a cycle
+            self.graph.remove_node(node_idx);
+            return Err(AcyclicDigraphMapError::CycleDetected);
+        }
+    }
+    
+    // Remove the temporary node if checks passed
+    self.graph.remove_node(node_idx);
+    
     let node_idx = self.add_node(key.clone(), value)?;
     for dependency in dependencies {
         self.add_dependency(&dependency, &key)?;
     }
     Ok(node_idx)
}

This solution is more complex but ensures the graph remains consistent. Alternatively, you could implement a transaction-like pattern that rolls back changes on error.


/// Add a node to the graph with the given key and value
pub fn add_node(&mut self, key: K, value: V) -> Result<NodeIndex> {
if self.node_indices.contains_key(&key) {
return Err(AcyclicDigraphMapError::DuplicateKey(format!("{:?}", key)));
}

let node_idx = self.graph.add_node(value);
self.node_indices.insert(key, node_idx);
Ok(node_idx)
}

/// Add an edge representing a dependency from `from` to `to`.
/// This means that `from` must be processed before `to`.
pub fn add_dependency(&mut self, from: &K, to: &K) -> Result<()> {
let from_idx = self.get_node_index(from)?;
let to_idx = self.get_node_index(to)?;

// Check if adding this edge would create a cycle
if self.would_create_cycle(from_idx, to_idx) {
return Err(AcyclicDigraphMapError::CycleDetected);
}

self.graph.add_edge(from_idx, to_idx, ());
Ok(())
}

/// Get a reference to the value associated with a key
pub fn get(&self, key: &K) -> Option<&V> {
self.node_indices.get(key).map(|&idx| &self.graph[idx])
}

/// Get a mutable reference to the value associated with a key
pub fn get_mut(&mut self, key: &K) -> Option<&mut V> {
let idx = self.node_indices.get(key).cloned();
idx.map(move |idx| &mut self.graph[idx])
}

/// Get the node index for a key, or return an error if the key doesn't exist
fn get_node_index(&self, key: &K) -> Result<NodeIndex> {
self.node_indices
.get(key)
.cloned()
.ok_or_else(|| AcyclicDigraphMapError::NodeNotFound(format!("{:?}", key)))
}

/// Check if adding an edge from `from` to `to` would create a cycle
fn would_create_cycle(&self, from: NodeIndex, to: NodeIndex) -> bool {
// If there's already a path from to -> from, adding from -> to would create a cycle
petgraph::algo::has_path_connecting(&self.graph, to, from, None)
}
/// Get the nodes in topological order (respecting dependencies)
pub fn topo_sort(&self) -> Vec<(K, V)> {
let mut result = Vec::new();

// Perform topological sort (guaranteed to succeed since graph is acyclic)
let indices = petgraph::algo::toposort(&self.graph, None)
.expect("Graph is guaranteed to be acyclic due to dependency checks");

// Collect nodes in topological order
for idx in indices {
let node_value = self.graph[idx].clone();
if let Some(key) =
self.node_indices.iter().find(|&(_, &v)| v == idx).map(|(k, _)| k.clone())
{
result.push((key, node_value));
}
}
Comment on lines +108 to +116
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ohayo sensei! Topological helpers perform an O(N²) key‑lookup – keep a reverse map for linear time.

topo_sort() and topo_sort_by_level() reconstruct the (key, value) pairs by scanning self.node_indices for every single vertex (iter().find(|&(_, &v)| v == idx)).
For a graph with N vertices, that turns the overall complexity of these helpers into O(N²) and triggers a lot of allocator churn because of repeated clones.

A tiny reverse map maintained on insertion avoids this:

@@ struct AcyclicDigraphMap<K, V>
-    node_indices: HashMap<K, NodeIndex>,
+    node_indices: HashMap<K, NodeIndex>,
+    index_keys: HashMap<NodeIndex, K>, // <— new
@@ fn add_node(&mut self, key: K, value: V)
-        self.node_indices.insert(key, node_idx);
+        self.index_keys.insert(node_idx, key.clone());
+        self.node_indices.insert(key, node_idx);
@@ fn topo_sort(&self)
-            if let Some(key) =
-                self.node_indices.iter().find(|&(_, &v)| v == idx).map(|(k, _)| k.clone())
-            {
-                result.push((key, node_value));
-            }
+            if let Some(key) = self.index_keys.get(&idx) {
+                result.push((key.clone(), node_value));
+            }

The same one‑liner swap (self.index_keys.get(&node)) removes the quadratic scan inside topo_sort_by_level() as well.

Also applies to: 150-156


result
}

/// Get the nodes grouped by topological level, where each level contains nodes
/// that can be processed in parallel (no dependencies among them).
/// Returns a vector of vectors, where each inner vector represents a level.
pub fn topo_sort_by_level(&self) -> Vec<Vec<(K, V)>> {
let mut levels: Vec<Vec<(K, V)>> = Vec::new();
let mut remaining_nodes: HashSet<NodeIndex> = self.node_indices.values().cloned().collect();
let mut incoming_edges: HashMap<NodeIndex, usize> = HashMap::new();

// Initialize incoming edge counts
for node in self.graph.node_indices() {
let incoming = self.graph.neighbors_directed(node, petgraph::Incoming).count();
incoming_edges.insert(node, incoming);
}

// Find nodes with no incoming edges (roots) as level 0
let mut current_level_nodes: Vec<NodeIndex> = incoming_edges
.iter()
.filter(|&(_, &count)| count == 0)
.map(|(&node, _)| node)
.collect();

while !current_level_nodes.is_empty() {
let mut next_level_nodes: Vec<NodeIndex> = Vec::new();
let mut level_nodes: Vec<(K, V)> = Vec::new();

// Process all nodes in the current level
for node in current_level_nodes {
if remaining_nodes.remove(&node) {
// Find the key and value for this node
if let Some(key) =
self.node_indices.iter().find(|&(_, &v)| v == node).map(|(k, _)| k.clone())
{
let value = self.graph[node].clone();
level_nodes.push((key, value));
}

// Update dependencies for neighbors
for neighbor in self.graph.neighbors_directed(node, petgraph::Outgoing) {
let count = incoming_edges.get_mut(&neighbor).unwrap();
*count -= 1;
if *count == 0 {
next_level_nodes.push(neighbor);
}
}
}
}

if !level_nodes.is_empty() {
levels.push(level_nodes);
}

current_level_nodes = next_level_nodes;
}

levels
}

/// Check if the graph is empty
pub fn is_empty(&self) -> bool {
self.graph.node_count() == 0
}

/// Get the number of nodes in the graph
pub fn len(&self) -> usize {
self.graph.node_count()
}

/// Remove all nodes and edges from the graph
pub fn clear(&mut self) {
self.graph.clear();
self.node_indices.clear();
}
}

impl<K, V> Default for AcyclicDigraphMap<K, V>
where
K: Eq + Hash + Clone + std::fmt::Debug,
V: Clone,
{
fn default() -> Self {
Self::new()
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_add_and_get() {
let mut graph: AcyclicDigraphMap<String, i32> = AcyclicDigraphMap::new();
graph.add_node("a".to_string(), 1).unwrap();
graph.add_node("b".to_string(), 2).unwrap();

assert_eq!(graph.get(&"a".to_string()), Some(&1));
assert_eq!(graph.get(&"b".to_string()), Some(&2));
assert_eq!(graph.get(&"c".to_string()), None);
}

#[test]
fn test_add_dependency() {
let mut graph: AcyclicDigraphMap<String, i32> = AcyclicDigraphMap::new();
graph.add_node("a".to_string(), 1).unwrap();
graph.add_node("b".to_string(), 2).unwrap();

assert!(graph.add_dependency(&"a".to_string(), &"b".to_string()).is_ok());

// The order should be a, b
let result = graph.topo_sort();
assert_eq!(result.len(), 2);
assert_eq!(result[0].0, "a");
assert_eq!(result[1].0, "b");
}

#[test]
fn test_add_node_with_dependencies() {
let mut graph: AcyclicDigraphMap<String, i32> = AcyclicDigraphMap::new();
graph.add_node_with_dependencies("b".to_string(), 2, vec![]).unwrap();
graph.add_node_with_dependencies("a".to_string(), 1, vec!["b".to_string()]).unwrap();

let result = graph.topo_sort();
assert_eq!(result.len(), 2);
assert_eq!(result[0].0, "b");
assert_eq!(result[1].0, "a");
}

#[test]
fn test_cycle_detection() {
let mut graph: AcyclicDigraphMap<String, i32> = AcyclicDigraphMap::new();
graph.add_node("a".to_string(), 1).unwrap();
graph.add_node("b".to_string(), 2).unwrap();

assert!(graph.add_dependency(&"a".to_string(), &"b".to_string()).is_ok());
assert!(graph.add_dependency(&"b".to_string(), &"a".to_string()).is_err());
}

#[test]
fn test_topo_sort_by_level() {
let mut graph: AcyclicDigraphMap<String, i32> = AcyclicDigraphMap::new();

// Create a graph:
// a -> b -> d
// a -> c -> d
// Expected levels:
// Level 0: [a]
// Level 1: [b, c]
// Level 2: [d]

graph.add_node("a".to_string(), 1).unwrap();
graph.add_node("b".to_string(), 2).unwrap();
graph.add_node("c".to_string(), 3).unwrap();
graph.add_node("d".to_string(), 4).unwrap();

graph.add_dependency(&"a".to_string(), &"b".to_string()).unwrap();
graph.add_dependency(&"a".to_string(), &"c".to_string()).unwrap();
graph.add_dependency(&"b".to_string(), &"d".to_string()).unwrap();
graph.add_dependency(&"c".to_string(), &"d".to_string()).unwrap();

let levels = graph.topo_sort_by_level();

assert_eq!(levels.len(), 3, "Should have 3 levels");

// Level 0: a
assert_eq!(levels[0].len(), 1);
assert_eq!(levels[0][0].0, "a");

// Level 1: b, c (order within level may vary)
assert_eq!(levels[1].len(), 2);
let level1_keys: Vec<&str> = levels[1].iter().map(|(k, _)| k.as_str()).collect();
assert!(level1_keys.contains(&"b"));
assert!(level1_keys.contains(&"c"));

// Level 2: d
assert_eq!(levels[2].len(), 1);
assert_eq!(levels[2][0].0, "d");
}
}
19 changes: 19 additions & 0 deletions crates/torii/processors/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "torii-processors"
edition.workspace = true
license.workspace = true
license-file.workspace = true
repository.workspace = true
version.workspace = true

[dependencies]
torii-sqlite = { path = "../sqlite/sqlite" }
dojo-world.workspace = true
starknet.workspace = true
anyhow.workspace = true
async-trait.workspace = true
tokio.workspace = true
tracing.workspace = true
cainome.workspace = true
dojo-types.workspace = true
cainome-cairo-serde.workspace = true
Loading
Loading