feat(torii): task network (dep graph) & processors #3172


Closed
wants to merge 9 commits

Conversation

@Larkooo (Collaborator) commented Apr 17, 2025

Summary by CodeRabbit

  • New Features

    • Introduced a comprehensive modular framework for processing blockchain events and transactions, covering standards like ERC20, ERC721, ERC1155, ERC4906, World, and UDC.
    • Added specialized event processors for token transfers, metadata updates, event and model registration, upgrades, and controller contract deployments.
    • Developed a generic acyclic directed graph data structure to manage dependencies with cycle detection and topological sorting.
    • Built an asynchronous task network manager enabling parallel execution of tasks respecting dependency constraints and concurrency limits.
    • Enhanced transaction processing with support for multiple StarkNet transaction versions and nested call parsing.
  • Chores

    • Added new workspace crates and manifest files to support modularization, including crates for processors, acyclic graph management, and task network.

coderabbitai bot (Contributor) commented Apr 17, 2025

Walkthrough

Ohayo sensei! This update introduces three new Rust crates: torii-processors, torii-adigraphmap, and torii-task-network. The torii-processors crate defines a comprehensive, extensible framework for processing blockchain events, blocks, and transactions, including a suite of event processors for various token standards and world contract operations. The torii-adigraphmap crate implements a generic acyclic directed graph structure with dependency management and topological sorting capabilities. The torii-task-network crate provides an asynchronous task manager that schedules and executes tasks with dependency constraints in topological order, leveraging the graph structure from torii-adigraphmap. Each crate includes its own error handling and is configured for workspace integration.
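For a feel of the ordering guarantee the graph crate provides, here is a minimal sketch written directly against petgraph (which, judging by the review comments below, torii-adigraphmap wraps); the node names are illustrative stand-ins, not the crate's actual API:

use petgraph::algo::toposort;
use petgraph::graph::DiGraph;

fn main() {
    let mut graph: DiGraph<&str, ()> = DiGraph::new();
    let register = graph.add_node("register_model");
    let store = graph.add_node("store_set_record");
    // store_set_record depends on register_model running first.
    graph.add_edge(register, store, ());

    // The topological order respects the dependency edge.
    let order = toposort(&graph, None).expect("graph is acyclic");
    let names: Vec<&str> = order.iter().map(|&idx| graph[idx]).collect();
    assert_eq!(names, vec!["register_model", "store_set_record"]);
}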

Changes

File(s) / Path(s) Change Summary
crates/torii/processors/Cargo.toml, crates/torii/adigraphmap/Cargo.toml, crates/torii/task-network/Cargo.toml Added new Cargo.toml manifest files for the respective crates, specifying workspace-based configuration and dependencies.
crates/torii/processors/src/lib.rs Introduced core async traits (EventProcessor, BlockProcessor, TransactionProcessor) and config struct for blockchain data processing.
crates/torii/processors/src/processors/mod.rs Added processor module managing collections of event, block, and transaction processors, and their initialization logic.
crates/torii/processors/src/processors/controller.rs Implemented ControllerProcessor for handling cartridge controller deployment events.
crates/torii/processors/src/processors/erc20_legacy_transfer.rs, crates/torii/processors/src/processors/erc20_transfer.rs Added processors for ERC20 legacy and standard transfer events, including validation, processing, and database integration.
crates/torii/processors/src/processors/erc721_legacy_transfer.rs, crates/torii/processors/src/processors/erc721_transfer.rs Added processors for ERC721 legacy and standard transfer events with event validation, entity identification, and storage.
crates/torii/processors/src/processors/erc1155_transfer_batch.rs, crates/torii/processors/src/processors/erc1155_transfer_single.rs Added processors for ERC1155 batch and single transfer events, parsing event data and recording NFT transfers.
crates/torii/processors/src/processors/erc4906_metadata_update.rs, crates/torii/processors/src/processors/erc4906_batch_metadata_update.rs Added processors for ERC-4906 metadata update and batch update events, updating NFT metadata in the database.
crates/torii/processors/src/processors/event_message.rs Implemented processor for generic event messages, including deserialization and storage.
crates/torii/processors/src/processors/metadata_update.rs Added processor for handling metadata update events, including IPFS metadata retrieval and storage.
crates/torii/processors/src/processors/raw_event.rs Added a catch-all raw event processor with no-op logic.
crates/torii/processors/src/processors/register_event.rs, crates/torii/processors/src/processors/register_model.rs Added processors for registering events and models, fetching schema/layout, and database registration.
crates/torii/processors/src/processors/store_del_record.rs, crates/torii/processors/src/processors/store_set_record.rs, crates/torii/processors/src/processors/store_update_record.rs, crates/torii/processors/src/processors/store_update_member.rs Added processors for store record operations (set, delete, update record/member) with event deserialization and entity management.
crates/torii/processors/src/processors/upgrade_event.rs, crates/torii/processors/src/processors/upgrade_model.rs Added processors for event/model upgrade events, detecting and handling schema changes.
crates/torii/processors/src/processors/store_transaction.rs Implemented processor for extracting and storing transaction call data, including parsing modern and legacy execute calls.
crates/torii/adigraphmap/src/lib.rs, crates/torii/adigraphmap/src/error.rs Implemented an acyclic digraph map with dependency management, topological sorting, and error handling.
crates/torii/task-network/src/lib.rs, crates/torii/task-network/src/error.rs Implemented an async task manager with dependency-aware scheduling and error unification.

Sequence Diagram(s)

sequenceDiagram
    participant Blockchain
    participant Provider
    participant Processors
    participant EventProcessor
    participant Database

    Blockchain->>Provider: Emits block/tx/event
    Provider->>Processors: Passes event/block/tx
    Processors->>EventProcessor: Selects processor by contract type & event key
    EventProcessor->>Database: Processes and stores result
    EventProcessor-->>Processors: Processing complete
sequenceDiagram
    participant TaskNetwork
    participant TaskHandler
    participant Semaphore

    loop For each topological level
        TaskNetwork->>Semaphore: Acquire permits for level
        par For each task in level
            TaskNetwork->>TaskHandler: Execute task async
            TaskHandler-->>TaskNetwork: Task result
        end
        Semaphore-->>TaskNetwork: Release permits
    end
    TaskNetwork-->>TaskHandler: All tasks processed
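A minimal sketch of the level-by-level pattern this diagram describes, assuming tokio; the task ids, level grouping, and concurrency limit are illustrative stand-ins, not the crate's actual types:

use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // Tasks grouped by topological level: a task only depends on earlier levels.
    let levels: Vec<Vec<u64>> = vec![vec![1, 2], vec![3], vec![4, 5]];
    let semaphore = Arc::new(Semaphore::new(2)); // stands in for max_concurrent_tasks

    for level in levels {
        let mut handles = Vec::new();
        for task_id in level {
            let sem = semaphore.clone();
            handles.push(tokio::spawn(async move {
                // The permit bounds how many tasks of this level run at once.
                let _permit = sem.acquire().await.expect("semaphore closed");
                println!("executing task {task_id}");
            }));
        }
        // Await the whole level before starting the next one, preserving
        // the dependency ordering between levels.
        for handle in handles {
            handle.await.expect("task panicked");
        }
    }
}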

Suggested reviewers

  • glihm


coderabbitai bot (Contributor) left a comment

Actionable comments posted: 26

♻️ Duplicate comments (2)
crates/torii/processors/src/lib.rs (2)

63-65: Bound adjustment identical to previous comment

BlockProcessor has the same missing Send bound; please align it with the fix suggested for EventProcessor (see the comment on lib.rs lines 36-40 below).


75-77: Same Send‑bound issue for TransactionProcessor

Replicate the provider bound change here as well.

🧹 Nitpick comments (32)
crates/torii/processors/src/processors/raw_event.rs (1)

36-49: No-op process implementation is acceptable for now.

The empty implementation with just Ok(()) makes sense for a catch-all processor that doesn't need specific processing logic. The comment "We can choose to consider them, or not" suggests this design is intentional.

However, you might want to consider adding more detailed logging or telemetry to track the volume and types of events being caught by this fallback processor. This could help identify patterns that might warrant dedicated processors in the future.

 async fn process(
     &self,
     _world: &WorldContractReader<P>,
     _db: &mut Sql,
     _block_number: u64,
     _block_timestamp: u64,
     _event_id: &str,
     _event: &Event,
     _config: &EventProcessorConfig,
 ) -> Result<(), Error> {
     // We can choose to consider them, or not.
+    tracing::debug!(
+        target: "torii::indexer::processors::raw_event",
+        event_id = %_event_id,
+        from_address = ?_event.from_address,
+        keys = ?_event.keys,
+        data_len = %_event.data.len(),
+        "Processing unhandled event"
+    );

     Ok(())
 }
crates/torii/processors/src/processors/erc20_legacy_transfer.rs (3)

52-61: Validation logic is accurate for legacy ERC20 transfers.

The validation correctly checks for 1 key and 4 data elements according to the legacy ERC20 contract format cited in the comment.

However, the return statement could be simplified:

-        if event.keys.len() == 1 && event.data.len() == 4 {
-            return true;
-        }
-
-        false
+        event.keys.len() == 1 && event.data.len() == 4

77-78: Cairo deserialization could benefit from more robust error handling.

The code deserializes the value but doesn't handle potential issues with malformed data beyond propagating the error.

-        let value = U256Cainome::cairo_deserialize(&event.data, 2)?;
-        let value = U256::from_words(value.low, value.high);
+        let value = match U256Cainome::cairo_deserialize(&event.data, 2) {
+            Ok(v) => v,
+            Err(e) => {
+                tracing::warn!(
+                    target: LOG_TARGET,
+                    error = %e,
+                    token_address = ?token_address,
+                    from = ?from,
+                    to = ?to,
+                    "Failed to deserialize ERC20 transfer value"
+                );
+                return Err(anyhow::anyhow!("Failed to deserialize ERC20 transfer value: {}", e));
+            }
+        };
+        let value = U256::from_words(value.low, value.high);

90-90: Consider enhancing debug logging for transfers.

The current debug log contains the basic transfer details, but could include the token address for completeness.

-        debug!(target: LOG_TARGET,from = ?from, to = ?to, value = ?value, "Legacy ERC20 Transfer.");
+        debug!(
+            target: LOG_TARGET,
+            token_address = ?token_address,
+            from = ?from,
+            to = ?to,
+            value = ?value,
+            "Legacy ERC20 Transfer."
+        );
crates/torii/processors/src/processors/erc1155_transfer_single.rs (1)

41-70: Expose the operator for richer analytics (optional)

We currently ignore operator (event.keys[1]). Indexing that field would let downstream consumers distinguish between direct and approved transfers, which can be valuable for audit trails.

+        let operator = event.keys[1];
         let from = event.keys[2];
         let to = event.keys[3];

         db.handle_nft_transfer(
             token_address,
+            operator,            // new
             from,
             to,
             token_id,
             amount,
             block_timestamp,
             event_id,
         )

Adapting the DB signature is, of course, a wider change, so treat this as a nice‑to‑have.

crates/torii/processors/src/processors/erc4906_batch_metadata_update.rs (1)

58-62: Consider a bounded iterator for huge ranges

The while token_id <= to_token_id { …; token_id += 1 } loop is correct but could run millions of iterations for gigantic collections, blocking other tasks.

A simple ergonomic tweak keeps readability and avoids accidental infinite loops:

-        let mut token_id = from_token_id;
-        while token_id <= to_token_id {
-            db.update_nft_metadata(token_address, token_id).await?;
-            token_id += U256::from(1u8);
-        }
+        for token_id in from_token_id..=to_token_id {
+            db.update_nft_metadata(token_address, token_id).await?;
+        }

Internally this still iterates, but the intent is clearer and less error‑prone.

crates/torii/processors/src/processors/erc4906_metadata_update.rs (1)

49-54: Consider a lightweight pre‑flight guard for obviously invalid IDs.
If the on‑chain contract accidentally emits token_id == 0 for collections that start at 1, the DB will still be hit. A quick check (and early return Ok(())) would protect against spurious writes without measurable overhead.

+        // sanity‑check: ignore zero IDs which are invalid for many collections
+        if token_id.is_zero() {
+            debug!(target: LOG_TARGET, "Skipping zero token_id");
+            return Ok(());
+        }
crates/torii/processors/src/processors/erc20_transfer.rs (2)

44-56: Hashing strategy groups opposite‑direction transfers but ignores amount.
Using max(from, to) makes A→B and B→A share a task, which is great, but two concurrent transfers A→B for different amounts will still serialize because they hash to the same task‑id. If throughput becomes an issue, add the block number or event index to the hash to widen the distribution.

No immediate change required, just something to keep in mind while benchmarking.


85-85: Nit: missing space after first key in debug! macro.
Log output currently renders as ..,from=0xabc, to = ... (note the space asymmetry). Tiny, but consistent formatting helps log scrapers.

-        debug!(target: LOG_TARGET,from = ?from, to = ?to, value = ?value, "ERC20 Transfer.");
+        debug!(target: LOG_TARGET, from = ?from, to = ?to, value = ?value, "ERC20 Transfer.");
crates/torii/processors/src/processors/store_update_member.rs (1)

31-33: Ohayo sensei! validate() always succeeds – tighten the checks
Currently any event passes through, even if the key/data layout is malformed, which may later cause panics deeper in the pipeline. Consider validating:

  • event.keys.len() == 4 (hash, model selector, entity id, member selector)
  • !event.data.is_empty() – there should be at least one felt containing the new value(s)

This guards early and lets you surface a clean error instead of crashing during deserialization.
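A minimal sketch of such a guard, assuming the key layout above (the exact ABI should be double-checked before adopting it):

use starknet::core::types::Event;

fn validate(event: &Event) -> bool {
    // keys: [event hash, model selector, entity id, member selector]
    event.keys.len() == 4 && !event.data.is_empty()
}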

crates/torii/processors/src/processors/erc1155_transfer_batch.rs (2)

27-33: Ohayo sensei! Strengthen validate() for malformed data
event.data.is_empty() being false doesn’t guarantee the array is long enough to contain ids_len * 2 + values_len * 2 + 2 felts. A stricter check (e.g. at least 3 felts) will let you return early and avoid out‑of‑bounds reads later.


38-40: Ohayo sensei! Consider finer‑grained TaskId for better parallelism
Using the global sequential ID forces all ERC‑1155 batch transfers to serialize. Hashing (token_address, from, to) (or even (token_address, token_id)) would still guarantee ordering per NFT while allowing unrelated collections to process in parallel, improving throughput.

crates/torii/processors/src/processors/erc721_legacy_transfer.rs (2)

99-100: Ohayo sensei! Clarify log message for legacy path
The debug string says "ERC721 Transfer.", which is identical to the non‑legacy processor and can confuse log consumers. Append “legacy” to make grepping easier.

-debug!(target: LOG_TARGET, from = ?from, to = ?to, token_id = ?token_id, "ERC721 Transfer.");
+debug!(target: LOG_TARGET, from = ?from, to = ?to, token_id = ?token_id, "Legacy ERC721 Transfer.");

72-98: Ohayo sensei! Duplicate logic with erc721_transfer.rs – extract a helper
Erc721LegacyTransferProcessor and the modern variant share almost identical hashing and DB‑write logic. Extracting a small util (e.g. compute_erc721_task_id, handle_erc721_transfer) would reduce duplication and ensure future fixes land in one spot.
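One possible shape for the shared helper, hedged on the Felt type from starknet and the canonical-pair hashing shown in these processors:

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use starknet::core::types::Felt;

/// Hypothetical shared helper both ERC721 processors could call.
pub fn compute_erc721_task_id(token_address: Felt, from: Felt, to: Felt, token_id: Felt) -> u64 {
    let mut hasher = DefaultHasher::new();
    token_address.hash(&mut hasher);
    // Same canonical-pair trick used elsewhere in this review.
    std::cmp::max(from, to).hash(&mut hasher);
    token_id.hash(&mut hasher);
    hasher.finish()
}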

crates/torii/processors/src/processors/erc721_transfer.rs (2)

50-55: Ohayo sensei – potential hash collisions in canonical address pairing

Using only std::cmp::max(from, to) drops half of the information:
pairs (A, B) and (C, B) both compress to B when B is the larger address, increasing the chance of unrelated transfers sharing the same task‑id and therefore being processed sequentially.

Consider hashing both addresses in a stable order instead:

-        let canonical_pair = std::cmp::max(event.keys[1], event.keys[2]);
-        canonical_pair.hash(&mut hasher);
+        let (low, high) = if event.keys[1] < event.keys[2] {
+            (event.keys[1], event.keys[2])
+        } else {
+            (event.keys[2], event.keys[1])
+        };
+        low.hash(&mut hasher);
+        high.hash(&mut hasher);

80-82: Ohayo sensei – deserialisation failure aborts the whole block on malformed events

U256Cainome::cairo_deserialize(&event.keys, 3)? will bubble an error that results in the whole block failing.
You may want to replace ? with graceful logging + return Ok(()) to skip malformed transfers instead of aborting indexing.

crates/torii/processors/src/processors/upgrade_event.rs (1)

70-76: Ohayo sensei – avoid shadowing the term “model” for events

The comment hints at future renaming but the variable and log messages still use “model”, which is confusing when processing events.
Renaming now will save a lot of grep‑pain later.

-        let model = match db.model(event.selector).await {
+        let stored_event = match db.model(event.selector).await {
crates/torii/processors/src/processors/store_set_record.rs (2)

38-43: Ohayo sensei – possible out‑of‑bounds in task_identifier
event.keys[1] and event.keys[2] assume ≥ 3 keys. If the upstream contract ever changes or emits a malformed log, this will panic before validate has a chance to filter. Either guard length or reuse the improved validate suggested above.

-fn task_identifier(&self, event: &Event) -> TaskId {
+fn task_identifier(&self, event: &Event) -> TaskId {
+    if event.keys.len() < 3 {
+        return TASK_ID_SEQUENTIAL; // fall back to sequential processing
+    }
     let mut hasher = DefaultHasher::new();
     event.keys[1].hash(&mut hasher);
     event.keys[2].hash(&mut hasher);
     hasher.finish()
 }

100-104: Ohayo sensei – clarify keys_and_unpacked ordering
The concatenation order [keys, values] is subtle; a quick comment or helper makes maintenance easier and prevents accidental re‑ordering in future edits.

crates/torii/processors/src/processors/upgrade_model.rs (3)

30-34: Ohayo sensei – unconditional validate compromises clarity
Same as the previous file: unconditional true defeats the purpose of the hook and can hide malformed events. Consider removing the method or adding real checks.


40-44: Ohayo sensei – hashing all keys may over‑serialize TaskId
If the event has many keys, this defeats deduplication and can scatter logically‑related upgrades across tasks. Hashing only the selector (event.keys[1]) would suffice and group upgrades per‑model.


92-97: Ohayo sensei – blocking call risk inside async context
model.set_block(...).await; performs an on‑chain RPC for every upgrade event. In a high‑traffic chain this may turn the event loop into sequential IO. Consider batching or caching results between events of the same block.

crates/torii/processors/src/task_manager.rs (1)

74-87: Ohayo sensei – semaphore created once, but tasks span multiple priorities
Creating the semaphore outside the priority loop is great; double‑check max_concurrent_tasks sizing so high‑priority floods don’t starve lower ones (there’s no fairness re‑queue). A comment explaining this policy would help future maintainers.

crates/torii/processors/src/processors/controller.rs (2)

26-52: Ohayo sensei — lazy_static is overkill for a plain constant
CARTRIDGE_MAGIC is fully known at compile‑time, so a const (or static without lazy_static!) will avoid an extra runtime init guard and removes the macro dependency.

-use lazy_static::lazy_static;
-
-lazy_static! {
-    pub(crate) static ref CARTRIDGE_MAGIC: [Felt; 22] = [
+/// Felt representation of "https://x.cartridge.gg/"
+pub(crate) const CARTRIDGE_MAGIC: [Felt; 22] = [
         // ... the 22 felt constants stay unchanged ...
-    ];
-}
+];

You can now drop the lazy_static import.


82-85: Validation comment contradicts the check
The comment says “has no keys” but the code enforces len() == 1. Please clarify which is correct for a ContractDeployed UDC event—otherwise fellow devs (and future you) will be confused.

-        // ContractDeployed event has no keys and contains username in data
-        event.keys.len() == 1 && !event.data.is_empty()
+        // UDC `ContractDeployed` emits exactly one key (class hash)
+        event.keys.len() == 1 && !event.data.is_empty()
crates/torii/processors/src/processors/metadata_update.rs (2)

35-37: No sanity‑check in validate – intentional?
validate always returns true. If that is deliberate, a short comment will prevent future attempts to “fix” it.

fn validate(&self, _event: &Event) -> bool {
    // All world events are well‑formed, no additional filters needed.
    true
}

87-90: Detached Tokio task may outlive DB connection
tokio::spawn moves a cloned Sql, but if the node shuts down before the task finishes you may leak in‑flight work. Consider using tokio::task::spawn_blocking with a JoinSet or hand the job to the task manager so graceful shutdown awaits completion.
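A sketch of the JoinSet pattern suggested here, with a stand-in job instead of the real metadata fetch and DB write:

use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let mut set: JoinSet<()> = JoinSet::new();
    for i in 0..3 {
        // Each spawned job stays tracked by the set instead of being detached.
        set.spawn(async move {
            println!("metadata job {i} done");
        });
    }
    // On shutdown, drain the set so in-flight work is awaited, not leaked.
    while let Some(res) = set.join_next().await {
        res.expect("metadata task panicked");
    }
}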

crates/torii/processors/src/processors/store_transaction.rs (2)

180-211: Magic offsets for v2/v3 outside calls feel brittle
Hard‑coding indices (calldata[5], calldata[4]) ties the parser to a single compiler version. Consider parsing using the struct layout (e.g., via CairoSerde) or at least centralise the offsets as const with a unit test.
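A sketch of the centralised-offsets idea; the numeric values are taken from the indices mentioned above, the element type is a stand-in for Felt, and both are assumptions to verify against the actual calldata layout:

// Hypothetical named offsets replacing the bare `calldata[4]` / `calldata[5]` reads.
const V3_CALL_ARRAY_LEN_OFFSET: usize = 4;
const V3_FIRST_CALL_OFFSET: usize = V3_CALL_ARRAY_LEN_OFFSET + 1;

/// Bounds-checked access instead of an unchecked index.
fn first_call_slice(calldata: &[u64]) -> Option<&[u64]> {
    calldata.get(V3_FIRST_CALL_OFFSET..)
}

fn main() {
    let calldata = vec![0, 0, 0, 0, 1, 42];
    assert_eq!(first_call_slice(&calldata), Some(&[42u64][..]));
}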


284-303: Silent fallback when execute deserialization fails
If both deserializers fail we silently store an empty call list. This hides bugs and makes debugging harder. Emit a warn! so operators know a transaction was skipped.

crates/torii/processors/src/lib.rs (3)

44-46: Avoid intermediate allocation when joining event keys

Collecting into a Vec<String> allocates one String per key plus the vector itself.
A small tweak avoids both allocations:

-        event.keys.iter().map(|i| format!("{:#064x}", i)).collect::<Vec<_>>().join(",")
+        use std::fmt::Write;
+        event.keys.iter().fold(String::new(), |mut acc, i| {
+            if !acc.is_empty() {
+                acc.push(',');
+            }
+            let _ = write!(acc, "{:#064x}", i);
+            acc
+        })

Alternatively, itertools::join gives the same result with less ceremony. (Collecting straight into a Cow<str> would drop the comma separators, so it is not a drop-in replacement here.)


23-25: Potential namespace‑case mismatch

HashSet<String> is case‑sensitive. If namespaces come from user input or chain data with inconsistent casing, indexing may silently skip events. Consider normalising to lowercase on insertion & lookup or documenting the expectation.
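A tiny sketch of the normalisation idea, independent of the crate's types:

use std::collections::HashSet;

fn main() {
    let mut namespaces: HashSet<String> = HashSet::new();
    // Normalise on insertion...
    namespaces.insert("Dojo".to_lowercase());
    // ...and on lookup, so casing differences can't silently skip events.
    assert!(namespaces.contains(&"DOJO".to_lowercase()));
}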


28-34: API ergonomics of TaskProcessor

dependencies currently allocates a new Vec on every call. If the set of dependencies is static per processor, returning a slice or iterator would avoid repeated allocations:

-    fn dependencies(&self) -> Vec<TaskId>;
+    fn dependencies(&self) -> &[TaskId];

This requires small adjustments in call‑sites but improves performance and conveys immutability.

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a64d6ad and 1298084.

📒 Files selected for processing (25)
  • crates/torii/processors/Cargo.toml (1 hunks)
  • crates/torii/processors/src/lib.rs (1 hunks)
  • crates/torii/processors/src/processors/controller.rs (1 hunks)
  • crates/torii/processors/src/processors/erc1155_transfer_batch.rs (1 hunks)
  • crates/torii/processors/src/processors/erc1155_transfer_single.rs (1 hunks)
  • crates/torii/processors/src/processors/erc20_legacy_transfer.rs (1 hunks)
  • crates/torii/processors/src/processors/erc20_transfer.rs (1 hunks)
  • crates/torii/processors/src/processors/erc4906_batch_metadata_update.rs (1 hunks)
  • crates/torii/processors/src/processors/erc4906_metadata_update.rs (1 hunks)
  • crates/torii/processors/src/processors/erc721_legacy_transfer.rs (1 hunks)
  • crates/torii/processors/src/processors/erc721_transfer.rs (1 hunks)
  • crates/torii/processors/src/processors/event_message.rs (1 hunks)
  • crates/torii/processors/src/processors/metadata_update.rs (1 hunks)
  • crates/torii/processors/src/processors/mod.rs (1 hunks)
  • crates/torii/processors/src/processors/raw_event.rs (1 hunks)
  • crates/torii/processors/src/processors/register_event.rs (1 hunks)
  • crates/torii/processors/src/processors/register_model.rs (1 hunks)
  • crates/torii/processors/src/processors/store_del_record.rs (1 hunks)
  • crates/torii/processors/src/processors/store_set_record.rs (1 hunks)
  • crates/torii/processors/src/processors/store_transaction.rs (1 hunks)
  • crates/torii/processors/src/processors/store_update_member.rs (1 hunks)
  • crates/torii/processors/src/processors/store_update_record.rs (1 hunks)
  • crates/torii/processors/src/processors/upgrade_event.rs (1 hunks)
  • crates/torii/processors/src/processors/upgrade_model.rs (1 hunks)
  • crates/torii/processors/src/task_manager.rs (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (5)
crates/torii/processors/src/processors/raw_event.rs (7)
crates/torii/processors/src/lib.rs (5)
  • event_key (42-42)
  • validate (48-48)
  • process (51-60)
  • process (66-72)
  • process (78-88)
crates/torii/processors/src/processors/erc20_transfer.rs (5)
  • event_key (25-27)
  • validate (29-38)
  • task_priority (40-42)
  • task_identifier (44-56)
  • process (58-88)
crates/torii/processors/src/processors/event_message.rs (5)
  • event_key (27-29)
  • validate (31-33)
  • task_priority (35-37)
  • task_identifier (39-50)
  • process (52-97)
crates/torii/processors/src/processors/store_del_record.rs (4)
  • event_key (25-27)
  • validate (29-31)
  • task_priority (33-35)
  • task_identifier (37-42)
crates/torii/processors/src/processors/store_update_member.rs (2)
  • task_priority (35-37)
  • task_identifier (39-46)
crates/torii/processors/src/processors/store_update_record.rs (2)
  • task_priority (34-36)
  • task_identifier (38-45)
crates/torii/processors/src/processors/upgrade_event.rs (2)
  • task_priority (36-38)
  • task_identifier (40-44)
crates/torii/processors/src/processors/erc4906_metadata_update.rs (6)
crates/torii/processors/src/lib.rs (6)
  • event_key (42-42)
  • validate (48-48)
  • event (45-45)
  • process (51-60)
  • process (66-72)
  • process (78-88)
crates/torii/processors/src/processors/erc1155_transfer_single.rs (5)
  • event_key (23-25)
  • validate (27-31)
  • task_priority (33-35)
  • task_identifier (37-39)
  • process (41-74)
crates/torii/processors/src/processors/erc20_transfer.rs (5)
  • event_key (25-27)
  • validate (29-38)
  • task_priority (40-42)
  • task_identifier (44-56)
  • process (58-88)
crates/torii/processors/src/processors/erc4906_batch_metadata_update.rs (5)
  • event_key (23-25)
  • validate (27-31)
  • task_priority (33-35)
  • task_identifier (37-39)
  • process (41-73)
crates/torii/processors/src/processors/erc721_transfer.rs (5)
  • event_key (25-27)
  • validate (29-38)
  • task_priority (40-42)
  • task_identifier (44-64)
  • process (66-96)
crates/torii/processors/src/processors/metadata_update.rs (5)
  • event_key (31-33)
  • validate (35-37)
  • task_priority (39-41)
  • task_identifier (43-47)
  • process (49-93)
crates/torii/processors/src/processors/erc721_legacy_transfer.rs (8)
crates/torii/processors/src/processors/erc20_legacy_transfer.rs (4)
  • contract_type (44-46)
  • event_key (48-50)
  • validate (52-61)
  • process (63-93)
crates/torii/processors/src/processors/erc1155_transfer_batch.rs (5)
  • event_key (23-25)
  • validate (27-32)
  • task_priority (34-36)
  • task_identifier (38-40)
  • process (42-111)
crates/torii/processors/src/processors/erc1155_transfer_single.rs (5)
  • event_key (23-25)
  • validate (27-31)
  • task_priority (33-35)
  • task_identifier (37-39)
  • process (41-74)
crates/torii/processors/src/processors/erc4906_metadata_update.rs (5)
  • event_key (22-24)
  • validate (26-29)
  • task_priority (31-33)
  • task_identifier (35-37)
  • process (39-63)
crates/torii/processors/src/processors/erc721_transfer.rs (5)
  • event_key (25-27)
  • validate (29-38)
  • task_priority (40-42)
  • task_identifier (44-64)
  • process (66-96)
crates/torii/processors/src/processors/store_del_record.rs (4)
  • event_key (25-27)
  • validate (29-31)
  • task_priority (33-35)
  • task_identifier (37-42)
crates/torii/processors/src/processors/store_set_record.rs (4)
  • event_key (26-28)
  • validate (30-32)
  • task_priority (34-36)
  • task_identifier (38-43)
crates/torii/processors/src/processors/store_update_record.rs (2)
  • task_priority (34-36)
  • task_identifier (38-45)
crates/torii/processors/src/task_manager.rs (8)
crates/torii/processors/src/processors/erc20_transfer.rs (2)
  • task_identifier (44-56)
  • event_key (25-27)
crates/torii/processors/src/processors/erc721_transfer.rs (2)
  • task_identifier (44-64)
  • event_key (25-27)
crates/torii/processors/src/processors/event_message.rs (2)
  • task_identifier (39-50)
  • event_key (27-29)
crates/torii/processors/src/processors/metadata_update.rs (2)
  • task_identifier (43-47)
  • event_key (31-33)
crates/torii/processors/src/processors/raw_event.rs (2)
  • task_identifier (31-34)
  • event_key (19-21)
crates/torii/processors/src/processors/register_event.rs (2)
  • task_identifier (40-44)
  • event_key (26-28)
crates/torii/processors/src/processors/store_set_record.rs (1)
  • task_identifier (38-43)
crates/torii/processors/src/lib.rs (3)
  • contract_type (41-41)
  • event (45-45)
  • event_key (42-42)
crates/torii/processors/src/processors/mod.rs (6)
crates/dojo/world/src/contracts/abigen/world.rs (3)
  • register_event (3854-3869)
  • upgrade_event (4245-4260)
  • upgrade_model (4280-4295)
crates/torii/sqlite/sqlite/src/lib.rs (1)
  • store_transaction (517-556)
crates/torii/processors/src/lib.rs (1)
  • contract_type (41-41)
crates/torii/processors/src/processors/controller.rs (1)
  • contract_type (74-76)
crates/torii/processors/src/processors/erc20_legacy_transfer.rs (1)
  • contract_type (44-46)
crates/torii/processors/src/processors/erc721_legacy_transfer.rs (1)
  • contract_type (27-29)
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: build
🔇 Additional comments (17)
crates/torii/processors/Cargo.toml (2)

1-8: Ohayo Sensei! The package configuration looks good.

The crate metadata is well-structured and leverages workspace configuration. Good job on creating a clean and concise package definition.


9-17: Dependencies look complete and appropriate.

The dependencies include all necessary components for processing blockchain events - database access through torii-sqlite, blockchain interaction via dojo-world and starknet, async support with tokio and async-trait, and utilities like anyhow, tracing, and cainome for error handling, logging, and serialization.

crates/torii/processors/src/processors/raw_event.rs (4)

11-12: The default processor structure looks good, sensei.

The empty struct with Default and Debug traits is appropriate for this basic processor implementation.


19-21: Empty event key for catch-all processor - intentional design choice?

Using an empty string as the event key suggests this processor is meant to catch all events that don't match other processors.

Is this intentional to serve as a fallback processor for events not handled by other specialized processors?


23-25: Always validates events - appropriate for catch-all processor.

This behavior ensures that all events will be accepted by this processor, which aligns with its apparent purpose as a fallback processor.


31-34:

❓ Verification inconclusive

TODO for raw event parallelization.

Currently using sequential task ID which means raw events won't be processed in parallel.

Would you like me to create an issue to track this parallelization enhancement for raw events?


🏁 Script executed:

#!/bin/bash
# Check if there's a GitHub issue tracking this TODO
gh issue list --search "raw events parallelization" --repo "$(git remote get-url origin | sed 's/.*github.com\///' | sed 's/\.git//')"

Length of output: 152


No existing GitHub issue for raw event parallelization found

ohayo sensei! I searched for an issue tracking the “raw events parallelization” TODO in crates/torii/processors/src/processors/raw_event.rs (lines 31–34) and didn’t find one. Could you double‑check if a ticket already exists, or should we open a new GitHub issue to track this enhancement?

crates/torii/processors/src/processors/register_model.rs (3)

36-38: Highest task priority (0) for model registration - excellent choice, sensei!

Assigning the highest priority (0) to model registration is appropriate since models need to be registered before any events that might reference them.


80-85: Conditional block context setting looks good.

Setting the block context only when strict model reading is enabled is a good approach. This provides flexibility while ensuring accurate data retrieval in strict mode.


92-109: Comprehensive logging implementation - excellent work, sensei!

The logging is well-structured with different levels (info for high-level events, debug for details) and includes all relevant information. The use of structured logging attributes will make filtering and analyzing logs much easier.

crates/torii/processors/src/processors/erc20_legacy_transfer.rs (1)

22-36: Task processing implementation looks solid.

The processor correctly implements the TaskProcessor trait with no dependencies and a task identifier based on the contract address, which makes sense for grouping transfers from the same token contract.

crates/torii/processors/src/processors/erc1155_transfer_single.rs (1)

27-31: Ohayo sensei — validation looks solid!

The key & data‑length check accurately guards against malformed TransferSingle events. Nice and concise.

crates/torii/processors/src/processors/erc4906_metadata_update.rs (1)

26-29: Ohayo sensei! Validation logic looks spot‑on.
The key & data length check correctly distinguishes a single‑token ERC‑4906 update from the batch variant – nice attention to the spec.

crates/torii/processors/src/processors/store_update_record.rs (1)

103-112: Key‑field pruning mutates shared Ty in‑place – clone first to avoid surprises.
If another processor still holds a reference to the same schema instance, mutating children here could cause hard‑to‑trace bugs. Cloning before mutation isolates the change:

-        let mut entity = model.schema;
+        let mut entity = model.schema.clone();
crates/torii/processors/src/processors/mod.rs (1)

116-121: Ohayo sensei – return type of get_event_processors no longer matches map key/value

After changing the map, update the accessor accordingly and drop the hard‑coded Felt key that currently does not compile:

-    ) -> &HashMap<Felt, Vec<Box<dyn EventProcessor<P>>>> {
+    ) -> &EventProcessorMap<P> {

You may also want to return an Option rather than panicking with unwrap().

crates/torii/processors/src/processors/store_set_record.rs (1)

30-32: Ohayo sensei – validate is a no‑op, consider real guards
Returning true unconditionally means the later logic may happily index event.keys[1] & event.keys[2] without checking vector length or semantics. A cheap upfront guard (e.g. event.keys.len() >= 3) would turn potential panics into early, recoverable errors.

Would you like a small patch to tighten this?

crates/torii/processors/src/processors/upgrade_model.rs (1)

70-82: Ohayo sensei – error filtering misses namespace gating
When a model is missing you ignore all namespaces, not just the filtered subset (config.namespaces). Align with StoreSetRecordProcessor to avoid indexing noise when namespace filtering is active.

crates/torii/processors/src/processors/controller.rs (1)

131-133: Uniqueness/duplication handling for controllers
db.add_controller is called unconditionally. If the same address is redeployed (reorg or replay) will this violate a unique constraint? Consider making the DB insert idempotent or using INSERT … ON CONFLICT DO NOTHING.

Comment on lines +58 to +68
let event = match WorldEvent::try_from(event).unwrap_or_else(|_| {
panic!(
"Expected {} event to be well formed.",
<RegisterModelProcessor as EventProcessor<P>>::event_key(self)
)
}) {
WorldEvent::ModelRegistered(e) => e,
_ => {
unreachable!()
}
};

🛠️ Refactor suggestion

Unwrap usage in event deserialization could be improved.

The code uses unwrap_or_else with a panic, which is risky in production code. While the comment explains why this should be safe, it would be more robust to handle the error gracefully.

-        let event = match WorldEvent::try_from(event).unwrap_or_else(|_| {
-            panic!(
-                "Expected {} event to be well formed.",
-                <RegisterModelProcessor as EventProcessor<P>>::event_key(self)
-            )
-        }) {
+        let event = match WorldEvent::try_from(event) {
+            Ok(event) => event,
+            Err(e) => {
+                tracing::error!(
+                    target: LOG_TARGET,
+                    error = %e,
+                    "Failed to parse {} event. This indicates a version mismatch between Torii and World.",
+                    <RegisterModelProcessor as EventProcessor<P>>::event_key(self)
+                );
+                return Err(anyhow::anyhow!(
+                    "Failed to parse {} event: {}",
+                    <RegisterModelProcessor as EventProcessor<P>>::event_key(self),
+                    e
+                ));
+            }
+        };

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +71 to +72
let namespace = event.namespace.to_string().unwrap();
let name = event.name.to_string().unwrap();

🛠️ Refactor suggestion

Unwrapped string conversions should handle potential errors.

Similar to the previous comment, the unwrap calls for string conversion could be handled more gracefully.

-        let namespace = event.namespace.to_string().unwrap();
-        let name = event.name.to_string().unwrap();
+        let namespace = event.namespace.to_string().map_err(|_| {
+            anyhow::anyhow!("Failed to convert namespace to string")
+        })?;
+        let name = event.name.to_string().map_err(|_| {
+            anyhow::anyhow!("Failed to convert model name to string")
+        })?;

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +29 to +42
fn validate(&self, _event: &Event) -> bool {
true
}

fn task_priority(&self) -> TaskPriority {
2
}

fn task_identifier(&self, event: &Event) -> TaskId {
let mut hasher = DefaultHasher::new();
event.keys[1].hash(&mut hasher);
event.keys[2].hash(&mut hasher);
hasher.finish()
}

⚠️ Potential issue

Guard against out‑of‑bounds panics in task_identifier

task_identifier blindly indexes event.keys[1] and event.keys[2], yet validate unconditionally returns true. A malformed or truncated event would panic the whole indexer before you even hit process. Let’s harden validation:

-    fn validate(&self, _event: &Event) -> bool {
-        true
+    fn validate(&self, event: &Event) -> bool {
+        // keys: [hash(StoreDelRecord), selector, entity_id]
+        event.keys.len() >= 3 && event.data.is_empty()
     }

If stronger guarantees are ever relaxed upstream, this small check keeps the node alive.


Comment on lines +56 to +66
let event = match WorldEvent::try_from(event).unwrap_or_else(|_| {
panic!(
"Expected {} event to be well formed.",
<StoreDelRecordProcessor as EventProcessor<P>>::event_key(self)
)
}) {
WorldEvent::StoreDelRecord(e) => e,
_ => {
unreachable!()
}
};

🛠️ Refactor suggestion

Replace panic! with graceful error propagation

A malformed event will currently panic! and bring down the runtime. Bubble the error instead so higher‑level supervisors can decide whether to skip or retry.

-        let event = match WorldEvent::try_from(event).unwrap_or_else(|_| {
-            panic!(
-                "Expected {} event to be well formed.",
-                <StoreDelRecordProcessor as EventProcessor<P>>::event_key(self)
-            )
-        }) {
+        let event = match WorldEvent::try_from(event).map_err(|e| {
+            anyhow::anyhow!(
+                "Failed to decode {} event: {e}",
+                <StoreDelRecordProcessor as EventProcessor<P>>::event_key(self)
+            )
+        })? {

This preserves observability while keeping the processor fault‑tolerant.


Comment on lines +31 to +50
fn validate(&self, _event: &Event) -> bool {
true
}

fn task_priority(&self) -> TaskPriority {
1
}

fn task_identifier(&self, event: &Event) -> TaskId {
let mut hasher = DefaultHasher::new();
let keys = Vec::<Felt>::cairo_deserialize(&event.data, 0).unwrap_or_else(|e| {
panic!("Expected EventEmitted keys to be well formed: {:?}", e);
});
// selector
event.keys[1].hash(&mut hasher);
// entity id
let entity_id = poseidon_hash_many(&keys);
entity_id.hash(&mut hasher);
hasher.finish()
}

⚠️ Potential issue

Ohayo sensei — make validate reflect reality to avoid panics

task_identifier deserialises event.data and will panic! if the layout is wrong, yet validate always returns true. Harden it:

-    fn validate(&self, _event: &Event) -> bool {
-        true
+    fn validate(&self, event: &Event) -> bool {
+        // Expect at least one felt to decode the keys vector length.
+        !event.data.is_empty()
     }

Alternatively, keep validate permissive but handle the error inside task_identifier without panicking.

This small guard saves the whole processor from crashing on malicious / corrupt data.


Comment on lines +96 to +116
async fn try_retrieve(mut db: Sql, resource: Felt, uri_str: String) {
match metadata(uri_str.clone()).await {
Ok((metadata, icon_img, cover_img)) => {
db.update_metadata(&resource, &uri_str, &metadata, &icon_img, &cover_img).unwrap();
info!(
target: LOG_TARGET,
resource = %format!("{:#x}", resource),
"Updated resource metadata from ipfs."
);
}
Err(e) => {
error!(
target: LOG_TARGET,
resource = %format!("{:#x}", resource),
uri = %uri_str,
error = %e,
"Retrieving resource uri."
);
}
}
}

⚠️ Potential issue

Multiple unwrap() calls inside background task
update_metadata(...).unwrap() will panic inside the spawned task, bypassing the processor's Result handling entirely (and a panic in a detached task is easy to miss). Wrap the call in if let Err(e) = … { error!(…) } to keep the indexer alive.
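A minimal shape of that wrapping, with a stand-in for the real update_metadata call:

fn update_metadata() -> anyhow::Result<()> {
    Ok(()) // stand-in for the real DB write
}

fn main() {
    // Log the failure instead of letting the spawned task unwind.
    if let Err(e) = update_metadata() {
        eprintln!("metadata update failed, indexer stays alive: {e}");
    }
}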

Comment on lines +74 to +75
let uri_str = event.uri.to_string().unwrap();
info!(

⚠️ Potential issue

unwrap() here can bubble a panic from malformed URIs
A bad URI emitted by a buggy contract (or corrupted node) will crash the processor. Return an error instead:

-let uri_str = event.uri.to_string().unwrap();
+let Ok(uri_str) = event.uri.to_string() else {
+    return Err(Error::msg("Invalid URI in MetadataUpdate event"));
+};

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +145 to +161
let to_offset = base_offset;
let selector_offset = to_offset + 1;
let calldata_offset = selector_offset + 2;
let calldata_len: usize = calldata[selector_offset + 1].try_into().unwrap();
let contract_address = calldata[to_offset];

let contract_class =
contract_class_cache.get(contract_address, BlockId::Tag(BlockTag::Pending)).await?;

let entrypoint = get_entrypoint_name_from_class(&contract_class, calldata[selector_offset])
.unwrap_or(format!("{:#x}", calldata[selector_offset]));

// Calculate next offset: current offset + contract_address + selector + calldata_len +
// calldata
let next_offset = calldata_offset + calldata_len;

Ok((

🛠️ Refactor suggestion

parse_outside_call assumes perfect calldata – can panic
Several unchecked index reads (calldata[selector_offset + 1], etc.). Add length guards to avoid panics on malformed input.

Comment on lines +116 to +137
async fn parse_legacy_execute_call<P: Provider + Send + Sync + std::fmt::Debug>(
contract_class_cache: &ContractClassCache<P>,
call: &LegacyExecuteCall,
full_calldata: &[Felt],
caller_address: Felt,
call_type: CallType,
) -> Result<ParsedCall> {
let contract_class = contract_class_cache
.get(call.contract_address, BlockId::Tag(BlockTag::Pending))
.await?;

let entrypoint = get_entrypoint_name_from_class(&contract_class, call.selector)
.unwrap_or(format!("{:#x}", call.selector));

Ok(ParsedCall {
contract_address: call.contract_address,
entrypoint,
calldata: full_calldata[call.data_offset..call.data_offset + call.data_length].to_vec(),
call_type,
caller_address,
})
}

⚠️ Potential issue

Bounds‑check before slicing legacy calldata
full_calldata[off..off+len] will panic if the values are wrong. Safeguard it, sensei:

-let data = full_calldata[call.data_offset..call.data_offset + call.data_length].to_vec();
+if call.data_offset + call.data_length > full_calldata.len() {
+    return Err(anyhow::anyhow!("Legacy calldata slice out of bounds"));
+}
+let data = full_calldata[call.data_offset..call.data_offset + call.data_length].to_vec();

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +36 to +40
#[async_trait]
pub trait EventProcessor<P>: Send + Sync
where
P: Provider + Sync,
{

⚠️ Potential issue

Ohayo sensei – Add Send bound to provider generic to satisfy async_trait default

#[async_trait] produces a Send future by default.
Because values involving P are held across await points inside process(), the compiler will complain if P is not Send.
Either add the bound to all processor traits or switch to #[async_trait(?Send)] (less common).

-where
-    P: Provider + Sync,
+where
+    P: Provider + Send + Sync,

Apply the same change to BlockProcessor and TransactionProcessor for consistency.
Failing to do so will cause a compile‑time error once a non‑Send provider is used.


codecov bot commented Apr 17, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 55.61%. Comparing base (a64d6ad) to head (763b573).
Report is 9 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3172      +/-   ##
==========================================
- Coverage   55.63%   55.61%   -0.02%     
==========================================
  Files         443      443              
  Lines       62987    62987              
==========================================
- Hits        35041    35029      -12     
- Misses      27946    27958      +12     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.


coderabbitai bot (Contributor) left a comment

Actionable comments posted: 1

🧹 Nitpick comments (4)
crates/torii/digraphmap/src/lib.rs (4)

51-53: Pass the key once – the second Debug format is unnecessary

The error variant's Display implementation already formats the inner string; feeding it a value that has gone through format!("{:?}", key) adds the superfluous quotes discussed earlier. Passing key.to_string() (or keeping Debug in the error and changing the Display string as shown in error.rs) avoids the cosmetic issue.

-return Err(DigraphMapError::DuplicateKey(format!("{:?}", key)));
+return Err(DigraphMapError::DuplicateKey(key.to_string()));

87-92: Same quoting concern for NodeNotFound

-.ok_or_else(|| DigraphMapError::NodeNotFound(format!("{:?}", key)))
+.ok_or_else(|| DigraphMapError::NodeNotFound(key.to_string()))

Keeping the key un‑quoted yields cleaner error text.
If you adopt the {0} change in error.rs, this line can remain as‑is with
format!("{:?}", …) removed.


105-118: topo_sort is currently O(n²); maintain a reverse map for O(n)

For every node index you iterate over the entire node_indices map to locate
its key. In large graphs this becomes noticeably quadratic.

A lightweight fix is to store a second HashMap<NodeIndex, K> when you insert
nodes. You already pay the cost of a single extra key clone at insertion time,
but topological sorts become strictly linear.

@@ struct DigraphMap
-    /// Map from node keys to their indices in the graph
+    /// Map from node keys to their indices in the graph
     node_indices: HashMap<K, NodeIndex>,
+    /// Reverse map for fast lookup during topo‑sort
+    index_to_key: HashMap<NodeIndex, K>,

and on insertion:

 self.node_indices.insert(key.clone(), node_idx);
 self.index_to_key.insert(node_idx, key);

then inside topo_sort:

if let Some(key) = self.index_to_key.get(&idx) {
    result.push((key.clone(), node_value));
}

This leaves algorithmic complexity at O(n + e).


160-210: Great test coverage – consider an explicit cycle topo‑sort test

You already assert that adding a back‑edge errors. To exercise the warning path
in topo_sort, add a test that builds a graph with a deliberate cycle, calls
topo_sort, and ensures you still get all nodes back (ordering arbitrary).
This protects the fallback branch from future regressions.

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 1298084 and d15e4a3.

📒 Files selected for processing (3)
  • crates/torii/digraphmap/Cargo.toml (1 hunks)
  • crates/torii/digraphmap/src/error.rs (1 hunks)
  • crates/torii/digraphmap/src/lib.rs (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • crates/torii/digraphmap/Cargo.toml
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: build
🔇 Additional comments (1)
crates/torii/digraphmap/src/lib.rs (1)

32-38: Ohayo sensei! Nice, idiomatic constructor 👌

The new() implementation is clean and straight to the point. Nothing to tweak
here.

Comment on lines 6 to 11
#[error("Node with key {0:?} not found")]
NodeNotFound(String),
#[error("Adding edge would create a cycle")]
CycleDetected,
#[error("Duplicate node key: {0:?}")]
DuplicateKey(String),

🛠️ Refactor suggestion

Avoid double‑quoting in Display messages by using {0} instead of {0:?}

thiserror will feed the inner String into the format string.
Because the code currently does

DigraphMapError::NodeNotFound(format!("{:?}", key))

and the Display line again prints {0:?}, a key like "a" ends up rendered as
Node with key "\"a\"" not found, i.e. with twice the quotes.
Switching to {0} keeps the human‑readable value while avoiding the extra
escaping.

-#[error("Node with key {0:?} not found")]
+#[error("Node with key {0} not found")]-#[error("Duplicate node key: {0:?}")]
+#[error("Duplicate node key: {0}")]


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (1)
crates/torii/adigraphmap/src/lib.rs (1)

96-99: Cycle test uses has_path_connecting – consider early exit optimisation.

petgraph::algo::has_path_connecting performs a full walk each time a new edge is considered.
For dense graphs this can dominate build time. A lightweight alternative is to maintain a Dag and rely on the returned Err from add_edge, or keep ancestor sets per node.

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between d15e4a3 and 3771f49.

📒 Files selected for processing (3)
  • crates/torii/adigraphmap/Cargo.toml (1 hunks)
  • crates/torii/adigraphmap/src/error.rs (1 hunks)
  • crates/torii/adigraphmap/src/lib.rs (1 hunks)
✅ Files skipped from review due to trivial changes (2)
  • crates/torii/adigraphmap/Cargo.toml
  • crates/torii/adigraphmap/src/error.rs
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: build
🔇 Additional comments (1)
crates/torii/adigraphmap/src/lib.rs (1)

42-46: Dependency insertion requires pre‑existing nodes – is that intentional, sensei?

add_node_with_dependencies() first inserts the current node, then calls add_dependency() which fails with NodeNotFound if any dependency hasn’t been added yet.
That forces callers to create all dependency nodes beforehand, disallowing a more declarative “add everything in any order” style.

If the stricter contract was not intended, you could lazily insert missing dependency nodes with a placeholder value (or return a richer error that lists the missing keys).

Would you like this crate to support out‑of‑order insertion? Let me know and I can sketch the patch.

Comment on lines +108 to +116
// Collect nodes in topological order
for idx in indices {
    let node_value = self.graph[idx].clone();
    if let Some(key) =
        self.node_indices.iter().find(|&(_, &v)| v == idx).map(|(k, _)| k.clone())
    {
        result.push((key, node_value));
    }
}

🛠️ Refactor suggestion

Ohayo sensei! Topological helpers perform an O(N²) key‑lookup – keep a reverse map for linear time.

topo_sort() and topo_sort_by_level() reconstruct the (key, value) pairs by scanning self.node_indices for every single vertex (iter().find(|&(_, &v)| v == idx)).
For a graph with N vertices, that turns the overall complexity of these helpers into O(N²) and triggers a lot of allocator churn because of repeated clones.

A tiny reverse map maintained on insertion avoids this:

@@ struct AcyclicDigraphMap<K, V>
-    node_indices: HashMap<K, NodeIndex>,
+    node_indices: HashMap<K, NodeIndex>,
+    index_keys: HashMap<NodeIndex, K>, // <— new
@@ fn add_node(&mut self, key: K, value: V)
-        self.node_indices.insert(key, node_idx);
+        self.index_keys.insert(node_idx, key.clone());
+        self.node_indices.insert(key, node_idx);
@@ fn topo_sort(&self)
-            if let Some(key) =
-                self.node_indices.iter().find(|&(_, &v)| v == idx).map(|(k, _)| k.clone())
-            {
-                result.push((key, node_value));
-            }
+            if let Some(key) = self.index_keys.get(&idx) {
+                result.push((key.clone(), node_value));
+            }

The same one‑liner swap (self.index_keys.get(&node)) removes the quadratic scan inside topo_sort_by_level() as well.

Also applies to: 150-156

@Larkooo Larkooo changed the title from refactor(torii): processors crate & generalized tasks to feat(torii): task network (dep graph) & processors on Apr 17, 2025

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (2)
crates/torii/task-network/src/lib.rs (2)

32-38: Guard against zero‑concurrency deadlock

Semaphore::new(0) causes every worker to wait forever. Reject max_concurrent_tasks == 0 during construction (and maybe upper‑bound it to usize::MAX / 2 to avoid overflow surprises).

pub fn new(max_concurrent_tasks: usize) -> Self {
-    Self {
-        tasks: AcyclicDigraphMap::new(),
-        max_concurrent_tasks,
-    }
+    assert!(max_concurrent_tasks > 0, "max_concurrent_tasks must be > 0");
+    Self {
+        tasks: AcyclicDigraphMap::new(),
+        max_concurrent_tasks,
+    }
}

122-126: Consider clearing the graph even on early error

If process_tasks returns early due to a join or task error, the internal graph remains populated, so a subsequent call will rerun failed tasks unless the caller remembers to call clear() manually. Dropping the tasks in a finally‑style block (tokio::select! or match) guarantees a clean state for the next run.

This is a minor maintenance safeguard; feel free to keep the current semantics if callers always rebuild a fresh TaskNetwork.
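
A standalone sketch of the "clear even on early error" pattern; Network, tasks, and run_levels are stand-ins, not the crate's real types:

struct Network {
    tasks: Vec<u64>,
}

impl Network {
    async fn run_levels(&mut self) -> Result<(), String> {
        Err("a task failed".into()) // simulate an early error
    }

    async fn process_tasks(&mut self) -> Result<(), String> {
        let result = self.run_levels().await;
        // Runs on both the Ok and Err paths, so the next call starts clean.
        self.tasks.clear();
        result
    }
}

#[tokio::main]
async fn main() {
    let mut n = Network { tasks: vec![1, 2, 3] };
    let _ = n.process_tasks().await;
    assert!(n.tasks.is_empty()); // graph cleared despite the failure
}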

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 3771f49 and 5095c09.

📒 Files selected for processing (3)
  • crates/torii/task-network/Cargo.toml (1 hunks)
  • crates/torii/task-network/src/error.rs (1 hunks)
  • crates/torii/task-network/src/lib.rs (1 hunks)
✅ Files skipped from review due to trivial changes (2)
  • crates/torii/task-network/src/error.rs
  • crates/torii/task-network/Cargo.toml
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: build

Comment on lines 93 to 120
    handles.push(tokio::spawn(async move {
        let _permit = semaphore.acquire().await.map_err(|e| TaskNetworkError::SemaphoreError(e))?;

        debug!(
            target: LOG_TARGET,
            task_id = %task_id,
            level = level_idx,
            "Processing task."
        );

        match task_handler(task_id, task_clone).await {
            Ok(_) => Ok(()),
            Err(e) => {
                error!(
                    target: LOG_TARGET,
                    error = %e,
                    task_id = %task_id,
                    "Error processing task."
                );
                Err(e)
            }
        }
    }));
}

// Wait for all tasks in this level to complete before proceeding to the next level
try_join_all(handles).await.map_err(|e| TaskNetworkError::JoinError(e))?;
}

🛠️ Refactor suggestion

⚠️ Potential issue

Ohayo sensei – task‑level failures are silently swallowed

try_join_all only fails fast on a JoinHandle panic/cancel.
Because every JoinHandle resolves to Ok(Result<(), anyhow::Error>), an error returned by your task_handler is wrapped inside the Ok variant and therefore never bubbles up. process_tasks will happily return Ok(()), masking real processing errors.

-            try_join_all(handles).await.map_err(|e| TaskNetworkError::JoinError(e))?;
+            // Wait for all handles and propagate the first task‑handler error (if any)
+            let task_results = try_join_all(handles)
+                .await
+                .map_err(TaskNetworkError::JoinError)?;
+
+            if let Some(err) = task_results.into_iter().find_map(|inner| inner.err()) {
+                return Err(TaskNetworkError::TaskHandlerError(err));
+            }

Make sure TaskNetworkError has a TaskHandlerError(anyhow::Error) variant (or equivalent) so callers can react appropriately.
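
A tiny standalone reproduction of the wrapping, assuming tokio and futures as dependencies:

use futures::future::try_join_all;

#[tokio::main]
async fn main() {
    // The spawned task *fails*, but the JoinHandle resolves to Ok(Err(..)).
    let handle = tokio::spawn(async { Err::<(), String>("task failed".into()) });

    // try_join_all only sees the outer Result<_, JoinError>, so this is Ok.
    let results = try_join_all(vec![handle]).await.expect("no panic, no cancel");

    // The real failure is still buried one level down.
    assert!(results[0].is_err());
}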


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 10

♻️ Duplicate comments (13)
crates/torii/adigraphmap/src/lib.rs (2)

108-116: Ohayo sensei! Topological helpers perform an O(N²) key-lookup – keep a reverse map for linear time.

The topo_sort() method uses an inefficient linear search to find keys by node index, which results in O(N²) complexity for the entire topological sort. This is particularly costly for large graphs.

Consider adding a reverse mapping from node indices to keys:

 pub struct AcyclicDigraphMap<K, V>
 where
     K: Eq + Hash + Clone + std::fmt::Debug,
     V: Clone,
 {
     /// The underlying directed graph
     graph: DiGraph<V, ()>,
 
     /// Map from node keys to their indices in the graph
     node_indices: HashMap<K, NodeIndex>,
+    /// Map from node indices to their keys for efficient reverse lookup
+    index_keys: HashMap<NodeIndex, K>,
 }

Then update add_node() and topo_sort():

 pub fn add_node(&mut self, key: K, value: V) -> Result<NodeIndex> {
     if self.node_indices.contains_key(&key) {
         return Err(AcyclicDigraphMapError::DuplicateKey(format!("{:?}", key)));
     }

     let node_idx = self.graph.add_node(value);
     self.node_indices.insert(key.clone(), node_idx);
+    self.index_keys.insert(node_idx, key);
     Ok(node_idx)
 }

 // In topo_sort()
-if let Some(key) =
-    self.node_indices.iter().find(|&(_, &v)| v == idx).map(|(k, _)| k.clone())
-{
-    result.push((key, node_value));
-}
+if let Some(key) = self.index_keys.get(&idx) {
+    result.push((key.clone(), node_value));
+}

150-156: Ohayo sensei! Same O(N²) issue in topo_sort_by_level method.

The same linear search inefficiency exists in the topo_sort_by_level() method, which could significantly impact performance.

With the index_keys map suggested earlier, you can replace this lookup:

-if let Some(key) =
-    self.node_indices.iter().find(|&(_, &v)| v == node).map(|(k, _)| k.clone())
-{
-    let value = self.graph[node].clone();
-    level_nodes.push((key, value));
-}
+if let Some(key) = self.index_keys.get(&node) {
+    let value = self.graph[node].clone();
+    level_nodes.push((key.clone(), value));
+}
crates/torii/processors/src/processors/store_update_member.rs (2)

70-76: Ohayo sensei! Replace the panic with a recoverable error

try_from(event).unwrap_or_else(|_| panic!(...)) will abort the whole processor task for a single bad event. Prefer bubbling the error so the task manager can decide whether to skip, retry, or mark the event failed.

-let event = match WorldEvent::try_from(event).unwrap_or_else(|_| {
-    panic!(
-        "Expected {} event to be well formed.",
-        <StoreUpdateMemberProcessor as EventProcessor<P>>::event_key(self)
-    )
-}) {
+let event = match WorldEvent::try_from(event).map_err(|e| {
+    anyhow::anyhow!(
+        "Invalid {} event format: {e}",
+        <StoreUpdateMemberProcessor as EventProcessor<P>>::event_key(self)
+    )
+})? {

89-106: Ohayo sensei! Avoid string‑matching on error messages

Err(e) if e.to_string().contains("no rows") is brittle and locale‑dependent. If the SQL layer exposes a specific error kind/enum (e.g. sqlx::Error::RowNotFound), match on that instead.

-Err(e) if e.to_string().contains("no rows") && !config.namespaces.is_empty() => {
+Err(sqlx::Error::RowNotFound) if !config.namespaces.is_empty() => {

If such an enum isn't available, consider adding a helper in torii_sqlite that returns a typed error you can match on.
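
A hedged sketch of the downcast route when db.model surfaces anyhow::Error; this only works if the underlying sqlx::Error is preserved in the error chain rather than stringified along the way, which depends on torii_sqlite's error plumbing:

// Hypothetical helper, not an existing torii_sqlite API.
fn is_row_not_found(e: &anyhow::Error) -> bool {
    e.chain()
        .filter_map(|cause| cause.downcast_ref::<sqlx::Error>())
        .any(|e| matches!(e, sqlx::Error::RowNotFound))
}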

crates/torii/processors/src/processors/store_del_record.rs (2)

52-54: Ohayo sensei! Guard against out‑of‑bounds panics in validate

validate unconditionally returns true, yet the processor accesses specific indices of event.keys. A malformed or truncated event would panic the whole indexer. Let's harden validation:

-fn validate(&self, _event: &Event) -> bool {
-    true
+fn validate(&self, event: &Event) -> bool {
+    // keys: [hash(StoreDelRecord), selector, entity_id]
+    event.keys.len() >= 3 && event.data.is_empty()
}

79-89: Ohayo sensei! Replace panic! with graceful error propagation

A malformed event will currently panic! and bring down the runtime. Bubble the error instead so higher‑level supervisors can decide whether to skip or retry.

-let event = match WorldEvent::try_from(event).unwrap_or_else(|_| {
-    panic!(
-        "Expected {} event to be well formed.",
-        <StoreDelRecordProcessor as EventProcessor<P>>::event_key(self)
-    )
-}) {
+let event = match WorldEvent::try_from(event).map_err(|e| {
+    anyhow::anyhow!(
+        "Failed to decode {} event: {e}",
+        <StoreDelRecordProcessor as EventProcessor<P>>::event_key(self)
+    )
+})? {
crates/torii/processors/src/processors/store_set_record.rs (2)

80-97: Ohayo sensei! Avoid string‑based SQL error matching.

Matching e.to_string().contains("no rows") is brittle and locale‑dependent. If the SQL layer exposes a specific error kind/enum, match on that instead.


66-76: 🛠️ Refactor suggestion

Ohayo sensei! Replace panic with graceful error propagation.

The current implementation panics on deserialization failure. This is not resilient and could bring down the entire processing pipeline.

-let event = match WorldEvent::try_from(event).unwrap_or_else(|_| {
-    panic!(
-        "Expected {} event to be well formed.",
-        <StoreSetRecordProcessor as EventProcessor<P>>::event_key(self)
-    )
-}) {
-    WorldEvent::StoreSetRecord(e) => e,
-    _ => {
-        unreachable!()
-    }
+let event = match WorldEvent::try_from(event) {
+    Ok(WorldEvent::StoreSetRecord(e)) => e,
+    Ok(_) => {
+        return Err(anyhow::anyhow!(
+            "Received a non-StoreSetRecord event under key {}",
+            self.event_key()
+        ));
+    }
+    Err(e) => {
+        return Err(anyhow::anyhow!(
+            "Failed to decode {} event: {}",
+            self.event_key(),
+            e
+        ));
+    }
}
crates/torii/processors/src/processors/mod.rs (2)

51-52: Ohayo sensei! Fatal type‑mismatch in EventProcessorMap definition

EventProcessorMap is declared as HashMap<EventKey, Box<…>>, yet get_event_processors (and the design intent of allowing multiple processors per key) expects a HashMap<…, Vec<Box<…>>>.

This discrepancy will prevent processors with the same event key (like the two "Transfer" processors for ERC‑20/721) from both being registered, as they'll silently overwrite each other.

type EventKey = String;
-type EventProcessorMap<P> = HashMap<EventKey, Box<dyn EventProcessor<P>>>;
+type EventProcessorMap<P> = HashMap<EventKey, Vec<Box<dyn EventProcessor<P>>>>;

123-127: Ohayo sensei! Processors are being overwritten instead of accumulated

Because .insert(key, processor) is used, only the last processor for a given event‑key survives.
After applying the previous fix to make EventProcessorMap use a Vec, accumulate processors with entry(key).or_default().push(...):

-let key = processor.event_key();
-event_processors_map.entry(contract_type).or_default().insert(key, processor);
+let key = processor.event_key();
+event_processors_map
+    .entry(contract_type)
+    .or_default()
+    .entry(key)
+    .or_default()
+    .push(processor);
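
The overwrite is easy to demonstrate in isolation with plain std types:

use std::collections::HashMap;

fn main() {
    // With a flat map, the second registration silently replaces the first.
    let mut flat: HashMap<&str, &str> = HashMap::new();
    flat.insert("Transfer", "erc20_transfer");
    flat.insert("Transfer", "erc721_transfer");
    assert_eq!(flat.len(), 1); // the ERC-20 processor is gone

    // The Vec-valued map keeps both processors for the shared key.
    let mut multi: HashMap<&str, Vec<&str>> = HashMap::new();
    multi.entry("Transfer").or_default().push("erc20_transfer");
    multi.entry("Transfer").or_default().push("erc721_transfer");
    assert_eq!(multi["Transfer"].len(), 2);
}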
crates/torii/processors/src/lib.rs (3)

30-34: Ohayo sensei – Add Send bound to provider generic to satisfy async_trait default

The async_trait macro produces a Send future by default. Because the provider P is awaited within the .process() implementation, the compiler will require P to be Send.

-where
-    P: Provider + Sync,
+where
+    P: Provider + Send + Sync,

This is a critical fix to ensure your code compiles when a provider is used in an async context.


79-81: ⚠️ Potential issue

Ohayo sensei – Add Send bound to provider generic in this trait as well

For consistency with the other processor traits and to ensure proper async functionality, add the Send bound to the provider generic in TransactionProcessor too.

-pub trait TransactionProcessor<P: Provider + Sync + std::fmt::Debug>: Send + Sync {
+pub trait TransactionProcessor<P: Provider + Send + Sync + std::fmt::Debug>: Send + Sync {

67-69: ⚠️ Potential issue

Ohayo sensei – Add Send bound to provider generic here too

Similar to the issue with EventProcessor, the BlockProcessor trait should also include the Send bound on provider P for consistency and to avoid compiler errors with async traits.

-pub trait BlockProcessor<P: Provider + Sync>: Send + Sync {
+pub trait BlockProcessor<P: Provider + Send + Sync>: Send + Sync {
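
A compact illustration of where the bound bites; Provider here is a stand-in trait for the sketch, not starknet's:

use async_trait::async_trait;

// Stand-in for starknet's Provider trait.
trait Provider {}

#[async_trait]
trait BlockProcessor<P: Provider + Send + Sync>: Send + Sync {
    // async_trait desugars this into Pin<Box<dyn Future + Send + '_>>, so
    // anything the future captures or holds across an .await, including the
    // provider, must satisfy the Send requirement.
    async fn process(&self, provider: &P) -> Result<(), String>;
}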
🧹 Nitpick comments (7)
crates/torii/adigraphmap/src/lib.rs (4)

189-192: Ohayo sensei! Clear method should update index_keys too.

If you implement the suggested index_keys map, don't forget to clear it here too.

pub fn clear(&mut self) {
    self.graph.clear();
    self.node_indices.clear();
+    self.index_keys.clear();
}

121-176: Ohayo sensei! topo_sort_by_level would benefit from algorithm documentation.

The implementation of topo_sort_by_level is complex and could benefit from more detailed documentation explaining the algorithm.

Consider adding a more detailed doc comment, for example:

/// Get the nodes grouped by topological level, where each level contains nodes
/// that can be processed in parallel (no dependencies among them).
/// Returns a vector of vectors, where each inner vector represents a level.
///
/// # Algorithm
///
/// 1. Start with nodes that have no incoming edges (level 0)
/// 2. For each level:
///    a. Process all nodes in the current level
///    b. For each outgoing edge, decrement the incoming edge count of the target
///    c. Any node whose incoming edge count becomes 0 is added to the next level
/// 3. Continue until all nodes are processed
///
/// This ensures that processing all nodes in a level makes the next level ready
/// for processing, allowing for parallel execution within each level.
pub fn topo_sort_by_level(&self) -> Vec<Vec<(K, V)>> {
    // ...existing implementation...
}

205-297: Ohayo sensei! Tests look comprehensive but could use additional cases.

The current tests are good, covering the basic functionality, but they miss some edge cases.

Consider adding tests for:

  1. Empty graph behavior:
#[test]
fn test_empty_graph() {
    let graph: AcyclicDigraphMap<String, i32> = AcyclicDigraphMap::new();
    assert!(graph.is_empty());
    assert_eq!(graph.len(), 0);
    assert!(graph.topo_sort().is_empty());
    assert!(graph.topo_sort_by_level().is_empty());
}
  1. More complex cycle detection:
#[test]
fn test_complex_cycle_detection() {
    let mut graph: AcyclicDigraphMap<String, i32> = AcyclicDigraphMap::new();
    // Create a graph: a -> b -> c -> d
    graph.add_node("a".to_string(), 1).unwrap();
    graph.add_node("b".to_string(), 2).unwrap();
    graph.add_node("c".to_string(), 3).unwrap();
    graph.add_node("d".to_string(), 4).unwrap();
    
    graph.add_dependency(&"a".to_string(), &"b".to_string()).unwrap();
    graph.add_dependency(&"b".to_string(), &"c".to_string()).unwrap();
    graph.add_dependency(&"c".to_string(), &"d".to_string()).unwrap();
    
    // Attempting to add d -> a would create a cycle
    assert!(graph.add_dependency(&"d".to_string(), &"a".to_string()).is_err());
}
  1. Test for modifying values:
#[test]
fn test_get_mut() {
    let mut graph: AcyclicDigraphMap<String, i32> = AcyclicDigraphMap::new();
    graph.add_node("a".to_string(), 1).unwrap();
    
    // Modify the value
    if let Some(val) = graph.get_mut(&"a".to_string()) {
        *val = 42;
    }
    
    // Verify the change
    assert_eq!(graph.get(&"a".to_string()), Some(&42));
}

33-33: Ohayo sensei! Initialize with capacity when known in advance.

When creating a new AcyclicDigraphMap, it's often possible to know the expected capacity in advance.

Consider adding a constructor with capacity:

pub fn new() -> Self {
    Self { graph: DiGraph::new(), node_indices: HashMap::new() }
}

+/// Create a new empty DigraphMap with the specified capacity
+pub fn with_capacity(capacity: usize) -> Self {
+    Self {
+        graph: DiGraph::with_capacity(capacity, capacity * 2), // Nodes and edges
+        node_indices: HashMap::with_capacity(capacity),
+    }
+}

This can significantly improve performance by reducing rehashing when adding many nodes at once.

crates/torii/processors/src/lib.rs (3)

18-22: Ohayo sensei – Consider adding documentation to explain config fields

The EventProcessorConfig struct would benefit from documentation explaining the purpose of each field, especially strict_model_reader whose function isn't immediately clear from the name.

 #[derive(Clone, Debug, Default)]
+/// Configuration for event processors.
 pub struct EventProcessorConfig {
+    /// Set of namespaces to index. If empty, all namespaces are indexed.
     pub namespaces: HashSet<String>,
+    /// When true, enforces strict validation of models during reading operations.
     pub strict_model_reader: bool,
 }

38-40: Ohayo sensei – Consider wrapping the iterator chain for readability

The logic of event_keys_as_string is fine as-is; splitting the chain across lines just makes each step easier to scan:

 fn event_keys_as_string(&self, event: &Event) -> String {
-    event.keys.iter().map(|i| format!("{:#064x}", i)).collect::<Vec<_>>().join(",")
+    event.keys.iter()
+        .map(|i| format!("{:#064x}", i))
+        .collect::<Vec<_>>()
+        .join(",")
 }

This improves readability without changing functionality.


46-50: Ohayo sensei – Consider documenting hash-based task identification strategy

The task identifier implementation uses a deterministic hash of the event keys. This is a good approach but would benefit from a brief documentation comment explaining the strategy and why it's appropriate.

+/// Generates a unique task identifier by hashing the event keys.
+/// This ensures the same event always produces the same task ID.
 fn task_identifier(&self, event: &Event) -> TaskId {
     let mut hasher = DefaultHasher::new();
     event.keys.hash(&mut hasher);
     hasher.finish()
 }
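
One nuance worth a sentence in that doc comment: DefaultHasher::new() is deterministic within a process, but std does not guarantee the hash algorithm across Rust releases, so these TaskIds should not be persisted or compared across builds. A quick illustration of the in-process determinism, with u64 standing in for the event's Felt keys:

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

fn task_id(keys: &[u64]) -> u64 {
    let mut hasher = DefaultHasher::new();
    keys.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    let keys = [1u64, 2, 3];
    // Same keys, same TaskId: stable within a single process run.
    assert_eq!(task_id(&keys), task_id(&keys));
}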
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between f3397b0 and 482af7c.

📒 Files selected for processing (13)
  • crates/torii/adigraphmap/Cargo.toml (1 hunks)
  • crates/torii/adigraphmap/src/error.rs (1 hunks)
  • crates/torii/adigraphmap/src/lib.rs (1 hunks)
  • crates/torii/processors/Cargo.toml (1 hunks)
  • crates/torii/processors/src/lib.rs (1 hunks)
  • crates/torii/processors/src/processors/mod.rs (1 hunks)
  • crates/torii/processors/src/processors/store_del_record.rs (1 hunks)
  • crates/torii/processors/src/processors/store_set_record.rs (1 hunks)
  • crates/torii/processors/src/processors/store_transaction.rs (1 hunks)
  • crates/torii/processors/src/processors/store_update_member.rs (1 hunks)
  • crates/torii/processors/src/processors/store_update_record.rs (1 hunks)
  • crates/torii/processors/src/processors/upgrade_event.rs (1 hunks)
  • crates/torii/processors/src/processors/upgrade_model.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (6)
  • crates/torii/processors/Cargo.toml
  • crates/torii/adigraphmap/Cargo.toml
  • crates/torii/processors/src/processors/upgrade_model.rs
  • crates/torii/processors/src/processors/upgrade_event.rs
  • crates/torii/processors/src/processors/store_transaction.rs
  • crates/torii/processors/src/processors/store_update_record.rs
🧰 Additional context used
🧬 Code Graph Analysis (1)
crates/torii/processors/src/processors/store_set_record.rs (6)
crates/torii/sqlite/sqlite/src/utils.rs (1)
  • felts_to_sql_string (35-38)
crates/torii/processors/src/processors/store_del_record.rs (4)
  • contract_type (44-46)
  • event_key (48-50)
  • validate (52-54)
  • process (67-126)
crates/torii/processors/src/processors/store_update_member.rs (4)
  • contract_type (46-48)
  • event_key (50-52)
  • validate (54-56)
  • process (58-137)
crates/torii/processors/src/processors/store_update_record.rs (4)
  • contract_type (45-47)
  • event_key (49-51)
  • validate (53-55)
  • process (57-128)
crates/torii/processors/src/lib.rs (7)
  • event (39-39)
  • contract_type (35-35)
  • event_key (36-36)
  • validate (52-52)
  • process (55-64)
  • process (70-76)
  • process (81-91)
crates/torii/processors/src/processors/upgrade_model.rs (4)
  • contract_type (27-29)
  • event_key (31-33)
  • validate (37-39)
  • process (42-138)
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: build
🔇 Additional comments (3)
crates/torii/adigraphmap/src/error.rs (1)

1-12: Ohayo sensei! Error enum is well-defined and comprehensive.

The error enum is well structured with clear error messages and appropriate derivation of the Error and Debug traits. The three error cases (node not found, cycle detection, and duplicate key) cover all the necessary failure scenarios for an acyclic digraph implementation.

crates/torii/processors/src/lib.rs (2)

42-44: Ohayo sensei – Task dependencies default implementation looks good

The default implementation returning an empty vector is a sensible default that can be overridden by specific processors that need to express dependencies.


54-65: Ohayo sensei – The process method appropriately allows many arguments

The #[allow(clippy::too_many_arguments)] attribute is justified here given the nature of the method. It needs substantial context to process events correctly. The parameters are well-organized and clearly named.

Comment on lines +36 to +48
/// Add a node with dependencies to the graph
pub fn add_node_with_dependencies(
    &mut self,
    key: K,
    value: V,
    dependencies: Vec<K>,
) -> Result<NodeIndex> {
    let node_idx = self.add_node(key.clone(), value)?;
    for dependency in dependencies {
        self.add_dependency(&dependency, &key)?;
    }
    Ok(node_idx)
}

🛠️ Refactor suggestion

Ohayo sensei! add_node_with_dependencies should be atomic.

The method adds a node first, then adds dependencies one by one. If adding a dependency fails (e.g., due to a cycle), the node remains in the graph without all its dependencies, leaving the graph in a potentially inconsistent state.

Consider making the operation atomic by validating all dependencies first:

pub fn add_node_with_dependencies(
    &mut self,
    key: K,
    value: V,
    dependencies: Vec<K>,
) -> Result<NodeIndex> {
+    // Validate all dependencies exist first
+    for dependency in &dependencies {
+        if !self.node_indices.contains_key(dependency) {
+            return Err(AcyclicDigraphMapError::NodeNotFound(format!("{:?}", dependency)));
+        }
+    }
+    
+    // Check for cycles before adding anything
+    if self.node_indices.contains_key(&key) {
+        return Err(AcyclicDigraphMapError::DuplicateKey(format!("{:?}", key)));
+    }
+    
+    // Temporarily add the node to check for cycles
+    let node_idx = self.graph.add_node(value.clone());
+    
+    // Check if adding any dependency would create a cycle
+    for dependency in &dependencies {
+        let from_idx = self.node_indices.get(dependency).unwrap().clone();
+        if petgraph::algo::has_path_connecting(&self.graph, node_idx, from_idx, None) {
+            // Remove the temporary node if we'd create a cycle
+            self.graph.remove_node(node_idx);
+            return Err(AcyclicDigraphMapError::CycleDetected);
+        }
+    }
+    
+    // Remove the temporary node if checks passed
+    self.graph.remove_node(node_idx);
+    
     let node_idx = self.add_node(key.clone(), value)?;
     for dependency in dependencies {
         self.add_dependency(&dependency, &key)?;
     }
     Ok(node_idx)
}

This solution is more complex but ensures the graph remains consistent. Alternatively, you could implement a transaction-like pattern that rolls back changes on error.
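
One caveat on any rollback variant: petgraph's DiGraph::remove_node swap-removes, moving the last node into the freed index, so a rollback must also repair node_indices (or the crate could switch to StableGraph, which preserves indices). A small demonstration of the swap:

use petgraph::graph::DiGraph;

fn main() {
    let mut g: DiGraph<&str, ()> = DiGraph::new();
    let a = g.add_node("a");
    let _b = g.add_node("b");
    g.remove_node(a);
    // "b" has been swapped into a's old slot; b's former index is now stale.
    assert_eq!(g[a], "b");
}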

Comment on lines +54 to +56
fn validate(&self, _event: &Event) -> bool {
    true
}

🛠️ Refactor suggestion

Ohayo sensei! Improve event validation to prevent processing invalid events.

The current implementation of validate simply returns true, which means all events tagged as "StoreUpdateMember" will be processed regardless of their structure. Consider adding validation to check if the event contains the expected keys and data structure.

fn validate(&self, event: &Event) -> bool {
-    true
+    // Keys should be [hash(StoreUpdateMember), model_selector, entity_id, member_selector]
+    event.keys.len() >= 4 && !event.data.is_empty()
}

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +129 to +132
let mut values = event.values.to_vec();
member.ty.deserialize(&mut values)?;

let wrapped_ty = Ty::Struct(Struct { name: schema.name(), children: vec![member] });

🛠️ Refactor suggestion

Ohayo sensei! Add validation for sufficient values in the event data.

The code deserializes event.values without checking if it contains enough data for the member type, which could lead to runtime errors or incorrect data if the event is malformed.

let mut values = event.values.to_vec();
+// Validate that we have enough values for deserialization
+if values.is_empty() {
+    return Err(anyhow::anyhow!("Empty values array in StoreUpdateMember event"));
+}
member.ty.deserialize(&mut values)?;
📝 Committable suggestion


Suggested change
-let mut values = event.values.to_vec();
-member.ty.deserialize(&mut values)?;
-let wrapped_ty = Ty::Struct(Struct { name: schema.name(), children: vec![member] });
+let mut values = event.values.to_vec();
+// Validate that we have enough values for deserialization
+if values.is_empty() {
+    return Err(anyhow::anyhow!("Empty values array in StoreUpdateMember event"));
+}
+member.ty.deserialize(&mut values)?;
+let wrapped_ty = Ty::Struct(Struct { name: schema.name(), children: vec![member] });

Comment on lines +23 to +39
impl<Event> TaskProcessor<Event> for StoreUpdateMemberProcessor {
    fn dependencies(&self, event: &Event) -> Vec<TaskId> {
        let mut hasher = DefaultHasher::new();
        // model selector
        event.keys[1].hash(&mut hasher);
        vec![hasher.finish()]
    }

    fn identifier(&self, event: &Event) -> TaskId {
        let mut hasher = DefaultHasher::new();
        // model selector
        event.keys[1].hash(&mut hasher);
        // entity id
        event.keys[2].hash(&mut hasher);
        hasher.finish()
    }
}

🛠️ Refactor suggestion

Ohayo sensei! Protect against potential IndexOutOfBounds errors in TaskProcessor implementation.

The methods dependencies and identifier access event keys by index without checking if these indices are valid, which could cause a panic if the event structure is unexpected.

fn dependencies(&self, event: &Event) -> Vec<TaskId> {
    let mut hasher = DefaultHasher::new();
    // model selector
-   event.keys[1].hash(&mut hasher);
+   if event.keys.len() > 1 {
+       event.keys[1].hash(&mut hasher);
+   }
    vec![hasher.finish()]
}

fn identifier(&self, event: &Event) -> TaskId {
    let mut hasher = DefaultHasher::new();
    // model selector
-   event.keys[1].hash(&mut hasher);
-   // entity id
-   event.keys[2].hash(&mut hasher);
+   if event.keys.len() > 1 {
+       event.keys[1].hash(&mut hasher);
+       // entity id
+       if event.keys.len() > 2 {
+           event.keys[2].hash(&mut hasher);
+       }
+   }
    hasher.finish()
}
📝 Committable suggestion


Suggested change
-impl<Event> TaskProcessor<Event> for StoreUpdateMemberProcessor {
-    fn dependencies(&self, event: &Event) -> Vec<TaskId> {
-        let mut hasher = DefaultHasher::new();
-        // model selector
-        event.keys[1].hash(&mut hasher);
-        vec![hasher.finish()]
-    }
-
-    fn identifier(&self, event: &Event) -> TaskId {
-        let mut hasher = DefaultHasher::new();
-        // model selector
-        event.keys[1].hash(&mut hasher);
-        // entity id
-        event.keys[2].hash(&mut hasher);
-        hasher.finish()
-    }
-}
+impl<Event> TaskProcessor<Event> for StoreUpdateMemberProcessor {
+    fn dependencies(&self, event: &Event) -> Vec<TaskId> {
+        let mut hasher = DefaultHasher::new();
+        // model selector
+        if event.keys.len() > 1 {
+            event.keys[1].hash(&mut hasher);
+        }
+        vec![hasher.finish()]
+    }
+
+    fn identifier(&self, event: &Event) -> TaskId {
+        let mut hasher = DefaultHasher::new();
+        // model selector
+        if event.keys.len() > 1 {
+            event.keys[1].hash(&mut hasher);
+            // entity id
+            if event.keys.len() > 2 {
+                event.keys[2].hash(&mut hasher);
+            }
+        }
+        hasher.finish()
+    }
+}

Comment on lines +93 to +109
let model = match db.model(event.selector).await {
    Ok(m) => m,
    Err(e) if e.to_string().contains("no rows") && !config.namespaces.is_empty() => {
        debug!(
            target: LOG_TARGET,
            selector = %event.selector,
            "Model does not exist, skipping."
        );
        return Ok(());
    }
    Err(e) => {
        return Err(anyhow::anyhow!(
            "Failed to retrieve model with selector {:#x}: {}",
            event.selector,
            e
        ));
    }

🛠️ Refactor suggestion

Ohayo sensei! Refactor error handling to use typed errors instead of string matching.

String matching on error messages is brittle and could break if the underlying library changes its error messages.

-Err(e) if e.to_string().contains("no rows") && !config.namespaces.is_empty() => {
+Err(e) if is_row_not_found_error(&e) && !config.namespaces.is_empty() => {

Add a helper function to torii_sqlite or locally:

fn is_row_not_found_error(e: &dyn std::error::Error) -> bool {
    // Check for specific error types from your SQL library
    // Example for sqlx:
    // e.downcast_ref::<sqlx::Error>().map_or(false, |e| matches!(e, sqlx::Error::RowNotFound))
    e.to_string().contains("no rows") // Temporary fallback
}

Comment on lines +20 to +27
impl TaskProcessor for StoreDelRecordProcessor {
    fn dependencies(&self, event: &Event) -> Vec<TaskId> {
        let mut hasher = DefaultHasher::new();
        // model selector
        event.keys[0].hash(&mut hasher);

        vec![hasher.finish()]
    }

⚠️ Potential issue

Ohayo sensei! Fix inconsistent key indexing in dependencies method.

The dependencies method uses event.keys[0] while other processors and the identifier method in this same processor use event.keys[1] for the model selector. This inconsistency could lead to incorrect dependency tracking.

fn dependencies(&self, event: &Event) -> Vec<TaskId> {
    let mut hasher = DefaultHasher::new();
    // model selector
-   event.keys[0].hash(&mut hasher);
+   event.keys[1].hash(&mut hasher);

    vec![hasher.finish()]
}

Comment on lines +56 to +65
fn task_priority(&self) -> TaskPriority {
    2
}

fn task_identifier(&self, event: &Event) -> TaskId {
    let mut hasher = DefaultHasher::new();
    event.keys[1].hash(&mut hasher);
    event.keys[2].hash(&mut hasher);
    hasher.finish()
}

🛠️ Refactor suggestion

Ohayo sensei! Remove redundant task_identifier method.

The task_identifier method duplicates the functionality already implemented in the identifier method from the TaskProcessor trait. This duplication could lead to maintenance issues if they get out of sync.

-fn task_priority(&self) -> TaskPriority {
-    2
-}
-
-fn task_identifier(&self, event: &Event) -> TaskId {
-    let mut hasher = DefaultHasher::new();
-    event.keys[1].hash(&mut hasher);
-    event.keys[2].hash(&mut hasher);
-    hasher.finish()
-}
+fn task_priority(&self) -> TaskPriority {
+    2
+}

Comment on lines +50 to +52
fn validate(&self, _event: &Event) -> bool {
    true
}

🛠️ Refactor suggestion

Ohayo sensei! Add proper event validation.

The current validation always returns true, which means all events will be processed regardless of their structure. This could lead to runtime errors or panics when accessing event keys or data.

-fn validate(&self, _event: &Event) -> bool {
-    true
+fn validate(&self, event: &Event) -> bool {
+    // Ensure we have the expected key structure
+    // keys: [hash(StoreSetRecord), selector, entity_id, ...]
+    event.keys.len() >= 3 && !event.values.is_empty()
}
📝 Committable suggestion


Suggested change
-fn validate(&self, _event: &Event) -> bool {
-    true
-}
+fn validate(&self, event: &Event) -> bool {
+    // Ensure we have the expected key structure
+    // keys: [hash(StoreSetRecord), selector, entity_id, ...]
+    event.keys.len() >= 3 && !event.values.is_empty()
+}

Comment on lines +109 to +112
let mut keys_and_unpacked = [event.keys, event.values].concat();

let mut entity = model.schema;
entity.deserialize(&mut keys_and_unpacked)?;

🛠️ Refactor suggestion

Ohayo sensei! Add validation before array concatenation and deserialization.

The code concatenates keys and values arrays and then deserializes into the entity without validating the structure or lengths. This could lead to incorrect data or runtime errors.

-let mut keys_and_unpacked = [event.keys, event.values].concat();
+// Validate the expected structure before concatenation
+if event.keys.len() < 3 {
+    return Err(anyhow::anyhow!("Insufficient keys in StoreSetRecord event"));
+}
+
+let mut keys_and_unpacked = [event.keys, event.values].concat();

-entity.deserialize(&mut keys_and_unpacked)?;
+// Wrap deserialization in explicit error context
+entity.deserialize(&mut keys_and_unpacked).context("Failed to deserialize entity from event data")?;
📝 Committable suggestion


Suggested change
-let mut keys_and_unpacked = [event.keys, event.values].concat();
-
-let mut entity = model.schema;
-entity.deserialize(&mut keys_and_unpacked)?;
+// Validate the expected structure before concatenation
+if event.keys.len() < 3 {
+    return Err(anyhow::anyhow!("Insufficient keys in StoreSetRecord event"));
+}
+
+let mut keys_and_unpacked = [event.keys, event.values].concat();
+
+let mut entity = model.schema;
+// Wrap deserialization in explicit error context
+entity
+    .deserialize(&mut keys_and_unpacked)
+    .context("Failed to deserialize entity from event data")?;

Comment on lines +133 to +138
pub fn get_event_processors(
    &self,
    contract_type: ContractType,
) -> &HashMap<EventKey, Box<dyn EventProcessor<P>>> {
    self.event_processors.get(&contract_type).unwrap()
}

🛠️ Refactor suggestion

Ohayo sensei! Add error handling for missing contract types.

The get_event_processors method unwraps the result of get(&contract_type) without checking if it exists. This would panic if called with an unsupported contract type.

pub fn get_event_processors(
    &self,
    contract_type: ContractType,
) -> &HashMap<EventKey, Box<dyn EventProcessor<P>>> {
-    self.event_processors.get(&contract_type).unwrap()
+    self.event_processors.get(&contract_type).unwrap_or_else(|| {
+        // Either return an empty map or panic with a better error message
+        panic!("No processors registered for contract type {:?}", contract_type)
+    })
}

Better yet, return an Option or Result to allow callers to handle missing contract types gracefully:

pub fn get_event_processors(
    &self,
    contract_type: ContractType,
-) -> &HashMap<EventKey, Box<dyn EventProcessor<P>>> {
-    self.event_processors.get(&contract_type).unwrap()
+) -> Option<&HashMap<EventKey, Box<dyn EventProcessor<P>>>> {
+    self.event_processors.get(&contract_type)
}


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

♻️ Duplicate comments (1)
crates/torii/task-network/src/lib.rs (1)

94-121: ⚠️ Potential issue

‼️ Task‑handler errors are still silently swallowed – propagate the inner Err

Ohayo sensei!
try_join_all only fails fast on a JoinError. Because each JoinHandle wraps the actual task result inside an Ok(_), an error returned by your task_handler ends up as Ok(Err(anyhow::Error)). The current map_err(TaskNetworkError::JoinError) therefore converts only panics/cancels, allowing real task failures to slip through and process_tasks returns Ok(()).

The exact same issue was raised in the previous review, so tagging this as a duplicate.

A minimal change that preserves the level semantics:

-            // Wait for all tasks in this level to complete before proceeding to the next level
-            try_join_all(handles).await.map_err(|e| TaskNetworkError::JoinError(e))?;
+            // Wait for all handles and surface the *first* task‑handler error (if any)
+            let task_results = try_join_all(handles)
+                .await
+                .map_err(TaskNetworkError::JoinError)?;
+
+            if let Some(err) = task_results.into_iter().find_map(|inner| inner.err()) {
+                return Err(TaskNetworkError::TaskHandlerError(err));
+            }

You’ll need a new enum variant:

TaskHandlerError(anyhow::Error)

in TaskNetworkError, plus a From<anyhow::Error> impl if you’d like ergonomic ? usage upstream.

Without this fix, upstream callers can never detect a failed task – scary for anything more critical than a test harness!

🧹 Nitpick comments (1)
crates/torii/task-network/src/lib.rs (1)

145-147: Consider using available_parallelism().unwrap_or(1) instead of hard‑coding 4

Falling back to 4 threads on single‑core CI boxes can lead to unnecessary context‑switching overhead. A safer default is 1, which scales naturally when available_parallelism succeeds.

-        Self::new(std::thread::available_parallelism().map_or(4, |p| p.get()))
+        Self::new(std::thread::available_parallelism().map_or(1, |p| p.get()))
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 482af7c and 763b573.

📒 Files selected for processing (1)
  • crates/torii/task-network/src/lib.rs (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: build

@Larkooo Larkooo closed this Apr 22, 2025